UDAF 强类型自定义函数

udaf强类型自定义函数需要继承Aggregator抽象类,并实现6个方法

 //输入类型
  case class People(name:String,age:Long)

  //缓存数据类型
  case class AgeBuffer(var sum:Long,var count:Long)

  //输入类型,缓存数据类型,返回精英
  class myAvg extends Aggregator[People,AgeBuffer,Double]{
    //缓存数据初始化
    override def zero: AgeBuffer = {AgeBuffer(0L,0L)}
    //分区内数据进行聚合
    override def reduce(b: AgeBuffer, a: People): AgeBuffer = {
      b.sum+=a.age
      b.count+=1
      b
    }

    //分区间的合并
    override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
      b1.sum+=b2.sum
      b1.count+=b2.count
      b1
    }

    //返回计算结果
    override def finish(reduction: AgeBuffer): Double = {
      reduction.sum.toDouble/reduction.count
    }

    //导入包:import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession}
    //自定义引用类型 :用project
    override def bufferEncoder: Encoder[AgeBuffer] = {
      Encoders.product
    }

    //系统值类型
    override def outputEncoder: Encoder[Double] = {
      Encoders.scalaDouble
    }
  }

调用

调用的时候 要对创建好的自定义函数实例化, 并导入jar包,把数据转为dataset

把对象调用toColumn方法转化为列。

然后根据列名进行查询

def main(args: Array[String]): Unit = {
    //创建spark并设置app名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
    df.createOrReplaceTempView("people")
    import spark.implicits._
    //创建自定义函数对象
    val avg = new myAvg
    //导入包import spark.implicits._, 转成dataset
    val ds: Dataset[People] = df.as[People]
    //将自定义函数对象转成列
    val col: TypedColumn[People, Double] = avg.toColumn
    //在进行查询的时候 ,会将查出来的记录 People类型交行自定义函数进行处理
    ds.select(col).show()
  }

本文由 hcb 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

只有地板了

  1. qnwimzvpnc
    qnwimzvpnc

    2025年10月新盘 做第一批吃螃蟹的人coinsrore.com
    新车新盘 嘎嘎稳 嘎嘎靠谱coinsrore.com
    新车首发,新的一年,只带想赚米的人coinsrore.com
    新盘 上车集合 留下 我要发发 立马进裙coinsrore.com
    做了几十年的项目 我总结了最好的一个盘(纯干货)coinsrore.com
    新车上路,只带前10个人coinsrore.com
    新盘首开 新盘首开 征召客户!!!coinsrore.com
    新项目准备上线,寻找志同道合的合作伙伴coinsrore.com
    新车即将上线 真正的项目,期待你的参与coinsrore.com
    新盘新项目,不再等待,现在就是最佳上车机会!coinsrore.com
    新盘新盘 这个月刚上新盘 新车第一个吃螃蟹!coinsrore.com

添加新评论