"""
This module provides functions and metadata to convert the Global Drifter
Program (GDP) data to a ``clouddrift.RaggedArray`` instance. The functions
defined in this module are common to both hourly (``clouddrift.adapters.gdp1h``)
and six-hourly (``clouddrift.adapters.gdp6h``) GDP modules.
"""
import os
import tempfile
import numpy as np
import pandas as pd
import xarray as xr
from clouddrift.adapters.utils import download_with_progress
from clouddrift.raggedarray import DimNames
GDP_DIMS: dict[str, DimNames] = {"traj": "rows", "obs": "obs"}
GDP_TMP_PATH = os.path.join(tempfile.gettempdir(), "clouddrift", "gdp")
os.makedirs(
    GDP_TMP_PATH, exist_ok=True
)  # create the temporary directory for GDP intermediary data
GDP_COORDS = [
"id",
"time",
]
GDP_METADATA = [
"rowsize",
"WMO",
"expno",
"deploy_date",
"deploy_lat",
"deploy_lon",
"start_date",
"start_lat",
"start_lon",
"end_date",
"end_lat",
"end_lon",
"drogue_lost_date",
"typedeath",
"typebuoy",
"location_type",
"DeployingShip",
"DeploymentStatus",
"BuoyTypeManufacturer",
"BuoyTypeSensorArray",
"CurrentProgram",
"PurchaserFunding",
"SensorUpgrade",
"Transmissions",
"DeployingCountry",
"DeploymentComments",
"ManufactureYear",
"ManufactureMonth",
"ManufactureSensorType",
"ManufactureVoltage",
"FloatDiameter",
"SubsfcFloatPresence",
"DrogueType",
"DrogueLength",
"DrogueBallast",
"DragAreaAboveDrogue",
"DragAreaOfDrogue",
"DragAreaRatio",
"DrogueCenterDepth",
"DrogueDetectSensor",
]
def cast_float64_variables_to_float32(
ds: xr.Dataset, variables_to_skip: list[str] = ["time", "lat", "lon"]
) -> xr.Dataset:
"""Cast all float64 variables except ``variables_to_skip`` to float32.
Extra precision from float64 is not needed and takes up memory and disk
space.
Parameters
----------
ds : xr.Dataset
Dataset to modify
variables_to_skip : list[str]
List of variables to skip; default is ["time", "lat", "lon"].
Returns
-------
ds : xr.Dataset
Modified dataset
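
    Examples
    --------
    A minimal sketch with an illustrative dataset; the variable names
    ``ve`` and ``lon`` are placeholders:

    >>> import numpy as np
    >>> import xarray as xr
    >>> ds = xr.Dataset({"ve": ("obs", np.zeros(3)), "lon": ("obs", np.zeros(3))})
    >>> ds = cast_float64_variables_to_float32(ds)
    >>> ds["ve"].dtype, ds["lon"].dtype
    (dtype('float32'), dtype('float64'))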
"""
for var in ds.variables:
if var in variables_to_skip:
continue
if ds[var].dtype == "float64":
ds[var] = ds[var].astype("float32")
return ds
def parse_directory_file(filename: str, tmp_path: str) -> pd.DataFrame:
"""Read a GDP directory file that contains metadata of drifter releases.
Parameters
----------
    filename : str
        Name of the directory file to parse.
    tmp_path : str
        Path of the temporary directory where the downloaded file is saved.
Returns
-------
df : pd.DataFrame
List of drifters from a single directory file as a pandas DataFrame.
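
    Examples
    --------
    Illustrative only, since this performs a network download; the file
    name below is one of the GDP directory files:

    >>> df = parse_directory_file("dirfl_1_5000.dat", GDP_TMP_PATH)  # doctest: +SKIP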
"""
gdp_dir_url = "https://www.aoml.noaa.gov/ftp/pub/phod/buoydata"
url = f"{gdp_dir_url}/{filename}"
path = os.path.join(tmp_path, filename)
download_with_progress([(url, path)])
df = pd.read_csv(path, delimiter=r"\s+", header=None)
# Combine the date and time columns to easily parse dates below.
df[4] += " " + df[5]
df[8] += " " + df[9]
df[12] += " " + df[13]
df = df.drop(columns=[5, 9, 13])
df.columns = pd.Index(
[
"ID",
"WMO_number",
"program_number",
"buoys_type",
"Start_date",
"Start_lat",
"Start_lon",
"End_date",
"End_lat",
"End_lon",
"Drogue_off_date",
"death_code",
],
dtype="str",
)
for t in ["Start_date", "End_date", "Drogue_off_date"]:
df[t] = pd.to_datetime(df[t], format="%Y/%m/%d %H:%M", errors="coerce")
return df
def order_by_date(df: pd.DataFrame, idx: list[int]) -> list[int]: # noqa: F821
"""From the previously sorted DataFrame of directory files, return the
unique set of drifter IDs sorted by their start date (the date of the first
quality-controlled data point).
Parameters
----------
idx : list
List of drifters to include in the ragged array
Returns
-------
idx : list
Unique set of drifter IDs sorted by their start date.
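
    Examples
    --------
    A minimal sketch with a hypothetical, already date-sorted DataFrame:

    >>> df = pd.DataFrame({"ID": [3, 1, 2]})  # rows sorted by start date
    >>> order_by_date(df, [1, 3])
    array([3, 1])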
"""
    return df.ID[np.where(np.isin(df.ID, idx))[0]].values  # type: ignore
def fetch_netcdf(url: str, file: str):
"""Download and save the file from the given url, if not already downloaded.
Parameters
----------
url : str
URL from which to download the file.
file : str
Name of the file to save.
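
    Examples
    --------
    Illustrative only, since this performs a network download; the URL is
    hypothetical:

    >>> fetch_netcdf(
    ...     "https://example.com/drifter_101143.nc",
    ...     os.path.join(GDP_TMP_PATH, "drifter_101143.nc"),
    ... )  # doctest: +SKIP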
"""
download_with_progress([(url, file)])
def decode_date(t):
"""The date format is specified as 'seconds since 1970-01-01 00:00:00' but
the missing values are stored as -1e+34 which is not supported by the
default parsing mechanism in xarray.
    This function replaces the missing values with NaN and returns the
    modified array of time values.
Parameters
----------
t : array
Array of time values
Returns
-------
    out : array
        Array of time values with missing values replaced by NaN.
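
    Examples
    --------
    A minimal sketch with an illustrative array of seconds since epoch:

    >>> decode_date(np.array([0.0, -1e34]))
    array([ 0., nan])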
"""
nat_index = np.logical_or(np.isclose(t, -1e34), np.isnan(t))
t[nat_index] = np.nan
return t
def fill_values(var, default=np.nan):
"""Change fill values (-1e+34, inf, -inf) in var array to the value
specified by default.
Parameters
----------
var : array
Array to fill
default : float
        Default value to use for fill values

    Returns
    -------
    out : array
        Array with fill values replaced by ``default``.
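
    Examples
    --------
    A minimal sketch with an illustrative array:

    >>> fill_values(np.array([1.0, -1e34, np.inf]))
    array([ 1., nan, nan])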
"""
missing_value = np.logical_or(np.isclose(var, -1e34), ~np.isfinite(var))
if np.any(missing_value):
var[missing_value] = default
return var
def str_to_float(value: str, default: float = np.nan) -> float:
"""Convert a string to float, while returning the value of default if the
string is not convertible to a float, or if it's a NaN.
Parameters
----------
value : str
String to convert to float
default : float
Default value to return if the string is not convertible to float
Returns
-------
out : float
Float value of the string, or default if the string is not convertible to float.
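
    Examples
    --------
    Two illustrative conversions:

    >>> str_to_float("12.5")
    12.5
    >>> str_to_float("n/a", default=0.0)
    0.0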
"""
try:
fvalue = float(value)
if np.isnan(fvalue):
return default
else:
return fvalue
except ValueError:
return default
def cut_str(value: str, max_length: int) -> np.chararray:
"""Cut a string to a specific length and return it as a numpy chararray.
Parameters
----------
value : str
String to cut
    max_length : int
        Maximum length of the output string.
Returns
-------
out : np.chararray
String with max_length characters
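
    Examples
    --------
    A minimal sketch; the input string is illustrative:

    >>> cut_str("Pacific Gyre", 7).tobytes()
    b'Pacific'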
"""
    # One-element chararray with itemsize ``max_length``; assigning a longer
    # string silently truncates it to ``max_length`` characters.
    charar = np.chararray(1, max_length)
    charar[:max_length] = value
return charar
def drogue_presence(lost_time, time) -> np.ndarray:
"""Create drogue status from the drogue lost time and the trajectory time.
Parameters
----------
lost_time
Timestamp of the drogue loss (or NaT)
time
Observation time
Returns
-------
    out : np.ndarray
        Boolean array that is True where the drogue is present and False
        otherwise.
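
    Examples
    --------
    A minimal sketch with illustrative timestamps:

    >>> time = np.array(["2020-01-01", "2020-01-02", "2020-01-03"], dtype="datetime64[D]")
    >>> drogue_presence(np.datetime64("2020-01-02"), time)
    array([ True, False, False])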
"""
if pd.isnull(lost_time) or lost_time >= time[-1]:
return np.ones_like(time, dtype="bool")
else:
return time < lost_time
def rowsize(index: int, **kwargs) -> int:
    """Return the number of observations in the NetCDF file of a drifter,
    or 0 if the file cannot be processed.

    Parameters
    ----------
    index : int
        Drifter ID used to build the file name.
    **kwargs
        Must provide ``tmp_path`` (directory containing the file) and
        ``filename_pattern`` (file name pattern with an ``{id}``
        placeholder).

    Returns
    -------
    out : int
        Size of the ``obs`` dimension of the dataset.
    """
    filepath = os.path.join(
        kwargs["tmp_path"], kwargs["filename_pattern"].format(id=index)
    )
    try:
        return xr.open_dataset(
            filepath,
            decode_cf=False,
            decode_times=False,
            concat_characters=False,
            decode_coords=False,
        ).sizes["obs"]
    except Exception as e:
        print(f"Error processing {filepath}")
        print(str(e))
        return 0