SHadoop
What is Scala and Hadoop?
Scala is a modern multi-paradigm programming language designed to express common programming patterns in a concise, and type-safe way. It smoothly integrates features of object-oriented and functional languages including mixins, algebraic datatypes with pattern matching, genericity, and more.
Hadoop is a free Java software platform that supports running applications to process vast amounts of data. It has been developed under the Apache Lucene Project and was originally developed to support distribution for Nutch, which is an effort to build an open source search engine for the search and index component. Hadoop consists of an open source implementation of Google’s published computing infrastructure, specifically MapReduce and the Google File System.
Scala + Hadoop!?
The Hadoop Map-Reduce framework is map based, whose keys and values are serializable objects which implements a simple serialization protocol. This serialization protocol is defined by the Writable interface. In addition, Hadoop provides Writable
’s implementations for each basic types(Ints
, Long
, Float
, String
, …). This implementations wrap a value of the basic type in an Writable
object. Let us take this example, and think about possible utilization of int values in Hadoop:
private final static IntWritable one = new IntWritable(1);
Sounds like primitive wrappers before Java 5 boxing!
Why Scala?
Read this and this. Moreover, Scala offers a bag of others features:
- Implicit conversion methods
Implicit methods that are often used for converting types, essentially give you statically guaranteed and provided dynamic scoping. Implicit conversions between types (You know the way a number get implicitly converted to another number type in Java, e.g.
int
tolong
,short
toint
, …? Scala does that too, but it’s all programmer definable. They’re just library functions in scala.Predef).
- Type inference
The Scala compiler can oftentimes infer the type of an object so there is no need to explicitly specify the type. It is, for instance, often not necessary in Scala to specify the type of a variable, since the compiler can deduce the type from the initialization expression of the variable. Also return types of methods can often be omitted since they corresponds to the type of the body, which gets inferred by the compiler.
In short, Scala provides a clear, concise and stylized syntax.
SHadoop = Scala + Hadoop
What we would like is something like this:
val one = 1
Or like this:
def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter) =
(value split " ") foreach (output collect (_, one))
The interesting point is that with Scala, this is quite simple to implement. SHadoop is the proof!!! SHadoop consists in only one source file containing a Scala object with some implicit methods that are often used for converting primitives Java types (including String
) to Writable
instances. Furthermore, the SHadoop object provides implicit methods that are often used for converting writable java iterators to primitives type scala iterators - scala iterators provides a lot of useful methods, like foreach
, map
, filter
and others.
Usage
The Hadoop Map-Reduce Tutorial shows a very simple Map-Reduce application that counts the number of occurences of each word in a given input set.
Source Code - WordCount.scala
package shadoop
import SHadoop._
import java.util.Iterator
import org.apache.hadoop.fs._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred._
object WordCount {
class Map extends MapReduceBase with Mapper[LongWritable, Text, Text, IntWritable] {
val one = 1
def map(key: LongWritable, value: Text, output: OutputCollector[Text, IntWritable], reporter: Reporter) =
(value split " ") foreach (output collect (_, one))
}
class Reduce extends MapReduceBase with Reducer[Text, IntWritable, Text, IntWritable] {
def reduce(key: Text, values: Iterator[IntWritable],
output: OutputCollector[Text, IntWritable], reporter: Reporter) = {
val sum = values reduceLeft ((a: Int, b: Int) => a + b)
output collect (key, sum)
}
}
def main(args: Array[String]) = {
val conf = new JobConf(classOf[Map])
conf setJobName "wordCount"
conf setOutputKeyClass classOf[Text]
conf setOutputValueClass classOf[IntWritable]
conf setMapperClass classOf[Map]
conf setCombinerClass classOf[Reduce]
conf setReducerClass classOf[Reduce]
conf setInputFormat classOf[TextInputFormat]
conf setOutputFormat classOf[TextOutputFormat[_ <: WritableComparable, _ <: Writable]]
conf setInputPath(args(0))
conf setOutputPath(args(1))
JobClient runJob conf
}
}
Source code explained: Java x Scala
1. The one field from the Map class
Java
private final static IntWritable one = new IntWritable(1);
Scala
val one = 1
Scala to specify the type of the field by type inference. Very cool!!!
2. The map method from the Map class
Java
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
Scala
def map(key: LongWritable, value: Text,
output: OutputCollector[Text, IntWritable], reporter: Reporter) =
(value split " ") foreach (output collect (_, one))
Wow!!! Scala implicitly converts value to String
and applies String’s split
method that returns a String array. This array iterate over each String adding it as key(implicitly converted to Text
) from output
object and whose value is one
. Note: Scala doesn’t require semicolons at the end of each instruction, they are optionals.
3. The reduce method from the Reduce class
Java
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
Scala
def reduce(key: Text, values: Iterator[IntWritable],
output: OutputCollector[Text, IntWritable], reporter: Reporter) = {
val sum = values reduceLeft ((a: Int, b: Int) => a + b)
output collect (key, sum)
}
Again, wow!!! On first line Scala calculates the sum of the values using the reduceLeft
method from a Int
iterator, implicitly converted from IntWritable
java iterator. After, the output
object collects the sum result.
Running
Assuming HADOOP_HOME
is the root of the installation from Hadoop:
- Copy the
scala-library.jar
to${HADOOP_HOME}/lib
directory - Run the application:
${HADOOP_HOME}/bin/hadoop jar shadoop-0.0.1-alpha.jar shadoop.WordCount input/ output/
input/
- a directory containing the text-files as input setouput/
- a ouput directory