Purpose of the notebook: Load and aggregate the streamed data every three seconds, approximating a live score feed.
Input of the notebook: The ingested streaming data.
Output of the notebook: Text output, e.g. a live score with a live pitch view.
Some notes:
- A spark.DataFrame variable will always have the suffix _df at the end of its name.
- A pandas.DataFrame variable will always have the suffix _pd at the end of its name.

In this part, the environment is set up:
- Other configuration, such as the Spark application name and the path where the final delta table will be saved, is defined in the config.yaml file; the structure this notebook expects from it is sketched below.
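The configuration values read below come from read_config() in utils, which presumably parses config.yaml into a nested dictionary. Based solely on the keys accessed in this notebook, the expected shape is roughly the following (all values are illustrative placeholders, not the project's actual settings):

# Illustrative shape of the parsed config.yaml (placeholder values only).
config = {
    'spark_application': {
        'spark_app_stream_name': 'stream_aggregation_app',   # Spark application name
    },
    'streaming': {
        'format': 'delta',                        # file format of the ingested stream (assumed)
        'file_dir': '/path/to/ingested/stream',   # directory the ingestion job writes to
    },
    'streaming_aggregation': {
        'aggregating_time_s': 60,   # how long (seconds) to keep polling for new data
        'wait_for_agg_s': 3,        # pause between polls, matching the ~3 s aggregation cadence
    },
}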
from pyspark.sql import SparkSession
from utils import read_config, init_spark_session, plot_pitch, check_throw_corner
import time
from datetime import datetime
import matplotlib.pyplot as plt
import os
from bs4 import BeautifulSoup
config = read_config()
spark_app_name = config['spark_application']['spark_app_stream_name']
file_format = config['streaming']['format']
file_dir = config['streaming']['file_dir']
agg_time = config['streaming_aggregation']['aggregating_time_s']
wait_time = config['streaming_aggregation']['wait_for_agg_s']
meta_data_path = "/home/tomas/Personal_projects/Aston_Villa/data/g1059783_Metadata.xml"
spark = init_spark_session(spark_app_name)
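init_spark_session is defined in utils and not shown here. A minimal sketch of what such a helper typically does, assuming it only builds (or reuses) a local SparkSession under the given application name; any extra options the real helper sets (e.g. Delta Lake packages) are omitted:

from pyspark.sql import SparkSession

def init_spark_session(app_name):
    # Hypothetical helper: build or reuse a SparkSession with the given name.
    return (
        SparkSession.builder
        .appName(app_name)
        .getOrCreate()
    )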
with open(meta_data_path,'r') as f:
metadata = f.read()
match_metadata = BeautifulSoup(metadata,'xml')
metadata_match_data = match_metadata.find('match').get('dtDate').split(' ')[0]
field_x = float(match_metadata.find('match').get('fPitchXSizeMeters'))
field_y = float(match_metadata.find('match').get('fPitchYSizeMeters'))
metadata_field_dim = (field_x,field_y)
print(f"Match date: {metadata_match_data}")
print(f"Field dimension: {metadata_field_dim}")
Match date: 2019-10-19
Field dimension: (105.16, 67.97)
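For reference, the attributes read above imply a match element in g1059783_Metadata.xml roughly like the snippet below (reconstructed from the printed values; the time-of-day part of dtDate is a guess, and the rest of the file is omitted):

# Minimal, illustrative reconstruction of the metadata element parsed above.
sample_metadata = '<match dtDate="2019-10-19 00:00:00" fPitchXSizeMeters="105.16" fPitchYSizeMeters="67.97"/>'
sample = BeautifulSoup(sample_metadata, 'xml')
print(sample.find('match').get('dtDate').split(' ')[0])  # -> 2019-10-19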
now = datetime.now()
delta = 0
init_ball_status = 'Alive'    # ball starts in play
init_ball_position = False    # ball is not in the penalty box at the beginning
init_field_position = True    # ball starts on the field
field_dimen = metadata_field_dim
#fig,ax = plot_pitch(field_dimen)
xy_init = (0,0)
if os.path.isdir(file_dir):
    while delta < agg_time:
        # ax.plot(float(xy_init[0]), float(xy_init[1]), 'ro')
        # fig.show()
        time.sleep(wait_time)
        df = spark.read.format(file_format).load(file_dir)
        df_pd = df.toPandas()
        df_pd = df_pd.sort_values("wallClock")
        df_pd = df_pd.tail(1)  # keep only the most recent frame
        alive = df_pd['ballStatus'].values[0]
        box_position = df_pd['ballInsideBox'].values[0]
        xy_position = df_pd['ballPosition'].values[0]
        ts = df_pd['match_timestamp'].values[0]
        field_position = df_pd['ballInsideField'].values[0]
        team_last = df_pd['ballTeam'].values[0]
        if alive != init_ball_status:
            text = f'Timestamp: {ts}; Ball changed status from {init_ball_status} to {alive}'
            print(text)
        if box_position != init_ball_position:
            if box_position:
                text = f'Timestamp: {ts}; Action is coming! Ball is going inside the penalty box!; Position: {xy_position}'
                print(text)
            else:
                text = f'Timestamp: {ts}; There is no more danger! Ball is going outside the penalty box!; Position: {xy_position}'
                print(text)
        if field_position != init_field_position:
            if field_position:
                text = f'Timestamp: {ts}; Ball is back on the field!; Position: {xy_position}'
                print(text)
            else:
                pos, adjust_xy_pos = check_throw_corner(field_dimen=field_dimen, ball_pos=xy_position)
                team = 'home team' if team_last == 'A' else 'away team'
                throw_corner = f'{pos} for {team}'
                text = f'Timestamp: {ts}; Ball is outside of the field!; {throw_corner}; Position: {xy_position}'
                print(text)
        # ax.plot(float(xy_init[0]), float(xy_init[1]), color = 'mediumseagreen', marker='o', mec = 'mediumseagreen')
        # fig.show()
        then = datetime.now()
        delta = (then - now).seconds
        init_ball_position = box_position
        init_field_position = field_position
        init_ball_status = alive
        xy_init = xy_position
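check_throw_corner is imported from utils and its implementation is not part of this notebook. A hypothetical sketch of the behaviour the loop above relies on, assuming pitch coordinates centred on the middle of the field (touchlines at ±field_y/2, goal lines at ±field_x/2); the returned (label, adjusted position) pair mirrors how the call site unpacks it, but the internal logic here is an assumption, not the real utils code:

def check_throw_corner(field_dimen, ball_pos):
    # Hypothetical sketch: decide whether the ball left the pitch over a goal
    # line (corner / goal kick) or over a touchline (throw-in), and clamp the
    # reported position back onto the pitch boundary.
    half_x, half_y = field_dimen[0] / 2, field_dimen[1] / 2
    x, y = float(ball_pos[0]), float(ball_pos[1])
    label = 'Corner or goal kick' if abs(x) > half_x else 'Throw in'
    adjusted = (max(-half_x, min(half_x, x)), max(-half_y, min(half_y, y)))
    return label, adjusted

With this sketch, the message printed above would read e.g. 'Throw in for home team' together with the timestamp and ball position.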