2019年1月24日星期四

大数据之数据存取:Spark存取文件系统数据源

我们常见的数据文件包括:csv, json, txt和parquet等。Spark对这些文件的读写都提供了内建的支持。
要读写这些数据源,首先需要创建SparkSession:
import os
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder \
        .appName("Demo data sources") \
        .getOrCreate()

为便于演示,我们构造一个DataFrame:
data = [Row(Country="France",Age=44,Salary=72000,Purchased="No"),
        Row(Country="Spain",Age=27,Salary=48000,Purchased="Yes"),
        Row(Country="Germany",Age=30,Salary=54000,Purchased="No"),
        Row(Country="Spain",Age=38,Salary=61000,Purchased="No"),
        Row(Country="Germany",Age=40,Salary=65000,Purchased="Yes"),
        Row(Country="France",Age=35,Salary=58000,Purchased="Yes"),
        Row(Country="Spain",Age=27,Salary=52000,Purchased="No"),
        Row(Country="France",Age=48,Salary=79000,Purchased="Yes"),
        Row(Country="Germany",Age=50,Salary=83000,Purchased="No"),
        Row(Country="France",Age=37,Salary=67000,Purchased="Yes")]
df = spark.createDataFrame(data)
df.show()
运行结果:
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 27|  Spain|      Yes| 48000|
    | 30|Germany|       No| 54000|
    | 38|  Spain|       No| 61000|
    | 40|Germany|      Yes| 65000|
    | 35| France|      Yes| 58000|
    | 27|  Spain|       No| 52000|
    | 48| France|      Yes| 79000|
    | 50|Germany|       No| 83000|
    | 37| France|      Yes| 67000|
    +---+-------+---------+------+
    
将dataframe数据集保存为csv, json, parquet和HIVE Table格式文件:
df.write.csv("data/demo.csv", mode='overwrite', header=True)
df.write.json("data/demo.json", mode = 'ignore')
df.write.parquet("data/demo.parquet", mode = 'ignore')
df.write.saveAsTable("demo")

创建一个临时表:customers,它是随同spark Session存在的:
df.createOrReplaceTempView("customers")

查看(用HIVE SQL)有哪些表:
spark.sql("show tables").show()
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    | default|     demo|      false|
    |        |customers|       true|
    +--------+---------+-----------+
    
用HIVE SQL查询表:
spark.sql("SELECT * FROM customers WHERE Salary > 60000").show()
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 38|  Spain|       No| 61000|
    | 40|Germany|      Yes| 65000|
    | 48| France|      Yes| 79000|
    | 50|Germany|       No| 83000|
    | 37| France|      Yes| 67000|
    +---+-------+---------+------+
    
spark.sql("SELECT * FROM customers WHERE Country='France'").show()
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 35| France|      Yes| 58000|
    | 48| France|      Yes| 79000|
    | 37| France|      Yes| 67000|
    +---+-------+---------+------+

读txt文件,内容将放在value字段中:    
df = spark.read.text("data/demo.csv")
df.show(3)
    +--------------------+
    |               value|
    +--------------------+
    |Age,Country,Purch...|
    |   27,Spain,No,52000|
    | 48,France,Yes,79000|
    +--------------------+
    only showing top 3 rows
    
读csv文件(csv文件中可能包含有字段名称信息的文件头,但数据类型需要推断,你也可以显式指定):   
df = spark.read.csv("data/demo.csv", inferSchema = True, header = True)
df.show(3)
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 27|  Spain|       No| 52000|
    | 48| France|      Yes| 79000|
    | 50|Germany|       No| 83000|
    +---+-------+---------+------+
    only showing top 3 rows
    
读json文件(json文件中包含的字段的名称,但数据类型需要推断,你也可以显式指定):   
df = spark.read.json("data/demo.json")
df.show(3)
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 27|  Spain|      Yes| 48000|
    | 30|Germany|       No| 54000|
    +---+-------+---------+------+
    only showing top 3 rows
    
读parquet文件(parquet文件中保存了数据结构的元信息): 
    
df = spark.read.parquet("data/demo.parquet")
df.show(3)
    +-------+---+------+---------+
    |Country|Age|Salary|Purchased|
    +-------+---+------+---------+
    | France| 44| 72000|       No|
    |  Spain| 27| 48000|      Yes|
    |Germany| 30| 54000|       No|
    +-------+---+------+---------+
    only showing top 3 rows