Discussion - SMILA and MapReduce

When thinking about Big Data, a lot of people have MapReduce in mind. Since SMILA's job management and asynchronous workflows are quite flexible, let's think about the possibilities we have in SMILA to provide some - at least basic - support for MapReduce algorithms.

MapReduce

(in very simplified form ;)

First, we have some kind of input data, let's say a record bulk in SMILA. For each record, the Map function produces key/value pairs (K,V)* as intermediate results. After all input data has passed the Map phase, the intermediate results are processed by the Reduce function, which creates one output result per unique intermediate key from all values of that key.

  • Map: record -> (K,V)*
  • Reduce: K,V* -> result

To integrate this easily into SMILA's data model, intermediate and output results should also be records or record bulks.
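
The two signatures above can be illustrated with a minimal in-memory sketch. It is not SMILA code: the class SimpleMapReduce, the Mapper interface and the run method are hypothetical names, keys are assumed to be strings, and everything is held in memory, which a real job would not do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

/** Hypothetical in-memory illustration of the Map and Reduce signatures; not SMILA code. */
final class SimpleMapReduce {

  /** Map: record -> (K,V)*. The mapper reports each pair through the emitter. */
  interface Mapper<R, V> {
    void map(R record, BiConsumer<String, V> emitter);
  }

  /** Runs the Map phase over all records, groups the values by key, then reduces each key. */
  static <R, V, O> Map<String, O> run(List<R> records, Mapper<R, V> mapper,
      BiFunction<String, List<V>, O> reducer) {
    // Intermediate results: all values per key, sorted by key.
    Map<String, List<V>> intermediate = new TreeMap<>();
    for (R record : records) {
      mapper.map(record, (key, value) ->
          intermediate.computeIfAbsent(key, k -> new ArrayList<>()).add(value));
    }
    // Reduce: K,V* -> result, called once per unique key.
    Map<String, O> results = new TreeMap<>();
    for (Map.Entry<String, List<V>> entry : intermediate.entrySet()) {
      results.put(entry.getKey(), reducer.apply(entry.getKey(), entry.getValue()));
    }
    return results;
  }
}
```

The important property is that the reducer is only called after every record has been mapped, which is exactly the constraint the workflow has to enforce.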

Worker

As basic MapReduce support, SMILA could provide an abstract MapWorker and an abstract ReduceWorker. A dedicated MapReduce solution would then be implemented by subclassing the MapWorker and the ReduceWorker. (In a second step, we could also think about having only one MapWorker and one ReduceWorker that take a kind of script parameter providing the map and reduce functions.)

MapWorker:

 /** the map function which has to be implemented. */
 protected abstract void map(record)
 /** the resulting key/value pairs are delivered to the superclass. */
 protected void emit(key, value)
 /** this will be called after the input has been processed. */
 protected abstract void finish() 

To keep things simple, we should (in a first step) only allow strings as keys for the map result. (Nevertheless, one could use complex keys by serializing them to an appropriate string.) The result of a MapWorker task is a record bulk whose records have a "_mapReduce" parameter:

{"_recordid": "...", 
 "_mapReduce": {
    "key1": [<values>], 
    "key2": [<values>], 
    ...
  },
  ...
}

  • "_mapReduce" is a hardcoded parameter name, so that the following ReduceWorker can find its input data. This also allows MapReduce data to be transported in records alongside other parameters and attributes.
  • The results of a MapWorker are already sorted by key, because the keys are strings and our Record/AnyMap implementation uses ordered maps.
  • The MapWorker implementation should, if possible, use the so-called in-mapper combining pattern: while processing an input task, it already accumulates the emitted values that belong to the same key and emits the combined values only in finish(). This reduces the number of values. (Alternative: We could call finish() after each processed record(?))
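
As an illustration of in-mapper combining, here is a minimal sketch for a word count. MapWorkerSketch is a hypothetical stand-in for the proposed abstract MapWorker and does not use the SMILA worker API; plain strings stand in for records, and processTask is an assumed name for whatever the superclass does per task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the proposed abstract MapWorker; not the SMILA worker API. */
abstract class MapWorkerSketch<V> {
  // Sorted map, mirroring the ordered "_mapReduce" map of an intermediate record.
  private final Map<String, List<V>> mapReduce = new TreeMap<>();

  protected abstract void map(String record);

  protected abstract void finish();

  /** Collects an emitted key/value pair in the intermediate result. */
  protected void emit(String key, V value) {
    mapReduce.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
  }

  /** Processes one task: maps every input record, then calls finish() once. */
  Map<String, List<V>> processTask(List<String> records) {
    for (String record : records) {
      map(record);
    }
    finish();
    return mapReduce;
  }
}

/** Word count with in-mapper combining: counts are summed locally and emitted in finish(). */
class CombiningWordCountMapper extends MapWorkerSketch<Integer> {
  private final Map<String, Integer> counts = new TreeMap<>();

  @Override
  protected void map(String record) {
    for (String word : record.split("\\s+")) {
      if (!word.isEmpty()) {
        counts.merge(word, 1, Integer::sum);
      }
    }
  }

  @Override
  protected void finish() {
    // One value per key instead of one value per occurrence.
    counts.forEach(this::emit);
  }
}
```

With finish() called once per task, a task containing "aaa ccc ccc" and "ccc bbb" yields a single value per key (for example 3 for "ccc"). Calling finish() after each record instead would yield one value per key and record.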

ReduceWorker:

 /** the reduce function which has to be implemented. */
 protected abstract void reduce(key, values)
 /** the result for each key is delivered to the superclass. */
 protected void emit(key, result)  

The (abstract) ReduceWorker could store its result in one (or more?) record(s), also below the "_mapReduce" parameter(?)
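
A matching sketch for the reduce side, again with hypothetical names (ReduceWorkerSketch, processTask) and without the SMILA worker API. It assumes that the values have already been grouped by key and simply keeps one result per key in a sorted map, which is only one of the storage options raised above.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the proposed abstract ReduceWorker; not the SMILA worker API. */
abstract class ReduceWorkerSketch<V, O> {
  // Results per key; these could be written below "_mapReduce" of a single result record.
  private final Map<String, O> results = new TreeMap<>();

  protected abstract void reduce(String key, List<V> values);

  /** Stores the result for one key. */
  protected void emit(String key, O result) {
    results.put(key, result);
  }

  /** Calls reduce() once per key; the input is assumed to be grouped by key already. */
  Map<String, O> processTask(Map<String, List<V>> groupedValues) {
    groupedValues.forEach(this::reduce);
    return results;
  }
}

/** Word count reducer: sums all partial counts of a key. */
class WordCountReducer extends ReduceWorkerSketch<Integer, Integer> {
  @Override
  protected void reduce(String key, List<Integer> values) {
    int sum = 0;
    for (int value : values) {
      sum += value;
    }
    emit(key, sum);
  }
}
```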

Workflow

The workflow consists of (at least) a MapWorker and a ReduceWorker that processes the results of the MapWorker. The main problem here is that the Reduce phase must not start before _all_ MapWorker tasks have finished.

We can solve this with a little trick: As long as the job run is in RUNNING mode, the ReduceWorker's TaskGenerator doesn't produce any tasks. So the MapReduce job run will only process MapWorker tasks until they are all finished. Afterwards the job run goes into its COMPLETION phase, and here we can intervene and let the ReduceWorker's TaskGenerator produce one task for the Reduce phase.

Why only one task for the ReduceWorker? Well, we have to provide all values for the same key as Reduce input, and these values are distributed across the different intermediate record bulks. The (abstract) ReduceWorker can read the input (streams) of all (sorted!) intermediate results from the Map phase, do a kind of merge sort, and thereby provide all input values for the reduce function without having to read the whole input first.
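
The streaming merge described above could look roughly like the following sketch. It is an assumption about how such a merge might be written, not existing SMILA code: iterators over key-sorted entries stand in for the input streams of the intermediate record bulks, and SortedMergeSketch, Cursor and mergeAndReduce are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical sketch of the streaming merge; iterators stand in for intermediate bulk streams. */
final class SortedMergeSketch {

  /** Current head entry of one sorted input plus the rest of that input. */
  private static final class Cursor<V> {
    Map.Entry<String, List<V>> head;
    final Iterator<Map.Entry<String, List<V>>> rest;

    Cursor(Iterator<Map.Entry<String, List<V>>> rest) {
      this.rest = rest;
      this.head = rest.next();
    }
  }

  /** Merges key-sorted inputs and calls reduce once per key with all of its values. */
  static <V> void mergeAndReduce(List<Iterator<Map.Entry<String, List<V>>>> inputs,
      BiConsumer<String, List<V>> reduce) {
    PriorityQueue<Cursor<V>> queue =
        new PriorityQueue<>(Comparator.comparing((Cursor<V> c) -> c.head.getKey()));
    for (Iterator<Map.Entry<String, List<V>>> input : inputs) {
      if (input.hasNext()) {
        queue.add(new Cursor<>(input));
      }
    }
    while (!queue.isEmpty()) {
      String key = queue.peek().head.getKey();
      List<V> values = new ArrayList<>();
      // Drain every input whose current head carries the smallest key.
      while (!queue.isEmpty() && queue.peek().head.getKey().equals(key)) {
        Cursor<V> cursor = queue.poll();
        values.addAll(cursor.head.getValue());
        if (cursor.rest.hasNext()) {
          cursor.head = cursor.rest.next();
          queue.add(cursor);
        }
      }
      reduce.accept(key, values);
    }
  }
}
```

Only the values of the key currently being merged are held in memory, plus one head entry per input. Whether the reduce results can be written out in the same streaming fashion depends on how the result is stored.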

The disadvantage of this approach is that the Reduce phase doesn't scale, because we have only one task for the whole phase. We may also need a good amount of memory, although with the streaming merge sort we should not have many intermediate results in memory at once. (Do we have to hold the complete reduce result in memory?)

MergeWorker

A different approach is to use something like a "MergeWorker" in the workflow between the MapWorker and the ReduceWorker. The MergeWorker could process the Map results and create output objects that contain all merged values for each key. In principle this would work with a streaming merge sort, as already described for the Reduce phase above. During this step, the following ReduceWorker could already process the objects created so far. The disadvantage here is worse performance, due to the additional workflow step that reads and writes another intermediate result.

Open issues

  • Is it really possible for the ReduceWorker to process the intermediate record bulks as a stream, so that it doesn't have to hold all intermediate results in memory?
  • Does it make sense to have a Combine phase, i.e. a CombineWorker?
  • Should the result of the ReduceWorker have a fixed format (e.g. storing the results below a _mapReduce parameter)? What makes sense here? Or should we just leave that to each ReduceWorker implementation?


Examples

word count

Workflow: Crawler -> Fetcher -> WordCountMapWorker -> WordCountReduceWorker

We assume that crawling produces three records in two record bulks as input for the MapWorker (r1 and r2 in the first bulk, r3 in the second):

{"_recordid": "r1", "content": "aaa ccc ccc" }
{"_recordid": "r2", "content": "ccc bbb" }
{"_recordid": "r3", "content": "bbb ccc" }

This produces two tasks for the WordCountMapWorker, which in turn produces two intermediate record bulks:

{"_recordid": "intermediate1", 
 "_mapReduce": {
    "aaa": [1], 
    "bbb": [1], 
    "ccc": [2,1]
  }
}
{"_recordid": "intermediate2", 
 "_mapReduce": { 
    "bbb": [1], 
    "ccc": [1]
  }
}


The WordCountReduceWorker reads those two intermediate results and produces the final result:

{"_recordid": "result", 
 "_mapReduce": {
    "aaa": [1], 
    "bbb": [2], 
    "ccc": [4]
  }
}


average value calculation

tbd (e.g. crawl filesystem, calculate average file size for each file extension)
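
Until this example is worked out, here is one possible sketch, given purely as an assumption about how it could look. Averages of averages are not correct in general, so the map side emits a partial sum and a file count per extension and the division happens only once in reduce. AverageFileSizeSketch and its methods are hypothetical names, and the file names and sizes are made-up inputs rather than crawler output.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the average calculation; file names and sizes are made-up inputs. */
final class AverageFileSizeSketch {

  /** Map side: per extension, accumulate {sum of sizes, file count} (in-mapper combining). */
  static Map<String, long[]> map(Map<String, Long> fileSizes) {
    Map<String, long[]> partial = new TreeMap<>();
    fileSizes.forEach((name, size) -> {
      int dot = name.lastIndexOf('.');
      String extension = dot >= 0 ? name.substring(dot + 1) : "";
      long[] sumAndCount = partial.computeIfAbsent(extension, k -> new long[2]);
      sumAndCount[0] += size;
      sumAndCount[1] += 1;
    });
    return partial;
  }

  /** Reduce side: add up the partial sums and counts of one key, then divide once. */
  static double reduce(List<long[]> partials) {
    long sum = 0;
    long count = 0;
    for (long[] p : partials) {
      sum += p[0];
      count += p[1];
    }
    return (double) sum / count;
  }
}
```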

inverted indexing of document terms

tbd (key: term, value: document which contains the term)