#!/usr/bin/env python
# coding: utf-8

# In[1]:


# UNCOMMENT THIS IF YOU'RE USING GOOGLE COLAB!
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q http://apache.osuosl.org/spark/spark-2.4.1/spark-2.4.1-bin-hadoop2.7.tgz
#!tar xf spark-2.4.1-bin-hadoop2.7.tgz
#!pip install optimuspyspark
# AFTER RUNNING THIS CELL, YOU MUST RESTART THE RUNTIME TO USE UPDATED VERSIONS OF PACKAGES!


# In[2]:


# UNCOMMENT THIS IF YOU'RE USING GOOGLE COLAB!
#import os
#os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#os.environ["SPARK_HOME"] = "/content/spark-2.4.1-bin-hadoop2.7"


# In[3]:


# Run this if you cloned Optimus and want to hack on it from the notebook; otherwise, install it with pip
get_ipython().run_line_magic('load_ext', 'autoreload')
get_ipython().run_line_magic('autoreload', '2')

import sys
sys.path.append("..")


# In[4]:


from optimus import Optimus


# ### Load Optimus locally or in a cluster

# In[5]:


# Create an Optimus instance attached to a local Spark session
op = Optimus(master="local", app_name="optimus", verbose=True)
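# The `master` argument is a standard Spark master URL, so the same constructor can point at a
# cluster instead of local mode. A minimal sketch, assuming a hypothetical standalone master at
# spark://master-host:7077 (uncomment and adjust the address for your cluster):

# In[ ]:


#op = Optimus(master="spark://master-host:7077", app_name="optimus", verbose=True)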
# ### Adding external packages

# In[ ]:


op = Optimus(packages="datastax:spark-cassandra-connector:1.6.1-s_2.10")


# ### Get the Spark session

# In[7]:


op.spark


# ### Get the Spark context

# In[8]:


op.sc


# ### Packages loaded

# In[7]:


op.packages


# ### Create a dataframe

# In[6]:


from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType, ArrayType
from datetime import date, datetime

df = op.create.df(
    [
        "names",
        "height(ft)",
        "function",
        "rank",
        "age",
        "weight(t)",
        "japanese name",
        "last position seen",
        "date arrival",
        "last date seen",
        "attributes",
        "DateType",
        "Timestamp",
        "Cybertronian",
        "function(binary)",
        "NullType",
    ],
    [
        ("Optim'us", 28, "Leader", 10, 5000000, 4.30, ["Inochi", "Convoy"], "19.442735,-99.201111",
         "1980/04/10", "2016/09/10", [8.5344, 4300.0], date(2016, 9, 10), datetime(2014, 6, 24),
         True, bytearray("Leader", "utf-8"), None),
        ("bumbl#ebéé ", 17, "Espionage", 7, 5000000, 2.0, ["Bumble", "Goldback"], "10.642707,-71.612534",
         "1980/04/10", "2015/08/10", [5.334, 2000.0], date(2015, 8, 10), datetime(2014, 6, 24),
         True, bytearray("Espionage", "utf-8"), None),
        ("ironhide&", 26, "Security", 7, 5000000, 4.0, ["Roadbuster"], "37.789563,-122.400356",
         "1980/04/10", "2014/07/10", [7.9248, 4000.0], date(2014, 6, 24), datetime(2014, 6, 24),
         True, bytearray("Security", "utf-8"), None),
        ("Jazz", 13, "First Lieutenant", 8, 5000000, 1.80, ["Meister"], "33.670666,-117.841553",
         "1980/04/10", "2013/06/10", [3.9624, 1800.0], date(2013, 6, 24), datetime(2014, 6, 24),
         True, bytearray("First Lieutenant", "utf-8"), None),
        ("Megatron", None, "None", 10, 5000000, 5.70, ["Megatron"], None,
         "1980/04/10", "2012/05/10", [None, 5700.0], date(2012, 5, 10), datetime(2014, 6, 24),
         True, bytearray("None", "utf-8"), None),
        ("Metroplex_)^$", 300, "Battle Station", 8, 5000000, None, ["Metroflex"], None,
         "1980/04/10", "2011/04/10", [91.44, None], date(2011, 4, 10), datetime(2014, 6, 24),
         True, bytearray("Battle Station", "utf-8"), None),
    ]).h_repartition(1)

df.table()


# ## Create a Spark dataframe from a Pandas dataframe

# In[ ]:


import pandas as pd

pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                    'B': {0: 1, 1: 3, 2: 5},
                    'C': {0: 2, 1: 4, 2: 6}})
sdf = op.create.df(pdf=pdf)


# ## Concat dataframes
# 
# Concatenate two or more dataframes row-wise or column-wise

# In[ ]:


op.append([df, df], like="rows").table()


# ## IO Operations

# ### Load from a file

# In[20]:


df_csv = op.load.csv("data/foo.csv")
df_csv.table(5)


# In[14]:


df_tsv = op.load.tsv("data/foo.tsv")
df_tsv.table(5)


# In[21]:


df_json = op.load.json("data/foo.json")
df_json.table(5)


# In[22]:


df_parquet = op.load.parquet("data/foo.parquet")
df_parquet.table(5)

# df_avro = op.load.avro("data/foo.avro")
# df_avro.table(5)


# In[15]:


df_excel = op.load.excel("data/titanic3.xls")
df_excel.table(5)


# ### Load from a URL

# In[16]:


df_csv = op.load.csv("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.csv")
df_csv.table()


# In[17]:


df_json = op.load.json("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.json")
df_json.table()


# In[25]:


df_parquet = op.load.parquet("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.parquet")
df_parquet.table()

# df_avro = op.load.avro("https://raw.githubusercontent.com/ironmussa/Optimus/master/examples/data/foo.avro", "avro")
# df_avro.table()


# ### Save to a file

# In[ ]:


df_csv.save.csv("test.csv")


# In[ ]:


df_csv.save.json("test.json")


# In[ ]:


df_csv.save.parquet("test.parquet")

# df_csv.save.avro("test.avro")
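# A quick sanity check on the round trip: reload one of the files saved above and print the
# first rows. A minimal sketch, assuming the save cells above ran and that "test.csv" resolves
# relative to the current working directory; `df_check` is just an illustrative name.

# In[ ]:


df_check = op.load.csv("test.csv")
df_check.table(5)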