Spark SQL 读写操作

使用方法

  • spark-shell

  • spark-sql

  • scala code

数据源

  • CSV

  • JSON

  • Parquet

  • ORC

  • JDBC/ODBC connections

  • Plain-text files

读取示例

1
2
3
4
5
6
7
8
9
10
// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例
spark.read.format("csv")
.option("mode", "FAILFAST") // 读取模式
.option("inferSchema", "true") // 是否自动推断 schema
.option("path", "path/to/file(s)") // 文件路径
.schema(someSchema) // 使用预定义的 schema
.load()
读模式 描述
permissive 当遇到损坏的记录时,将其所有字段设置为 null,并将所有损坏的记录放在名为 _corruption t_record 的字符串列中
dropMalformed 删除格式不正确的行
failFast 遇到格式不正确的数据时立即失败

写入示例

1
2
3
4
5
6
7
8
9
// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE") //写模式
.option("dateFormat", "yyyy-MM-dd") //日期格式
.option("path", "path/to/file(s)")
.save()
Scala/Java 描述
SaveMode.ErrorIfExists 如果给定的路径已经存在文件,则抛出异常,这是写数据默认的模式
SaveMode.Append 数据以追加的方式写入
SaveMode.Overwrite 数据以覆盖的方式写入
SaveMode.Ignore 如果给定的路径已经存在文件,则不做任何操作

常用读写示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 简写法
spark.read.格式("路径")
spark.read.json("/usr/file/json/emp.json")

// 读取csv
spark.read.format("csv")
.option("header", "false") // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST") // 是否快速失败
.option("inferSchema", "true") // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()
// 写入csv,指定分隔符\t
df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")

// json 读
spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)

// json 写
df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")

// Parquet 读
spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)

// Parquet 写
df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")

// 数据库读
spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver") //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql") //数据库地址
.option("dbtable", "help_keyword") //表名
.option("user", "root").option("password","root").load().show(10)

// 数据库写
val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()

Windows10 下使用 IDEA 配置 Spark 的 Debug 环境

之前报名了学校的一些大数据比赛,做的笔记整理一下发出来,供以后查看
第一次用 ide 和 scala,不足之处还请批评指正

  • idea 的安装就不赘述了。
  • spark 在本地不需要安装。
  • jdk = 1.8
  • spark version = 2.0.0,比较老,但是是比赛的要求。
  • hadoop version = 2.6.0
  • scala version = 2.11.0
  1. 在 idea 中建立一个 maven 工程,注意路径不能包括中文

  2. 添加 pom.xml,示例在 Source\Task3\example\pom.xml 中

    pom.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
    <spark.version>2.0.0</spark.version>
    <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
    </dependency>
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.4</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_${scala.version}</artifactId>
    <version>${spark.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_${scala.version}</artifactId>
    <version>${spark.version}</version>
    </dependency>

    </dependencies>

    <build>
    <plugins>

    <plugin>
    <groupId>org.scala-tools</groupId>
    <artifactId>maven-scala-plugin</artifactId>
    <version>2.15.2</version>
    <executions>
    <execution>
    <goals>
    <goal>compile</goal>
    <goal>testCompile</goal>
    </goals>
    </execution>
    </executions>
    </plugin>

    <plugin>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.6.0</version>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    </configuration>
    </plugin>

    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-surefire-plugin</artifactId>
    <version>2.19</version>
    <configuration>
    <skip>true</skip>
    </configuration>
    </plugin>

    </plugins>
    </build>
    </project>
  3. 在 Idea 左侧 Project 窗口中选中 src\main 右键添加文件夹 scala

  4. 选中 scala 右键 make directory as -> Sources Root

  5. scala 变成蓝色后右键 new -> scala class ->选择 object,输入名称

  6. 我们先写一个最简单的 CsvShow 程序让他能在本地运行并且 debug

    CsvShow Code
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    package org.example.spark.scala

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}

    object CsvShow {
    def main(args:Array[String]):Unit= {
    val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("CsvShow")
    .config("spark.sql.warehouse.dir", "file:/")
    .getOrCreate()
    val csv: DataFrame = spark.read.csv("../../../../data/mysql.csv")
    csv.show()
    val pdf: DataFrame = csv.toDF()
    pdf.show()
    spark.stop()
    }
    }
  7. 解压 Hadoop,在没有安装 hadoop 的情况下是不能进行 spark 程序的本地 debug

  • 我们先下载如下两个文件 hadoop-2.6.0.tar.gz 和 hadooponwindows-master.zip

  • 先将 hadoop 解压出来

  • 再解压 hadooponwindows 到 hadoop 的根目录下,提示覆盖选择全覆盖

  • 添加如下环境变量

    1. 变量名=HADOOP_HOME, 变量值=你解压的路径
    1. 在 path 中添加 %HADOOP_HOME%\bin 变量
  • 打开 cmd,输入 hadoop version,显示版本说明 hadoop 环境变量配置正确

  1. 配置 Hadoop 环境
  • 修改 etc\hadoop\hadoop-env.cmd 中的 JAVA_HOME,如 JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_241,路径中不能有空格

  • 在根目录下添加 datanode、namenode 和 tmp 文件夹

  • 修改 etc\hadoop\hdfs-site.xml,添加如下 xml, 注意/D:/hadoop-2.6.0/namenode 和/D:/hadoop-2.6.0/datanode 这两个值需要时之前创建文件夹的路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>/D:/hadoop-2.6.0/namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/D:/hadoop-2.6.0/datanode</value>
</property>
</configuration>
  • 修改 etc\hadoop\core-site.xml
1
2
3
4
5
6
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
  • 修改 etc\hadoop\mapred-site.xml
1
2
3
4
5
6
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
  • 修改 etc\hadoop\yarn-site.xml
1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
</configuration>
  1. 以管理员身份打开 cmd 输入 hdfs namenode -format 来执行格式化 hdfs,执行后 namenode 文件里会自动生成一个 current 文件
  2. cd 到根目录下 sbin 文件夹,输入 start-all.cmd 启动全部的 hadoop 服务
  3. 在相应的方法中右键 Debug “*”,就可以运行本地 debug 调试