热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

自定义DateTimeBucket

转载自:https:blog.csdn.netu010259977articledetails88683503Flink的StreamingFileSink自定义D

转载自:https://blog.csdn.net/u010259977/article/details/88683503

Flink的StreamingFileSink自定义DateTimeBucket

 

用flink消费kafka内容,通过清洗、转换、过滤后,要sink到parquet文件,需要按照事件的event进行分区生产需要写入的文件夹,如event1的发生时间在2018-03-19,而event2的发生时间在2018-03-20,这就涉及到extract它的eventtime,并生产parquet文件的bucktId,具体代码如下:

 


  1. /*

  2. * Licensed to the Apache Software Foundation (ASF) under one

  3. * or more contributor license agreements. See the NOTICE file

  4. * distributed with this work for additional information

  5. * regarding copyright ownership. The ASF licenses this file

  6. * to you under the Apache License, Version 2.0 (the

  7. * "License"); you may not use this file except in compliance

  8. * with the License. You may obtain a copy of the License at

  9. *

  10. * http://www.apache.org/licenses/LICENSE-2.0

  11. *

  12. * Unless required by applicable law or agreed to in writing, software

  13. * distributed under the License is distributed on an "AS IS" BASIS,

  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  15. * See the License for the specific language governing permissions and

  16. * limitations under the License.

  17. */

  18.  
  19. package com.hellobike.realtimeplatform.utils;

  20.  
  21. import com.hellobike.realtimeplatform.model.AccessLog;

  22. import org.apache.flink.annotation.PublicEvolving;

  23. import org.apache.flink.core.io.SimpleVersionedSerializer;

  24. import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;

  25. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

  26. import org.apache.flink.util.Preconditions;

  27.  
  28. import java.text.ParseException;

  29. import java.text.SimpleDateFormat;

  30. import java.time.Instant;

  31. import java.time.ZoneId;

  32. import java.time.format.DateTimeFormatter;

  33. import java.util.Calendar;

  34.  
  35. /**

  36. * A {@link BucketAssigner} that assigns to buckets based on current system time.

  37. *

  38. *

  39. *

    The {@code DateTimeBucketer} will create directories of the following form:

  40. * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path

  41. * that was specified as a base path when creating the

  42. * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.

  43. * The {@code dateTimePath} is determined based on the current system time and the

  44. * user provided format string.

  45. *

  46. *

  47. *

    {@link DateTimeFormatter} is used to derive a date string from the current system time and

  48. * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling

  49. * files will have a granularity of hours.

  50. *

  51. *

    Example:

  52. *

  53. *

    {@code

  54. * BucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd--HH");

  55. * }

  56. *

  57. *

    This will create for example the following bucket path:

  58. * {@code /base/1976-12-31-14/}

  59. */

  60. @PublicEvolving

  61. public class DateTimeBucketWithPartitionAssigner implements BucketAssigner {

  62.  
  63. private static final long serialVersionUID = 1L;

  64.  
  65. private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

  66.  
  67. private final String formatString;

  68.  
  69. private final ZoneId zoneId;

  70.  
  71. private transient DateTimeFormatter dateTimeFormatter;

  72.  
  73. /**

  74. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.

  75. */

  76. public DateTimeBucketWithPartitionAssigner() {

  77. this(DEFAULT_FORMAT_STRING);

  78. }

  79.  
  80. /**

  81. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.

  82. *

  83. * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine

  84. * the bucket id.

  85. */

  86. public DateTimeBucketWithPartitionAssigner(String formatString) {

  87. this(formatString, ZoneId.systemDefault());

  88. }

  89.  
  90. /**

  91. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.

  92. *

  93. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  94. */

  95. public DateTimeBucketWithPartitionAssigner(ZoneId zoneId) {

  96. this(DEFAULT_FORMAT_STRING, zoneId);

  97. }

  98.  
  99. /**

  100. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.

  101. *

  102. * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine

  103. * the bucket path.

  104. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  105. */

  106. public DateTimeBucketWithPartitionAssigner(String formatString, ZoneId zoneId) {

  107. this.formatString = Preconditions.checkNotNull(formatString);

  108. this.zoneId = Preconditions.checkNotNull(zoneId);

  109. }

  110.  
  111. @Override

  112. public String getBucketId(IN element, Context context) {

  113. long eventTime = 0L;

  114. if (dateTimeFormatter == null) {

  115. dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);

  116. }

  117. if (element instanceof AccessLog) {

  118. if (-1 == ((AccessLog) element).getParseResult() && "".equals(((AccessLog) element).getEventTimestamp())) {

  119. return "pt=errorTime";

  120. }

  121.  
  122. eventTime = Long.valueOf(((AccessLog) element).getEventTimestamp());

  123.  
  124. return "pt=" + dateTimeFormatter.format(Instant.ofEpochMilli(eventTime));

  125. } else {

  126. return "pt=errorObjects";

  127. }

  128. }

  129.  
  130.  
  131. @Override

  132. public SimpleVersionedSerializer getSerializer() {

  133. return SimpleVersionedStringSerializer.INSTANCE;

  134. }

  135.  
  136. @Override

  137. public String toString() {

  138. return "DateTimeBucketAssigner{" +

  139. "formatString='" + formatString + '\'' +

  140. ", zoneId=" + zoneId +

  141. '}';

  142. }

  143. }

指定自定义的DateTime Assigner就可以实现基于event time分桶写入parquet文件

 


推荐阅读
  • springboot基于redis配置session共享项目环境配置pom.xml引入依赖application.properties配置Cookie序列化(高版本不需要)测试启 ... [详细]
  • 向QTextEdit拖放文件的方法及实现步骤
    本文介绍了在使用QTextEdit时如何实现拖放文件的功能,包括相关的方法和实现步骤。通过重写dragEnterEvent和dropEvent函数,并结合QMimeData和QUrl等类,可以轻松实现向QTextEdit拖放文件的功能。详细的代码实现和说明可以参考本文提供的示例代码。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • mac php错误日志配置方法及错误级别修改
    本文介绍了在mac环境下配置php错误日志的方法,包括修改php.ini文件和httpd.conf文件的操作步骤。同时还介绍了如何修改错误级别,以及相应的错误级别参考链接。 ... [详细]
  • 本文介绍了使用Spark实现低配版高斯朴素贝叶斯模型的原因和原理。随着数据量的增大,单机上运行高斯朴素贝叶斯模型会变得很慢,因此考虑使用Spark来加速运行。然而,Spark的MLlib并没有实现高斯朴素贝叶斯模型,因此需要自己动手实现。文章还介绍了朴素贝叶斯的原理和公式,并对具有多个特征和类别的模型进行了讨论。最后,作者总结了实现低配版高斯朴素贝叶斯模型的步骤。 ... [详细]
  • Android日历提醒软件开源项目分享及使用教程
    本文介绍了一款名为Android日历提醒软件的开源项目,作者分享了该项目的代码和使用教程,并提供了GitHub项目地址。文章详细介绍了该软件的主界面风格、日程信息的分类查看功能,以及添加日程提醒和查看详情的界面。同时,作者还提醒了读者在使用过程中可能遇到的Android6.0权限问题,并提供了解决方法。 ... [详细]
  • 本文介绍了在Cpp中将字符串形式的数值转换为int或float等数值类型的方法,主要使用了strtol、strtod和strtoul函数。这些函数可以将以null结尾的字符串转换为long int、double或unsigned long类型的数值,且支持任意进制的字符串转换。相比之下,atoi函数只能转换十进制数值且没有错误返回。 ... [详细]
  • 大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记
    本文介绍了大数据Hadoop生态(20)MapReduce框架原理OutputFormat的开发笔记,包括outputFormat接口实现类、自定义outputFormat步骤和案例。案例中将包含nty的日志输出到nty.log文件,其他日志输出到other.log文件。同时提供了一些相关网址供参考。 ... [详细]
  • 本文讨论了如何使用GStreamer来删除H264格式视频文件中的中间部分,而不需要进行重编码。作者提出了使用gst_element_seek(...)函数来实现这个目标的思路,并提到遇到了一个解决不了的BUG。文章还列举了8个解决方案,希望能够得到更好的思路。 ... [详细]
  • 本文介绍了解决java开源项目apache commons email简单使用报错的方法,包括使用正确的JAR包和正确的代码配置,以及相关参数的设置。详细介绍了如何使用apache commons email发送邮件。 ... [详细]
  • 本文介绍了关于Java异常的八大常见问题,包括异常管理的最佳做法、在try块中定义的变量不能用于catch或finally的原因以及为什么Double.parseDouble(null)和Integer.parseInt(null)会抛出不同的异常。同时指出这些问题是由于不同的开发人员开发所导致的,不值得过多思考。 ... [详细]
  • Skywalking系列博客1安装单机版 Skywalking的快速安装方法
    本文介绍了如何快速安装单机版的Skywalking,包括下载、环境需求和端口检查等步骤。同时提供了百度盘下载地址和查询端口是否被占用的命令。 ... [详细]
  • 这是原文链接:sendingformdata许多情况下,我们使用表单发送数据到服务器。服务器处理数据并返回响应给用户。这看起来很简单,但是 ... [详细]
  • IOS开发之短信发送与拨打电话的方法详解
    本文详细介绍了在IOS开发中实现短信发送和拨打电话的两种方式,一种是使用系统底层发送,虽然无法自定义短信内容和返回原应用,但是简单方便;另一种是使用第三方框架发送,需要导入MessageUI头文件,并遵守MFMessageComposeViewControllerDelegate协议,可以实现自定义短信内容和返回原应用的功能。 ... [详细]
  • Postgresql备份和恢复的方法及命令行操作步骤
    本文介绍了使用Postgresql进行备份和恢复的方法及命令行操作步骤。通过使用pg_dump命令进行备份,pg_restore命令进行恢复,并设置-h localhost选项,可以完成数据的备份和恢复操作。此外,本文还提供了参考链接以获取更多详细信息。 ... [详细]
author-avatar
风清淡雅的梦
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有