hadoop笔记 (4):streaming

hadoop 提供了 3 种方式来实现自己的功能,一种是 java 接口,另一种是 pipes,还有一种是 streaming。streaming 允许用任何语言来开发 mapper 和 reducer,它们和 hadoop 之间的交互通过标准输入/输出来进行。这种方式的好处是调试很方便。

先来看一个使用 bash 编写的 wordcount 的例子:

#!/bin/bash

while read line; do
    arr=($line)
    let i=0
    while ! [ -z ${arr[$i]} ]; do
        echo -e "${arr[$i]}\t1"
        let i=$i+1
    done
done
#!/bin/bash

let sum=0

while read word count; do
    if [ "$word" == "$last" ]; then
        let sum=$sum+$count
    else
        if [ $sum -gt 0 ]; then
            echo -e "$last\t$sum"
        fi  
        last=$word
        sum=$count
    fi  
done

if [ $sum -gt 0 ]; then
    echo -e "$last\t$sum"
fi

运行前先把输入文件上传到 hdfs:

hadoop dfs -put input input

然后提交:

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
           -input input -output output \
           -mapper mapper -reducer reducer \
           -file mapper -file reducer

提交的时候要使用“-file”选项告诉 hadoop 把 mapper 和 reducer 的可执行文件打包到 job 中,否则查看 logs 会有“Cannot run program "mapper": java.io.IOException: error=2, No such file or directory”这样的错误,即使预先把 mapper 和 reducer 上传到 hdfs 中也没用。

mapper 收到的内容是文件的一行,拆分后输出的每对 {key, value} 单独一行,key 和 value 之间使用“\t”分隔。hadoop 把每行的第一个“\t”之前的部分认为是 key,后面的作为 value,如果没有“\t”的话认为整行都是 key 而没有 value。

reducer 收到的 {key, value} 也是每行单独一对,key 和 value 之间以“\t”分隔。key 相同的内容会以连续的 {key, value1},{key, value2},... 的形式传输给 reducer。由于每个 reducer 可能接收到若干个不同的 key 及其相关的内容,因此要分别把各个 key 的内容收集全了才能输出。因为 key 已经有序,当发现 key 不一样时说明上一个 key 的结果已经收集完,经处理后就可以输出结果了,而不需要把所有的结果都保存起来,等到最后才一并输出。reducer 输出的格式和 mapper 的一样。

由于 mapper 和 reducer 都是可执行文件,因此在本地运行

$ cat input | ./mapper | sort | ./reducer

得到的结果应该和使用 hadoop 的结果一致。

如果需要修改配置项同样可以通过“-D”来指定,但是要注意这些“-D”选项要位于其它所有选项之前。例如指定使用 NLineInputFormat,每次读入 1 行:

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-*.jar \
           -D mapred.line.input.format.linespermap=1 \
           -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
           -input input -output output \
           -mapper mapper -reducer reducer \
           -file mapper -file reducer

根据 inputformat 的不同,mapper 得到的 {key, value} 格式也不同,具体的格式可以看看参考资料 [1] 中关于 inputformat 的章节。

最后贴一个用 c++ 写的例子,编译成可执行文件后运行方法和上面的一样,和 pipes 的比较一下:

#include <iostream>
#include <string>
using namespace std;

int main(void)
{
    string word;

    while (cin >> word)
        cout << word << "\t1" << endl;

    return 0;
}
#include <iostream>
#include <string>
using namespace std;

int main(void)
{
    string last, word;
    int count, sum = 0;

    while (cin >> word >> count) {
        if (word == last)
            sum += count;
        else {
            if (sum > 0)
                cout << last << "\t" << sum << endl;

            last = word;
            sum = count;
        }   
    }   

    if (sum > 0)
        cout << last << "\t" << sum << endl;

    return 0;
}

参考资料

[1] Tom White. Hadoop: The Definitive Guide.

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注