読者です 読者をやめる 読者になる 読者になる

parallel python + Simple MapReduce

BigData系のhadoopなどで有名になったMapReduceという処理体系がある。 大量のデータ処理をサーバ間を超えて並列に実行したい場合、それぞれのサーバ同期は最小限に抑えたほうがよい。MapReduceは、MapperとReducerという2つの処理を並列に実行することで、サーバ同期を最小限に抑えて実行できるものである。(と、僕は理解している)

これの簡単なpythonのサンプルが以下にある。

multiprocessing で MapReduce を実装する

これはこれで面白くって、非常に簡単なコードで並列処理ができるので、結構使っている。けれど、せっかくなので超並列MPPで、異なるサーバ間で実行できるようにしたくなる。

pythonの超並列実行モジュールとして、先のコードでも利用しているmultiprocessingでもできるのであろうが、非常に簡単に実装できるということで、Parallel pythonを使ってみる。

以下、コード。

import itertools
import collections
import pp

def forfunc(func,lst):
    out=[]
    for l in lst:
        out.append( func(l) )
    return out

class SimpleMapReduceServer(object):
    def __init__(slf, hosts, ncpus=None):
        slf.map_func = None
        slf.reduce_func = None
        if ncpus==None:
            slf.js = pp.Server(ppservers=hosts)
        else:
            slf.js = pp.Server(ncpus=ncpus,ppservers=hosts)

    def SetMapReduce(slf,map_func, reduce_func):
        slf.map_func = map_func
        slf.reduce_func = reduce_func

    def partition(slf, mapped_values):
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()


    def __call__(slf, inputs, map_chunksize=1,reduce_chunksize=100):
        map_jobs=[]
        for i in range(0,len(inputs),map_chunksize):
            ipt=inputs[i:i+map_chunksize]
            map_jobs.append(slf.js.submit(forfunc,(slf.map_func,ipt),(slf.map_func,)) )
        mapped_values=[]
        for process_out in [ j() for j in map_jobs]:
            mapped_values+=process_out
        partitioned_data = slf.partition(itertools.chain(*mapped_values))
        reduce_jobs=[]
        for i in range(0,len(partitioned_data),reduce_chunksize):
            pd = partitioned_data[i:i+reduce_chunksize]
            reduce_jobs.append(slf.js.submit(forfunc,(slf.reduce_func,pd),(slf.reduce_func,)) )
        slf.js.print_stats()
        reduced_values=[j() for j in reduce_jobs]
        return [o for o in itertools.chain(*reduced_values)]

コードをややこしくしているのは、chunksizeの存在。parallel python には、multiprocessing のmap関数のようなものがないようなので、chunksizeでリストを分割し、forfuncというリストを繰り返し実行する関数を介して、それぞれのmapfunc/reducefuncにアクセスするようにしている。# もともとのSimpleMapReduceでは、reduce処理のchunksizeは指定していなかったが、これを追加している。

実行の仕方は、multiprocessing_wordcountの以下の部分を書き換える。

# 上記のファイルをpp_mapreduce.pyとしたので
from pp_mapreduce import SimpleMapReduceServer 
~
    def file_to_words(filename):
    # サーバ間でプロセス実行するために 関数内でmoduleをインポートする
    import multiprocessing
    import string
~
    # 実際のアクセス方法は、以下のとおり
    ppservers=("192.168.1.1:60000","192.168.1.2:60000","192.168.1.3:60000")
    mapper = SimpleMapReduceServer( ppservers )
    mapper.SetMapReduce(file_to_words,count_words)
    word_counts = mapper(input_files)

これで実行すると、(kvm上の各ノード192.168.1.1~3で事前にppserver.pyを実行しておいた)

Job execution statistics:
 job count | % of all jobs | job time sum | time per job | job server
         3 |          0.78 |       0.4296 |     0.143188 | 192.168.1.2:60000
        30 |          7.81 |       0.3886 |     0.012954 | 192.168.1.1:60000
       340 |         88.54 |       2.9437 |     0.008658 | local
        11 |          2.86 |       0.3927 |     0.035704 | 192.168.1.3:60000
Time elapsed since server creation 2.09296107292
0 active tasks, 3 cores

成功!!(slf.js.print_stats()のところの出力。実行結果などは省略)

(parallel python は、CPU負荷を見てロードバランスしタスクプロセスを振り分けているらしいが、仮想kvmを使っているせいか、localの分量が多くなってしまった。。。)

これで、hdfsみたいな分散ファイルと、racawarenessみたいなタスク分散スケジューラが実装できれば、pythonhadoop(hadoopy??)みたいなことできるんじゃないか!!