Linux 消息队列(message queue)

struct msqid_ds结构体

可以通过命令man msgctl来查看这个结构体的定义。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct msqid_ds {
struct ipc_perm msg_perm; /* Ownership and permissions */
time_t msg_stime; /* Time of last msgsnd(2) */
time_t msg_rtime; /* Time of last msgrcv(2) */
time_t msg_ctime; /* Time of last change */
unsigned long __msg_cbytes; /* Current number of bytes in
queue (nonstandard) */
msgqnum_t msg_qnum; /* Current number of messages
in queue */
msglen_t msg_qbytes; /* Maximum number of bytes
allowed in queue */
pid_t msg_lspid; /* PID of last msgsnd(2) */
pid_t msg_lrpid; /* PID of last msgrcv(2) */
};

msgget()函数

1
2
3
4
5
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int msgget(key_t key, int msgflg);

msgget()会返回一个消息队列标识符,发送和接收消息都需要指定标识符。参数key
以是IPC_PRIVATE或者是由ftok()创建的 IPC key。这个参数似乎也可以手工指定,我
并没有尝试过,但这样可能与系统中已存在的相冲突。

参数msgflg用来在给定一些控制信息,包括创建消息队列(IPC_CREAT)和在创建时设置
权限等。一个小例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int main(int argc, char *argv[])
{
int msgqid;
key_t key;

if ((msgqid = msgget(IPC_PRIVATE, 0600)) == -1) {
perror("msgget error");
exit(1);
}
printf("message queue id: [%d]\n", msgqid);

key = ftok("/bin", 1);

if ((msgqid = msgget(key, IPC_CREAT | IPC_EXCL | 0600)) == -1) {
perror("msgget error");
exit(2);
}
printf("message queue id: [%d]\n", msgqid);
return 0;
}

msgctl()函数

1
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

参数cmd用来指定要进行的操作,比如IPC_STAT用来得到内核中关于msqid所指定的消
息队列的信息,结果存放在buf中,IPC_SET用来修改内核中此消息队列的一些信息,由
buf所指向的结构体保存要修改的内容,但并不是所有的信息都可以修改(uid, gid, mode
可以修改),IPC_RMID用来删除消息队列,删除时如果有进程阻塞在此队列,则会被唤醒
且设置其errnoEIDRMIPC_RMID的用法类似于 msgctl(msqid, IPC_RMID, NULL);。下面是一个演示的小程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int main(int argc, char *argv[])
{
key_t key;
int msqid;
struct msqid_ds msqid_info;

key = ftok("/bin", 1);
if ((msqid = msgget(key, IPC_CREAT | 0666)) == -1) {
perror("msgget error");
exit(1);
}

if (msgctl(msqid, IPC_STAT, &msqid_info) == -1) {
perror("msgctl error");
exit(2);
}

printf("values in msqid_ds:\n");
printf("\tmsgid_info.msg_stime: [%ld]\n", msqid_info.msg_stime);
printf("\tmsgid_info.msg_rtime: [%ld]\n", msqid_info.msg_rtime);
printf("\tmsgid_info.msg_ctime: [%ld]\n", msqid_info.msg_ctime);
printf("\tmsgid_info.__msg_cbytes: [%ld]\n", msqid_info.__msg_cbytes);
printf("\tmsgid_info.msg_qnum: [%lu]\n", msqid_info.msg_qnum);
printf("\tmsgid_info.msg_qbytes: [%lu]\n", msqid_info.msg_qbytes);
printf("\tmsgid_info.msg_lspid: [%d]\n", msqid_info.msg_lspid);
printf("\tmsgid_info.msg_lrpid: [%d]\n", msqid_info.msg_lrpid);

printf("\tmsgid_info.msg_perm:\n");
printf("\t\tmsqid_info.__key: [%d]\n", msqid_info.msg_perm.__key);
printf("\t\tmsqid_info.uid : [%d]\n", msqid_info.msg_perm.uid );
printf("\t\tmsqid_info.gid : [%d]\n", msqid_info.msg_perm.gid );
printf("\t\tmsqid_info.cuid : [%d]\n", msqid_info.msg_perm.cuid );
printf("\t\tmsqid_info.cgid : [%d]\n", msqid_info.msg_perm.cgid );
printf("\t\tmsqid_info.mode : [%d]\n", msqid_info.msg_perm.mode );
printf("\t\tmsqid_info.__seq: [%d]\n", msqid_info.msg_perm.__seq);

return 0;
}

msgsnd()函数

1
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

参数msgp是要传递的消息,由一个结构体定义,这个结构体需要由函数调用者自己定义,
Linux 手册中给出的一个例子为:

1
2
3
4
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data */
};

可以根据需要自己定义,除了第一个域mtype必须的(且其值必须大于 0),其余不做要
求(包括是否存在及长度),msgsz定义除mtype之外msgbuf的大小。mtype的值可
msgrcv()函数中指定用来选择消息。

msgsnd()会将msgp指向的消息复制一份追加到指定的消息队列中。如果消息队列中有足
够的空间,msgsnd()会立即返回,如果没有足够的空间,则会阻塞直到有空间可用。也可
以在参数msgflg中加入IPC_NOWAIT,这样便不会阻塞,而是出错返回。

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MTEXT_SIZE 512

struct msgbuf {
long mtype;
char mtext[MTEXT_SIZE];
};

void init_msgbuf(struct msgbuf *msg);

int main(int argc, char *argv[])
{
key_t key;
int msqid;
struct msgbuf msg;

key = ftok("/", 1);
if ((msqid = msgget(key, IPC_CREAT | 0666)) == -1) {
perror("msgget error");
exit(1);
}
init_msgbuf(&msg);
strcpy(msg.mtext, "sending message through msgsnd\n");

if (msgsnd(msqid, &msg, MTEXT_SIZE, 0) == -1) {
perror("msgsnd error");
exit(2);
}

return 0;
}

void init_msgbuf(struct msgbuf *msg)
{
msg->mtype = 42;
memset(msg->mtext, 0, MTEXT_SIZE);
}

msgrcv()函数

1
2
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);

msgrcv()函数用来从消息队列中移除一条消息,并将移除的消息放在参数msgp中。参数
msgtyp用来指定消息类型。msgsz用来指定消息的最大长度(不包括消息中的msgtyp
部分)。如果消息的长度大于msgsz所指定的长度,则函数的行为取决于 msgflg中是否
指定了MSG_NOERROR,如果指定了,消息会被截断(而被截断的部分将会丢失);如果未指定,
则返回-1,消息不会从队列中移除。

关于msgtyp

  • 值为 0 时,队列中第一个消息会被读取;
  • 值大于 0 时,队列中第一个类型为msgtyp的消息会被读取;
  • 值小于 0 时,队列中类型值最小且小于msgtyp绝对值的第一个消息。

一个简单的例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MTEXT_SIZE 512

struct msgbuf {
long mtype;
char mtext[MTEXT_SIZE];
};

int main(int argc, char *argv[])
{
key_t key;
int msqid;
int ret, i;
struct msgbuf msg;

key = ftok("/bin", 1);
if ((msqid = msgget(key, IPC_CREAT | 0666)) == -1) {
perror("msgget error");
exit(1);
}

if ((ret = msgrcv(msqid, &msg, MTEXT_SIZE, 42, 0)) == -1) {
perror("msgrcv error");
exit(2);
}
printf("msgrcv returned: %d\n\t%s\n", ret, msg.mtext);

return 0;
}

注意

消息队列不会在程序退出后自动删除,需要在程序中使用msgctl()进行删除(cmd参数使用
IPC_RMID),或者使用命令ipcs -q查看,使用ipcrm -q <id>删除。其中参数-q
示对消息队列进行查看或删除,这些命令还可以对信号量和共享内存进行管理。

0%