Thursday, January 24, 2019

Big Data Storage and Access: Reading and Writing File System Data Sources with Spark

Common data files include CSV, JSON, text, and Parquet. Spark provides built-in support for reading and writing all of these formats.
To read or write these data sources, first create a SparkSession:
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder \
        .appName("Demo data sources") \
        .getOrCreate()

For demonstration purposes, we construct a DataFrame:
data = [Row(Country="France",Age=44,Salary=72000,Purchased="No"),
        Row(Country="Spain",Age=27,Salary=48000,Purchased="Yes"),
        Row(Country="Germany",Age=30,Salary=54000,Purchased="No"),
        Row(Country="Spain",Age=38,Salary=61000,Purchased="No"),
        Row(Country="Germany",Age=40,Salary=65000,Purchased="Yes"),
        Row(Country="France",Age=35,Salary=58000,Purchased="Yes"),
        Row(Country="Spain",Age=27,Salary=52000,Purchased="No"),
        Row(Country="France",Age=48,Salary=79000,Purchased="Yes"),
        Row(Country="Germany",Age=50,Salary=83000,Purchased="No"),
        Row(Country="France",Age=37,Salary=67000,Purchased="Yes")]
df = spark.createDataFrame(data)
df.show()
Output:
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 27|  Spain|      Yes| 48000|
    | 30|Germany|       No| 54000|
    | 38|  Spain|       No| 61000|
    | 40|Germany|      Yes| 65000|
    | 35| France|      Yes| 58000|
    | 27|  Spain|       No| 52000|
    | 48| France|      Yes| 79000|
    | 50|Germany|       No| 83000|
    | 37| France|      Yes| 67000|
    +---+-------+---------+------+
    
Save the DataFrame as CSV, JSON, and Parquet files, and as a Hive table:
df.write.csv("data/demo.csv", mode='overwrite', header=True)
df.write.json("data/demo.json", mode='ignore')    # 'ignore': skip the write if the target already exists
df.write.parquet("data/demo.parquet", mode='ignore')
df.write.saveAsTable("demo")                      # persisted as a managed table in the catalog
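The same writes can be expressed through the generic DataFrameWriter API, which takes the format and save mode as explicit settings; a minimal sketch (the supported modes are 'overwrite', 'append', 'ignore', and 'error'):
df.write.mode('overwrite').format('csv').option('header', True).save("data/demo.csv")
df.write.mode('ignore').format('parquet').save("data/demo.parquet")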

Create a temporary view named customers; it exists only for the lifetime of the SparkSession:
df.createOrReplaceTempView("customers")
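A temporary view is visible only to the current SparkSession. If it needs to be shared across sessions in the same application, use a global temporary view instead, which is resolved through the reserved global_temp database; a minimal sketch:
df.createGlobalTempView("customers_global")
spark.sql("SELECT * FROM global_temp.customers_global").show()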

List the existing tables (with Spark SQL):
spark.sql("show tables").show()
    +--------+---------+-----------+
    |database|tableName|isTemporary|
    +--------+---------+-----------+
    | default|     demo|      false|
    |        |customers|       true|
    +--------+---------+-----------+
    
Query the customers view with Spark SQL:
spark.sql("SELECT * FROM customers WHERE Salary > 60000").show()
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 38|  Spain|       No| 61000|
    | 40|Germany|      Yes| 65000|
    | 48| France|      Yes| 79000|
    | 50|Germany|       No| 83000|
    | 37| France|      Yes| 67000|
    +---+-------+---------+------+
    
spark.sql("SELECT * FROM customers WHERE Country='France'").show()
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 35| France|      Yes| 58000|
    | 48| France|      Yes| 79000|
    | 37| France|      Yes| 67000|
    +---+-------+---------+------+
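The same queries can also be written with the DataFrame API instead of SQL; for example:
df.filter(df.Salary > 60000).show()
df.filter(df.Country == 'France').show()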

Read a text file; every line of the file goes into a single value column:
df = spark.read.text("data/demo.csv")
df.show(3)
    +--------------------+
    |               value|
    +--------------------+
    |Age,Country,Purch...|
    |   27,Spain,No,52000|
    | 48,France,Yes,79000|
    +--------------------+
    only showing top 3 rows
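If you do need individual fields from a raw text read, the value column can be split by hand; a minimal sketch using the split function (note that the header line is still present as an ordinary row):
from pyspark.sql.functions import split
parts = split(df.value, ",")
fields = df.select(parts.getItem(0).alias("Age"),
                   parts.getItem(1).alias("Country"),
                   parts.getItem(2).alias("Purchased"),
                   parts.getItem(3).alias("Salary"))
fields.show(3)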
    
Read a CSV file (the file may contain a header line with field names, but the data types must be inferred, or you can specify a schema explicitly):
df = spark.read.csv("data/demo.csv", inferSchema = True, header = True)
df.show(3)
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 27|  Spain|       No| 52000|
    | 48| France|      Yes| 79000|
    | 50|Germany|       No| 83000|
    +---+-------+---------+------+
    only showing top 3 rows
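Specifying the schema explicitly skips the extra pass over the data that inferSchema requires; a minimal sketch:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField("Age", IntegerType(), True),
    StructField("Country", StringType(), True),
    StructField("Purchased", StringType(), True),
    StructField("Salary", IntegerType(), True)])
df = spark.read.csv("data/demo.csv", schema=schema, header=True)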
    
Read a JSON file (JSON records carry the field names, but the data types are still inferred unless you specify a schema explicitly):
df = spark.read.json("data/demo.json")
df.show(3)
    +---+-------+---------+------+
    |Age|Country|Purchased|Salary|
    +---+-------+---------+------+
    | 44| France|       No| 72000|
    | 27|  Spain|      Yes| 48000|
    | 30|Germany|       No| 54000|
    +---+-------+---------+------+
    only showing top 3 rows
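Note that spark.read.json expects one JSON object per line (the JSON Lines layout, which is also what df.write.json produces). To read a file that holds a single multi-line JSON document or array, enable multiLine mode; a sketch with a hypothetical file path:
df = spark.read.json("data/other.json", multiLine=True)  # hypothetical path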
    
Read a Parquet file (Parquet files store schema metadata, so column names and types are recovered without inference):
df = spark.read.parquet("data/demo.parquet")
df.show(3)
    +-------+---+------+---------+
    |Country|Age|Salary|Purchased|
    +-------+---+------+---------+
    | France| 44| 72000|       No|
    |  Spain| 27| 48000|      Yes|
    |Germany| 30| 54000|       No|
    +-------+---+------+---------+
    only showing top 3 rows
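Because the schema travels with the file and the format is columnar, you can inspect the recovered types and scan only the columns you need; for example:
df.printSchema()                        # types come from the Parquet metadata
df.select("Country", "Salary").show(3)  # columnar layout: only these columns are read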

Wednesday, January 23, 2019

Big Data Storage and Access: Reading an Excel File from HDFS into a pandas DataFrame

The difficulty: pandas cannot read an Excel file from HDFS directly.
The solution: first read the raw Excel file contents into an RDD, then collect the RDD contents from the cluster back to the driver, and finally call pandas.read_excel().
Steps:
(1) Set up the pyspark environment
import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')

sys.path.insert(0, os.path.join(spark_home, 'python'))
# sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
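An equivalent shortcut, if the findspark package happens to be installed, is to let it locate SPARK_HOME and patch sys.path for you; a minimal sketch:
import findspark
findspark.init()  # finds SPARK_HOME and adds pyspark to sys.path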

(2) Create a SparkContext
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName('Read Excel File on HDFS to Pandas Dataframe')
sc = SparkContext(conf=conf)

(3) Read the Excel file contents into an RDD
rdd = sc.binaryFiles("/HomeProject/my-demo/demo.xlsx")  # an RDD of (path, bytes) pairs

(4) Collect the RDD contents from the cluster back to the driver (the entire file must fit in driver memory)
arr = rdd.collect()

(5) Call pandas.read_excel() to load the contents into a pandas DataFrame
import io
import pandas as pd
# arr[0] is the (path, bytes) pair of the single matched file;
# arr[0][1] holds the raw bytes of the Excel file
df = pd.read_excel(io.BytesIO(arr[0][1]), header=0)
df.head(5)
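For reuse, steps (3) through (5) can be wrapped into a small helper; a minimal sketch (the function name is my own, and it assumes the path matches a single small file):
def read_excel_from_hdfs(sc, path):
    # binaryFiles yields (path, bytes) pairs; collect() pulls them to the driver
    _, content = sc.binaryFiles(path).collect()[0]
    return pd.read_excel(io.BytesIO(content), header=0)

df = read_excel_from_hdfs(sc, "/HomeProject/my-demo/demo.xlsx")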