Simple Moving Average with Hadoop
Given
y(t) = [x(t) + x(t - 1) + x(t - 2) + ... + x(t - N + 1)] / N, where t is the time index and N is the number of values (points) used in each average.
From the equation, each x(t) will be used in N calculations,
y(t) = [x(t) + x(t - 1) + x(t - 2) + ... + x(t - N + 1)] / N
y(t + 1) = [x(t + 1) + x(t) + x(t - 1) + ... + x(t - N + 2)] / N
y(t + 2) = [x(t + 2) + x(t + 1) + x(t) + ... + x(t - N + 3)] / N
...
y(t + N - 1) = [x(t + N - 1) + x(t + N - 2) + x(t + N - 3) + ... + x(t)] / N
To get input data (time-series data) for this tutorial, we can visit http://finance.yahoo.com/q/hp?s=YHOO and download the historical prices.
package com.company;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
public class Main {
private static final int N = 5;
public static class MovingAverageMapper extends Mapper<LongWritable, Text, Text, FloatWritable> {
public void map(
LongWritable key,
Text value,
Context context) throws IOException, InterruptedException {
//
// http://finance.yahoo.com/q/hp?s=YHOO
//
// Date,Open,High,Low,Close,Volume,Adj Close
// 2013-09-13,29.47,29.47,28.80,29.26,13836600,29.26
// 2013-09-12,29.72,30.27,29.50,29.65,22018600,29.65
// 2013-09-11,29.38,29.41,28.97,29.19,10345700,29.19
// 2013-09-10,29.43,29.63,29.08,29.48,12992600,29.48
// 2013-09-09,28.32,29.32,28.32,29.24,21167700,29.24
// 2013-09-06,28.35,28.50,27.82,28.17,10807500,28.17
// 2013-09-05,28.10,28.35,27.91,28.23,8989600,28.23
//
String line = value.toString();
String[] cols = StringUtils.getStrings(line);
String dateString = cols[0];
Float adjClose = Float.parseFloat(cols[6]);
// I use joda Time to calculate possible keys for each adjClose value
DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
DateTime dateTime = DateTime.parse(dateString, dateTimeFormatter);
System.out.println("dateString:" + dateString);
for (int i = 0; i < N; i++) {
DateTime newDateTime = dateTime.plusDays(i);
String newDateString = newDateTime.toString(dateTimeFormatter);
System.out.println("newDateString:" + newDateString);
context.write(new Text(newDateString), new FloatWritable(adjClose));
}
}
}
public static class MovingAverageReducer extends Reducer<Text, FloatWritable, Text, FloatWritable> {
public void reduce(
Text key,
Iterable<FloatWritable> values,
Context context) throws IOException, InterruptedException {
Float sum = 0.0f;
int n = 0;
for (FloatWritable value : values) {
sum += value.get();
n++;
}
Float movingAverage = sum / n;
context.write(key, new FloatWritable(movingAverage));
}
}
/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(
String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
String inputPath = otherArgs[0];
String outputPath = otherArgs[1];
Job job = new Job(configuration, "moving average");
job.setJarByClass(Main.class);
job.setMapperClass(MovingAverageMapper.class);
job.setReducerClass(MovingAverageReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FloatWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(inputPath));
FileOutputFormat.setOutputPath(job, new Path(outputPath));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}