# Name of the dataset to explore; it is interpolated into the S3 path of the
# parquet dataset further down in the notebook.
dataset_name = "vessel_fishsoop_realtime_qc"
# only run once, then restart session if needed
!pip install uv
import os
def is_colab():
    """Return True when the ``google.colab`` module can be imported, else False."""
    try:
        # The import is only a probe; the module itself is not used.
        import google.colab  # noqa: F401
    except ImportError:
        return False
    return True
# Install the notebook requirements with uv.
if not is_colab():
    # Outside Colab: create a virtual environment first, then install the
    # requirements without the --system flag.
    os.system('uv venv')
    os.system('uv pip install -r https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/notebooks/requirements.txt')
else:
    # On Colab: install with --system, no virtual environment is created.
    os.system('uv pip install --system -r https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/notebooks/requirements.txt')
Requirement already satisfied: uv in /home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages (0.2.31)
Using Python 3.12.4 interpreter at: /home/lbesnard/miniforge3/envs/AodnCloudOptimised/bin/python3 Creating virtualenv at: .venv Activate with: source .venv/bin/activate Audited 127 packages in 110ms
import requests
import os
# Fetch the query helper module once; later runs reuse the local copy.
if not os.path.exists('parquet_queries.py'):
    print('Downloading parquet_queries.py')
    url = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/aodn_cloud_optimised/lib/ParquetDataQuery.py'
    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.get(url, timeout=60)
    # Fail loudly on an HTTP error: otherwise the error page would be saved as
    # parquet_queries.py and, because the file then exists, never re-downloaded.
    response.raise_for_status()
    # Write the raw bytes so the saved module matches the upstream file
    # exactly, independent of the platform's default encoding and newlines.
    with open('parquet_queries.py', 'wb') as f:
        f.write(response.content)
from parquet_queries import create_time_filter, create_bbox_filter, plot_spatial_extent, get_temporal_extent, get_schema_metadata
import pyarrow.parquet as pq
import pyarrow.dataset as pds
import pandas as pd
# Bucket that holds the cloud-optimised datasets.
BUCKET_OPTIMISED_DEFAULT = "aodn-cloud-optimised"

# S3 URI of the dataset, built from the bucket and the dataset name.
# NOTE(review): the `anonymous@` prefix presumably requests unauthenticated
# access - confirm against the pyarrow S3 filesystem documentation.
dname = f"s3://anonymous@{BUCKET_OPTIMISED_DEFAULT}/{dataset_name}.parquet/"

# Open the dataset using hive-style partition directories.
parquet_ds = pq.ParquetDataset(
    dname,
    partitioning='hive',
)
Partitioning in Parquet involves organising data files based on the values of one or more columns, known as partition keys. When data is written to Parquet files with partitioning enabled, the files are physically stored in a directory structure that reflects the partition keys. This directory structure makes it easier to retrieve and process specific subsets of data based on the partition keys.
# Open the same location through the pyarrow.dataset API, which exposes the
# partitioning schema, then print the partition keys.
dataset = pds.dataset(
    dname,
    format="parquet",
    partitioning="hive",
)
partition_keys = dataset.partitioning.schema
print(partition_keys)
timestamp: int32 polygon: string
In this section, we plot the polygons where data exists. This helps when creating a bounding box that actually contains data.
# Plot the polygons in which the dataset has data (helper imported from
# parquet_queries).
plot_spatial_extent(parquet_ds)
Similarly to the spatial extent, we retrieve the minimum and maximum timestamp partition values of the dataset. These do not necessarily represent the TIME values accurately, as the timestamp partition can be yearly, monthly, etc., but they give an idea of the temporal coverage.
# Minimum and maximum of the timestamp partition values (helper imported from
# parquet_queries); indicative only, not the exact range of TIME values.
get_temporal_extent(parquet_ds)
(datetime.datetime(2021, 11, 1, 11, 0), datetime.datetime(2024, 5, 1, 10, 0))
For every parquet dataset, we create a sidecar file named _common_metadata in the root of the dataset. It contains the variable attributes.
# Alternative kept for reference: read the schema straight from the sidecar
# file. NOTE(review): `pa` is not imported in the visible cells, so this line
# would need `import pyarrow as pa` before it could be re-enabled.
# parquet_meta = pa.parquet.read_schema(os.path.join(dname + '_common_metadata')) # parquet metadata
# Fetch the variable attributes through the parquet_queries helper.
metadata = get_schema_metadata(dname) # schema metadata
# Bare expression so the notebook displays the returned mapping.
metadata
{'TRAJECTORY_ID': {'type': 'double', 'long_name': 'trajectory identifier', 'cf_role': 'trajectory_id'}, 'TIME': {'type': 'timestamp[ns]', 'standard_name': 'time', 'long_name': 'time', 'axis': 'T', 'valid_min': 21915.0, 'valid_max': 90000.0, 'comment': 'The valid_min is set to January 1st, 2010, dates before this time are flagged by the quality control impossible date test'}, 'LATITUDE': {'type': 'float', 'ancillary_variables': 'LATITUDE_quality_control', 'standard_name': 'latitude', 'long_name': 'latitude', 'units': 'degrees_north', 'valid_min': -90.0, 'valid_max': 90.0, 'axis': 'Y', 'reference_datum': 'WGS84 geographic coordinate system'}, 'LATITUDE_quality_control': {'type': 'int32', 'long_name': 'quality flag for latitude', 'standard_name': 'latitude status_flag', 'quality_control_conventions': 'IMOS standard flags', 'flag_values': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'flag_meanings': 'No_QC_performed Good_data Probably_good_data Bad_data_that_are_potentially_correctable Bad_data Value_changed Not_used Not_used Not_used Missing_value'}, 'LONGITUDE': {'type': 'float', 'ancillary_variables': 'LONGITUDE_quality_control', 'standard_name': 'longitude', 'long_name': 'longitude', 'units': 'degrees_east', 'valid_min': -180.0, 'valid_max': 180.0, 'axis': 'X', 'reference_datum': 'WGS84 geographic coordinate system'}, 'LONGITUDE_quality_control': {'type': 'int32', 'long_name': 'quality flag for longitude', 'standard_name': 'longitude status_flag', 'quality_control_conventions': 'IMOS standard flags', 'flag_values': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'flag_meanings': 'No_QC_performed Good_data Probably_good_data Bad_data_that_are_potentially_correctable Bad_data Value_changed Not_used Not_used Not_used Missing_value'}, 'DEPTH': {'type': 'float', 'ancillary_variables': 'DEPTH_quality_control', 'standard_name': 'depth', 'long_name': 'depth', 'units': 'm', 'positive': 'down', 'valid_min': -5.0, 'valid_max': 3000.0, 'axis': 'Z', 'comment': 'Depth computed using the Gibbs-Seawater 
toolbox (TEOS10) v3.06 from the mean latitude in the data (or a default of -33 if no latitude was recorded) and pressure measurements from the Moana', 'reference_datum': 'sea surface'}, 'DEPTH_quality_control': {'type': 'int32', 'long_name': 'quality flag for depth', 'standard_name': 'depth status_flag', 'quality_control_conventions': 'IMOS standard flags', 'flag_values': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'flag_meanings': 'No_QC_performed Good_data Probably_good_data Bad_data_that_are_potentially_correctable Bad_data Value_changed Not_used Not_used Not_used Missing_value'}, 'TEMPERATURE': {'type': 'float', 'ancillary_variables': 'TEMPERATURE_quality_control', 'standard_name': 'sea_water_temperature', 'long_name': 'sea_water_temperature', 'units': 'degrees_Celsius', 'valid_min': -2.0, 'valid_max': 40.0, 'observation_type': 'measured'}, 'TEMPERATURE_quality_control': {'type': 'int32', 'long_name': 'quality flag for sea_water_temperature', 'standard_name': 'sea_water_temperature status_flag', 'quality_control_conventions': 'IMOS standard flags', 'flag_values': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'flag_meanings': 'No_QC_performed Good_data Probably_good_data Bad_data_that_are_potentially_correctable Bad_data Value_changed Not_used Not_used Not_used Missing_value'}, 'PRES': {'type': 'float', 'standard_name': 'sea_water_pressure', 'long_name': 'sea_water_pressure', 'units': 'dbar', 'valid_min': -5.0, 'valid_max': 3000.0, 'comment': 'Measure from the Moana', 'ancillary_variables': 'PRES_quality_control'}, 'PRES_quality_control': {'type': 'int32', 'long_name': 'quality flag for sea_water_pressure', 'standard_name': 'sea_water_pressure status_flag', 'quality_control_conventions': 'IMOS standard flags', 'flag_values': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'flag_meanings': 'No_QC_performed Good_data Probably_good_data Bad_data_that_are_potentially_correctable Bad_data Value_changed Not_used Not_used Not_used Missing_value'}, 'timestamp': {'type': 'int64'}, 'polygon': {'type': 'string'}, 
'filename': {'type': 'string'}, 'dataset_metadata': {'metadata_uuid': 'bdb84466-dc53-49ad-a60f-83d9fa0baed5', 'title': '', 'principal_investigator': '', 'principal_investigator_email': '', 'featureType': ''}}
filter_time = create_time_filter(parquet_ds, date_start='2021-01-31 10:14:00', date_end='2023-09-01 07:50:00')
filter_geo = create_bbox_filter(parquet_ds, lat_min=-40, lat_max=-32, lon_min=130, lon_max=150)
filter = filter_geo & filter_time
%%time
# using pandas instead of pyarrow so that filters can directly be applied to the data, and not just the partition
df = pd.read_parquet(dname, engine='pyarrow',filters=filter)
df.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 67404 entries, 0 to 67403 Data columns (total 15 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 TRAJECTORY_ID 67404 non-null float64 1 TIME 67404 non-null datetime64[ns] 2 LATITUDE 67404 non-null float32 3 LATITUDE_quality_control 67404 non-null int32 4 LONGITUDE 67404 non-null float32 5 LONGITUDE_quality_control 67404 non-null int32 6 DEPTH 67404 non-null float32 7 DEPTH_quality_control 67404 non-null int32 8 TEMPERATURE 67404 non-null float32 9 TEMPERATURE_quality_control 67404 non-null int32 10 PRES 67404 non-null float32 11 PRES_quality_control 67404 non-null int32 12 filename 67404 non-null object 13 timestamp 67404 non-null category 14 polygon 67404 non-null category dtypes: category(2), datetime64[ns](1), float32(5), float64(1), int32(5), object(1) memory usage: 4.2+ MB CPU times: user 5.8 s, sys: 665 ms, total: 6.46 s Wall time: 39.2 s
# Latitude/depth scatter coloured by temperature. Only rows whose temperature
# quality-control flag equals 1 are kept, ordered by TIME before plotting.
(
    df.loc[lambda frame: frame['TEMPERATURE_quality_control'] == 1]
    .sort_values('TIME')
    .plot.scatter(
        x='LATITUDE',
        y='DEPTH',
        c='TEMPERATURE',
        xlabel=metadata['LATITUDE']['standard_name'],
        ylabel=metadata['DEPTH']['standard_name'],
        cmap='RdYlBu_r',
        marker='.',
        linestyle="None",
    )
)
<Axes: xlabel='latitude', ylabel='depth'>
# Longitude/depth scatter coloured by temperature. Only rows whose temperature
# quality-control flag equals 1 are kept, ordered by TIME before plotting.
(
    df.loc[lambda frame: frame['TEMPERATURE_quality_control'] == 1]
    .sort_values('TIME')
    .plot.scatter(
        x='LONGITUDE',
        y='DEPTH',
        c='TEMPERATURE',
        xlabel=metadata['LONGITUDE']['standard_name'],
        ylabel=metadata['DEPTH']['standard_name'],
        cmap='RdYlBu_r',
        marker='.',
        linestyle="None",
    )
)
<Axes: xlabel='longitude', ylabel='depth'>
import matplotlib.pyplot as plt
# Keep only rows whose temperature quality-control flag equals 1, ordered by TIME.
temperature_qc_is_one = df['TEMPERATURE_quality_control'] == 1
filtered_df = df[temperature_qc_is_one].sort_values('TIME')

# Pull out the four columns used by the plot in a single step.
latitude, longitude, depth, temperature = (
    filtered_df[column]
    for column in ('LATITUDE', 'LONGITUDE', 'DEPTH', 'TEMPERATURE')
)

# 3D scatter: latitude on x, longitude on y, depth on z, coloured by temperature.
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
sc = ax.scatter(
    latitude,
    longitude,
    depth,
    c=temperature,
    cmap='RdYlBu_r',
    marker='.',
)

# Axis labels come from the standard_name attributes in the dataset metadata.
ax.set_xlabel(metadata['LATITUDE']['standard_name'])
ax.set_ylabel(metadata['LONGITUDE']['standard_name'])
ax.set_zlabel(metadata['DEPTH']['standard_name'])

# Colour bar for the temperature mapping.
cbar = plt.colorbar(sc)
cbar.set_label('TEMPERATURE')

plt.show()