Jump to: navigation, search

Triquetrum/Task-based processing

This is a work in progress...

Intro

The goal of the Task-based processing model is to be able to represent a work item, to trace the progress of its processing and to represent the obtained results.

And all of this in a model and with sufficiently detailed contents to support two essential goals :

  • facilitate the definition of service APIs and implementations to work with tasks in a process-oriented system
  • allow to store and consult full traces of each significant step in a process including timing, success and failure events etc.

TBD

Related Triquetrum components

Triquetrum provides an API for task-based processing and will also provide several implementation options. Currently the following is available:

  • org.eclipse.triquetrum.processing.api : contains the API of the Task-based model and of the main related services
  • org.eclipse.triquetrum.processing.service.impl : default/generic implementations of some of the services
  • org.eclipse.triquetrum.processing.model.impl.memory : a simple non-persistent implementation of the Task model, based on plain Java beans
  • org.eclipse.triquetrum.processing.test : unit tests for the Task model and processing services, also serving as code examples

The Task model

As the title implies, the model is built around the concept of a Task which is the core entity to represent an item of work that must be performed.

The definition of a work item in this model consists of the following main elements :

  • a task type: the main differentiator on the kind of work to be performed
  • a set of parameters a.k.a. attributes: these define the data needed to perform the desired 'variation' of the given task type
  • initial set of traceability data such as creation timestamp, the task's initiator, a correlation ID defined by the initiator, the initial task status etc.

After the creation of a Task with its attributes, an initiator will not perform the actual work itself but will hand it to a service broker that is responsible for finding a matching service implementation that is able to perform the actual processing (or to delegate it to further specialized processing systems externally or internally to the initiator's system). The selected service will be the executor of the task.

During the task submission and its processing, the task may change status several times. Such status changes, together with other potentially relevant processing events, are stored with timestamp and relevant data in an event stream with the Task as context.

When the task has been successfully processed, there will typically be some results available and the task will reach a final success status. Depending on the application domain and the concrete task, results may be simple success indicators, they may consist of large and/or complex data sets or anything in-between. Triquetrum offers a simple model to represent results as blocks of named values, linked to the task that produced them. Not every type of result is fit to be represented or stored in such a structure. Often the raw result data is stored externally, e.g. in files. In such cases the Triquetrum result items could be used to store paths or other references to the externally stored data.

A simplified UML class diagram shows the main elements of the Task model :

Triq processing Task-related classes.jpg

Core services

TBD

Triq processing services.jpg

Initiating a task

The basic flow for processing a task involves :

  1. creating a Task instance using the runtime's TriqFactory
  2. configuring the Task by setting 0 or more Attributes, using the TriqFactory again
  3. submitting the Task to the runtime's TaskProcessingBroker. The broker will then look for a processing service that is able to process the submitted task and it will delegate the actual processing to it.
  4. optionally : do something with the Task results when its processing is done, which can be determined from the returned future.

Create and submit a task.jpg

In the project org.eclipse.triquetrum.processing.test there are some code samples to illustrate this :

  • TaskConstructiontest shows different ways to create a Task with attributes and/or with sub-Tasks. (It also shows how a service implementation could create results for a given task, but that's not relevant here)
  • TaskProcessingTest shows different options to process one or more tasks.

Here's an example for submitting a Task without extra Attributes and checking the obtained results :

   //...
   // 1 : get the default factory instance and create a test task
   TriqFactory triqFactory = TriqFactoryTracker.getDefaultFactory();
   Task t = triqFactory.createTask(null, "testInitiator", "testType", "testCorrelationId", "testExternalRef");
 
   try {
     // 2 : get the broker instance and submit the task with timeout info
     CompletableFuture<Task> future = TaskProcessingBrokerTracker.getBroker().process(t, 3L, TimeUnit.SECONDS);
     // 3 : do more work if needed
     // 4 : get the task results : the simplest way (but there's better, see below) is to block on the future until the processing is done
     Task tDone = future.get();
     // 5 : get result blocks (in this case just one) for the finished task
     Stream<ResultBlock> results = tDone.getResults();
     ResultBlock resultBlock = results.findFirst().orElse(null);
     // 6 : read some individual items, by name
     ResultItem<? extends Serializable> item1 = resultBlock.getItemForName("item1");
     ResultItem<? extends Serializable> item2 = resultBlock.getItemForName("item2");
     // 7 : do whatever else you gotta do...
   } catch (Exception e) {
     // do whatever's needed to handle an error in the task processing
   }
   //...

The above code fragment can be improved in several ways :

  1. find a way to avoid the need for a blocking future.get()
  2. instead of implementing further work directly in the code that initiated the first task, why not doing that via another Task?

A possible way to address both concerns uses a combination of :

  1. taking advantage of the CompletableFuture's features to chain work in a non-blocking manner
  2. implementing a second Task that can itself also be processed in some matching service.

If the processing logic for the second Task depends on results of the first one, we typically consider both Tasks within a common context. This is implemented by making them sub-tasks of a common parent Task. This allows one sub-task to browse in its parent's other sub-tasks and to look for any needed data in their results. (see the Task JavaDoc for more details on accessing parent- & sub-tasks)

An example to address both issues could be as follows :

   Task mainTask = triqFactory.createTask(null, "testInitiator", "testType", "testCorrelationId", "testExternalRef");
   Task t1 = triqFactory.createTask(mainTask, "testInitiator", "testType", "testCorrelationId", "testExternalRef");
   Task t2 = triqFactory.createTask(mainTask, "testInitiator2", "testType", "testCorrelationId2", "testExternalRef2");
 
   // This gives an example of chaining task executions asynchronously.
   TaskProcessingBroker broker = TaskProcessingBrokerTracker.getBroker();
   CompletableFuture<Task> future1 = broker.process(t1, 3L, TimeUnit.SECONDS);
 
   // Remark that the "t" parameter in the chained call is only present for syntactic reasons.
   // The API does not support passing earlier processed tasks, it just accepts the new task t2.
   // t2 can find the preceeding tasks via its parent mainTask, as described above.
   CompletableFuture<Task> future2 = future1.thenCompose(t -> broker.process(t2, 3L, TimeUnit.SECONDS));


The below is still on a dvp-branch and will be merged in master in the near future

For cases where a Task has sub-tasks that do not depend on each other's results, the sub-tasks can all be processed concurrently. The broker API provides a short-cut method for this case, where you can just submit the parent Task and all its sub-tasks get processed :

     CompletableFuture<Task> future = TaskProcessingBrokerTracker.getBroker().processSubTasks(mainTask, 3L, TimeUnit.SECONDS, false);

The last argument allows to optionally set the parent task to finished once all its sub-tasks are done.

Further Reading

Task-based processing is an example of a coordination language, which composes separate computational elements together.

Collecting and storing processing events is a form of data provenance, where data sources and processes are documented so that an experiment may be reproduced. Kepler, which also uses Ptolemy II as an execution engine, supports data provenance.