用Python编写MapReduce的WordCount实例程序

条件,假设hadoop环境和python环境已经安装好,且hadoop已正常启动

Hadoop Streaming提供了编写MapReduce程序的map和reduce的一种方式,Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出)来实现的。我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据.下面介绍如何用Python编写一个WordCount实例程序。

map过程的python源文件wc_map.py:

#!/usr/bin/env python

import sys

for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print '%s %d' % (word,1)
reduce过程的python源文件wc_red.py:

#!/usr/bin/env python

import sys

wdict = {}
for line in sys.stdin:
    line = line.strip()
    if len(line.split()) != 2:
        continue
    word, count = line.split()
    try:
        if wdict.has_key(word) == False:
            wdict[word] = 0
        count = int(count)
        wdict[word] += count
    except Exception:
        pass

for key in wdict.keys():
    print '%s %d' % (key, wdict[key])


测试Python代码(cat data | map | sort | reduce)

建议在运行MapReduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果
这里有一些建议,关于如何测试你的Map和Reduce的功能:

测试map:

$ echo "foo foo quux labs foo bar quux" | ./wc_map.py
foo	1
foo	1
quux	1
labs	1
foo	1
bar	1
quux	1
测试reduce:

$ echo "foo foo quux labs foo bar quux" | ./wc_map.py | sort | ./wc_red.py 
labs 1
quux 2
foo 3
bar 1

上例子也可以直接传入文件进行测试。


提交MapReduce Job

首先得上传一个test文件到HDFS上进行测试

$ hadoop jar /home/hadoop/soft/hadoop/contrib/streaming/hadoop-streaming-1.0.4.jar \
> -mapper ./wc_map.py \ #指定reduce过程处理的python程序
> /wc_red.py \#指定combiner过程处理的python程序
> -reducer ./wc_red.py \ #指定reduce过程处理的python程序
> -input /user/hadoop/test \ #输入文件在HDFS的位置
> -output /user/hadoop/test_r \ #输出结果位置
> -file ./wc_map.py \
> -file ./wc_red.py -combiner.
注意,wc_map.py和wc_red.py在命令中出现了两次,第一次是告诉Hadoop要执行着两个文件,第二次是告诉Hadoop把这两个文件分发给集群的所有节点。可以通过设置 -jobconf mapred.reduce.tasks=10来设置reduce过程任务的数目。

终端输入如下所示:

packageJobJar: [./wc_map.py, ./wc_red.py, /tmp/hadoop-hadoop/hadoop-unjar6964169209196045728/] [] /tmp/streamjob161475460684037667.jar tmpDir=null
13/07/11 14:56:44 INFO mapred.FileInputFormat: Total input paths to process : 1
13/07/11 14:56:45 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-hadoop/mapred/local]
13/07/11 14:56:45 INFO streaming.StreamJob: Running job: job_201307110942_0006
13/07/11 14:56:45 INFO streaming.StreamJob: To kill this job, run:
13/07/11 14:56:45 INFO streaming.StreamJob: /home/hadoop/soft/hadoop-1.0.0/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201307110942_0006
13/07/11 14:56:45 INFO streaming.StreamJob: Tracking URL: http://localhost.localdomain:50030/jobdetails.jsp?jobid=job_201307110942_0006
13/07/11 14:56:46 INFO streaming.StreamJob:  map 0%  reduce 0%
13/07/11 14:57:03 INFO streaming.StreamJob:  map 50%  reduce 0%
13/07/11 14:57:06 INFO streaming.StreamJob:  map 100%  reduce 0%
13/07/11 14:57:18 INFO streaming.StreamJob:  map 100%  reduce 100%
13/07/11 14:57:24 INFO streaming.StreamJob: Job complete: job_201307110942_0006
13/07/11 14:57:24 INFO streaming.StreamJob: Output: /user/hadoop/test_rc
job运行结束后可以在HDFS上看到新产生一个输出文件夹:

$ hadoop fs -ls /user/hadoop
drwxr-xr-x   - hadoop supergroup          0 2013-07-11 14:26 /user/hadoop/test_r
...
通过命令查看结果:

$ hadoop fs -text test_r/part-00000
...
gap 2	
Dover 2	
HA-cluster 2	
hp-status 2
结果第一列为word,第二列为出现次数,中间空格隔开。








参考:http://www.cnblogs.com/end/archive/2012/08/13/2636175.html

            http://www.oschina.net/translate/a-guide-to-python-frameworks-for-hadoop

相关文章
相关标签/搜索