Spark SQL 读写操作

使用方法

  • spark-shell

  • spark-sql

  • scala code

数据源

  • CSV

  • JSON

  • Parquet

  • ORC

  • JDBC/ODBC connections

  • Plain-text files

读取示例

1
2
3
4
5
6
7
8
9
10
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 读取模式
.option("inferSchema", "true") // 是否自动推断 schema
.option("path", "path/to/file(s)") // 文件路径
.schema(someSchema) // 使用预定义的 schema
.load()
读模式 描述
permissive 当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中
dropMalformed 删除格式不正确的行
failFast 遇到格式不正确的数据时立即失败

写入示例

1
2
3
4
5
6
7
8
9
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //写模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
Scala/Java 描述
SaveMode.ErrorIfExists 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式
SaveMode.Append 数据以追加的方式写入
SaveMode.Overwrite 数据以覆盖的方式写入
SaveMode.Ignore 如果给定的路径已经存在文件,则不做任何操作

常用读写示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 简写法
spark.read.格式("路径")
spark.read.json("/usr/file/json/emp.json")

// 读取csv
spark.read.format("csv")
.option("header", "false") // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST") // 是否快速失败
.option("inferSchema", "true") // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()
// 写入csv,指定分隔符\t
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

// json 读
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

// json 写
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

// Parquet 读
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

// Parquet 写
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

// 数据库读
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)

// 数据库写
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()

作者

MisakaWater

发布于

2022-01-23

更新于

2022-01-23

许可协议