
Difference between revisions of "SMILA/Discussion/MapReduce"


Revision as of 13:04, 31 October 2012

Discussion - SMILA and MapReduce

Thinking about Big Data, a lot of people have MapReduce in mind. Given SMILA's flexibility in job management and asynchronous workflows, let's think about the possibilities we have in SMILA to provide some (at least basic) support for MapReduce algorithms.


MapReduce (in very simplified form ;)

First, we have some kind of input data, let's say a record bulk in SMILA. The Map function produces Key/Value pairs (K,V)* as intermediate results. After all input data has passed the Map phase, the intermediate results are processed by the Reduce function which creates an output result for each unique intermediate key.

  • Map: Input -> (K,V)*
  • Reduce: K,V* -> result

To integrate this into SMILA's job processing, intermediate results and output results should also be record bulks.
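To make the two signatures above more concrete, here is a minimal in-memory sketch of a word count. It is not SMILA code: the class `WordCountSketch` and its methods are made up for illustration, and plain strings and Java collections stand in for records and record bulks.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory word count; strings stand in for SMILA records. */
class WordCountSketch {

  /** Map: one input line -> (word, 1)* */
  static List<Map.Entry<String, Integer>> map(String line) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (String word : line.toLowerCase().split("\\s+")) {
      if (!word.isEmpty()) {
        pairs.add(new AbstractMap.SimpleImmutableEntry<>(word, 1));
      }
    }
    return pairs;
  }

  /** Reduce: (word, counts*) -> total count */
  static int reduce(String key, List<Integer> values) {
    int sum = 0;
    for (int value : values) {
      sum += value;
    }
    return sum;
  }

  /** Runs Map over all input, groups by key, then calls Reduce once per unique key. */
  static Map<String, Integer> run(List<String> input) {
    Map<String, List<Integer>> grouped = new TreeMap<>();
    for (String line : input) {
      for (Map.Entry<String, Integer> pair : map(line)) {
        grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
      }
    }
    Map<String, Integer> result = new TreeMap<>();
    for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
      result.put(entry.getKey(), reduce(entry.getKey(), entry.getValue()));
    }
    return result;
  }
}
```

The grouping step in `run` is the part that has to see all Map output before any Reduce call can happen, which is exactly what makes the workflow question further down interesting.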


The idea for a first step is to provide an abstract MapWorker and an abstract ReduceWorker. To implement your own Map/Reduce workers you can use these as superclasses. (In a second step, we could also think about having only one MapWorker and one ReduceWorker, which could take scripts as input for the map and reduce functions.)


 /** the main map function which has to be implemented. */
 protected abstract void map(Record record)
 /** the resulting key/value pairs are delivered to the superclass. */
 protected void emit(key, values)

To keep things simple we should (in a first step) only allow strings as keys for the map result. (Nevertheless, one could use complex keys by serializing them to an appropriate string.) The result of the MapWorker is a record bulk, with records like:

{"_recordid": "...", 
 "_mapReduce": {
    "key1": <values>, 
    "key2": <values>, 
    ...
  }
}

  • "_mapReduce" is a hardcoded parameter name, so that the following ReduceWorker can find its input data. It also allows MapReduce data to be transported in records in addition to other parameters and attributes.
  • The results of a MapWorker are already ordered by key, because the keys are strings and we use ordered maps in our Record implementation.
  • The MapWorker implementations should use the so-called "in-mapper combining" pattern, which means that they already accumulate the results for the same key, so no key is emitted twice. We cannot accumulate the values in the abstract MapWorker, because we don't know anything about the Map function that is implemented, e.g. whether the values should be summed, concatenated, or combined in some other way.
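A rough sketch of how in-mapper combining could look. Everything here is an assumption for illustration: `MapWorkerSketch` is a stand-in for the proposed abstract MapWorker, it takes plain strings instead of SMILA records, and the `finish()` hook (called after the last input of a task) is not part of the proposal above, just one possible way to let the subclass emit its combined results.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the abstract MapWorker; strings replace SMILA records. */
abstract class MapWorkerSketch {
  /** Sorted by key, mirroring the ordered "_mapReduce" map of the output record. */
  private final Map<String, Object> mapReduce = new TreeMap<>();

  /** The main map function which has to be implemented. */
  protected abstract void map(String input);

  /** Assumed hook: called once after all input of the task has been mapped. */
  protected abstract void finish();

  /** Collects one key with its (already combined) values; rejects duplicate keys. */
  protected void emit(String key, Object values) {
    if (mapReduce.putIfAbsent(key, values) != null) {
      throw new IllegalStateException("key emitted twice: " + key);
    }
  }

  /** Processes all input of one task and returns the content for "_mapReduce". */
  Map<String, Object> process(Iterable<String> inputs) {
    for (String input : inputs) {
      map(input);
    }
    finish();
    return mapReduce;
  }
}

/** Word count with in-mapper combining: counts are summed locally, then emitted once. */
class WordCountMapper extends MapWorkerSketch {
  private final Map<String, Integer> counts = new TreeMap<>();

  @Override
  protected void map(String input) {
    for (String word : input.split("\\s+")) {
      if (!word.isEmpty()) {
        counts.merge(word, 1, Integer::sum);
      }
    }
  }

  @Override
  protected void finish() {
    counts.forEach(this::emit);
  }
}
```

Here the subclass decides how to combine (summing), and the superclass only checks that each key arrives once, which matches the split of responsibilities described in the last bullet.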


 /** the main reduce function which has to be implemented. */
 protected abstract void reduce(key, values)
 /** the result for each key is delivered to the superclass. */
 protected void emit(key, result)  

The (abstract) ReduceWorker could store the result in one (or more?) record(s), also below the "_mapReduce" parameter(?)


The workflow consists of (at least) a MapWorker and a ReduceWorker which processes the result of the MapWorker. The main problem here is that the Reduce phase mustn't start before _all_ MapWorker tasks have finished.

We can do a little trick here: As long as the job run is in RUNNING mode, the ReduceWorker's TaskGenerator doesn't produce tasks. So the MapReduce job run will only process MapWorker tasks until they are all finished. Afterwards the job run goes to its COMPLETION phase, and here we can intervene and let the ReduceWorker's TaskGenerator produce one task for the Reduce phase.
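The gating logic could be sketched roughly like this. Note that `JobRunState`, `ReduceTaskGeneratorSketch` and `createTasks` are hypothetical names chosen to follow the wording above; they are not the actual SMILA TaskGenerator interface, and a task is simply modelled as the list of Map output bulks it should read.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical job run states; the names follow the text, not the SMILA API. */
enum JobRunState { RUNNING, COMPLETING }

/** Hypothetical task generator for the ReduceWorker. */
class ReduceTaskGeneratorSketch {

  /**
   * Returns no tasks while Map tasks may still be running, and exactly one
   * task covering all Map output bulks once the job run is completing.
   */
  List<List<String>> createTasks(JobRunState state, List<String> mapOutputBulks) {
    if (state == JobRunState.RUNNING || mapOutputBulks.isEmpty()) {
      return Collections.emptyList();
    }
    return Collections.singletonList(new ArrayList<>(mapOutputBulks));
  }
}
```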

Why only one task for the ReduceWorker? Well, we have to provide all results for the same key as input to a single reduce call. The (abstract) ReduceWorker can read the input streams of all (sorted!) results from the Map phase, do a kind of merge sort, and so provide all input values for the reduce function without having to read the whole input first.
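The merge step could look something like the following k-way merge over sorted inputs. This is only a sketch under assumptions: `MergingReducerSketch` and `reduceAll` are invented names, each Map result is represented as an iterator of (key, value) entries sorted by key with no duplicate keys per stream (as in-mapper combining would guarantee), and values are plain integers.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Hypothetical merge of sorted Map outputs; not the SMILA ReduceWorker. */
class MergingReducerSketch {

  /** Cursor over one sorted Map output; holds only that stream's current entry. */
  private static final class Cursor {
    final Iterator<Map.Entry<String, Integer>> it;
    Map.Entry<String, Integer> current;

    Cursor(Iterator<Map.Entry<String, Integer>> it) {
      this.it = it;
      advance();
    }

    void advance() {
      current = it.hasNext() ? it.next() : null;
    }
  }

  /** Calls reduce once per unique key, in key order, with the values from all streams. */
  static void reduceAll(List<Iterator<Map.Entry<String, Integer>>> sortedStreams,
                        BiConsumer<String, List<Integer>> reduce) {
    // The heap always exposes the stream whose current key is smallest.
    PriorityQueue<Cursor> heap = new PriorityQueue<Cursor>(
        (a, b) -> a.current.getKey().compareTo(b.current.getKey()));
    for (Iterator<Map.Entry<String, Integer>> stream : sortedStreams) {
      Cursor cursor = new Cursor(stream);
      if (cursor.current != null) {
        heap.add(cursor);
      }
    }
    while (!heap.isEmpty()) {
      String key = heap.peek().current.getKey();
      List<Integer> values = new ArrayList<>();
      // Collect this key from every stream that currently points at it.
      while (!heap.isEmpty() && heap.peek().current.getKey().equals(key)) {
        Cursor cursor = heap.poll();
        values.add(cursor.current.getValue());
        cursor.advance();
        if (cursor.current != null) {
          heap.add(cursor);
        }
      }
      reduce.accept(key, values);
    }
  }
}
```

At any time only one entry per input stream plus the values of the current key are held in memory, so memory use grows with the number of Map results and the size of the values for a single key, not with the total input size.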

The disadvantage of this approach is that the Reduce phase doesn't scale, because we have only one task for the whole Reduce phase. It may also need a lot of memory, although with the merge sort approach we will not have all results in memory at once.

We could also use a kind of MergeWorker in between to process the Map results and create output objects that contain all merged values for each key. Then the following ReduceWorker could get a task for each of these objects. The disadvantage here is worse performance due to the additional workflow step, which reads and writes another intermediate result.