hadoop 提供了 3 种方式来实现自己的功能,一种是 java 接口,另一种是 pipes,还有一种是 streaming。streaming 允许用任何语言来开发 mapper 和 reducer,它们和 hadoop 之间的交互通过标准输入/输出来进行。这种方式的好处是调试很方便。
先来看一个使用 bash 编写的 wordcount 的例子:
#!/bin/bash
while read line; do
arr=($line)
let i=0
while ! [ -z ${arr[$i]} ]; do
echo -e "${arr[$i]}\t1"
let i=$i+1
done
done
#!/bin/bash
let sum=0
while read word count; do
if [ "$word" == "$last" ]; then
let sum=$sum+$count
else
if [ $sum -gt 0 ]; then
echo -e "$last\t$sum"
fi
last=$word
sum=$count
fi
done
if [ $sum -gt 0 ]; then
echo -e "$last\t$sum"
fi
运行前先把输入文件上传到 hdfs:
hadoop dfs -put input input
然后提交:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
-input input -output output \
-mapper mapper -reducer reducer \
-file mapper -file reducer
提交的时候要使用“-file”选项告诉 hadoop 把 mapper 和 reducer 的可执行文件打包到 job 中,否则查看 logs 会有“Cannot run program "mapper": java.io.IOException: error=2, No such file or directory”这样的错误,即使预先把 mapper 和 reducer 上传到 hdfs 中也没用。
mapper 收到的内容是文件的一行,拆分后输出的每对 {key, value} 单独一行,key 和 value 之间使用“\t”分隔。hadoop 把每行的第一个“\t”之前的部分认为是 key,后面的作为 value,如果没有“\t”的话认为整行都是 key 而没有 value。
reducer 收到的 {key, value} 也是每行单独一对,key 和 value 之间以“\t”分隔。key 相同的内容会以连续的 {key, value1},{key, value2},... 的形式传输给 reducer。由于每个 reducer 可能接收到若干个不同的 key 及其相关的内容,因此要分别把各个 key 的内容收集全了才能输出。因为 key 已经有序,当发现 key 不一样时说明上一个 key 的结果已经收集完,经处理后就可以输出结果了,而不需要把所有的结果都保存起来,等到最后才一并输出。reducer 输出的格式和 mapper 的一样。
由于 mapper 和 reducer 都是可执行文件,因此在本地运行
$ cat input | ./mapper | sort | ./reducer
得到的结果应该和使用 hadoop 的结果一致。
如果需要修改配置项同样可以通过“-D”来指定,但是要注意这些“-D”选项要位于其它所有选项之前。例如指定使用 NLineInputFormat,每次读入 1 行:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
-D mapred.line.input.format.linespermap=1 \
-inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-input input -output output \
-mapper mapper -reducer reducer \
-file mapper -file reducer
根据 inputformat 的不同,mapper 得到的 {key, value} 格式也不同,具体的格式可以看看参考资料 [1] 中关于 inputformat 的章节。
最后贴一个用 c++ 写的例子,编译成可执行文件后运行方法和上面的一样,和 pipes 的比较一下:
#include <iostream>
#include <string>
using namespace std;
int main(void)
{
string word;
while (cin >> word)
cout << word << "\t1" << endl;
return 0;
}
#include <iostream>
#include <string>
using namespace std;
int main(void)
{
string last, word;
int count, sum = 0;
while (cin >> word >> count) {
if (word == last)
sum += count;
else {
if (sum > 0)
cout << last << "\t" << sum << endl;
last = word;
sum = count;
}
}
if (sum > 0)
cout << last << "\t" << sum << endl;
return 0;
}
参考资料
[1] Tom White. Hadoop: The Definitive Guide.