When you have terabytes of time series data, deciding how to process it becomes more important than deciding how to store it. MySQL serves well for storing such data, but difficulties arise when we have to perform complex calculations or data mining operations on this sequential data. The MapReduce framework is designed to handle this kind of data well, but MySQL does not support MapReduce-like processing unless you have access to nPath. The other solution is to get this data into an existing MapReduce framework like Hadoop. As of a recent Hadoop release (0.19), MapReduce jobs can take their input from databases [link]. Recently, I tried interfacing Hadoop with MySQL, and although it was an easy task, I did not find much documentation on the topic. So in this post I intend to outline how you can get Hadoop talking to MySQL.

Let’s try to implement Hadoop’s ‘Hello World’ (Word Count) example. The MySQL table is as follows:

CREATE TABLE `wordcount` (
  `word` varchar(255) DEFAULT NULL,
  `count` int(11) DEFAULT NULL
);

We need a class that implements the map and reduce tasks. Let’s call this class WordCount. It needs to extend Configured and implement the Tool interface.

public class WordCount extends Configured implements Tool {

We implement the Tool interface so that Hadoop’s generic options are parsed for us. This is needed because we will be passing the mysql-connector JAR to Hadoop via a command-line argument (-libjars). This JAR becomes part of the custom configuration for the WordCount job, so the WordCount class needs to be configurable as well, which is why it extends Configured.

Tuples/rows from the DB are converted to Java objects, so we need to define a class that holds a tuple. All such classes need to implement the Writable and DBWritable interfaces. Typically, every table that we want to read or write needs to be represented by a class implementing these interfaces. We will only be reading tables here, hence only the read functions are implemented; the write functions simply throw UnsupportedOperationException.

    static class WordRecord implements Writable, DBWritable {
        String word;
        int count;

        // Writable: serialization is not needed, since we only read from the DB
        public void write(DataOutput arg0) throws IOException {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        // Writable: deserialize the fields
        public void readFields(DataInput in) throws IOException {
            this.word = Text.readString(in);
            this.count = in.readInt();
        }

        // DBWritable: writing back to the DB is not needed here
        public void write(PreparedStatement arg0) throws SQLException {
            throw new UnsupportedOperationException("Not supported yet.");
        }

        // DBWritable: populate the fields from the current row, in the order of the selected columns
        public void readFields(ResultSet rs) throws SQLException {
            this.word = rs.getString(1);
            this.count = rs.getInt(2);
        }
    }
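The Writable half of this contract is easy to get wrong: whatever write(DataOutput) emits, readFields(DataInput) must consume in exactly the same order. The following is a minimal sketch of that round trip using only java.io, so it runs without Hadoop on the classpath. WordRecordSketch and roundTrip are hypothetical names introduced for illustration, and writeUTF/readUTF are stand-ins for Hadoop's Text string helpers, not the same wire format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for WordRecord; not a Hadoop class.
class WordRecordSketch {
    String word;
    int count;

    // Mirrors Writable.write(DataOutput): serialize the fields in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(word);   // stand-in for Hadoop's Text string encoding
        out.writeInt(count);
    }

    // Mirrors Writable.readFields(DataInput): read the fields back in the same order.
    void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readInt();
    }

    // Serialize a record to bytes, then rebuild a fresh copy from those bytes.
    static WordRecordSketch roundTrip(String word, int count) {
        try {
            WordRecordSketch original = new WordRecordSketch();
            original.word = word;
            original.count = count;

            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }

            WordRecordSketch copy = new WordRecordSketch();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        WordRecordSketch copy = roundTrip("hadoop", 3);
        System.out.println(copy.word + " " + copy.count); // prints "hadoop 3"
    }
}
```

If the job ever needs to ship WordRecord objects between tasks, a real write(DataOutput) along these lines would have to replace the UnsupportedOperationException.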

Once we’re done with this, it’s time to define the map and reduce operations. In the map function, we simply output the tuple, with the word as the key and its count as the value.

    static class WordCountMapper extends MapReduceBase
            implements Mapper<LongWritable, WordRecord, Text, LongWritable> {

        public void map(LongWritable key, WordRecord value,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {

            // emit (word, count) for every row read from the table
            output.collect(new Text(value.word), new LongWritable(value.count));
        }
    }

In the reduce function, we sum the counts associated with a given word and output [word, sum].

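    static class WordCountReducer extends MapReduceBase
            implements Reducer<Text, LongWritable, Text, LongWritable> {

        public void reduce(Text key, Iterator<LongWritable> values,
                OutputCollector<Text, LongWritable> output, Reporter reporter)
                throws IOException {

            // add up every count emitted for this word
            long sum = 0L;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new LongWritable(sum));
        }
    }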
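To make the data flow concrete, here is a small plain-Java simulation of what the mapper and reducer do together. It is only a sketch: it runs in a single JVM with no Hadoop involved, and WordCountSimulation, Row and run are hypothetical names made up for this illustration. The "map" step emits (word, count) per row, the grouping step plays the role of the shuffle, and the "reduce" step sums the values for each key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-JVM simulation of the word count job; not Hadoop code.
class WordCountSimulation {

    // Stand-in for one (word, count) row of the wordcount table.
    static final class Row {
        final String word;
        final int count;

        Row(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }

    static Map<String, Long> run(List<Row> rows) {
        // "Map" + shuffle: emit (word, count) for each row and group the values by word.
        Map<String, List<Long>> grouped = new TreeMap<>();
        for (Row row : rows) {
            grouped.computeIfAbsent(row.word, k -> new ArrayList<>()).add((long) row.count);
        }

        // "Reduce": sum the values collected for each word.
        Map<String, Long> totals = new TreeMap<>();
        for (Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long sum = 0L;
            for (long value : entry.getValue()) {
                sum += value;
            }
            totals.put(entry.getKey(), sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("hadoop", 2), new Row("mysql", 1), new Row("hadoop", 3));
        System.out.println(run(rows)); // prints {hadoop=5, mysql=1}
    }
}
```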

Since we implemented the Tool interface, we need the WordCount class to implement the run function. In this function, we will specify the job configuration, configure the mapper and reducer classes, configure the DB, set the input and output to the job etc.

    public int run(String[] arg0) throws Exception {
        // getConf() is provided by Configured; this is how the generic options reach the job
        JobConf job = new JobConf(getConf(), WordCount.class);

        job.setJobName("word count job");

        // set the mapper and reducer classes
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // configure the DB - provide the driver class, and the mysql host and db name in the url
        DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/mysqlhadoop");

        // define the fields you want to access from the table
        String[] fields = {"word", "count"};

        // specify which class represents the tuple as well as the table to be accessed (in this case 'wordcount')
        // the fourth argument is an optional WHERE condition (null here); another setInput overload takes a full SQL query instead
        // we are sorting the results by the field 'word'
        DBInputFormat.setInput(job, WordRecord.class, "wordcount", null, "word", fields);

        // declare the types of the output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // write the final results to a folder in HDFS
        // alternatively we can also write the output back to mysql using DBOutputFormat
        FileOutputFormat.setOutputPath(job, new Path("output_wordcount"));

        // submit the job and wait for it to finish
        JobClient.runJob(job);

        return 0;
    }
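The arguments given to DBInputFormat.setInput (table name, optional condition, ordering column, field names) essentially describe a SELECT statement. The sketch below is not Hadoop's code; buildSelect is a hypothetical helper that shows roughly what kind of query those arguments correspond to. The LIMIT/OFFSET part reflects my assumption that each input split reads one chunk of the ordered result, which is also why a stable ORDER BY column matters; it is worth verifying against the Hadoop version you run.

```java
// Hypothetical helper illustrating the query shape; not part of Hadoop.
class SelectQuerySketch {

    static String buildSelect(String table, String conditions, String orderBy,
                              long limit, long offset, String... fields) {
        StringBuilder query = new StringBuilder("SELECT ");
        query.append(String.join(", ", fields));
        query.append(" FROM ").append(table);

        // The optional condition becomes a WHERE clause.
        if (conditions != null && !conditions.isEmpty()) {
            query.append(" WHERE (").append(conditions).append(")");
        }

        // A fixed ordering keeps chunk boundaries consistent across queries.
        if (orderBy != null && !orderBy.isEmpty()) {
            query.append(" ORDER BY ").append(orderBy);
        }

        // Assumed chunking scheme: each split reads `limit` rows starting at `offset`.
        query.append(" LIMIT ").append(limit).append(" OFFSET ").append(offset);
        return query.toString();
    }

    public static void main(String[] args) {
        // Same arguments as the setInput call in run(), plus an assumed chunk of 1000 rows.
        System.out.println(buildSelect("wordcount", null, "word", 1000, 0, "word", "count"));
        // prints: SELECT word, count FROM wordcount ORDER BY word LIMIT 1000 OFFSET 0
    }
}
```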

And now finally we can define the main function as

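    public static void main(String args[]) throws Exception {
        // ToolRunner parses the generic options (such as -libjars) before calling run()
        int ret = ToolRunner.run(new WordCount(), args);
        System.exit(ret);
    }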

To see how the above pieces fit together, take a look at [link]. To run this job, we need to provide the mysql-connector JAR in the classpath. This can be done either by placing the JAR in $HADOOP_HOME/lib or by providing it on the command line as follows:

$ hadoop jar wordcount.jar WordCount -libjars mysql-connector-java-5.1.7-bin.jar

It’s that easy to get Hadoop talking to MySQL, and you are ready to do some heavy number crunching. Get hold of here.