#!/usr/bin/env python
# coding: utf-8

# # DataFrame basics
# A DataFrame is a distributed dataset organized into rows and named columns, much like a table in a relational database.

# # SQLContext: Creating DataFrames

# ## From a list of tuples

# In[1]:

l = [("Alice", 1)]
print(sqlContext.createDataFrame(l).collect())
print(sqlContext.createDataFrame(l, ["name", "age"]).collect())

# ## From RDDs

# In[2]:

rdd = sc.parallelize(l)
sqlContext.createDataFrame(rdd).collect()

# In[3]:

df = sqlContext.createDataFrame(rdd, ["name", "age"])
df.collect()

# In[4]:

# The schema can also be given as a DDL-formatted string, or a single type name for one-column data.
print(sqlContext.createDataFrame(rdd, "a: string, b: int").collect())
rdd = sc.parallelize(l)
rdd = rdd.map(lambda row: row[1])
print(sqlContext.createDataFrame(rdd, "int").collect())

# ## From Row objects

# In[5]:

from pyspark.sql import Row

rdd = sc.parallelize([("Alice", 12)])
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
df2 = sqlContext.createDataFrame(person)
df2.collect()

# ## From a schema

# In[6]:

from pyspark.sql.types import *

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df3 = sqlContext.createDataFrame(rdd, schema)
df3.collect()

# ## From pandas

# In[7]:

import pandas

print(sqlContext.createDataFrame(df.toPandas()).collect())
print(sqlContext.createDataFrame(pandas.DataFrame([[1, 2]])).collect())

# # Format conversions

# In[8]:

df.toDF("f1", "f2").collect()

# In[9]:

df.toJSON().first()

# In[10]:

df.toPandas()

# ## Creating temp tables

# In[11]:

sqlContext.registerDataFrameAsTable(df, "table1")
sqlContext.registerDataFrameAsTable(df2, "table2")
print(sqlContext.tableNames())

# In[12]:

df3 = sqlContext.tables()
print(df3)
print(df3.filter("tableName = 'table1'").first())

# In[13]:

sqlContext.dropTempTable("table1")
sqlContext.dropTempTable("table2")

# ## Creating a UDF: User Defined Function

# In[14]:

# Without an explicit return type, the registered UDF returns a string.
sqlContext.registerFunction("stringLengthString", lambda x: len(x))
sqlContext.sql("SELECT stringLengthString('test')").collect()

# In[15]:

from pyspark.sql.types import IntegerType

sqlContext.registerFunction("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()

# In[16]:

sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
sqlContext.sql("SELECT stringLengthInt('test')").collect()
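# The cells above register UDFs for use from SQL only. As a complementary, minimal
# sketch (not part of the original notebook), a UDF can also be applied through the
# DataFrame API via pyspark.sql.functions.udf; the function and column alias names
# below are illustrative assumptions.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Wrap a plain Python function as a column-level UDF with an int return type.
str_length = udf(lambda s: len(s), IntegerType())

# Apply it to a string column; assumes the DataFrame `df` with a "name" column from above.
df.select(df.name, str_length(df.name).alias("name_length")).show()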
df.withColumn("age2", df.age + 2).collect() # In[24]: df.withColumnRenamed("age", "age2").collect() # In[25]: print df.select(df.age.cast("string").alias("ages")).collect() print df.select(df.age.cast(StringType()).alias("ages")).collect() # ## Tổng hợp dữ liệu # In[26]: df.agg({"age": "max"}).collect() # In[27]: from pyspark.sql import functions as F df.agg(F.min(df.age)).collect() # In[28]: gdf = df.groupBy(df.name) sorted(gdf.agg({"*": "count"}).collect()) # In[29]: from pyspark.sql import functions as F sorted(gdf.agg(F.min(df.age)).collect()) # ## Alias # In[30]: from pyspark.sql.functions import * df_as1 = df.alias("df_as1") df_as2 = df.alias("df_as2") joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), "inner") joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect() # ## Xem thống kê # In[31]: df.printSchema() # In[32]: df.schema # In[33]: df.storageLevel # In[34]: df.count() # In[35]: print df.groupBy().sum("age").collect() print df.groupBy().sum("age", "height").collect() # In[36]: df.groupBy().avg("age").collect() # In[37]: df.groupBy().avg("age", "height").collect() # In[38]: df.columns # In[39]: print df.name print df["name"] print df.age + 1 # In[40]: # cube(*col): Create a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregation on them. df.cube("name", df.age).count().orderBy("name", "age").show() # In[41]: df.describe(["age"]).show() # In[42]: df.describe().show() # In[43]: df.distinct().count() # In[44]: df.dtypes # In[45]: df.explain() # In[46]: df.explain(True) # In[47]: df.groupBy().avg().collect() # In[48]: df.groupBy("name").agg({"age": "mean"}).collect() # In[49]: df.groupBy(df.name).avg().collect() # In[50]: df.groupBy(["name", df.age]).count().collect() # In[51]: print df.groupBy().max("age").collect() print df.groupBy().max("age", "height").collect() # In[52]: print df.groupBy().mean("age").collect() print df.groupBy().mean("age", "height").collect() # In[53]: from pyspark.sql import Row df = sc.parallelize([ Row(name="Alice", age=5, height=80), Row(name="Alice", age=5, height=80), Row(name="Alice", age=10, height=80) ]).toDF() df.dropDuplicates().show() # In[54]: df.dropDuplicates(["name", "height"]).show() # ## Join # In[55]: print df.select("age", "name").collect() print df2.select("name", "height").collect() df.crossJoin(df2.select("height")).select("age", "name", df2.height).collect() # In[56]: df.drop("age").collect() # In[57]: df.drop(df.age).collect() # In[58]: df.join(df2, df.name == df2.name, "inner").drop(df.name).drop(df.age).collect() # In[59]: df.join(df2, "name", "inner").drop("age", "height").collect() # In[60]: df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect() # In[61]: df.join(df2, 'name', 'outer').select('name', df.height).collect() # In[62]: cond = [df.name == df2.name, df.age == df2.age] df.join(df2, cond, 'outer').select(df.name, df2.age).collect() # In[63]: df.join(df2, 'name').select(df.name, df2.height).collect() # In[64]: df.join(df2, ['name', 'age']).select(df.name, df.age).collect() # ## Filter # In[65]: l = [("Alice", 2, 12), ("Bob", 5, 25)] rdd = sc.parallelize(l) df = sqlContext.createDataFrame(rdd, "name: string, age: int, height: int") print df.filter(df.age > 3).collect() print df.filter("age > 3").collect() print df.where("age=2").collect() # In[66]: df.first() # In[67]: df.head() # In[68]: print df.limit(1).collect() print df.limit(0).collect() # In[69]: # orderBy print df.sort(df.age.desc()).collect() print 
df.sort("age", ascending=False).collect() print df.orderBy(df.age.desc()).collect() from pyspark.sql.functions import * print df.sort(asc("age")).collect() print df.sort(desc("age"), "name").collect() print df.orderBy(["age", "name"], ascending=[0, 1]).collect() # In[70]: print df.filter(df.name.endswith("ice")).collect() df.filter(df.name.endswith("ice$")).collect() # In[71]: # get subfield RDD > RDD, gets a field by name in a StructField. from pyspark.sql import Row df1 = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF() df1.select(df1.r.getField("b")).show() df1.select(df1.r.getField("a")).show() # In[72]: # RDD contains list and dictionary df1 = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"]) df1.select(df1.l.getItem(0), df1.d.getItem("key")).show() df1.select(df1.l[0], df1.d["key"]).show() # In[73]: from pyspark.sql import Row df1 = sc.parallelize([Row(name=u"Tom", height=80), Row(name=u"Alice", height=None)]).toDF() print df1.filter(df1.height.isNotNull()).collect() print df1.filter(df1.height.isNull()).collect() # In[74]: print df[df.name.isin("Bob", "Mike")].collect() print df[df.age.isin(1, 2, 3)].collect() # In[75]: df.filter(df.name.like("Al%")).collect() # In[76]: from pyspark.sql import functions as F df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show() # # Làm việc với Sample # In[77]: df.na.replace(["Alice", "Bob"], ["A", "B"], "name").show() # In[78]: df.rollup("name", df.age).count().orderBy("name", "age").show() # In[79]: # sample(withReplacement, fraction, seed=None) df.sample(False, 0.5, 42).count() # In[80]: # sampleBy(col, fractions, seed=None) dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key")) sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0) sampled.groupBy("key").count().orderBy("key").show() # In[81]: df.selectExpr("age * 2", "abs(age)").collect() # In[82]: # show(n=20, truncate=True) # truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and align cells right. df.show(truncate=3) # # Làm việc với Row # In[83]: row = Row(name="Alice", age=11) print row print row["name"], row["age"] print row.name, row.age print "name" in row print "wrong_key" in row # In[84]: # Row also can be used to create another Row like class, then it could be used to create Row objects Person = Row("name", "age") print Person print Person("Alice", 11) # In[85]: # asDict(recursive=False) print Row(name="Alice", age=11).asDict() row = Row(key=1, value=Row(name="a", age=2)) print row.asDict() print row.asDict(True)