Difference between revisions of "Gyrex/Development Space/Queue Service"

From Eclipsepedia

Jump to: navigation, search
(New page: This article describes the cloud queue service available in Gyrex.)
 
m
 
(3 intermediate revisions by one user not shown)
Line 1: Line 1:
This article describes the cloud queue service available in Gyrex.
+
This article describes the cloud queue service available in Gyrex. The default implementation persists messages in ZooKeeper. This puts some limits on message size. However, it's possible to provide an alternate implementations based on Amazon SQS, RabbitMQ or ActiveMQ.
 +
 
 +
== API ==
 +
The queue service is made available as <code>[http://git.eclipse.org/c/gyrex/gyrex-platform.git/tree/bundles/org.eclipse.gyrex.cloud/src/org/eclipse/gyrex/cloud/services/queue/IQueueService.java org.eclipse.gyrex.cloud.services.queue.IQueueService]</code> in the OSGi service registry.
 +
 
 +
=== Publishing Messages ===
 +
Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply.
 +
<source lang="java">
 +
// the message
 +
final byte[] messageBody = ...;
 +
 
 +
// get queue
 +
final IQueue queue = getQueueService().getQueue("myqueue");
 +
 
 +
// add message to queue
 +
queue.sendMessage(messageBody);
 +
</source>
 +
 
 +
 
 +
=== Consuming Messages ===
 +
Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between.
 +
<source lang="java">
 +
// get queue
 +
final IQueue queue = getQueueService().getQueue("myqueue");
 +
 
 +
// set receive timeout (5 minutes)
 +
final Map<String, Object> requestProperties = new HashMap<String, Object>(1);
 +
requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5));
 +
 
 +
// receive at most one message
 +
final List<IMessage> messages = queue.receiveMessages(1, requestProperties);
 +
if (!messages.isEmpty()) {
 +
  final IMessage message = messages.get(0);
 +
 
 +
  // do something with the message
 +
  processMessage(message.getBody());
 +
 
 +
  // delete the message
 +
  if (queue.deleteMessage(message)) {
 +
    // consumed successfully --> commit
 +
  } else {
 +
    // error consuming (maybe because of network issues) --> rollback or just try again
 +
  }
 +
}
 +
</source>
 +
 
 +
 
 +
== Administration ==
 +
It's possible to manage queues via the OSGi console. An Admin UI is on our list but contributions are welcome.
 +
<pre>
 +
osgi> queue --help
 +
---QueueConsoleCommands---
 +
queue <cmd> [args]
 +
create <QUEUEID> - creates a queue
 +
ls [FILTER] - list queues
 +
rm <QUEUEID> - removes a queue
 +
 
 +
osgi>
 +
</pre>

Latest revision as of 16:03, 12 November 2012

This article describes the cloud queue service available in Gyrex. The default implementation persists messages in ZooKeeper. This puts some limits on message size. However, it's possible to provide an alternate implementations based on Amazon SQS, RabbitMQ or ActiveMQ.

Contents

[edit] API

The queue service is made available as org.eclipse.gyrex.cloud.services.queue.IQueueService in the OSGi service registry.

[edit] Publishing Messages

Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply.

// the message
final byte[] messageBody = ...;
 
// get queue
final IQueue queue = getQueueService().getQueue("myqueue");
 
// add message to queue
queue.sendMessage(messageBody);


[edit] Consuming Messages

Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between.

// get queue
final IQueue queue = getQueueService().getQueue("myqueue");
 
// set receive timeout (5 minutes)
final Map<String, Object> requestProperties = new HashMap<String, Object>(1);
requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5));
 
// receive at most one message
final List<IMessage> messages = queue.receiveMessages(1, requestProperties);
if (!messages.isEmpty()) {
  final IMessage message = messages.get(0);
 
  // do something with the message
  processMessage(message.getBody());
 
  // delete the message
  if (queue.deleteMessage(message)) {
    // consumed successfully --> commit
  } else {
    // error consuming (maybe because of network issues) --> rollback or just try again
  }
}


[edit] Administration

It's possible to manage queues via the OSGi console. An Admin UI is on our list but contributions are welcome.

osgi> queue --help
---QueueConsoleCommands---
	queue <cmd> [args]
		create <QUEUEID> - creates a queue
		ls [FILTER] - list queues
		rm <QUEUEID> - removes a queue

osgi>