作者:追梦and寻梦 | 来源:互联网 | 2023-01-31 10:30
我试图创建通用的DataSet [T]阅读器,以避免每个阅读器调用都使用dataframe.as [..]。对原始类型和案例类的支持,所以我在想类似的东西:
def read[T <: Product](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlCOntext= sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
但是我收到“无法为数据集中存储的类型找到编码器”错误。可以做这样的事情吗?
第二周期:
def read[T <: Product](sql : String) : Dataset[T] = {
import sparkSession.implicits._
innerRead(sql)
}
private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
val sqlCOntext= sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
以类型不匹配结尾(foudn Encoder [Nothing],必需的Encoder [T])。
我试图仅导入newProductEncoder,但结果相同。
1> stefanobaghi..:
为了将a转换DataFrame
为a,Dataset
您需要有一个Encoder
。你可以通过简单地添加绑定在和上下文做Encoder
的T
:
def read[T <: Product : Encoder](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlCOntext= sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
上下文绑定是以下语法糖:
def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]
这意味着您需要在隐式上下文中具有(仅一个)实例Encoder[T]
。
这是必需的,因为as
方法本身需要此上下文绑定。
通过导入(如您所做的那样),Spark本身可以为您提供您Encoder
可能需要的大多数s(到目前为止,基元,String
s和case class
es)SparkSession
。但是,这些必须在呼叫站点的隐式范围内可用,这意味着您想要的内容可能更像以下内容:
def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
import spark.implicits._
val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
df.as[T]
}
val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")