作者:手机用户2502907453 | 来源:互联网 | 2022-12-09 16:02
这看起来似乎很明显,但是在回顾文档和示例时,我不确定是否可以找到一种方法来使用PySpark进行结构化流转换。
例如:
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName('StreamingWordCount')
.getOrCreate()
)
raw_records = (
spark
.readStream
.format('socket')
.option('host', 'localhost')
.option('port', 9999)
.load()
)
# I realize there's a SQL function for upper-case, just illustrating a sample
# use of an arbitrary map function
records = raw_records.rdd.map(lambda w: w.upper()).toDF()
counts = (
records
.groupBy(records.value)
.count()
)
query = (
counts
.writeStream
.outputMode('complete')
.format('console')
.start()
)
query.awaitTermination()
这将引发以下异常:
Queries with streaming sources must be executed with writeStream.start
但是,如果我删除呼叫,rdd.map(...).toDF()
一切似乎都可以正常工作。
似乎是rdd.map
从流上下文中调用分支执行,并导致Spark警告它从未启动?
是否有“正确”的方法使用结构化流和PySpark 应用map
或mapPartition
样式转换?
1> 小智..:
结构化流中应用的每个转换都必须完全包含在Dataset
世界中-如果使用PySpark,则意味着您只能使用DataFrame
或SQL,并且不支持转换为RDD
(DStream
或本地集合)。
如果要使用普通的Python代码,则必须使用UserDefinedFunction
。
from pyspark.sql.functions import udf
@udf
def to_upper(s)
return s.upper()
raw_records.select(to_upper("value"))
另请参见Spark结构化流和Spark-Ml回归