The WorkerManager service of SMILA provides an environment to make it easy to integrate, deploy and scale Workers, i.e. classes that implement a functionality to be coordinated by asynchronous workflows of SMILA. The WorkerManager service provides all the managements of tasks and data objects to be consumed and produced by the worker so that the implementation can focus on the real work to be done.
Workers are implemented as OSGi services that are referenced by the WorkerManager and announces its name via a getName() method. The WorkerManager then reads the worker definition from the JobManager to check if the worker is known to the JobManager and to get access worker modes or other definitions. It asks the TaskManager for available tasks to be done by this worker. If it gets one, it creates a TaskContext, that wraps up all data and facility the worker function needs to process the task:
- the task itself, including all parameters and properties.
- access to the data objects in the input and output slots of the worker. The data objects can be accessed in different ways as needed by the worker function: direct stream access, record-by-record reading and writing. The framework cares about creating only data objects that are really needed and committing objects after the function has finished successfully.
- counters to measure performance or other worker statistics. The WorkerManager already produces some basic counters measuring the execution time of the worker and amounts of data read and written. The worker function may produce additional counters as needed.
As long as the worker is performing a task, the WorkerManager keeps the task alive in the TaskManager and notifies the worker about task cancellation. When the worker finished the task processing, the WorkerManager cares about finishing the task: successfully if no error has occurred, or with as a fatal or recoverable error, based on the type of exception thrown by the worker function.
The WorkerManager controls the number of tasks that are allowed for each managed worker on a node in parallel (scale-up). It does not retrieve further tasks for a worker if its scale-up limit is reached, even if a lot more tasks are waiting. Note that even if the worker scale-up limit is not yet reached the TaskManager may refuse to deliver further tasks for a worker if the global node scale-up limit specificed as taskmanager.maxScaleUp is reached already. Workers may declare themselves as runAlways. This means that the global scale-up limit is not applied to this worker.
Task result handling
There are four possible outcomes of a worker's processing:
- The perform() method returns normally. This is interpreted by the WorkerManager as a successful task execution and it will finish the task with a SUCCESSFUL task completion status. All open output data objects will be committed (if this fails: continue below depending on the exception type). The task result includes all counters produced by the task execution in the task result so that they can be aggregated by the JobManager in the job run data.
- The perform() method aborts with a RecoverableTaskException, a IOException, UnavailableException or a MaybeRecoverableException (or subclass) with isRecoverable() == true. This will be interpreted as a temporary failure to access input data or write output data to objectstore, so the task will be finished with a RECOVERABLE_ERROR task completion status and the JobManager will usually reschedule the task for a retry. Produced counters will be ignored.
- The perform() method aborts with a PostponeTaskException. This means that the worker cannot yet perform this task for some reason but it should be processed later. The task will be readded to the todo queue for this worker and redelivered later (but very soon, usually).
- The perform() method aborts with any other exception (including all RuntimeExceptions). This will be interpreted as a sign that the input data cannot be processed at all (because it is corrupted or contains invalid values, for example). Such tasks will be finished with a FATAL_ERROR completion status and not rescheduled. Produced counters will be ignored.
There is currently no other way for the workers to influence the task result.
Input/Output Data Objects
A task assigns "data objects" to the input and output slots of a worker that represents objects in a objectstore. The WorkerManager framework provides so called IODataObjects that encapsulate the access to these objectstore objects, so the worker does not need to know in detail how to work with the objectstore API. Apart from encapsulating the objectstore API and taking care of proper committing and cleanup after task processing, these IODataObjects also provide higher-level access methods that makes it easier to handle record bulk or key-value objects, for example.
Their wrappers can be accessed via the getInputs() and getOutput() components of the task context. Of course, these inputs and outputs managers give also access to the plain bulk info objects using getDataObject methods. However, in this case the worker function must clean up and commit by itself after finishing processing, so this should probably be used if you need only the object ID of an object, but not the actual content.
Available input wrappers are:
- StreamInput: provides direct access to the java.io.InputStream for reading from objectstore. Play with each single byte as you like.
- RecordInput: provides access to objects like record bulks that are sequences of BON records. You can get single records (or Anys) from the objects, one at a time, by calling the getRecord() method. When end-of-stream is reached, null is returned. You can also access the IpcStreamReader BON parser in case you do not want to read complete records at once. However, you should not mix up getRecord() calls with calls to the IpcStreamReader as your direct calls will probably confuse the record parsing.
Available output wrappers are:
- StreamOutput: provides direct access to the java.io.OutputStream for writing to objectstore.
- RecordOutput: simplified access for creating record bulks by writing one Record (or Any) at a time. You can also directory access the underlying IpcStreamWriter BON writer, but again you should not mix up direct acess the BON writer with the writeRecord/Any() methods.
You can create only a single IO wrapper for each data object. On the second call, only null will be returned.
For the Stream and Record wrappers the Inputs/Outputs classes provide special getAs... methods. For other wrappers you can use the generic getAs...(String slotName, Class wrapperClass) methods. Additionally, this allows you to create your own input/output wrapper classes and get them managed by the Inputs/Outputs framework.
If the WorkerManager receives an 404 NOT FOUND response when trying to keep-alive a currently processed task, it sets a canceled flag in the associated TaskContext object. The worker should regularly check this flag to see if it should still continue to process the task or if it can abort. If so, it can just return (after releasing and cleaning-up used resources that are not part of the task context, of course), the WorkerManager will not commit the results in this case and will not try to finish the task.