"""
This module defines functions used to adapt the YoMaHa'07: Velocity data assessed
from trajectories of Argo floats at parking level and at the sea surface as
a ragged-arrays dataset.
The dataset is hosted at http://apdrc.soest.hawaii.edu/projects/yomaha/ and the user manual
is available at http://apdrc.soest.hawaii.edu/projects/yomaha/yomaha07/YoMaHa070612.pdf.
Example
-------
>>> from clouddrift.adapters import yomaha
>>> ds = yomaha.to_xarray()
Reference
---------
Lebedev, K. V., Yoshinari, H., Maximenko, N. A., & Hacker, P. W. (2007). Velocity data assessed from trajectories of Argo floats at parking level and at the sea surface. IPRC Technical Note, 4(2), 1-16.
"""
import gzip
import logging
import os
import shutil
import tempfile
import warnings
from datetime import datetime
from io import BytesIO

import numpy as np
import pandas as pd
import xarray as xr

from clouddrift.adapters.utils import download_with_progress

_logger = logging.getLogger(__name__)
YOMAHA_URLS = [
    # the order of these URLs matters: they are indexed below
    # (0: float types, 1: DACs, 2: last update date, 3: WMO/DAC/type mapping,
    # -1: velocity data)
    "http://apdrc.soest.hawaii.edu/projects/Argo/data/trjctry/float_types.txt",
    "http://apdrc.soest.hawaii.edu/projects/Argo/data/trjctry/DACs.txt",
    "http://apdrc.soest.hawaii.edu/projects/Argo/data/trjctry/0-Near-Real_Time/0-date_time.txt",
    "http://apdrc.soest.hawaii.edu/projects/Argo/data/trjctry/0-Near-Real_Time/WMO2DAC2type.txt",
    "http://apdrc.soest.hawaii.edu/projects/Argo/data/trjctry/0-Near-Real_Time/end-prog.lst",
    "http://apdrc.soest.hawaii.edu/projects/Argo/data/trjctry/0-Near-Real_Time/yomaha07.dat.gz",
]
YOMAHA_TMP_PATH = os.path.join(tempfile.gettempdir(), "clouddrift", "yomaha")


def download(tmp_path: str):
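    """Download the YoMaHa'07 metadata files and the compressed velocity
    data file into ``tmp_path``, decompressing the velocity data to disk.

    Parameters
    ----------
    tmp_path : str
        Path to the directory where the downloaded files are stored.
    """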
    download_requests = [
        (url, f"{tmp_path}/{url.split('/')[-1]}") for url in YOMAHA_URLS[:-1]
    ]
    download_with_progress(download_requests)

    # download the gzipped velocity data into memory and decompress it to disk
    filename_gz = f"{tmp_path}/{YOMAHA_URLS[-1].split('/')[-1]}"
    decompressed_fp = filename_gz.removesuffix(".gz")

    buffer = BytesIO()
    download_with_progress([(YOMAHA_URLS[-1], buffer)])

    with (
        open(decompressed_fp, "wb") as file,
        gzip.open(buffer, "rb") as compressed_file,
    ):
        shutil.copyfileobj(compressed_file, file)


def to_xarray(tmp_path: str | None = None):
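    """Return the YoMaHa'07 dataset as a ragged-array ``xarray.Dataset``,
    downloading and caching the source files if necessary.

    Parameters
    ----------
    tmp_path : str, optional
        Path to the directory where the source files are downloaded and
        cached. Defaults to ``YOMAHA_TMP_PATH``.

    Returns
    -------
    xarray.Dataset
        YoMaHa'07 dataset as a ragged array.
    """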
    if tmp_path is None:
        tmp_path = YOMAHA_TMP_PATH
    os.makedirs(tmp_path, exist_ok=True)

    # get or update required files
    download(tmp_path)

    # database last update
    with open(f"{tmp_path}/{YOMAHA_URLS[2].split('/')[-1]}") as f:
        YOMAHA_VERSION = f.read().strip()
    _logger.info(f"Last database update was: {YOMAHA_VERSION}")

    # parse with pandas
    col_names = [
        "lon_d",
        "lat_d",
        "pres_d",
        "time_d",
        "ve_d",
        "vn_d",
        "err_ve_d",
        "err_vn_d",
        "lon_s",
        "lat_s",
        "time_s",
        "ve_s",
        "vn_s",
        "err_ve_s",
        "err_vn_s",
        "lon_lp",
        "lat_lp",
        "time_lp",
        "lon_fc",
        "lat_fc",
        "time_fc",
        "lon_lc",
        "lat_lc",
        "time_lc",
        "surf_fix",
        "id",
        "cycle",
        "time_inv",
    ]
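
    # missing-value sentinels, one per column of col_names; they are matched
    # as strings against the raw fields and replaced with NaN on read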
    na_col = [
        str(x)
        for x in [
            -999.9999,
            -99.9999,
            -999.9,
            -999.999,
            -999.99,
            -999.99,
            -999.99,
            -999.99,
            -999.99,
            -99.99,
            -999.99,
            -999.99,
            -999.99,
            -999.99,
            -999.99,
            -999.99,
            -99.99,
            -999.99,
            -999.99,
            -99.99,
            -999.99,
            -999.99,
            -99.99,
            -999.99,
            np.nan,
            np.nan,
            np.nan,
            np.nan,
        ]
    ]

    # open with pandas; map each sentinel to its own column rather than
    # applying every sentinel to every column
    filename_gz = f"{tmp_path}/{YOMAHA_URLS[-1].split('/')[-1]}"
    filename = filename_gz.removesuffix(".gz")
    df = pd.read_csv(
        filename,
        names=col_names,
        sep=r"\s+",
        header=None,
        na_values=dict(zip(col_names, na_col)),
    )

    # convert to an Xarray Dataset
    ds = xr.Dataset.from_dataframe(df)
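
    # number of observations per float, used to build the ragged-array index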
    unique_id, rowsize = np.unique(ds["id"], return_counts=True)

    # mapping of YoMaHa float id, WMO float id, DAC id, and float type
    df_wmo = pd.read_csv(
        f"{tmp_path}/{YOMAHA_URLS[3].split('/')[-1]}",
        sep=r"\s+",
        header=None,
        names=["id", "wmo_id", "dac_id", "float_type_id"],
        engine="python",
    )

    # mapping of Data Assembly Center (DAC) id and DAC name
    df_dac = pd.read_csv(
        f"{tmp_path}/{YOMAHA_URLS[1].split('/')[-1]}",
        sep=":",
        header=None,
        names=["dac_id", "dac"],
    )
    df_dac["dac"] = df_dac["dac"].str.strip()

    # mapping of float_type_id and float_type
    df_float = pd.read_csv(
        f"{tmp_path}/{YOMAHA_URLS[0].split('/')[-1]}",
        sep=":",
        header=None,
        skipfooter=4,
        names=["float_type_id", "float_type"],
        engine="python",
    )
    # float_types.txt flags METOCEAN with an asterisk, but the float id,
    # WMO id, and float type do not match, so we strip the asterisk from
    # the type name
    df_float.loc[df_float["float_type_id"] == 0, "float_type"] = "METOCEAN"

    # combine metadata
    df_metadata = (
        pd.merge(df_wmo, df_dac, on="dac_id", how="left")
        .merge(df_float, on="float_type_id", how="left")
        .loc[lambda x: np.isin(x["id"], unique_id)]
    )
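
    # assemble the ragged array: per-trajectory variables go on the "traj"
    # dimension, per-observation variables stay on the "obs" dimension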
    ds = (
        ds.rename_dims({"index": "obs"})
        .assign({"id": ("traj", unique_id)})
        .assign({"rowsize": ("traj", rowsize)})
        .assign({"wmo_id": ("traj", df_metadata["wmo_id"])})
        .assign({"dac_id": ("traj", df_metadata["dac_id"])})
        .assign({"float_type": ("traj", df_metadata["float_type_id"])})
        .set_coords(["id", "time_d", "time_s", "time_lp", "time_fc", "time_lc"])
        .drop_vars(["index"])
    )

    # cast float64 variables to float32, keeping the positions listed
    # below in double precision
    double_vars = [
        "lat_d",
        "lon_d",
        "lat_s",
        "lon_s",
        "lat_lp",
        "lon_lp",
        "lat_lc",
        "lon_lc",
    ]
    for var in [v for v in ds.variables if v not in double_vars]:
        if ds[var].dtype == "float64":
            ds[var] = ds[var].astype("float32")

    # define attributes
    vars_attrs = {
        "lon_d": {
            "long_name": "Longitude of the location where the deep velocity is calculated",
            "units": "degrees_east",
        },
        "lat_d": {
            "long_name": "Latitude of the location where the deep velocity is calculated",
            "units": "degrees_north",
        },
        "pres_d": {
            "long_name": "Reference parking pressure for this cycle",
            "units": "dbar",
        },
        "time_d": {
            "long_name": "Julian time (days) when deep velocity is estimated",
            "units": "days since 2000-01-01 00:00",
        },
        "ve_d": {
            "long_name": "Eastward component of the deep velocity",
            "units": "cm s-1",
        },
        "vn_d": {
            "long_name": "Northward component of the deep velocity",
            "units": "cm s-1",
        },
        "err_ve_d": {
            "long_name": "Error on the eastward component of the deep velocity",
            "units": "cm s-1",
        },
        "err_vn_d": {
            "long_name": "Error on the northward component of the deep velocity",
            "units": "cm s-1",
        },
        "lon_s": {
            "long_name": "Longitude of the location where the first surface velocity is calculated (over the first 6 h at the surface)",
            "units": "degrees_east",
        },
        "lat_s": {
            "long_name": "Latitude of the location where the first surface velocity is calculated",
            "units": "degrees_north",
        },
        "time_s": {
            "long_name": "Julian time (days) when the first surface velocity is calculated",
            "units": "days since 2000-01-01 00:00",
        },
        "ve_s": {
            "long_name": "Eastward component of the first surface velocity",
            "units": "cm s-1",
        },
        "vn_s": {
            "long_name": "Northward component of the first surface velocity",
            "units": "cm s-1",
        },
        "err_ve_s": {
            "long_name": "Error on the eastward component of the first surface velocity",
            "units": "cm s-1",
        },
        "err_vn_s": {
            "long_name": "Error on the northward component of the first surface velocity",
            "units": "cm s-1",
        },
        "lon_lp": {
            "long_name": "Longitude of the last fix at the sea surface during the previous cycle",
            "units": "degrees_east",
        },
        "lat_lp": {
            "long_name": "Latitude of the last fix at the sea surface during the previous cycle",
            "units": "degrees_north",
        },
        "time_lp": {
            "long_name": "Julian time of the last fix at the sea surface during the previous cycle",
            "units": "days since 2000-01-01 00:00",
        },
        "lon_fc": {
            "long_name": "Longitude of the first fix at the sea surface during the current cycle",
            "units": "degrees_east",
        },
        "lat_fc": {
            "long_name": "Latitude of the first fix at the sea surface during the current cycle",
            "units": "degrees_north",
        },
        "time_fc": {
            "long_name": "Julian time of the first fix at the sea surface during the current cycle",
            "units": "days since 2000-01-01 00:00",
        },
        "lon_lc": {
            "long_name": "Longitude of the last fix at the sea surface during the current cycle",
            "units": "degrees_east",
        },
        "lat_lc": {
            "long_name": "Latitude of the last fix at the sea surface during the current cycle",
            "units": "degrees_north",
        },
        "time_lc": {
            "long_name": "Julian time of the last fix at the sea surface during the current cycle",
            "units": "days since 2000-01-01 00:00",
        },
        "surf_fix": {
            "long_name": "Number of surface fixes during the current cycle",
            "units": "-",
        },
"id": {
"long_name": "Float WMO number",
"units": "-",
},
"cycle": {
"long_name": "Cycle number",
"units": "-",
},
"time_inv": {
"long_name": "Time inversion/duplication flag",
"description": "1 if at least one duplicate or inversion of time is found in the sequence containing last fix from the previous cycle and all fixes from the current cycle. Otherwise, 0.",
"units": "-",
},
"wmo_id": {
"long_name": "Float WMO number",
"units": "-",
},
"dac_id": {
"long_name": "Data Assembly Center (DAC) number",
"description": "1: AOML (USA), 2: CORIOLIS (France), 3: JMA (Japan), 4: BODC (UK), 5: MEDS (Canada), 6: INCOIS (India), 7: KMA (Korea), 8: CSIRO (Australia), 9: CSIO (China)",
"units": "-",
},
"float_type": {
"long_name": "Float type",
"description": "1: APEX, 2: SOLO, 3: PROVOR, 4: R1, 5: MARTEC, 6: PALACE, 7: NINJA, 8: NEMO, 9: ALACE, 0: METOCEAN",
"units": "-",
},
}

    # global attributes
    attrs = {
        "title": "YoMaHa'07: Velocity data assessed from trajectories of Argo floats at parking level and at the sea surface",
        "history": f"Dataset updated on {YOMAHA_VERSION}",
        "date_created": datetime.now().isoformat(),
        "publisher_name": "Asia-Pacific Data Research Center",
        "publisher_url": "http://apdrc.soest.hawaii.edu/index.php",
        "license": "freely available",
    }

    # set attributes; check ds.variables (not ds.keys()) so that coordinate
    # variables such as the fix times also receive their attributes
    for var in vars_attrs:
        if var in ds.variables:
            ds[var].attrs = vars_attrs[var]
        else:
            warnings.warn(f"Variable {var} not found in upstream data; skipping.")
    ds.attrs = attrs

    return ds