  1. /*

  2. * Licensed to the Apache Software Foundation (ASF) under one

  3. * or more contributor license agreements. See the NOTICE file

  4. * distributed with this work for additional information

  5. * regarding copyright ownership. The ASF licenses this file

  6. * to you under the Apache License, Version 2.0 (the

  7. * "License"); you may not use this file except in compliance

  8. * with the License. You may obtain a copy of the License at

  9. *

  10. * http://www.apache.org/licenses/LICENSE-2.0

  11. *

  12. * Unless required by applicable law or agreed to in writing, software

  13. * distributed under the License is distributed on an "AS IS" BASIS,

  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

  15. * See the License for the specific language governing permissions and

  16. * limitations under the License.

  17. */

  19. package com.hellobike.realtimeplatform.utils;

  21. import com.hellobike.realtimeplatform.model.AccessLog;

  22. import org.apache.flink.annotation.PublicEvolving;

  23. import org.apache.flink.core.io.SimpleVersionedSerializer;

  24. import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;

  25. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

  26. import org.apache.flink.util.Preconditions;

  28. import java.text.ParseException;

  29. import java.text.SimpleDateFormat;

  30. import java.time.Instant;

  31. import java.time.ZoneId;

  32. import java.time.format.DateTimeFormatter;

  33. import java.util.Calendar;

  35. /**

  36. * A {@link BucketAssigner} that assigns to buckets based on current system time.

  37. *

  38. *

  39. *

    The {@code DateTimeBucketer} will create directories of the following form:

  40. * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path

  41. * that was specified as a base path when creating the

  42. * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.

  43. * The {@code dateTimePath} is determined based on the current system time and the

  44. * user provided format string.

  45. *

  46. *

  47. *

    {@link DateTimeFormatter} is used to derive a date string from the current system time and

  48. * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling

  49. * files will have a granularity of hours.

  50. *

  51. *


  52. *

  53. *


  54. * BucketAssigner bucketAssigner = new DateTimeBucketAssigner("yyyy-MM-dd--HH");

  55. * }

  56. *

  57. *

    This will create for example the following bucket path:

  58. * {@code /base/1976-12-31-14/}

  59. */

  60. @PublicEvolving

  61. public class DateTimeBucketWithPartitionAssigner implements BucketAssigner {

  63. private static final long serialVersionUID = 1L;

  65. private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

  67. private final String formatString;

  69. private final ZoneId zoneId;

  71. private transient DateTimeFormatter dateTimeFormatter;

  73. /**

  74. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"}.

  75. */

  76. public DateTimeBucketWithPartitionAssigner() {


  78. }

  80. /**

  81. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string.

  82. *

  83. * @param formatString The format string that will be given to {@code SimpleDateFormat} to determine

  84. * the bucket id.

  85. */

  86. public DateTimeBucketWithPartitionAssigner(String formatString) {

  87. this(formatString, ZoneId.systemDefault());

  88. }

  90. /**

  91. * Creates a new {@code DateTimeBucketAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.

  92. *

  93. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  94. */

  95. public DateTimeBucketWithPartitionAssigner(ZoneId zoneId) {

  96. this(DEFAULT_FORMAT_STRING, zoneId);

  97. }

  99. /**

  100. * Creates a new {@code DateTimeBucketAssigner} with the given date/time format string using the given timezone.

  101. *

  102. * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine

  103. * the bucket path.

  104. * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.

  105. */

  106. public DateTimeBucketWithPartitionAssigner(String formatString, ZoneId zoneId) {

  107. this.formatString = Preconditions.checkNotNull(formatString);

  108. this.zoneId = Preconditions.checkNotNull(zoneId);

  109. }

  111. @Override

  112. public String getBucketId(IN element, Context context) {

  113. long eventTime = 0L;

  114. if (dateTimeFormatter == null) {

  115. dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);

  116. }

  117. if (element instanceof AccessLog) {

  118. if (-1 == ((AccessLog) element).getParseResult() && "".equals(((AccessLog) element).getEventTimestamp())) {

  119. return "pt=errorTime";

  120. }

  122. eventTime = Long.valueOf(((AccessLog) element).getEventTimestamp());

  124. return "pt=" + dateTimeFormatter.format(Instant.ofEpochMilli(eventTime));

  125. } else {

  126. return "pt=errorObjects";

  127. }

  128. }

  131. @Override

  132. public SimpleVersionedSerializer getSerializer() {

  133. return SimpleVersionedStringSerializer.INSTANCE;

  134. }

  136. @Override

  137. public String toString() {

  138. return "DateTimeBucketAssigner{" +

  139. "formatString='" + formatString + '\'' +

  140. ", zoneId=" + zoneId +

  141. '}';

  142. }

  143. }

指定自定义的DateTime Assigner就可以实现基于event time分桶写入parquet文件


