Jump to: navigation, search

SMILA/Project Concepts/Blackboard Service Concept

Description

Design a service to ease management of EILF records during workflow processing.


Discussion

When to persist data into the a storage.

User:G.schmidt.brox.de: Several month ago we have had a discussion, how to persist information into the search index. At that time i proposed a configuration mechanism for controlling the storage/indexing processes when leaving BPEL. At that time we moved this discussion in the direction of using BPEL pipelets for e.g. indexing purposes because that way we are free to configure when and where to use this option. From my point of view such operations should be a general paradime. Either we use by default a configurable process for indexing/storage at the end of BPEL processing or we use pipelets for this case. Please share your thoughts.

  • Juergen.schumacher.empolis.com Yes, that is probably another valid way to configure it. It would be a minor change: Instead of writing back the records to the persistence layer on at the commit after each workflow which also invalidates the blackboard content, we could introduce a relatively simple pipelet to trigger the commit, after which the blackboard content must stay valid until the router has finished processing the records. So one would have more control over record persistence (the current concept causes each processed record to be persisted after each successfully finished workflow). The reason why I put this outside of BPEL was that I distinguished between "infrastructure" elements that are required to run with each workflow and "application" elements that are different in different setups. To me, "insert to index" is mainly an "application" element, as I can think of EILF setups that are not used to build search indices. In my picture, persistence was a "infrastructure" element: To be able to chain several workflows via queues it is necessary that the blackboard content is persisted after single each workflow such that the next workflow can access the result (ok, strictly speaking this is not necessary after final workflows that are not followed by others anymore). So I thought it would safer to enforce record persistence this way, and that a workflow creator this way can concentrate more on the "create my application workflow" side of his problem instead of the "making my application work" side. If the team is more in favor of a more flexible solution, no problem. Just vote here (-:

Technical proposal

Purpose of the Blackboard Service is the management of EILF record data during processing in a EILF component (Connectivity, Workflow Processor). The problem is that different processing engines could require different physical formats of the record data (see SMILA/Project Concepts/Data Model and XML representation for a discussion). Because this means either complex implementations of the logical data model or big data coversion problems, the idea is to keep the complete record data only on a "blackboard" which is not pushed through the workflow engine itself and to extract only a small "workflow object" from the blackboard to feed the workflow engine. This workflow object would contain only the part from the complete record data which the workflow engine needs for loop or branch conditions (and the record ID, of course). Thus it could be efficient enough to do the conversion between blackboard and workflow object before and after each workflow service invocation. As a side effect, the blackboard service could hide the handling of record persistence from the services to make service development easier.

Basics

This figure given an overview about how these services could be composed:

SMILA-Blackboard-Service.png

Note that the use of the Blackboard service is not restricted to workflow processing, but it can be also used in Connectivity to create the initial EILF record from the data sent by Crawlers. This way the persistence services are hidden from Connectivity, too.

It is assumed that the workflow engine itself (which will be a third party product usually) must be embedded into EILF using some wrapper that translates incoming calls to workflow specific objects and service invocations from the workflow into real EILF service calls. At least with a BPEL engine like ODE it must be done this way. In the following this wrapper is called the Workflow Integration Service. This workflow integration service will also handle the necessary interaction between workflow engine and blackboard (see next section for details).

For ODE, the use of Tuscany SCA Java would simplify the development of this integration service because it could be based on the BPEL implementation type of Tuscany. However, in the first version we will create an EILF specific workflow integration service for ODE that can only orchestrate EILF pipelet because the Tuscany BPEL implementation type does not yet support service references (see this mail in the Tuscany user mailing list).

Workflow

The next picture illustrates how and which data flows through this system:

SMILA-Blackboard-Activity.png

In more detail:

  • Listener receives record from queue.

bq. The record usually contains only the ID. In special cases it could optionally include some small attribute values or annotations that could be used to control routing inside the message broker.

  • Listener calls blackboard to load record data from persistence service and writes attributes contained in message record to blackboard.
  • Listener calls workflow service with ID from message record.
  • Workflow integration creates workflow object for ID.

bq. The workflow object uses engine specific classes (e.g. DOM for ODE BPEL engine) to represent the record ID and some chosen attributes that are needed in the engine for condition testing or computation. It's an configuration option of the workflow integration which attributes are to be included. In a more advanced version it may be possible to analyse the workflow definition (e.g. the BPEL process) to determine which attributes are needed.

  • Workflow integration invokes the workflow engine. This causes the following steps to be executed a couple of times:
    • Workflow engine invokes EILF service (pipelet). At least for ODE BPEL this means that the engine calls the integration layer which in turn routes the request to the invoked pipelet. So the workflow integration layer receives (potentially modified) workflow objects.
    • Workflow integration writes workflow objects to blackboard and creates record IDs. The selected pipelet is called with these IDs
    • Pipelet processes IDs and manipulates blackboard content. The result is a new list of record IDs (usually identical to the argument list, and usually the list has length 1)
    • Workflow integration creates new workflow objects from the result IDs and blackboard content and feeds them back to the workflow engine.
  • Workflow engine finishes successfully and returns a list of workflow objects.

bq. If it finishes with an exception, instead of the following the Listener/Router has to invalidate the blackboard for all IDs related to the workflow such that they are not committed back to the storages, and also it has to signal the message broker that the received message has not been processed successfully such that the message broker can move it to the dead letter queue.

  • Workflow integration extracts IDs from workflow objects and returns them.
  • Router creates outgoing messages with message records depending on blackboard content for given IDs.

bq. Two things may need configuration here: When to create an outgoing message to which queue (never, always, depending on conditions of attribute values or annotations) - this could also be done in workflow by setting a "nextDestination" annotation for each record ID. And which attributes/annotations are to be included in the message record - if any.

  • Router commits IDs on blackboard. This writes the blackboard content to the persistence services and invalidates the blackboard content for these IDs.
  • Router sends outgoing messages to message broker.

Content on Blackboard

The Blackboard contains two kinds of content:

  • Records:* All records currently processed in this runtime process. The structure of an record is defined in SMILA/Project Concepts/Data Model and XML representation. Clients manipulate the records through Blackboard API methods. This way the records are completely under control of the Blackboard which may be used in advanced versions for optimised communication with the persistence services.

Records enter the blackboard by one of the following operations:

  • create: create a new record with a given ID. No data is loaded from persistence, if a record with this ID exists already in the storages it will be overwritten when the created record is commited. E.g. used by Connectivity to initialize the record from incoming data.
  • load: loads record data for the given ID from persistence (or prepare it to be loaded). Used by a client to indicate that it wants to process this record.
  • split: creates a fragment of a given record, i.e. the record content is copied to a new ID derived from the given by adding a frament name (see [ID Concept] for details).

All these methods should care about locking the record ID in the storages such that no second runtime process can try to manipulate the same record.

A record is removed from the blackboard with one of these operations:

  • commit: all changes are written to the storages before the record is removed. The record is unlocked in the database.
  • invalidate: the record is removed from the blackboard. The record is unlocked in the database. If the record was created new (not overwritten) on this blackboard it should be removed from the storage completely.
  • Notes:* Additional temporary data created by pipelets to be used in later pipelets in the same workflow, but not to be persisted in the storages. Notes can be either global or record specific (associated to a record ID). Record specific notes are copied on record splits and removed when the associated record is removed from the blackboard. In any case a note has a name and the value can be of any serializable Java class such that they can be accessed from seperated services in own VMs.

bq. A nice extension would be workflow instance specific notes such that a pipelet can pass non persistent information to another pipelet invoked later in the workflow which is not associated to a single record, but does not conflict with information from different workflow instances like global notes would (based on the assumption that the workflow engine supports multi-threaded execution). This information would be removed from the blackboard after the workflow instance has finished. However, it has to be clarified how they are can associated to the workflow instance, even when accessed from a remote VM.

Service Interfaces

The Blackboard will be implemented as an OSGi service. The interface could look similar to the following definition. It is getting quite big, so maybe it makes sense to divide it up into the different parts (handling of lifecycle, literal values, object values, annotation, notes, attachments?) for better readability? We'll see about this when implementing.

interface Blackboard {
    // record life cycle methods
    void create(ID id) throws BlackboardAccessException;
    void load(ID id) throws BlackboardAccessException;
    ID split(ID id, String fragmentName) throws BlackboardAccessException;
    void commit(ID id) throws BlackboardAccessException;
    void invalidate(ID id);
 
    // factory methods for attribute values and annotation objects
    // EILFLiteral and EILFAnnotation are just interfaces, 
    // blackboard implementation can determine the actual types for optimization
    EILFLiteral createLiteral();
    EILFAnnotation createAnnotation();
 
    // record content methods
    // - record metadata 
    //   for referenced types see interfaces proposed in [[SMILA/Project Concepts/Data Model and XML representation]]
    //   for string format of an attribute path see definition of AttributePath class below.
    // -- basic navigation
    Iterator<String> getAttributeNames(ID id, Path path) throws BlackboardAccessException;
    Iterator<String> getAttributeNames(ID id) throws BlackboardAccessException; // convenience for getAttributeNames(ID, null);
    boolean hasAttribute(ID id, Path path) throws BlackboardAccessException;
 
    // -- handling of literal values
    //    navigation support
    boolean hasLiterals(ID id, Path path) throws BlackboardAccessException;
    int getLiteralsSize(ID id, Path path) throws BlackboardAccessException;
    //    get all literal attribute values of an attribute (index of last step is irrelevant)
    //    a client should not expect the blackboard to reflect changes done to these object automatically,
    //    but always should call one of the modification methods below to really set the changes.
    List<Literal> getLiterals(ID id, Path path) throws BlackboardAccessException;
 
    //    get single attribute value, index is specified in last step of path, defaults to 0.
    Literal getLiteral(ID id, Path path) throws BlackboardAccessException;
 
    //    modification of attribute values on blackboard
    void setLiterals(ID id, Path path, List<EILFLiteral> values) throws BlackboardAccessException;
    //    set single literal value, index of last attribute step is irrelevant
    void setLiteral(ID id, Path path, EILFLiteral value) throws BlackboardAccessException;
    //    add a single literal value, index of last attribute step is irrelevant
    void addLiteral(ID id, Path path, EILFLiteral value) throws BlackboardAccessException;
 
    //    remove literal specified by index in last step
    void removeLiteral(ID id, Path path) throws BlackboardAccessException;
    //    remove all literals of specified attribute
    void removeLiterals(ID id, Path path) throws BlackboardAccessException;
 
    // -- handling of sub-objects
    //    navigation: check if an attribute has sub-objects and get their number.
    boolean hasObjects(ID id, Path path) throws BlackboardAccessException;
    int getObjectSize(ID id, Path path) throws BlackboardAccessException;
 
    //    deleting sub-objects
    //    remove sub-objects specified by index in last step
    void removeObject(ID id, Path path) throws BlackboardAccessException;
    //    remove all sub-objects of specified attribute
    void removeObjects(ID id, Path path) throws BlackboardAccessException;
 
    // access semantic type of sub-object attribute values. 
    // semantic types of literals are modified at literal object    
    String getObjectSemanticType(ID id, Path path) throws BlackboardAccessException;
    void setObjectSemanticType(ID id, Path path, String typename) throws BlackboardAccessException;
 
    // -- annotations of attributes and sub-objects. 
    //    annotations of literals are accessed via the Literal object
    //    use null, "" or an empty attribute path to access root annotations of record.
    //    use PathStep.ATTRIBUTE_ANNOTATION as index in final step to access the annotation 
    //    of the attribute itself.
    Iterator<String> getAnnotationNames(ID id, Path path) throws BlackboardAccessException;
    boolean hasAnnotations(ID id, Path path) throws BlackboardAccessException;
    boolean hasAnnotation(ID id, Path path, String name) throws BlackboardAccessException;
 
    List<EILFAnnotation> getAnnotations(ID id, Path path, String name) throws BlackboardAccessException;
    //    shortcut to get only first annotation if one exists.
    EILFAnnotation getAnnotation(ID id, Path path, String name) throws BlackboardAccessException;
    void setAnnotations(ID id, Path path, String name, List<EILFAnnotation> annotations) throws BlackboardAccessException;
    void setAnnotation(ID id, Path path, String name, EILFAnnotation annotation) throws BlackboardAccessException;
    void addAnnotation(ID id, Path path, String name, EILFAnnotation annotation) throws BlackboardAccessException;
    void removeAnnotation(ID id, Path path, String name) throws BlackboardAccessException;
    void removeAnnotations(ID id, Path path) throws BlackboardAccessException;
 
    // - record attachments
    boolean hasAttachment(ID id, String name) throws BlackboardAccessException;
    byte[] getAttachment(ID id, String name) throws BlackboardAccessException;
    InputStream getAttachmentAsStream(ID id, String name) throws BlackboardAccessException;
    void setAttachment(ID id, String name, byte[] name) throws BlackboardAccessException;
    InputStream setAttachmentFromStream(ID id, String name, InputStream name) throws BlackboardAccessException;
 
    // - notes methods
    boolean hasGlobalNote(String name) throws BlackboardAccessException;
    Serializable getGlobalNote(String name) throws BlackboardAccessException;
    void setGlobalNote(String name, Serializable object) throws BlackboardAccessException;
    boolean hasRecordNote(ID id, String name) throws BlackboardAccessException;
    Serializable getRecordNote(ID id, String name) throws BlackboardAccessException;
    void setRecordNote(ID id, String name, Serializable object) throws BlackboardAccessException;
 
    // This is certainly not complete ... just to give an idea of how it could taste.
    // lots of convenience methods can be added later.
}
public class Path implements Serializable, Iterable<PathStep> {
    // string format of attribute path could be something like 
    // "attributeName1[index1]/attributeName2[index2]/..." or 
    // "attributeName1@index1/attributeName2@index2/...". 
    // The first is probably better because similar to XPath?
    // The specification of index is optional and defaults to 0.
    // Whether the index refers to a literal or a sub-object depends on methods getting the argument
 
    public static final char SEPARATOR = '/';
 
    public Path();
    public Path(Path path);
    public Path(String path);
 
    // extend path extended more steps. This modifies the object itself and returns it again 
    // for further modifications, e.g. path.add("level1").add("level2");
    public Path add(PathStep step); 
    public Path add(String attributeName); 
    public Path add(String attributeName, int index); 
 
    // remove tail element of this.  This modifies the object itself and returns it again 
    // for further modifications, e.g. path.up().add("siblingAttribute");
    public Path up(); 
 
    public Iterator<PathStep> iterator();
    public boolean isEmpty();
    public PathStep get(int positionInPath);
    public String getName(int positionInPath);
    public int getIndex(int positionInPath);
    public int length();
 
    public boolean equals(Path other);
    public int hashCode();
    public String toString();
}
public class PathStep implements Serializable {
    public static final int ATTRIBUTE_ANNOTATION = -1;
 
    private String name;
    private int index = 0; / index of value in multivalued attributes. default is first value.
 
    public PathStep(String name);
    public PathStep(String name, int index);
 
    public String getName();
    public int getIndex();
 
    public boolean equals(AttributePath other);
    public int hashCode();
    public String toString();
}

The business interface of a pipelet to be used in an EILF workflow will be rather simple. It can get access to the local blackboard service using OSGi service lookup or by injection using OSGi Declarative Services. Therefore the main business method just needs to take a list of record IDs as an argument and return a new (or the same) list of IDs as the result. This is the same method that a workflow integration service needs to expose to the Listener/Router component, therefore it makes sense to use a common interface definition for both. This way it is possible to deploy processing runtimes with only a single pipelet without having to create dummy workflow definitions, because a pipelet can be wired up to the Listener/Router immediately. Becasue remote communication with separated pipelets (see below) will be implemented later pipelets (and therefore workflow integrations, too) must be implemented as OSGi services such that the remote communication can be coordinated using SCA. Thus, interfaces could look like this:

interface RecordProcessor {
    ID[] process(ID[] records) throws ProcessingException;
}
interface Pipelet extends RecordProcessor {
    // specific methods for pipelets
}
interface WorkflowIntegration extends RecordProcessor {
    // specific methods for workflow integration services.
}


What about pipelets running in a seperate VM?

  • Not relevant for initial implementation. This will be added in advanced versions and discussed in more detail then.*

We want to be able have pipelets running in separated VM if they are known to be unstable or non-terminating in error conditions. This can be supported by the blackboard service like this:

SMILA-Blackboard-SeparatedService.png

The seperated pipelet VM would have a proxy blackboard service that coordinates the communication with the master blackboard in the workflow processor VM. Only the record ID needs to be sent to the separated pipelets. However, the separated pipelet must be wrapped to provide control of the record life cycle on the proxy blackboard, especially because the changes done in the remote blackboard must be committed back to the master blackboard when the separated pipelet has finished successful, or the proxy blackboard content must be invalidated without commit in case of an pipelet error. Possibly, this pipelet wrapper can also provide "watchdog" functionality to monitor the separated pipelet and terminate and restart it in case of endless loops or excessive memory consumption.