Data Manipulation
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basics").getOrCreate()
df = spark.read.format("json").load("dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/people.json")
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
df.printSchema()
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
df.columns
Out[6]: ['age', 'name']
df.describe()
Out[7]: DataFrame[summary: string, age: string, name: string]
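# describe() only builds the summary DataFrame lazily; chain .show() to actually
# display the count, mean, stddev, min, and max for each column (a quick sketch):
df.describe().show()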
# Some data types are easy to infer from a JSON file, but others are not
# inferred the way we want; in that case we set the schema explicitly with a StructType
from pyspark.sql.types import StructField, StringType, IntegerType, StructType
# Create the list of StructFields that describe the schema
# * :param name: string, name of the field.
# * :param dataType: :class:`DataType` of the field.
# * :param nullable: boolean, whether the field can be null (None) or not.
data_schema = [StructField("age", IntegerType(), True), StructField("name", StringType(), True)]
final_struc = StructType(fields=data_schema)
df = spark.read.json("dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/people.json", schema=final_struc)
df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
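# As an alternative sketch (assuming Spark 2.3+), the schema parameter also accepts
# a DDL-formatted string instead of a StructType:
df = spark.read.json("dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/people.json", schema="age INT, name STRING")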
df['age']
Out[15]: Column<'age'>
type(df['age'])
Out[16]: pyspark.sql.column.Column
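# A Column is an expression rather than data: it can also be built with col() and
# combined with operators before being passed to select() (a minimal sketch):
from pyspark.sql.functions import col
df.select(col('age') + 1).show()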
df.select('age')
Out[17]: DataFrame[age: int]
type(df.select('age'))
Out[18]: pyspark.sql.dataframe.DataFrame
df.select('age').show()
+----+
| age|
+----+
|null|
|  30|
|  19|
+----+
df.head(2)
Out[20]: [Row(age=None, name='Michael'), Row(age=30, name='Andy')]
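# head() returns a list of Row objects; fields can be read by name or converted
# to a dict (a minimal sketch, with first_row as a hypothetical variable name):
first_row = df.head(2)[0]
first_row['name']    # index a Row by field name
first_row.asDict()   # convert the Row to a Python dict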
df.select(['age','name'])
Out[21]: DataFrame[age: int, name: string]
df.select(['age','name']).show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
df.withColumn('newage',df['age']).show()
+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+
df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
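# Note that df itself is unchanged: DataFrames are immutable, and withColumn returns
# a new DataFrame. Capture the result to keep it (df_new is a hypothetical name):
df_new = df.withColumn('newage', df['age'])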
df.withColumnRenamed('age','supernewage').show()
+-----------+-------+
|supernewage|   name|
+-----------+-------+
|       null|Michael|
|         30|   Andy|
|         19| Justin|
+-----------+-------+
df.withColumn('doubleage',df['age']*2).show()
+----+-------+---------+
| age|   name|doubleage|
+----+-------+---------+
|null|Michael|     null|
|  30|   Andy|       60|
|  19| Justin|       38|
+----+-------+---------+
df.withColumn('half_age',df['age']/2).show()
+----+-------+--------+
| age|   name|half_age|
+----+-------+--------+
|null|Michael|    null|
|  30|   Andy|    15.0|
|  19| Justin|     9.5|
+----+-------+--------+
df.withColumn('half_age',df['age']/2)
Out[28]: DataFrame[age: int, name: string, half_age: double]
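# Because each withColumn call returns a new DataFrame, transformations can be
# chained in a single expression (a minimal sketch):
df.withColumn('doubleage', df['age'] * 2).withColumn('half_age', df['age'] / 2).show()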
SQL Query
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
sql_results = spark.sql("SELECT * FROM people")
sql_results
Out[33]: DataFrame[age: int, name: string]
sql_results.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
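# The temporary view above is scoped to this SparkSession. For a view shared across
# sessions, a sketch using a global temp view (queried via the global_temp database;
# people_global is a hypothetical view name):
df.createOrReplaceGlobalTempView("people_global")
spark.sql("SELECT * FROM global_temp.people_global").show()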
spark.sql("SELECT * FROM people WHERE age=30").show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
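# Any Spark SQL syntax works against the registered view; for example, filtering
# out nulls and ordering the result (a minimal sketch):
spark.sql("SELECT name, age FROM people WHERE age IS NOT NULL ORDER BY age DESC").show()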