使用多处理疯狂记录

 扬帆900 发布于 2023-02-13 13:36

我试图在多处理场景中使用python的默认日志记录模块.我读了:

    Python多进程,日志记录,各种类

    使用多处理记录

和其他多篇关于多处理,日志记录,python类等的帖子.在完成所有这些阅读后,我来到这段代码,我无法正常运行使用python的logutils QueueHandler:

import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler


class Worker(Process):

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)        
        self.logger = logging.getLogger("W%i"%self.n)


    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


def listener_process(queue):
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            print >> sys.stderr, 'Whoops! Problem:'
            traceback.print_exc(file=sys.stderr)

if __name__ == "__main__":

    mpq = mpQueue(-1)

    root = logging.getLogger()
    h = logging.StreamHandler()    
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s     %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

    l = logging.getLogger("Test")
    l.setLevel(logging.DEBUG)

    listener = Process(target=listener_process,
                       args=(mpq,))
    listener.start()
    workers=[]
    for i in xrange(1):
        worker = Worker(i, mpq)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    mpq.put_nowait(None)
    listener.join()

    for i in xrange(10):
        l.info("testing %i"%i)

    print "Finish"

如果代码被执行,输出会以某种方式重复以下行:

2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,002 Worker-2   W0 INFO         Worker 0 Starting
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 0
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 1
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 2
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 3
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 4
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 5
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,003 Worker-2   W0 INFO         testing 6
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 7
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 8
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,004 Worker-2   W0 INFO         testing 9
2013-12-02 16:44:46,004 Worker-2   W0 INFO         Completed 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 0
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 1
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 2
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 3
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 4
2013-12-02 16:44:46,005 MainProcess Test INFO         testing 5
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 6
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 7
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 8
2013-12-02 16:44:46,006 MainProcess Test INFO         testing 9
Finish

在其他问题中,建议处理程序不止一次添加,但是,正如您所看到的,我只在方法中添加了一次streamhanlder .我已经测试过将main方法嵌入到具有相同结果的类中.

编辑:正如@max建议的(或者我相信他说的)我已经修改了worker类的代码:

class Worker(Process):

    root = logging.getLogger()
    qh = None

    def __init__(self, n, q):
        super(Worker, self).__init__()
        self.n = n
        self.queue = q

        if not self.qh:
            Worker.qh = QueueHandler(self.queue)            
            Worker.root.addHandler(self.qh)
            Worker.root.setLevel(logging.DEBUG)

        self.logger = logging.getLogger("W%i"%self.n)

        print self.root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)

使用相同的结果,现在队列处理程序不会一次又一次地添加,但仍然存在重复的日志条目,即使只有一个工作程序也是如此.

编辑2:我已经改变了一点代码.我更改了侦听器进程,现在使用了QueueListener(这就是我在开始时的意图),将主代码移动到了一个类.

import sys

import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler

root = logging.getLogger()
added_qh = False

class Worker(Process):

    def __init__(self, logconf, n, qh):
        super(Worker, self).__init__()
        self.n = n
        self.logconf = logconf

#        global root
        global added_qh

        if not added_qh:
            added_qh = True
            root.addHandler(qh)
            root.setLevel(logging.DEBUG)            

        self.logger = logging.getLogger("W%i"%self.n)

        #print root.handlers

    def run(self):
        self.logger.info("Worker %i Starting"%self.n)

        for i in xrange(10):
            self.logger.log(INFO, "testing %i"%i)

        self.logger.log(INFO, "Completed %i"%self.n)


class Main(object):

    def __init__(self):
        pass

    def start(self):

        mpq = mpQueue(-1)
        qh = QueueHandler(mpq)

        h = logging.StreamHandler()

        ql = QueueListener(mpq, h)

        #h.setFormatter(f)
        root.addHandler(qh)

        l = logging.getLogger("Test")
        l.setLevel(logging.DEBUG)

        workers=[]

        for i in xrange(15):
            worker = Worker(logconf, i, qh)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            print "joining worker: {}".format(worker)
            worker.join()

        mpq.put_nowait(None)

        ql.start()

        # listener.join()

        for i in xrange(10):
            l.info("testing %i"%i)

if __name__ == "__main__":


    x = Main()
    x.start()

    time.sleep(10)

    print "Finish"

现在它主要起作用,直到我达到一定数量的工人(~15),因为某些原因主要课程在de join中被阻止而其他工作人员什么都不做.

撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有