ゴミ話#
久しぶりにブログを書いていないので、もう水をかけるわけにはいかないと思い、自分に目標を設定して concurrent.future の内容を書いてみることにしました。この記事では、Python 3.2 に新しく追加された concurrent.future モジュールについてお話しします。
本文#
Python の非同期処理#
ある Python 開発者の小明が面接中に、突然次のような要求を受けました:いくつかのウェブサイトにリクエストを送り、データを取得することです。小明は考えました、簡単だ、パパッと書いてみました。
import multiprocessing
import time
def request_url(query_url: str):
time.sleep(3) # リクエスト処理ロジック
if __name__ == '__main__':
url_list = ["abc.com", "xyz.com"]
task_list = [multiprocessing.Process(target=request_url, args=(url,)) for url in url_list]
[task.start() for task in task_list]
[task.join() for task in task_list]
簡単ですね。さて、新しい要求が来ました。各リクエストの結果を取得したいのですが、どうすればいいでしょうか?小明は考え、次のようなコードを書きました。
import multiprocessing
import time
def request_url(query_url: str, result_dict: dict):
time.sleep(3) # リクエスト処理ロジック
result_dict[query_url] = {} # 結果を返す
if __name__ == '__main__':
process_manager = multiprocessing.Manager()
result_dict = process_manager.dict()
url_list = ["abc.com", "xyz.com"]
task_list = [multiprocessing.Process(target=request_url, args=(url, result_dict)) for url in url_list]
[task.start() for task in task_list]
[task.join() for task in task_list]
print(result_dict)
さて、面接官が言いました、「うん、見た目は良さそうだ。では、問題を少し変えます。まず、主プロセスをブロックしてはいけません。主プロセスはタスクの現在の状態(終了 / 未終了)に基づいて、適時に対応する結果を取得する必要があります。どう変更しますか?」小明は考えました、「それなら、セマフォを使って、タスクが完了した後に親プロセスに信号を送るのはどうでしょうか?それとももっと簡単な方法はありますか?」どうやら他には思いつきませんでした。最後に面接官は心の中で「naive」と言い、顔には笑みを浮かべて小明に帰って待つように言いました。
小明の窮地から、私たちは次のような問題が見えてきます。私たちが最もよく使用する multiprocessing
または threading
の 2 つのモジュールは、非同期タスクを実現したいシーンにおいて、実は少し不親切です。私たちはしばしば、比較的クリーンに非同期の要求を実現するために、いくつかの追加作業を行う必要があります。このような窮地を解決するために、2009 年 10 月にブライアン・クインラン氏が PEP 3148 を提案しました。この提案では、私たちがよく使用する multiprocessing
と threading
モジュールをさらにラップして、非同期操作をより良くサポートすることを目指しました。最終的にこの提案は Python 3.2 に導入されました。つまり、今日私たちが話す concurrent.future です。
Future モード#
新しいモジュールについて本格的に話し始める前に、Future
モードに関する関連情報を理解する必要があります。
まず、Future
モードとは何でしょうか?
Future
は実際には生産者 - 消費者モデルの一種の拡張です。生産者 - 消費者モデルでは、生産者は消費者がデータを処理するタイミングや、消費者が処理した結果には関心を持ちません。例えば、私たちはしばしば次のようなコードを書きます。
import multiprocessing, Queue
import os
import time
from multiprocessing import Process
from time import sleep
from random import randint
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
self.queue.put('one product')
print(multiprocessing.current_process().name + str(os.getpid()) + ' produced one product, the no of queue now is: %d' %self.queue.qsize())
sleep(randint(1, 3))
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
d = self.queue.get(1)
if d != None:
print(multiprocessing.current_process().name + str(os.getpid()) + ' consumed %s, the no of queue now is: %d' %(d,self.queue.qsize()))
sleep(randint(1, 4))
continue
else:
break
#create queue
queue = multiprocessing.Queue(40)
if __name__ == "__main__":
print('Excited!')
#create processes
processed = []
for i in range(3):
processed.append(Producer(queue))
processed.append(Consumer(queue))
#start processes
for i in range(len(processed)):
processed[i].start()
#join processes
for i in range(len(processed)):
processed[i].join()
これが生産者 - 消費者モデルの簡単な実装です。私たちは multiprocessing
の Queue
を通信チャネルとして利用し、生産者はキューにデータを投入し、消費者はキューからデータを取得して処理します。しかし、上記のように、このモデルでは生産者は消費者がデータを処理するタイミングや、処理結果には関心を持ちません。一方、Future
では、生産者がメッセージ処理の完了を待つことができ、必要に応じて関連する計算結果を取得することもできます。
例えば、次のような Java コードを見てみましょう。
package concurrent;
import java.util.concurrent.Callable;
public class DataProcessThread implements Callable<String> {
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
Thread.sleep(10000);//データ処理をシミュレート
return "データ返却";
}
}
これは私たちがデータを処理するためのコードです。
package concurrent;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
public class MainThread {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
// TODO Auto-generated method stub
DataProcessThread dataProcessThread = new DataProcessThread();
FutureTask<String> future = new FutureTask<String>(dataProcessThread);
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(future);
Thread.sleep(10000);//他の業務処理をシミュレート
while (true) {
if (future.isDone()) {
System.out.println(future.get());
break;
}
}
executor.shutdown();
}
}
これは私たちのメインスレッドです。皆さんは、データ処理タスクの状態を簡単に取得できることがわかります。同時に、関連する結果を取得することもできます。
Python における concurrent.futures#
前述のように、Python 3.2 以降、concurrent.futures は組み込みモジュールであり、直接使用できます。
注意: Python 2.7 で concurrent.futures を使用する必要がある場合は、pip を使用してインストールしてください。
pip install futures
さて、準備が整ったので、このものをどう使うか見てみましょう。
from concurrent.futures import ProcessPoolExecutor
import time
def return_future_result(message):
time.sleep(2)
return message
if __name__ == '__main__':
pool = ProcessPoolExecutor(max_workers=2) # 最大2つのタスクを収容できるプロセスプールを作成
future1 = pool.submit(return_future_result, ("hello")) # プロセスプールにタスクを追加
future2 = pool.submit(return_future_result, ("world")) # プロセスプールにタスクを追加
print(future1.done()) # タスク1が終了したかどうかを判断
time.sleep(3)
print(future2.done()) # タスク2が終了したかどうかを判断
print(future1.result()) # タスク1が返した結果を確認
print(future2.result()) # タスク2が返した結果を確認
まず from concurrent.futures import ProcessPoolExecutor
から concurrent.futures
の ProcessPoolExecutor
をインポートし、後のデータ処理を行います。(concurrent.futures
では、私たちに 2 種類の Executor
を提供しています。一つは現在使用している ProcessPoolExecutor
、もう一つは ThreadPoolExecutor
です。これらは外部に公開されているメソッドが一致しているため、皆さんは実際のニーズに応じて選択できます。)
次に、最大容量が 2 のプロセスプールを初期化します。そして、プロセスプールの submit
メソッドを呼び出してタスクを提出します。面白い点は、submit
メソッドを呼び出すと、特別な変数が得られます。この変数は Future
クラスのインスタンスであり、将来完了する操作を表します。言い換えれば、submit
が Future
インスタンスを返すとき、私たちのタスクはまだ完了していない可能性があります。Future
インスタンスの done
メソッドを呼び出すことで、現在のタスクの実行状態を取得できます。タスクが終了した後、result
メソッドを呼び出すことで、返された結果を取得できます。もし後続のロジックを実行する際に、何らかの理由でタスクをキャンセルしたい場合は、cancel
メソッドを呼び出して現在のタスクをキャンセルできます。
さて、新しい問題が出てきました。たくさんのタスクを提出したい場合はどうすればよいでしょうか?concurrent.future
は、タスクを一括で追加するために map
メソッドを提供しています。
import concurrent.futures
import requests
task_url = [('http://www.baidu.com', 40), ('http://example.com/', 40), ('https://www.github.com/', 40)]
def load_url(params: tuple):
return requests.get(params[0], timeout=params[1]).text
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
for url, data in zip(task_url, executor.map(load_url, task_url)):
print('%r page is %d bytes' % (url, len(data)))
ええ、concurrent.future
のスレッド / プロセスプールが提供する map
メソッドは、標準ライブラリの map
関数の使用方法と同じです。
concurrent.futures の解析#
前述のように concurrent.futures
の使い方を説明した後、私たちは concurrent.futures
がどのように Future
モードを実現しているのか、タスクと結果をどのように関連付けているのかに興味を持ちます。今から submit
メソッドを手がかりに、ProcessPoolExecutor
の実装を簡単に見てみましょう。
まず、ProcessPoolExecutor
を初期化する際、その __init__
メソッドでいくつかの重要な変数の初期化操作が行われます。
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""新しい ProcessPoolExecutor インスタンスを初期化します。
引数:
max_workers: 指定された呼び出しを実行するために使用できるプロセスの最大数。None または指定されていない場合、マシンのプロセッサ数と同じ数のワーカープロセスが作成されます。
"""
_check_system_limits()
if max_workers is None:
self._max_workers = os.cpu_count() or 1
else:
if max_workers <= 0:
raise ValueError("max_workers は 0 より大きくなければなりません")
self._max_workers = max_workers
# ワーカープロセスがアイドル状態にならないように、呼び出しキューをプロセス数より少し大きくします。ただし、キュー内の未来はキャンセルできないため、あまり大きくしないでください。
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
# 終了したワーカープロセスは、キューの自身のワーカースレッドで「壊れたパイプ」のトレースバックを生成する可能性があります。しかし、私たちは終了したプロセスを検出するので、トレースバックを無視します。
self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
# pids とプロセスのマップ
self._processes = {}
# シャットダウンは二段階のプロセスです。
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
さて、私たちの今日のエントリーポイントである submit
メソッドを見てみましょう。
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool('子プロセスが突然終了しました。プロセスプールはもはや使用できません')
if self._shutdown_thread:
raise RuntimeError('シャットダウン後に新しい未来をスケジュールすることはできません')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
# キュー管理スレッドを起こす
self._result_queue.put(None)
self._start_queue_management_thread()
return f
まず、渡された引数 fn
は私たちの処理関数であり、args
および kwargs
は fn
関数に渡す引数です。submit
関数の最初では、まず _broken
と _shutdown_thread
の値に基づいて、現在のプロセスプール内の処理プロセスの状態と現在のプロセスプールの状態を判断します。処理プロセスが突然終了したり、プロセスプールがすでにシャットダウンされている場合、新しい submit
操作を受け付けないことを示す例外がスローされます。
前述の状態に問題がなければ、まず Future
クラスをインスタンス化し、そのインスタンスと処理関数および関連する引数を使って _WorkItem
クラスをインスタンス化します。次に、インスタンス w
を値として、_queue_count
をキーとして _pending_work_items
に格納します。そして、_start_queue_management_thread
メソッドを呼び出して、プロセスプール内の管理スレッドを開始します。今、これらの部分のコードを見てみましょう。
def _start_queue_management_thread(self):
# エグゼキュータが失われた場合、弱参照コールバックがキュー管理スレッドを起こします。
def weakref_cb(_, q=self._result_queue):
q.put(None)
if self._queue_management_thread is None:
# プロセスが開始され、送信先が知られるようにします。
self._adjust_process_count()
self._queue_management_thread = threading.Thread(
target=_queue_management_worker,
args=(weakref.ref(self, weakref_cb),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue
この部分は非常にシンプルです。まず _adjust_process_count
メソッドを実行し、その後、守護スレッドを開始して _queue_management_worker
メソッドを実行します。私たちはいくつかの変数を渡しました。まず、_processes
は私たちのプロセスマッピングであり、_pending_work_items
には私たちの待機タスクが格納されています。また、_call_queue
と _result_queue
もあります。さて、皆さんがあまり理解できないかもしれないもう一つのパラメータは weakref.ref(self, weakref_cb)
です。
まず、Python はガベージコレクションメカニズムを持つ言語であり、GC (Garbage Collection) メカニズムは、私たちがほとんどの場合、メモリの割り当てと回収を気にする必要がないことを意味します。Python では、オブジェクトがいつ回収されるかは、その参照カウントによって決まります。参照カウントが 0 になると、そのオブジェクトは回収されます。しかし、いくつかの状況では、オブジェクトが交差参照やその他の理由により、参照カウントが常に 0 にならないことがあります。これにより、オブジェクトが回収されず、メモリリークが発生します。したがって、通常の参照とは異なり、Python では弱参照という参照メカニズムが追加されました。弱参照の意味は、ある変数がオブジェクトを保持しているが、そのオブジェクトの参照カウントを増やさないことです。したがって、weakref.ref(self, weakref_cb)
はほとんどの場合、self
と等価です(ここでなぜ弱参照を使用するのかについては、ここでは説明しませんが、別の章で説明します)。
さて、この部分のコードを見終わったので、_queue_management_worker
がどのように実装されているか見てみましょう。
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue):
"""このプロセスとワーカープロセス間の通信を管理します。
この関数はローカルスレッドで実行されます。
executor_reference: このスレッドが所有する ProcessPoolExecutor への weakref.ref
引数:
process: このスレッドで使用される multiprocessing.Process インスタンスのリスト。これは、ProcessPoolExecutor がガベージコレクションされているかどうかを判断し、この関数が終了できるかどうかを判断するために使用されます。
pending_work_items: work_id を _WorkItems にマッピングする dict 例:
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: work_id の queue.Queue 例: Queue([5, 6, ...])。ワーク ID は消費され、対応する _WorkItems が pending_work_items から _CallItems に変換され、call_queue に置かれます。
call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
result_queue: プロセスワーカーによって生成された _ResultItems の multiprocessing.Queue。
"""
executor = None
def shutting_down():
return _shutdown or executor is None or executor._shutdown_thread
def shutdown_worker():
# これは上限です
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
# キューのリソースをできるだけ早く解放します。
call_queue.close()
# 作成されたプロセスで .join() が呼び出されない場合、いくつかの multiprocessing.Queue メソッドが Mac OS X でデッドロックする可能性があります。
for p in processes.values():
p.join()
reader = result_queue._reader
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# プロセスプールを壊れたとマークし、すぐに提出が失敗するようにします。
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# すべてのフューチャーは失敗としてマークされる必要があります
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"プロセスプール内のプロセスが突然終了し、未来が実行中または保留中でした。"
))
# オブジェクトへの参照を削除します。問題16284を参照
del work_item
pending_work_items.clear()
# 残りのワーカーを強制的に終了させます。キューまたはそのロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
for p in processes.values():
p.terminate()
shutdown_worker()
return
if isinstance(result_item, int):
# PID を使用してワーカーを正常にシャットダウンします
# (エグゼキュータを壊れたとマークするのを避けます)
assert shutting_down()
p = processes.pop(result_item)
p.join()
if not processes:
shutdown_worker()
return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item は別のプロセスが終了した場合には None になる可能性があります(上記を参照)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# オブジェクトへの参照を削除します。問題16284を参照
del work_item
# シャットダウンを開始する必要があるかどうかを確認します。
executor = executor_reference()
# 次の条件が満たされると、新しい作業アイテムを追加できなくなります:
# - インタプリタがシャットダウン中であるか、または
# - このワーカーを所有するエグゼキュータが収集されたか、または
# - このワーカーを所有するエグゼキュータがシャットダウンされた。
if shutting_down():
try:
# 新しい作業アイテムを追加できないため、保留中の作業アイテムがない場合は、このスレッドを安全にシャットダウンできます。
if not pending_work_items:
shutdown_worker()
return
except Full:
# これは問題ではありません。最終的には目覚めて(result_queue.get() で)再びセンチネルを送信できるようになります。
pass
executor = None
おなじみの無限ループです。ループの最初のステップでは、_add_call_item_to_queue
関数を使用して、待機キュー内のタスクを呼び出しキューに追加します。この部分のコードを見てみましょう。
def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
"""pending_work_items から _WorkItems を使用して call_queue を満たします。
この関数はブロックしません。
引数:
pending_work_items: work_id を _WorkItems にマッピングする dict 例:
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids: work_id の queue.Queue 例: Queue([5, 6, ...]). ワーク ID は消費され、対応する _WorkItems が pending_work_items から _CallItems に変換され、call_queue に置かれます。
call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
"""
while True:
if call_queue.full():
return
try:
work_id = work_ids.get(block=False)
except queue.Empty:
return
else:
work_item = pending_work_items[work_id]
if work_item.future.set_running_or_notify_cancel():
call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
del pending_work_items[work_id]
continue
まず、呼び出しキューが満杯かどうかを判断します。満杯の場合は、このループを放棄します。次に、work_id
キューから取得し、待機タスクから対応する _WorkItem
インスタンスを取得します。次に、インスタンスにバインドされた Future
インスタンスの set_running_or_notify_cancel
メソッドを呼び出してタスクの状態を設定し、その後、呼び出しキューに投入します。
def set_running_or_notify_cancel(self):
"""未来を実行中としてマークするか、キャンセル通知を処理します。
このメソッドは、Executor 実装とユニットテストによってのみ使用されるべきです。
未来がキャンセルされている場合(cancel() が呼び出され、True を返した場合)、未来の完了を待っているスレッド(as_completed() や wait() を通じて)は通知され、False が返されます。
未来がキャンセルされていない場合、実行中の状態に設定され(未来の呼び出しは実行中であることを返します)、True が返されます。
このメソッドは、Executor 実装がこの未来に関連する作業を実行する前に呼び出す必要があります。このメソッドが False を返す場合、その作業は実行されるべきではありません。
戻り値:
未来がキャンセルされていれば False、そうでなければ True を返します。
例外:
RuntimeError: このメソッドがすでに呼び出された場合、または set_result() または set_exception() が呼び出された場合。
"""
with self._condition:
if self._state == CANCELLED:
self._state = CANCELLED_AND_NOTIFIED
for waiter in self._waiters:
waiter.add_cancelled(self)
# self._condition.notify_all() は必要ありません。なぜなら、self.cancel() が通知をトリガーするからです。
return False
elif self._state == PENDING:
self._state = RUNNING
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
id(self),
self._state)
raise RuntimeError('Future in unexpected state')
この部分の内容は非常にシンプルです。現在のインスタンスが待機状態であれば、True を返し、キャンセル状態であれば False を返します。_add_call_item_to_queue
関数では、すでに cancel
状態にある _WorkItem
を待機タスクから削除します。
さて、私たちは _queue_management_worker
関数に戻ります。
def _queue_management_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue):
"""このプロセスとワーカープロセス間の通信を管理します。
この関数はローカルスレッドで実行されます。
executor_reference: このスレッドが所有する ProcessPoolExecutor への weakref.ref
引数:
process: このスレッドで使用される multiprocessing.Process インスタンスのリスト。これは、ProcessPoolExecutor がガベージコレクションされているかどうかを判断し、この関数が終了できるかどうかを判断するために使用されます。
pending_work_items: work_id を _WorkItems にマッピングする dict 例:
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: work_id の queue.Queue 例: Queue([5, 6, ...])。ワーク ID は消費され、対応する _WorkItems が pending_work_items から _CallItems に変換され、call_queue に置かれます。
call_queue: _WorkItems から派生した _CallItems で満たされる multiprocessing.Queue。
result_queue: プロセスワーカーによって生成された _ResultItems の multiprocessing.Queue。
"""
executor = None
def shutting_down():
return _shutdown or executor is None or executor._shutdown_thread
def shutdown_worker():
# これは上限です
nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive):
call_queue.put_nowait(None)
# キューのリソースをできるだけ早く解放します。
call_queue.close()
# 作成されたプロセスで .join() が呼び出されない場合、いくつかの multiprocessing.Queue メソッドが Mac OS X でデッドロックする可能性があります。
for p in processes.values():
p.join()
reader = result_queue._reader
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# プロセスプールを壊れたとマークし、すぐに提出が失敗するようにします。
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# すべてのフューチャーは失敗としてマークされる必要があります
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"プロセスプール内のプロセスが突然終了し、未来が実行中または保留中でした。"
))
# オブジェクトへの参照を削除します。問題16284を参照
del work_item
pending_work_items.clear()
# 残りのワーカーを強制的に終了させます。キューまたはそのロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
for p in processes.values():
p.terminate()
shutdown_worker()
return
if isinstance(result_item, int):
# PID を使用してワーカーを正常にシャットダウンします
# (エグゼキュータを壊れたとマークするのを避けます)
assert shutting_down()
p = processes.pop(result_item)
p.join()
if not processes:
shutdown_worker()
return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item は別のプロセスが終了した場合には None になる可能性があります(上記を参照)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# オブジェクトへの参照を削除します。問題16284を参照
del work_item
# シャットダウンを開始する必要があるかどうかを確認します。
executor = executor_reference()
# 次の条件が満たされると、新しい作業アイテムを追加できなくなります:
# - インタプリタがシャットダウン中であるか、または
# - このワーカーを所有するエグゼキュータが収集されたか、または
# - このワーカーを所有するエグゼキュータがシャットダウンされた。
if shutting_down():
try:
# 新しい作業アイテムを追加できないため、保留中の作業アイテムがない場合は、このスレッドを安全にシャットダウンできます。
if not pending_work_items:
shutdown_worker()
return
except Full:
# これは問題ではありません。最終的には目覚めて(result_queue.get() で)再びセンチネルを送信できるようになります。
pass
executor = None
result_item
変数
私たちは見てみましょう。
まず、皆さんはここで少し疑問を持つかもしれません。
sentinels = [p.sentinel for p in processes.values()]
assert sentinels
ready = wait([reader] + sentinels)
この wait
は何でしょうか、reader
は何でしょうか。順を追って見てみましょう。まず、前述のように、reader = result_queue._reader
も皆さんの疑問を引き起こすかもしれません。ここで result_queue
は multiprocess
の SimpleQueue
です。彼は _reader
メソッドを持っていません。
class SimpleQueue(object):
def __init__(self, *, ctx):
self._reader, self._writer = connection.Pipe(duplex=False)
self._rlock = ctx.Lock()
self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
self._wlock = ctx.Lock()
上記は SimpleQueue
の一部コードです。私たちは、SimpleQueue
が本質的に Pipe
を使用してプロセス間通信を行っていることを明確に見ることができます。そして、_reader
は Pipe
を読み取るための変数です。
注意 : 他のプロセス間通信の方法を復習してみてください。
さて、この部分を理解した後、wait
メソッドを見てみましょう。
def wait(object_list, timeout=None):
'''
object_list のオブジェクトが準備完了/読み取り可能になるまで待機します。
準備完了/読み取り可能な object_list 内のオブジェクトのリストを返します。
'''
with _WaitSelector() as selector:
for obj in object_list:
selector.register(obj, selectors.EVENT_READ)
if timeout is not None:
deadline = time.time() + timeout
while True:
ready = selector.select(timeout)
if ready:
return [key.fileobj for (key, events) in ready]
else:
if timeout is not None:
timeout = deadline - time.time()
if timeout < 0:
return ready
この部分のコードは非常にシンプルです。まず、私たちが読み取るべきオブジェクトを登録し、timeout
が None の場合は、オブジェクトがデータを読み取ることに成功するまで待機し続けます。
さて、私たちは前述の _queue_management_worker
関数に戻ります。
ready = wait([reader] + sentinels)
if reader in ready:
result_item = reader.recv()
else:
# プロセスプールを壊れたとマークし、すぐに提出が失敗するようにします。
executor = executor_reference()
if executor is not None:
executor._broken = True
executor._shutdown_thread = True
executor = None
# すべてのフューチャーは失敗としてマークされる必要があります
for work_id, work_item in pending_work_items.items():
work_item.future.set_exception(
BrokenProcessPool(
"プロセスプール内のプロセスが突然終了し、未来が実行中または保留中でした。"
))
# オブジェクトへの参照を削除します。問題16284を参照
del work_item
pending_work_items.clear()
# 残りのワーカーを強制的に終了させます。キューまたはそのロックが汚れた状態になっている可能性があり、永遠にブロックされる可能性があります。
for p in processes.values():
p.terminate()
shutdown_worker()
return
私たちは wait
関数を使用して一連のオブジェクトを読み取ります。timeout
を設定していないため、可読オブジェクトの結果を取得したときに、result_queue._reader
がリストにない場合、処理プロセスが突然異常終了したことを意味します。このとき、私たちは後続の文を実行して、現在のプロセスプールのシャットダウン操作を実行します。リストにある場合、データを読み取り、result_item
変数を取得します。
私たちは次のコードを見てみましょう。
if isinstance(result_item, int):
# PID を使用してワーカーを正常にシャットダウンします
# (エグゼキュータを壊れたとマークするのを避けます)
assert shutting_down()
p = processes.pop(result_item)
p.join()
if not processes:
shutdown_worker()
return
elif result_item is not None:
work_item = pending_work_items.pop(result_item.work_id, None)
# work_item は別のプロセスが終了した場合には None になる可能性があります(上記を参照)
if work_item is not None:
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# オブジェクトへの参照を削除します。問題16284を参照
del work_item
まず、result_item
変数が int 型である場合、皆さんは _process_worker
関数の中に次のようなロジックがあったことを覚えていますか?
call_item = call_queue.get(block=True)
if call_item is None:
# キュー管理スレッドを起こします
result_queue.put(os.getpid())
return
呼び出しキューに新しいタスクがない場合、現在のプロセスの pid
を結果キューに入れます。したがって、result_item
の値が int である場合、以前のタスク処理が完了したことを意味し、私たちはクリーンアップを開始し、プロセスプールをシャットダウンします。
result_item
が int でも None でもない場合、必然的に _ResultItem
のインスタンスであり、work_id
に基づいて _WorkItem
インスタンスを取得し、生成された例外または値を _WorkItem
インスタンス内の Future
インスタンス(つまり、私たちが submit した後に返されたもの)にバインドします。
最後に、この work_item
を削除し、完了です。
最後に#
長々としたゴミのような記事を書きましたが、皆さんが気にしないことを願っています。実際、私たちは concurrent.future
の実装を見ると、特に高深な黒魔法を使っているわけではありませんが、その中の細部は私たちが一つ一つ味わう価値があります。この記事はここまでにしましょう。後で機会があれば、concurrent.future
の他の部分のコードも見てみたいと思います。味わうべき点がたくさんあります。