hadoop笔记 (2):pipes例子分析 (1)

Pipes 是 hadoop 提供的 c++ 接口,但是在官网上找不到 pipes 的文档,只能从例子开始一点点摸索。实验环境是 debian 6 amd64,hadoop 1.0.3。hadoop 的安装目录是 $HOME/hadoop,安装和配置过程在上一篇安装笔记中有提到。

为了少敲些字符,给 hadoop 命令做了一个 alias:

alias hadoop='$HOME/hadoop/bin/hadoop'

单词统计程序

下面的程序是对 hadoop 1.0.3 自带的单词统计程序(src/examples/pipes/impl/wordcount-simple.cc)的一个修改版:

#include <string>
#include <vector>
using namespace std;

#include <hadoop/Pipes.hh>
#include <hadoop/StringUtils.hh>
#include <hadoop/TemplateFactory.hh>

class WordCountMapper : public HadoopPipes::Mapper {

    public:

        WordCountMapper(HadoopPipes::TaskContext& context) {}

        void map(HadoopPipes::MapContext& context)
        {
            vector<string> words = HadoopUtils::splitString(context.getInputValue(), " ");
            for(unsigned int i = 0; i < words.size(); ++i)
                context.emit(words[i], "1");
        }
};

class WordCountReducer : public HadoopPipes::Reducer {

    public:

        WordCountReducer(HadoopPipes::TaskContext& context) {}

        void reduce(HadoopPipes::ReduceContext& context)
        {
            int sum = 0;

            while (context.nextValue())
                sum += HadoopUtils::toInt(context.getInputValue());

            context.emit(context.getInputKey(), HadoopUtils::toString(sum));
        }
};

int main(void)
{
    return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer>());
}

先看看编译程序的 Makefile:

CXX := g++ 
CXXFLAGS := -g -Wall
INCLUDE := -I$(HOME)/hadoop/c++/Linux-amd64-64/include
LIBS := -L$(HOME)/hadoop/c++/Linux-amd64-64/lib -lhadooppipes -lhadooputils -lpthread -lcrypto

TARGET := wordcount

.PHONY: all clean

all: $(TARGET)

wordcount: wordcount.o
        $(CXX) $(CXXFLAGS) $^ -o $@ $(LIBS)

.cpp.o:
        $(CXX) $(CXXFLAGS) -c $< $(INCLUDE)

clean:
        rm -f $(TARGET) *.o

链接时要加上“-lcrypto”,安装 libssl-dev,如果不加这个就会报错“undefined reference to EVP_*'(undefined reference toHMAC_'或undefined reference to `BIO_')”。

编译完后把生成的可执行文件 wordcount 上传到 hdfs 中:

hadoop dfs -put wordcount wordcount

然后准备测试文件 wordcount-input(随便输入一些单词),并且把测试文件也上传到 hdfs 中:

hadoop dfs -put wordcount-input wordcount-input

上传完后看看是否上传成功:

hadoop@debian:wordcount$ hadoop dfs -ls
Found 2 items
-rw-r--r--   1 hadoop supergroup     261399 2012-08-12 10:56 /user/hadoop/wordcount
-rw-r--r--   1 hadoop supergroup         42 2012-08-12 10:56 /user/hadoop/wordcount-input

一切准备好之后就可以运行程序了:

hadoop pipes \
           -D hadoop.pipes.java.recordreader=true \
           -D hadoop.pipes.java.recordwriter=true \
           -input wordcount-input -output wordcount-output \
           -program wordcount

在运行过程中 hadoop 会输出进度和其它的一些相关信息。运行完的结果在 hdfs 的 wordcount-output 目录下:

hadoop@debian:wordcount$ hadoop dfs -ls wordcount-output
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2012-08-12 10:58 /user/hadoop/wordcount-output/_SUCCESS
drwx------   - hadoop supergroup          0 2012-08-12 10:57 /user/hadoop/wordcount-output/_logs
-rw-r--r--   1 hadoop supergroup         33 2012-08-12 10:57 /user/hadoop/wordcount-output/part-00000

最后查看运行结果:

hadoop dfs -cat wordcount-output/part-00000

和 java 版本类似,pipes 要求至少有 mapper 和 reducer 两个类:WordCountMapper 和 WordCountReducer,分别继承 HadoopPipes::Mapper 和 HadoopPipes::Reducer,并分别实现其中的纯虚函数 map() 和 reduce()。具体需要实现的接口可以参考头文件(c++/Linux-amd64-64/include/hadoop/Pipes.hh)。

在 c++/Linux-amd64-64/include/hadoop/TemplateFactory.hh 中看到,除了必须实现的 mapper 和 reducer 类外,还可以指定 partitioner,combiner,reader 和 writer。如果不指定的话就使用默认的,在执行程序的命令中指定“hadoop.pipes.java.recordreader”和“hadoop.pipes.java.recordwriter”为 true 表示使用 hadoop 默认的 reader 和 writer。为了简化讨论,在这个简单的程序中可以认为只有五个步骤:reader -> mapper -> shuffle -> reducer -> writer。

程序使用默认的 RecordReader 为 LineRecordReader,即把输入文件按行拆分成 {行偏移量, 内容} 这样的 {key, value},然后传递给 map() 函数。

在 WordCountMapper 的 map() 中,使用 context.getInputKey() 可以获取输入的 key,由于程序中不关心行号,因此并没有使用;context.getInputValue() 就是文件中某一行的内容。words 保存了按照空格拆分后的所有单词,并通过 context.emit() 函数返回每个单词及其出现的次数给下一阶段。

shuffle 阶段是由 hadoop 替我们完成的。hadoop 会把 map() 输出的 {单词, 出现次数} 这样的 key/value 对进行排序和合并,并且把结果传给 reducer。

WordCountReducer 中的 reducer() 函数收到了经过排序和合并后的结果:{key: value1, value2, ...}。每个 reducer 收到的内容都是一个 key 以及和这个 key 相关的 values,在这个程序中 key 就是一个单词,values 就是每个单词出现的次数,因此程序中通过 context.nextValue() 获取所有出现次数并累加,最后通过 context.emit() 把统计得到的 {单词, 出现次数} 结果传给 writer。

程序使用默认的 RecordWriter 为 LineRecordWriter,输出格式为“key\tvalue\n”,因此最终写到文件的结果是每个单词及其出现次数作为单独一行。

参考资料

[1] Tom White. Hadoop: The Definitive Guide.
[2] Hadoop pipes编程

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注