#!/usr/bin/env python
# coding: utf-8

# # Aggregate the stream data
#
# **Purpose of the notebook:** load the ingested stream data and aggregate it
# at a fixed polling interval (`wait_for_agg_s` in the config, e.g. every three
# seconds) to approximate a live score.
#
# **Input of the notebook:** the ingested streamed data.
#
# **Output of the notebook:** printed text commentary, e.g. a live score with
# live pitch events.
#
# **Some notes:**
# * A `spark.DataFrame` variable always ends with the suffix `_df`.
# * A `pandas.DataFrame` variable always ends with the suffix `_pd`.

# ## Set the environment
#
# In this part, the environment is set up:
#
# * Load the necessary Python modules and helper functions
# * Set the paths to the data and metadata
# * Initialize the Spark session
#
# Other configuration, such as the `spark` application name and the path where
# the final `delta` table is saved, is defined in the `config.yaml` file.

# ### Import modules

# In[1]:


from pyspark.sql import SparkSession
from utils import read_config, init_spark_session, plot_pitch, check_throw_corner
import time
from datetime import datetime
import matplotlib.pyplot as plt
import os
from bs4 import BeautifulSoup


# ### Read config

# In[2]:


config = read_config()


# ### Set env variables

# In[3]:


spark_app_name = config['spark_application']['spark_app_stream_name']
file_format = config['streaming']['format']
file_dir = config['streaming']['file_dir']
# Total run time of the aggregation loop and the pause between two polls,
# both in seconds.
agg_time = config['streaming_aggregation']['aggregating_time_s']
wait_time = config['streaming_aggregation']['wait_for_agg_s']
# The metadata location can be overridden with the META_DATA_PATH environment
# variable; the default is the original hard-coded path.
meta_data_path = os.environ.get(
    "META_DATA_PATH",
    "/home/tomas/Personal_projects/Aston_Villa/data/g1059783_Metadata.xml",
)


# ### Init spark session

# In[4]:


spark = init_spark_session(spark_app_name)


# ### Read the metadata

# In[5]:


# Read raw bytes so the XML parser can honour the file's own encoding
# declaration instead of the platform's default text encoding.
with open(meta_data_path, 'rb') as f:
    metadata = f.read()
match_metadata = BeautifulSoup(metadata, 'xml')
match_tag = match_metadata.find('match')
if match_tag is None:
    raise ValueError(f"No <match> element found in metadata file {meta_data_path!r}")
# Keep only the part of `dtDate` before the first space (the date).
metadata_match_data = match_tag.get('dtDate').split(' ')[0]
field_x = float(match_tag.get('fPitchXSizeMeters'))
field_y = float(match_tag.get('fPitchYSizeMeters'))
metadata_field_dim = (field_x, field_y)
print(f"Match date: {metadata_match_data}")
print(f"Field dimension: {metadata_field_dim}")


# ## Aggregate stream data

# In[ ]:


def _latest_event(spark_session, fmt, path):
    """Return the newest streamed event as a pandas frame with at most one row.

    Events are ordered by ``wallClock`` inside Spark and limited to one row,
    so only that row is collected to the driver instead of the whole
    (growing) stream on every poll. Spark's descending sort places null
    ``wallClock`` values last, so they are never picked as the newest event.
    The returned frame is empty when the source contains no rows.
    """
    events_df = spark_session.read.format(fmt).load(path)
    return events_df.orderBy(events_df["wallClock"].desc()).limit(1).toPandas()


# State seen at the previous poll; a message is printed only when it changes.
init_ball_status = 'Alive'
init_ball_position = False  # ball is not in the penalty box at the beginning
init_field_position = True  # ball starts inside the field
field_dimen = metadata_field_dim
xy_init = (0, 0)  # last seen ball position; not read elsewhere in this script

delta = 0
# Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
start = time.monotonic()
if not os.path.isdir(file_dir):
    print(f"Stream directory {file_dir!r} does not exist; nothing to aggregate.")
else:
    while delta < agg_time:
        time.sleep(wait_time)
        latest_pd = _latest_event(spark, file_format, file_dir)
        # Full elapsed seconds (float); timedelta.seconds would drop whole
        # days and the fractional part. Updated before any `continue` so the
        # loop always terminates.
        delta = time.monotonic() - start
        if latest_pd.empty:
            continue  # nothing has been ingested yet; poll again

        alive = latest_pd['ballStatus'].values[0]
        box_position = latest_pd['ballInsideBox'].values[0]
        xy_position = latest_pd['ballPosition'].values[0]
        ts = latest_pd['match_timestamp'].values[0]
        field_position = latest_pd['ballInsideField'].values[0]
        team_last = latest_pd['ballTeam'].values[0]

        if alive != init_ball_status:
            text = f'Timestamp: {ts}; Ball change status from {init_ball_status} to {alive}'
            print(text)

        if box_position != init_ball_position:
            if box_position:
                text = f'Timestamp: {ts}; Action is coming! Ball is going to inside penalty box!; Position: {xy_position}'
            else:
                text = f'Timestamp: {ts}; There is no more danger! Ball is going outside the penalty box!; Position: {xy_position}'
            print(text)

        if field_position != init_field_position:
            if field_position:
                text = f'Timestamp: {ts}; Ball is back on the field!; Position: {xy_position}'
            else:
                # check_throw_corner returns a pair; only the first item (the
                # restart label) is used here. The second item is presumably an
                # adjusted ball position — confirm in utils.
                pos, _ = check_throw_corner(field_dimen=field_dimen, ball_pos=xy_position)
                # NOTE(review): 'A' is mapped to the home team, presumably because
                # ballTeam is the side that last touched the ball ('A' = away) and
                # the restart goes to the opponent — confirm against the feed spec.
                team = 'home team' if team_last == 'A' else 'away team'
                throw_corner = f'{pos} for {team}'
                text = f'Timestamp: {ts}; Ball is outside of the field!; {throw_corner}; Position: {xy_position}'
            print(text)

        init_ball_position = box_position
        init_field_position = field_position
        init_ball_status = alive
        xy_init = xy_position