Basic Unix Queuing Techniques

}

January 31, 2014

It occasionally happens that our incoming or outgoing data cannot be processed as it is generated or, for some reason, we choose to process it at a later time.


A typical example might be a client-server system, where it is necessary to queue the socket descriptors of incoming connections because of some limit on the number of active processes, or a message hub, which accepts data synchronously, but must rely on other processes to remove the data asynchronously. Apart from the numerous commercially-available third party implementations of queuing systems, Unix has two highly efficient queuing mechanisms, which can be used for extremely low overhead systems of queues.

Kernel mode queues
The kernel uses queues internally for the implementation of functions such as device drivers, and the system call interface to this mechanism is available for the implementation of application programs. The queues so produced are implemented in memory, so they are very fast. However, because there is no permanent storage of the data, these queues are also non-persistent. This means that if the process or the machine crashes, all of the queued data will be lost, and all incoming data will never be enqueued.

User mode queues
In this section, we will concentrate on disk-based user mode queues. The kernel mode queuing system, which will be covered in an upcoming Advanced Queuing Article, is a bit limited, and it is sometimes more convenient to use the user mode queue library functions which offer a little more functionality, namely:

• Notification of message arrival, by sending a signal to the monitoring process.
• Prioritization of messages

There are only four fundamental commands to remember:

• mq _ open() – opens an existing queue, or creates anew queue
• mq _ send() – enqueues a message
• mq _ receive() – dequeues a message
• mq _ notify() – notifies a process of the arrival of a message
The remaining five commands perform housekeeping tasks:
• mq _ close() – closes a queue
• mq _ unlink() – deletes a queue from the disk
• mq _ getattr() – interrogates a queue’s characteristics
• mq _ setattr() – sets a queue’s characteristics

A single structure definition is used to set and get the queue’s attributes, and is defined as:

struct mq_attr {
long mq_flags /* message queue flags */
long mq_maxmsg /* maximum number of messages */
long mq_msgsize /* maximum message size */
long mq_curmsgs /* number of messages currently
queued */
};

The mq series of commands all relate to disk based queues. The queues themselves are created in the /tmp directory and are always referred to in the commands, as if they were situated below the root directory.
Thus to create a queue called ‘zq’, we would call mq_open(), like this:

Int qd;
struct mq_attr atr;
       atr.mq_maxmsg = 100;
atr.mq_msgsize = 255;
       if((qd = mq_open(“/zq”, O_RDWR|O_CREAT, 0755, &atr))
== (mqd_t)-1){
perror(“mq_open”);
}

Notice the similarity between the above syntax, and that of the open() command, for a file. The returned value is the queue descriptor, while the flags are exactly the same, as defined in fcntl.h for those relating to a file. The pointer to the ‘atr’ structure permits the setting of the maximum number of messages, and the maximum message size, prior to calling mq _ open.
Enqueuing a message is analogous to a write() on a file:

char *msg = “xyz”;
int priority = 5;
     if(mq_send(qd, msg, strlen(msg), priority) == -1){
perror(“mq_send”);
}

The extra parameter, ‘priority’ determines the order that the message will be removed from the queue when it is dequeued, with ‘1’ being the highest priority.
The dequeuing is performed by mq_receive():

unsigned char data[8192];
int priority;
int n;
     If((n = mq_receive(qd, (char *)data, sizeof(data),
&priority)) > 0){
Printf(“Received %d byte message >%s< with %d
priorityn”, n, data, priority);
}

Messages are taken off the queue in order of their priority, which is returned by mq _ receive(), into the variable passed to it. The return value of the function is the number of bytes in the message. In normal operation, this function would be called in a ‘while’ loop and the queue length would be checked at each iteration of the loop. The checking is done with the mq _ getattr() function, called with the queue descriptor, and the atr structure, defined above:

       if(mq_getattr(qd, &atr) == 0){
if(atr.mq_curmsgs == 0){
printf(“No more messagesn”);
mq_close(qd);
}
}

The following code extract puts this all together:

       while((rval = mq_receive(qd, (char *)data,
sizeof(data), &priority)) > 0){
printf(“Client received: >%s< priority %dn”,
data, priority);
memset(data, ‘’, sizeof(data));
if(mq_getattr(qd, &atr) == 0){
if(atr.mq_curmsgs == 0){
printf(“No more messagesn”);
mq_close(qd);
break;
}
}
}

 

We now have all the information we need to write a test program that exercises all of these queuing functions. Instead of attempting to re-create MQ Series from scratch (which we will leave for the ‘Advanced Queues’ article), this program merely does the following:

• Create a queue, whose descriptor is ‘qd’.
• Launch a child process, child() which asks to be notified of the arrival of a message
• Enqueue 4 messages, in ascending order of priority.
• The child pulls the messages off the queue, in the order that they arrived, i.e, in order of priority. It then quits.
• Launch another child process, client(), which merely performs a blocking read of the queue.
• Enqueue 4 more messages, in descending order of priority
• The child, again, pulls the messages off, in order of priority, which means the reverse of the order of their arrival. It does not quit.

The notification mechanism uses a software interrupt defined by means of the sigevent structure. To do this, we first create the variable:

struct sigevent ev;

The interesting parts of this structure (defined fully in siginfo.h) are

struct sigevent {
int sigev_notify;
int sigev_signo;
}

where sigev_notify has the values

SIGEV_NONE
SIGEV_SIGNAL
SIGEV_THREAD

We will choose SIGEV_SIGNAL, since we want to catch an interrupt, with the arrival of each message on our queue. Later, if we need to turn off notification, we can do it by passing in SIGEV_ NONE.
Since sigev_signo lets us choose which signal can be sent to us, we’ll choose something safe, that isn’t used by other processes. SIGURG is normally sent out when an urgent condition exists on a socket or other I/O device and, in that capacity, is of no interest to us. Therefore, we will use SIGURG, and register it, together with our interrupt handler, in main():

signal(SIGURG, interrupt);

Then, in our child() function, when our child process is running, we define the kind of event we need, and the signal number that we’re expecting, as follows:

ev.sigev_notify = SIGEV_SIGNAL;
ev.sigev_signo = SIGURG;

Immediately after these lines, we call pause(), which puts the process into a catatonic state, waiting for the arrival of an interrupt.
In reality, the server and client code would probably be in separate files, and run in unrelated processes. Since this is merely an exercise, all of the code is in one file, as follows.


Mark Sitkowski
Mark Sitkowski C.Eng, M.I.E.E Consultant to Forticom Security

Join iX Newsletter

iXsystems values privacy for all visitors. Learn more about how we use cookies and how you can control them by reading our Privacy Policy.
π