Manjusaka

Manjusaka

日常辣鸡水文: logging のプロセス安全問題について

日常辣鸡水文:关于 logging 的进程安全问题#

チームの食事会で少し酒を飲み、ゴミ文書エンジニアとして日常の水文を書いています。

正文#

現在、チームのログ収集方法は、元々の TCP 直伝 logstash の方法から、単一ファイルに書き込む方法に改善され、FileBeat をログ収集のフロントエンドとして使用しています。しかし、これによりしばしば問題が発生します。つまり、ログが失われることです。

ええ、私たちのオンラインサービスは Gunicorn を使用して複数の Worker を起動して処理しています。これには問題があります。私たちは皆、logging モジュールがスレッドセーフであることを知っています。標準の Log Handler 内部に一連のロックを追加してスレッドの安全性を確保していますが、logging がファイルに直接書き込むことはプロセスセーフなのでしょうか?

分析#

私たちがファイルに書き込む方法は、logging モジュールに付属の FileHandler を使用しています。まずはそのソースコードを見てみましょう。

class FileHandler(StreamHandler):
    """
    フォーマットされたログ記録をディスクファイルに書き込むハンドラークラス。
    """
    def __init__(self, filename, mode='a', encoding=None, delay=False):
        """
        指定されたファイルを開き、ログ用のストリームとして使用します。
        """
        # Issue #27493: Path オブジェクトのサポートを追加
        filename = os.fspath(filename)
        # 絶対パスを保持します。そうしないと、このクラスを使用する派生クラスが
        # 現在のディレクトリが変更されたときに問題を引き起こす可能性があります。
        self.baseFilename = os.path.abspath(filename)
        self.mode = mode
        self.encoding = encoding
        self.delay = delay
        if delay:
            # ストリームを開きませんが、レベル、フォーマッタ、ロックなどを設定するために
            # Handler コンストラクタを呼び出す必要があります。
            Handler.__init__(self)
            self.stream = None
        else:
            StreamHandler.__init__(self, self._open())

    def close(self):
        """
        ストリームを閉じます。
        """
        self.acquire()
        try:
            try:
                if self.stream:
                    try:
                        self.flush()
                    finally:
                        stream = self.stream
                        self.stream = None
                        if hasattr(stream, "close"):
                            stream.close()
            finally:
                # Issue #19523: 遅延が設定されているときにハンドラリークを防ぐために
                # 無条件に呼び出します。
                StreamHandler.close(self)
        finally:
            self.release()

    def _open(self):
        """
        現在のベースファイルを(元の)モードとエンコーディングで開きます。
        結果のストリームを返します。
        """
        return open(self.baseFilename, self.mode, encoding=self.encoding)

    def emit(self, record):
        """
        レコードを発行します。

        コンストラクタで 'delay' が指定されていない場合、発行する前にストリームを開きます。
        """
        if self.stream is None:
            self.stream = self._open()
        StreamHandler.emit(self, record)

    def __repr__(self):
        level = getLevelName(self.level)
        return '<%s %s (%s)>' % (self.__class__.__name__, self.baseFilename, level)

ええ、その中で注目すべき点は _open メソッドと emit メソッドです。まず、背景知識を紹介します。私たちが logging を使用してログを出力する際、logging モジュールは対応する Handlerhandle メソッドを呼び出します。handle メソッド内では、emit メソッドが呼び出され、最終的なログが出力されます。したがって、FileHandler を使用する場合、最初に handle メソッドの呼び出しがトリガーされ、その後 emit メソッドがトリガーされ、_open メソッドを呼び出して file point を取得し、親クラス(より正確には MRO の一つ上のクラス)である StreamHandleremit メソッドが呼び出されます。

StreamHandleremit メソッドを見てみましょう。

class StreamHandler(Handler):
    """
    ログ記録を適切にフォーマットしてストリームに書き込むハンドラークラス。
    このクラスはストリームを閉じないことに注意してください。sys.stdout または sys.stderr が使用される可能性があります。
    """

    terminator = '\n'

    def __init__(self, stream=None):
        """
        ハンドラーを初期化します。

        ストリームが指定されていない場合、sys.stderr が使用されます。
        """
        Handler.__init__(self)
        if stream is None:
            stream = sys.stderr
        self.stream = stream

    def flush(self):
        """
        ストリームをフラッシュします。
        """
        self.acquire()
        try:
            if self.stream and hasattr(self.stream, "flush"):
                self.stream.flush()
        finally:
            self.release()

    def emit(self, record):
        """
        レコードを発行します。

        フォーマッタが指定されている場合、それを使用してレコードをフォーマットします。
        レコードはストリームに書き込まれ、末尾に改行が追加されます。例外情報が存在する場合は、
        traceback.print_exception を使用してフォーマットされ、ストリームに追加されます。
        ストリームに 'encoding' 属性がある場合、それを使用してストリームへの出力方法を決定します。
        """
        try:
            msg = self.format(record)
            stream = self.stream
            stream.write(msg)
            stream.write(self.terminator)
            self.flush()
        except Exception:
            self.handleError(record)

    def __repr__(self):
        level = getLevelName(self.level)
        name = getattr(self.stream, 'name', '')
        if name:
            name += ' '
        return '<%s %s(%s)>' % (self.__class__.__name__, name, level)

ええ、とてもシンプルです。先ほど取得した file point を使用してファイルにデータを書き込みます。

問題はここにあります。FileHandler_open 関数で open 関数を呼び出すとき、選択される mode'a' です。つまり、通常の O_APPEND モードです。私たちは通常、O_APPEND はプロセスセーフと見なすことができることを知っています。なぜなら、O_APPEND は内容が他の O_APPEND 書き込み操作によって上書きされないことを保証するからです。しかし、なぜここでログが失われる状況が発生するのでしょうか?

理由は POSIX に特有の設計が存在するためです。『POSIX Programmers Guide』という書籍では、次のように説明されています。

  • PIPE_BUF バイト未満の書き込みは原子性があり、データは同じパイプに書き込む他のプロセスのデータと混在することはありません。PIPE_BUF を超える書き込みは、任意の方法でデータが混在する可能性があります。

この文の翻訳は、POSIX には PIPE_BUF という変数が存在し、そのサイズは 512 であることを示しています。書き込み操作のサイズが PIPE_BUF の値未満である場合、その操作は原子性を持ち、中断されることはありません。したがって、他のプロセスによる書き込み値と混乱することはありません。しかし、書き込む内容が PIPE_BUF を超える場合、オペレーティングシステムはこの点を保証できません。

Linux オペレーティングシステムでは、この値が少し変わります。

  • POSIX.1 は、PIPE_BUF バイト未満の write (2) は原子性を持つべきであると述べています。出力データはパイプに連続したシーケンスとして書き込まれます。PIPE_BUF バイトを超える書き込みは非原子性である可能性があり、カーネルは他のプロセスによって書き込まれたデータと混在する可能性があります。POSIX.1 は PIPE_BUF を少なくとも 512 バイトとすることを要求しています。(Linux では、PIPE_BUF は 4096 バイトです。)

つまり、4K を超える書き込み操作は原子性を保証できず、データの混乱が発生する可能性があります。

データの混乱が発生すると、ログのフォーマットが不定になり、最終的に解析側が解析できず、結果としてログが失われます。

ここで、再現してみましょう。まずはテストコードです。

最后#

このような操作は以前は考えたことがありませんでしたが、今日は新しい扉を開いたようです。最後に、@依云先輩の指導に感謝します。先輩の指摘がなければ、O_APPEND モードでもデータが安全であるとは全く考えられませんでした。

Reference#

文中で参考にした資料は以下の 2 つです。

1.OReilly POSIX Programmers Guide

2.Linux Man: PIPE

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。