[Contents] [Prev. Chapter] [Next Section] [Next Chapter] [Index] [Help]

10    Messages

Message queues work by exchanging data in buffers. Any number of processes can communicate through message queues, regardless of whether they are related; if a process has adequate access permission, it can send or receive messages through the queue. Message notification can be synchronous or asynchronous. Message queues can store multiple messages, be accessed by multiple processes, be read in any order, and be prioritized according to application needs.

This chapter includes the following sections:


[Contents] [Prev. Chapter] [Next Section] [Next Chapter] [Index] [Help]

10.1    Message Queues

The POSIX 1003.1b message passing facilities provide a deterministic, efficient means for interprocess communication (IPC). Realtime message passing is designed to work with shared memory in order to accommodate the needs of realtime applications with an efficient, deterministic mechanism to pass arbitrary amounts of data between cooperating processes. Predictability is the primary emphasis behind the design for realtime message passing.

Cooperating processes can send and receive messages by accessing system-wide message queues. These message queues are accessed through names that may be pathnames.

The maximum size of each message is defined by the system to optimize the message sending and receiving functions. Message buffers are preallocated, ensuring the availability of resources when they are needed.

If your application involves heavy message traffic, you can prioritize the order in which processes receive messages by assigning a priority to the message or controlling the priority of the receiving process.

Asynchronous notification of the availability of a message on a queue allows a process to do useful work while waiting to receive a message.

Message passing operations that contribute to kernel overhead have been eliminated in the realtime message queue interface. If your application requires the ability to wait on multiple message queues simultaneously or the broadcast of a single message to multiple queues, you may need to write this functionality into your application.


[Contents] [Prev. Chapter] [Prev. Section] [Next Section] [Next Chapter] [Index] [Help]

10.2    The Message Interface

The message queue interface is a set of structures and data that allows you to use a message queue for sending and receiving messages. The message queue is a linked list that serves as a holding place for messages being sent to and received by processes sharing access to the message queue.

The following POSIX 1003.1b message queue functions allow you controlled access to messaging operations on a message queue:

Function Description
mq_close Closes a message queue
mq_getattr Retrieves the attributes of a message queue
mq_notify Requests that a process be notified when a message is available on a queue
mq_open Opens a message queue
mq_receive Receives a message from a queue
mq_send Sends a message to a queue
mq_setattr Sets the attributes of a message queue
mq_unlink Removes a message queue

General usage for message queues is as follows:

  1. Get a message queue descriptor with a call to the mq_open function.

  2. Send and receive messages with calls to the mq_send and mq_receive functions.

  3. Close the message queue with a call to the mq_close function.

  4. Remove the message queue with a call to the mq_unlink function.

Data written to a message queue created by one process is available to all processes that open the same message queue. Message queues are persistent; once unlinked, their names and contents remain until all processes that have opened the queue call the mq_close function. Child processes inherit the message queue descriptor created by the parent process. Once the message queue is opened, the child process can read or write to it according to access permissions. Unrelated processes can also use the message queue, but must first call the mq_open function to establish the connection.

You can identify message queue attributes with a call to the mq_getattr function. You can specify whether the message operation is blocking or non-blocking by calling the mq_setattr function.

A call to the mq_receive function receives the oldest, highest-priority message on the queue. If two or more processes are waiting for an incoming message on the same queue, the process with the highest priority that has been waiting the longest receives the next message.

Often message queues are created and used only while an application is executing. The mq_unlink function removes (deletes) the message queue and its contents, unless processes still have the queue open. The message queue is deleted only when all processes using it have closed the queue.


[Contents] [Prev. Chapter] [Prev. Section] [Next Section] [Next Chapter] [Index] [Help]

10.2.1    Opening a Message Queue

To set up a message queue, first create a new message queue or open an existing queue using the mq_open function. If a message queue of the specified name does not already exist, a new message queue is allocated and initialized. If one already exists, the mq_open function checks permissions.

A process can create and open message queues early in the life of the application. Use the mq_open function to open (establish a connection to) a message queue. After a process opens the message queue, each process that needs to use it must call the mq_open function specifying the same pathname.

The mq_open function provides a set of flags that prescribe the characteristics of the message queue for the process and define access modes for the message queue. Message queue access is determined by the OR of the file status flags and access modes listed in Table 10-1.

Table 10-1:  Status Flags and Access Modes for the mq_open Function

Flag Description
O_RDONLY Open for read access only
O_WRONLY Open for write access only
O_RDWR Open for read and write access
O_CREAT Create the message queue, if it does not already exist
O_EXCL When used with O_CREAT, creates and opens a message queue if a queue of the same name does not already exist. If a message queue of the same name exists, the message queue is not opened.
O_NONBLOCK Determines whether a send or receive operation is blocking or nonblocking

The first process to call the mq_open function should use the O_CREAT flag to create the message queue, to set the queue's user ID to that of the calling process, and to set the queue's group ID to the effective group ID of the calling process. This establishes an environment whereby the calling process, all cooperating processes, and child processes share the same effective group ID with the message queue. All processes that subsequently open the message queue must have the same access permission as the creating process.

Each process that uses a message queue must begin by calling the mq_open function. This call can accomplish several objectives:

The mode bit is checked to determine if the caller has permission for the requested operation. If the calling process is not the owner and is not in the group, the mode bits must be set for world access before permission is granted. In addition, the appropriate access bits must be set before an operation is performed. That is, to perform a read operation, the read bit must be set.

For example, the following code creates a message queue and, if it does not already exist, opens it for read and write access.

fd = mq_open("new_queue", (O_CREAT|O_EXCL|O_RDWR);

Once a message queue is created, its name and resources are persistent. It exists until the message queue is unlinked with a call to the mq_unlink function and all other references to the queue are gone.

The message flag parameter is either 0 or O_NONBLOCK. If you specify a flag of 0, then a sending process sleeps if the message cannot be sent to the specified queue, due to the queue being full. The process will sleep until other messages have been removed from the queue and space becomes available. When the flag is specified as O_NONBLOCK, the mq_send function returns immediately with an error status.

Example 10-1 shows the code sequence to establish a connection to a message queue descriptor.

Example 10-1:  Opening a Message Queue

#include <unistd.h>
#include <sys/types.h>
#include <mqueue.h>
#include <fcntl.h>

main ()
      int md;
      int status;

/* Create message queue */

      md = mq_open ("my_queue", O_CREAT|O_RDWR);

/*
 * code to close and unlink the message queue goes here
 */
      status = mq_close(md);          /* Close message queue  */
      status = mq_unlink("my_queue"); /* Unlink message queue */

Use the same access permissions that you would normally use on a call to the file open function. If you intend to only read the queue, specify read permission only on the mq_open function. If you intend to read and write to the queue, open the queue with both read and write permissions.

When finished using a message queue, close the queue with the mq_close function, and remove the queue by calling the mq_unlink function.


[Contents] [Prev. Chapter] [Prev. Section] [Next Section] [Next Chapter] [Index] [Help]

10.2.2    Sending and Receiving Messages

For an application in which the intended recipients of messages might be ambiguous because they all use a single message queue, you can establish multiple queues. In some cases you may need to provide a separate queue for each process that receives a message. Two processes that carry on two-way communication between them normally require two message queues:

Use of a single queue by multiple processes could be appropriate for an application that collects and processes data. Consider an application that consists of five processes that monitor data points and a sixth process that accumulates and interprets the data. Each of the five monitoring processes could send information to a single message queue. The sixth process could receive the messages from the queue, with assurance that it is receiving information according to the specified priorities of the incoming messages, in first-in first-out order within each priority.

When a process receives a message from a queue, it removes that message from the queue. Therefore, an application that requires one process to send the same message to several other processes should choose one of the following communication methods:

Once a message queue is open, you can send messages to another process using the mq_send function. The mq_send function takes four parameters, including: the message queue descriptor, a pointer to a message buffer, the size of the buffer, and the message priority. The read/write permissions are checked along with the length of the message, the status of the message queue, and the message flag. If all checks are successful, the message is added to the message queue. If the queue is already full, the sending process can block until space in the queue becomes available, or it can return immediately, according to whether it set the O_NONBLOCK flag when it called the mq_open function.

Once a message has been placed on a queue, you can retrieve the message with a call to the mq_receive function. The mq_receive function includes four parameters: the message queue descriptor, a pointer to a buffer to hold the incoming message, the size of the buffer, and the priority of the message received (the priority is returned by the function). The size of the buffer must be at least the size of the message queue's size attribute.

As with the mq_send function, the read/write operation permissions are checked on a call to the mq_receive function. If more than one process is waiting to receive a message when a message arrives at an empty queue, then the process with the highest priority that has been waiting the longest is selected to receive the message.

When a process uses the mq_receive function to read a message from a queue, the queue may be empty. The receiving process can block until a message arrives in the queue, or it can return immediately, according to the state of the O_NONBLOCK flag established with a preceding call to the mq_open function.


[Contents] [Prev. Chapter] [Prev. Section] [Next Section] [Next Chapter] [Index] [Help]

10.2.3    Asynchronous Notification of Messages

A process that wants to read a message from a message queue has three options:

The last option is a good choice for a realtime application. The mq_notify function is used to register a request for asynchronous notification by a signal when a message becomes available on a previously empty queue. The process can then do useful work until a message arrives, at which time a signal is sent according to the signal information specified in the notification argument of the mq_notify function. After notification, the process can call mq_receive to receive the message.

Only one notification request at a time is allowed per message queue descriptor. The previous notification request is canceled when another signal is sent; thus, the request must be re-registered by calling mq_notify again.


[Contents] [Prev. Chapter] [Prev. Section] [Next Section] [Next Chapter] [Index] [Help]

10.2.4    Prioritizing Messages

A process can control the relative priority of messages it sends to a specified queue by setting the msg_prio parameter in the mq_send function.

If msg_prio is specified on the mq_send function, the message is inserted into the message queue according to its priority relative to other messages on the queue. A message with a larger numeric value (higher priority) is inserted into the queue before messages with a lower numeric value. The mq_receive function always returns the first message on the queue, so if you assign higher priorities to messages of higher importance, you can receive the most important messages first. If you assign lower priorities to less important messages, you can delay delivery of the messages as more important messages are sent. Messages of equal priority are inserted in a first-in, first-out manner. The ability to assign priorities to messages on the queue reduces the possibility of priority inversion in the realtime messaging interface.


[Contents] [Prev. Chapter] [Prev. Section] [Next Section] [Next Chapter] [Index] [Help]

10.2.5    Using Message Queue Attributes

Use the mq_getattr function to determine the message queue attributes of an existing message queue. The attributes are as follows:

Attribute Description
mq_flags The message queue flags
mq_maxmsg The maximum number of messages allowed
mq_msgsize The maximum message size allowed for the queue
mq_curmsgs The number of messages on the queue

The mq_curmsgs attribute describes the current queue status. If necessary, call the mq_setattr function to reset the flags. The mq_maxmsg and mq_msgsize attributes cannot be modified after the initial queue creation. The mqueue.h header file contains information concerning system-wide maximums and other limits pertaining to message queues.


[Contents] [Prev. Chapter] [Prev. Section] [Next Section] [Next Chapter] [Index] [Help]

10.2.6    Closing and Removing a Message Queue

Each process that uses a message queue should close its access to the queue by calling the mq_close function before exiting. When all processes using the queue have called this function, the software removes the queue.

A process can remove a message queue by calling the mq_unlink function. However, if other processes still have the message queue open, the mq_unlink function returns immediately and destruction of the queue is postponed until all references to the queue have been closed.


[Contents] [Prev. Chapter] [Prev. Section] [Next Chapter] [Index] [Help]

10.3    Message Queue Examples

Example 10-2 creates a message queue and sends a loop of messages. The message queue is created using O_CREAT.

Example 10-2:  Using Message Queues to Send Data

/*
 * test_send.c
 *
 * This test goes with test_receive.c.
 * test_send.c does a loop of mq_sends,
 * and test_receive.c does a loop of mq_receives.
 */
#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <time.h>
#include <sched.h>
#include <sys/mman.h>
#include <sys/fcntl.h>
#include <signal.h>
#include <sys/rt_syscall.h>
#include <mqueue.h>
#include <errno.h>

#define PMODE 0666
extern int errno;

int main()
{
int i;
int status = 0;
mqd_t mqfd;
char msg_buffer[P4IPC_MSGSIZE];
struct mq_attr attr;
int open_flags = 0;
int num_bytes_to_send;
int priority_of_msg;

printf("START OF TEST_SEND \n");

/* Fill in attributes for message queue */
attr.mq_maxmsg = 20;
attr.mq_msgsize = P4IPC_MSGSIZE;
attr.mq_flags   = 0;

/* Set the flags for the open of the queue.
 * Make it a blocking open on the queue, meaning it will block if
 * this process tries to send to the queue and the queue is full.
 * (Absence of O_NONBLOCK flag implies that the open is blocking)
 *
 * Specify O_CREAT so that the file will get created if it does not
 * already exist.
 *
 * Specify O_WRONLY since we are only planning to write to the queue,
 * although we could specify O_RDWR also.
 */
open_flags = O_WRONLY|O_CREAT;

/* Open the queue, and create it if the receiving process hasn't
 * already created it.
 */
mqfd = mq_open("myipc",open_flags,PMODE,&attr);
if (mqfd == -1)
    {
    perror("mq_open failure from main");
    exit(0);
    };

/* Fill in a test message buffer to send */
msg_buffer[0] = 'P';
msg_buffer[1] = 'R';
msg_buffer[2] = 'I';
msg_buffer[3] = 'O';
msg_buffer[4] = 'R';
msg_buffer[5] = 'I';
msg_buffer[6] = 'T';
msg_buffer[7] = 'Y';
msg_buffer[8] = '1';
msg_buffer[9] = 'a';

num_bytes_to_send = 10;
priority_of_msg = 1;

/* Perform the send 10 times */
for (i=0; i<10; i++)
    {
    status = mq_send(mqfd,msg_buffer,num_bytes_to_send,priority_of_msg);
    if (status == -1)
        perror("mq_send failure on mqfd");
    else
        printf("successful call to mq_send, i = %d\n",i);
    }

/* Done with queue, so close it */
if (mq_close(mqfd) == -1)
    perror("mq_close failure on mqfd");

printf("About to exit the sending process after closing the queue \n");
}

Example 10-3 creates a message queue and receives a loop of messages. The message queue is created using O_CREAT.

Example 10-3:  Using Message Queues to Receive Data

/*
 * test_receive.c
 *
 * This test goes with test_send.c.
 * test_send.c does a loop of mq_sends,
 * and test_receive.c does a loop of mq_receives.
 */
#include <unistd.h>
#include <sys/types.h>
#include <stdio.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <time.h>
#include <sched.h>
#include <sys/mman.h>
#include <sys/fcntl.h>
#include <signal.h>
#include <sys/rt_syscall.h>
#include <mqueue.h>
#include <errno.h>

#define PMODE 0666
extern int errno;

int main()
{
int i;
mqd_t mqfd;
/* Buffer to receive msg into */
char msg_buffer[P4IPC_MSGSIZE];
struct mq_attr attr;
int open_flags = 0;
ssize_t num_bytes_received = 0;
msg_buffer[10] = 0; /* For printing a null terminated string for testing */

printf("START OF TEST_RECEIVE \n");

/* Fill in attributes for message queue */
attr.mq_maxmsg = 20;
attr.mq_msgsize = P4IPC_MSGSIZE;
attr.mq_flags   = 0;

/* Set the flags for the open of the queue.
 * Make it a blocking open on the queue,
 * meaning it will block if this process tries to
 * send to the queue and the queue is full.
 * (Absence of O_NONBLOCK flag implies that
 * the open is blocking)
 *
 * Specify O_CREAT so that the file will get
 * created if it does not already exist.
 *
 * Specify O_RDONLY since we are only
 * planning to write to the queue,
 * although we could specify O_RDWR also.
 */
open_flags = O_RDONLY|O_CREAT;

/* Open the queue, and create it if the sending process hasn't
 * already created it.
 */
mqfd = mq_open("myipc",open_flags,PMODE,&attr);
if (mqfd == -1)
    {
    perror("mq_open failure from main");
    exit(0);
    };

/* Perform the receive 10 times */
for (i=0;i<10;i++)
    {
    num_bytes_received = mq_receive(mqfd,msg_buffer,P4IPC_MSGSIZE,0);
    if (num_bytes_received == -1)
        {
        perror("mq_receive failure on mqfd");
        }
    else
        printf("data read for iteration %d = %s \n",i,msg_buffer);
    }

/* Done with queue, so close it */
if (mq_close(mqfd) == -1)
    perror("mq_close failure on mqfd");

/* Done with test, so unlink the queue,
 * which destroys it.
 * You only need one call to unlink.
 */
if (mq_unlink("myipc") == -1)
    perror("mq_unlink failure in test_ipc");

printf("Exiting receiving process after closing and unlinking queue \n");
}


[Contents] [Prev. Chapter] [Prev. Section] [Next Chapter] [Index] [Help]