Notice: this Wiki will be going read only early in 2024 and edits will no longer be possible. Please see: https://gitlab.eclipse.org/eclipsefdn/helpdesk/-/wikis/Wiki-shutdown-plan for the plan.
Difference between revisions of "Gyrex/Development Space/Queue Service"
m |
|||
Line 6: | Line 6: | ||
=== Publishing Messages === | === Publishing Messages === | ||
Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply. | Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply. | ||
− | < | + | <source lang="java"> |
// the message | // the message | ||
final byte[] messageBody = ...; | final byte[] messageBody = ...; | ||
Line 15: | Line 15: | ||
// add message to queue | // add message to queue | ||
queue.sendMessage(messageBody); | queue.sendMessage(messageBody); | ||
− | </ | + | </source> |
=== Consuming Messages === | === Consuming Messages === | ||
Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between. | Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between. | ||
− | < | + | <source lang="java"> |
// get queue | // get queue | ||
final IQueue queue = getQueueService().getQueue("myqueue"); | final IQueue queue = getQueueService().getQueue("myqueue"); | ||
Line 43: | Line 43: | ||
} | } | ||
} | } | ||
− | </ | + | </source> |
Latest revision as of 16:03, 12 November 2012
This article describes the cloud queue service available in Gyrex. The default implementation persists messages in ZooKeeper. This puts some limits on message size. However, it's possible to provide an alternate implementations based on Amazon SQS, RabbitMQ or ActiveMQ.
API
The queue service is made available as org.eclipse.gyrex.cloud.services.queue.IQueueService
in the OSGi service registry.
Publishing Messages
Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply.
// the message final byte[] messageBody = ...; // get queue final IQueue queue = getQueueService().getQueue("myqueue"); // add message to queue queue.sendMessage(messageBody);
Consuming Messages
Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between.
// get queue final IQueue queue = getQueueService().getQueue("myqueue"); // set receive timeout (5 minutes) final Map<String, Object> requestProperties = new HashMap<String, Object>(1); requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5)); // receive at most one message final List<IMessage> messages = queue.receiveMessages(1, requestProperties); if (!messages.isEmpty()) { final IMessage message = messages.get(0); // do something with the message processMessage(message.getBody()); // delete the message if (queue.deleteMessage(message)) { // consumed successfully --> commit } else { // error consuming (maybe because of network issues) --> rollback or just try again } }
Administration
It's possible to manage queues via the OSGi console. An Admin UI is on our list but contributions are welcome.
osgi> queue --help ---QueueConsoleCommands--- queue <cmd> [args] create <QUEUEID> - creates a queue ls [FILTER] - list queues rm <QUEUEID> - removes a queue osgi>