作者:冰雪聪明 | 来源:互联网 | 2022-12-08 23:36
我正在与kafka一起工作,并且一直被要求对发送给Kafka的消息进行验证,但是我不喜欢我认为这就是为什么我希望有人对此提供建议的解决方案。
我们有许多生产者不在我们的控制范围之内,因此他们可以发送任何格式的任何消息,并且我们最多可以发送8000万条记录,并且应在2小时内完成处理。有人要求我:
验证格式(Json,因为它必须与mongoDB兼容)。
验证发送的某些字段。
重命名一些字段
最后两个请求将使用存储在MongoDB中的参数来完成。所有这些工作都应假设我们不是唯一一个制造消费者的公司,因此应该对我们的服务进行“简单”调用以进行此验证。有任何想法吗?
1> Mickael Mais..:
这通常是通过Kafka Streams作业完成的。
您有“原始”输入主题,生产者在其中发送事件。然后,Streams作业将从这些主题中读取并将有效记录写入“干净”主题中。在Streams中,您可以进行各种处理以检查记录或在需要时丰富记录。
您可能还希望将不良记录写入死信队列主题,以便您检查发生这些错误的原因。
然后,您的消费者可以阅读干净的主题,以确保他们只看到经过验证的数据。
该解决方案给记录增加了一些延迟,因为在到达消费者之前必须对其进行“处理”。您还想在靠近Kafka集群的地方运行Streams作业,因为您要验证的数量取决于它可能需要摄取大量数据。
另请参阅使用Kafka的Streams API处理不良消息,其中详细介绍了其中一些概念。