Spark支持多种文件的读写操作,例如普通文本文件、JSON、CSV、SequenceFile等,而且Spark会根据文件后缀自动选择对应的处理方式。

下面是Spark支持的常见的文本格式:

格式名称结构化备注
文本文件普通文本,每行一条记录
JSON常见的基于文本的格式,半结构化数据
CSV常见的基于文本的格式,通常在Excel中使用
SequenceFileHadoop文件格式,用于键值对数据
Protocol buffers一种快速、节省空间的跨语言格式
对象文件用来将Spark作业中的数据存储下来以让共享的代码读取,改变类的时候会失效,因为依赖于Java序列化

1、文本文件

当我们将一个文件读取为Spark的RDD时,输入的每一行都会成为RDD的一个元素,也可以将多个文本文件一次性读取为一个pair RDD,其中键为文件名,值是文件内容。

Spark读取文本文件示例(JAVA):

String logFile = "likuli_com_access.log";
SparkSession spark = SparkSession.builder().appName("SparkTest").getOrCreate();
JavaRDD<String> log_lines = spark.read().textFile(logFile).javaRDD();

上述代码就将日志文件中的内容读取至log_lines的RDD中了。

使用textFile()读取多个文件示例,用于测试的文件目录和文件列表如下:

代码如下:

String filePath = "/Library/spark-2.3.4/test_data";
SparkSession spark = SparkSession.builder().appName("SparkTest").getOrCreate();
JavaRDD<String> filesData = spark.read().textFile(filePath).javaRDD();
// 输出读取的文件内容 结果如下:[11111111, 22222222, 3333333]
System.out.println(":::: " + filesData.collect());

上述代码输出的结果是一个数组,读取完成之后,无法判断内容具体数据哪一个文件。

如果我们想要知道数据属于哪个文件,则可以使用SparkContext.wholeTextFiles()方法。对于用时间切割的数据文件,wholeTextFiles()方法将会非常有用。示例代码如下(还是使用上述目录下的三个文件):

String filePath = "/Library/spark-2.3.4/test_data";
JavaSparkContext spark = new JavaSparkContext();
JavaPairRDD filesData = spark.wholeTextFiles(filePath);
System.out.println(":::: " + filesData.collect());

输入结果如下:

[(file:/Library/spark-2.3.4/test_data/3.txt,333), 
(file:/Library/spark-2.3.4/test_data/2.txt,222), 
(file:/Library/spark-2.3.4/test_data/1.txt,111)]

保存文本文件

Spark利用saveAsTextFile()将数据写入文件,该方法接受一个目录路径,Spark会在该目录下输出多个文件。这样,Spark就可以从多个节点上并行输出了。在这个方法中,我们不能控制数据的哪一部分输出到哪个文件中,不过有些输出格式支持控制。

下面的示例,是将日志文件中过滤出的404日志写入文件,代码如下:

SparkSession spark = SparkSession.builder().appName("SparkTest").getOrCreate();
JavaRDD<String> lines = spark.read().textFile(logFile).javaRDD();
JavaRDD<String> lines_404 = lines.filter(s -> s.contains("404"));
lines_404.saveAsTextFile("/Library/spark-2.3.4/log_404/");

2、JSON/CSV

Spark中操作JSON/CSV最简单的方式就是将他们作为普通的文本文件进行操作,然后使用对应的解析器对内容进行解析即可。

标签: Spark, 大数据

添加新评论