# Name of the dataset to explore; it is interpolated into the S3 path of the Parquet store (see `dname`).
dataset_name = "vessel_trv_realtime_qc"
# only run once, then restart session if needed
!pip install uv
import os
def is_colab():
    """Tell whether the notebook is executing inside Google Colab.

    Returns:
        bool: True if the ``google.colab`` package can be imported,
        False if importing it raises ImportError.
    """
    running_in_colab = True
    try:
        # Imported purely as an availability probe; the module itself is unused.
        import google.colab
    except ImportError:
        running_in_colab = False
    return running_in_colab
import subprocess

# Requirements file listing the packages used by the notebooks.
REQUIREMENTS_URL = (
    "https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/"
    "main/notebooks/requirements.txt"
)


def _run_uv(*args):
    """Run ``uv`` with *args* and report a failure instead of hiding it.

    Args:
        *args: Command-line arguments passed to the ``uv`` executable.

    Returns:
        bool: True when the command exited with status 0, False otherwise.
    """
    command = ["uv", *args]
    try:
        # Argument list, no shell: nothing in the command is shell-interpreted.
        exit_code = subprocess.run(command, check=False).returncode
    except OSError as err:
        # Raised when the executable cannot be started (e.g. uv not on PATH).
        print(f"Could not start {' '.join(command)!r}: {err}")
        return False
    if exit_code != 0:
        # Best-effort setup: warn but let the notebook continue.
        print(f"{' '.join(command)!r} exited with status {exit_code}")
        return False
    return True


if is_colab():
    # On Colab, install with uv's --system flag (no virtual environment is created).
    _run_uv("pip", "install", "--system", "-r", REQUIREMENTS_URL)
else:
    # Elsewhere, create a virtual environment first, then install the requirements.
    # Both steps are always attempted, as before.
    _run_uv("venv")
    _run_uv("pip", "install", "-r", REQUIREMENTS_URL)
Requirement already satisfied: uv in /home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages (0.2.31)
Using Python 3.12.4 interpreter at: /home/lbesnard/miniforge3/envs/AodnCloudOptimised/bin/python3 Creating virtualenv at: .venv Activate with: source .venv/bin/activate Audited 127 packages in 87ms
import requests
import os
# Fetch the query helper module once; later runs reuse the local copy.
if not os.path.exists('parquet_queries.py'):
    print('Downloading parquet_queries.py')
    url = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/aodn_cloud_optimised/lib/ParquetDataQuery.py'
    # Bound the wait so a stalled connection cannot hang the notebook.
    response = requests.get(url, timeout=60)
    # Fail loudly on an HTTP error: otherwise the error page would be saved as
    # parquet_queries.py and, because of the existence check above, never be
    # downloaded again.
    response.raise_for_status()
    # Explicit encoding so the written file does not depend on the platform default.
    with open('parquet_queries.py', 'w', encoding='utf-8') as f:
        f.write(response.text)
from parquet_queries import create_time_filter, create_bbox_filter, query_unique_value, plot_spatial_extent, \
get_temporal_extent, get_schema_metadata
import pyarrow.parquet as pq
import pyarrow.dataset as pds
import pyarrow as pa
import pandas as pd
import pyarrow.compute as pc
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
import numpy as np
# S3 bucket holding the cloud-optimised datasets.
BUCKET_OPTIMISED_DEFAULT="aodn-cloud-optimised"
# Root of this dataset's Parquet store. NOTE(review): the "anonymous@" prefix presumably requests unauthenticated access -- confirm.
dname = f"s3://anonymous@{BUCKET_OPTIMISED_DEFAULT}/{dataset_name}.parquet/"
# Open the store as a hive-partitioned dataset; this handle is what the query helpers below receive.
parquet_ds = pq.ParquetDataset(dname,partitioning='hive')
Partitioning in Parquet involves organising data files based on the values of one or more columns, known as partition keys. When data is written to Parquet files with partitioning enabled, the files are physically stored in a directory structure that reflects the partition keys. This directory structure makes it easier to retrieve and process specific subsets of data based on the partition keys.
# Open the same store through the pyarrow.dataset API to inspect its partitioning.
dataset = pds.dataset(dname, format="parquet", partitioning="hive")
# Schema of the partition keys (names and types), printed below.
partition_keys = dataset.partitioning.schema
print(partition_keys)
platform_code: string timestamp: int32 polygon: string
%%time
# Collect the distinct values of the 'platform_code' partition key via the query_unique_value helper.
unique_partition_value = query_unique_value(parquet_ds, 'platform_code')
# Only the first two values (in iteration order) are printed to keep the output short.
print(list(unique_partition_value)[0:2]) # showing a subset only
['VMQ9273', 'VNCF'] CPU times: user 7.9 ms, sys: 1.74 ms, total: 9.64 ms Wall time: 8.98 ms
In this section, we plot the polygons where data exists. This helps when choosing a bounding box that actually contains data.
# Helper imported from parquet_queries; draws the polygons in which the dataset has data.
plot_spatial_extent(parquet_ds)
Similarly to the spatial extent, we retrieve the minimum and maximum timestamp partition values of the dataset. These are not necessarily an accurate representation of the TIME values, since the timestamp partition can be yearly, monthly, etc., but they give an idea of the temporal coverage.
# Bare expression: the returned pair of datetimes is shown as the cell output.
get_temporal_extent(parquet_ds)
(datetime.datetime(2008, 9, 1, 10, 0), datetime.datetime(2022, 10, 1, 10, 0))
For every Parquet dataset, we create a sidecar file named _common_metadata in the root of the dataset. It contains the variable attributes.
# Disabled alternative, kept for reference: read the sidecar schema directly with pyarrow.
# parquet_meta = pa.parquet.read_schema(os.path.join(dname + '_common_metadata')) # parquet metadata
# get_schema_metadata (from parquet_queries) returns a dict keyed by variable name,
# plus a 'dataset_metadata' entry, as the cell output shows.
metadata = get_schema_metadata(dname) # schema metadata
# Bare expression: displays the dict as the cell output.
metadata
{'platform_code': {'type': 'string'}, 'timestamp': {'type': 'int64'}, 'polygon': {'type': 'string'}, 'filename': {'type': 'string'}, 'LATITUDE_quality_control': {'type': 'double', 'long_name': 'LATITUDE quality control', 'standard_name': 'latitude status_flag', 'quality_control_conventions': 'IMOS standard set using the IODE flags', 'valid_min': 0.0, 'valid_max': 9.0, 'flag_values': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 'flag_meanings': 'no_qc_performed good_data probably_good_data bad_data_that_are_potentially_correctable bad_data value_changed not_used not_used interpolated_values missing_values'}, 'LATITUDE': {'type': 'double', 'long_name': 'latitude', 'standard_name': 'latitude', 'units': 'degrees_north', 'axis': 'Y', 'valid_min': -90.0, 'valid_max': 90.0, 'reference_datum': 'geographical coordinates, WGS84 projection', 'ancillary_variables': 'LATITUDE_quality_control'}, 'LONGITUDE_quality_control': {'type': 'double', 'long_name': 'LONGITUDE quality control', 'standard_name': 'longitude status_flag', 'quality_control_conventions': 'IMOS standard set using the IODE flags', 'valid_min': 0.0, 'valid_max': 9.0, 'flag_values': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 'flag_meanings': 'no_qc_performed good_data probably_good_data bad_data_that_are_potentially_correctable bad_data value_changed not_used not_used interpolated_values missing_values'}, 'LONGITUDE': {'type': 'double', 'long_name': 'longitude', 'standard_name': 'longitude', 'units': 'degrees_east', 'axis': 'X', 'valid_min': -180.0, 'valid_max': 180.0, 'reference_datum': 'geographical coordinates, WGS84 projection', 'ancillary_variables': 'LONGITUDE_quality_control'}, 'DEPTH': {'type': 'double', 'long_name': 'depth', 'standard_name': 'depth', 'units': 'metres', 'positive': 'down', 'axis': 'Z', 'reference_datum': 'sea surface', 'valid_max': 30.0, 'valid_min': -10.0}, 'CPHL': {'type': 'double', 'long_name': 
'mass_concentration_of_inferred_chlorophyll_from_relative_fluorescence_units_in_sea_water_concentration_of_chlorophyll_in_sea_water', 'units': 'micrograms per Litre', 'valid_min': 0.0, 'valid_max': 3.0, 'ancillary_variables': 'CPHL_quality_control', 'sensor_depth': 1.9}, 'CPHL_quality_control': {'type': 'double', 'long_name': 'mass_concentration_of_inferred_chlorophyll_from_relative_fluorescence_units_in_sea_waterconcentration_of_chlorophyll_in_sea_water status_flag', 'quality_control_conventions': 'IMOS standard set using the IODE flags', 'valid_min': 0.0, 'valid_max': 9.0, 'flag_values': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 'flag_meanings': 'no_qc_performed good_data probably_good_data bad_data_that_are_potentially_correctable bad_data value_changed not_used not_used interpolated_values missing_values'}, 'TURB': {'type': 'float', 'long_name': 'sea_water_turbidity', 'standard_name': 'sea_water_turbidity', 'units': '1', 'valid_min': 0.0, 'valid_max': 1000.0, 'ancillary_variables': 'TURB_quality_control', 'sensor_depth': 1.9}, 'TURB_quality_control': {'type': 'float', 'long_name': 'sea_water_turbidity quality control', 'standard_name': 'sea_water_turbidity status_flag', 'quality_control_conventions': 'IMOS standard set using the IODE flags', 'valid_min': 0.0, 'valid_max': 9.0, 'flag_values': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 'flag_meanings': 'no_qc_performed good_data probably_good_data bad_data_that_are_potentially_correctable bad_data value_changed not_used not_used interpolated_values missing_values'}, 'TEMP': {'type': 'float', 'long_name': 'seawater_intake_temperature', 'standard_name': 'sea_water_temperature', 'units': 'Celsius', 'valid_min': 15.0, 'valid_max': 32.0, 'ancillary_variables': 'TEMP_quality_control', 'sensor_depth': 1.9}, 'TEMP_quality_control': {'type': 'float', 'long_name': 'seawater_intake_temperature quality control', 'standard_name': 'sea_water_temperature status_flag', 'quality_control_conventions': 'IMOS 
standard set using the IODE flags', 'valid_min': 0.0, 'valid_max': 9.0, 'flag_values': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 'flag_meanings': 'no_qc_performed good_data probably_good_data bad_data_that_are_potentially_correctable bad_data value_changed not_used not_used interpolated_values missing_values'}, 'TIME': {'type': 'timestamp[ns]', 'long_name': 'time', 'standard_name': 'time', 'observation_type': 'measured', 'axis': 'T', 'valid_min': 0.0, 'valid_max': 9999999999.0}, 'PSAL': {'type': 'float', 'long_name': 'sea_water_salinity', 'standard_name': 'sea_water_salinity', 'units': '1e-3', 'valid_min': 10.0, 'valid_max': 36.0, 'ancillary_variables': 'PSAL_quality_control', 'sensor_depth': 1.9}, 'PSAL_quality_control': {'type': 'float', 'long_name': 'sea_water_salinity quality control', 'standard_name': 'sea_water_salinity status_flag', 'quality_control_conventions': 'IMOS standard set using the IODE flags', 'valid_min': 0.0, 'valid_max': 9.0, 'flag_values': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 'flag_meanings': 'no_qc_performed good_data probably_good_data bad_data_that_are_potentially_correctable bad_data value_changed not_used not_used interpolated_values missing_values'}, 'dataset_metadata': {'metadata_uuid': '8af21108-c535-43bf-8dab-c1f45a26088c', 'title': 'FILL UP MANUALLY - CHECK DOCUMENTATION'}}
filter_time = create_time_filter(parquet_ds, date_start='2010-01-01 10:14:00', date_end='2010-03-01 07:50:00')
filter_geo = create_bbox_filter(parquet_ds, lat_min=-23, lat_max=-20, lon_min=150, lon_max=155)
filter = filter_geo & filter_time
%%time
# using pandas instead of pyarrow so that filters can directly be applied to the data, and not just the partition
df = pd.read_parquet(dname, engine='pyarrow',filters=filter)
df.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 800906 entries, 0 to 800905 Data columns (total 18 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 filename 800906 non-null object 1 LATITUDE_quality_control 800906 non-null float64 2 LATITUDE 800906 non-null float64 3 LONGITUDE_quality_control 800906 non-null float64 4 LONGITUDE 800906 non-null float64 5 DEPTH 800906 non-null float64 6 CPHL 211652 non-null float64 7 CPHL_quality_control 211652 non-null float64 8 TIME 800906 non-null datetime64[ns] 9 TURB 210179 non-null float32 10 TURB_quality_control 210179 non-null float32 11 TEMP 212770 non-null float32 12 TEMP_quality_control 212770 non-null float32 13 PSAL 166305 non-null float32 14 PSAL_quality_control 166305 non-null float32 15 platform_code 800906 non-null category 16 timestamp 800906 non-null category 17 polygon 800906 non-null category dtypes: category(3), datetime64[ns](1), float32(6), float64(7), object(1) memory usage: 76.4+ MB CPU times: user 799 ms, sys: 319 ms, total: 1.12 s Wall time: 5.86 s
# Quick-look plot of the selection: one point per row at (LONGITUDE, LATITUDE), coloured by TEMP.
df.plot.scatter(x="LONGITUDE", y='LATITUDE', c="TEMP", cmap='RdYlBu_r')
<Axes: xlabel='LONGITUDE', ylabel='LATITUDE'>
import cartopy.crs as ccrs
from matplotlib.colors import Normalize
# Colour scale spanning the temperature range of the current selection.
norm = Normalize(vmin=df['TEMP'].min(), vmax=df['TEMP'].max())

# The data are plain longitude/latitude, so the same plate carree CRS serves
# both as the map projection and as the transform of the plotted points.
plate_carree = ccrs.PlateCarree()
fig, ax = plt.subplots(subplot_kw=dict(projection=plate_carree))
ax.set_title('Scatter plot with Coastline')

# One marker per observation, coloured by temperature.
sc = ax.scatter(
    df["LONGITUDE"],
    df["LATITUDE"],
    c=df["TEMP"],
    cmap='RdYlBu_r',
    norm=norm,
    transform=plate_carree,
)

# Geographic context: coastline, then labelled gridlines.
ax.coastlines()
ax.gridlines(draw_labels=True)

# Vertical colour bar attached to the map axes.
cbar = fig.colorbar(sc, ax=ax, orientation='vertical', fraction=0.046, pad=0.04)
cbar.set_label('Temperature')

plt.show()
/home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/cartopy/io/__init__.py:241: DownloadWarning: Downloading: https://naturalearth.s3.amazonaws.com/10m_physical/ne_10m_coastline.zip warnings.warn(f'Downloading: {url}', DownloadWarning)
filter_time = create_time_filter(parquet_ds, date_start='2010-01-31 10:14:00', date_end='2010-02-01 07:50:00')
expr_1 = pc.field('platform_code') == pa.scalar("VNCF")
filter = expr_1 & filter_time
%%time
# using pandas instead of pyarrow so that filters can directly be applied to the data, and not just the partition
df = pd.read_parquet(dname, engine='pyarrow',filters=filter)
df.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 11038 entries, 0 to 11037 Data columns (total 18 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 filename 11038 non-null object 1 LATITUDE_quality_control 11038 non-null float64 2 LATITUDE 11038 non-null float64 3 LONGITUDE_quality_control 11038 non-null float64 4 LONGITUDE 11038 non-null float64 5 DEPTH 11038 non-null float64 6 CPHL 3931 non-null float64 7 CPHL_quality_control 3931 non-null float64 8 TIME 11038 non-null datetime64[ns] 9 TURB 3091 non-null float32 10 TURB_quality_control 3091 non-null float32 11 TEMP 3953 non-null float32 12 TEMP_quality_control 3953 non-null float32 13 PSAL 63 non-null float32 14 PSAL_quality_control 63 non-null float32 15 platform_code 11038 non-null category 16 timestamp 11038 non-null category 17 polygon 11038 non-null category dtypes: category(3), datetime64[ns](1), float32(6), float64(7), object(1) memory usage: 1.1+ MB CPU times: user 415 ms, sys: 131 ms, total: 546 ms Wall time: 6.12 s
# Order the rows chronologically so that consecutive rows are consecutive
# positions along the track.
df_sorted = df.sort_values('TIME')

# Create a list of segments: `points` has shape (N, 1, 2) holding (lon, lat);
# pairing each point with its successor gives `segments` of shape (N-1, 2, 2).
points = np.array([df_sorted['LONGITUDE'], df_sorted['LATITUDE']]).T.reshape(-1, 1, 2)
segments = np.concatenate([points[:-1], points[1:]], axis=1)

# Create a LineCollection with segments colored by temperature
norm = plt.Normalize(df_sorted['TEMP'].min(), df_sorted['TEMP'].max())
lc = LineCollection(segments, cmap='RdYlBu_r', norm=norm)
lc.set_array(df_sorted['TEMP'])
lc.set_linewidth(2)

fig, ax = plt.subplots()
ax.add_collection(lc)
# Fit the view limits to the collection that was just added.
ax.autoscale()
# Axis labels come from the dataset's own variable attributes.
ax.set_xlabel(metadata['LONGITUDE']['standard_name'])
ax.set_ylabel(metadata['LATITUDE']['standard_name'])
# The y axis is deliberately NOT inverted: latitude increases upwards (north at
# the top), consistent with the scatter maps drawn earlier in this notebook.

# Adding color bar
cbar = plt.colorbar(lc, ax=ax)
cbar.set_label('Temperature')

plt.show()