At eXenSa, we are currently working on a very fast way to preprocess large amounts of documents. It’s based on a combination of our new-generation Count-Min Sketch (not the one I’ve posted about before, but an even better version) and a new trick we devised just this weekend.
Before I explain this trick, here is how counting is usually done with standard data structures in map-reduce, and why we have chosen to try another way. Usually, this is what happens:
- Take a huge collection of documents
- Split it into thousands of partitions
- Now, in a map step, convert each document into a collection of counts
- Reduce the counts (sum by word)
Let’s introduce an example sentence to see how that precisely works:
Alan Mathison Turing, OBE, FRS (/ˈtjʊərɪŋ/; 23 June 1912 – 7 June 1954) was a British pioneering computer scientist, mathematician, logician, cryptanalyst, philosopher, mathematical biologist, and marathon and ultra distance runner. He was highly influential in the development of computer science, providing a formalisation of the concepts of algorithm and computation with the Turing machine, which can be considered a model of a general purpose computer.[2][3][4] Turing is widely considered to be the father of theoretical computer science and artificial intelligence.
We first convert this sequence of words into a collection like this:
(Alan, 1) (Mathison, 1) (Turing, 1), …
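In Spark, this standard pipeline boils down to a flatMap followed by a reduceByKey. Here is a minimal sketch, assuming documents is an RDD of (docID, docContent) pairs and a deliberately naive whitespace tokenizer:

// Minimal sketch of the standard approach, assuming documents is an
// RDD[(String, String)] of (docID, docContent) pairs
val wordCounts = documents
  .flatMap { case (docID, docContent) =>
    docContent.split("\\s+").map(word => (word, 1L))   // emit (word, 1) pairs
  }
  .reduceByKey(_ + _)                                  // combine locally, shuffle, sum globally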
Because words occur (hopefully) several times, we want to reduce the counts locally before shuffling the pairs to other nodes and reducing the counts globally. But even so, if we map each partition to count pairs, we are faced with a sad truth: each partition generates a big load of pairs, most of them also found in other partitions. A lot of duplicated work gets produced and later reduced, because we can’t have a mutable Map of (Word -> Count) updated in each executor as partitions are processed.
There are several good reasons for this. The first one is that using an executor-level mutable map can have very bad effects if the executor has some failing tasks, or if Spark needs to recompute a partition’s output because the RDD is evaluated several times. Another good reason is that there is no guarantee that the mutable Map will fit into the memory of each executor.
The problem is, we want to count a lot of items: not only words, but also bigrams, trigrams, skip-ngrams, etc. So counting the standard way is a really expensive process.
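To give an idea of the blow-up, here is a hedged sketch of what counting n-grams the standard way would look like (the ngrams helper is hypothetical, and skip-ngrams are left out for brevity):

// Hypothetical helper, for illustration only: contiguous n-grams of a token array
def ngrams(tokens: Array[String], n: Int): Iterator[String] =
  tokens.sliding(n).filter(_.length == n).map(_.mkString(" "))

// Every document now emits unigrams, bigrams and trigrams, multiplying the
// number of (key, 1) pairs that have to be shuffled and reduced
val ngramCounts = documents
  .flatMap { case (docID, docContent) =>
    val tokens = docContent.split("\\s+")
    (tokens.iterator ++ ngrams(tokens, 2) ++ ngrams(tokens, 3)).map(t => (t, 1L))
  }
  .reduceByKey(_ + _)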
Now, suppose you have a memory-bounded data structure that can handle a lot of counts with a small error (that’s a Count-Min Sketch). In theory, you still need one data structure per partition (and you would rather have a lot of small partitions, because that’s generally a good idea performance-wise). That’s bad, because to be able to handle tons of word counts (and also bigrams, trigrams, skip-ngrams, …), ALL these data structures must be quite large (merging Count-Min Sketches only works if they have the same configuration: width, depth, and hash functions).
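To make that constraint concrete, here is a bare-bones Count-Min Sketch, a simplified illustration rather than our implementation (the hash family in particular is a toy one); the point is that merging is a cell-wise sum, which only works when width, depth and hash functions match:

// Simplified Count-Min Sketch: counts land in a depth x width matrix,
// one row per hash function; a query returns the minimum over the rows.
class SimpleCMS(val depth: Int, val width: Int) extends Serializable {

  private val table = Array.ofDim[Long](depth, width)

  // Toy hash family: one cheap, non-production-grade hash per row
  private def bucket(item: String, row: Int): Int =
    (((item.hashCode.toLong * (row + 1) + row) % width).toInt + width) % width

  def add(item: String, count: Long = 1L): Unit =
    (0 until depth).foreach(row => table(row)(bucket(item, row)) += count)

  // The estimate over-counts by at most the collisions in the least loaded row
  def estimate(item: String): Long =
    (0 until depth).map(row => table(row)(bucket(item, row))).min

  // Merging is a cell-wise sum: it only makes sense when both sketches
  // share the same depth, width and hash functions
  def merge(other: SimpleCMS): SimpleCMS = {
    require(depth == other.depth && width == other.width, "incompatible sketch configuration")
    for (r <- 0 until depth; c <- 0 until width) table(r)(c) += other.table(r)(c)
    this
  }
}

That cell-wise merge is exactly what lets per-executor sketches be reduced into a single one at the end.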
What you would like, though, is to have this data structure distributed as one instance per node (let’s say it takes 1GB), process all the small partitions of the documents collection, and potentially, at the end, reduce and collect the distributed sketches into one. What we really want is an accumulator, except that in Spark, accumulators are bound to a partition, not to a worker, and even less to an executor.
You cannot do that with RDDs or accumulators (or accumulables), but you can do it with a little trick:
// First, define a singleton (one per JVM, so one per executor)
object CMSSingleton extends Serializable {

  @transient var cms: Option[CMS] = None

  def getCMS(): CMS = {
    this.synchronized {
      if (cms.isDefined) cms.get
      else {
        cms = Some(new CMS())
        cms.get
      }
    }
  }

  // Returns the sketch content exactly once per executor, then drops the reference
  def getCMSContent(): Iterator[((Int, Int), Array[Byte])] = {
    this.synchronized {
      if (cms.isDefined) {
        val content = cms.get.toContent()
        cms = None
        content
      } else Iterator[((Int, Int), Array[Byte])]()
    }
  }
}

....

// Feed the per-executor sketch with every partition processed by that executor
documents.foreachPartition { docIter =>
  val cms = CMSSingleton.getCMS()
  docIter.foreach { case (docID, docContent) => cms.count(docContent) }
}

// Assume defaultParallelism is >> nb of executors
val cmsAsRDD = sc.parallelize(0 to defaultParallelism).flatMap { x =>
  CMSSingleton.getCMSContent()
}

// Merge the per-executor blocks and rebuild a single local sketch on the driver
val cmsLocal = CMS.fromBlocks(cmsAsRDD.reduceByKey(CMS.MergeCMSBlocks(_, _)).collectAsMap)
Note the important ideas:
- Synchronized access to a singleton referencing a thread-safe data structure (remember there are several workers per executor)
- Getting the content “destroys” the reference held by the synchronized singleton. I haven’t found any other way to ensure that exactly one partition per executor emits the content (if you find a cleaner way to do it, please tell me), mostly because keys are apparently not always assigned to the same executor (I guess you would need a strong data/executor binding to enforce this, but there’s no guarantee)