Gyrex/Development Space/Queue Service

From Eclipsepedia

Jump to: navigation, search

This article describes the cloud queue service available in Gyrex.

API

The queue service is made available as org.eclipse.gyrex.cloud.services.queue.IQueueService in the OSGi service registry.

Publishing Messages

Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply.

// the message
final byte[] messageBody = ...;

// get queue
final IQueue queue = getQueueService().getQueue("myqueue");

// add message to queue
queue.sendMessage(messageBody);


Consuming Messages

Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between.

// get queue
final IQueue queue = getQueueService().getQueue("myqueue");

// set receive timeout (5 minutes)
final Map<String, Object> requestProperties = new HashMap<String, Object>(1);
requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5));

// receive at most one message
final List<IMessage> messages = queue.receiveMessages(1, requestProperties);
if (!messages.isEmpty()) {
  final IMessage message = messages.get(0);

  // do something with the message
  processMessage(message.getBody());

  // delete the message
  if (queue.deleteMessage(message)) {
    // consumed successfully --> commit
  } else {
    // error consuming (maybe because of network issues) --> rollback or just try again
  }
}