# Imports
from config import CATALOG_NAME, SCHEMA_NAME, USE_UC, USE_VOLUMES

# If UC is enabled
if USE_UC:
    # Create the Catalog (optional; skipped entirely when UC is disabled)
    _ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG_NAME};")

    # Select the Catalog as the default for this Notebook
    _ = spark.sql(f"USE CATALOG {CATALOG_NAME};")

    # Grant permissions so that all users can use this accelerator
    _ = spark.sql(f"GRANT ALL PRIVILEGES ON CATALOG {CATALOG_NAME} TO `account users`;")

    # Create a Schema
    _ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {SCHEMA_NAME};")

    # Select the Schema as the default
    _ = spark.sql(f"USE SCHEMA {SCHEMA_NAME};")

    # If Volumes are enabled
    if USE_VOLUMES:
        # Create a Volume
        _ = spark.sql("CREATE VOLUME IF NOT EXISTS data_store;")

# Import os to declare an environment variable
from config import MAIN_STORAGE_PATH, MAIN_DATA_PATH
import os

# Set up the storage path (edit this if you would like to store the data somewhere else)
main_storage_path = f"{MAIN_STORAGE_PATH}/data_store"
main_data_path = f"{MAIN_DATA_PATH}/data_store"

# Declare it as an environment variable so the %sh cells below can see it
os.environ["MAIN_STORAGE_PATH"] = main_storage_path

%sh
# Confirm the variable made it through
echo $MAIN_STORAGE_PATH

%sh
# Create a new folder in storage
export AMAZON_REVIEWS_FOLDER=$MAIN_STORAGE_PATH/amazon_reviews
mkdir -p $AMAZON_REVIEWS_FOLDER

# Create a temporary folder on local disk
export TMP_DATA_FOLDER=/local_disk0/tmp_data_save
mkdir -p $TMP_DATA_FOLDER

# Move to the temp folder
cd $TMP_DATA_FOLDER

# Download the reviews and the metadata in parallel
curl -sO https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/categoryFiles/Books.json.gz &
curl -sO https://datarepo.eng.ucsd.edu/mcauley_group/data/amazon_v2/metaFiles2/meta_Books.json.gz &
wait
echo Download Complete

# Unzip
gunzip Books.json.gz &
gunzip meta_Books.json.gz &
wait
echo Unzipping Complete

# Copy to the target folder
cp Books.json $AMAZON_REVIEWS_FOLDER/books.json &
cp meta_Books.json $AMAZON_REVIEWS_FOLDER/meta_books.json &
wait
echo Copying Complete

# Display what's there
du -ah $AMAZON_REVIEWS_FOLDER

%sh
# Get a count of total reviews
echo -e "Reviews Count"
wc -l < $MAIN_STORAGE_PATH/amazon_reviews/books.json

# Get a count of products (metadata)
echo -e "\nMetadata Count"
wc -l < $MAIN_STORAGE_PATH/amazon_reviews/meta_books.json

%sh
# Preview Reviews
echo -e "Reviews Example"
head -n 1 $MAIN_STORAGE_PATH/amazon_reviews/books.json

# Preview Metadata (Books)
echo -e "\nMetadata Example"
head -n 1 $MAIN_STORAGE_PATH/amazon_reviews/meta_books.json

# Imports
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    FloatType,
    BooleanType,
    LongType,
)

# Define the reviews JSON schema
reviews_schema = StructType(
    [
        StructField("overall", FloatType(), True),
        StructField("verified", BooleanType(), True),
        StructField("reviewTime", StringType(), True),
        StructField("reviewerID", StringType(), True),
        StructField("asin", StringType(), True),
        StructField("reviewerName", StringType(), True),
        StructField("reviewText", StringType(), True),
        StructField("summary", StringType(), True),
        StructField("unixReviewTime", LongType(), True),
    ]
)

# Read the JSON file, dropping malformed records
raw_reviews_df = spark.read.json(
    f"{main_data_path}/amazon_reviews/books.json",
    mode="DROPMALFORMED",
    schema=reviews_schema,
)

# Repartition for parallelism in downstream stages
raw_reviews_df = raw_reviews_df.repartition(128)

# Get the row count
print(f"Table row count: {raw_reviews_df.count()}")

# Display
display(raw_reviews_df.limit(2))
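# Optional sanity check (a sketch, not part of the original notebook): since the
# file is line-delimited JSON, comparing the raw line count against the parsed
# row count shows roughly how many records DROPMALFORMED discarded.
raw_line_count = spark.read.text(f"{main_data_path}/amazon_reviews/books.json").count()
parsed_count = raw_reviews_df.count()
print(f"Raw lines: {raw_line_count} | Parsed rows: {parsed_count} | Dropped: {raw_line_count - parsed_count}")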
# Imports
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    ArrayType,
)

# Define the books (metadata) JSON schema
books_schema = StructType(
    [
        StructField("category", ArrayType(StringType()), True),
        StructField("tech1", StringType(), True),
        StructField("description", ArrayType(StringType()), True),
        StructField("fit", StringType(), True),
        StructField("title", StringType(), True),
        StructField("also_buy", ArrayType(StringType()), True),
        StructField("tech2", StringType(), True),
        StructField("brand", StringType(), True),
        StructField("feature", ArrayType(StringType()), True),
        StructField("rank", StringType(), True),
        StructField("also_view", ArrayType(StringType()), True),
        StructField("main_cat", StringType(), True),
        StructField("similar_item", StringType(), True),
        StructField("date", StringType(), True),
        StructField("price", StringType(), True),
        StructField("asin", StringType(), True),
        StructField("imageURL", ArrayType(StringType()), True),
        StructField("imageURLHighRes", ArrayType(StringType()), True),
    ]
)

# Read the JSON file, dropping malformed records
raw_books_df = spark.read.json(
    f"{main_data_path}/amazon_reviews/meta_books.json",
    mode="DROPMALFORMED",
    schema=books_schema,
)

# Get the row count
print(f"Table row count: {raw_books_df.count()}")

# Repartition
raw_books_df = raw_books_df.repartition(128)

# Display
display(raw_books_df.limit(2))

# Join the book metadata with the reviews on the shared product ID (asin)
raw_book_reviews_df = raw_books_df.join(raw_reviews_df, how="inner", on=["asin"])

# Repartition
raw_book_reviews_df = raw_book_reviews_df.repartition(128)

# Get the row count
print(f"DF row count: {raw_book_reviews_df.count()}")

# Display the dataframe
display(raw_book_reviews_df.limit(2))

# Save Raw Reviews
(
    raw_reviews_df
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("raw_reviews")
)

# Save Raw Books
(
    raw_books_df
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("raw_books")
)

# Save Book Reviews
(
    raw_book_reviews_df
    .write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("raw_book_reviews")
)

# Optimize all tables to compact small files
_ = spark.sql("OPTIMIZE raw_reviews;")
_ = spark.sql("OPTIMIZE raw_books;")
_ = spark.sql("OPTIMIZE raw_book_reviews;")
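# Optional verification pass (a sketch, not part of the original notebook): read
# the saved tables back and confirm they are queryable with the expected row counts.
for table_name in ["raw_reviews", "raw_books", "raw_book_reviews"]:
    print(f"{table_name}: {spark.table(table_name).count()} rows")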