"""
This module provides functions and metadata that can be used to convert the
6-hourly Global Drifter Program (GDP) data to a ``clouddrift.RaggedArray``
instance.
"""
import datetime
import os
import re
import tempfile
import urllib.request
import warnings
import numpy as np
import xarray as xr
import clouddrift.adapters.gdp as gdp
from clouddrift.adapters.utils import download_with_progress, standard_retry_protocol
from clouddrift.raggedarray import RaggedArray
GDP_VERSION = "September 2023"
GDP_DATA_URL = "https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/6h"
GDP_TMP_PATH = os.path.join(tempfile.gettempdir(), "clouddrift", "gdp6h")
GDP_DATA = [
"lon",
"lat",
"ve",
"vn",
"temp",
"err_lat",
"err_lon",
"err_temp",
"drogue_status",
]
[docs]
def download(
url: str = GDP_DATA_URL,
tmp_path: str = GDP_TMP_PATH,
drifter_ids: list[int] | None = None,
n_random_id: int | None = None,
):
"""Download individual NetCDF files from the AOML server.
Parameters
----------
url : str
URL from which to download the data (Default: GDP_DATA_URL). Alternatively, it can be GDP_DATA_URL_EXPERIMENTAL.
tmp_path : str, optional
Path to the directory where the individual NetCDF files are stored
(default varies depending on operating system; /tmp/clouddrift/gdp6h on Linux)
drifter_ids : list
List of drifter to retrieve (Default: all)
n_random_id : int
Randomly select n_random_id drifter IDs to download (Default: None)
Returns
-------
out : list
List of retrieved drifters
"""
print(f"Downloading GDP 6-hourly data to {tmp_path}...")
# Create a temporary directory if doesn't already exists.
os.makedirs(tmp_path, exist_ok=True)
pattern = "drifter_6h_[0-9]*.nc"
directory_list = [
"netcdf_1_5000",
"netcdf_5001_10000",
"netcdf_10001_15000",
"netcdf_15001_current",
]
drifter_urls: list[str] = []
added = set()
for dir in directory_list:
dirdata = standard_retry_protocol(
lambda: urllib.request.urlopen(f"{url}/{dir}").read()
)()
string = dirdata.decode("utf-8")
filelist = list(set(re.compile(pattern).findall(string)))
for f in filelist:
did = int(f.split("_")[2].removesuffix(".nc"))
if (drifter_ids is None or did in drifter_ids) and did not in added:
drifter_urls.append(f"{url}/{dir}/{f}")
added.add(did)
# retrieve only a subset of n_random_id trajectories
if n_random_id:
if n_random_id > len(drifter_urls):
warnings.warn(
f"Retrieving all listed trajectories because {n_random_id} is larger than the {len(drifter_urls)} listed trajectories."
)
else:
rng = np.random.Generator(np.random.MT19937(42))
drifter_urls = list(rng.choice(drifter_urls, n_random_id, replace=False))
download_with_progress(
[(url, os.path.join(tmp_path, os.path.basename(url))) for url in drifter_urls]
)
# Download the metadata so we can order the drifter IDs by end date.
gdp_metadata = gdp.get_gdp_metadata(tmp_path)
drifter_ids = [
int(os.path.basename(f).split("_")[2].split(".")[0]) for f in drifter_urls
]
return gdp.order_by_date(gdp_metadata, drifter_ids)
[docs]
def preprocess(index: int, **kwargs) -> xr.Dataset:
"""Extract and preprocess the Lagrangian data and attributes.
This function takes an identification number that can be used to create a
file or url pattern or select data from a Dataframe. It then preprocesses
the data and returns a clean Xarray Dataset.
Parameters
----------
index : int
Drifter's identification number
Returns
-------
ds : xr.Dataset
Xarray Dataset containing the data and attributes
"""
ds = xr.load_dataset(
os.path.join(kwargs["tmp_path"], kwargs["filename_pattern"].format(id=index)),
decode_times=False,
decode_coords=False,
)
# parse the date with custom function
ds["deploy_date"].data = gdp.decode_date(np.array([ds.deploy_date.data[0]]))
ds["end_date"].data = gdp.decode_date(np.array([ds.end_date.data[0]]))
ds["drogue_lost_date"].data = gdp.decode_date(
np.array([ds.drogue_lost_date.data[0]])
)
ds["time"].data = gdp.decode_date(np.array([ds.time.data[0]]))
# convert fill values to nan
for var in [
"err_lon",
"err_lat",
"temp",
"err_temp",
]:
try:
ds[var].data = gdp.fill_values(ds[var].data)
except KeyError:
warnings.warn(f"Variable {var} not found; skipping.")
# fix missing values stored as str
for var in [
"longitude",
"latitude",
"err_lat",
"err_lon",
"ve",
"vn",
"temp",
"err_temp",
]:
try:
ds[var].encoding["missing value"] = -1e-34
except KeyError:
warnings.warn(f"Variable {var} not found in upstream data; skipping.")
# convert type of some variable
target_dtype = {
"ID": "int64",
"WMO": "int32",
"expno": "int32",
"typedeath": "int8",
}
for var in target_dtype.keys():
if var in ds.keys():
ds[var].data = ds[var].data.astype(target_dtype[var])
else:
warnings.warn(f"Variable {var} not found in upstream data; skipping.")
# new variables
ds["ids"] = (
["traj", "obs"],
[np.repeat(ds.ID.values, ds.sizes["obs"])],
)
ds["drogue_status"] = (
["traj", "obs"],
[gdp.drogue_presence(ds.drogue_lost_date.data, ds.time.data[0])],
)
# convert attributes to variable
ds["location_type"] = (
("traj"),
[False if ds.attrs.get("location_type") == "Argos" else True],
) # 0 for Argos, 1 for GPS
ds["DeployingShip"] = (("traj"), gdp.cut_str(ds.attrs.get("DeployingShip", ""), 20))
ds["DeploymentStatus"] = (
("traj"),
gdp.cut_str(ds.attrs.get("DeploymentStatus", ""), 20),
)
ds["BuoyTypeManufacturer"] = (
("traj"),
gdp.cut_str(ds.attrs.get(".BuoyTypeManufacturer", ""), 20),
)
ds["BuoyTypeSensorArray"] = (
("traj"),
gdp.cut_str(ds.attrs.get("BuoyTypeSensorArray", ""), 20),
)
ds["CurrentProgram"] = (
("traj"),
[np.int32(gdp.str_to_float(ds.attrs.get("CurrentProgram", ""), -1))],
)
ds["PurchaserFunding"] = (
("traj"),
gdp.cut_str(ds.attrs.get("PurchaserFunding", ""), 20),
)
ds["SensorUpgrade"] = (("traj"), gdp.cut_str(ds.attrs.get("SensorUpgrade", ""), 20))
ds["Transmissions"] = (("traj"), gdp.cut_str(ds.attrs.get("Transmissions", ""), 20))
ds["DeployingCountry"] = (
("traj"),
gdp.cut_str(ds.attrs.get("DeployingCountry", ""), 20),
)
ds["DeploymentComments"] = (
("traj"),
gdp.cut_str(
ds.attrs.get("DeploymentComments", "")
.encode("ascii", "ignore")
.decode("ascii"),
20,
),
) # remove non ascii char
ds["ManufactureYear"] = (
("traj"),
[np.int16(gdp.str_to_float(ds.attrs.get("ManufactureYear", ""), -1))],
)
ds["ManufactureMonth"] = (
("traj"),
[np.int16(gdp.str_to_float(ds.attrs.get("ManufactureMonth", ""), -1))],
)
ds["ManufactureSensorType"] = (
("traj"),
gdp.cut_str(ds.attrs.get("ManufactureSensorType", ""), 20),
)
ds["ManufactureVoltage"] = (
("traj"),
[np.int16(gdp.str_to_float(ds.attrs.get("ManufactureVoltage", "")[:-2], -1))],
) # e.g. 56 V
ds["FloatDiameter"] = (
("traj"),
[gdp.str_to_float(ds.attrs.get("FloatDiameter", "")[:-3])],
) # e.g. 35.5 cm
ds["SubsfcFloatPresence"] = (
("traj"),
np.array(
[gdp.str_to_float(ds.attrs.get("SubsfcFloatPresence", ""))], dtype="bool"
),
)
ds["DrogueType"] = (("traj"), gdp.cut_str(ds.attrs.get("DrogueType", ""), 7))
ds["DrogueLength"] = (
("traj"),
[gdp.str_to_float(ds.attrs.get("DrogueLength", "")[:-2])],
) # e.g. 4.8 m
ds["DrogueBallast"] = (
("traj"),
[gdp.str_to_float(ds.attrs.get("DrogueBallast", "")[:-3])],
) # e.g. 1.4 kg
ds["DragAreaAboveDrogue"] = (
("traj"),
[gdp.str_to_float(ds.attrs.get("DragAreaAboveDrogue", "")[:-4])],
) # 10.66 m^2
ds["DragAreaOfDrogue"] = (
("traj"),
[gdp.str_to_float(ds.attrs.get("DragAreaOfDrogue", "")[:-4])],
) # e.g. 416.6 m^2
ds["DragAreaRatio"] = (
("traj"),
[gdp.str_to_float(ds.attrs.get("DragAreaRatio", ""))],
) # e.g. 39.08
ds["DrogueCenterDepth"] = (
("traj"),
[gdp.str_to_float(ds.attrs.get("DrogueCenterDepth", "")[:-2])],
) # e.g. 20.0 m
ds["DrogueDetectSensor"] = (
("traj"),
gdp.cut_str(ds.attrs.get("DrogueDetectSensor", ""), 20),
)
# vars attributes
vars_attrs = {
"ID": {"long_name": "Global Drifter Program Buoy ID", "units": "-"},
"longitude": {"long_name": "Longitude", "units": "degrees_east"},
"latitude": {"long_name": "Latitude", "units": "degrees_north"},
"time": {"long_name": "Time", "units": "seconds since 1970-01-01 00:00:00"},
"rowsize": {
"long_name": "Number of observations per trajectory",
"sample_dimension": "obs",
"units": "-",
},
"location_type": {
"long_name": "Satellite-based location system",
"units": "-",
"comments": "0 (Argos), 1 (GPS)",
},
"WMO": {
"long_name": "World Meteorological Organization buoy identification number",
"units": "-",
},
"expno": {"long_name": "Experiment number", "units": "-"},
"deploy_date": {
"long_name": "Deployment date and time",
"units": "seconds since 1970-01-01 00:00:00",
},
"deploy_lon": {"long_name": "Deployment longitude", "units": "degrees_east"},
"deploy_lat": {"long_name": "Deployment latitude", "units": "degrees_north"},
"end_date": {
"long_name": "End date and time",
"units": "seconds since 1970-01-01 00:00:00",
},
"end_lon": {"long_name": "End latitude", "units": "degrees_north"},
"end_lat": {"long_name": "End longitude", "units": "degrees_east"},
"drogue_lost_date": {
"long_name": "Date and time of drogue loss",
"units": "seconds since 1970-01-01 00:00:00",
},
"typedeath": {
"long_name": "Type of death",
"units": "-",
"comments": "0 (buoy still alive), 1 (buoy ran aground), 2 (picked up by vessel), 3 (stop transmitting), 4 (sporadic transmissions), 5 (bad batteries), 6 (inactive status)",
},
"typebuoy": {
"long_name": "Buoy type (see https://www.aoml.noaa.gov/phod/dac/dirall.html)",
"units": "-",
},
"DeployingShip": {"long_name": "Name of deployment ship", "units": "-"},
"DeploymentStatus": {"long_name": "Deployment status", "units": "-"},
"BuoyTypeManufacturer": {"long_name": "Buoy type manufacturer", "units": "-"},
"BuoyTypeSensorArray": {"long_name": "Buoy type sensor array", "units": "-"},
"CurrentProgram": {
"long_name": "Current Program",
"units": "-",
"_FillValue": "-1",
},
"PurchaserFunding": {"long_name": "Purchaser funding", "units": "-"},
"SensorUpgrade": {"long_name": "Sensor upgrade", "units": "-"},
"Transmissions": {"long_name": "Transmissions", "units": "-"},
"DeployingCountry": {"long_name": "Deploying country", "units": "-"},
"DeploymentComments": {"long_name": "Deployment comments", "units": "-"},
"ManufactureYear": {
"long_name": "Manufacture year",
"units": "-",
"_FillValue": "-1",
},
"ManufactureMonth": {
"long_name": "Manufacture month",
"units": "-",
"_FillValue": "-1",
},
"ManufactureSensorType": {"long_name": "Manufacture Sensor Type", "units": "-"},
"ManufactureVoltage": {
"long_name": "Manufacture voltage",
"units": "V",
"_FillValue": "-1",
},
"FloatDiameter": {"long_name": "Diameter of surface floater", "units": "cm"},
"SubsfcFloatPresence": {"long_name": "Subsurface Float Presence", "units": "-"},
"DrogueType": {"drogue_type": "Drogue Type", "units": "-"},
"DrogueLength": {"long_name": "Length of drogue.", "units": "m"},
"DrogueBallast": {
"long_name": "Weight of the drogue's ballast.",
"units": "kg",
},
"DragAreaAboveDrogue": {"long_name": "Drag area above drogue.", "units": "m^2"},
"DragAreaOfDrogue": {"long_name": "Drag area drogue.", "units": "m^2"},
"DragAreaRatio": {"long_name": "Drag area ratio", "units": "m"},
"DrogueCenterDepth": {
"long_name": "Average depth of the drogue.",
"units": "m",
},
"DrogueDetectSensor": {"long_name": "Drogue detection sensor", "units": "-"},
"ve": {"long_name": "Eastward velocity", "units": "m/s"},
"vn": {"long_name": "Northward velocity", "units": "m/s"},
"err_lat": {
"long_name": "95% confidence interval in latitude",
"units": "degrees_north",
},
"err_lon": {
"long_name": "95% confidence interval in longitude",
"units": "degrees_east",
},
"drogue_status": {
"long_name": "Status indicating the presence of the drogue",
"units": "-",
"flag_values": "1,0",
"flag_meanings": "drogued, undrogued",
},
"temp": {
"long_name": "Fitted sea water temperature",
"units": "Kelvin",
"comments": "Estimated near-surface sea water temperature from drifting buoy measurements. It is the sum of the fitted near-surface non-diurnal sea water temperature and fitted diurnal sea water temperature anomaly. Discrepancies may occur because of rounding.",
},
"err_temp": {
"long_name": "Standard uncertainty of fitted sea water temperature",
"units": "Kelvin",
"comments": "Estimated one standard error of near-surface sea water temperature estimate from drifting buoy measurements",
},
}
# global attributes
attrs = {
"title": "Global Drifter Program drifting buoy collection",
"history": f"version {GDP_VERSION}. Metadata from dirall.dat and deplog.dat",
"Conventions": "CF-1.6",
"time_coverage_start": "",
"time_coverage_end": "",
"date_created": datetime.datetime.now().isoformat(),
"publisher_name": "GDP Drifter DAC",
"publisher_email": "aoml.dftr@noaa.gov",
"publisher_url": "https://www.aoml.noaa.gov/phod/gdp",
"license": "freely available",
"processing_level": "Level 2 QC by GDP drifter DAC",
"metadata_link": "https://www.aoml.noaa.gov/phod/dac/dirall.html",
"contributor_name": "NOAA Global Drifter Program",
"contributor_role": "Data Acquisition Center",
"institution": "NOAA Atlantic Oceanographic and Meteorological Laboratory",
"acknowledgement": f"Lumpkin, Rick; Centurioni, Luca (2019). NOAA Global Drifter Program quality-controlled 6-hour interpolated data from ocean surface drifting buoys. [indicate subset used]. NOAA National Centers for Environmental Information. Dataset. https://doi.org/10.25921/7ntx-z961. Accessed {datetime.datetime.now(datetime.timezone.utc).strftime('%d %B %Y')}.",
"summary": "Global Drifter Program six-hourly data",
"doi": "10.25921/7ntx-z961",
}
# set attributes
for var in vars_attrs.keys():
if var in ds.keys():
ds[var].attrs = vars_attrs[var]
else:
warnings.warn(f"Variable {var} not found in upstream data; skipping.")
ds.attrs = attrs
# rename variables
ds = ds.rename_vars({"longitude": "lon", "latitude": "lat", "ID": "id"})
# Cast float64 variables to float32 to reduce memory footprint.
ds = gdp.cast_float64_variables_to_float32(ds)
return ds
[docs]
def to_raggedarray(
drifter_ids: list[int] | None = None,
n_random_id: int | None = None,
tmp_path: str = GDP_TMP_PATH,
) -> RaggedArray:
"""Download and process individual GDP 6-hourly files and return a
RaggedArray instance with the data.
Parameters
----------
drifter_ids : list[int], optional
List of drifters to retrieve (Default: all)
n_random_id : list[int], optional
Randomly select n_random_id drifter NetCDF files
tmp_path : str, optional
Path to the directory where the individual NetCDF files are stored
(default varies depending on operating system; /tmp/clouddrift/gdp6h on Linux)
Returns
-------
out : RaggedArray
A RaggedArray instance of the requested dataset
Examples
--------
Invoke `to_raggedarray` without any arguments to download all drifter data
from the 6-hourly GDP feed:
>>> from clouddrift.adapters.gdp6h import to_raggedarray
>>> ra = to_raggedarray()
To download a random sample of 100 drifters, for example for development
or testing, use the `n_random_id` argument:
>>> ra = to_raggedarray(n_random_id=100)
To download a specific list of drifters, use the `drifter_ids` argument:
>>> ra = to_raggedarray(drifter_ids=[54375, 114956, 126934])
Finally, `to_raggedarray` returns a `RaggedArray` instance which provides
a convenience method to emit a `xarray.Dataset` instance:
>>> ds = ra.to_xarray()
To write the ragged array dataset to a NetCDF file on disk, do
>>> ds.to_netcdf("gdp6h.nc", format="NETCDF4")
Alternatively, to write the ragged array to a Parquet file, first create
it as an Awkward Array:
>>> arr = ra.to_awkward()
>>> arr.to_parquet("gdp6h.parquet")
"""
ids = download(GDP_DATA_URL, tmp_path, drifter_ids, n_random_id)
ra = RaggedArray.from_files(
indices=ids,
preprocess_func=preprocess,
name_coords=gdp.GDP_COORDS,
name_meta=gdp.GDP_METADATA,
name_data=GDP_DATA,
name_dims=gdp.GDP_DIMS,
rowsize_func=gdp.rowsize,
filename_pattern="drifter_6h_{id}.nc",
tmp_path=tmp_path,
)
# update dynamic global attributes
ra.attrs_global["time_coverage_start"] = (
f"{datetime.datetime(1970,1,1) + datetime.timedelta(seconds=int(np.min(ra.coords['time']))):%Y-%m-%d:%H:%M:%SZ}"
)
ra.attrs_global["time_coverage_end"] = (
f"{datetime.datetime(1970,1,1) + datetime.timedelta(seconds=int(np.max(ra.coords['time']))):%Y-%m-%d:%H:%M:%SZ}"
)
return ra