我试图在多进程(multiprocessing)场景中使用 Python 标准库的日志模块 logging。我读了:
Python多进程,日志记录,各种类
使用多处理记录
以及其他多篇关于多进程、日志记录、Python 类等的帖子。读完这些之后,我写出了下面这段使用 logutils 的 QueueHandler 的代码,但它无法正常工作:
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler


class Worker(Process):
    """Process that emits a short burst of log records via a QueueHandler."""

    def __init__(self, n, q):
        """Store the worker index *n* and queue *q*, and hook up logging.

        NOTE(review): this constructor runs in the process that instantiates
        the Worker (before ``start()``), so the QueueHandler below is attached
        to that process's root logger. With a fork-based start method the
        child presumably also inherits every handler already on the root
        logger -- worth confirming as the source of duplicated output.
        """
        super(Worker, self).__init__()
        self.n = n
        self.queue = q
        self.qh = QueueHandler(self.queue)
        self.root = logging.getLogger()
        self.root.addHandler(self.qh)
        self.root.setLevel(logging.DEBUG)
        self.logger = logging.getLogger("W%i" % self.n)

    def run(self):
        """Log a start line, ten numbered test lines and a completion line."""
        self.logger.info("Worker %i Starting" % self.n)
        for step in range(10):
            self.logger.log(INFO, "testing %i" % step)
        self.logger.log(INFO, "Completed %i" % self.n)


def listener_process(queue):
    """Pull records off *queue* and hand each to the logger named in it.

    The loop ends when a ``None`` sentinel is received. KeyboardInterrupt and
    SystemExit propagate; any other error is reported on stderr and the loop
    continues.
    """
    while True:
        try:
            record = queue.get()
            if record is None:
                break
            logging.getLogger(record.name).handle(record)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            import sys, traceback
            sys.stderr.write('Whoops! Problem:\n')
            traceback.print_exc(file=sys.stderr)


if __name__ == "__main__":
    log_queue = mpQueue(-1)

    # Console handler on the root logger of the main process.
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(
        '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'))
    root_logger = logging.getLogger()
    root_logger.addHandler(console)

    test_logger = logging.getLogger("Test")
    test_logger.setLevel(logging.DEBUG)

    # The listener is started before any Worker is constructed.
    listener = Process(target=listener_process, args=(log_queue,))
    listener.start()

    workers = []
    for index in range(1):
        worker = Worker(index, log_queue)
        worker.daemon = True
        worker.start()
        workers.append(worker)

    for worker in workers:
        worker.join()

    # A None sentinel tells listener_process to leave its loop.
    log_queue.put_nowait(None)
    listener.join()

    for index in range(10):
        test_logger.info("testing %i" % index)

    print("Finish")
执行这段代码后,输出中会出现重复的日志行,如下所示:
2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2 2013-12-02 16:44:46,002 Worker-2 W0 INFO Worker 0 Starting 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 0 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 1 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 2 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 3 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 4 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 5 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7 2013-12-02 16:44:46,003 Worker-2 W0 INFO testing 6 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 7 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 8 2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0 2013-12-02 16:44:46,004 Worker-2 W0 INFO testing 9 2013-12-02 16:44:46,004 Worker-2 W0 INFO Completed 0 2013-12-02 16:44:46,005 MainProcess Test INFO testing 0 2013-12-02 16:44:46,005 MainProcess Test INFO testing 1 2013-12-02 16:44:46,005 MainProcess Test INFO testing 2 2013-12-02 16:44:46,005 MainProcess Test INFO testing 3 2013-12-02 16:44:46,005 MainProcess Test INFO testing 4 2013-12-02 16:44:46,005 MainProcess Test INFO testing 5 2013-12-02 16:44:46,006 MainProcess Test INFO testing 6 2013-12-02 16:44:46,006 MainProcess Test INFO testing 7 2013-12-02 16:44:46,006 MainProcess Test INFO testing 8 2013-12-02 16:44:46,006 MainProcess Test INFO testing 9 Finish
其他问题中有人指出,原因可能是处理程序(handler)被添加了不止一次;但正如您所看到的,我只在 main 部分添加了一次 StreamHandler。我还试过把 main 部分的代码放进一个类里,结果相同。
编辑:按照 @max 的建议(至少我是这样理解他的意思的),我修改了 Worker 类的代码:
class Worker(Process):
    """Process that logs through one QueueHandler shared at class level."""

    # Root logger and the (lazily created) queue handler are class attributes,
    # so every Worker built in the same process sees the same objects.
    root = logging.getLogger()
    qh = None

    def __init__(self, n, q):
        """Store index *n* and queue *q*; install the handler on first use."""
        super(Worker, self).__init__()
        self.n = n
        self.queue = q
        if not self.qh:
            # First Worker created in this process: build the handler once
            # and register it on the root logger.
            handler = QueueHandler(self.queue)
            Worker.qh = handler
            Worker.root.addHandler(handler)
            Worker.root.setLevel(logging.DEBUG)
        self.logger = logging.getLogger("W%i" % self.n)
        # Debug aid: show which handlers the root logger currently holds.
        print(self.root.handlers)

    def run(self):
        """Log a start line, ten numbered test lines and a completion line."""
        self.logger.info("Worker %i Starting" % self.n)
        for step in range(10):
            self.logger.log(INFO, "testing %i" % step)
        self.logger.log(INFO, "Completed %i" % self.n)
结果还是一样:现在 QueueHandler 不会被重复添加了,但仍然存在重复的日志条目,即使只有一个工作进程也是如此。
编辑 2:我对代码做了一些改动。我替换了原来的侦听器进程,改为使用 QueueListener(这本来就是我最初的打算),并把主代码移到了一个类中。
import sys
import logging
from logging import INFO
from multiprocessing import Process, Queue as mpQueue
import threading
import time

from logutils.queue import QueueListener, QueueHandler

# Module-level root logger plus a flag recording whether a Worker has already
# attached the queue handler to it in this process.
root = logging.getLogger()
added_qh = False


class Worker(Process):
    """Process that logs via a queue handler supplied by the caller."""

    def __init__(self, logconf, n, qh):
        """Keep *logconf* and index *n*; attach *qh* to the root logger once."""
        super(Worker, self).__init__()
        self.n = n
        self.logconf = logconf
        global added_qh
        if not added_qh:
            added_qh = True
            root.addHandler(qh)
            root.setLevel(logging.DEBUG)
        self.logger = logging.getLogger("W%i" % self.n)

    def run(self):
        """Log a start line, ten numbered test lines and a completion line."""
        self.logger.info("Worker %i Starting" % self.n)
        for step in range(10):
            self.logger.log(INFO, "testing %i" % step)
        self.logger.log(INFO, "Completed %i" % self.n)


class Main(object):
    """Driver that wires up the queue, the listener and the workers."""

    def __init__(self):
        """Nothing to initialise."""

    def start(self):
        """Spawn 15 workers, join them, then start the queue listener."""
        log_queue = mpQueue(-1)
        queue_handler = QueueHandler(log_queue)
        # No formatter is set on the stream handler in this version.
        console = logging.StreamHandler()
        listener = QueueListener(log_queue, console)
        root.addHandler(queue_handler)

        test_logger = logging.getLogger("Test")
        test_logger.setLevel(logging.DEBUG)

        workers = []
        for index in range(15):
            # NOTE(review): `logconf` is not defined anywhere in this snippet;
            # presumably it exists in the full program -- confirm.
            worker = Worker(logconf, index, queue_handler)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        for worker in workers:
            print("joining worker: {}".format(worker))
            worker.join()

        log_queue.put_nowait(None)
        # NOTE(review): the listener is only started here, after every worker
        # has been joined, so nothing reads the queue while the workers run.
        # Possibly related to the reported hang in join() -- confirm.
        listener.start()

        for index in range(10):
            test_logger.info("testing %i" % index)


if __name__ == "__main__":
    app = Main()
    app.start()
    time.sleep(10)
    print("Finish")
现在它基本可以工作,但当工作进程达到一定数量(约 15 个)时,出于某种原因,Main 类会阻塞在 join 调用上,而其余的工作进程什么也不做。