Spark大部分转化操作和一部分行动操作,都需要依赖开发人员传递的函数进行计算。在Spark支持的3种开发语言(Java、Python、Scala)里,传递的方式略有不同。下面苦李介绍Java如何向Spark传递函数。

1、标准接口

在Java中,函数需要作为实现了Spark的org.apache.spark.api.java.function包中的任一函数接口的对象来传递。根据不同的返回类型,我们定义了一些不同的接口。

下面是标准的Java函数接口:

函数名实现方法用途
Function<T, R>R call(T)接收一个输入值,并返回一个输出值,用于类似map()和filter()等操作中
Function2<T1, T2, R>R call(T1, T2)接收两个输入值,并返回一个输出值,用于类似aggregate()和fold()等操作中
FlatMapFunction<T, R>Iterable<R> call(T)接收一个输入值,并返回任意个输出值,用于类似flatMap()等操作中

2、传递方法

在Java里,我们可以通过匿名内部类、具名类和lambda(java8中)的方式进行函数传递,下面通过统计日志文件中的404请求,依次给出代码示例。

(1)在java中使用匿名内部类进行函数传递

JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<String> lines_404 = lines.filter(
        new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("404");
            }
        }
);
System.out.println(":::: 404 lines count: " + lines_404.count());

(2)在java中使用具名类进行函数传递

JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
class Contains_404 implements Function<String, Boolean> {
    public Boolean call(String x) {
        return x.contains("404");
    }
}
JavaRDD<String> lines_404 = lines.filter(new Contains_404());
System.out.println(":::: 404 lines count: " + lines_404.count());

具体使用哪种方式可以根据自己的编程习惯进行选择,苦李建议大家在大型项目中最好使用具名类的方式进行参数传递,这种方式可以让我们的代码更加清晰,提升可读性,而且这种方式可以让我们在构造函数中添加参数。

例如,修改过的代码:

class Contains_404 implements Function<String, Boolean> {
    private String query_str;
    private Contains_404(String query_str){
        this.query_str = query_str;
    }
    public Boolean call(String x) {
        return x.contains(query_str);
    }
}
JavaRDD<String> lines_404 = lines.filter(new Contains_404("404"));
System.out.println(":::: 404 lines count: " + lines_404.count());

(3)在Java8中,还可以通过lambda的方式进行函数传递

在java8里,我们还可以通过lambda的方式进行函数传递,上述代码可精简如下:

JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<String> lines_404 = lines.filter(s -> s.contains("404"));
System.out.println(":::: 404 lines count: " + lines_404.count());

扫码关注李苦李公众号

李苦李公众号

标签: Spark

添加新评论