作者:手机用户2602905567 | 来源:互联网 | 2022-12-10 13:08
有没有一种方法可以指定Spark结构化文件流源的起始偏移量?
我正在尝试从HDFS流式传输实木复合地板:
spark.sql("SET spark.sql.streaming.schemaInference=true")
spark.readStream
.parquet("/tmp/streaming/")
.writeStream
.option("checkpointLocation", "/tmp/streaming-test/checkpoint")
.format("parquet")
.option("path", "/tmp/parquet-sink")
.trigger(Trigger.ProcessingTime(1.minutes))
.start()
如我所见,第一个运行是处理路径中检测到的所有可用文件,然后将偏移量保存到检查点位置并仅处理新文件,即接受期限并且在所看到的文件中不存在映射。
我正在寻找一种方法,如何指定起始偏移量或时间戳或选项数量,以在首次运行时不处理所有可用文件。
我有找的方法吗?
1> Mikhail Dubk..:
感谢@jayfah,据我所知,我们可以使用以下技巧来模拟Kafka“最新的”起始偏移量:
运行带有暖机流option("latestFirst", true)
和option("maxFilesPerTrigger", "1")
与检查站,假水槽和巨大的处理时间。这样,预热流将最新的文件时间戳保存到检查点。
option("maxFileAge", "0")
使用相同的检查点位置,使用实际接收器运行实时流。在这种情况下,流将仅处理新近可用的文件。
很有可能这对于生产不是必需的,并且有更好的方法,例如重组数据路径等,但是至少我以此方式找到了我的问题的答案。