parallel python + Simple MapReduce
BigData系のhadoopなどで有名になったMapReduceという処理体系がある。 大量のデータ処理をサーバ間を超えて並列に実行したい場合、それぞれのサーバ同期は最小限に抑えたほうがよい。MapReduceは、MapperとReducerという2つの処理を並列に実行することで、サーバ同期を最小限に抑えて実行できるものである。(と、僕は理解している)
これの簡単なpythonのサンプルが以下にある。
multiprocessing で MapReduce を実装する
これはこれで面白くって、非常に簡単なコードで並列処理ができるので、結構使っている。けれど、せっかくなので超並列MPPで、異なるサーバ間で実行できるようにしたくなる。
pythonの超並列実行モジュールとして、先のコードでも利用しているmultiprocessingでもできるのであろうが、非常に簡単に実装できるということで、Parallel pythonを使ってみる。
以下、コード。
import itertools
import collections
import pp
def forfunc(func,lst):
out=[]
for l in lst:
out.append( func(l) )
return out
class SimpleMapReduceServer(object):
def __init__(slf, hosts, ncpus=None):
slf.map_func = None
slf.reduce_func = None
if ncpus==None:
slf.js = pp.Server(ppservers=hosts)
else:
slf.js = pp.Server(ncpus=ncpus,ppservers=hosts)
def SetMapReduce(slf,map_func, reduce_func):
slf.map_func = map_func
slf.reduce_func = reduce_func
def partition(slf, mapped_values):
partitioned_data = collections.defaultdict(list)
for key, value in mapped_values:
partitioned_data[key].append(value)
return partitioned_data.items()
def __call__(slf, inputs, map_chunksize=1,reduce_chunksize=100):
map_jobs=[]
for i in range(0,len(inputs),map_chunksize):
ipt=inputs[i:i+map_chunksize]
map_jobs.append(slf.js.submit(forfunc,(slf.map_func,ipt),(slf.map_func,)) )
mapped_values=[]
for process_out in [ j() for j in map_jobs]:
mapped_values+=process_out
partitioned_data = slf.partition(itertools.chain(*mapped_values))
reduce_jobs=[]
for i in range(0,len(partitioned_data),reduce_chunksize):
pd = partitioned_data[i:i+reduce_chunksize]
reduce_jobs.append(slf.js.submit(forfunc,(slf.reduce_func,pd),(slf.reduce_func,)) )
slf.js.print_stats()
reduced_values=[j() for j in reduce_jobs]
return [o for o in itertools.chain(*reduced_values)]
コードをややこしくしているのは、chunksizeの存在。parallel python には、multiprocessing のmap関数のようなものがないようなので、chunksizeでリストを分割し、forfuncというリストを繰り返し実行する関数を介して、それぞれのmapfunc/reducefuncにアクセスするようにしている。# もともとのSimpleMapReduceでは、reduce処理のchunksizeは指定していなかったが、これを追加している。
実行の仕方は、multiprocessing_wordcountの以下の部分を書き換える。
# 上記のファイルをpp_mapreduce.pyとしたので
from pp_mapreduce import SimpleMapReduceServer
~
def file_to_words(filename):
# サーバ間でプロセス実行するために 関数内でmoduleをインポートする
import multiprocessing
import string
~
# 実際のアクセス方法は、以下のとおり
ppservers=("192.168.1.1:60000","192.168.1.2:60000","192.168.1.3:60000")
mapper = SimpleMapReduceServer( ppservers )
mapper.SetMapReduce(file_to_words,count_words)
word_counts = mapper(input_files)
これで実行すると、(kvm上の各ノード192.168.1.1~3で事前にppserver.pyを実行しておいた)
Job execution statistics:
job count | % of all jobs | job time sum | time per job | job server
3 | 0.78 | 0.4296 | 0.143188 | 192.168.1.2:60000
30 | 7.81 | 0.3886 | 0.012954 | 192.168.1.1:60000
340 | 88.54 | 2.9437 | 0.008658 | local
11 | 2.86 | 0.3927 | 0.035704 | 192.168.1.3:60000
Time elapsed since server creation 2.09296107292
0 active tasks, 3 cores
成功!!(slf.js.print_stats()のところの出力。実行結果などは省略)
(parallel python は、CPU負荷を見てロードバランスしタスクプロセスを振り分けているらしいが、仮想kvmを使っているせいか、localの分量が多くなってしまった。。。)
これで、hdfsみたいな分散ファイルと、racawarenessみたいなタスク分散スケジューラが実装できれば、pythonでhadoop(hadoopy??)みたいなことできるんじゃないか!!