Revision as of 15:11, 31 October 2012 by Andreas.weber.empolis.com (MergeWorker)

Discussion - SMILA and MapReduce

Thinking about Big Data, a lot of people have MapReduce in mind. Given SMILA's flexibility in job management and asynchronous workflows, let's think about how SMILA could provide some (at least basic) support for MapReduce algorithms.


(in very simplified form ;)

First, we have some kind of input data, let's say a record bulk in SMILA. For each record, the Map function produces key/value pairs (K,V)* as intermediate results. After all input data has passed the Map phase, the intermediate results are processed by the Reduce function, which creates one output result for each unique intermediate key from all values of that key.

  • Map: record -> (K,V)*
  • Reduce: K,V* -> result
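To make the two functions concrete, here is a minimal in-memory sketch in Java (9+) using the classic word-count example. It is only an illustration. A "record" is modeled as a plain text string rather than SMILA's Record type, and all grouping happens inside one JVM.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** In-memory word count showing Map and Reduce; "records" are plain strings here. */
final class SimpleMapReduce {

  private SimpleMapReduce() {}

  /** Map: record -> (K,V)*, emitting one (word, 1) pair per word occurrence. */
  static List<Map.Entry<String, Integer>> map(final String record) {
    final List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    for (final String word : record.toLowerCase().split("\\W+")) {
      if (!word.isEmpty()) {
        pairs.add(Map.entry(word, 1));
      }
    }
    return pairs;
  }

  /** Reduce: K,V* -> result, summing all values collected for one key. */
  static int reduce(final String key, final List<Integer> values) {
    int sum = 0;
    for (final int v : values) {
      sum += v;
    }
    return sum;
  }

  /** Runs the whole Map phase first, groups the pairs by key, then reduces each key once. */
  static Map<String, Integer> run(final List<String> input) {
    // Map phase: every input record is mapped before any Reduce call happens.
    final Map<String, List<Integer>> grouped = new TreeMap<>();
    for (final String record : input) {
      for (final Map.Entry<String, Integer> pair : map(record)) {
        grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
      }
    }
    // Reduce phase: one output result per unique intermediate key.
    final Map<String, Integer> results = new TreeMap<>();
    for (final Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
      results.put(group.getKey(), reduce(group.getKey(), group.getValue()));
    }
    return results;
  }
}
```

For example, `SimpleMapReduce.run(List.of("a b a", "b c"))` yields `{a=2, b=2, c=1}`.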

To integrate this easily into SMILA's data model, the intermediate and output results should also be records or record bulks.


As basic MapReduce support in SMILA, we could provide an abstract MapWorker and an abstract ReduceWorker. A dedicated MapReduce solution would then be implemented as subclasses of these two workers. (In a second step, we could also think about having only one concrete MapWorker and one concrete ReduceWorker, each with a kind of script parameter that provides the map or reduce function.)


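 /** The map function, which has to be implemented by subclasses. */
 protected abstract void map(Record record);
 /** Delivers the resulting key/values pairs to the superclass (implemented by the abstract MapWorker). */
 protected void emit(String key, Any values) { /* collects the pair for the task's result */ }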
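The split between the abstract MapWorker and its subclasses is a template method pattern. The superclass drives the task and collects whatever is emitted, and a subclass only implements map(). The Java below is a hypothetical sketch, not SMILA's worker API. Records are modeled as `Map<String, Object>`, and the "language" attribute used by the example subclass is invented for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical abstract MapWorker: runs the task and collects what subclasses emit. */
abstract class AbstractMapWorkerSketch {

  /** Key/values pairs delivered via emit() during the current task. */
  private final List<Map.Entry<String, Object>> emitted = new ArrayList<>();

  /** The map function, which has to be implemented by subclasses. */
  protected abstract void map(Map<String, Object> record);

  /** The resulting key/values pairs are delivered to the superclass. */
  protected final void emit(final String key, final Object values) {
    // SimpleImmutableEntry (unlike Map.entry) also tolerates null values.
    emitted.add(new AbstractMap.SimpleImmutableEntry<>(key, values));
  }

  /** Processes one task: maps every record of the input bulk and returns all emitted pairs. */
  public final List<Map.Entry<String, Object>> processBulk(final List<Map<String, Object>> bulk) {
    emitted.clear();
    for (final Map<String, Object> record : bulk) {
      map(record);
    }
    return new ArrayList<>(emitted);
  }
}

/** Example subclass: emits (language, record ID) for each record with a "language" attribute. */
class LanguageMapWorker extends AbstractMapWorkerSketch {
  @Override
  protected void map(final Map<String, Object> record) {
    final Object language = record.get("language");
    if (language != null) {
      emit(language.toString(), record.get("_recordid"));
    }
  }
}
```

This naive subclass may emit the same key several times per task. The in-mapper combining pattern discussed on this page is meant to avoid exactly that.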

To keep things simple, we should (in a first step) allow only strings as keys of the map result. (Complex keys can still be used by serializing them to an appropriate string.) The result of a MapWorker task is a record bulk with records like this:

 {"_recordid": "...",
  "_mapReduce": {
     "key1": <values>,
     "key2": <values>,
     ...
  }
 }
  • "_mapReduce" is a hardcoded parameter name, so that the following ReduceWorker can find its input data. This also allows MapReduce data to be transported in records alongside other parameters and attributes.
  • The results of a MapWorker are already ordered by key, because the keys are strings and our Record/AnyMap implementation uses ordered maps.
  • MapWorker implementations should use the so-called "in-mapper combining" pattern, which means that the mapper itself aggregates the values for identical keys, so no key is emitted twice. The abstract MapWorker cannot aggregate the values itself, because it knows nothing about the implemented Map function, e.g. whether the values should be summed, concatenated, or combined in some other way.
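The in-mapper combining pattern can be sketched as a word-count mapper. It only accumulates partial counts while mapping, then emits every key exactly once at the end of the task and builds a result record with the "_mapReduce" parameter. This is a hypothetical illustration. Records are plain Java maps, and the sketch uses a TreeMap so that the keys are sorted regardless of how SMILA's AnyMap orders its entries. An instance is meant to handle one task only.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Word-count mapper using in-mapper combining; one instance per task (hypothetical sketch). */
final class CombiningWordCountMapper {

  /** Intermediate result of this task, stored below "_mapReduce"; TreeMap keeps keys sorted. */
  private final Map<String, Object> mapReduce = new TreeMap<>();

  /** Local per-task aggregation: word -> partial count. */
  private final Map<String, Integer> counts = new HashMap<>();

  /** Map function: only accumulates, nothing is emitted yet. */
  void map(final String text) {
    for (final String word : text.toLowerCase().split("\\W+")) {
      if (!word.isEmpty()) {
        counts.merge(word, 1, Integer::sum);
      }
    }
  }

  /** Emits one key; a key emitted twice means the combine step is missing. */
  private void emit(final String key, final Object values) {
    if (mapReduce.putIfAbsent(key, values) != null) {
      throw new IllegalStateException("key emitted twice: " + key);
    }
  }

  /** Maps the whole bulk, emits each combined key once, and builds the result record. */
  Map<String, Object> processBulk(final String resultRecordId, final List<String> texts) {
    for (final String text : texts) {
      map(text);
    }
    counts.forEach(this::emit);
    final Map<String, Object> result = new LinkedHashMap<>();
    result.put("_recordid", resultRecordId);
    result.put("_mapReduce", mapReduce);
    return result;
  }
}
```

For the input texts "a b" and "b a a", the "_mapReduce" map of the result record is `{a=3, b=2}`. The summing happens in the concrete mapper, not in the generic superclass logic, matching the point above that only the Map implementation knows how to combine its values.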


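 /** The main reduce function, which has to be implemented by subclasses. */
 protected abstract void reduce(String key, Iterable<Any> values);
 /** Delivers the result for each key to the superclass (implemented by the abstract ReduceWorker). */
 protected void emit(String key, Any result) { /* stores the result for this key */ }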

The (abstract) ReduceWorker could store the result in one (or more?) record(s), again below the "_mapReduce" parameter (?).
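The ReduceWorker side can be sketched the same way. The Java below is a hypothetical illustration, not SMILA's API. It assumes the values for each key have already been grouped (for example by the merge described on this page), models records as `Map<String, Object>`, and stores all results in a single result record below "_mapReduce".

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical abstract ReduceWorker: feeds reduce() and collects emitted results. */
abstract class AbstractReduceWorkerSketch {

  /** Results emitted during the current task, sorted by key. */
  private final Map<String, Object> results = new TreeMap<>();

  /** The main reduce function, which has to be implemented by subclasses. */
  protected abstract void reduce(String key, List<Object> values);

  /** The result for each key is delivered to the superclass. */
  protected final void emit(final String key, final Object result) {
    results.put(key, result);
  }

  /** Calls reduce() once per key and returns one result record holding all results. */
  public final Map<String, Object> processGroups(final String resultRecordId,
      final Map<String, List<Object>> groups) {
    results.clear();
    groups.forEach(this::reduce);
    final Map<String, Object> resultRecord = new LinkedHashMap<>();
    resultRecord.put("_recordid", resultRecordId);
    resultRecord.put("_mapReduce", new TreeMap<>(results));
    return resultRecord;
  }
}

/** Example subclass: sums numeric partial values, e.g. word counts from several mappers. */
class SumReduceWorker extends AbstractReduceWorkerSketch {
  @Override
  protected void reduce(final String key, final List<Object> values) {
    long sum = 0;
    for (final Object v : values) {
      sum += ((Number) v).longValue();
    }
    emit(key, sum);
  }
}
```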


The workflow consists of (at least) a MapWorker and a ReduceWorker that processes the results of the MapWorker. The main problem here is that the Reduce phase must not start before _all_ MapWorker tasks have finished.

We can solve this with a little trick: as long as the job run is in RUNNING mode, the ReduceWorker's TaskGenerator produces no tasks. So the MapReduce job run processes only MapWorker tasks until all of them are finished. Afterwards, the job run enters its COMPLETION phase, where we can step in and let the ReduceWorker's TaskGenerator produce a single task for the Reduce phase.
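The TaskGenerator trick can be illustrated with a small state-based sketch. All names here (`JobRunPhase`, `ReduceTaskGeneratorSketch`, `createTasks`) are hypothetical. SMILA's real TaskGenerator interface and job run states are not reproduced, and a "task" is just a descriptive string.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical job run phases; not SMILA's actual job run state enum. */
enum JobRunPhase { RUNNING, COMPLETION }

/** Hypothetical generator for ReduceWorker tasks that withholds tasks until completion. */
final class ReduceTaskGeneratorSketch {

  private boolean reduceTaskCreated;

  /** Returns the reduce tasks to create in the given phase. */
  List<String> createTasks(final JobRunPhase phase, final List<String> intermediateBulks) {
    if (phase == JobRunPhase.RUNNING || reduceTaskCreated) {
      // Map tasks may still be running: the Reduce phase must not start yet.
      // After completion, the single reduce task has already been created.
      return Collections.emptyList();
    }
    // All map tasks are finished: create exactly one task that reads all intermediate bulks.
    reduceTaskCreated = true;
    return List.of("reduce(" + String.join(",", intermediateBulks) + ")");
  }
}
```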

Why only one task for the ReduceWorker? The reduce function needs all values for the same key as its input, and these values are spread across the different intermediate record bulks. The (abstract) ReduceWorker can read the input streams of all (sorted!) intermediate results from the Map phase, merge them as in a merge sort, and so provide all values for each key to the reduce function without reading the whole input first.

The disadvantage of this approach is that the Reduce phase doesn't scale, because there is only one task for the whole Reduce phase. It may also need a good amount of memory, although with the streaming merge only a small part of the intermediate results should be in memory at any time. (Do we have to hold the complete reduce result in memory?)
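The streaming merge inside the single reduce task can be sketched as a k-way merge with a priority queue. Only the current head entry of each sorted input is held in memory. All values for the smallest key are collected across the inputs and handed to the reduce callback before the next key is read. In this hypothetical sketch, inputs are iterators over (key, values) entries, standing in for reading the sorted "_mapReduce" maps of the intermediate bulks. The order of values within one key is not defined.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Streaming k-way merge of key-sorted inputs, calling reduce once per key. */
final class SortedMergeReducer {

  private SortedMergeReducer() {}

  /** Current position in one sorted input stream. */
  private static final class Cursor<V> {
    final Iterator<Map.Entry<String, V>> it;
    Map.Entry<String, V> head;

    Cursor(final Iterator<Map.Entry<String, V>> it) {
      this.it = it;
      this.head = it.next();
    }

    /** Moves to the next entry; returns false when the stream is exhausted. */
    boolean advance() {
      if (it.hasNext()) {
        head = it.next();
        return true;
      }
      return false;
    }
  }

  static <V> void mergeAndReduce(final List<Iterator<Map.Entry<String, V>>> inputs,
      final BiConsumer<String, List<V>> reduce) {
    // Heap ordered by each cursor's current key, so the smallest key is always on top.
    final PriorityQueue<Cursor<V>> queue =
        new PriorityQueue<>(Comparator.comparing((Cursor<V> c) -> c.head.getKey()));
    for (final Iterator<Map.Entry<String, V>> input : inputs) {
      if (input.hasNext()) {
        queue.add(new Cursor<>(input));
      }
    }
    while (!queue.isEmpty()) {
      final String key = queue.peek().head.getKey();
      final List<V> values = new ArrayList<>();
      // Drain this key from every input; a cursor is only advanced while it is out of the heap.
      while (!queue.isEmpty() && queue.peek().head.getKey().equals(key)) {
        final Cursor<V> cursor = queue.poll();
        values.add(cursor.head.getValue());
        if (cursor.advance()) {
          queue.add(cursor);
        }
      }
      reduce.accept(key, values);
    }
  }
}
```

Merging the inputs `[a→1, c→2]` and `[a→3, b→4]` calls reduce for a (values 1 and 3), then b (4), then c (2), i.e. in key order.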


A different approach is to add something like a "MergeWorker" to the workflow between the MapWorker and the ReduceWorker. The MergeWorker could process the Map results and create output objects that contain all merged values for each key. In principle, this would work with the same streaming merge as described above for the Reduce phase. While this step is still running, the following ReduceWorker could already process the objects created so far. The disadvantage here is worse performance, due to the additional workflow step that reads and writes another intermediate result.
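A MergeWorker along these lines can be sketched as the same k-way merge, with the merged output cut into objects of at most `maxKeysPerObject` keys. Each finished object is handed to a sink right away, so reduce tasks could start on it while the merge continues. Because keys arrive in globally sorted order, a key is complete once the next key appears and never spans two objects. The sink, the chunk size, and the Map-based object format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical MergeWorker: merges key-sorted inputs into objects of complete key groups. */
final class MergeWorkerSketch {

  private MergeWorkerSketch() {}

  static <V> void merge(final List<Iterator<Map.Entry<String, V>>> inputs,
      final int maxKeysPerObject, final Consumer<Map<String, List<V>>> sink) {
    if (maxKeysPerObject < 1) {
      throw new IllegalArgumentException("maxKeysPerObject must be >= 1");
    }
    // heads.get(i) is the current entry of input i; the heap holds input indices ordered by key.
    final List<Map.Entry<String, V>> heads = new ArrayList<>();
    final PriorityQueue<Integer> queue =
        new PriorityQueue<>((a, b) -> heads.get(a).getKey().compareTo(heads.get(b).getKey()));
    for (int i = 0; i < inputs.size(); i++) {
      heads.add(inputs.get(i).hasNext() ? inputs.get(i).next() : null);
      if (heads.get(i) != null) {
        queue.add(i);
      }
    }
    Map<String, List<V>> current = new TreeMap<>();
    while (!queue.isEmpty()) {
      final int i = queue.poll();
      final Map.Entry<String, V> entry = heads.get(i);
      // A new key would overflow the current object: all its keys are complete, so flush it.
      if (!current.containsKey(entry.getKey()) && current.size() == maxKeysPerObject) {
        sink.accept(current);
        current = new TreeMap<>();
      }
      current.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).add(entry.getValue());
      // Advance input i while it is out of the heap, then re-insert it if not exhausted.
      if (inputs.get(i).hasNext()) {
        heads.set(i, inputs.get(i).next());
        queue.add(i);
      }
    }
    if (!current.isEmpty()) {
      sink.accept(current);
    }
  }
}
```

With `maxKeysPerObject = 2`, merging `[a→1, c→2]` and `[a→3, b→4]` produces two objects: one with keys a and b, and one with key c. Each could become the input of a separate ReduceWorker task.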