如何从Python日志模块写入Kafka?

 蒋小宁蒋小羊 发布于 2023-02-03 10:15

我有一个大型复杂的应用程序,它大量使用Python日志记录模块.

我需要开始将这些日志记录到Kafka集群中,并且需要确保我不会在此过程中更改数据.

对我来说,理想的解决方案是为Kafka创建一个新的处理程序 - 并允许日志并行转发旧的日志记录解决方案和kafka.然后最终关闭旧的日志处理程序,然后发送给Kafka.

但是,我没有看到任何kafka日志处理程序 - 只有kafka客户端.添加kafka客户端意味着跟踪每个当前的日志记录调用并向新的kafka客户端添加单独的调用.获得相同的结果将是困难的.

2 个回答
  • 处理程序实现非常简单.实际上,设置环境比实现处理程序花费的时间更多.

    处理程序构造函数接受可选参数key.如果已提供,则写入的消息将发送到此密钥指定的单个分区.如果未提供,则将以循环方式在服务器之间分发消息.

    我没有对它进行过多次测试,但它很简单,我不知道这里可能出现什么问题.希望它会有用.

    from kafka.client import KafkaClient
    from kafka.producer import SimpleProducer,KeyedProducer
    import logging,sys
    
    class KafkaLoggingHandler(logging.Handler):
    
        def __init__(self, host, port, topic, key=None):
            logging.Handler.__init__(self)
            self.kafka_client = KafkaClient(host, port)
            self.key = key
            if key is None:
                self.producer = SimpleProducer(self.kafka_client, topic)
            else:
                self.producer = KeyedProducer(self.kafka_client, topic)
    
        def emit(self, record):
            #drop kafka logging to avoid infinite recursion
            if record.name == 'kafka':
                return
            try:
                #use default formatting
                msg = self.format(record)
                #produce message
                if self.key is None:
                    self.producer.send_messages(msg)
                else:
                    self.producer.send(self.key, msg)
            except:
                import traceback
                ei = sys.exc_info()
                traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
                del ei
    
        def close(self):
            self.producer.stop()
            logging.Handler.close(self)
    
    kh = KafkaLoggingHandler("localhost", 9092, "test_log")
    #OR
    #kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")
    
    logger = logging.getLogger("")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(kh)
    logger.info("The %s boxing wizards jump %s", 5, "quickly")
    logger.debug("The quick brown %s jumps over the lazy %s", "fox",  "dog")
    try:
        import math
        math.exp(1000)
    except:
        logger.exception("Problem with %s", "math.exp")
    

    PS处理程序使用此Kafka客户端:https://github.com/mumrah/kafka-python

    2023-02-03 10:17 回答
  • 这是一个很棒的修复程序,谢谢!在过去的几年中,该代码已进行了一些更新,并且现在不赞成使用某些功能。尽管此修复程序的总体设计非常非常有帮助,所以再次感谢您。

    SimpleProducer (deprecated) --> KafkaProducer
    SimpleConsumer (deprecated) --> KafkaConsumer
    

    这是我使用Kafka 1.0.0和kafka-python 1.4.2以及生产者的经过修订的代码段,因为我在另一端通过logstash进行了消耗。

    希望这对您有用!

    tester.py(主例程)

    # -*- coding: utf-8 -*-
    """Module to test out logging to kafka."""
    
    import json
    import logging
    
    from utils.kafka_handler import KafkaHandler
    from kafka import KafkaProducer
    
    
    def run_it(logger=None):
        """Run the actual connections."""
    
        logger = logging.getLogger(__name__)
        # enable the debug logger if you want to see ALL of the lines
        #logging.basicConfig(level=logging.DEBUG)
        logger.setLevel(logging.DEBUG)
    
        kh = KafkaHandler(['localhost:9092'], 'sebtest')
        logger.addHandler(kh)
    
        logger.info("I'm a little logger, short and stout")
        logger.debug("Don't tase me bro!")
    
    
    if __name__ == "__main__":
        run_it()
    

    utils / kafka_handler.py(记录实用程序)

    # -*- coding: utf-8 -*-
    """Module to provide kafka handlers for internal logging facility."""
    
    import json
    import logging
    import sys
    
    from kafka import KafkaProducer
    
    
    class KafkaHandler(logging.Handler):
        """Class to instantiate the kafka logging facility."""
    
        def __init__(self, hostlist, topic='corp_it_testing', tls=None):
            """Initialize an instance of the kafka handler."""
            logging.Handler.__init__(self)
            self.producer = KafkaProducer(bootstrap_servers=hostlist,
                                          value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                          linger_ms=10)
            self.topic = topic
    
        def emit(self, record):
            """Emit the provided record to the kafka_client producer."""
            # drop kafka logging to avoid infinite recursion
            if 'kafka.' in record.name:
                return
    
            try:
                # apply the logger formatter
                msg = self.format(record)
                self.producer.send(self.topic, {'message': msg})
                self.flush(timeout=1.0)
            except Exception:
                logging.Handler.handleError(self, record)
    
        def flush(self, timeout=None):
            """Flush the objects."""
            self.producer.flush(timeout=timeout)
    
        def close(self):
            """Close the producer and clean up."""
            self.acquire()
            try:
                if self.producer:
                    self.producer.close()
    
                logging.Handler.close(self)
            finally:
                self.release()
    

    2023-02-03 10:17 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有