Directory::Queue.3pm

Language: en

Version: 2010-07-15 (fedora - 01/12/10)

Section: 3 (Library functions)

NAME

Directory::Queue - object oriented interface to a directory based queue

SYNOPSIS

   use Directory::Queue;
 
   #
   # simple schema:
   #  - there must be a "body" which is a string
   #  - there can be a "header" which is a table/hash
   #
 
   $schema = { "body" => "string", "header" => "table?" };
   $queuedir = "/tmp/test";
 
   #
   # sample producer
   #
 
   $dirq = Directory::Queue->new(path => $queuedir, schema => $schema);
   foreach $count (1 .. 100) {
       $name = $dirq->add(body => "element $count\n", header => \%ENV);
       printf("# added element %d as %s\n", $count, $name);
   }
 
   #
   # sample consumer
   #
 
   $dirq = Directory::Queue->new(path => $queuedir, schema => $schema);
   for ($name = $dirq->first(); $name; $name = $dirq->next()) {
       next unless $dirq->lock($name);
       printf("# reading element %s\n", $name);
       %data = $dirq->get($name);
       # one can use $data{body} and $data{header} here...
       # one could use $dirq->unlock($name) to only browse the queue...
       $dirq->remove($name);
   }
 
 

DESCRIPTION

The goal of this module is to offer a simple queue system that uses the underlying filesystem for storage and security, and that relies on atomic filesystem operations to prevent race conditions. It focuses on simplicity, robustness and scalability.

This module allows multiple concurrent readers and writers to interact with the same queue. A Python implementation of the same algorithm is available at <http://code.google.com/p/dirq> so readers and writers can even be written in different languages.

There is no knowledge of priority within a queue. If multiple priorities are needed, multiple queues should be used.

TERMINOLOGY

An element is something that contains one or more pieces of data. A simple string may be an element but more complex schemas can also be used; see the ``SCHEMA'' section for more information.

A queue is a ``best effort FIFO'' collection of elements.

It is very hard to guarantee pure FIFO behavior with multiple writers using the same queue. Consider for instance:

. Writer1: calls the add() method
. Writer2: calls the add() method
. Writer2: the add() method returns
. Writer1: the add() method returns

Who should be first in the queue, Writer1 or Writer2?

For simplicity, this implementation provides only ``best effort FIFO'', i.e. there is a very high probability that elements are processed in FIFO order but this is not guaranteed. This is achieved by using a high-resolution time function and having elements sorted by the time the element's final directory gets created.

LOCKING

Adding an element is not a problem because the add() method is atomic.
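
One common way to obtain this kind of atomicity is to build the element in a temporary directory and then move it into place with a single rename(), which is atomic within one filesystem. The sketch below assumes that approach; the helper name add_element and the intermediate directory name 00000000 are illustrative only, and this is not the module's actual code.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: build the element under "temporary", then move it
# into an intermediate directory with a single rename().
sub add_element {
    my ($top, $name, %data) = @_;
    my $tmp   = "$top/temporary/$name";
    my $final = "$top/00000000/$name";
    for my $dir ("$top/temporary", "$top/00000000") {
        -d $dir or mkdir($dir) or die "mkdir($dir): $!";
    }
    mkdir($tmp) or die "mkdir($tmp): $!";
    for my $key (sort keys %data) {
        open(my $fh, ">", "$tmp/$key") or die "open($tmp/$key): $!";
        print {$fh} $data{$key};
        close($fh) or die "close($tmp/$key): $!";
    }
    # readers never see a half-written element: it appears all at once
    rename($tmp, $final) or die "rename($tmp, $final): $!";
    return $final;
}

my $top  = tempdir(CLEANUP => 1);    # throwaway queue directory
my $path = add_element($top, "4c3f0000f423f0", body => "hello\n");
print "added $path\n";
```

Until the rename() succeeds, the element lives only under "temporary", so a consumer scanning the intermediate directories cannot pick up partial data.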

In order to support multiple processes interacting with the same queue, advisory locking is used. Processes should first lock an element before working with it. In fact, the get() and remove() methods report a fatal error if they are called on unlocked elements.

If the process that created the lock dies without unlocking the element, we end up with a stale lock. The purge() method can be used to remove these stale locks.

An element can basically be in only one of two states: locked or unlocked.

A newly created element is unlocked as a writer usually does not need to do anything more with the element once dropped in the queue.

Iterators return all the elements, regardless of their states.

There is no method to get an element state as this information is usually useless since it may change at any time. Instead, programs should directly try to lock elements to make sure they are indeed locked.
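
The try-to-lock pattern can be illustrated with the "locked" directory mentioned in the ``DIRECTORY STRUCTURE'' section: mkdir() either creates the directory or fails, so only one process can win. The following is a minimal sketch under that assumption; try_lock and try_unlock are hypothetical helpers, not the module's lock() and unlock() methods.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helpers operating on an element directory.
sub try_lock   { my ($elt) = @_; return mkdir("$elt/locked") ? 1 : 0 }
sub try_unlock { my ($elt) = @_; return rmdir("$elt/locked") ? 1 : 0 }

my $elt = tempdir(CLEANUP => 1);    # stands in for an element directory

my $first  = try_lock($elt);      # succeeds: the element was unlocked
my $second = try_lock($elt);      # fails: "locked" already exists
my $undone = try_unlock($elt);    # succeeds: removes the lock
my $again  = try_lock($elt);      # succeeds again after the unlock

print "first=$first second=$second undone=$undone again=$again\n";
```

Because the outcome of the attempt is the only reliable information, a consumer simply skips any element it fails to lock instead of querying its state first.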

CONSTRUCTOR

The new() method can be used to create a Directory::Queue object that will later be used to interact with the queue. The following attributes are supported:
path
the queue toplevel directory (mandatory)
umask
the umask to use when creating files and directories (default: use the running process' umask)
maxelts
the maximum number of elements that an intermediate directory can hold (default: 16,000)
schema
the schema defining how to interpret user supplied data (mandatory if elements are added or read)

SCHEMA

The schema defines how user supplied data is stored in the queue. It is only required by the add() and get() methods.

The schema must be a reference to a hash containing key/value pairs.

The key must contain only alphanumerical characters. It identifies the piece of data and will be used as file name when storing the data inside the element directory.

The value represents the type of the given piece of data. It can be:

binary
the data is a sequence of binary bytes; it will be stored directly in a plain file with no further encoding
string
the data is a text string (i.e. a sequence of characters); it will be UTF-8 encoded
table
the data is a reference to a hash of text strings; it will be serialized and UTF-8 encoded before being stored in a file

By default, all pieces of data are mandatory. If you append a question mark to the type, this piece of data will be marked as optional. See the comments in the ``SYNOPSIS'' section for more information.
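
To make the rules above concrete, here is a small sketch of how data could be checked against a schema. The function check_data is hypothetical and is not part of the module; in particular, rejecting keys that are absent from the schema is an assumption about what "conforming" means.

```perl
use strict;
use warnings;

# Hypothetical validator: returns a list of problems (empty if the data
# conforms to the schema).
sub check_data {
    my ($schema, $data) = @_;
    my @errors;
    for my $key (sort keys %$schema) {
        # a type is one of the three names, optionally followed by "?"
        my ($type, $optional) =
            $schema->{$key} =~ /^(binary|string|table)(\?)?$/
            or die "invalid schema type for $key: $schema->{$key}\n";
        if (not exists $data->{$key}) {
            push(@errors, "missing mandatory data: $key") unless $optional;
        } elsif ($type eq "table") {
            push(@errors, "not a hash reference: $key")
                unless ref($data->{$key}) eq "HASH";
        } elsif (ref($data->{$key})) {
            push(@errors, "not a plain scalar: $key");
        }
    }
    for my $key (sort keys %$data) {
        push(@errors, "unexpected data: $key") unless exists $schema->{$key};
    }
    return @errors;
}

my $schema = { body => "string", header => "table?" };
my @ok  = check_data($schema, { body => "hi" });
my @bad = check_data($schema, { header => "oops", extra => 1 });
print "valid data: ", scalar(@ok), " problem(s)\n";
print "invalid data: $_\n" for @bad;
```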

METHODS

The following methods are available:
new()
return a new Directory::Queue object (class method)
copy()
return a copy of the object; this can be useful to have independent iterators on the same queue
path()
return the queue toplevel path
id()
return a unique identifier for the queue
count()
return the number of elements in the queue
first()
return the first element in the queue, resetting the iterator; return an empty string if the queue is empty
next()
return the next element in the queue, incrementing the iterator; return an empty string if there is no next element
add(DATA)
add the given data (a hash) to the queue and return the corresponding element name; the schema must be known and the data must conform to it
lock(ELEMENT[, PERMISSIVE])
attempt to lock the given element and return true on success; if the PERMISSIVE option is true (which is the default), it is not a fatal error if the element cannot be locked and false is returned
unlock(ELEMENT[, PERMISSIVE])
attempt to unlock the given element and return true on success; if the PERMISSIVE option is true (which is not the default), it is not a fatal error if the element cannot be unlocked and false is returned
remove(ELEMENT)
remove the given element (which must be locked) from the queue
get(ELEMENT)
get the data from the given element (which must be locked) and return essentially the same hash that was given to add(); the schema must be known
purge([OPTIONS])
purge the queue by removing unused intermediate directories, removing temporary elements that are too old and unlocking locked elements that are too old (i.e. stale locks); OPTIONS can be:
   maxtemp
   maximum age of a temporary element (in seconds, default 300)
   maxlock
   maximum age of a locked element (in seconds, default 600)
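
The stale lock handling can be pictured as an age check on the lock itself. The sketch below assumes that the lock is the "locked" directory described in the ``DIRECTORY STRUCTURE'' section and that its age is taken from its modification time; unlock_if_stale is a hypothetical helper, not the module's purge() implementation.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

# Hypothetical helper: remove the "locked" directory of an element if it
# is older than $maxlock seconds; returns true if a stale lock was removed.
sub unlock_if_stale {
    my ($elt, $maxlock) = @_;
    my @stat = stat("$elt/locked") or return 0;    # not locked at all
    my $mtime = $stat[9];
    return 0 if time() - $mtime <= $maxlock;        # lock is recent enough
    return rmdir("$elt/locked") ? 1 : 0;
}

my $elt = tempdir(CLEANUP => 1);    # stands in for an element directory
mkdir("$elt/locked") or die "mkdir: $!";
my $fresh = unlock_if_stale($elt, 600);    # just created: kept

# pretend the lock was taken 20 minutes ago
my $old = time() - 1200;
utime($old, $old, "$elt/locked") or die "utime: $!";
my $stale = unlock_if_stale($elt, 600);    # older than maxlock: removed

print "fresh=$fresh stale=$stale\n";
```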

DIRECTORY STRUCTURE

All the directories holding the elements and all the files holding the data pieces are located under the queue toplevel directory. This directory can contain:
temporary
the directory holding temporary elements, i.e. the elements being added
obsolete
the directory holding obsolete elements, i.e. the elements being removed
NNNNNNNN
an intermediate directory holding elements; NNNNNNNN is an 8-digit hexadecimal number

In any of the above directories, an element is stored as a single directory with a 14-digit hexadecimal name SSSSSSSSMMMMMR where:

SSSSSSSS
represents the number of seconds since the Epoch
MMMMM
represents the microsecond part of the time since the Epoch
R
is a random digit used to reduce name collisions
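
The naming scheme above can be reproduced with a few lines of Perl. This is an illustrative sketch only: element_name is a hypothetical helper, and the use of Time::HiRes and rand() is an assumption about how the time and the random digit could be obtained.

```perl
use strict;
use warnings;
use Time::HiRes qw(gettimeofday);

# Hypothetical helper: build a 14-digit hexadecimal name made of 8 digits
# of seconds, 5 digits of microseconds and 1 random digit. The time can
# be given explicitly; otherwise the current time is used.
sub element_name {
    my ($sec, $usec) = @_ ? @_ : gettimeofday();
    return sprintf("%08x%05x%01x", $sec, $usec, int(rand(16)));
}

my $now = element_name();
print "name for the current time: $now\n";

# fixed-width names sort as strings in the same order as their timestamps
my @names = map { element_name(@$_) }
    ([1279180800, 5], [1279180800, 4], [1279180799, 999999]);
my @sorted = sort @names;
print "oldest first: @sorted\n";
```

Five hexadecimal digits are enough for the microsecond part because 999999 is 0xf423f. Since every name has the same width, a plain string sort of the names yields the chronological order, which is what makes the ``best effort FIFO'' behavior cheap to implement.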

Finally, inside an element directory, the different pieces of data are stored in separate files, named according to the schema. A locked element additionally contains a directory named "locked".

SECURITY

There are no specific security mechanisms in this module.

The elements are stored as plain files and directories. The filesystem security features (owner, group, permissions, ACLs...) should be used to adequately protect the data.

By default, the process' umask is respected. See the ``CONSTRUCTOR'' section if you want a different behavior.

If multiple readers and writers with different uids are expected, the easiest solution is to have all the files and directories inside the toplevel directory world-writable (i.e. umask=0). Then, the permissions of the toplevel directory itself (e.g. group-writable) are enough to control who can access the queue.
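
The effect of this setup can be seen with plain filesystem calls. The sketch below uses a throwaway directory in place of a real queue; the directory name "shared" and the 0770 mode are illustrative assumptions.

```perl
use strict;
use warnings;
use File::Temp qw(tempdir);

my $top = tempdir(CLEANUP => 1);    # stands in for the queue toplevel directory

# with umask 0, new directories get the full requested mode
my $old = umask(0);
mkdir("$top/shared", 0777) or die "mkdir: $!";
umask($old);

# the toplevel directory alone decides who may enter: owner and group only
chmod(0770, $top) or die "chmod: $!";

my $inner = (stat("$top/shared"))[2] & 0777;
my $outer = (stat($top))[2] & 0777;
printf("inner=%04o outer=%04o\n", $inner, $outer);
```

Anything below the toplevel directory is world-writable, but a process that cannot traverse the toplevel directory never reaches it.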

AUTHOR

Lionel Cons <http://cern.ch/lionel.cons>

Copyright CERN 2010