I have a large, complex application that makes heavy use of the Python logging module.
I need to start shipping these logs to a Kafka cluster, and I need to make sure I don't change the data in the process.
The ideal solution for me would be to create a new handler for Kafka, allowing logs to be forwarded to both the old logging solution and Kafka in parallel, and then eventually to switch off the old logging handlers and send only to Kafka.
However, I don't see any Kafka logging handlers, only Kafka clients. Adding a Kafka client would mean tracking down every current logging call and adding a separate call to the new Kafka client. Getting identical results that way would be difficult.
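To make the goal concrete: the standard library already fans a single logging call out to every attached handler, so something like the sketch below is what I'm after (KafkaLoggingHandler here is hypothetical, it's the piece I can't find):

import logging

logger = logging.getLogger("app")
logger.setLevel(logging.INFO)

# the existing logging solution stays exactly as it is
logger.addHandler(logging.FileHandler("app.log"))

# hypothetical Kafka handler -- the piece I can't find
# logger.addHandler(KafkaLoggingHandler("localhost", 9092, "app_logs"))

# one call would then reach both the file and Kafka, unchanged
logger.info("this record should reach every attached handler")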
The handler implementation is very simple. In fact, setting up the environment took more time than implementing the handler.
The handler constructor accepts an optional key parameter. If it is provided, all messages written will be sent to the single partition specified by that key. If it is not provided, messages will be distributed among servers in round-robin fashion.
I haven't tested it much, but it is simple enough that I don't see what could go wrong here. Hope you find it useful.
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer, KeyedProducer
import logging
import sys


class KafkaLoggingHandler(logging.Handler):

    def __init__(self, host, port, topic, key=None):
        logging.Handler.__init__(self)
        self.kafka_client = KafkaClient(host, port)
        self.key = key
        if key is None:
            self.producer = SimpleProducer(self.kafka_client, topic)
        else:
            self.producer = KeyedProducer(self.kafka_client, topic)

    def emit(self, record):
        # drop kafka logging to avoid infinite recursion
        if record.name == 'kafka':
            return
        try:
            # use default formatting
            msg = self.format(record)
            # produce message
            if self.key is None:
                self.producer.send_messages(msg)
            else:
                self.producer.send(self.key, msg)
        except:
            import traceback
            ei = sys.exc_info()
            traceback.print_exception(ei[0], ei[1], ei[2], None, sys.stderr)
            del ei

    def close(self):
        self.producer.stop()
        logging.Handler.close(self)


kh = KafkaLoggingHandler("localhost", 9092, "test_log")
# OR
# kh = KafkaLoggingHandler("localhost", 9092, "test_log", "key1")

logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
logger.addHandler(kh)

logger.info("The %s boxing wizards jump %s", 5, "quickly")
logger.debug("The quick brown %s jumps over the lazy %s", "fox", "dog")

try:
    import math
    math.exp(1000)
except:
    logger.exception("Problem with %s", "math.exp")
P.S. The handler uses this Kafka client: https://github.com/mumrah/kafka-python
This is a great fix, thanks! The code has received some updates over the past few years, and some of its features are now deprecated. The overall design of this fix was still very, very helpful, so thanks again.
SimpleProducer (deprecated) --> KafkaProducer
SimpleConsumer (deprecated) --> KafkaConsumer
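For anyone else migrating, here is a minimal sketch of the replacement call as I understand the new API; the broker address and topic name are placeholders:

from kafka import KafkaProducer

# old API: the topic was bound at construction time via SimpleProducer(client, topic)
# new API: the topic is passed to each send() call instead
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
producer.send('test_log', b'a log line, already encoded to bytes')
producer.flush()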
Here is my revised snippet, for the producer side only, using Kafka 1.0.0 and kafka-python 1.4.2, since I consume on the other end via logstash.
Hope this works for you!
# -*- coding: utf-8 -*-
"""Module to test out logging to kafka."""
import logging

from utils.kafka_handler import KafkaHandler


def run_it():
    """Run the actual connections."""
    logger = logging.getLogger(__name__)
    # enable the debug logger if you want to see ALL of the lines
    #logging.basicConfig(level=logging.DEBUG)
    logger.setLevel(logging.DEBUG)
    kh = KafkaHandler(['localhost:9092'], 'sebtest')
    logger.addHandler(kh)
    logger.info("I'm a little logger, short and stout")
    logger.debug("Don't tase me bro!")


if __name__ == "__main__":
    run_it()
# -*- coding: utf-8 -*-
"""Module to provide kafka handlers for internal logging facility."""
import json
import logging

from kafka import KafkaProducer


class KafkaHandler(logging.Handler):
    """Class to instantiate the kafka logging facility."""

    def __init__(self, hostlist, topic='corp_it_testing', tls=None):
        """Initialize an instance of the kafka handler."""
        logging.Handler.__init__(self)
        self.producer = KafkaProducer(
            bootstrap_servers=hostlist,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            linger_ms=10)
        self.topic = topic

    def emit(self, record):
        """Emit the provided record to the kafka_client producer."""
        # drop kafka logging to avoid infinite recursion
        if 'kafka.' in record.name:
            return
        try:
            # apply the logger formatter
            msg = self.format(record)
            self.producer.send(self.topic, {'message': msg})
            self.flush(timeout=1.0)
        except Exception:
            logging.Handler.handleError(self, record)

    def flush(self, timeout=None):
        """Flush the objects."""
        self.producer.flush(timeout=timeout)

    def close(self):
        """Close the producer and clean up."""
        self.acquire()
        try:
            if self.producer:
                self.producer.close()
            logging.Handler.close(self)
        finally:
            self.release()
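For a quick end-to-end check without logstash, a throwaway consumer sketch works too (same placeholder broker and topic as above, and assuming the handler's JSON value format):

# -*- coding: utf-8 -*-
"""Throwaway consumer to verify what the handler publishes."""
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'sebtest',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')))

for record in consumer:
    # each value is the dict the handler sent, e.g. {'message': '...'}
    print(record.value['message'])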