
SMILA/Documentation/HowTo/How to add a new Data Source to the importing framework

This how-to shows how to add a new data source (e.g. a database or another connector) to the new SMILA importing framework (see Importing Concept for more information about the framework).

The steps necessary to include the bundles and workers in the builds or launchers are not covered here, as they are described in detail in other how-tos (see Preconditions).

Preconditions

  • Set up your development environment, see How to set up the development environment.
  • You should have read and understood the documentation about the JobManager, especially the configuration of workers and workflows if you want to create new workers.
  • You should have at least a basic idea of the OSGi framework and OSGi services. For links to introductory articles and tutorials see [1]. For a quite comprehensive overview of OSGi see [2]. In particular, SMILA makes intensive use of OSGi's Declarative Services facility, so you may want to have at least a quick look at it.
  • You should already have gone through the How to write a Worker tutorial, since you need a Crawler and a Fetcher worker in order to be able to crawl a new Data Source.

Prepare the bundle

Please follow the How to create a bundle (plug-in) manual to create a new bundle.

Add the following packages to the Imported Packages list:

  • org.eclipse.smila.datamodel: For the Record class.
  • org.eclipse.smila.objectstore: For exceptions that may occur when accessing input/output streams.
  • org.eclipse.smila.taskmanager: To access the Task.
  • org.eclipse.smila.taskworker: The TaskWorker bundle containing the Worker and TaskContext interfaces.
  • org.eclipse.smila.taskworker.input: Input streams of the TaskWorker bundle.
  • org.eclipse.smila.taskworker.output: Output streams of the TaskWorker bundle.
  • org.eclipse.smila.importing: The importing framework bundle.

You should also add a test bundle (see How to create a test bundle (plug-in)).

Writing the workers

You should also have a look at the existing crawlers in SMILA: org.eclipse.smila.importing.crawler.file, org.eclipse.smila.importing.crawler.web, org.eclipse.smila.importing.crawler.jdbc and org.eclipse.smila.importing.crawler.feed.

The Crawler

The crawler worker is responsible for retrieving or producing the IDs (e.g. URLs) that address or identify the data in the data source.

The only interface the worker has to implement is org.eclipse.smila.taskworker.Worker.

What is the crawler worker supposed to do?

The crawler worker is supposed to do the following:

  • be invoked by the task generator when the crawl job is started (as a Run-Once job!)
  • optionally get some information about what to crawl (a seed ID, a base URL, an SQL query or whatever)
  • iterate over the data source according to that information
  • and for each entry generate an output record
    • with the data source property set
    • with the ID set (e.g. to the ID of the data source's data record, to make things easier)
    • optionally with the attribute _deltaHash (ImportingConstants.ATTRIBUTE_DELTA_HASH) set to some information that indicates whether the data has changed in the meantime (a hash over the content, a timestamp of the last modification, etc.), so that the delta checker can determine whether the record has to be processed or the data in the index is up to date
    • optionally with data source properties it can easily gather for the data (e.g. for a file crawler these would be the file's metadata that are quickly available without actually reading the file)
    • with metadata attributes that were mapped from the data source properties so that they fit the subsequent processing, e.g. the search index schema
    • with the attribute _isCompound (ImportingConstants.ATTRIBUTE_COMPOUNDFLAG) set to true if the entry is a compound object, so that the DeltaChecker worker can route compound records to a special compound extractor worker. See below for more details.

So the worker could look something like the following:

public class WhatsoeverCrawlerWorker implements Worker {
 
	private static final String NAME = "whatsoeverCrawler";
 
	private static final String OUTPUT_SLOT = "output";
 
	private static final String PROPERTY_SEED = "seed";
 
	private static final int MAX_IDS_PER_BULK = 1024;
 
	private Log _log = LogFactory.getLog(getClass());
 
	/** {@inheritDoc} */
	@Override
	public void perform(TaskContext taskContext) throws Exception {
		final AnyMap taskParams = taskContext.getTaskParameters();
		final String dataSource = taskParams
				.getStringValue(ImportingConstants.TASK_PARAM_DATA_SOURCE);
		if (dataSource == null || dataSource.trim().length() == 0) {
			throw new IllegalArgumentException("Parameter '"
					+ ImportingConstants.TASK_PARAM_DATA_SOURCE + "' of task "
					+ taskContext.getTask().getTaskId() + " is null or empty");
		}
		final String seedId = taskParams.getStringValue(PROPERTY_SEED);
		if (seedId == null || seedId.trim().length() == 0) {
			throw new IllegalArgumentException("Parameter '" + PROPERTY_SEED
					+ "' of task " + taskContext.getTask().getTaskId()
					+ " is null or empty");
		}
 
		int recordCount = 0;
		int recordOutputIndex = 0;
		RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(
				OUTPUT_SLOT, recordOutputIndex);
		for (Record record : getRecordsBySeed(seedId, dataSource)) {
			recordOutput.writeRecord(record);
			recordCount++;
			if (_log.isDebugEnabled()) {
				_log.debug("added id " + record.getId());
			}
			if (recordCount % MAX_IDS_PER_BULK == 0) {
				recordOutput.commit();
				recordOutputIndex++;
				recordOutput = taskContext.getOutputs().getAsRecordOutput(
						OUTPUT_SLOT, recordOutputIndex);
			}
		}
		_log.info("Found " + recordCount + " records for seed id " + seedId + ".");
	}
 
	/**
	 * gets records from the data source, if possible fills the
	 * {@link ImportingConstants#ATTRIBUTE_DELTA_HASH} attribute for the delta
	 * checker to be able to determine if the record has to be updated/inserted
	 * at all.
	 * 
	 * @param seedId
	 *            the seed id to know where/what to crawl.
	 * @param dataSource
	 *            the data source to crawl.
	 * @return a list of records containing the ID of the data source's data and
	 *         optionally a delta hash.
	 */
	private List<Record> getRecordsBySeed(final String seedId, final String dataSource) {
		ArrayList<Record> recordsToCrawl = new ArrayList<Record>();
 
		// iterate over the entries in the data source determined by the seed id
		while (...) {
			// id: the id of the data
			// lastModified: the last modified date of the record (omit if it cannot be determined)
			final Record record = DataFactory.DEFAULT.createRecord(id, dataSource);
			record.getMetadata().put(ImportingConstants.ATTRIBUTE_DELTA_HASH, lastModified);
 
			// map internal property names to attribute names, e.g. by using org.eclipse.smila.importing.util.PropertyNameMapper
			// mapper.mapNames(record, ...);  // (see implementation of FileCrawlerWorker, WebCrawlerWorker)
 
			recordsToCrawl.add(record);
		}
		return recordsToCrawl;
	}
 
	/** {@inheritDoc} */
	@Override
	public String getName() {
		return NAME;
	}
}
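
The _deltaHash value can be any string that changes whenever the entry in the data source changes. The following is only a minimal sketch of two ways to derive such a value; the class DeltaHashes and its method names are hypothetical and not part of SMILA. The first variant only needs metadata that is usually cheap to obtain, the second one has to read the complete content.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper (not part of SMILA): two ways to derive a delta hash string. */
final class DeltaHashes {

  private DeltaHashes() {
  }

  /** Cheap variant: combine last-modified timestamp and size, no content has to be read. */
  static String fromTimestampAndSize(final long lastModifiedMillis, final long sizeInBytes) {
    return lastModifiedMillis + "-" + sizeInBytes;
  }

  /** Expensive variant: hex-encoded SHA-256 digest over the content bytes. */
  static String fromContent(final byte[] content) {
    try {
      final MessageDigest digest = MessageDigest.getInstance("SHA-256");
      final StringBuilder hex = new StringBuilder();
      for (final byte b : digest.digest(content)) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (final NoSuchAlgorithmException e) {
      // SHA-256 must be supported by every Java platform implementation.
      throw new IllegalStateException(e);
    }
  }
}
```

In the crawler above, the resulting string would be put into the record metadata under ImportingConstants.ATTRIBUTE_DELTA_HASH in place of lastModified.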

If your data source is a bit more complex, e.g. hierarchical (like a file system), or you have to follow links (like on a web site), have a look at how the sample implementations of the file and web crawlers work (e.g. using the VisitedLinks service or looping back to the crawler to visit the next hierarchy level).

The Fetcher

Now that we have created bulks of records pointing to the data to be imported into SMILA, we need a worker that actually fetches the data from the data source using the IDs the crawler provided.

The only interface the worker has to implement is org.eclipse.smila.taskworker.Worker.

What is the fetcher worker supposed to do?

  • Read the records sent from the crawler and filtered by the delta checker
  • get the data to be processed by SMILA out of the data source for each ID in the record bulk
  • create records from that data
  • hand that data over to the update pusher, which in turn hands it over to the import workflow (i.e. the bulk builder)
  • optionally (if supported) extract compounds or send them to a compound extractor worker to do so.

The fetcher worker could look something like the following, with the magic happening in the fetch(...) method, which has to access the data source, retrieve the data, add it as an attachment and fill in other metadata as needed (have a look at the FileFetcherWorker or the web crawler's SimpleFetcher implementation for inspiration).

public class WhatsoeverFetcherWorker implements Worker {
 
	private static final String NAME = "whatsoeverFetcher";
	private static final String INPUT_SLOT = "input";
	private static final String OUTPUT_SLOT = "output";
	protected final Log _log = LogFactory.getLog(getClass());
 
	/** {@inheritDoc} */
	@Override
	public void perform(TaskContext taskContext) throws Exception {
		final RecordInput recordInput = taskContext.getInputs()
				.getAsRecordInput(INPUT_SLOT);
		final RecordOutput recordOutput = taskContext.getOutputs()
				.getAsRecordOutput(OUTPUT_SLOT);
		Record record;
		do {
			record = recordInput.getRecord();
			if (record != null) {
				if (_log.isDebugEnabled()) {
					_log.debug("fetching content for record " + record.getId());
				}
				fetch(record, taskContext);
 
				// map internal property names to attribute/attachment names, e.g. by using org.eclipse.smila.importing.util.PropertyNameMapper
				// mapper.mapNames(record, ...);  // (see implementation of FileCrawlerWorker, WebCrawlerWorker)
 
				recordOutput.writeRecord(record);
				if (_log.isDebugEnabled()) {
					_log.debug("added record " + record.getId());
				}
			}
		} while (record != null);
	}
 
	/**
	 * Actually retrieves the data from the source based on the ID of the record
	 * and fills in the record's meta data and/or attachments.
	 * 
	 * @param record
	 *            the record to be completed with information from the data
	 *            source
	 * @param taskContext
	 *            the tasks context.
	 */
	private void fetch(Record record, TaskContext taskContext) {
		final long time = taskContext.getTimestamp();
 
		// go and fetch the content and fill the record's content, metadata and/or
		// attachments with it.
		record.getMetadata().put(..., ...);
		...
 
		taskContext.measureTime("fetchContent", time);
	}
 
	/** {@inheritDoc} */
	@Override
	public String getName() {
		return NAME;
	}
}

The Extractor

If your data source contains compound documents like ZIP archives that should be decomposed into the actual documents, you need to implement an additional compound extractor worker.

Basically, you need to implement the interface org.eclipse.smila.taskworker.Worker, again.

What's the extractor worker supposed to do?

The extractor worker should for each input object fetch the content from the data source, extract the compound object and create records for each contained object. Additionally, the crawler worker must mark compound objects by setting a special attribute so that the DeltaChecker worker can route the compound to the extractor worker instead of the fetcher worker.
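
To illustrate what "extract the compound object" means for a ZIP archive, here is a minimal sketch that uses only java.util.zip. It does not use SMILA's CompoundExtractor service; the class ZipCompoundSketch is hypothetical, and it returns a plain map from entry name to content instead of records.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/** Hypothetical sketch (not part of SMILA): decompose a ZIP stream into its contained files. */
final class ZipCompoundSketch {

  private ZipCompoundSketch() {
  }

  /** Returns the content of every non-directory entry, keyed by the entry's path in the archive. */
  static Map<String, byte[]> extract(final InputStream compoundContent) throws IOException {
    final Map<String, byte[]> contained = new LinkedHashMap<>();
    try (ZipInputStream zip = new ZipInputStream(compoundContent)) {
      final byte[] buffer = new byte[8192];
      ZipEntry entry;
      while ((entry = zip.getNextEntry()) != null) {
        if (entry.isDirectory()) {
          continue; // directories carry no content of their own
        }
        final ByteArrayOutputStream content = new ByteArrayOutputStream();
        int read;
        // read() returns -1 at the end of the current entry, not at the end of the archive
        while ((read = zip.read(buffer)) != -1) {
          content.write(buffer, 0, read);
        }
        contained.put(entry.getName(), content.toByteArray());
      }
    }
    return contained;
  }
}
```

An extractor worker would create one record per map entry, with the content as attachment and an ID derived from the compound record's ID and the entry name.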

There are some things in SMILA that can make this simpler:

  • An org.eclipse.smila.importing.compounds.CompoundExtractor service can handle the actual extraction process: Just give it a java.io.InputStream to a supported compound and it returns records for each contained object. Additionally, it can handle the compound identification for the crawler worker by checking if a filename or a mimetype denotes a supported compound type. See SMILA/Documentation/Importing/CompoundExtractorService for more details on this service.
  • The org.eclipse.smila.importing.compounds.ExtractorWorkerBase is a base class for extractor workers that use the CompoundExtractor.
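
If you do not use the service for compound identification, the crawler has to decide on its own when to set _isCompound. A minimal stand-alone sketch of a check by file extension could look like this; the class CompoundCheckSketch is hypothetical, and the extension list is only an illustrative assumption, not the list of types supported by SMILA.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical sketch (not part of SMILA): identify compounds by file extension. */
final class CompoundCheckSketch {

  /** Illustrative assumption only; adapt to the compound types you can actually extract. */
  private static final Set<String> COMPOUND_EXTENSIONS = new HashSet<>(Arrays.asList("zip", "gz"));

  private CompoundCheckSketch() {
  }

  /** Returns true if the file name ends with one of the known compound extensions. */
  static boolean isCompound(final String fileName) {
    if (fileName == null) {
      return false;
    }
    final int dot = fileName.lastIndexOf('.');
    if (dot < 0 || dot == fileName.length() - 1) {
      return false; // no extension at all, or a trailing dot
    }
    final String extension = fileName.substring(dot + 1).toLowerCase(Locale.ROOT);
    return COMPOUND_EXTENSIONS.contains(extension);
  }
}
```

The crawler would set the _isCompound attribute to true for every entry for which such a check succeeds.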

Creating an extractor worker using the base classes

If you use the ExtractorWorkerBase, your extractor worker has the following tasks:

  • First, provide a ContentFetcher that creates an InputStream to the compound content. In the simplest case, the worker can be the ContentFetcher itself. Then it has to implement a getContent method to get the content based on attributes set in the current compound record and the task parameters. Of course, this method will likely be very similar to some of the fetcher worker's code, so it makes sense to share this code, and the shared component then might implement the ContentFetcher interface.
  • Second, invoke a given CompoundExtractor service. This is necessary because the extractor service needs more than only the content to be able to extract it: the mime type or the filename (especially the file extension) is needed to identify which type of compound it has to extract. So the invokeExtractor method is given the compound record so that it can get this meta information (it hopefully knows where the associated worker has put it ;-) and calls the appropriate extractor service method. The extractor also needs the name of the attachment to put the object content in.
  • Third, implement a method that converts the records produced by the extractor service to records that look like records produced by the associated crawler. In particular, it has to create a record ID that matches those produced by the crawler, copy the attachment and attributes produced by the extractor to the names the crawler would have used (or use the task parameters to determine the correct names) and set the _deltaHash attribute. It can also copy attributes from the compound record if there is a need to "inherit" them.
  • Finally, implement a method mapRecord() that is used for mapping internal data source property names to attribute names.
  • Of course, like any worker, it has to provide a getName() method returning the worker name.

public class WhatsoeverExtractorWorker extends ExtractorWorkerBase implements ContentFetcher {
 
  private static final String NAME = "whatsoeverExtractor";
 
  @Override
  public String getName() {
    return NAME;
  }
 
  @Override
  protected ContentFetcher getContentFetcher() {
    return this; // easiest solution if you have not created a separate fetcher component.
  }
 
  @Override
  public InputStream getContent(final Record record, final AnyMap parameters) throws ImportingException {
    return ...; // get a stream to the compound content. Might share code with associated Fetcher worker. 
  }
 
  @Override
  protected Iterator<Record> invokeExtractor(final CompoundExtractor extractor, final Record compoundRecord,
    final InputStream compoundContent, final TaskContext taskContext) throws CompoundExtractorException {
    final String mimeType = compoundRecord.getMetadata().getStringValue(WhatsoeverCrawlerWorker.ATTRIBUTE_MIMETYPE);
    return extractor.extract(compoundContent, null, mimeType, WhatsoeverCrawlerWorker.ATTACHMENT_CONTENT);
  }
 
  @Override
  protected Record convertRecord(final Record compoundRecord, final Record extractedRecord,
    final TaskContext taskContext) {
    final String dataSource = compoundRecord.getSource();
    final String id = ...; // create a unique ID from the compound record attributes, according to the conventions of associated crawler worker.
    final Record convertedRecord = extractedRecord.getFactory().createRecord(id, dataSource);
    copyAttachment(extractedRecord, convertedRecord, WhatsoeverCrawlerWorker.ATTACHMENT_CONTENT);
    copyAttribute(extractedRecord, CompoundExtractor.KEY_SIZE, convertedRecord, WhatsoeverCrawlerWorker.ATTRIBUTE_SIZE);
    // copy more attributes from compound record or extracted record to the final record according to the conventions of your crawler worker.
    // ...
    copyAttribute(extractedRecord, CompoundExtractor.KEY_TIME, convertedRecord, ImportingConstants.ATTRIBUTE_DELTA_HASH);
    return convertedRecord;
  }
 
  @Override
  protected void mapRecord(final Record record, final TaskContext taskContext) {
    final PropertyNameMapper mapper = PropertyNameMapper.createFrom(taskContext);
    mapper.mapNames(record, ...);
  }
 
}

That's it, basically. You might want to have a look at the web or file crawler implementations in SMILA to see how this scheme is implemented there.

Don't forget that the base class requires a service reference to the extractor service, so you have to add it to the OSGi DS component definition, like this (you can of course add more references if necessary):

<?xml version="1.0" encoding="UTF-8"?>
<scr:component xmlns:scr="http://www.osgi.org/xmlns/scr/v1.1.0" name="WhatsoeverExtractorWorker">
   <implementation class="org.eclipse.smila.importing.crawler.whatever.WhatsoeverExtractorWorker"/>
   <service>
      <provide interface="org.eclipse.smila.taskworker.Worker"/>
   </service>
   ...
   <reference 
        name="compoundExtractor"
        interface="org.eclipse.smila.importing.compounds.CompoundExtractor" 
        bind="setCompoundExtractor"
        unbind="unsetCompoundExtractor"
        cardinality="1..1"
        policy="static" 
   />
</scr:component>

If your data source contains compound types not supported by the SMILA extractor services you can either extend them or implement the compound extraction completely on your own. We will not go into detail about this here, however.

Plugging it up

Now we have to plug everything together.

  • Write component definitions for your workers (and for your service, if one is needed to access your data source).
  • Add the bundle to the launcher and the config.ini file.
  • Set your scale-up limits.
  • Add worker descriptions for your workers to the workers.json file; these could look something like the following code snippet.

Please note:

  • The crawler worker needs the "runOnceTrigger" task generator for the runOnce triggering!
  • An extractor worker based on the ExtractorWorkerBase class always has an input slot named "compounds" and an output slot named "files".

{ 
  "name": "whatsoeverCrawler",
  "taskGenerator":"runOnceTrigger",
  "parameters":[
    {
      "name":"dataSource"
    },
    {
      "name":"seed"
    },
    {
      "name":"mapping",
      "type":"map",
      "entries":[
        {
          "name":"whatsoeverProperty_1"         
        },
        {
           "name":"whatsoeverProperty_2"
        },
        ...
      ]
    }
  ],
  "input": [],
  "output": [ 
    {   "name": "output",
        "type": "recordBulks",
        "modes":[
            "maybeEmpty",
            "multiple"
        ]
    } ]
},
{ 
  "name": "whatsoeverFetcher",
  "parameters":[
    {
      "name":"mapping",
      "type":"map",
      "entries":[
        {
          "name":"whatsoeverProperty_A"         
        },
        ...
      ]
    }
  ],
  "input": [ 
    {	"name": "input",
	"type": "recordBulks"
    } ],
  "output": [ 
    {	"name": "output",
	"type": "recordBulks"
    } ]
},
{ 
  "name": "whatsoeverExtractor",
  "parameters":[
    {
      "name":"mapping",
      "type":"map",
      "entries":[
        {
          "name":"whatsoeverProperty_1"         
        },
        {
           "name":"whatsoeverProperty_2"
        },
        ...
      ]
    }
  ],
  "input": [ 
    {	"name": "compounds",
	"type": "recordBulks"
    } ],
  "output": [ 
    {	"name": "files",
	"type": "recordBulks"
    } ]
}
  • Add the workers to a suitable workflow, e.g.:
    {
      "name":"whatsoeverCrawling",
      "startAction":{
        "worker":"whatsoeverCrawler",
        "output":{
          "output":"somethingToCrawlBucket"
        }
      },
      "actions":[
        {
          "worker":"deltaChecker",
          "input":{
            "recordsToCheck":"somethingToCrawlBucket"
          },
          "output":{
            "updatedRecords":"somethingToFetchBucket",
            "updatedCompounds":"somethingToExtractBucket"
          }
        },
        {
          "worker":"whatsoeverFetcher",
          "input":{
            "input":"somethingToFetchBucket"
          },
          "output":{
            "output":"somethingToPushBucket"
          }
        },
        {
          "worker":"whatsoeverExtractor",
          "input":{
            "compounds":"somethingToExtractBucket"
          },
          "output":{
            "files":"somethingToPushBucket"
          }
        },
        {
          "worker":"updatePusher",
          "input":{
            "recordsToPush":"somethingToPushBucket"
          }
        }
      ]
    }
  • For your convenience, you can also create a predefined job in jobs.json, like the following snippet (note that the seed parameter is fixed if you use a predefined job):
    {
      "name": "crawlWhatsoever",
      "workflow": "whatsoeverCrawling",
      "parameters": {
        "tempStore": "temp",
        "dataSource": "whatsoever",
        "seed": "your seed data",
        "jobToPushTo": "indexUpdate",
        "mapping":{
          "whatsoeverProperty_1":"Title",
          "whatsoeverProperty_2":"Path",
          "whatsoeverProperty_A":"Content",
          ...
        }
      }
    }
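
The effect of the "mapping" job parameter can be pictured with plain Java maps. This is only a sketch of the idea, not the implementation of PropertyNameMapper; the class NameMappingSketch is hypothetical, and dropping properties that have no mapping entry is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch (not SMILA's PropertyNameMapper): rename properties according to a mapping. */
final class NameMappingSketch {

  private NameMappingSketch() {
  }

  /**
   * @param properties values keyed by internal data source property name
   * @param mapping internal property name to target attribute name, as in the job's "mapping" parameter
   * @return values keyed by attribute name; unmapped properties are dropped (assumption of this sketch)
   */
  static Map<String, Object> mapNames(final Map<String, Object> properties, final Map<String, String> mapping) {
    final Map<String, Object> attributes = new LinkedHashMap<>();
    for (final Map.Entry<String, Object> property : properties.entrySet()) {
      final String attributeName = mapping.get(property.getKey());
      if (attributeName != null) {
        attributes.put(attributeName, property.getValue());
      }
    }
    return attributes;
  }
}
```

With the mapping from the job above, a value stored under whatsoeverProperty_1 would end up in the attribute Title.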

And ....Action!

So now it's time to check if everything went right.

  • Start SMILA
  • Check whether you can access your worker definitions, workflow and job via the REST API. If not, check for errors (syntax errors in the JSON files, other errors in the SMILA log).
  • Check in SMILA's log whether your workers were added.
  • Start the indexing job: POST http://localhost:8080/smila/jobmanager/jobs/indexUpdate/
  • Start your crawling job (remember: it has to be started as a runOnce job!):
POST http://localhost:8080/smila/jobmanager/jobs/crawlWhatsoever/
{
  "mode": "runOnce"
}
  • Check your jobs: after your crawl job has succeeded, you can finish the indexing job. After the indexing job has succeeded (if you finished it), wait some seconds (up to 60, because the autocommit takes some time) before checking whether your data was indexed (see http://localhost:8080/SMILA/search).
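
If you prefer to start the crawl job from code instead of a REST client, the run-once request from the steps above could be built as follows. This is a minimal sketch using the java.net.http API (Java 11 or later); the class StartCrawlJobSketch is hypothetical, and sending a Content-Type header is an assumption, not a documented requirement.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical sketch: build the POST request that starts a job in runOnce mode. */
final class StartCrawlJobSketch {

  private StartCrawlJobSketch() {
  }

  /**
   * @param baseUrl root of the SMILA REST API, e.g. http://localhost:8080/smila
   * @param jobName name of the predefined job, e.g. crawlWhatsoever
   */
  static HttpRequest runOnceRequest(final String baseUrl, final String jobName) {
    return HttpRequest.newBuilder()
      .uri(URI.create(baseUrl + "/jobmanager/jobs/" + jobName + "/"))
      .header("Content-Type", "application/json") // assumption, see the note above
      .POST(HttpRequest.BodyPublishers.ofString("{\"mode\": \"runOnce\"}"))
      .build();
  }
}
```

The request can then be sent to a running SMILA instance with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).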

So now you should be able to search in your content.

If you can find your records, you have just successfully added a new data source to your SMILA application. Congratulations!