dataset_name = "vessel_xbt_realtime_nonqc"
# only run once, then restart session if needed
!pip install uv
import os
import sys
def is_colab():
try:
import google.colab
return True
except ImportError:
return False
# Get the current directory of the notebook
current_dir = os.getcwd()
# Check if requirements.txt exists in the current directory
local_requirements = os.path.join(current_dir, 'requirements.txt')
if os.path.exists(local_requirements):
requirements_path = local_requirements
else:
# Fall back to the online requirements.txt file
requirements_path = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/notebooks/requirements.txt'
# Install packages using uv and the determined requirements file
if is_colab():
xr.set_options(display_style='text')
os.system(f'uv pip install --system -r {requirements_path}')
else:
os.system('uv venv')
os.system(f'uv pip install -r {requirements_path}')
Requirement already satisfied: uv in /home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages (0.4.18)
Using CPython 3.12.6 interpreter at: /home/lbesnard/miniforge3/envs/AodnCloudOptimised/bin/python Creating virtual environment at: .venv Activate with: source .venv/bin/activate Audited 230 packages in 27ms
import requests
import os
if not os.path.exists('DataQuery.py'):
print('Downloading DataQuery.py')
url = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/aodn_cloud_optimised/lib/DataQuery.py'
response = requests.get(url)
with open('DataQuery.py', 'w') as f:
f.write(response.text)
from DataQuery import create_time_filter, create_bbox_filter, query_unique_value, plot_spatial_extent, get_spatial_extent, get_temporal_extent, get_schema_metadata
import pyarrow.parquet as pq
import pyarrow.dataset as pds
import pyarrow as pa
import os
import pandas as pd
import pyarrow.compute as pc
BUCKET_OPTIMISED_DEFAULT="aodn-cloud-optimised"
dname = f"s3://anonymous@{BUCKET_OPTIMISED_DEFAULT}/{dataset_name}.parquet/"
parquet_ds = pq.ParquetDataset(dname,partitioning='hive')
Partitioning in Parquet involves organising data files based on the values of one or more columns, known as partition keys. When data is written to Parquet files with partitioning enabled, the files are physically stored in a directory structure that reflects the partition keys. This directory structure makes it easier to retrieve and process specific subsets of data based on the partition keys.
dataset = pds.dataset(dname, format="parquet", partitioning="hive")
partition_keys = dataset.partitioning.schema
print(partition_keys)
XBT_line: string timestamp: int32 polygon: string
%%time
unique_partition_value = query_unique_value(parquet_ds, 'XBT_line')
print(list(unique_partition_value)[0:2]) # showing a subset only
['PX30-31', 'PX32'] CPU times: user 6.99 ms, sys: 756 µs, total: 7.75 ms Wall time: 8.31 ms
In this section, we're plotting the polygons where data exists. This helps then with creating a bounding box where there is data
plot_spatial_extent(parquet_ds)
/home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/cartopy/mpl/feature_artist.py:144: UserWarning: facecolor will have no effect as it has been defined as "never". warnings.warn('facecolor will have no effect as it has been ' /home/lbesnard/github_repo/aodn_cloud_optimised/notebooks/DataQuery.py:449: UserWarning: Legend does not support handles for PatchCollection instances. See: https://matplotlib.org/stable/tutorials/intermediate/legend_guide.html#implementing-a-custom-legend-handler ax.legend() /home/lbesnard/github_repo/aodn_cloud_optimised/notebooks/DataQuery.py:449: UserWarning: No artists with labels found to put in legend. Note that artists whose label start with an underscore are ignored when legend() is called with no argument. ax.legend() /home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/cartopy/io/__init__.py:241: DownloadWarning: Downloading: https://naturalearth.s3.amazonaws.com/110m_physical/ne_110m_land.zip warnings.warn(f'Downloading: {url}', DownloadWarning) /home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/cartopy/io/__init__.py:241: DownloadWarning: Downloading: https://naturalearth.s3.amazonaws.com/110m_physical/ne_110m_ocean.zip warnings.warn(f'Downloading: {url}', DownloadWarning) /home/lbesnard/miniforge3/envs/AodnCloudOptimised/lib/python3.12/site-packages/cartopy/io/__init__.py:241: DownloadWarning: Downloading: https://naturalearth.s3.amazonaws.com/110m_cultural/ne_110m_admin_0_boundary_lines_land.zip warnings.warn(f'Downloading: {url}', DownloadWarning)
Similary to the spatial extent, we're retrieving the minimum and maximum timestamp partition values of the dataset. This is not necessarely accurately representative of the TIME values, as the timestamp partition can be yearly/monthly... but is here to give an idea
get_temporal_extent(parquet_ds)
(datetime.datetime(2020, 9, 1, 0, 0, tzinfo=datetime.timezone.utc), datetime.datetime(2024, 6, 1, 0, 0, tzinfo=datetime.timezone.utc))
For all parquet dataset, we create a sidecar file in the root of the dataset named _common_matadata. This contains the variable attributes.
# parquet_meta = pa.parquet.read_schema(os.path.join(dname + '_common_metadata')) # parquet metadata
metadata = get_schema_metadata(dname) # schema metadata
metadata
{'TIME': {'type': 'timestamp[ns]', 'standard_name': 'time', 'long_name': 'time', 'axis': 'T', 'valid_min': 0.0, 'valid_max': 90000.0}, 'LATITUDE': {'type': 'double', 'standard_name': 'latitude', 'long_name': 'latitude', 'units': 'degrees_north', 'axis': 'Y', 'valid_min': -90.0, 'valid_max': 90.0, 'reference_datum': 'WGS84 coordinate reference system'}, 'LONGITUDE': {'type': 'double', 'standard_name': 'longitude', 'long_name': 'longitude', 'units': 'degrees_east', 'axis': 'X', 'valid_min': -180.0, 'valid_max': 180.0, 'reference_datum': 'WGS84 coordinate reference system'}, 'DEPTH': {'type': 'double', 'axis': 'Z', 'long_name': 'depth', 'positive': 'down', 'reference_datum': 'sea surface', 'standard_name': 'depth', 'units': 'm', 'ancillary_variables': 'DEPTH_quality_control', 'valid_max': 12000.0, 'valid_min': -5.0, 'fallrate_equation_coefficient_a': 6.691, 'fallrate_equation_coefficient_b': -2.25}, 'DEPTH_quality_control': {'type': 'float', 'standard_name': 'depth status_flag', 'long_name': 'quality flag for sea_water_temperature', 'quality_control_conventions': 'BUFR GTSPP standard flags', 'valid_min': 0, 'valid_max': 9, 'flag_values': [0, 1, 2, 3, 4, 5, 8, 9, 15], 'flag_meanings': 'Unqualified Correct_value_all_checks_passed Probably_good_but_value_inconsistent_with_statistics_differ_from_climatology Probably_bad_spike_gradient_etc_if_other_tests_passed Bad_value_impossible_value_out_of_scale_vertical_instability_constant_profile Value_modified_during_quality_control Interpolated_value Good_for_operational_use_caution_check_literature_for_other_uses Missing_value'}, 'TEMP': {'type': 'double', 'standard_name': 'sea_water_temperature', 'long_name': 'sea_water_temperature', 'units': 'Celsius', 'ancillary_variables': 'TEMP_quality_control', 'valid_min': 5.0, 'valid_max': 30.0}, 'TEMP_quality_control': {'type': 'float', 'standard_name': 'sea_water_temperature status_flag', 'long_name': 'quality flag for depth', 'quality_control_conventions': 'BUFR GTSPP standard flags', 'valid_min': 0, 'valid_max': 9, 'flag_values': [0, 1, 2, 3, 4, 5, 8, 9, 15], 'flag_meanings': 'Unqualified Correct_value_all_checks_passed Probably_good_but_value_inconsistent_with_statistics_differ_from_climatology Probably_bad_spike_gradient_etc_if_other_tests_passed Bad_value_impossible_value_out_of_scale_vertical_instability_constant_profile Value_modified_during_quality_control Interpolated_value Good_for_operational_use_caution_check_literature_for_other_uses Missing_value'}, 'imo_number': {'type': 'int32'}, 'timestamp': {'type': 'int64'}, 'polygon': {'type': 'string'}, 'XBT_line': {'type': 'string'}, 'ship_name': {'type': 'string'}, 'Callsign': {'type': 'string'}, 'filename': {'type': 'string'}, 'dataset_metadata': {'metadata_uuid': '35234913-aa3c-48ec-b9a4-77f822f66ef8', 'title': 'Upper Ocean Thermal Data collected using XBT (expendable bathythermographs)', 'principal_investigator': 'Cowley, Rebecca', 'principal_investigator_email': 'rebecca.cowley@csiro.au', 'featureType': 'profile'}}
filter_time = create_time_filter(parquet_ds, date_start='2023-01-31 10:14:00', date_end='2024-02-01 07:50:00')
filter_geo = create_bbox_filter(parquet_ds, lat_min=-34, lat_max=-32, lon_min=150, lon_max=155)
filter = filter_geo & filter_time
%%time
# using pandas instead of pyarrow so that filters can directly be applied to the data, and not just the partition
df = pd.read_parquet(dname, engine='pyarrow',filters=filter)
df.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 14193 entries, 0 to 14192 Data columns (total 14 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 TIME 14193 non-null datetime64[ns] 1 LATITUDE 14193 non-null float64 2 LONGITUDE 14193 non-null float64 3 DEPTH 14193 non-null float64 4 DEPTH_quality_control 14193 non-null float32 5 TEMP 14193 non-null float64 6 TEMP_quality_control 14193 non-null float32 7 imo_number 14193 non-null int32 8 ship_name 14193 non-null object 9 Callsign 14193 non-null object 10 filename 14193 non-null object 11 XBT_line 14193 non-null category 12 timestamp 14193 non-null category 13 polygon 14193 non-null category dtypes: category(3), datetime64[ns](1), float32(2), float64(4), int32(1), object(3) memory usage: 1.1+ MB CPU times: user 501 ms, sys: 52.9 ms, total: 554 ms Wall time: 4.97 s
df[df['TEMP_quality_control'] == 1].sort_values('TIME').plot.scatter(x='TEMP', y='DEPTH', c='TIME',
xlabel=metadata['TEMP']['standard_name'],
ylabel=metadata['DEPTH']['standard_name'],
cmap='RdYlBu_r', marker='.', linestyle="None").invert_yaxis()
filter_time = create_time_filter(parquet_ds, date_start='2024-01-31 10:14:00', date_end='2024-02-01 07:50:00')
expr_1 = pc.field('XBT_line') == pa.scalar("PX34")
filter = expr_1 & filter_time
%%time
# using pandas instead of pyarrow so that filters can directly be applied to the data, and not just the partition
df = pd.read_parquet(dname, engine='pyarrow',filters=filter)
df.info()
<class 'pandas.core.frame.DataFrame'> RangeIndex: 12616 entries, 0 to 12615 Data columns (total 14 columns): # Column Non-Null Count Dtype --- ------ -------------- ----- 0 TIME 12616 non-null datetime64[ns] 1 LATITUDE 12616 non-null float64 2 LONGITUDE 12616 non-null float64 3 DEPTH 12616 non-null float64 4 DEPTH_quality_control 12616 non-null float32 5 TEMP 12616 non-null float64 6 TEMP_quality_control 12616 non-null float32 7 imo_number 12616 non-null int32 8 ship_name 12616 non-null object 9 Callsign 12616 non-null object 10 filename 12616 non-null object 11 XBT_line 12616 non-null category 12 timestamp 12616 non-null category 13 polygon 12616 non-null category dtypes: category(3), datetime64[ns](1), float32(2), float64(4), int32(1), object(3) memory usage: 976.0+ KB CPU times: user 290 ms, sys: 26.8 ms, total: 317 ms Wall time: 3.89 s
## Plotting only Good Quality data using QC flags
df[df['TEMP_quality_control'] == 1].sort_values('TIME').plot.scatter(x='TEMP', y='DEPTH', c='TIME', cmap='RdYlBu_r',
xlabel=metadata['TEMP']['standard_name'],
ylabel=metadata['DEPTH']['standard_name'],
marker='.', linestyle="None").invert_yaxis()