继承 multiprocessing.Process 实现了一个 worker 类;在父进程中自己实现了"最多同时运行 N 个子进程"的限制(出问题的环境中 N 为 30)。
实际运行中发现,大约有万分之二(当前每天运行46000+次,大约出现11次)的概率,子进程创建后run方法未执行。
代码和日志如下,注意打印日志的语句
父进程启动子进程(父进程里还有一个控制并发进程数量的逻辑,如果需要的话我贴出来):
...
def run_task(self, task):
    """Start a child process for ``task`` and track it in ``self.worker_pool``.

    Logs before construction, around ``start()``, and the pid the child
    received once it has been started.
    """
    logging.info('execute monitor %s' % task['id'])
    child = execute_worker.ExecuteWorkerProcess(task)

    logging.debug('execute process start %s' % task['id'])
    # With the ``fork`` start method (the Linux default before Python 3.14)
    # the child process is forked from this thread here.
    child.start()
    pid_line = 'worker pid is %s (%s)' % (child.pid, task['id'])
    logging.info(pid_line)
    logging.debug('execute process started %s' % task['id'])

    self.worker_pool.append(child)
...
子进程run方法
class ExecuteWorkerProcess(multiprocessing.Process):
    """Child process that executes one monitor task.

    With the ``fork`` start method (the Linux default before Python 3.14) the
    child gets a copy of the parent's memory, including any ``logging``
    handler lock that some *other* parent thread happened to hold at fork
    time. Only the forking thread exists in the child, so such a lock is
    never released and the first logging call in ``run`` blocks forever.
    ``run`` therefore replaces the inherited handler locks before it logs.
    (Alternatively, start children with the ``spawn``/``forkserver`` start
    method, which does not copy held locks.)
    """

    ...

    def __init__(self, task):
        super(ExecuteWorkerProcess, self).__init__()
        self.stopping = False
        self.task = task
        # Constructed in the parent, before start() is called.
        self.worker = ExecuteWorker(task)
        # False when the task dict has a 'task' key, True otherwise.
        self.routine = 'task' not in task
        self.zk = None
        logging.debug('process created %s' % self.task['id'])

    ...

    @staticmethod
    def _reinit_logging_locks():
        """Give every configured logging handler a fresh, unheld lock.

        Intended to be the first thing the child does. Newer CPython 3
        releases (bpo-6721) already reinitialize these locks after fork; on
        older interpreters this is what prevents the post-fork deadlock. It
        is harmless when the process was not forked.
        """
        loggers = [logging.getLogger()]
        # loggerDict also holds PlaceHolder objects, which have no handlers.
        loggers.extend(
            obj for obj in logging.Logger.manager.loggerDict.values()
            if isinstance(obj, logging.Logger))
        for logger in loggers:
            for handler in logger.handlers:
                handler.createLock()

    def run(self):
        # Must happen before the first logging call (see class docstring).
        self._reinit_logging_locks()
        logging.debug('process start %s' % self.task['id'])
        try:
            logging.debug('process run before %s' % self.task['id'])
            self._run()
            logging.debug('process run after %s' % self.task['id'])
        except Exception:
            # Report any failure of _run() by log + email; SystemExit and
            # KeyboardInterrupt are no longer swallowed here.
            logging.exception('')
            title = u'监控执行进程报错'
            text = u'监控项id:%s\n错误信息:\n%s' % (self.task['id'], traceback.format_exc())
            warning.email_warning(title, text, to_dev=True)
        logging.debug('process start done %s' % self.task['id'])

    ...
出现问题的进程日志如下:
正常任务日志如下:
可以看到,无论正常还是异常的情况,主进程的日志中都打印出了子进程的 pid,但在异常的情况下,子进程 run 函数的第一行都没有执行。
是否有人遇到过?这个是不是multiprocessing.Process的坑,有没有规避办法...
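原因已找到:主进程中同时使用了多线程和 multiprocessing(以 fork 方式启动子进程),导致 logging 出现死锁,现象是子进程执行到第一句 logging 就 hang 住。fork 时如果其他线程正持有 logging handler 的锁,子进程会继承这把已被占用的锁;而子进程中只存在调用 fork 的那个线程,持锁的线程并不存在,所以这把锁永远不会被释放。问题只会发生在 Linux 下(Linux 默认使用 fork 启动子进程)。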
参考 Stack Overflow 上的这个答案找到了复现方法,另外可参考另一个回答以及解决方案。
复现demo:
#coding=utf-8
# Reproduce the thread + fork logging deadlock.
#
# ExecuteThread forks Worker processes in a tight loop while ExecuteThread2
# logs continuously to the same file handler. When a fork lands while
# ExecuteThread2 holds the handler's lock, the child inherits the lock in the
# held state, but the owning thread does not exist in the child, so the
# child's first logging call in Worker.run blocks forever. A worker still in
# the pool 3 seconds after creation is treated as hung and the run stops.
#
# multiprocessing joins non-daemon children at interpreter exit, so the
# parent also waits on a hung child: kill leftover processes by hand.
import os
import sys
import time
import logging
import threading
import multiprocessing
import logging.handlers


def init_log(log_path, level=logging.INFO, when="midnight", backup=7,
             format="%(levelname)s:[%(asctime)s][%(filename)s:%(lineno)d][%(process)s][%(thread)d] %(message)s",
             datefmt="%Y-%m-%d %H:%M:%S"):
    """Attach a TimedRotatingFileHandler for ``log_path + ".log"`` to the root logger.

    :param log_path: log file path without the ``.log`` suffix. Its directory
        is created when missing; a bare file name logs to the current directory.
    :param level: level set on both the root logger and the new handler.
    :param when: rotation interval passed to TimedRotatingFileHandler.
    :param backup: number of rotated files to keep (``backupCount``).
    :param format: record format string for the handler's Formatter.
    :param datefmt: date format string for the handler's Formatter.
    """
    formatter = logging.Formatter(format, datefmt)
    logger = logging.getLogger()
    logger.setLevel(level)
    log_dir = os.path.dirname(log_path)
    # dirname() is '' for a bare file name, and os.makedirs('') would raise.
    if log_dir and not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    handler = logging.handlers.TimedRotatingFileHandler(
        log_path + ".log", when=when, backupCount=backup)
    handler.setLevel(level)
    handler.setFormatter(formatter)
    logger.addHandler(handler)


# Set by Worker.check_timeout (in the forking thread); read by both threads.
stop = False


class Worker(multiprocessing.Process):
    u"""Run work in a separate process (intended for CPU-bound tasks)."""

    def __init__(self):
        super(Worker, self).__init__()
        # Deadline after which a worker still in the pool counts as hung.
        self.reset_ts = time.time() + 3
        self.stopping = False

    def run(self):
        # When the deadlock hits, this first logging call never returns.
        logging.info('Process pid is %s' % os.getpid())

    def check_timeout(self, now):
        u"""Flag the run as failed once ``now`` is past this worker's deadline.

        Called in the parent, so the logged pid is this worker's ``self.pid``
        (the parent's own pid would be useless for finding the hung child).

        :param now: current time as returned by ``time.time()``.
        """
        global stop
        if now > self.reset_ts and not self.stopping:
            self.stopping = True
            logging.info('error pid is %s' % self.pid)
            stop = True


def main():
    """Fork a new Worker every ~1 ms (up to 1000 alive) until one hangs."""
    worker_pool = []
    while True:
        if worker_pool:
            now = time.time()
            for worker in worker_pool:
                worker.check_timeout(now)
            if stop:
                logging.error('Process not run, exit!')
                # Raises SystemExit, which only ends the calling thread.
                sys.exit(-1)
            # Reap finished workers; keep only the live ones.
            alive_workers = []
            for worker in worker_pool:
                if worker.is_alive():
                    alive_workers.append(worker)
                else:
                    worker.join()
            worker_pool = alive_workers
        if len(worker_pool) < 1000:
            logging.info('create worker')
            worker = Worker()
            # Forks while ExecuteThread2 may be holding the handler lock.
            worker.start()
            logging.info('worker pid is %s' % worker.pid)
            worker_pool.append(worker)
        time.sleep(0.001)


class ExecuteThread(threading.Thread):
    """Thread running the forking loop in main()."""

    def run(self):
        main()


class ExecuteThread2(threading.Thread):
    """Thread logging every ~1 ms so the handler lock is often held at fork time."""

    def run(self):
        while True:
            logging.info('main thread')
            time.sleep(0.001)
            if stop:
                sys.exit(-1)


if __name__ == '__main__':
    init_log('/yourpath/timeout')
    # Calling main() directly, with no second logging thread, should not
    # reproduce the hang: the deadlock needs another thread holding the
    # logging lock at the moment of fork.
    thread = ExecuteThread()
    thread2 = ExecuteThread2()
    thread.start()
    thread2.start()
    thread.join()
    thread2.join()
复现完了记得清掉hang住的进程....