要读写这些数据源,首先需要创建SparkSession:
import os
from pyspark.sql import SparkSession, Row
spark = SparkSession.builder \
.appName("Demo data sources") \
.getOrCreate()
为便于演示,我们构造一个DataFrame:
data = [Row(Country="France",Age=44,Salary=72000,Purchased="No"),
Row(Country="Spain",Age=27,Salary=48000,Purchased="Yes"),
Row(Country="Germany",Age=30,Salary=54000,Purchased="No"),
Row(Country="Spain",Age=38,Salary=61000,Purchased="No"),
Row(Country="Germany",Age=40,Salary=65000,Purchased="Yes"),
Row(Country="France",Age=35,Salary=58000,Purchased="Yes"),
Row(Country="Spain",Age=27,Salary=52000,Purchased="No"),
Row(Country="France",Age=48,Salary=79000,Purchased="Yes"),
Row(Country="Germany",Age=50,Salary=83000,Purchased="No"),
Row(Country="France",Age=37,Salary=67000,Purchased="Yes")]
df = spark.createDataFrame(data)
df.show()
运行结果:
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 27| Spain| Yes| 48000|
| 30|Germany| No| 54000|
| 38| Spain| No| 61000|
| 40|Germany| Yes| 65000|
| 35| France| Yes| 58000|
| 27| Spain| No| 52000|
| 48| France| Yes| 79000|
| 50|Germany| No| 83000|
| 37| France| Yes| 67000|
+---+-------+---------+------+
将dataframe数据集保存为csv, json, parquet和HIVE Table格式文件:
df.write.csv("data/demo.csv", mode='overwrite', header=True)
df.write.json("data/demo.json", mode = 'ignore')
df.write.parquet("data/demo.parquet", mode = 'ignore')
df.write.saveAsTable("demo")
创建一个临时表:customers,它是随同spark Session存在的:
df.createOrReplaceTempView("customers")
查看(用HIVE SQL)有哪些表:
spark.sql("show tables").show()
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| demo| false|
| |customers| true|
+--------+---------+-----------+
用HIVE SQL查询表:
spark.sql("SELECT * FROM customers WHERE Salary > 60000").show()
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 38| Spain| No| 61000|
| 40|Germany| Yes| 65000|
| 48| France| Yes| 79000|
| 50|Germany| No| 83000|
| 37| France| Yes| 67000|
+---+-------+---------+------+
spark.sql("SELECT * FROM customers WHERE Country='France'").show()
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 35| France| Yes| 58000|
| 48| France| Yes| 79000|
| 37| France| Yes| 67000|
+---+-------+---------+------+
读txt文件,内容将放在value字段中:
df = spark.read.text("data/demo.csv")
df.show(3)
+--------------------+
| value|
+--------------------+
|Age,Country,Purch...|
| 27,Spain,No,52000|
| 48,France,Yes,79000|
+--------------------+
only showing top 3 rows
读csv文件(csv文件中可能包含有字段名称信息的文件头,但数据类型需要推断,你也可以显式指定):
df = spark.read.csv("data/demo.csv", inferSchema = True, header = True)
df.show(3)
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 27| Spain| No| 52000|
| 48| France| Yes| 79000|
| 50|Germany| No| 83000|
+---+-------+---------+------+
only showing top 3 rows
读json文件(json文件中包含的字段的名称,但数据类型需要推断,你也可以显式指定):
df = spark.read.json("data/demo.json")
df.show(3)
+---+-------+---------+------+
|Age|Country|Purchased|Salary|
+---+-------+---------+------+
| 44| France| No| 72000|
| 27| Spain| Yes| 48000|
| 30|Germany| No| 54000|
+---+-------+---------+------+
only showing top 3 rows
读parquet文件(parquet文件中保存了数据结构的元信息):
df = spark.read.parquet("data/demo.parquet")
df.show(3)
+-------+---+------+---------+
|Country|Age|Salary|Purchased|
+-------+---+------+---------+
| France| 44| 72000| No|
| Spain| 27| 48000| Yes|
|Germany| 30| 54000| No|
+-------+---+------+---------+
only showing top 3 rows
2019年1月24日星期四
大数据之数据存取:Spark存取文件系统数据源
我们常见的数据文件包括:csv, json, txt和parquet等。Spark对这些文件的读写都提供了内建的支持。
订阅:
博文评论 (Atom)
没有评论:
发表评论