自定义累加器

累加器和普通的变量相比,会将executor端的结果,收集到driver端进行汇总

def main(args: Array[String]): Unit = {
    //创建spark并设置app名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    //创建sparkcontext ,该对象是提交spark app的入口
    val sc: SparkContext = new SparkContext(conf)

    val rdd: RDD[String] = sc.makeRDD(List("hello", "abc", "hi", "hello", "hi", "ok", "111", "xxx", "hello"))
    val accumlator = new MyAccumlator
    sc.register(accumlator)
    rdd.foreach{word=>{accumlator.add(word)}}
    println(accumlator.value)  //Map(hello -> 3, hi -> 2)
    //关闭连接
    sc.stop()
  }

//定义一个类来继承AccumulatorV2
class MyAccumlator extends AccumulatorV2[String,mutable.Map[String,Int]]{
  //定义一个集合,集合单词以及出现次数
  var map: mutable.Map[String, Int] = mutable.Map[String, Int]()
  //是否为初始状态
  override def isZero: Boolean =map.isEmpty
  //拷贝
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val acc = new MyAccumlator
    acc.map=this.map
    acc
  }
  //reset
  override def reset(): Unit = map.clear()
  //向累加器中添加元素
  override def add(v: String): Unit = {
    if(v.startsWith("h")){
      //向可变元素中添加或更新元素
      map(v)= map.getOrElse(v,0)+1
    }
  }
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {
    var map1=map
    var map2=other.value;
    map=map1.foldLeft(map2){
      (mm,kv)=>{
        val k: String = kv._1
        val v: Int = kv._2
        mm(k)=mm.getOrElse(k,0)+v
        mm
      }
    }
  }
  override def value: mutable.Map[String, Int] = map
}

累加器执行原理

累加器:分布式共享只写变量

先调用copy,再调用reset ,再调用iszero

merge是计算完毕后自动调用的。

广播变量

分布式共享只读变量

多个executor中使用同一个变量。

def main(args: Array[String]): Unit = {
    //创建spark并设置app名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    //创建sparkcontext ,该对象是提交spark app的入口
    val sc: SparkContext = new SparkContext(conf)
//采用集合的方式,实现rdd1,rdd2的join
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 1), ("c", 1)))
val rdd2 = List(("a", 4), ("b", 5), ("c", 6))
//声明广播变量
val broadCastList: Broadcast[List[(String, Int)]] = sc.broadcast(rdd2)
val resRDD: RDD[(String, (Int, Int))] = rdd1.map {
  case (k, v) => {
    var v3 = 0
    for ((k2, v2) <- broadCastList.value) {
      if (k == k2) {
        v3 = v2
      }
    }
    (k, (v, v3))
  }
}
resRDD.collect().foreach(println)

//关闭连接
sc.stop();

本文由 hcb 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论