Gyrex/Development Space/Queue Service

This article describes the cloud queue service available in Gyrex. The default implementation persists messages in ZooKeeper, which limits the size of a message. However, it is possible to provide alternative implementations based on Amazon SQS, RabbitMQ or ActiveMQ.

API

The queue service is made available as org.eclipse.gyrex.cloud.services.queue.IQueueService in the OSGi service registry.

Publishing Messages

Arbitrary messages can be published to queues as byte arrays. Depending on the underlying messaging system, limits on the body size may apply.

// the message
final byte[] messageBody = ...;
 
// get queue
final IQueue queue = getQueueService().getQueue("myqueue");
 
// add message to queue
queue.sendMessage(messageBody);
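
Because the message body is a plain byte array, the caller decides how to encode it and should guard against oversized bodies. The following is a minimal sketch of such a helper, not part of the Gyrex API: the class name MessageBodies and the 512 KB cap are assumptions chosen for illustration. The real limit depends on the queue implementation in use.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of Gyrex) for turning text into a message body. */
final class MessageBodies {

    // ASSUMPTION: an illustrative cap. ZooKeeper's default znode size limit is
    // about 1 MB (jute.maxbuffer), so a ZooKeeper-backed queue must stay below that.
    static final int ASSUMED_MAX_BODY_BYTES = 512 * 1024;

    private MessageBodies() {
        // static helpers only
    }

    /** Encodes text as UTF-8 and rejects bodies larger than the assumed cap. */
    static byte[] encode(final String text) {
        final byte[] body = text.getBytes(StandardCharsets.UTF_8);
        if (body.length > ASSUMED_MAX_BODY_BYTES) {
            throw new IllegalArgumentException(
                    "message body too large: " + body.length + " bytes");
        }
        return body;
    }

    /** Decodes a UTF-8 message body back into text. */
    static String decode(final byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}
```

The result of MessageBodies.encode(...) could then be passed to queue.sendMessage(...) as shown above. Larger payloads are usually better stored elsewhere, with only a reference sent through the queue.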


Consuming Messages

Receiving messages is a two-step process. First, a message is retrieved from the queue. This operation does not remove the message from the queue; it only hides it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if the node processing it dies in between.

// get queue
final IQueue queue = getQueueService().getQueue("myqueue");
 
// set receive timeout (5 minutes)
final Map<String, Object> requestProperties = new HashMap<String, Object>(1);
requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5));
 
// receive at most one message
final List<IMessage> messages = queue.receiveMessages(1, requestProperties);
if (!messages.isEmpty()) {
  final IMessage message = messages.get(0);
 
  // do something with the message
  processMessage(message.getBody());
 
  // delete the message
  if (queue.deleteMessage(message)) {
    // consumed successfully --> commit
  } else {
    // error consuming (maybe because of network issues) --> rollback or just try again
  }
}
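
The receive-then-delete contract can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not the Gyrex implementation: the class VisibilityQueueSketch and all of its methods are hypothetical, and it assumes that the receive timeout bounds how long a received message stays hidden before it becomes available again. Time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the two-step consume contract (not Gyrex code). */
final class VisibilityQueueSketch {

    /** A stored message and the time (in ms) until which it is hidden. */
    private static final class Entry {
        final byte[] body;
        long hiddenUntil; // 0 means the message has never been received

        Entry(final byte[] body) {
            this.body = body;
        }
    }

    // insertion order is preserved, so older messages are offered first
    private final Map<Long, Entry> entries = new LinkedHashMap<>();
    private long nextId = 1;

    /** Adds a message and returns its id. */
    long send(final byte[] body) {
        final long id = nextId++;
        entries.put(id, new Entry(body));
        return id;
    }

    /**
     * Step 1: returns the ids of up to max visible messages and hides each of
     * them until now + timeoutMillis. Nothing is removed from the queue.
     */
    List<Long> receive(final int max, final long timeoutMillis, final long now) {
        final List<Long> received = new ArrayList<>();
        for (final Map.Entry<Long, Entry> e : entries.entrySet()) {
            if (received.size() >= max) {
                break;
            }
            if (e.getValue().hiddenUntil <= now) {
                e.getValue().hiddenUntil = now + timeoutMillis;
                received.add(e.getKey());
            }
        }
        return received;
    }

    /** Step 2: removes a processed message; returns false if it no longer exists. */
    boolean delete(final long id) {
        return entries.remove(id) != null;
    }
}
```

In this model, a consumer that dies after receive(...) never calls delete(...), so the message becomes visible again once the timeout has elapsed and another consumer can pick it up. The flip side is that a message may be processed more than once, so processing should tolerate duplicates.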


Administration

Queues can be managed via the OSGi console. An admin UI is planned; contributions are welcome.

osgi> queue --help
---QueueConsoleCommands---
	queue <cmd> [args]
		create <QUEUEID> - creates a queue
		ls [FILTER] - list queues
		rm <QUEUEID> - removes a queue

osgi> 
