// Layout of struct mq_attr, the attribute record of a POSIX message queue.
// Reference excerpt: in a real program the definition comes from <mqueue.h>.
struct mq_attr {
    long mq_flags;      /* Flags: 0 or O_NONBLOCK */
    long mq_maxmsg;     /* Max. # of messages on queue */
    long mq_msgsize;    /* Max. message size (bytes) */
    long mq_curmsgs;    /* # of messages currently in queue */
};
#include <mqueue.h>

/*
 * Send a message to the queue referred to by mqdes.
 *
 *   msg_ptr   points to the message buffer
 *   msg_len   length of the message in bytes
 *   msg_prio  message priority; 0 is the lowest priority
 *
 * Returns 0 on success, or -1 on error.
 */
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
            unsigned int msg_prio);

/*
 * Receive a message from the queue referred to by mqdes.
 *
 *   msg_ptr   buffer that receives the message
 *   msg_len   size of that buffer in bytes (per POSIX it must be at least
 *             the queue's mq_msgsize attribute)
 *   msg_prio  if not NULL, receives the priority of the message
 *
 * Returns number of bytes in received message on success, or -1 on error.
 */
ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
                   unsigned int *msg_prio);
#include <mqueue.h>

/*
 * Register the calling process for notification on the queue mqdes.
 *
 *   notification  describes how the notification is delivered
 *                 (see struct sigevent)
 *
 * Returns 0 on success, or -1 on error.
 */
int mq_notify(mqd_t mqdes, const struct sigevent *notification);
// Prototype of the sigevent structure.
// Reference excerpt: in a real program these definitions come from <signal.h>.

/* Value handed to the signal handler or thread function. Shown here because
   struct sigevent embeds it by value, so the excerpt is self-contained. */
union sigval {
    int   sival_int;    /* Integer value */
    void *sival_ptr;    /* Pointer value */
};

struct sigevent {
    int          sigev_notify;              /* Notification method */
    int          sigev_signo;               /* Notification signal for SIGEV_SIGNAL */
    union sigval sigev_value;               /* Value passed to signal handler or
                                               thread function */
    void       (*sigev_notify_function)(union sigval);
                                            /* Thread notification function */
    void        *sigev_notify_attributes;   /* Really 'pthread_attr_t' */
};
// 线程通知的主要代码
staticvoid/* Thread notification function */threadFunc(unionsigvalsv){ssize_tnumRead;mqd_t*mqdp;void*buffer;structmq_attrattr;mqdp=sv.sival_ptr;if(mq_getattr(*mqdp,&attr)==-1)errExit("mq_getattr");buffer=malloc(attr.mq_msgsize);if(buffer==NULL)errExit("malloc");// 在读取前先重新注册
notifySetup(mqdp);while((numRead=mq_receive(*mqdp,buffer,attr.mq_msgsize,NULL))>=0)printf("Read %ld bytes\n",(long)numRead);if(errno!=EAGAIN)/* Unexpected error */errExit("mq_receive");free(buffer);pthread_exit(NULL);}notifySetup(mqd_t*mqdp){structsigeventsev;// 设置为线程通知
sev.sigev_notify=SIGEV_THREAD;/* Notify via thread */sev.sigev_notify_function=threadFunc;sev.sigev_notify_attributes=NULL;/* Could be pointer to pthread_attr_t structure */sev.sigev_value.sival_ptr=mqdp;/* Argument to threadFunc() */if(mq_notify(*mqdp,&sev)==-1)errExit("mq_notify");}intmain(intargc,char*argv[]){mqd_tmqd;if(argc!=2||strcmp(argv[1],"--help")==0)usageErr("%s mq-name\n",argv[0]);// NONBLOCK打开
mqd=mq_open(argv[1],O_RDONLY|O_NONBLOCK);if(mqd==(mqd_t)-1)errExit("mq_open");notifySetup(&mqd);pause();/* Wait for notifications via thread function */}
// 写东西到共享内存中
fd=shm_open(argv[1],O_RDWR,0);/* Open existing object */if(fd==-1)errExit("shm_open");len=strlen(argv[2]);if(ftruncate(fd,len)==-1)/* Resize object to hold string */errExit("ftruncate");printf("Resized to %ld bytes\n",(long)len);addr=mmap(NULL,len,PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);if(addr==MAP_FAILED)errExit("mmap");if(close(fd)==-1)errExit("close");/* 'fd' is no longer needed */printf("copying %ld bytes\n",(long)len);memcpy(addr,argv[2],len);/* Copy string to shared memory */exit(EXIT_SUCCESS);// 从共享内存中读取
fd = shm_open(argv[1], O_RDONLY, 0);    /* Open existing object */
if (fd == -1) {
    errExit("shm_open");
}

/* Use shared memory object size as length argument for mmap()
   and as number of bytes to write() (the write() call itself is not
   part of this excerpt) */
if (fstat(fd, &sb) == -1) {
    errExit("fstat");
}

addr = mmap(NULL, sb.st_size, PROT_READ, MAP_SHARED, fd, 0);
if (addr == MAP_FAILED) {
    errExit("mmap");
}

/* Report an error only when close() actually fails. A stray ';' directly
   after the condition would end the if statement and make the errExit()
   call unconditional. */
if (close(fd) == -1) {                  /* 'fd' is no longer needed */
    errExit("close");
}

// Delete the shared memory object
/* Remove the shared memory object whose name is given in argv[1] */
if (shm_unlink(argv[1]) == -1) {
    errExit("shm_unlink");
}