热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

springboot-kafkajava8时间序列化

如何解决《springboot-kafkajava8时间序列化》经验,为你挑选了1个好方法。

目前与spring-kafka 2.1.8.RELEASE一起使用spring-boot 2.0.4。我想简化交换,将对象发送到kafka模板,并使用json作为格式。但是,一些需要反序列化的消息包含java.time.LocalDateTime。所以我的设置是

配置(application.yml):

spring:
  jackson:
    serialization:
      write_dates_as_timestamps: false
  kafka:
    consumer:
      group-id: foo
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: my.package
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        spring.json.trusted.packages: my.package
      retries: 3
      acks: all

至于应该工作的杰克逊依赖关系,我的依赖关系树是:

[INFO] |  |  +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.6:compile
[INFO] |  |  |  +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile
[INFO] |  |  |  \- com.fasterxml.jackson.core:jackson-core:jar:2.9.6:compile
[INFO] |  |  \- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.6:compile
[INFO] |  |  +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.6:compile
[INFO] |  |  \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.6:compile

但是,这会产生以下错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Foo-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 34, 97, 50, 99, 50, 56, 99, 99, 101, 97, 49, 98, 98, 52, 51, 97, 97, 56, 53, 50, 49, 53, 99, 101, 49, 54, 57, 48, 52, 51, 51, 98, 51, 45, 50, 34, 44, 34, 97, 117, 116, 104, 111, 114, 34, 58, 34, 97, 110, 116, 111, 110, 105, 111, 34, 44, 34, 99, 114, 101, 97, 116, 101, 100, 34, 58, 123, 34, 104, 111, 117, 114, 34, 58, 49, 56, 44, 34, 109, 105, 110, 117, 116, 101, 34, 58, 52, 48, 44, 34, 115, 101, 99, 111, 110, 100, 34, 58, 53, 49, 44, 34, 110, 97, 110, 111, 34, 58, 51, 50, 53, 48, 48, 48, 48, 48, 48, 44, 34, 100, 97, 121, 79, 102, 89, 101, 97, 114, 34, 58, 50, 52, 48, 44, 34, 100, 97, 121, 79, 102, 87, 101, 101, 107, 34, 58, 34, 84, 85, 69, 83, 68, 65, 89, 34, 44, 34, 109, 111, 110, 116, 104, 34, 58, 34, 65, 85, 71, 85, 83, 84, 34, 44, 34, 100, 97, 121, 79, 102, 77, 111, 110, 116, 104, 34, 58, 50, 56, 44, 34, 121, 101, 97, 114, 34, 58, 50, 48, 49, 56, 44, 34, 109, 111, 110, 116, 104, 86, 97, 108, 117, 101, 34, 58, 56, 44, 34, 99, 104, 114, 111, 110, 111, 108, 111, 103, 121, 34, 58, 123, 34, 99, 97, 108, 101, 110, 100, 97, 114, 84, 121, 112, 101, 34, 58, 34, 105, 115, 111, 56, 54, 48, 49, 34, 44, 34, 105, 100, 34, 58, 34, 73, 83, 79, 34, 125, 125, 44, 34, 97, 103, 103, 114, 101, 103, 97, 116, 101, 73, 100, 34, 58, 34, 97, 50, 99, 50, 56, 99, 99, 101, 97, 49, 98, 98, 52, 51, 97, 97, 56, 53, 50, 49, 53, 99, 101, 49, 54, 57, 48, 52, 51, 51, 98, 51, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 48, 44, 34, 112, 114, 105, 122, 101, 73, 110, 102, 111, 34, 58, 123, 34, 110, 117, 109, 98, 101, 114, 79, 102, 87, 105, 110, 110, 101, 114, 115, 34, 58, 49, 44, 34, 112, 114, 105, 122, 101, 80, 111, 111, 108, 34, 58, 49, 48, 44, 34, 112, 114, 105, 122, 101, 84, 97, 98, 108, 101, 34, 58, 91, 49, 48, 93, 125, 125]] from topic [Foo]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Expected array or string.
 at [Source: (byte[])"{"id":"a2c28ccea1bb43aa85215ce1690433b3-2","author":"foo","created":{"hour":18,"minute":40,"second":51,"nano":325000000,"dayOfYear":240,"dayOfWeek":"TUESDAY","month":"AUGUST","dayOfMonth":28,"year":2018,"monthValue":8,"chronology":{"calendarType":"iso8601","id":"ISO"}},"aggregateId":"a2c28ccea1bb43aa85215ce1690433b3","version":0,"prizeInfo":{"numberOfWinners":1,"prizePool":10,"prizeTable":[10]}}"; line: 1, column: 73] (through reference chain: my.package.Foo["created"])
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.datatype.jsr310.deser.JSR310DeserializerBase._handleUnexpectedToken(JSR310DeserializerBase.java:99) ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer.deserialize(LocalDateTimeDeserializer.java:141) ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer.deserialize(LocalDateTimeDeserializer.java:39) ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:136) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:369) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:228) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1154) ~[kafka-clients-1.0.2.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) ~[kafka-clients-1.0.2.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:699) ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

因此,我尝试了以下操作,但到目前为止还没有成功:1.Custom ObjectMapper声明为bean

@Bean
public ObjectMapper objectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(new JavaTimeModule());
    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
    return objectMapper;
}

2. LocalDateTime字段上的Serializer批注

为了确保我具有正确的对象映射器设置和必要的依赖关系,我创建了一个rest控制器,将响应模拟为json,作为rest终结点返回带有日期时间字段的对象,这将正确返回;样品:

[
    {
        "playerId": "foo",
        "points": 10,
        "entryDateTime": "2018-08-19T09:30:20.051"
    },
    {
        "playerId": "bar",
        "points": 3,
        "entryDateTime": "2018-08-27T09:30:20.051"
    }
]

Farhad Abdol.. 5

将Json(De)Serializer构造函数与对象映射器参数一起使用对我来说很有效。我在对具有java.time.Instant字段的pojo进行反序列化时遇到了麻烦,因此,在对同一个org.apache.kafka.common.errors.SerializationException*** 进行数小时的故障排除后,我终于意识到了(借助此处的答案)不是春天,而是卡夫卡自己的序列化。给定我拥有的objectmapper bean,我通过将其自动装配到我的kafka生产者和消费者设置的JsonSerializerJsonDeserializer参数中来解决。

@Configuration
public class JacksonConfig {

    @Bean
    @Primary
    public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) {
        ObjectMapper objectMapper = builder.build();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return objectMapper;
    }
}

@Configuration
public class KafkaProducerConfig {

    @Value(value="${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public KafkaTemplate orderKafkaTemplate(){
        Map props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new JsonSerializer(objectMapper));

        return new KafkaTemplate<>(producerFactory);
    }
}

@Configuration
public class KafkaConsumerConfig {

    @Value(value="${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value="${kafka.consumer.groupId}")
    private String groupId;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ConcurrentKafkaListenerContainerFactory orderKafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        ConsumerFactory cOnsumerFactory= new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Order.class, objectMapper));

        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

}

(为进一步清晰起见,显示了Pojo)

public class Order {

    private long accountId;
    private long assetId;
    private long quantity;
    private long price;
    private Instant createdOn = Instant.now();

    // no args constructor, constructor with params for all fields except createdOn, and getters/setters for all fields omitted

***通常的原因是: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of 'java.time.Instant' (no Creators, like default construct, exist): cannot deserialize from object value (no delegate- or property-based Creator) at [Source: (byte[])"{"accountId":1,"assetId":2,"quantity":100,"price":1000,"createdOn":{"epochSecond":1558570217,"nano":728000000}}"...



1> Farhad Abdol..:

将Json(De)Serializer构造函数与对象映射器参数一起使用对我来说很有效。我在对具有java.time.Instant字段的pojo进行反序列化时遇到了麻烦,因此,在对同一个org.apache.kafka.common.errors.SerializationException*** 进行数小时的故障排除后,我终于意识到了(借助此处的答案)不是春天,而是卡夫卡自己的序列化。给定我拥有的objectmapper bean,我通过将其自动装配到我的kafka生产者和消费者设置的JsonSerializerJsonDeserializer参数中来解决。

@Configuration
public class JacksonConfig {

    @Bean
    @Primary
    public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) {
        ObjectMapper objectMapper = builder.build();
        objectMapper.registerModule(new JavaTimeModule());
        objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return objectMapper;
    }
}

@Configuration
public class KafkaProducerConfig {

    @Value(value="${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public KafkaTemplate orderKafkaTemplate(){
        Map props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new JsonSerializer(objectMapper));

        return new KafkaTemplate<>(producerFactory);
    }
}

@Configuration
public class KafkaConsumerConfig {

    @Value(value="${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value="${kafka.consumer.groupId}")
    private String groupId;

    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ConcurrentKafkaListenerContainerFactory orderKafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();

        Map props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        ConsumerFactory cOnsumerFactory= new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Order.class, objectMapper));

        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

}

(为进一步清晰起见,显示了Pojo)

public class Order {

    private long accountId;
    private long assetId;
    private long quantity;
    private long price;
    private Instant createdOn = Instant.now();

    // no args constructor, constructor with params for all fields except createdOn, and getters/setters for all fields omitted

***通常的原因是: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of 'java.time.Instant' (no Creators, like default construct, exist): cannot deserialize from object value (no delegate- or property-based Creator) at [Source: (byte[])"{"accountId":1,"assetId":2,"quantity":100,"price":1000,"createdOn":{"epochSecond":1558570217,"nano":728000000}}"...


推荐阅读
  • Jboss的EJB部署描述符standardjaws.xml配置步骤详解
    本文详细介绍了Jboss的EJB部署描述符standardjaws.xml的配置步骤,包括映射CMP实体EJB、数据源连接池的获取以及数据库配置等内容。 ... [详细]
  • Spring常用注解(绝对经典),全靠这份Java知识点PDF大全
    本文介绍了Spring常用注解和注入bean的注解,包括@Bean、@Autowired、@Inject等,同时提供了一个Java知识点PDF大全的资源链接。其中详细介绍了ColorFactoryBean的使用,以及@Autowired和@Inject的区别和用法。此外,还提到了@Required属性的配置和使用。 ... [详细]
  • VScode格式化文档换行或不换行的设置方法
    本文介绍了在VScode中设置格式化文档换行或不换行的方法,包括使用插件和修改settings.json文件的内容。详细步骤为:找到settings.json文件,将其中的代码替换为指定的代码。 ... [详细]
  • 本文介绍了Java工具类库Hutool,该工具包封装了对文件、流、加密解密、转码、正则、线程、XML等JDK方法的封装,并提供了各种Util工具类。同时,还介绍了Hutool的组件,包括动态代理、布隆过滤、缓存、定时任务等功能。该工具包可以简化Java代码,提高开发效率。 ... [详细]
  • 本文介绍了Redis的基础数据结构string的应用场景,并以面试的形式进行问答讲解,帮助读者更好地理解和应用Redis。同时,描述了一位面试者的心理状态和面试官的行为。 ... [详细]
  • React项目中运用React技巧解决实际问题的总结
    本文总结了在React项目中如何运用React技巧解决一些实际问题,包括取消请求和页面卸载的关联,利用useEffect和AbortController等技术实现请求的取消。文章中的代码是简化后的例子,但思想是相通的。 ... [详细]
  • uniapp开发H5解决跨域问题的两种代理方法
    本文介绍了uniapp开发H5解决跨域问题的两种代理方法,分别是在manifest.json文件和vue.config.js文件中设置代理。通过设置代理根域名和配置路径别名,可以实现H5页面的跨域访问。同时还介绍了如何开启内网穿透,让外网的人可以访问到本地调试的H5页面。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 在说Hibernate映射前,我们先来了解下对象关系映射ORM。ORM的实现思想就是将关系数据库中表的数据映射成对象,以对象的形式展现。这样开发人员就可以把对数据库的操作转化为对 ... [详细]
  • 海马s5近光灯能否直接更换为H7?
    本文主要介绍了海马s5车型的近光灯是否可以直接更换为H7灯泡,并提供了完整的教程下载地址。此外,还详细讲解了DSP功能函数中的数据拷贝、数据填充和浮点数转换为定点数的相关内容。 ... [详细]
  • LVS实现负载均衡的原理LVS负载均衡负载均衡集群是LoadBalance集群。是一种将网络上的访问流量分布于各个节点,以降低服务器压力,更好的向客户端 ... [详细]
  • OpenMap教程4 – 图层概述
    本文介绍了OpenMap教程4中关于地图图层的内容,包括将ShapeLayer添加到MapBean中的方法,OpenMap支持的图层类型以及使用BufferedLayer创建图像的MapBean。此外,还介绍了Layer背景标志的作用和OMGraphicHandlerLayer的基础层类。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 使用Spring AOP实现切面编程的步骤和注意事项
    本文介绍了使用Spring AOP实现切面编程的步骤和注意事项。首先解释了@EnableAspectJAutoProxy、@Aspect、@Pointcut等注解的作用,并介绍了实现AOP功能的方法。然后详细介绍了创建切面、编写测试代码的过程,并展示了测试结果。接着讲解了关于环绕通知的使用方法,并修改了FirstTangent类以添加环绕通知方法。最后介绍了利用AOP拦截注解的方法,只需修改全局切入点即可实现。使用Spring AOP进行切面编程可以方便地实现对代码的增强和拦截。 ... [详细]
author-avatar
李老鱼儿_654
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有