累计器
executor中会把driver定义的变量,变成一个新的副本进行累计。不会影响driver中的变量的值
var sum=0; //driver中定义
rdd.foreach{ //executor中执行
case (word,count)=>{sum+=count}
}
println(sum) //0
普通定义的变量,executor中会把driver中定义的变量复制一个副本,在executor中执行。
如果需要通过executor,对driver定义的变量进行更新,需要定义为累加器
累加器和普通的变量相比,会将executor端的结果,收集到driver端进行汇总
val myacc: LongAccumulator = sc.longAccumulator(name = "myacc")
rdd.foreach{
case(work,count)=>{
myacc.add(count)
}
}
println(myacc)
ideal添加快捷键
File-setting-editor-live templates->output->右边添加live template->scc->输入代码-》define->scala
val conf:SparkConf = new SparkConf() .setAppName("SparkDemo").setMaster("local[*]")
val sc:SparkContext = new SparkContext()
sc.stop();
创建RDD
sc.parallelize等价于sc.makeRDD , 在源代码中,makeRDD会调用parallelize方法
创建rdd可以从文件中读取来创建,也可以从hdfs中读取来创建,也可以从定义的集合在来创建
//创建一个集合
val list: List[Int] = List(1, 2, 3, 4);
//方法一
// val rdd: RDD[Int] = sc.parallelize(list);
//方法二,默认的分区为cpu核数,如果sparkcontext选定的master是local[*]
val rdd = sc.makeRDD(list)
rdd.collect().foreach(println);
//从文件中读取,默认的分区最小值为2.
val rdd = sc.textFile("d:/code/demo/spark/input/1.txt");
rdd.collect().foreach(println);
//从hdfs中读取
val rdd: RDD[String] = sc.textFile("hdfs://master:8020/input")
rdd.collect().foreach(println);
makeRDD调用方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices) //调用这个方法
}
还不快抢沙发