Message queues work by exchanging data in buffers. Any number of processes can communicate through message queues, regardless of whether they are related; if a process has adequate access permission, it can send or receive messages through the queue. Message notification can be synchronous or asynchronous. Message queues can store multiple messages, be accessed by multiple processes, be read in any order, and be prioritized according to application needs.
This chapter includes the following sections:
Message Queues, Section 10.1
The Message Interface, Section 10.2
Message Queue Examples, Section 10.3
The POSIX 1003.1b message passing facilities provide a deterministic, efficient means for interprocess communication (IPC). Realtime message passing is designed to work with shared memory in order to accommodate the needs of realtime applications with an efficient, deterministic mechanism to pass arbitrary amounts of data between cooperating processes. Predictability is the primary emphasis behind the design for realtime message passing.
Cooperating processes can send and receive messages by accessing system-wide message queues. These message queues are accessed through names that may be pathnames.
The maximum size of each message is defined by the system to optimize the message sending and receiving functions. Message buffers are preallocated, ensuring the availability of resources when they are needed.
If your application involves heavy message traffic, you can prioritize the order in which processes receive messages by assigning a priority to the message or controlling the priority of the receiving process.
Asynchronous notification of the availability of a message on a queue allows a process to do useful work while waiting to receive a message.
Message passing operations that contribute to kernel overhead have been eliminated in the realtime message queue interface. If your application requires the ability to wait on multiple message queues simultaneously or the broadcast of a single message to multiple queues, you may need to write this functionality into your application.
The message queue interface is a set of structures and data that allows you to use a message queue for sending and receiving messages. The message queue is a linked list that serves as a holding place for messages being sent to and received by processes sharing access to the message queue.
The following POSIX 1003.1b message queue functions allow you controlled access to messaging operations on a message queue:
Function | Description |
mq_close |
Closes a message queue |
mq_getattr |
Retrieves the attributes of a message queue |
mq_notify |
Requests that a process be notified when a message is available on a queue |
mq_open |
Opens a message queue |
mq_receive |
Receives a message from a queue |
mq_send |
Sends a message to a queue |
mq_setattr |
Sets the attributes of a message queue |
mq_unlink |
Removes a message queue |
General usage for message queues is as follows:
Get a message queue descriptor with a call to the
mq_open
function.
Send and receive messages with calls to the
mq_send
and
mq_receive
functions.
Close the message queue with a call to the
mq_close
function.
Remove the message queue with a call to the
mq_unlink
function.
Data written to a message queue created by one process is available to all
processes that open the same message queue.
Message queues are persistent;
once unlinked, their names and contents remain until all processes that have
opened the queue call the
mq_close
function.
Child
processes inherit the message queue descriptor created by the parent
process.
Once the message queue is opened, the child process can read or
write to it according to access permissions.
Unrelated processes can also
use the message queue, but must first call the
mq_open
function to establish the connection.
You can identify message queue attributes with a call to the
mq_getattr
function.
You can specify whether the message
operation is blocking or non-blocking by calling the
mq_setattr
function.
A call to the
mq_receive
function receives the oldest,
highest-priority message on the queue.
If two or more processes are waiting
for an incoming message on the same queue, the process with the highest
priority that has been waiting the longest receives the next message.
Often message queues are created and used only while an application is
executing.
The
mq_unlink
function removes (deletes) the
message queue and its contents, unless processes still have the queue open.
The message queue is deleted only when all processes using it have closed
the queue.
To set up a message queue, first create a new message queue or open an
existing queue using the
mq_open
function.
If a message
queue of the specified name does not already exist, a new message queue is
allocated and initialized.
If one already exists, the
mq_open
function checks permissions.
A process can create and open message queues early in the life of the
application.
Use the
mq_open
function to open (establish
a connection to) a message queue.
After a process opens the message queue,
each process that needs to use it must call the
mq_open
function specifying the same pathname.
The
mq_open
function provides a set of flags that
prescribe the characteristics of the message queue for the process and
define access modes for the message queue.
Message queue access is
determined by the OR of the file status flags and access modes listed in
Table 10-1.
Flag | Description |
O_RDONLY | Open for read access only |
O_WRONLY | Open for write access only |
O_RDWR | Open for read and write access |
O_CREAT | Create the message queue, if it does not already exist |
O_EXCL | When used with O_CREAT, creates and opens a message queue if a queue of the same name does not already exist. If a message queue of the same name exists, the message queue is not opened. |
O_NONBLOCK | Determines whether a send or receive operation is blocking or nonblocking |
The first process to call the
mq_open
function should use
the O_CREAT flag to create the message queue, to set the queue's user ID to
that of the calling process, and to set the queue's group ID to the
effective group ID of the calling process.
This establishes an environment
whereby the calling process, all cooperating processes, and child processes
share the same effective group ID with the message queue.
All processes
that subsequently open the message queue must have the same access
permission as the creating process.
Each process that uses a message queue must begin by calling the
mq_open
function.
This call can accomplish several
objectives:
Create and open the message queue, if it does not yet exist (specify the O_CREAT flag).
Open an existing message queue.
Attempt to create and open the queue but fail if the queue already exists (specify both the O_CREAT and O_EXCL flags).
Open access to the queue for the calling process and establish a connection between the queue and a descriptor. All threads within the same process using the queue use the same descriptor.
Specify the access mode for the process:
Read only
Write only
Read/write
Specify whether the process will block or fail when unable to send a message (the queue is full) or receive a message (the queue is empty) with the oflags argument.
The mode bit is checked to determine if the caller has permission for the requested operation. If the calling process is not the owner and is not in the group, the mode bits must be set for world access before permission is granted. In addition, the appropriate access bits must be set before an operation is performed. That is, to perform a read operation, the read bit must be set.
For example, the following code creates a message queue and, if it does not already exist, opens it for read and write access.
fd = mq_open("new_queue", (O_CREAT|O_EXCL|O_RDWR);
Once a message queue is created, its name and resources are persistent.
It
exists until the message queue is unlinked with a call to the
mq_unlink
function and all other references to the queue
are gone.
The message flag parameter is either 0 or O_NONBLOCK.
If you specify a flag
of 0, then a sending process sleeps if the message cannot be sent to the
specified queue, due to the queue being full.
The process will sleep until
other messages have been removed from the queue and space becomes available.
When the flag is specified as O_NONBLOCK, the
mq_send
function returns immediately with an error status.
Example 10-1 shows the code sequence to establish a connection to a message queue descriptor.
#include <unistd.h> #include <sys/types.h> #include <mqueue.h> #include <fcntl.h> main () int md; int status; /* Create message queue */ md = mq_open ("my_queue", O_CREAT|O_RDWR); /* * code to close and unlink the message queue goes here */ status = mq_close(md); /* Close message queue */ status = mq_unlink("my_queue"); /* Unlink message queue */
Use the same access permissions that you would normally use on a call to the
file
open
function.
If you intend to only read the queue,
specify read permission only on the
mq_open
function.
If
you intend to read and write to the queue, open the queue with both read and
write permissions.
When finished using a message queue, close the queue with the
mq_close
function, and remove the queue by calling the
mq_unlink
function.
For an application in which the intended recipients of messages might be ambiguous because they all use a single message queue, you can establish multiple queues. In some cases you may need to provide a separate queue for each process that receives a message. Two processes that carry on two-way communication between them normally require two message queues:
Process X sends messages to queue A; process Y receives from it
Process Y sends messages to queue B; process X receives from it
Use of a single queue by multiple processes could be appropriate for an application that collects and processes data. Consider an application that consists of five processes that monitor data points and a sixth process that accumulates and interprets the data. Each of the five monitoring processes could send information to a single message queue. The sixth process could receive the messages from the queue, with assurance that it is receiving information according to the specified priorities of the incoming messages, in first-in first-out order within each priority.
When a process receives a message from a queue, it removes that message from the queue. Therefore, an application that requires one process to send the same message to several other processes should choose one of the following communication methods:
Set up a message queue for each receiving process, and send each message to each queue
Communicate by using signals and shared memory
Once a message queue is open, you can send messages to another process using
the
mq_send
function.
The
mq_send
function takes four parameters, including: the message queue descriptor, a
pointer to a message buffer, the size of the buffer, and the message
priority.
The read/write permissions are checked along with the length of
the message, the status of the message queue, and the message flag.
If all
checks are successful, the message is added to the message queue.
If the
queue is already full, the sending process can block until space in the
queue becomes available, or it can return immediately, according to whether
it set the O_NONBLOCK flag when it called the
mq_open
function.
Once a message has been placed on a queue, you can retrieve the message with
a call to the
mq_receive
function.
The
mq_receive
function includes four parameters: the message
queue descriptor, a pointer to a buffer to hold the incoming message, the
size of the buffer, and the priority of the message received (the priority
is returned by the function).
The size of the buffer must be at least the
size of the message queue's size attribute.
As with the
mq_send
function, the read/write operation
permissions are checked on a call to the
mq_receive
function.
If more than one process is waiting to receive a message when a
message arrives at an empty queue, then the process with the highest
priority that has been waiting the longest is selected to receive the
message.
When a process uses the
mq_receive
function to read a
message from a queue, the queue may be empty.
The receiving process can
block until a message arrives in the queue, or it can return immediately,
according to the state of the O_NONBLOCK flag established with a preceding
call to the
mq_open
function.
A process that wants to read a message from a message queue has three options:
Set the queue to blocking mode, and wait for a message to be received by
calling
mq_receive
Set the queue to non-blocking mode, and call
mq_receive
multiple times until a message is received
Set the queue to non-blocking mode, and call
mq_notify
specifying a signal to be sent when the queue
goes from empty to non-empty
The last option is a good choice for a realtime application.
The
mq_notify
function is used to register a request for
asynchronous notification by a signal when a message becomes available on a
previously empty queue.
The process can then do useful work until a message
arrives, at which time a signal is sent according to the signal information
specified in the
notification
argument of the
mq_notify
function.
After notification, the process can
call
mq_receive
to receive the message.
Only one notification request at a time is allowed per message queue
descriptor.
The previous notification request is canceled when another
signal is sent; thus, the request must be re-registered by calling
mq_notify
again.
A process can control the relative priority of messages it sends to a
specified queue by setting the
msg_prio
parameter
in the
mq_send
function.
If
msg_prio
is specified on the
mq_send
function, the message is inserted into the
message queue according to its priority relative to other messages on the
queue.
A message with a larger numeric value (higher priority) is inserted
into the queue before messages with a lower numeric value.
The
mq_receive
function always returns the first message on
the queue, so if you assign higher priorities to messages of higher
importance, you can receive the most important messages first.
If you assign
lower priorities to less important messages, you can delay delivery of the
messages as more important messages are sent.
Messages of equal priority are
inserted in a first-in, first-out manner.
The ability to assign priorities
to messages on the queue reduces the possibility of priority inversion in
the realtime messaging interface.
Use the
mq_getattr
function to determine the message
queue attributes of an existing message queue.
The attributes are as
follows:
Attribute | Description |
mq_flags | The message queue flags |
mq_maxmsg | The maximum number of messages allowed |
mq_msgsize | The maximum message size allowed for the queue |
mq_curmsgs | The number of messages on the queue |
The
mq_curmsgs
attribute describes the current
queue status.
If necessary, call the
mq_setattr
function to reset the flags.
The
mq_maxmsg
and
mq_msgsize
attributes cannot be modified after
the initial queue creation.
The
mqueue.h
header file
contains information concerning system-wide maximums and other limits
pertaining to message queues.
Each process that uses a message queue should close its access to the queue
by calling the
mq_close
function before exiting.
When
all processes using the queue have called this function, the software
removes the queue.
A process can remove a message queue by calling the
mq_unlink
function.
However, if other processes still
have the message queue open, the
mq_unlink
function
returns immediately and destruction of the queue is postponed until all
references to the queue have been closed.
Example 10-2 creates a message queue and sends a loop of messages. The message queue is created using O_CREAT.
/* * test_send.c * * This test goes with test_receive.c. * test_send.c does a loop of mq_sends, * and test_receive.c does a loop of mq_receives. */ #include <unistd.h> #include <sys/types.h> #include <stdio.h> #include <sys/time.h> #include <sys/resource.h> #include <time.h> #include <sched.h> #include <sys/mman.h> #include <sys/fcntl.h> #include <signal.h> #include <sys/rt_syscall.h> #include <mqueue.h> #include <errno.h> #define PMODE 0666 extern int errno; int main() { int i; int status = 0; mqd_t mqfd; char msg_buffer[P4IPC_MSGSIZE]; struct mq_attr attr; int open_flags = 0; int num_bytes_to_send; int priority_of_msg; printf("START OF TEST_SEND \n"); /* Fill in attributes for message queue */ attr.mq_maxmsg = 20; attr.mq_msgsize = P4IPC_MSGSIZE; attr.mq_flags = 0; /* Set the flags for the open of the queue. * Make it a blocking open on the queue, meaning it will block if * this process tries to send to the queue and the queue is full. * (Absence of O_NONBLOCK flag implies that the open is blocking) * * Specify O_CREAT so that the file will get created if it does not * already exist. * * Specify O_WRONLY since we are only planning to write to the queue, * although we could specify O_RDWR also. */ open_flags = O_WRONLY|O_CREAT; /* Open the queue, and create it if the receiving process hasn't * already created it. */ mqfd = mq_open("myipc",open_flags,PMODE,&attr); if (mqfd == -1) { perror("mq_open failure from main"); exit(0); }; /* Fill in a test message buffer to send */ msg_buffer[0] = 'P'; msg_buffer[1] = 'R'; msg_buffer[2] = 'I'; msg_buffer[3] = 'O'; msg_buffer[4] = 'R'; msg_buffer[5] = 'I'; msg_buffer[6] = 'T'; msg_buffer[7] = 'Y'; msg_buffer[8] = '1'; msg_buffer[9] = 'a'; num_bytes_to_send = 10; priority_of_msg = 1; /* Perform the send 10 times */ for (i=0; i<10; i++) { status = mq_send(mqfd,msg_buffer,num_bytes_to_send,priority_of_msg); if (status == -1) perror("mq_send failure on mqfd"); else printf("successful call to mq_send, i = %d\n",i); } /* Done with queue, so close it */ if (mq_close(mqfd) == -1) perror("mq_close failure on mqfd"); printf("About to exit the sending process after closing the queue \n"); }
Example 10-3 creates a message queue and receives a loop of messages. The message queue is created using O_CREAT.
/* * test_receive.c * * This test goes with test_send.c. * test_send.c does a loop of mq_sends, * and test_receive.c does a loop of mq_receives. */ #include <unistd.h> #include <sys/types.h> #include <stdio.h> #include <sys/time.h> #include <sys/resource.h> #include <time.h> #include <sched.h> #include <sys/mman.h> #include <sys/fcntl.h> #include <signal.h> #include <sys/rt_syscall.h> #include <mqueue.h> #include <errno.h> #define PMODE 0666 extern int errno; int main() { int i; mqd_t mqfd; /* Buffer to receive msg into */ char msg_buffer[P4IPC_MSGSIZE]; struct mq_attr attr; int open_flags = 0; ssize_t num_bytes_received = 0; msg_buffer[10] = 0; /* For printing a null terminated string for testing */ printf("START OF TEST_RECEIVE \n"); /* Fill in attributes for message queue */ attr.mq_maxmsg = 20; attr.mq_msgsize = P4IPC_MSGSIZE; attr.mq_flags = 0; /* Set the flags for the open of the queue. * Make it a blocking open on the queue, * meaning it will block if this process tries to * send to the queue and the queue is full. * (Absence of O_NONBLOCK flag implies that * the open is blocking) * * Specify O_CREAT so that the file will get * created if it does not already exist. * * Specify O_RDONLY since we are only * planning to write to the queue, * although we could specify O_RDWR also. */ open_flags = O_RDONLY|O_CREAT; /* Open the queue, and create it if the sending process hasn't * already created it. */ mqfd = mq_open("myipc",open_flags,PMODE,&attr); if (mqfd == -1) { perror("mq_open failure from main"); exit(0); }; /* Perform the receive 10 times */ for (i=0;i<10;i++) { num_bytes_received = mq_receive(mqfd,msg_buffer,P4IPC_MSGSIZE,0); if (num_bytes_received == -1) { perror("mq_receive failure on mqfd"); } else printf("data read for iteration %d = %s \n",i,msg_buffer); } /* Done with queue, so close it */ if (mq_close(mqfd) == -1) perror("mq_close failure on mqfd"); /* Done with test, so unlink the queue, * which destroys it. * You only need one call to unlink. */ if (mq_unlink("myipc") == -1) perror("mq_unlink failure in test_ipc"); printf("Exiting receiving process after closing and unlinking queue \n"); }