通常在向Spark传递函数时,比如使用map()函数或者用filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。

Spark中存在两个共享变量,累加器广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。

累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。

自Spark2.0版本以后,Spark内置支持三种类型的累加器,分别是集合累加器CollectionAccumulator, 浮点型累加器DoubleAccumulator, 整型累加器LongAccumulator

使用方法如下:

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
SparkContext sc = spark.sparkContext();

// 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator
// LongAccumulator: 整型累加
LongAccumulator longAccumulator = sc.longAccumulator("longAccumulator");
// DoubleAccumulator: 浮点型累加
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("doubleAccumulator");
// CollectionAccumulator:集合累加
CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("collectionAccumulator");

Dataset<Integer> intSet = spark.createDataset(Arrays.asList(5, 4, 3, 10), Encoders.INT());
Dataset<Integer> result = intSet.map((MapFunction<Integer, Integer>) x -> {
    longAccumulator.add(x);
    doubleAccumulator.add(x);
    collectionAccumulator.add(x);
    return x;
}, Encoders.INT()).cache();
result.count();
System.out.println("::::: longAccumulator: " + longAccumulator.value());
System.out.println("::::: doubleAccumulator: " + doubleAccumulator.value());
System.out.println("::::: collectionAccumulator: " + collectionAccumulator.value());

上述代码输出结果如下:

::::: longAccumulator: 22
::::: doubleAccumulator: 22.0
::::: collectionAccumulator: [4, 3, 5, 10]

标签: Spark, 大数据

添加新评论