Notice: this Wiki will be going read only early in 2024 and edits will no longer be possible. Please see: https://gitlab.eclipse.org/eclipsefdn/helpdesk/-/wikis/Wiki-shutdown-plan for the plan.
Difference between revisions of "Gyrex/Development Space/Queue Service"
(New page: This article describes the cloud queue service available in Gyrex.) |
|||
Line 1: | Line 1: | ||
This article describes the cloud queue service available in Gyrex. | This article describes the cloud queue service available in Gyrex. | ||
+ | |||
+ | == API == | ||
+ | The queue service is made available as <code>[http://git.eclipse.org/c/gyrex/gyrex-platform.git/tree/bundles/org.eclipse.gyrex.cloud/src/org/eclipse/gyrex/cloud/services/queue/IQueueService.java org.eclipse.gyrex.cloud.services.queue.IQueueService]</code> in the OSGi service registry. | ||
+ | |||
+ | === Publishing Messages === | ||
+ | Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply. | ||
+ | <pre> | ||
+ | // the message | ||
+ | final byte[] messageBody = ...; | ||
+ | |||
+ | // get queue | ||
+ | final IQueue queue = getQueueService().getQueue("myqueue"); | ||
+ | |||
+ | // add message to queue | ||
+ | queue.sendMessage(messageBody); | ||
+ | </pre> | ||
+ | |||
+ | |||
+ | === Consuming Messages === | ||
+ | Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between. | ||
+ | <pre> | ||
+ | // get queue | ||
+ | final IQueue queue = getQueueService().getQueue("myqueue"); | ||
+ | |||
+ | // set receive timeout (5 minutes) | ||
+ | final Map<String, Object> requestProperties = new HashMap<String, Object>(1); | ||
+ | requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5)); | ||
+ | |||
+ | // receive at most one message | ||
+ | final List<IMessage> messages = queue.receiveMessages(1, requestProperties); | ||
+ | if (!messages.isEmpty()) { | ||
+ | final IMessage message = messages.get(0); | ||
+ | |||
+ | // do something with the message | ||
+ | processMessage(message.getBody()); | ||
+ | |||
+ | // delete the message | ||
+ | if (queue.deleteMessage(message)) { | ||
+ | // consumed successfully --> commit | ||
+ | } else { | ||
+ | // error consuming (maybe because of network issues) --> rollback or just try again | ||
+ | } | ||
+ | } | ||
+ | </pre> |
Revision as of 07:00, 31 August 2012
This article describes the cloud queue service available in Gyrex.
API
The queue service is made available as org.eclipse.gyrex.cloud.services.queue.IQueueService
in the OSGi service registry.
Publishing Messages
Arbitrary messages can be published to queues. Depending on the underlying messaging system, some limitations to the body size may apply.
// the message final byte[] messageBody = ...; // get queue final IQueue queue = getQueueService().getQueue("myqueue"); // add message to queue queue.sendMessage(messageBody);
Consuming Messages
Receiving messages is a two step process. First a message has to be retrieved from the queue. This operation will not remove a message from a queue but hide it. Once the message has been processed successfully, it must be deleted from the queue in a second step. This ensures that no message is lost if a node processing a message dies in between.
// get queue final IQueue queue = getQueueService().getQueue("myqueue"); // set receive timeout (5 minutes) final Map<String, Object> requestProperties = new HashMap<String, Object>(1); requestProperties.put(IQueueServiceProperties.MESSAGE_RECEIVE_TIMEOUT, TimeUnit.MINUTES.toMillis(5)); // receive at most one message final List<IMessage> messages = queue.receiveMessages(1, requestProperties); if (!messages.isEmpty()) { final IMessage message = messages.get(0); // do something with the message processMessage(message.getBody()); // delete the message if (queue.deleteMessage(message)) { // consumed successfully --> commit } else { // error consuming (maybe because of network issues) --> rollback or just try again } }