一个或多个进程可向消息队列写入消息,而一个或多个进程可从消息队列中读取消息,这种进程间通讯机制通常使用在客户/服务器模型中,客户向服务器发送请求消息,服务器读取消息并执行相应请求。在许多微内核结构的操作系统中,内核和各组件之间的基本通讯方式就是消息队列。例如,在 MINIX 操作系统中,内核、I/O 任务、服务器进程和用户进程之间就是通过消息队列实现通讯的。
Linux中的消息可以被描述成在内核地址空间的一个内部链表,每一个消息队列由一个IPC的标识号唯一的标识。Linux 为系统中所有的消息队列维护一个 msgque 链表,该链表中的每个指针指向一个 msgid_ds 结构,该结构完整描述一个消息队列。
1. 数据结构
(1)消息缓冲区(msgbuf)
我们在这里要介绍的第一个数据结构是msgbuf结构,可以把这个特殊的数据结构看成一个存放消息数据的模板,它在include/linux/msg.h中声明,描述如下:
/* msgsnd 和msgrcv 系统调用使用的消息缓冲区*/
struct msgbuf {
long mtype;
/* 消息的类型,必须为正数 */
char mtext[1]; /* 消息正文 */
};
注意:对于消息数据元素(mtext),不要受其描述的限制。实际上,这个域(mtext)不仅能保存字符数组,而且能保存任何形式的任何数据。这个域本身是任意的,因为这个结构本身可以由应用程序员重新定义:
struct my_msgbuf {
long mtype;
/* 消息类型 */
long
request_id;
/* 请求识别号 */
struct client info; /* 客户消息结构 */
};
我们看到,消息的类型还是和前面一样,但是结构的剩余部分由两个其它的元素代替,而且有一个是结构。这就是消息队列的优美之处,内核根本不管传送的是什么样的数据,任何信息都可以传送。
但是,消息的长度还是有限制的,在Linux中,给定消息的最大长度在include/linux/msg.h中定义如下:
#define MSGMAX 8192 /* max size of message
(bytes) */
消息总的长度不能超过8192字节,包括mtype域,它是4字节长。
(2)消息结构(msg)
内核把每一条消息存储在以msg结构为框架的队列中,它在include/ linux/msg.h中定义如下:
struct msg {
struct msg *msg_next;
/* 队列上的下一条消息 */
long msg_type;
/*消息类型*/
char *msg_spot;
/* 消息正文的地址 */
short msg_ts;
/* 消息正文的大小 */
};
注意:msg_next是指向下一条消息的指针,它们在内核地址空间形成一个单链表。
(3)消息队列结构(msgid_ds)
当在系统中创建每一个消息队列时,内核创建、存储及维护这个结构的一个实例。
/* 在系统中的每一个消息队列对应一个msqid_ds 结构 */
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first;
/* 队列上第一条消息,即链表头*/
struct msg *msg_last;
/* 队列中的最后一条消息,即链表尾 */
time_t msg_stime; /* 发送给队列的最后一条消息的时间 */
time_t msg_rtime;
/* 从消息队列接收到的最后一条消息的时间 */
time_t msg_ctime;
/* 最后修改队列的时间*/
ushort msg_cbytes;
/*队列上所有消息总的字节数 */
ushort msg_qnum;
/*在当前队列上消息的个数 */
ushort msg_qbytes; /* 队列最大的字节数 */
ushort msg_lspid;
/* 发送最后一条消息的进程的pid */
ushort msg_lrpid;
/* 接收最后一条消息的进程的pid */
};
2. 系统调用: msgget()
为了创建一个新的消息队列,或存取一个已经存在的队列,要使用msgget()系统调用。
原型: int msgget ( key_t key, int msgflg
);
返回: 成功,则返回消息队列识别号,失败,则返回-1,
semget()中的第一个参数是键, 这个键值要与现有的键值进行比较,现有的键值指在内核中已存在的其它消息队列的键值。对消息队列的打开或存取操作依赖于msgflg参数的取值:
IPC_CREAT : 如果这个队列在内核中不存在,则创建它。
IPC_EXCL :当与IPC_CREAT一起使用时,如果这个队列已存在,则创建失败。
如果IPC_CREAT单独使用,semget()为一个新创建的消息队列返回标识号,或者返回具有相同键值的已存在队列的标识号。如果IPC_EXCL与IPC_CREAT一起使用,要么创建一个新的队列,要么对已存在的队列返回-1。IPC_EXCL单独是没有用的,当与IPC_CREAT结合起来使用时,可以保证新创建队列的打开和存取。
与文件系统的存取权限一样,每一个IPC对象也具有存取权限,因此,可以把一个八进制与掩码或,形成对消息队列的存取权限。
让我们来创建一个打开或创建消息队列的函数:
int open_queue( key_t keyval )
{
int
qid;
if((qid = msgget( keyval, IPC_CREAT | 0660 )) == -1)
{
return(-1);
}
return(qid);
}
注意,这个例子显式地用了0660权限。这个函数要么返回一个消息队列的标识号,要么返回-1而出错。键值作为唯一的参数必须传递给它。
3. 系统调用: msgsnd()
一旦我们有了队列识别号,我们就可以在这个队列上执行操作。要把一条消息传递给一个队列,你必须用msgsnd()系统调用。
原型:int msgsnd ( int msqid, struct msgbuf *msgp, int msgsz, int msgflg );
返回:成功为0, 失败为-1。
msgsnd()的第一个参数是队列识别号,由msgget()调用返回。第二个参数msgp是一个指针,指向我们重新声明和装载的消息缓冲区。msgsz参数包含了消息以字节为单位的长度,其中包括了消息类型的4个字节。
msgflg参数可以设置成0(忽略),或者:
IPC_NOWAIT :如果消息队列满,消息不写到队列中,并且控制权返回给调用进程(继续执行)。如果不指定IPC_NOWAIT,调用进程将挂起(阻塞)直到消息被写到队列中。
让我们来看一个发送消息的简单函数:
int send_message( int qid, struct mymsgbuf *qbuf )
{
int
result, length;
/*
mymsgbuf结构的实际长度 */
length = sizeof(struct ) - sizeof(long);
if((result = msgsnd( qid, qbuf, length, 0)) == -1)
{
return(-1);
}
return(result);
}
这个小函数试图把缓冲区qbuf中的消息,发送给队列识别号为qid的消息队列。
现在,我们在消息队列里有了一条消息,可以用ipcs命令来看你队列的状态。如何从消息队列检索消息,可以用msgrcv()系统调用。
4.系统调用:msgrcv()
原型:int msgrcv ( int msqid, struct msgbuf *msgp, int msgsz, long mtype, int
msgflg );
返回值:成功,则为拷贝到消息缓冲区的字节数,失败为-1。
很明显,第一个参数用来指定要检索的队列(必须由msgget()调用返回),第二个参数(msgp)是存放检索到消息的缓冲区的地址,第三个参数(msgsz)是消息缓冲区的大小,包括消息类型的长度(4字节)。
第四个参数(mtype)指定了消息的类型。内核将搜索队列中相匹配类型的最早的消息,并且返回这个消息的一个拷贝,返回的消息放在由msgp参数指向的地址。这里存在一个特殊的情况,如果传递给mytype参数的值为0,就可以不管类型,只返回队列中最早的消息。
如果传递给参数msgflg的值为IPC_NOWAIT,并且没有可取的消息,那么给调用进程返回ENOMSG错误消息,否则,调用进程阻塞,直到一条消息到达队列并且满足msgrcv()的参数。如果一个客户正在等待消息,而队列被删除,则返回EIDRM。如果当进程正在阻塞,并且等待一条消息到达但捕获到了一个信号,则返回EINTR。
让我们来看一个从我们已建的消息队列中检索消息的例子
int read_message( int qid, long type, struct mymsgbuf *qbuf
)
{
int
result, length;
/* 计算mymsgbuf结构的实际大小*/
length = sizeof(struct mymsgbuf) - sizeof(long);
if((result = msgrcv( qid, qbuf, length, type, 0)) == -1)
{
return(-1);
}
return(result);
}
当从队列中成功地检索到消息后,这个消息将从队列删除。