Skip to main content
Jump to: navigation, search

SMILA/Documentation/HowTo/How to add a new Data Source to the importing framework

< SMILA‎ | Documentation‎ | HowTo
Revision as of 06:50, 20 January 2012 by Andreas.schank.attensity.com (Talk | contribs) (New page: {{note|Work in progress. Not yet finished!}} This how to manual shows how you can add a new data source (e.g. database, connectors, etc.) for the new SMILA importing framework (see [[SMIL...)

(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Note.png
Work in progress. Not yet finished!


This how to manual shows how you can add a new data source (e.g. database, connectors, etc.) for the new SMILA importing framework (see Importing Concept for more information about the framework).

The steps necessary to include the bundles and workers into the builds or launchers won't be covered here, as they are covered in detail in other how tos (see preconditions).

Preconditions

  • Set up your development environment, see How to set up the development environment.
  • You should have read and understood the documentation about the JobManager, especially the configuration of workers and workflows if you want to create new workers.
  • You should have at least an idea about the OSGi framework and OSGi services. For links to introductory articles and tutorials see [1]. For a quite comprehensive overview on OSGi see [2]. Especially, SMILA makes intensive use of OSGi's Declarative Services facility, so you may want to have at least a quick look at it.
  • You should already have gone through the How to write a Worker tutorial, since you need a Crawler and a Fetcher worker in order to be able to crawl a new Data Source.

Prepare the bundle

Please follow the How to create a bundle (plug-in) manual to create a new bundle.

Add the following bundles to the Imported Packages list:

  • org.eclipse.smila.datamodel: For the Record class.
  • org.eclipse.smila.objectstore: Possible exceptions when accessing input/output streams.
  • org.eclipse.smila.taskmanager: To access the Task.
  • org.eclipse.smila.taskworker: The TaskWorker bundle containing the Worker and TaskContext interfaces.
  • org.eclipse.smila.taskworker.input: Input streams of the TaskWorker bundle.
  • org.eclipse.smila.taskworker.output: Output streams of the TaskWorker bundle.
  • org.eclipse.smila.importing: The importing framework bundle.

You should also add a test bundle (see How to create a test bundle (plug-in)).

Writing the workers

You can also have a look at the two existing crawlers in SMILA, org.eclipse.smila.importing.crawler.file and org.eclipse.smila.importing.crawler.web.

The Crawler

The crawler worker is responsible to retrieve or produce the IDs (e.g. URLs etc.) to adress or identify the data in the data source.

The only interface the worker has to implement is org.eclipse.smila.taskworker.Worker.

What is the crawler worker supposed to do?

The crawler worker is supposed to do the following:

  • be invoked by the task generator when the crawl job ist started (as Run-Once job!)
  • optionally get some information about what to crawl (some seed id or base URL or SQL query or whatever)
  • iterate over the data source according to that information
  • and for each entry generate an output record
    • with the data source property set
    • with the id set (e.g. to the ID of the data sources data record, to make things easier)
    • optionally with the attribute _deltaHash (ImportingConstants.ATTRIBUTE_DELTA_HASH) set to some information that indicates if the data has been changed meanwhile (a hash over the content or a timestampt of the last modification etc.), so the delta checker can determine if the record has to be processed or the data in the index is up-to-date.

So the worker could look something like the following:

public class WhatsoeverCrawlerWorker implements Worker {
 
	private static final String NAME = "mongoDbCrawler";
 
	private static final String OUTPUT_SLOT = "output";
 
	private static final String PROPERTY_SEED = "seed";
 
	private static final int MAX_IDS_PER_BULK = 1024;
 
	private Log _log = LogFactory.getLog(getClass());
 
	/** {@inheritDoc} */
	@Override
	public void perform(TaskContext taskContext) throws Exception {
		final AnyMap taskParams = taskContext.getTaskParameters();
		final String dataSource = taskParams
				.getStringValue(ImportingConstants.TASK_PARAM_DATA_SOURCE);
		if (dataSource == null || dataSource.trim().length() == 0) {
			throw new IllegalArgumentException("Parameter '"
					+ ImportingConstants.TASK_PARAM_DATA_SOURCE + "' of task "
					+ taskContext.getTask().getTaskId() + " is null or empty");
		}
		final String seedId = taskParams.getStringValue(PROPERTY_SEED);
		if (seedId == null || seedId.trim().length() == 0) {
			throw new IllegalArgumentException("Parameter '" + PROPERTY_SEED
					+ "' of task " + taskContext.getTask().getTaskId()
					+ " is null or empty");
		}
 
		int recordCount = 0;
		int recordOutputIndex = 0;
		RecordOutput recordOutput = taskContext.getOutputs().getAsRecordOutput(
				OUTPUT_SLOT, recordOutputIndex);
		for (Record record : getRecordsBySeed(seedId, dataSource)) {
			recordOutput.writeRecord(record);
			recordCount++;
			if (_log.isDebugEnabled()) {
				_log.debug("added id " + record.getId());
			}
			if (recordCount % MAX_IDS_PER_BULK == 0) {
				recordOutput.commit();
				recordOutputIndex++;
				recordOutput = taskContext.getOutputs().getAsRecordOutput(
						OUTPUT_SLOT, recordOutputIndex);
			}
		}
		_log.info("Found " + recordCount + " records for seed id " + seedId + ".");
	}
 
	/**
	 * gets records from the data source, if possible fills the
	 * {@link ImportingConstants#ATTRIBUTE_DELTA_HASH} attribute for the delta
	 * checker to be able to determine if the record has to be updated/inserted
	 * at all.
	 * 
	 * @param seedId
	 *            the seed id to know where/what to crawl.
	 * @param dataSource
	 *            the data source to crawl.
	 * @return a list of records containing the ID of the data source's data and
	 *         optionally a delta hash.
	 */
	private List<Record> getRecordsBySeed(final String seedId, final String dataSource) {
		ArrayList<Record> recordsToCralw = new ArrayList<Record>();
 
		// iterate over the entries in the data source determined by the seed id
                while(...) {
                        // id: the id of the data
                        // lastModified: the last modified date of the record (omit if it cannot be determined)
			final Record record = DataFactory.DEFAULT.createRecord(id, dataSource);
			record.getMetadata().put(ImportingConstants.ATTRIBUTE_DELTA_HASH, lastModified);
		}
		return recordsToCralw;
	}
 
	/** {@inheritDoc} */
	@Override
	public String getName() {
		return NAME;
	}
}

The Fetcher

So now we've created bulks of records pointing to the data to be imported into SMILA, we now need a worker that actually fetches the data from the data source using the ids, the crawler provided.

The only interface the worker has to implement is org.eclipse.smila.taskworker.Worker.

What is the crawler worker supposed to do?

  • Read the records sent from the crawler and filtered by the delta checker


...to be completed some time after lunch :-)...

Back to the top