Message Queues

Message Queues give you another way to send information from one task to another. Their capabilities overlap with those of Named Pipes. However, there are some substantial differences:

  1. A Named Pipe is byte oriented, which means that reading a Named Pipe from 2 different processes is problematic: each reader can end up with an arbitrary slice of the byte stream. Message Queues, however, are packet oriented, and each message is delivered whole to exactly one reader. Reading a Message Queue from 2 different processes is an excellent use of Message Queues. In effect, you can have N identical worker tasks that pull requests from a single Message Queue.
  2. Message Queues allow you to take packets out of order in some cases. Each message has a message type associated with it. A Message Queue reader can specify which type of message it will read, or it can say that it will read all messages in order.
  3. It is quite possible to have any number of Message Queue readers or writers. In fact, the same process can be both a writer and a reader.
  4. One really bad characteristic: Message Queues are based on a system buffer resource. It is possible for one process to let its messages pile up. This can cause other processes on the system to hang because the system is out of resources. This is bad news when it happens.
  5. There is a limit to the size of each packet, and there is a limit to the total number of bytes that can be held in any given Message Queue.
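
Point 1 above (N identical workers pulling requests from one queue) can be sketched as follows. This is only an illustration under stated assumptions: the names run_workers, worker, job_msg, JOB and STOP are made up for this sketch, the queue is created with IPC_PRIVATE so it cannot collide with anything else, and each worker reports how many jobs it handled through its exit status (so it only works for small job counts). The msgsnd and msgrcv calls it uses are described later in this document.

```c
#include <stddef.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>

#define JOB  1L   /* hypothetical message type for work requests */
#define STOP 2L   /* hypothetical message type telling one worker to exit */

struct job_msg {
    long int type;     /* required first field: the message type */
    int job_number;    /* the data portion of the message */
};

/* Child: take messages in arrival order until a STOP message arrives.
   The exit status is the number of jobs this worker handled. */
static void worker(int msgid)
{
    struct job_msg m;
    int handled = 0;

    while (msgrcv(msgid, &m, sizeof m.job_number, 0, 0) != -1) {
        if (m.type == STOP)
            _exit(handled);
        handled++;     /* a real worker would process m.job_number here */
    }
    _exit(255);        /* msgrcv failed, e.g. the queue was removed */
}

/* Fork nworkers children, queue njobs jobs for them, and return the
   total number of jobs they handled, or -1 on failure. */
int run_workers(int nworkers, int njobs)
{
    struct job_msg m;
    int msgid, i, status, started = 0, total = 0, failed = 0;

    msgid = msgget(IPC_PRIVATE, 0600);
    if (msgid == -1)
        return -1;

    for (i = 0; i < nworkers; i++) {
        pid_t pid = fork();
        if (pid == 0)
            worker(msgid);             /* never returns */
        if (pid > 0)
            started++;
    }

    /* Queue the jobs first, then one STOP message per worker. */
    for (i = 0; started > 0 && i < njobs + started; i++) {
        m.type = (i < njobs) ? JOB : STOP;
        m.job_number = i;
        if (msgsnd(msgid, &m, sizeof m.job_number, 0) == -1) {
            failed = 1;
            break;
        }
    }

    if (failed)
        msgctl(msgid, IPC_RMID, NULL); /* wakes workers blocked in msgrcv */

    for (i = 0; i < started; i++)
        if (wait(&status) != -1 && WIFEXITED(status)
            && WEXITSTATUS(status) != 255)
            total += WEXITSTATUS(status);

    if (!failed)
        msgctl(msgid, IPC_RMID, NULL);
    return (failed || started == 0) ? -1 : total;
}
```

Calling run_workers(3, 12) should return 12 however the jobs happen to be split among the three workers, because each message is delivered to exactly one reader.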

Setting Up a Message Queue Application

You acquire a message queue resource by making a call that looks like:
msgid = msgget(key, flags);

Many message queue concepts are the same as for semaphores and shared memory. For example:

  1. The key identifies the queue and must be unique on a given host. Processes that use the same key get the same queue.
  2. The key can be IPC_PRIVATE if you want a message queue that isn't associated with any key.
  3. The flags include permissions and can optionally contain IPC_CREAT and IPC_EXCL. IPC_CREAT creates a message queue if it doesn't already exist. If IPC_EXCL is included with IPC_CREAT then it is considered a failure if the message queue does already exist. Some valid flag combinations are:
    0660
    0660 | IPC_CREAT
    0600 | IPC_CREAT | IPC_EXCL
  4. A return value of -1 indicates failure, and you should check errno (using perror or strerror).
  5. Normally, your program should free up any message queue that it allocates. However, if your program crashes, your message queue doesn't automatically get freed up. It stays around and can be seen with the ipcs command.
  6. To get rid of the message queue resource from your program, code the following:

    msgctl(msgid, IPC_RMID, 0);

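  7. If your program crashes or doesn't free up the message queue, you can free up your message queue with a command like the following, where xxxxx is the msqid shown by ipcs (newer versions of ipcrm also accept ipcrm -q xxxxx):
    ipcrm msg xxxxx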
  8. The script below will destroy all of your message queues:

remove_msg

#!/bin/csh -f
set l = `ipcs -q | grep "$USER"| cut -c12-19`
foreach s ( $l )
    echo $s
    ipcrm msg $s 
end
if ($#l != 0 ) echo $#l message queue\(s\) for $user removed

Receiving from a Message Queue

To receive from a message queue, you use a calling sequence like the following:

retcode =
    msgrcv(msgid, Buffer_pointer, Buffer_size, msg_to_receive, flags)

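msgid is the message queue identifier returned from the msgget routine.

Buffer_pointer points to a structure that starts with a long integer (the message type) followed by the data area. Buffer_size is the maximum number of data bytes to receive; it does not include the long integer at the front. On success, msgrcv returns the number of data bytes actually received.

The msg_to_receive indicates what kind of messages to receive. A zero for this parameter indicates all messages are OK to receive, so you get the oldest message on the queue. A value > 0 indicates to only receive messages of this type; the call returns the oldest message whose type matches msg_to_receive. (A value < 0 returns the message with the lowest type that is less than or equal to the absolute value of msg_to_receive.) The type itself is set by the sender; more about this under the msgsnd primitive.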
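
To make the effect of msg_to_receive concrete, here is a small sketch with two hypothetical helpers, send_typed and recv_typed (the names, the demo_msg structure and the 64-byte text size are assumptions made for this example). recv_typed uses IPC_NOWAIT so that it returns -1 instead of blocking when no matching message is queued.

```c
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define DEMO_TEXT 64            /* hypothetical size of the data area */

struct demo_msg {
    long int type;              /* message type, must be > 0 when sending */
    char text[DEMO_TEXT];
};

/* Send one string with the given type. Returns 0 on success, -1 on error. */
int send_typed(int msgid, long type, const char *text)
{
    struct demo_msg m;

    m.type = type;
    strncpy(m.text, text, DEMO_TEXT - 1);
    m.text[DEMO_TEXT - 1] = '\0';
    return msgsnd(msgid, &m, strlen(m.text) + 1, 0);
}

/* Receive the oldest message that matches `wanted` (0 = any type).
   Copies the text into out (at least DEMO_TEXT bytes) and returns the
   type of the message received, or -1 if nothing matched. */
long recv_typed(int msgid, long wanted, char *out)
{
    struct demo_msg m;

    memset(&m, 0, sizeof m);
    if (msgrcv(msgid, &m, DEMO_TEXT, wanted, IPC_NOWAIT) == -1)
        return -1;
    m.text[DEMO_TEXT - 1] = '\0';   /* guard against unterminated data */
    strcpy(out, m.text);
    return m.type;
}
```

If you send messages of type 1, 2 and 3 in that order, recv_typed(msgid, 2, buf) picks out the type 2 message first, and later calls with wanted = 0 return the remaining messages in the order they were sent.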

If the flags are zero, then the receiver will block if there is nothing to receive in the message queue. Other options are OR'ed in as flag bits:

IPC_NOWAIT indicates that this operation should return an error instead of blocking if there is no packet to receive. In this case msgrcv returns -1 and sets errno to ENOMSG.

MSG_NOERROR indicates that the message should be truncated if necessary to fit the receiving buffer. Without this option, a message that cannot fit into the buffer is left on the queue and msgrcv fails with the error E2BIG.


Starting msg1 with no parameters starts a message queue reader that takes all messages (i.e. msg_to_receive = 0).

Specifying a parameter allows you to set msg_to_receive to some integer.

 

msg1.c

/* Here's the receiver program. */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>


struct my_msg_st {
    long int my_msg_type;
    char some_text[BUFSIZ];
}; 

int main(int argc, char * argv[])
{
    int running = 1;
    int msgid;
    struct my_msg_st some_data;
    long int msg_to_receive = 0;
    int mykey = getuid();

    if (argc > 1) 
    {
        msg_to_receive = atol(argv[1]);
        printf("I will only receive messages of type: %ld\n", 
            msg_to_receive);
    }
/* First, we set up the message queue. */

    msgid = msgget((key_t)mykey, 0666 | IPC_CREAT);

    if (msgid == -1) {
        fprintf(stderr, "msgget failed with error: %s\n", 
            strerror(errno));
        exit(EXIT_FAILURE);
    }

/* Then the messages are retrieved from the queue, until an end message is encountered.
 Lastly, the message queue is deleted. */

    while(running) {
        if (msgrcv(msgid, (void *)&some_data, BUFSIZ,
                   msg_to_receive, 0) == -1) {
            fprintf(stderr, "msgrcv failed with error: %s\n", 
                    strerror(errno));
            exit(EXIT_FAILURE);
        }
        printf("You wrote: %s", some_data.some_text);
        if (strncmp(some_data.some_text, "end", 3) == 0) {
            running = 0;
        }
    }

    if (msgctl(msgid, IPC_RMID, 0) == -1) {
        fprintf(stderr, "msgctl(IPC_RMID) failed\n");
        exit(EXIT_FAILURE);
    }

    exit(EXIT_SUCCESS);
}



Sending Message Queue Messages

Sending messages is a little confusing.

You need a structure that starts out with a long integer followed by the buffer you want to send. In the example below we have the structure:

  struct my_msg_st {
    long int my_msg_type;     // message type (must be > 0)
    char some_text[MAX_TEXT]; // data we need to send
  };
  
The call to send data looks like:
msgsnd(msgid, pointer_to_my_msg_st, len , flags);

The confusing part is that the pointer is to the beginning of the structure (i.e. my_msg_st), but the length specified in the next parameter only includes the length of the data in some_text (include the null terminator for strings). The long integer at the front is not counted.

If flags is zero, then the above call blocks until there is room in the queue for the message.

If flags includes the IPC_NOWAIT flag, then the call returns an error right away if the message queue is currently full. In this case msgsnd returns -1 and sets errno to EAGAIN.
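
Here is a minimal sketch of a non-blocking send that shows both the length calculation and the EAGAIN case. The helper name try_send_text and its return convention are assumptions made for this example; the structure is the same my_msg_st used in this section.

```c
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MAX_TEXT 512

struct my_msg_st {
    long int my_msg_type;
    char some_text[MAX_TEXT];
};

/* Hypothetical helper: try to queue one string without blocking.
   type must be greater than zero.
   Returns 1 if the message was queued, 0 if the queue is full (EAGAIN),
   and -1 on any other error. */
int try_send_text(int msgid, long type, const char *text)
{
    struct my_msg_st m;
    size_t len;

    m.my_msg_type = type;
    strncpy(m.some_text, text, MAX_TEXT - 1);
    m.some_text[MAX_TEXT - 1] = '\0';

    /* Length of the data only: the text plus its null terminator.
       The long at the front of the structure is not counted. */
    len = strlen(m.some_text) + 1;

    if (msgsnd(msgid, &m, len, IPC_NOWAIT) == 0)
        return 1;
    return (errno == EAGAIN) ? 0 : -1;
}
```

Calling try_send_text in a loop with no reader running eventually returns 0, which is the point where a blocking msgsnd would have suspended your program.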

msg2.c

/* The sender program is very similar to msg1.c. It reads lines of text
 and sends each one to the queue with msgsnd. A line that starts with ':'
 changes the message type used for the messages that follow.
 The sender does not delete the message queue. */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MAX_TEXT 512

struct my_msg_st {
    long int my_msg_type;
    char some_text[MAX_TEXT];
};

int main()
{
    int running = 1;
    struct my_msg_st some_data;
    int msgid, len;
    char buffer[BUFSIZ];
    int mykey = getuid();
    printf("mykey = %d\n", mykey);


    msgid = msgget((key_t)mykey, 0666 | IPC_CREAT);

    if (msgid == -1) {
        fprintf(stderr, "msgget failed with error: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }

    
    some_data.my_msg_type = 1;

    while(running) {
        printf("Enter some text or :msg_type ");
        if (fgets(buffer, BUFSIZ, stdin) == NULL)
            break;                      /* stop on end of input */
        /* some_text can be smaller than buffer, so copy with a bound */
        strncpy(some_data.some_text, buffer, MAX_TEXT - 1);
        some_data.some_text[MAX_TEXT - 1] = '\0';
        if (buffer[0] == ':')
        {
            printf("Change msg_type from: %ld\n", 
                some_data.my_msg_type);
            some_data.my_msg_type = atol(&buffer[1]);
            printf("    to msg_type: %ld\n", 
                some_data.my_msg_type);
            continue; 
        }
        len = strlen(some_data.some_text) +1;
        if (msgsnd(msgid, (void *)&some_data, len , 0) == -1) {
            fprintf(stderr, "msgsnd failed with error: %s\n",
                    strerror(errno));
            exit(EXIT_FAILURE);
        }
        if (strncmp(buffer, "end", 3) == 0) {
            running = 0;
        }
    }

    exit(EXIT_SUCCESS);
}

Things to Try Out

Try running msg2 without msg1 and send some messages. Then do an ipcs to see how many messages and the number of bytes stored.

Also do an ipcs -l to see the system limits on message queues.

Try running the above example with multiple msg1 programs.

Try running an instance of msg1 receiving message type 2 and another msg1 program receiving message type 3 messages. Then from a msg2 program send messages to both programs.


Message Queue Properties

Message Queues have size limitations. If you want to see the limitations on a Linux machine, issue the command:
ipcs -l

How do you know if your message queue is filling up? A regular ipcs can tell you the current number of messages and the total number of bytes you have in the message queue. From inside your program, you can find out the same information with a msgctl call.

The following call will return information about your message queue:
struct msqid_ds info;
msgctl(msgid, IPC_STAT, &info);

The info data structure has a number of useful fields. Some of the more useful are:
msg_cbytes - current # of bytes on the queue
msg_qnum - current # of messages on the queue
msg_qbytes - max # of bytes allowed on the queue

You can change msg_qbytes by filling in the msqid_ds structure and issuing a call like the one below. On Linux, the owner of the queue can lower the limit or raise it up to the system default (MSGMNB); raising it beyond that requires superuser privileges:
msgctl(msgid, IPC_SET, &info);

To demo this, we are going to modify msg2.c above to create msg_props.c, shown below. If you run this program without running msg1 to drain the message queue, then you can see the queue pile up. Note the following changes:

If you type in text that looks like:
#200:abcde

you will place 200 messages into the queue, each containing this line of text. You can also see the number of messages with ipcs.

The msgsnd call has the IPC_NOWAIT flag set. This allows us to fill up the queue without blocking. We will get an error when the queue fills up, but we won't suspend.

 

msg_props.c

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MAX_TEXT 512

struct my_msg_st {
    long int my_msg_type;
    char some_text[MAX_TEXT];
};

int main()
{
    struct msqid_ds info;
    int running = 1;
    struct my_msg_st some_data;
    int msgid, len, repeat, count;
    char buffer[BUFSIZ];
    int mykey = getuid();
    printf("mykey = %d\n", mykey);


    msgid = msgget((key_t)mykey, 0666 | IPC_CREAT);

    if (msgid == -1) {
        fprintf(stderr, "msgget failed with error: %s\n", strerror(errno));
        exit(EXIT_FAILURE);
    }

    // Reading the queue properties with IPC_STAT only needs read
    // permission on the queue
    
    if (msgctl(msgid, IPC_STAT, &info))
            perror("msgctl IPC_STAT error ");
        
    printf("Message Q *Permission* structure information\n");
    printf("Owner's user ID \t%d\n", (int)info.msg_perm.uid);
    printf("Owner's group ID \t%d\n", (int)info.msg_perm.gid);
    printf("Creator's user ID \t%d\n", (int)info.msg_perm.cuid);
    printf("Creator's group ID\t%d\n", (int)info.msg_perm.cgid);
    printf("Access mode in HEX \t%06X\n", (unsigned)info.msg_perm.mode);
    printf("\n\nAdditional Selected Message Queue Structure Information\n");
    printf("Current # of bytes on queue\t %lu\n", (unsigned long)info.msg_cbytes);
    printf("Current # of messages on queue\t %lu\n", (unsigned long)info.msg_qnum);
    printf("Maximum # of bytes on queue\t %lu\n", (unsigned long)info.msg_qbytes);

// On Linux, raising msg_qbytes above the system default (MSGMNB)
// only works if you have superuser privileges
    info.msg_qbytes = 32000;
    if (msgctl(msgid, IPC_SET, &info))
            perror("msgctl IPC_SET error ");

    
    some_data.my_msg_type = 1;

    while(running) {
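        printf("Enter some text or #count:text ");
        if (fgets(buffer, BUFSIZ, stdin) == NULL)
            break;                      /* stop on end of input */
        /* some_text can be smaller than buffer, so copy with a bound */
        strncpy(some_data.some_text, buffer, MAX_TEXT - 1);
        some_data.some_text[MAX_TEXT - 1] = '\0';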
        repeat = 1;
        if (buffer[0] == '#')
        {
            repeat = atoi(&buffer[1]);
        }
        len = strlen(some_data.some_text) +1;
       
        for (count = 0; count < repeat; count++)
        {
            if (msgsnd(msgid, (void *)&some_data, len , IPC_NOWAIT) == -1) {
                perror("msgsnd failed ");
                break;
            }
        }
        if (strncmp(buffer, "end", 3) == 0) {
            running = 0;
        }
        

        if (msgctl(msgid, IPC_STAT, &info))
            perror("msgctl error ");
            
        printf("Current # of bytes on queue\t %lu\n", (unsigned long)info.msg_cbytes);
        printf("Current # of messages on queue\t %lu\n", (unsigned long)info.msg_qnum);
        printf("Maximum # of bytes on queue\t %lu\n", (unsigned long)info.msg_qbytes);

    }

    exit(EXIT_SUCCESS);
}


 


Exercises

See if you can write a client and an echo server using a single message queue. Note that you can use a message type of, say, 1 for messages sent from the client to the server, and a message type of 2 for responses sent from the server back to the client.