Message Queues give you another way to send information from one task to another. There is overlap in Message Queue capabilities and Named Pipes. However, there are some substantial differences:
You acquire a message queue resource by making a call that looks like:
msgid = msgget(key, flags);
Many message queue concepts are the same as semaphores and shared memory. For example:
To get rid of the message queue resource from your program, code the following:
msgctl(msgid, IPC_RMID, 0);
#!/bin/csh -f set l = `ipcs -q | grep "$USER"| cut -c12-19` foreach s ( $l ) echo $s ipcrm msg $s end if ($#l != 0 ) echo $#l shared memory\(s\) for $user removed
To receive from a message queue, you use a calling sequence like the following:
msgrcv(msgid, Buffer_pointer, Buffer_size, msg_to_receive,
flags)
msgid is the message Queue identifier returned from the msgget routine.
The Buffer_pointer and Buffer_size are self explainatory
The msg_to_receive indicates what kind of messages to receive. A zero for this parameter indicates all messages are OK to receive. A value > 0 indicates to only receive messages of this type. In this case, the call will only return messages with a a matching msg_to_receive type --- more about this on the msgsend primitive.
If the flags are zero, then the receiver will block if there is nothing to receive in the message queue. All other options are OR'ed in as masking flags:
IPC_NOWAIT indicates that this operation should return an error if there is no packet to receive. In this case we should get a ENOMSG error return code.
MSG_NOERROR indicates that the message should be truncated if necessary to fit the receiving buffer. The error E2BIG is returned when this option is used and the message cannot fit into the buffer.
Starting msg1 with no parameters starts a message queue reader that takes all messages (i.e. msg_to_receive = 0).
Specifying a parameter allows you to set msg_to_receive to some integer.
/* Here's the receiver program. */ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> struct my_msg_st { long int my_msg_type; char some_text[BUFSIZ]; }; int main(int argc, char * argv[]) { int running = 1; int msgid; struct my_msg_st some_data; long int msg_to_receive = 0; int mykey = getuid(); if (argc > 1) { msg_to_receive = atoi(argv[1]); printf("I will only receive messages of type: %d\n", msg_to_receive); } /* First, we set up the message queue. */ msgid = msgget((key_t)mykey, 0666 | IPC_CREAT); if (msgid == -1) { fprintf(stderr, "msgget failed with error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } /* Then the messages are retrieved from the queue, until an end message is encountered. Lastly, the message queue is deleted. */ while(running) { if (msgrcv(msgid, (void *)&some_data, BUFSIZ, msg_to_receive, 0) == -1) { fprintf(stderr, "msgrcv failed with error: %s\n", strerror(errno)); exit(EXIT_FAILURE); } printf("You wrote: %s", some_data.some_text); if (strncmp(some_data.some_text, "end", 3) == 0) { running = 0; } } if (msgctl(msgid, IPC_RMID, 0) == -1) { fprintf(stderr, "msgctl(IPC_RMID) failed\n"); exit(EXIT_FAILURE); } exit(EXIT_SUCCESS); }
Sending messages is a little confusing.
You need a structure that starts out with a long integer followed by the buffer
you want to send. In the example below we have the structure:
struct my_msg_st {
long int my_msg_type; // message type
char some_text[MAX_TEXT]; // data we need to send
};
The call to send data looks like: The confusing part is that the pointer is to the beginning of the structure (i.e. my_msg_st), but the length specified in the next parameter only includes the length of the data in some_text (include null termination for strings).
If flags is zero, then the above call is blocking until a good message is sent.
If flags includes the IPC_NOWAIT flag then an error code is returned if the message queue is currently full. In this case you would get an EAGAIN error message.
/* The sender program is very similar to msg1.c. In the main set up, delete the msg_to_receive declaration and replace it with buffer[BUFSIZ], remove the message queue delete and make the following changes to the running loop. We now have a call to msgsnd to send the entered text to the queue. */ #include <stdlib.h> #include <stdio.h> #include <string.h> #include <errno.h> #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #define MAX_TEXT 512 struct my_msg_st { long int my_msg_type; char some_text[MAX_TEXT]; }; int main() { int running = 1; struct my_msg_st some_data; int msgid, len; char buffer[BUFSIZ]; int mykey = getuid(); printf("mykey = %d\n", mykey); msgid = msgget((key_t)mykey, 0666 | IPC_CREAT); if (msgid == -1) { fprintf(stderr, "msgget failed with error: %d\n", errno); exit(EXIT_FAILURE); } some_data.my_msg_type = 1; while(running) { printf("Enter some text or :msg_type "); fgets(buffer, BUFSIZ, stdin); strcpy(some_data.some_text, buffer); if (buffer[0] == ':') { printf("Change msg_type from: %d\n", some_data.my_msg_type); some_data.my_msg_type = atoi(&buffer[1]); printf(" to msg_type: %d\n", some_data.my_msg_type); continue; } len = strlen(some_data.some_text) +1; if (msgsnd(msgid, (void *)&some_data, len , 0) == -1) { fprintf(stderr, "msgsnd failed\n"); exit(EXIT_FAILURE); } if (strncmp(buffer, "end", 3) == 0) { running = 0; } } exit(EXIT_SUCCESS); }
Try running msg2 without msg1 and send some messages. Then do an ipcs to see how many messages and the number of bytes stored.
Also do an ipcs -l to see the system limits on message queues.
Try running the above example with multiple msg1 programs.
Try running an instance of msg1 receiving message type 2 and another msg1 program receiving message type 3 messages. Then from a msg2 program send messages to both programs.
Message Queues have a size limitations. If you want to see the limitations
on a Linux machine, issue the command:
ipcs -l
How do you know if your message queue is filling up. A regular ipcs can tell you the current number of messages and the total number of bytes you have in the message queue. But if you are in your program, you can also find out with an msgctl call.
The following call will return information about your message queue:
struct msqid_ds info;
msgctl(msgid, IPC_STAT, &info);
The info datastructure has a number of useful fields. Some of the more useful
are:
msg_cbytes - current # of bytes on Q
msg_qnum - current # of messages on Q
msg_qbytes - max # of bytes on Q
On some systems (Linux only permits this to superusers) you can change msg_qbytes,
by filling in the msqid_ds structure and issuing
a command like:
msgctl(msgid, IPC_SET, &info);
To demo this, we are going modify msg2.c above to create msg_props.c shown below. If you run this program without running msg1.c to flush the message queue, then you can see the queue pile up. Note the following changes:
if you type in text that looks like:
#200:abcde
You will place 200 messages into the queue with this text in it. You can also see the number of messages with ipcs.
The msgsend call has the IPC_NOWAIT flag set. This allows us to fill up the queue without blocking. We will get an error when the queue fills up, but we won't suspend.
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MAX_TEXT 512
struct my_msg_st {
long int my_msg_type;
char some_text[MAX_TEXT];
};
int main()
{
struct msqid_ds info;
int running = 1;
struct my_msg_st some_data;
int msgid, len, repeat, count;
char buffer[BUFSIZ];
int mykey = getuid();
printf("mykey = %d\n", mykey);
msgid = msgget((key_t)mykey, 0666 | IPC_CREAT);
if (msgid == -1) {
fprintf(stderr, "msgget failed with error: %d\n", errno);
exit(EXIT_FAILURE);
}
// The following can only be done as superuser on Linux --- other
// UNIX machines allow this
if (msgctl(msgid, IPC_STAT, &info))
perror("msgctl IPC_STAT error ");
printf("Message Q *Permission* structure information\n");
printf("Owners's user ID \t%d\n", info.msg_perm.uid);
printf("Owner's group ID \t%d\n", info.msg_perm.gid);
printf("Creator's user ID \t%d\n", info.msg_perm.cuid);
printf("Creator's group ID\t%d\n", info.msg_perm.cgid);
printf("Access mode in HEX \t%06X\n", info.msg_perm.mode);
printf("\n\nAdditional Selected Message Queue Structure Information\n");
printf("Current # of bytes on queue\t %d\n", info.msg_cbytes);
printf("Current # of messages on queue\t %d\n", info.msg_qnum);
printf("Maximum # of bytes on queue\t %d\n", info.msg_qbytes);
// This only works if you have superuser priviledges
info.msg_qbytes = 32000;
if (msgctl(msgid, IPC_SET, &info))
perror("msgctl IPC_SET error ");
some_data.my_msg_type = 1;
while(running) {
printf("Enter some text or :msg_type ");
fgets(buffer, BUFSIZ, stdin);
strcpy(some_data.some_text, buffer);
repeat = 1;
if (buffer[0] == '#')
{
repeat = atoi(&buffer[1]);
}
len = strlen(some_data.some_text) +1;
for (count = 0; count < repeat; count++)
{
if (msgsnd(msgid, (void *)&some_data, len , IPC_NOWAIT) == -1) {
perror("msgsnd failed ");
break;
}
}
if (strncmp(buffer, "end", 3) == 0) {
running = 0;
}
if (msgctl(msgid, IPC_STAT, &info))
perror("msgctl error ");
printf("Current # of bytes on queue\t %d\n", info.msg_cbytes);
printf("Current # of messages on queue\t %d\n", info.msg_qnum);
printf("Maximum # of bytes on queue\t %d\n", info.msg_qbytes);
}
exit(EXIT_SUCCESS);
}
See if you can write a client and an echo server using a single message queue. Note that you can use a message type of say 1 for messages sent from the client to the server. You then use a message type of 2 for responses sent from the server back to the client.