Spark SQL 求平均年龄
//输入
{"name":"liming","age":11}
{"name":"LiLei","age":21}
{"name":"Tom","age":31}
/**
 * Demo entry point: loads people.json into a DataFrame, exposes it as the
 * temporary view "people", and prints the average age using the built-in
 * SQL `avg` function.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // Local-mode configuration that uses every available core.
  val sparkConf: SparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("SparkCoreTest")
  val session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  // Each line of the input file is one JSON object with "name" and "age".
  val people: DataFrame = session.read.json("D:\\code\\demo\\spark\\input\\people.json")
  people.createOrReplaceTempView("people")

  // Run the aggregation through SQL and print the single-row result.
  val averageAge: DataFrame = session.sql("select avg(age) from people")
  averageAge.show()
}
//output
+--------+
|avg(age)|
+--------+
|    21.0|
+--------+
自带的函数 avg 使用起来很方便;如果没有现成的内置函数,就需要自定义实现 UDAF 函数。
UDAF 弱类型自定义函数
//自定义 UDAF 函数:继承抽象类并实现父类的抽象方法
// 需要定义三个类型(输入类型、缓存类型、输出类型),以及四个步骤:缓存初始化、更新、合并、计算
/**
 * Untyped user-defined aggregate function that computes the average of an
 * integer column.
 *
 * The aggregation buffer has two LongType slots: index 0 holds the running
 * sum and index 1 holds the running count. The result is sum / count as a
 * Double.
 */
class myAvg extends UserDefinedAggregateFunction {

  // Schema of the input arguments: a single integer column.
  override def inputSchema: StructType =
    StructType(Array(StructField("age", IntegerType)))

  // Schema of the intermediate buffer: running sum and running count.
  override def bufferSchema: StructType =
    StructType(Array(StructField("sum", LongType), StructField("count", LongType)))

  // Type of the final aggregated value.
  override def dataType: DataType = DoubleType

  // The same input always produces the same output.
  override def deterministic: Boolean = true

  // Reset the buffer: sum = 0, count = 0.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // Fold one input row into the buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // The buffer is always initialized, so the value that may be null is the
    // input; null inputs are skipped so they affect neither sum nor count.
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getInt(0)
      buffer(1) = buffer.getLong(1) + 1L
    }
  }

  // Combine two partial buffers. buffer2 is a read-only Row, so both the sum
  // and the count must be written back into buffer1.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Produce the final result from the merged buffer.
  override def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(1)
    // The sum slot is declared as LongType, so it must be read with getLong
    // and converted; reading it with getDouble fails at runtime.
    // With no counted rows the result is SQL NULL (Spark represents it as null).
    if (count == 0L) null else buffer.getLong(0).toDouble / count
  }
}
如何使用自定义函数
创建自定义函数对象,注册之后即可在 SQL 中调用。
/**
 * Demo entry point: registers the custom aggregate function `myAvg` and uses
 * it from SQL to compute the average age of the rows in people.json.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  // Create the Spark configuration and set the application name.
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
  // spark.udf.register("changeName",(name:String)=>{"new" +name})
  df.createOrReplaceTempView("people")
  // Create an instance of the custom aggregate function.
  val avg = new myAvg
  // Register it under the name used in the SQL text below.
  spark.udf.register("myAvg", avg)
  // The function aggregates the numeric `age` column. Passing `name`, or
  // selecting a non-aggregated column without GROUP BY, is rejected by Spark.
  spark.sql("select myAvg(age) from people").show()
  // spark.sql("select changeName(name),age from people").show()
}
//输出结果
+--------+
|avg(age)|
+--------+
| 21.0|
+--------+
《史蒂夫乔布斯:机器人生》记录片高清在线免费观看:https://www.jgz518.com/xingkong/102414.html