SMILA/Project Concepts/Connectivity

From Eclipsepedia

Description

The Connectivity Manager is the entry point for external data: a single point of entry at the information level. It normalizes incoming information to an internally used message format. Large sets of incoming data (binary data) should also be persisted to an external storage to reduce the queue load. The Connectivity Manager also includes functionality for buffering and routing the incoming information.

Discussion

  • Daniel Stucky: While implementing and testing the LucenePipelet, I found out that there may be a need to send special actions after a DataSource has been indexed (e.g. for Lucene: send a flush action so that all changes in the index become visible to IndexReaders). I already discussed this with Igor and we had the following idea: As the whole system is asynchronous, we don't know exactly when an index job has finished completely. The ConnectivityManager knows when a Crawler has finished crawling. Thereafter it could create a special Queue Message to execute a pipeline (e.g. to flush the Lucene index). Of course there may still be messages of the crawled DataSource either in the Buffer or in the Queue. Therefore, before sending this special message, it should be checked that the Buffer and the Queue no longer contain any messages belonging to the DataSource.

As Lucene IndexReaders only see the index state as of the time the index was opened, they have to reopen the index from time to time to be up to date. The only possibility is to poll the index on a regular basis. This could be done in the Lucene Query Service or, more generally, using some scheduling service of SMILA.

Technical proposal

The Connectivity Manager is the single point of entry for information (data) in SMILA. Its functionality is divided into several Sub-Components for better modularization. The Connectivity Manager and its Sub-Components should all be implemented in Java. The external interfaces should also support SCA.

Overview

This chart shows the Connectivity Manager, its Sub-Components and their relationships, as well as the relationship to other components: Connectivity Module.png

The connections with arrowheads represent the actual flow of data to, within and from the Connectivity Manager. See section Workflow for detailed information. The XML and binary storages are global storages that are also accessible by BPEL.

Sub-Components

APIs

The Connectivity Manager will probably have to provide more than one interface/technology for access. The main interface is used by IRMs to provide crawled data objects, but it may also be used from within BPEL processes or from the Publish/Subscribe Module. This concept focuses on the interfaces used by IRMs. I also decided to integrate the Delta Indexing Manager functionality as a Sub-Component in the Connectivity Manager, with parts of its API accessible via the Connectivity Manager interface. The Connectivity Manager's APIs should be available via SCA, but there is no such need for the Sub-Components.

Processor

The Processor is the core of the Connectivity Manager; it does the actual processing of the incoming data objects. The incoming data is stored depending on its type:

  • large or binary data is stored in a binary store (e.g. distributed filesystem)
  • all other data is stored in an XML store (e.g. XML database)

The Processor also creates the message object to be enqueued. A message contains the unique ID of the object, the Delta Indexing hash, routing information and any additional needed information. It should be configurable what information is part of a message. The Processor should also be able to standardize incoming objects (either Records and/or MessageObjects of the 2nd alternative interface design) to the latest version (internal representation) or to reject them.
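
As an illustration of the two steps above (store the data by type, then create a small message), here is a minimal sketch. The class names SketchProcessor and SketchMessage, the map-based stores and the route parameter are hypothetical and not part of the proposal; the real stores would be external services.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical message carrying only the fields named in the text:
// object ID, Delta Indexing hash and routing information.
class SketchMessage {
    final String id;
    final String hash;
    final String route;

    SketchMessage(String id, String hash, String route) {
        this.id = id;
        this.hash = hash;
        this.route = route;
    }
}

class SketchProcessor {
    // In-memory stand-ins for the binary store and the XML store.
    final Map<String, byte[]> binaryStore = new HashMap<>();
    final Map<String, String> xmlStore = new HashMap<>();

    // Persists the payload outside the queue and returns the lightweight
    // message that would be handed to the Buffer.
    SketchMessage process(String id, String hash, String xml, byte[] binary, String route) {
        xmlStore.put(id, xml);
        if (binary != null && binary.length > 0) {
            binaryStore.put(id, binary);
        }
        return new SketchMessage(id, hash, route);
    }
}
```

The message deliberately contains no payload, which is what keeps the queue load low.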

Buffer (P2)

The Buffer delays the enqueueing of outgoing messages. Therefore it needs a separate Queue mechanism to temporarily store the messages. This must not be confused with the Queue Servers! The Buffer provides functionality to detect and resolve competing messages (add/update and delete of the same document).

For a first release the Buffer functionality is of low priority (P2).
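
One possible way to resolve competing messages is sketched below. The policy used here (a newer message for the same document replaces the one still on hold) is an assumption made for illustration, as are the names SketchBuffer and PendingMessage; the concept does not prescribe a resolution strategy.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical message held back by the Buffer.
class PendingMessage {
    final String id;
    final String action; // "add", "update" or "delete"

    PendingMessage(String id, String action) {
        this.id = id;
        this.action = action;
    }
}

class SketchBuffer {
    // Messages on hold, keyed by document ID and ordered by latest arrival.
    private final Map<String, PendingMessage> pending = new LinkedHashMap<>();

    // Assumed policy: a newer message for the same ID supersedes the older one.
    void store(PendingMessage msg) {
        pending.remove(msg.id); // remove first so the new entry moves to the end
        pending.put(msg.id, msg);
    }

    // Releases everything on hold, e.g. once the delay has elapsed.
    List<PendingMessage> flush() {
        List<PendingMessage> out = new ArrayList<>(pending.values());
        pending.clear();
        return out;
    }
}
```

With this policy an add followed by a delete of the same document results in a single delete message reaching the Router.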

Router

The Router routes messages to the appropriate Queues and/or BPEL workflows. The routing information (what goes where) has to be provided by configuration. The Router also has to update the Delta Indexing information accordingly. Neither the IRM nor the Connectivity Manager gets any feedback on whether or how a message was processed (successfully or with an error). The only feedback the Router (and thus the Connectivity Manager) gets is whether a message was enqueued or not. Therefore, after a message has been successfully enqueued, one of the following actions must be triggered by the Router:

  • add: create the Delta Indexing entry and mark as processed (visited)
  • update: update the Delta Indexing entry and mark as processed (visited)
  • delete: remove the Delta Indexing entry
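
The three cases above can be sketched as follows. SketchRouter, DeltaEntry and the afterEnqueue method are hypothetical names, and leaving the Delta Indexing state untouched when enqueueing fails is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Delta Indexing entry: hash plus visited flag.
class DeltaEntry {
    final String hash;
    final boolean visited;

    DeltaEntry(String hash, boolean visited) {
        this.hash = hash;
        this.visited = visited;
    }
}

class SketchRouter {
    // In-memory stand-in for the Delta Indexing Manager's state.
    final Map<String, DeltaEntry> deltaIndex = new HashMap<>();

    // 'enqueued' is the only feedback the Router receives from the queue.
    void afterEnqueue(String action, String id, String hash, boolean enqueued) {
        if (!enqueued) {
            return; // assumption: a failed enqueue leaves the delta state unchanged
        }
        switch (action) {
            case "add":
            case "update":
                // create or update the entry and mark it as processed (visited)
                deltaIndex.put(id, new DeltaEntry(hash, true));
                break;
            case "delete":
                deltaIndex.remove(id);
                break;
            default:
                throw new IllegalArgumentException("unknown action: " + action);
        }
    }
}
```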

It may be necessary to access the Router directly after a BPEL workflow has finished, in order to route a message to another Queue; the API would have to be extended for this.

Delta Indexing Manager

The Delta Indexing Manager stores information about the last modification of each document (even compound elements) and can determine whether a document has changed. The information about the last modification should be some kind of hash computed by the Crawler (see IRM for further information). It provides functionality to manage this information, to determine whether documents have changed, to mark documents that have not changed (visited flag) and to determine documents that are indexed but no longer exist in the data source. The Delta Indexing Manager was moved inside the Connectivity Manager for these reasons:

  • some of its functionality is used within the Connectivity Manager
  • as the single point of access, the Connectivity Manager should "know" about the delta indexing information
  • in a distributed system we only need one connection from an IRM to the Connectivity Manager and not a second one to access the Delta Indexing Manager (this does not seem to be a big gain, but may prove valuable in high-volume distributed scenarios)

Despite being a part of the Connectivity Manager, the implementation of the Delta Indexing Manager is still replaceable, so that different stores for the delta indexing information can be provided (e.g. a database or even a search index).

Here is a list of the information that needs to be stored by the Delta Indexing Manager:

  • ID: the id of the document
  • Hash: the hash of the document to determine modifications
  • DataSourceID: the id of the data source from which the document was provided. This is already part of the document's ID, but we need it as a separate value to clear by source.
  • IsCompound: flag indicating whether the document is a compound object. This is needed to clean up recursively.
  • ParentID or ChildIDs: a reference to the parent document (if any exists) or references to child documents. This is needed to clean up recursively.
  • VisitedFlag: flag that is temporarily set during processing of a data source, to mark documents as visited. At the end all unmarked documents of a data source are deleted.
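
The fields listed above could be modelled as in the following sketch, which uses the ParentID variant and shows why the parent reference is needed for recursive clean-up. The class names DeltaIndexEntry and DeltaIndexTable and the subtree method are hypothetical, and the in-memory map merely stands in for whatever store is chosen.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical entry holding the information listed above.
class DeltaIndexEntry {
    final String id;
    final String dataSourceId;
    final String parentId;   // null for top-level documents
    final boolean compound;
    String hash;
    boolean visited;

    DeltaIndexEntry(String id, String dataSourceId, String parentId, boolean compound, String hash) {
        this.id = id;
        this.dataSourceId = dataSourceId;
        this.parentId = parentId;
        this.compound = compound;
        this.hash = hash;
    }
}

class DeltaIndexTable {
    final Map<String, DeltaIndexEntry> entries = new HashMap<>();

    void put(DeltaIndexEntry e) {
        entries.put(e.id, e);
    }

    // Collects the given ID and, recursively, the IDs of all entries below it,
    // i.e. everything that has to be cleaned up together with a compound.
    List<String> subtree(String id) {
        List<String> result = new ArrayList<>();
        if (!entries.containsKey(id)) {
            return result;
        }
        result.add(id);
        for (DeltaIndexEntry e : entries.values()) {
            if (id.equals(e.parentId)) {
                result.addAll(subtree(e.id));
            }
        }
        return result;
    }
}
```

The linear scan per level is only acceptable for a sketch; a real store would need an index on the parent reference.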

If this information is stored in a database we have to provide an efficient table schema.

{info:title=Improvement} A further improvement of DeltaIndexing is to store information not only about data objects but also about hierarchy nodes (e.g. folders in a filesystem or in Exchange). Assuming that hierarchy nodes know whether any of their sub-elements (data objects or hierarchy nodes) have changed, delta indexing performs faster, as complete hierarchy levels can be skipped during crawling. A hierarchy does not have to be a tree but may be a graph as well. So data objects and hierarchy nodes may be referenced by more than one hierarchy node; it is not a 1:1 child-parent relationship. Special care has to be taken when deleting hierarchy nodes, so that elements that are referenced by other hierarchy nodes are not deleted. Crawlers need to be adapted so that hierarchy nodes are returned, too. I suggest making this feature P2, as such an optimization is not necessary for a first release.{info}

Interfaces

The following data types are used in the Interfaces:

  • DataObject: contains the object's ID, the hash value used for Delta Indexing and all information (XML and binary)
  • ID: a unique id of a DataObject. See ID Concept for details about IDs.

The Sub-Component Processor has no interface itself, but it implements the Connectivity Manager Interface.

interface ConnectivityManager
{
    void add(DataObject data);
    void update(DataObject data);
    void delete(ID id);
 
    // external interface for DeltaIndexing
    void clear();
    void clear(String dataSourceID);
    void init(String dataSourceID);
    boolean checkForUpdate(ID id, String hash);
    void deleteDelta(String dataSourceID);
    void deleteDelta(ID id); // handles delta deletion for a single compound object and its elements (recursively)
}

An alternative that reduces network overhead is to provide methods supporting Lists of objects.

interface ConnectivityManager
{
 
    void add(List<DataObject> data);
    void update(List<DataObject> data);
    void delete(List<ID> ids);
    List<Boolean> checkForUpdate(List<DeltaIndexInfo> infos); // List<boolean> is not valid Java, so the wrapper type is used
}
 
class DeltaIndexInfo
{
    ID id;
    String hash;
}


This makes sense especially for checkForUpdate() and delete(), as the parameters of those methods do not contain much data. The size of the DataObject used in add() and update() varies greatly. For smaller objects it makes sense to provide Lists, for larger objects it does not. Of course the maximum List size should be configurable, but who decides what List size to use (1 vs. N)? This has to be done dynamically to avoid memory errors! Another problem is that the Iterator concept used in Crawlers does not fit methods with Lists. Therefore the iteration over DeltaIndexInfo must be separated from getting the DataObjects. For some Crawlers/DataSources it may be difficult or even impossible to support direct access to objects outside of an iteration. All of this complicates the logic!
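
To illustrate the configurable maximum List size, the following sketch splits a list of items into batches of at most maxSize elements. The helper class Batches is hypothetical, and a fixed count is a simplification: it does not address the dynamic, size-aware decision that the paragraph above asks for.

```java
import java.util.ArrayList;
import java.util.List;

class Batches {
    // Splits 'items' into consecutive batches with at most 'maxSize' elements each.
    static <T> List<List<T>> partition(List<T> items, int maxSize) {
        if (maxSize < 1) {
            throw new IllegalArgumentException("maxSize must be >= 1");
        }
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < items.size(); i += maxSize) {
            int end = Math.min(i + maxSize, items.size());
            result.add(new ArrayList<>(items.subList(i, end))); // copy, so batches are independent
        }
        return result;
    }
}
```

Each batch could then be passed to one call of the list-based checkForUpdate() or delete().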

{note:title=2nd Alternative Interface} On the page ConnectivityMessageInterface you will find another alternative for the Connectivity Manager's interface, based on messages.{note}


interface DeltaIndexingManager
{
    void clear();
    void clear(String dataSourceID);
    void init(String dataSourceID);
    void finish(String dataSourceID);
    boolean checkForUpdate(ID id, String hash); // to reduce method calls, the entry is marked as visited when the return value is false
    void visit(ID id, String hash);
    void delete(ID id);
    List<Identity> getObsoleteIDs(String dataSourceID);
    List<Identity> getObsoleteIDs(ID id); // for compounds
}
  • clear(): clears the complete state information
  • clear(dataSourceID): clears the state information for one dataSourceID
  • init: initializes the internal state for an import of a dataSourceID and establishes a lock to prevent the same dataSourceID from being initialized multiple times concurrently
  • checkForUpdate: checks whether the hash of the given id is new or has changed (true) or not (false)
  • visit: updates the hash and marks this id as visited
  • getObsoleteIDs: returns the entries that have not been marked as visited
  • finish: removes the lock
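
The semantics described above can be sketched with a small in-memory implementation. This is only an illustration under stated assumptions: the class name InMemoryDeltaIndex is hypothetical, plain Strings replace the ID type, IDs are assumed to have the form "<dataSourceID>:<key>" so that the data source can be derived from an ID, and only a subset of the interface is covered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class InMemoryDeltaIndex {
    private final Map<String, String> hashes = new HashMap<>(); // id -> hash
    private final Set<String> visited = new HashSet<>();
    private final Set<String> locked = new HashSet<>();         // data sources being imported

    // Assumption for this sketch: IDs look like "<dataSourceID>:<key>".
    private static String sourceOf(String id) {
        return id.substring(0, id.indexOf(':'));
    }

    // Locks the data source and resets its visited flags.
    synchronized void init(String dataSourceId) {
        if (!locked.add(dataSourceId)) {
            throw new IllegalStateException(dataSourceId + " is already being imported");
        }
        visited.removeIf(id -> sourceOf(id).equals(dataSourceId));
    }

    // true if the id is new or its hash has changed; unchanged entries are
    // marked as visited right away to save a separate call.
    synchronized boolean checkForUpdate(String id, String hash) {
        boolean changed = !hash.equals(hashes.get(id));
        if (!changed) {
            visited.add(id);
        }
        return changed;
    }

    synchronized void visit(String id, String hash) {
        hashes.put(id, hash);
        visited.add(id);
    }

    synchronized void delete(String id) {
        hashes.remove(id);
        visited.remove(id);
    }

    // Entries of the data source that were not visited during the current import.
    synchronized List<String> getObsoleteIDs(String dataSourceId) {
        List<String> obsolete = new ArrayList<>();
        for (String id : hashes.keySet()) {
            if (sourceOf(id).equals(dataSourceId) && !visited.contains(id)) {
                obsolete.add(id);
            }
        }
        return obsolete;
    }

    // Releases the lock taken by init().
    synchronized void finish(String dataSourceId) {
        locked.remove(dataSourceId);
    }
}
```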


interface Buffer
{
    void store(Message msg);
}
interface Router
{
    void route(Message msg);
}

{info:title=Conclusion} During the team meeting on April 15 we agreed to implement the "1st alternative interface", employing lists to reduce communication overhead. In addition, the Crawler interface will also support lists. A Message-like interface or a pure XML interface may be added later on. {info}

Workflow

Here follows a description of the workflow when used by an IRM:

  • Crawler
  • the CrawlerController initializes an import by calling init() to reset the Delta Indexing Manager's visited flags. Concurrent imports of the same DataSource should not be allowed; the Connectivity Manager and the Delta Indexing Manager have to ensure this.
  • for each Record (only ID and hash) that the CrawlerController receives from the Crawler, it asks the Connectivity Manager (internally the Delta Indexing Manager) whether the Record needs to be added/updated
    • no:
      • mark the Delta Indexing entry as visited
    • yes:
      • the Record (now with all data) is sent to the Connectivity Manager. The Connectivity Manager should not request the data object via callback. Crawlers should be the active components that send/request information.
      • the Processor stores the Record data in the external stores (Bin and XML) and creates an add/update Message and sends it to the Buffer
      • the Buffer applies its logic (holding back the message for some time, checking for conflicts) and sends the add/update message to the Router
      • the Router routes the add/update Message to the appropriate Queue/BPEL workflow
        • add: creates a Delta Indexing entry for the object and marks it as visited
        • update: updates the Delta Indexing entry for the object and marks it as visited
  • after the iteration has finished the CrawlerController tells the Connectivity Manager to perform Delta Indexing Manager Delete
    • the Processor checks the Buffer, until no more messages belonging to this DataSource are on hold
    • the Processor gets the list of objects to be deleted from the Delta Indexing Manager
    • the Processor creates N delete messages and adds each to the Buffer and calls Delta Index Manager finish() (cleans up the visited flags)
    • the Buffer applies its logic and sends each delete message to the Router
    • the Router routes the delete Message to the appropriate Queue/BPEL workflow and removes the corresponding entry from the Delta Indexing Manager {info:title=Useful Information}For better performance it may be preferable not to create a single delete Message for each object but to create one delete Message with a list of IDs to be deleted. DeltaIndexing should then support a delete(List<ID>) method. But this will complicate the Buffer logic. Is it possible to pack a list of IDs into a message?{info}
  • Agent
  • the Agent sends an add Record (all data) to the AgentController, which in turn calls add on the ConnectivityManager. No Delta Indexing is performed (init() is not called), so no component applies Delta Indexing logic.
    • the Processor stores the Record data in the external stores (Bin and XML) and creates an add Message and sends it to the Buffer
    • the Buffer applies its logic (holding back the message for some time, checking for conflicts) and sends the add message to the Router
    • the Router routes the add Message to the appropriate Queue/BPEL workflow
    • add: creates a Delta Indexing entry for the object and marks it as visited
  • the Agent sends an update Record (all data) to the AgentController, which in turn calls update on the ConnectivityManager
    • the Processor stores the Record data in the external stores (Bin and XML) and creates an update Message and sends it to the Buffer
    • the Buffer applies its logic (holding back the message for some time, checking for conflicts) and sends the update message to the Router
    • the Router routes the update Message to the appropriate Queue/BPEL workflow
  • the Agent sends a delete Record (just ID) to the AgentController, which in turn calls delete on the ConnectivityManager
    • the Processor creates a delete Message and sends it to the Buffer
    • the Buffer applies its logic (holding back the message for some time, checking for conflicts) and sends the delete message to the Router
    • the Router routes the delete Message to the appropriate Queue/BPEL workflow
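
The Crawler part of the workflow above can be condensed into the following sketch. It is a strong simplification under stated assumptions: CrawlRunSketch is a hypothetical name, Processor, Buffer and Router are collapsed into a single loop, every message is assumed to be enqueued successfully, and the Delta Indexing state is a plain map from ID to hash.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CrawlRunSketch {
    // 'deltaIndex' maps id -> hash from the previous run and is updated in place.
    // 'crawled' maps id -> hash as delivered by the Crawler in this run.
    // Returns the messages that would be routed, as "add:id", "update:id" or "delete:id".
    static List<String> run(Map<String, String> deltaIndex, Map<String, String> crawled) {
        List<String> actions = new ArrayList<>();
        Set<String> visited = new HashSet<>(); // init(): all visited flags start cleared

        for (Map.Entry<String, String> rec : crawled.entrySet()) {
            String id = rec.getKey();
            String hash = rec.getValue();
            String known = deltaIndex.get(id);
            if (known == null) {
                actions.add("add:" + id);
            } else if (!known.equals(hash)) {
                actions.add("update:" + id);
            }
            deltaIndex.put(id, hash); // no effective change for unmodified records
            visited.add(id);
        }

        // After the iteration: everything not visited no longer exists in the source.
        for (String id : new ArrayList<>(deltaIndex.keySet())) {
            if (!visited.contains(id)) {
                actions.add("delete:" + id);
                deltaIndex.remove(id);
            }
        }
        return actions;
    }
}
```

In the real system the delete step would additionally have to wait until the Buffer holds no more messages of the DataSource, as described above.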