要读写这些数据源,首先需要创建SparkSession: import os from pyspark.sql import SparkSession, Row spark = SparkSession.builder \ .appName("Demo data sources") \ .getOrCreate() 为便于演示,我们构造一个DataFrame: data = [Row(Country="France",Age=44,Salary=72000,Purchased="No"), Row(Country="Spain",Age=27,Salary=48000,Purchased="Yes"), Row(Country="Germany",Age=30,Salary=54000,Purchased="No"), Row(Country="Spain",Age=38,Salary=61000,Purchased="No"), Row(Country="Germany",Age=40,Salary=65000,Purchased="Yes"), Row(Country="France",Age=35,Salary=58000,Purchased="Yes"), Row(Country="Spain",Age=27,Salary=52000,Purchased="No"), Row(Country="France",Age=48,Salary=79000,Purchased="Yes"), Row(Country="Germany",Age=50,Salary=83000,Purchased="No"), Row(Country="France",Age=37,Salary=67000,Purchased="Yes")] df = spark.createDataFrame(data) df.show() 运行结果: +---+-------+---------+------+ |Age|Country|Purchased|Salary| +---+-------+---------+------+ | 44| France| No| 72000| | 27| Spain| Yes| 48000| | 30|Germany| No| 54000| | 38| Spain| No| 61000| | 40|Germany| Yes| 65000| | 35| France| Yes| 58000| | 27| Spain| No| 52000| | 48| France| Yes| 79000| | 50|Germany| No| 83000| | 37| France| Yes| 67000| +---+-------+---------+------+ 将dataframe数据集保存为csv, json, parquet和HIVE Table格式文件: df.write.csv("data/demo.csv", mode='overwrite', header=True) df.write.json("data/demo.json", mode = 'ignore') df.write.parquet("data/demo.parquet", mode = 'ignore') df.write.saveAsTable("demo") 创建一个临时表:customers,它是随同spark Session存在的: df.createOrReplaceTempView("customers") 查看(用HIVE SQL)有哪些表: spark.sql("show tables").show() +--------+---------+-----------+ |database|tableName|isTemporary| +--------+---------+-----------+ | default| demo| false| | |customers| true| +--------+---------+-----------+ 用HIVE SQL查询表: spark.sql("SELECT * FROM customers WHERE Salary > 60000").show() +---+-------+---------+------+ |Age|Country|Purchased|Salary| +---+-------+---------+------+ | 44| France| No| 72000| | 38| Spain| No| 61000| | 40|Germany| Yes| 65000| | 48| France| Yes| 79000| | 50|Germany| No| 83000| | 37| France| Yes| 67000| +---+-------+---------+------+ spark.sql("SELECT * FROM customers WHERE Country='France'").show() +---+-------+---------+------+ |Age|Country|Purchased|Salary| +---+-------+---------+------+ | 44| France| No| 72000| | 35| France| Yes| 58000| | 48| France| Yes| 79000| | 37| France| Yes| 67000| +---+-------+---------+------+ 读txt文件,内容将放在value字段中: df = spark.read.text("data/demo.csv") df.show(3) +--------------------+ | value| +--------------------+ |Age,Country,Purch...| | 27,Spain,No,52000| | 48,France,Yes,79000| +--------------------+ only showing top 3 rows 读csv文件(csv文件中可能包含有字段名称信息的文件头,但数据类型需要推断,你也可以显式指定): df = spark.read.csv("data/demo.csv", inferSchema = True, header = True) df.show(3) +---+-------+---------+------+ |Age|Country|Purchased|Salary| +---+-------+---------+------+ | 27| Spain| No| 52000| | 48| France| Yes| 79000| | 50|Germany| No| 83000| +---+-------+---------+------+ only showing top 3 rows 读json文件(json文件中包含的字段的名称,但数据类型需要推断,你也可以显式指定): df = spark.read.json("data/demo.json") df.show(3) +---+-------+---------+------+ |Age|Country|Purchased|Salary| +---+-------+---------+------+ | 44| France| No| 72000| | 27| Spain| Yes| 48000| | 30|Germany| No| 54000| +---+-------+---------+------+ only showing top 3 rows 读parquet文件(parquet文件中保存了数据结构的元信息): df = spark.read.parquet("data/demo.parquet") df.show(3) +-------+---+------+---------+ |Country|Age|Salary|Purchased| +-------+---+------+---------+ | France| 44| 72000| No| | Spain| 27| 48000| Yes| |Germany| 30| 54000| No| +-------+---+------+---------+ only showing top 3 rows
2019年1月24日星期四
大数据之数据存取:Spark存取文件系统数据源
我们常见的数据文件包括:csv, json, txt和parquet等。Spark对这些文件的读写都提供了内建的支持。
订阅:
博文评论 (Atom)
没有评论:
发表评论