热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

自定义DateTimeBucket

转载自:https:blog.csdn.netu010259977articledetails88683503Flink的StreamingFileSink自定义D

转载自:https://blog.csdn.net/u010259977/article/details/88683503

Flink的StreamingFileSink自定义DateTimeBucket

 

用flink消费kafka内容,通过清洗、转换、过滤后,要sink到parquet文件,需要按照事件的event进行分区生产需要写入的文件夹,如event1的发生时间在2018-03-19,而event2的发生时间在2018-03-20,这就涉及到extract它的eventtime,并生产parquet文件的bucktId,具体代码如下:

 


  1. /*

  2. * Licensed to the Apache Software Foundation (ASF) under one

  3. * or more contributor license agreements. See the NOTICE file

  4. * distributed with this work for additional information

  5. * regarding copyright ownership. The ASF licenses this file

  6. * to you under the Apache License, Version 2.0 (the

  7. * "License"); you may not use this file except in compliance

  8. * with the License. You may obtain a copy of the License at

  9. *

  10. * http://www.apache.org/licenses/LICENSE-2.0

  11. *

  12. * Unless required by applicable law or agreed to in writing, software

  13. * distributed under the License is distributed on an "AS IS" BASIS,

  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  15. * See the License for the specific language governing permissions and

  16. * limitations under the License.

  17. */

  18.  
  19. package com.hellobike.realtimeplatform.utils;

  20.  
  21. import com.hellobike.realtimeplatform.model.AccessLog;

  22. import org.apache.flink.annotation.PublicEvolving;

  23. import org.apache.flink.core.io.SimpleVersionedSerializer;

  24. import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;

  25. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

  26. import org.apache.flink.util.Preconditions;

  27.  
  28. import java.text.ParseException;

  29. import java.text.SimpleDateFormat;

  30. import java.time.Instant;

  31. import java.time.ZoneId;

  32. import java.time.format.DateTimeFormatter;

  33. import java.util.Calendar;

  34.  
  35. /**

  36. * A {@link BucketAssigner} that assigns to buckets based on current system time.

  37. *

  38. *

  39. *

    The {@code DateTimeBucketer} will create directories of the following form:

  40. * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path

  41. * that was specified as a base path when creating the

  42. * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.

  43. * The {@code dateTimePath} is determined based on the current system time and the

  44. * user provided format string.

  45. *

  46. *

  47. *

    {@link DateTimeFormatter} is used to derive a date string from the current system time and

  48. * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling

  49. * files will have a granularity of hours.

  50. *

  51. *

    Example:

  52. *

  53. *

    {@code

  54. * BucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd--HH");

  55. * }

  56. *

  57. *

    This will create for example the following bucket path:

  58. * {@code /base/1976-12-31-14/}

  59. */

  60. @PublicEvolving

  61. public class DateTimeBucketWithPartitionAssigner implements BucketAssigner {

  62.  
  63. private static final long serialVersionUID = 1L;

  64.  
  65. private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

  66.  
  67. private final String formatString;

  68.  
  69. private final ZoneId zoneId;

  70.  
  71. private transient DateTimeFormatter dateTimeFormatter;

  72.  
  73. /**

  74. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.

  75. */

  76. public DateTimeBucketWithPartitionAssigner() {

  77. this(DEFAULT_FORMAT_STRING);

  78. }

  79.  
  80. /**

  81. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.

  82. *

  83. * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine

  84. * the bucket id.

  85. */

  86. public DateTimeBucketWithPartitionAssigner(String formatString) {

  87. this(formatString, ZoneId.systemDefault());

  88. }

  89.  
  90. /**

  91. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.

  92. *

  93. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  94. */

  95. public DateTimeBucketWithPartitionAssigner(ZoneId zoneId) {

  96. this(DEFAULT_FORMAT_STRING, zoneId);

  97. }

  98.  
  99. /**

  100. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.

  101. *

  102. * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine

  103. * the bucket path.

  104. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  105. */

  106. public DateTimeBucketWithPartitionAssigner(String formatString, ZoneId zoneId) {

  107. this.formatString = Preconditions.checkNotNull(formatString);

  108. this.zoneId = Preconditions.checkNotNull(zoneId);

  109. }

  110.  
  111. @Override

  112. public String getBucketId(IN element, Context context) {

  113. long eventTime = 0L;

  114. if (dateTimeFormatter == null) {

  115. dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);

  116. }

  117. if (element instanceof AccessLog) {

  118. if (-1 == ((AccessLog) element).getParseResult() && "".equals(((AccessLog) element).getEventTimestamp())) {

  119. return "pt=errorTime";

  120. }

  121.  
  122. eventTime = Long.valueOf(((AccessLog) element).getEventTimestamp());

  123.  
  124. return "pt=" + dateTimeFormatter.format(Instant.ofEpochMilli(eventTime));

  125. } else {

  126. return "pt=errorObjects";

  127. }

  128. }

  129.  
  130.  
  131. @Override

  132. public SimpleVersionedSerializer getSerializer() {

  133. return SimpleVersionedStringSerializer.INSTANCE;

  134. }

  135.  
  136. @Override

  137. public String toString() {

  138. return "DateTimeBucketAssigner{" +

  139. "formatString='" + formatString + '\'' +

  140. ", zoneId=" + zoneId +

  141. '}';

  142. }

  143. }

指定自定义的DateTime Assigner就可以实现基于event time分桶写入parquet文件

 


推荐阅读
  • Android日历提醒软件开源项目分享及使用教程
    本文介绍了一款名为Android日历提醒软件的开源项目,作者分享了该项目的代码和使用教程,并提供了GitHub项目地址。文章详细介绍了该软件的主界面风格、日程信息的分类查看功能,以及添加日程提醒和查看详情的界面。同时,作者还提醒了读者在使用过程中可能遇到的Android6.0权限问题,并提供了解决方法。 ... [详细]
  • Android系统移植与调试之如何修改Android设备状态条上音量加减键在横竖屏切换的时候的显示于隐藏
    本文介绍了如何修改Android设备状态条上音量加减键在横竖屏切换时的显示与隐藏。通过修改系统文件system_bar.xml实现了该功能,并分享了解决思路和经验。 ... [详细]
  • Linux重启网络命令实例及关机和重启示例教程
    本文介绍了Linux系统中重启网络命令的实例,以及使用不同方式关机和重启系统的示例教程。包括使用图形界面和控制台访问系统的方法,以及使用shutdown命令进行系统关机和重启的句法和用法。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • Android Studio Bumblebee | 2021.1.1(大黄蜂版本使用介绍)
    本文介绍了Android Studio Bumblebee | 2021.1.1(大黄蜂版本)的使用方法和相关知识,包括Gradle的介绍、设备管理器的配置、无线调试、新版本问题等内容。同时还提供了更新版本的下载地址和启动页面截图。 ... [详细]
  • CF:3D City Model(小思维)问题解析和代码实现
    本文通过解析CF:3D City Model问题,介绍了问题的背景和要求,并给出了相应的代码实现。该问题涉及到在一个矩形的网格上建造城市的情景,每个网格单元可以作为建筑的基础,建筑由多个立方体叠加而成。文章详细讲解了问题的解决思路,并给出了相应的代码实现供读者参考。 ... [详细]
  • 本文介绍了如何使用Express App提供静态文件,同时提到了一些不需要使用的文件,如package.json和/.ssh/known_hosts,并解释了为什么app.get('*')无法捕获所有请求以及为什么app.use(express.static(__dirname))可能会提供不需要的文件。 ... [详细]
  • 本文介绍了在Mac上搭建php环境后无法使用localhost连接mysql的问题,并通过将localhost替换为127.0.0.1或本机IP解决了该问题。文章解释了localhost和127.0.0.1的区别,指出了使用socket方式连接导致连接失败的原因。此外,还提供了相关链接供读者深入了解。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • r2dbc配置多数据源
    R2dbc配置多数据源问题根据官网配置r2dbc连接mysql多数据源所遇到的问题pom配置可以参考官网,不过我这样配置会报错我并没有这样配置将以下内容添加到pom.xml文件d ... [详细]
  • IjustinheritedsomewebpageswhichusesMooTools.IneverusedMooTools.NowIneedtoaddsomef ... [详细]
  • JDK源码学习之HashTable(附带面试题)的学习笔记
    本文介绍了JDK源码学习之HashTable(附带面试题)的学习笔记,包括HashTable的定义、数据类型、与HashMap的关系和区别。文章提供了干货,并附带了其他相关主题的学习笔记。 ... [详细]
  • iOS超签签名服务器搭建及其优劣势
    本文介绍了搭建iOS超签签名服务器的原因和优势,包括不掉签、用户可以直接安装不需要信任、体验好等。同时也提到了超签的劣势,即一个证书只能安装100个,成本较高。文章还详细介绍了超签的实现原理,包括用户请求服务器安装mobileconfig文件、服务器调用苹果接口添加udid等步骤。最后,还提到了生成mobileconfig文件和导出AppleWorldwideDeveloperRelationsCertificationAuthority证书的方法。 ... [详细]
  • 本文介绍了一个适用于PHP应用快速接入TRX和TRC20数字资产的开发包,该开发包支持使用自有Tron区块链节点的应用场景,也支持基于Tron官方公共API服务的轻量级部署场景。提供的功能包括生成地址、验证地址、查询余额、交易转账、查询最新区块和查询交易信息等。详细信息可参考tron-php的Github地址:https://github.com/Fenguoz/tron-php。 ... [详细]
  • Activiti7流程定义开发笔记
    本文介绍了Activiti7流程定义的开发笔记,包括流程定义的概念、使用activiti-explorer和activiti-eclipse-designer进行建模的方式,以及生成流程图的方法。还介绍了流程定义部署的概念和步骤,包括将bpmn和png文件添加部署到activiti数据库中的方法,以及使用ZIP包进行部署的方式。同时还提到了activiti.cfg.xml文件的作用。 ... [详细]
author-avatar
风清淡雅的梦
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有