hadoop笔记 (3):pipes例子分析 (2)

使用 partitioner 和 combiner

下面的程序是对 hadoop 1.0.3 自带例子(src/examples/pipes/impl/wordcount-part.cc)的一个修改版:

#include <string>
#include <vector>
using namespace std;

#include <hadoop/Pipes.hh>
#include <hadoop/StringUtils.hh>
#include <hadoop/TemplateFactory.hh>

class WordCountMapper : public HadoopPipes::Mapper {

    public:

        WordCountMapper(HadoopPipes::TaskContext& context) {}

        void map(HadoopPipes::MapContext& context)
        {
            vector<string> words = HadoopUtils::splitString(context.getInputValue(), " ");
            for(unsigned int i = 0; i < words.size(); ++i)
                context.emit(words[i], "1");
        }
};

class WordCountReducer : public HadoopPipes::Reducer {

    public:

        WordCountReducer(HadoopPipes::TaskContext& context) {}

        void reduce(HadoopPipes::ReduceContext& context)
        {
            int sum = 0;

            while (context.nextValue())
                sum += HadoopUtils::toInt(context.getInputValue());

            context.emit(context.getInputKey(), HadoopUtils::toString(sum));
        }
};

class WordCountPartitioner: public HadoopPipes::Partitioner {

    public:

        WordCountPartitioner(HadoopPipes::TaskContext& context) {}

        int partition(const string& key, int numOfReduces)
        {
            return (simple_hash(key.c_str(), key.size()) % numOfReduces);
        }

    private:

        unsigned long simple_hash(const char* str, unsigned len)
        {
            const char* end;
            unsigned long hash = 0;

            for (end = str + len; str < end; ++str)
                hash += (*str);

            return hash;
        }
};

int main(void)
{
    return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer,
                                WordCountPartitioner, WordCountReducer>());
}

编译程序的 Makefile 和前面的例子相似,只需把相应的文件名修改一下即可。

程序中的 mapper 和 reducer 不变,多了两个部分:partitioner 和 combiner(和 reducer 相同)。partitioner 的作用是指定由 mapper 产生的某个 key 被分配到哪个 reducer 上,它只有一个接口 partition(),参数是 key 和 reducer 的个数 numOfReduces,返回值范围是 [0, numOfReduces - 1],表示与 key 相关联的结果被分到哪个 reducer 上。combiner 的作用是先在本地对由 mapper 输出的相同 key 的结果先做一次合并,得到的合并结果再传给 reducer。对于这个单词统计程序来说,使用的 combiner 就是 reducer,即先对本地的 key 出现次数进行统计,再把统计结果传给 reducer。当本地输出的键值对较多但是 key 个数很少时,这样可以大大减少从 mapper 到 reducer 的数据传输量,同时也减轻了 reducer 的工作量。

运行前需要把可执行文件和输入数据上传到 hdfs:

hadoop dfs -put wordcount-parcom wordcount-parcom
hadoop dfs -put wordcount-parcom-input wordcount-parcom-input

然后运行命令:

hadoop pipes \
           -D hadoop.pipes.java.recordreader=true \
           -D hadoop.pipes.java.recordwriter=true \
           -reduces 4 \
           -input wordcount-parcom-input \
           -output wordcount-parcom-output \
           -program wordcount-parcom

这里我们启动了 4 个 reducer 来验证最终 partitioner 的拆分功能。在结果目录 wordcount-parcom-output 中的输出包括 4 个文件,part-00000,...,part-00003。其中的编号对应于 partitioner 返回值中的编号。

自定义 reader 和 writer

下面的程序是对 hadoop 1.0.3 自带例子(src/examples/pipes/impl/wordcount-nopipe.cc)的一个修改版:

#include <iostream>
#include <fstream>
#include <vector>
#include <string>
using namespace std;

#include <hadoop/Pipes.hh>
#include <hadoop/TemplateFactory.hh>
#include <hadoop/StringUtils.hh>
#include <hadoop/SerialUtils.hh>

#include <sys/stat.h>
#include <sys/types.h>

class WordCountMapper : public HadoopPipes::Mapper {

    public:

        WordCountMapper(HadoopPipes::TaskContext& context) {}

        void map(HadoopPipes::MapContext& context)
        {
            vector<string> words = HadoopUtils::splitString(context.getInputValue(), " ");
            for(unsigned int i = 0; i < words.size(); ++i)
                context.emit(words[i], "1");
        }
};

class WordCountReducer : public HadoopPipes::Reducer {

    public:

        WordCountReducer(HadoopPipes::TaskContext& context) {}

        void reduce(HadoopPipes::ReduceContext& context)
        {
            int sum = 0;

            while (context.nextValue())
                sum += HadoopUtils::toInt(context.getInputValue());

            context.emit(context.getInputKey(), HadoopUtils::toString(sum));
        }
};

class WordCountReader: public HadoopPipes::RecordReader {

    private:

        ifstream ifs;
        uint64_t bytesTotal;

    public:

        WordCountReader(HadoopPipes::MapContext& context)
        {
            string filename;
            HadoopUtils::StringInStream stream(context.getInputSplit());
            HadoopUtils::deserializeString(filename, stream);

            ifs.open(filename.c_str(), ios::in);
            HADOOP_ASSERT(ifs.is_open(), "failed to open " + filename);

            ifs.seekg(0, ios::end);
            bytesTotal = ifs.tellg();

            ifs.seekg(0, ios::beg);
        }

        ~WordCountReader()
        {
            ifs.close();
        }

        virtual bool next(string& key, string& value)
        {
            key = HadoopUtils::toString(ifs.tellg()); // line offset
            getline(ifs, value);

            return !ifs.eof();
        }

        /**
         * The progress of the record reader through the split as a value between
         * 0.0 and 1.0.
         */
        virtual float getProgress()
        {
            if (bytesTotal > 0)
                return (float)(ifs.tellg()) / bytesTotal;

            return 1.0f;
        }
};

class WordCountWriter: public HadoopPipes::RecordWriter {

    private:

        ofstream ofs;

    public:

        WordCountWriter(HadoopPipes::ReduceContext& context)
        {
            const HadoopPipes::JobConf* job = context.getJobConf();
            int part = job->getInt("mapred.task.partition");
            string outDir = job->get("mapred.work.output.dir");

            // remove the file: schema substring
            string::size_type posn = outDir.find(":");
            HADOOP_ASSERT(posn != string::npos, "no schema found in output dir: " + outDir);

            outDir.erase(0, posn + 1);
            mkdir(outDir.c_str(), 0755);
            string outFile = outDir + "/part-" + HadoopUtils::toString(part);

            ofs.open(outFile.c_str(), ios::out);
            HADOOP_ASSERT(ofs.is_open(), "can't open file for writing: " + outFile);
        }

        ~WordCountWriter()
        {
            ofs.close();
        }

        void emit(const string& key, const string& value)
        {
            ofs << key << " -> " << value << endl;
        }
};

int main(void)
{
    return HadoopPipes::runTask(HadoopPipes::TemplateFactory<WordCountMapper, WordCountReducer,
                                void, void, WordCountReader, WordCountWriter>());
}

编译程序的 Makefile 和前面的例子相似,只需把相应的文件名修改一下即可。

由于重新实现的 reader 和 writer 是读写本地数据而不是 hdfs 上的数据,因此要保证每个 slave 上同样的位置上有相同的副本。为了方便,我在 master 上建立了一个目录 $HOME/shareddata 作为 NFSDIR,然后把程序和测试数据复制到这个目录中:

cp wordcount-nopipe $NFSDIR
cp wordcount-nopipe-input $NFSDIR

其它的 slaves 通过 nfs 挂载这个目录即可实现共享(nfs 和 java 都是好东西,SUN 是一家伟大的公司)。

另外运行的时候还需要指定 inputformat 为 WordCountInputFormat(参考资料 [1]),这一项需要 $HADOOP_HOME/hadoop-test-*.jar。

一切准备就绪后就可以提交程序了:

hadoop pipes \
           -D hadoop.pipes.java.recordreader=false \
           -D hadoop.pipes.java.recordwriter=false \
           -D mapred.job.name=wordcount-nopipe \
           -D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \
           -libjars $HADOOP_HOME/hadoop-test-$HADOOP_VERSION.jar \
           -input file://$NFSDIR/wordcount-nopipe-input \
           -output file:///tmp/wordcount-nopipe-output \
           -program file://$NFSDIR/wordcount-nopipe

这里有几个地方不同:一是把“hadoop.pipes.java.recordreader”和“hadoop.pipes.java.recordwriter”都设为 false,程序使用自定义的 reader 和 writer;二是指定了 inputformat;三是 input,output 和 program 三个选项的路径前都加了前缀“file://”,表示本地操作而不是默认的 hdfs。

说完了运行方法后回过头来看看程序的内容。其中的 WordCountMapper 和 WordCountReducer 和第一个程序中的是一样的,不同点在于重新实现了 reader 和 writer,并且在启动任务的时候注册自定义的 reader 和 writer(具体的接口可以查看 c++/Linux-amd64-64/include/hadoop/TemplateFactory.hh)。

reader 通过 context.getInputSplit() 获取 inputformat 划分好的分片内容。每个 inputformat 对于数据的划分不一样,传递给 reader 的内容也不相同,在这里 WordCountInputFormat 传递给 WordCountReader 的是文件名。

自定义的 reader 需要实现两个接口:next 和 getProgress。其中 next() 每次返回一对 {key, value},如果到达分片末尾则返回 false;getProgress() 返回一个位于 [0.0, 1.0] 之间的浮点数,表示当前处理进度。WordCountReader 从 WordcontInputFormat 中接收输入文件的路径,然后每次读取一行,把行偏移量作为 key,行内容作为 value,传递给 WordCountMapper。

接下来 mapper 和 reducer 的工作和上一个例子一样。

reducer 把最终的 {key, value} 传递给下一阶段,也就是 writer。在 WordCountWriter 中,通过 JobConf 获取输出目录(命令中“-output”后跟的路径),然后把最后的结果写到文件中。由于运行的时候不知道任务会被分配给哪个 slave,因此输出文件也不确定在具体某台机器,只能一个个地找……

输入格式(参考资料 [2])

hadoop 对输入的拆分是由 inputformat 来完成的。inputformat 把文件拆分成一个个的分片(split),每个 mapper 只处理一个分片(因此有多少个 split 就有多少个 mapper)。reader 的作用是把分片拆成一个个的 {key, value} 形式,然后传递给 mapper;最后 reducer 处理完后再把结果传递给 writer 输出到文件中。

在示例程序中用到的是一个专门用来测试的 WordCountInputFormat,源代码在 src/test/org/apache/hadoop/mapred/pipes/WordCountInputFormat.java,它给 reader 传递的就是在命令行中传入的文件路径。这里的 WordCountReader 打开文件,并且把内容按行传递给 mapper。

在最后的 WordCountWriter 中通过 JobConf 获取输出路径(由运行时的“-output”指定),并且把结果保存在指定目录下。

参考资料 [2] 中列举了 hadoop 提供的几种 inputformat 的特点,例如 TextInputFormat 输出的是 {行偏移, 行内容} 这样的键值对,NLineInputFormat 输出的也是 {行偏移, 行内容},只是它每次都输出 N 行(N由“mapred.line.input.format.linespermap”指定)。但是不清楚这些输出格式传到 mapper 之后是如何表示的,网上没找到相应的文档,看代码也理不清。

最后给一个自己理解的 hadoop 各个模块的功能示意图:

                                             +--------------+         +---------------+
                                             | +----------+ |         | +-----------+ |
                                             | | rk1, rv1 | |         | | mk1, mv11 | |
                                             | +----------+ |         | +-----------+ |
                                             | +----------+ |         | +-----------+ |
                           +-------+ reader  | | rk2, rv2 | | mapper  | | mk2, mv21 | |
                     +---> | split | ------> | +----------+ | ------> | +-----------+ | ---+
                     |     +-------+         |      ...     |         |       ...     |    |           +--------------------------+
                     |                       | +----------+ |         | +-----------+ |    |           | +----------------------+ |
                     |                       | | rkn, rvn | |         | | mkn, mvn1 | |    |           | | mk1: mv11, mv12, ... | |
                     |                       | +----------+ |         | +-----------+ |    |           | +----------------------+ |
                     |                       +--------------+         +---------------+    |           | +----------------------+ |
                     |                                                                     | combiner  | | mk2: mv21, mv22, ... | |
                     |                                                                     + --------> | +----------------------+ | ---+
                     |                                                                     |           |            ...           |    |                                                          +--------------------------+
                     |                       +--------------+         +---------------+    |           | +----------------------+ |    |                                                          | +----------------------+ |
                     |                       | +----------+ |         | +-----------+ |    |           | | mkn, mvn1, mvn2, ... | |    |          +--------------------------+                    | | mk1: mv11, mv12, ... | |
                     |                       | | rk1, rv1 | |         | | mk1, mv11 | |    |           | +----------------------+ |    |          | +----------------------+ |                    | +----------------------+ | reducer  +--------+ writer  +--------+
                     |                       | +----------+ |         | +-----------+ |    |           +--------------------------+    |          | | mk1: mv11, mv12, ... | |              +---> |           ...            | -------> | result | ------> | output |
+-----+              |                       | +----------+ |         | +-----------+ |    |                                           |          | +----------------------+ |              |     | +----------------------+ |          +--------+         +--------+
|     | inputformat  |     +-------+ reader  | | rk2, rv2 | | mapper  | | mk2, mv21 | |    |                                           |          | +----------------------+ |              |     | | mkx: mvx1, mvx2, ... | |
|     | -----------> +---> | split | ------> | +----------+ | ------> | +-----------+ | ---+                                           |          | | mk2: mv21, mv22, ... | |              |     | +----------------------+ |
|     |              |     +-------+         |      ...     |         |       ...     |                                                |          | +----------------------+ |              |     |           ...            |
+-----+              |                       | +----------+ |         | +-----------+ |                                                | shuffle  |           ...            | partitioner  |     +--------------------------+
                     |                       | | rkn, rvn | |         | | mkn, mvn1 | |                                                + -------> | +----------------------+ | -----------> +
                     |         .             | +----------+ |         | +-----------+ |                                                |          | | mkx: mvx1, mvx2, ... | |              |
                     |         .             +--------------+         +---------------+                                                |          | +----------------------+ |              |     +--------------------------+
                     |         .                                                                                                       |          | +----------------------+ |              |     | +----------------------+ |
                     |         .                                                                                                       |          | | mky: mvy1, mvy2, ... | |              |     | | mk2: mv21, mv22, ... | |
                     |         .                                                                                                       |          | +----------------------+ |              |     | +----------------------+ | reducer  +--------+ writer  +--------+
                     |         .             +--------------+         +---------------+                +--------------------------+    |          |           ...            |              +---> |           ...            | -------> | result | ------> | output |
                     |                       | +----------+ |         | +-----------+ |                | +----------------------+ |    |          +--------------------------+                    | +----------------------+ |          +--------+         +--------+
                     |                       | | rk1, rv1 | |         | | mk1, mv11 | |                | | mk1: mv11, mv12, ... | |    |                                                          | | mky: mvy1, mvy2, ... | |
                     |                       | +----------+ |         | +-----------+ |                | +----------------------+ |    |                                                          | +----------------------+ |
                     |                       | +----------+ |         | +-----------+ |                | +----------------------+ |    |                                                          |           ...            |
                     |     +-------+ reader  | | rk2, rv2 | | mapper  | | mk2, mv21 | |    combiner    | | mk2: mv21, mv22, ... | |    |                                                          +--------------------------+
                     +---> | split | ------> | +----------+ | ------> | +-----------+ | -------------> | +----------------------+ | ---+
                           +-------+         |      ...     |         |       ...     |                |            ...           |
                                             | +----------+ |         | +-----------+ |                | +----------------------+ |
                                             | | rkn, rvn | |         | | mkn, mvn1 | |                | | mkn, mvn1, mvn2, ... | |
                                             | +----------+ |         | +-----------+ |                | +----------------------+ |
                                             +--------------+         +---------------+                +--------------------------+

参考资料

[1] Hadoop pipes编程
[2] Tom White. Hadoop: The Definitive Guide.

Comment (1)

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注