Python マルチプロセス ブロック

Python学習【365日チャレンジ!】221日目のマスターU(@Udemy11)です。

雨上がりに原付きででかけたら、心臓が止まりそうになりました。

車とぶつかりそうになったわけでも、何かにぶつかりそうになったわけでもありませんが、コケちゃいそうになりました。

雨上がりだったので、マンホールが濡れていたんです。

曲がりながらマンホールの上を通過したら、タイヤが滑ってそのまま横転しそうになりました。

スピードがあまり出ていなかったのと、なんとか足を地面につけて踏ん張ることができたので、コケなくて住みましたが、ほんと心臓が止まりそうでした。

ほんと雨上がりの二輪車の運転は気をつけないといけませんね。

それでは、今日もPython学習をはじめましょう。

昨日の学習

昨日は、プールの数を制限した非同期処理を学習しました。

プールの引数を変更することで、非同期処理できるプロセスの数を制限することができました。

条件に応じて、並列処理で走らせるプロセスを制限したいときに使うといいかもしれません。

詳細については、昨日の記事をごらんください。

今日は、プールを使って処理をブロックする方法について学習します。

applyでブロック

非同期処理ではなく、最初に処理をさせてから次に非同期処理をしたいときに次のようにコードを書きます。

import logging
import multiprocessing
import time

logging.basicConfig(
    level=logging.DEBUG, format='%(processName)s: %(message)s'
)

def worker1(i):
    logging.debug('start')
    time.sleep(5)
    logging.debug('end')
    return i

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        logging.debug(p.apply(worker1, (300, )))
        logging.debug('executed apply')
        p1 = p.apply_async(worker1, (100, ))
        p2 = p.apply_async(worker1, (200, ))
        logging.debug('executed')
        logging.debug(p1.get())
        logging.debug(p2.get())

昨日のコードに17行目と18行目を加えています。

ご存知のとおり、Pythonは上から順番にコードを実行していきますので、17行目が最初に実行されます。

17行目のapplyは、worker1の返り値をgetしなくても受け取るところまでの処理をしますので、loggingで返り値(つまり300)を出力します。

17行目の処理が終了してから、次の行へ進んでexecuted applyが出力されて、その後は昨日と同じ出力になります。

実行結果

ForkPoolWorker-1: start #ここで5秒待つ
ForkPoolWorker-1: end
MainProcess: 300
MainProcess: executed apply
MainProcess: executed
ForkPoolWorker-1: start
ForkPoolWorker-2: start #ここで5秒待つ
ForkPoolWorker-1: end
ForkPoolWorker-2: end
MainProcess: 100
MainProcess: 200

applyは、次の処理をブロックしているわけですが、これまで学習してきた内容から考えるとちょっとわかりづらくなってしまうかもしれません。

マルチプロセスの並列処理で考えたときに、処理が終了してから次に移るのか、並列処理をするのか、非同期処理をするのかというように処理が複雑になってくると少しややこしい処理をしないといけないのかもしれません。

一時期、ファジーなんて言葉が流行になっていましたが、プログラムは細かいところまで指示をしないと期待する処理をしてくれないということです。

人間のように曖昧な指示では正確な処理ができないわけで、その部分を埋めようとしているのが学習機能を持った人工知能ということです。

プロセスを増やす

今回のコードにハイライトした次の2行を付け足してみました。

if __name__ == '__main__':
    with multiprocessing.Pool(2) as p:
        p3 = p.apply_async(worker1, (400, ))
        logging.debug(p3.get())
        logging.debug(p.apply(worker1, (300, )))
        logging.debug('executed apply')
        p1 = p.apply_async(worker1, (100, ))
        p2 = p.apply_async(worker1, (200, ))
        logging.debug('executed')
        logging.debug(p1.get())
        logging.debug(p2.get())

このコードの中には、非同期処理のプロセスが3つあり、16行目のPool(2)でプロセスを2つに制限しているので、最後のp2p1と並列処理ができないのかと思っていました。

結果は、18行目でp3が終了しているので、p1p2は並列処理できているんですね。

ForkPoolWorker-1: start #ここで5秒待つ
ForkPoolWorker-1: end
MainProcess: 400
ForkPoolWorker-2: start #ここで5秒待つ
ForkPoolWorker-2: end
MainProcess: 300
MainProcess: executed apply
MainProcess: executed
ForkPoolWorker-1: start
ForkPoolWorker-2: start #ここで5秒待つ
ForkPoolWorker-1: end
ForkPoolWorker-2: end
MainProcess: 100
MainProcess: 200

ちなみに16行目をPool(3)に変更して実行してみると、10行目と12行目のForkPoolWorker2から3にかわります。

3つまでプロセスが使えるので、最初の1つを開放せずに処理しているのでしょうか?

このあたりの処理の仕方は裏でどのように動いているのかよくわかりませんが、どちらも同じような処理ができています。

いろいろ試す

他にもプロセスの数をもっと増やしたり、getのコードを移動してみたり、いろいろと試しているのですが、実行結果がいろいろと変わってどこがどうなっているのかちょっと頭がこんがらがってきます。

どこがどうなっているのか何回も実行してみるのですが、どうしてもわからないところが出てくるんですよね。

そんなときは、一旦休憩してみるものいいかもしれません。

休憩して一晩寝れば、脳が情報を整理して、わからなかったことが理解できるようになることって結構ありますから。

それでは明日も、Good Python!