dataset_name = "mooring_temperature_logger_delayed_qc"
# only run once, then restart session if needed
!pip install uv
import os
import sys
def is_colab():
    try:
        import google.colab
        return True
    except ImportError:
        return False
# Get the current directory of the notebook
current_dir = os.getcwd()
# Check if requirements.txt exists in the current directory
local_requirements = os.path.join(current_dir, 'requirements.txt')
if os.path.exists(local_requirements):
    requirements_path = local_requirements
else:
    # Fall back to the online requirements.txt file
    requirements_path = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/notebooks/requirements.txt'
# Install packages using uv and the determined requirements file
if is_colab():
    os.system(f'uv pip install --system -r {requirements_path}')
else:
    os.system('uv venv')
    os.system(f'uv pip install -r {requirements_path}')
import requests
import os
if not os.path.exists('parquet_queries.py'):
    print('Downloading parquet_queries.py')
    url = 'https://raw.githubusercontent.com/aodn/aodn_cloud_optimised/main/aodn_cloud_optimised/lib/ParquetDataQuery.py'
    response = requests.get(url)
    with open('parquet_queries.py', 'w') as f:
        f.write(response.text)
from parquet_queries import create_time_filter, create_bbox_filter, query_unique_value, plot_spatial_extent, \
get_temporal_extent, get_schema_metadata
import pyarrow.parquet as pq
import pyarrow.dataset as pds
import pyarrow as pa
import pandas as pd
import pyarrow.compute as pc
BUCKET_OPTIMISED_DEFAULT="aodn-cloud-optimised"
dname = f"s3://anonymous@{BUCKET_OPTIMISED_DEFAULT}/{dataset_name}.parquet/"
parquet_ds = pq.ParquetDataset(dname, partitioning='hive')
Partitioning in Parquet involves organising data files based on the values of one or more columns, known as partition keys. When data is written to Parquet files with partitioning enabled, the files are physically stored in a directory structure that reflects the partition keys. This directory structure makes it easier to retrieve and process specific subsets of data based on the partition keys.
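To make the layout concrete, here is a minimal sketch that writes a tiny hive-partitioned dataset locally with pyarrow; the values are invented for illustration and are not part of the AODN dataset.
# A minimal sketch (illustrative values only): write a tiny hive-partitioned
# dataset locally and inspect the directory layout it produces.
example = pa.table({'site_code': ['BMP070', 'BMP070', 'BMP120'],
                    'TEMP': [18.2, 18.4, 17.9]})
pds.write_dataset(
    example,
    'example_partitioned.parquet',
    format='parquet',
    partitioning=pds.partitioning(pa.schema([('site_code', pa.string())]), flavor='hive'),
)
# Resulting layout, one directory per partition value:
#   example_partitioned.parquet/site_code=BMP070/part-0.parquet
#   example_partitioned.parquet/site_code=BMP120/part-0.parquet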
dataset = pds.dataset(dname, format="parquet", partitioning="hive")
partition_keys = dataset.partitioning.schema
print(partition_keys)
%%time
unique_partition_value = query_unique_value(parquet_ds, 'site_code')
print(list(unique_partition_value)[0:2]) # showing a subset only
In this section, we plot the polygons where data exists. This helps when defining a bounding box that actually contains data.
plot_spatial_extent(parquet_ds)
Similarly to the spatial extent, we retrieve the minimum and maximum timestamp partition values of the dataset. These are not necessarily an exact representation of the TIME values, since the timestamp partition can be yearly, monthly, etc., but they give a good indication of the temporal coverage.
get_temporal_extent(parquet_ds)
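If an exact temporal extent is needed, a sketch along these lines scans the TIME column itself with pyarrow compute; note that it reads the whole column over the network, so it can be slow.
# A sketch: compute the exact TIME extent from the data rather than from the
# timestamp partition values (scans the full TIME column, so potentially slow).
time_col = dataset.to_table(columns=['TIME'])['TIME']
print(pc.min_max(time_col))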
For every Parquet dataset, we create a sidecar file named _common_metadata in the root of the dataset. It contains the variable attributes.
# parquet_meta = pq.read_schema(dname + '_common_metadata') # parquet metadata
metadata = get_schema_metadata(dname) # schema metadata
metadata
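As a sketch of what the commented-out line above does, the sidecar schema can also be read directly with pyarrow and its byte-encoded key/value metadata decoded (assuming pyarrow can resolve the S3 URI; otherwise pass an explicit filesystem to read_schema).
# A sketch: read the _common_metadata sidecar directly and decode its
# byte-encoded key/value metadata (assumes pyarrow resolves the S3 URI).
sidecar = pq.read_schema(dname + '_common_metadata')
decoded = {k.decode(): v.decode() for k, v in (sidecar.metadata or {}).items()}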
filter_time = create_time_filter(parquet_ds, date_start='2022-12-02', date_end='2022-12-26')
filter_geo = create_bbox_filter(parquet_ds, lat_min=-34, lat_max=-28, lon_min=151, lon_max=160)
filter = filter_geo & filter_time
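For reference, a hand-written expression roughly like the sketch below selects on TIME and position directly (assuming the coordinate columns are named LATITUDE and LONGITUDE); the helper functions above additionally constrain the partition columns, which is what lets whole partitions be skipped.
# A rough, hand-written equivalent (sketch only; the helpers above also filter on
# the partition columns, and LATITUDE/LONGITUDE column names are assumed).
expr_sketch = (
    (pc.field('TIME') >= pa.scalar(pd.Timestamp('2022-12-02')))
    & (pc.field('TIME') <= pa.scalar(pd.Timestamp('2022-12-26')))
    & (pc.field('LATITUDE') >= -34) & (pc.field('LATITUDE') <= -28)
    & (pc.field('LONGITUDE') >= 151) & (pc.field('LONGITUDE') <= 160)
)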
%%time
# read with pandas (pyarrow engine) so the filters are applied to the data rows themselves, not only to the partition keys
df = pd.read_parquet(dname, engine='pyarrow', filters=filter)
df.info()
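The same selection can also be evaluated with pyarrow alone; a sketch, assuming the helper filters are pyarrow expressions, is to pass the combined filter to Dataset.to_table and convert afterwards.
# A sketch of the pyarrow-only route: evaluate the same filter expression with
# Dataset.to_table, then convert the result to pandas.
table = dataset.to_table(filter=filter)
df_alt = table.to_pandas()
df_alt.info()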
filter_time = create_time_filter(parquet_ds, date_start='2022-12-02', date_end='2022-12-26')
expr_1 = pc.field('site_code') == pa.scalar("BMP070")
filter = expr_1 & filter_time
filter
%%time
# read with pandas (pyarrow engine) so the filters are applied to the data rows themselves, not only to the partition keys
df = pd.read_parquet(dname, engine='pyarrow', filters=filter)
df.info()
df['NOMINAL_DEPTH'].unique()
import matplotlib.pyplot as plt
# Filter the DataFrame to rows where NOMINAL_DEPTH == 13
filtered_df = df[df['NOMINAL_DEPTH'] == 13]
# Plotting
fig, ax1 = plt.subplots()
# Plot TEMP on the primary y-axis (left)
color = 'tab:blue'
ax1.set_xlabel('Time')
ax1.set_ylabel('TEMP', color=color)
ax1.plot(filtered_df['TIME'], filtered_df['TEMP'], color=color, label='TEMP')
ax1.tick_params(axis='y', labelcolor=color)
# Create a secondary y-axis for PRES
ax2 = ax1.twinx()
color = 'tab:red'
ax2.set_ylabel('PRES', color=color)
ax2.plot(filtered_df['TIME'], filtered_df['PRES'], color=color, label='PRES')
ax2.tick_params(axis='y', labelcolor=color)
# Set a fixed number of x-axis ticks
ax1.xaxis.set_major_locator(plt.MaxNLocator(5)) # Adjust the number of ticks as needed
# Show legend
fig.tight_layout()
fig.legend(loc='upper left')
plt.show()