"""
This module defines functions used to adapt the MOSAiC sea-ice drift dataset as
a ragged-array dataset.
The dataset is hosted at https://doi.org/10.18739/A2KP7TS83.
Reference: Angela Bliss, Jennifer Hutchings, Philip Anderson, Philipp Anhaus,
Hans Jakob Belter, Jørgen Berge, Vladimir Bessonov, Bin Cheng, Sylvia Cole,
Dave Costa, Finlo Cottier, Christopher J Cox, Pedro R De La Torre, Dmitry V Divine,
Gilbert Emzivat, Ying-Chih Fang, Steven Fons, Michael Gallagher, Maxime Geoffrey,
Mats A Granskog, ... Guangyu Zuo. (2022). Sea ice drift tracks from the Distributed
Network of autonomous buoys deployed during the Multidisciplinary drifting Observatory
for the Study of Arctic Climate (MOSAiC) expedition 2019 - 2021. Arctic Data Center.
doi:10.18739/A2KP7TS83.

Example
-------
>>> from clouddrift.adapters import mosaic
>>> ra = mosaic.to_raggedarray()
"""
import xml.etree.ElementTree as ET
from datetime import datetime
from io import BytesIO
import numpy as np
import pandas as pd
import requests
from clouddrift.adapters.utils import download_with_progress, standard_retry_protocol
from clouddrift.raggedarray import RaggedArray
# Version label of the MOSAiC dataset release; it is interpolated into the
# "history" global attribute of the ragged array built by to_raggedarray().
MOSAIC_VERSION = "2022"
def get_dataframes() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Get the MOSAiC data and metadata as pandas DataFrames.

    The repository metadata XML is queried for the list of hosted files.
    The buoy list provides the per-sensor metadata; every file other than the
    buoy list and the site summary is downloaded and concatenated into a
    single observation table, ordered like the rows of the buoy list.

    Returns
    -------
    obs_df : pd.DataFrame
        Observations (obs dimension in the target dataset), indexed by the
        ``datetime`` column.
    sensors : pd.DataFrame
        Sensor metadata (traj dimension in the target dataset), indexed by
        ``Sensor ID`` and carrying an added ``rowsize`` column with the
        number of observations read for each sensor.

    Raises
    ------
    IndexError
        If no file name containing ``buoy_list`` is found.
    KeyError
        If a data file's sensor ID is absent from the buoy list.
    """
    xml = get_repository_metadata()
    filenames, urls = get_file_urls(xml)

    exclude_patterns = ["site_buoy_summary", "buoy_list"]

    def is_data_file(name: str) -> bool:
        return not any(pattern in name for pattern in exclude_patterns)

    data_filenames = [f for f in filenames if is_data_file(f)]
    data_urls = [u for n, u in enumerate(urls) if is_data_file(filenames[n])]

    # removesuffix() drops the literal ".csv" extension only. rstrip(".csv")
    # would strip any trailing run of the characters ".", "c", "s" and "v",
    # corrupting sensor IDs that happen to end in one of them.
    sensor_ids = [f.split("_")[-1].removesuffix(".csv") for f in data_filenames]

    buoy_list_files = [f for f in filenames if "buoy_list" in f]
    sensor_list_url = urls[filenames.index(buoy_list_files[-1])]
    sensors = pd.read_csv(sensor_list_url)

    # Sort the urls by the order of sensor IDs in the sensor list
    order_index = {
        sensor_id: n for n, sensor_id in enumerate(sensors["Sensor ID"])
    }
    sorted_indices = sorted(
        range(len(sensor_ids)), key=lambda k: order_index[sensor_ids[k]]
    )
    sorted_data_urls = [data_urls[i] for i in sorted_indices]

    # Download every file into its own in-memory buffer. The list is not
    # called "requests" so that it does not shadow the imported module.
    buffers = [BytesIO() for _ in sorted_data_urls]
    download_requests = list(zip(sorted_data_urls, buffers))
    download_with_progress(download_requests, desc="Downloading data")

    # Rewind the buffers so that they can be read from the start.
    for buffer in buffers:
        buffer.seek(0)

    dfs = [pd.read_csv(buffer) for buffer in buffers]
    obs_df = pd.concat(dfs)

    # The row size of each trajectory is the length of its own table. Taking
    # it per file (rather than scanning the concatenated index for zeros)
    # also yields a correct count of 0 for a file without observations.
    sensors["rowsize"] = np.array([len(df) for df in dfs], dtype="int64")

    # Make the time column the index of the DataFrame, which will make it a
    # coordinate in the xarray Dataset.
    obs_df.set_index("datetime", inplace=True)
    sensors.set_index("Sensor ID", inplace=True)

    return obs_df, sensors
def get_file_urls(xml: bytes) -> tuple[list[str], list[str]]:
    """Pass the MOSAiC XML document and return the list of filenames and URLs.

    Parameters
    ----------
    xml : bytes
        Repository metadata XML document.

    Returns
    -------
    filenames : list[str]
        Text of every ``dataset/dataTable/physical/objectName`` element, in
        document order. Elements without text are skipped.
    urls : list[str]
        Text of every ``dataset/dataTable/physical/distribution/online/url``
        element, in document order. Elements without text are skipped.
    """
    # Parse the document once and run both queries against the same tree.
    root = ET.fromstring(xml)
    physical = "./dataset/dataTable/physical"

    filenames = [
        tag.text for tag in root.findall(f"{physical}/objectName") if tag.text
    ]
    urls = [
        tag.text
        for tag in root.findall(f"{physical}/distribution/online/url")
        if tag.text
    ]
    return filenames, urls
def to_raggedarray() -> RaggedArray:
    """Build a RaggedArray from the MOSAiC observation and sensor tables.

    The tables come from ``get_dataframes()``. The observation index becomes
    the ``time`` coordinate, the sensor index becomes the ``id`` coordinate,
    every sensor-table column is passed as metadata and every
    observation-table column is passed as data.

    Returns
    -------
    RaggedArray
        MOSAiC sea-ice drift trajectories as a ragged array.
    """
    obs_df, traj_df = get_dataframes()

    # The tables carry dates as strings; turn them into datetime64 values
    # for compatibility with CloudDrift's analysis functions.
    obs_df.index = pd.to_datetime(obs_df.index)
    date_columns = (
        "Deployment Date",
        "Deployment Datetime",
        "First Data Datetime",
        "Last Data Datetime",
    )
    for name in date_columns:
        if name not in traj_df.columns:
            continue
        traj_df[name] = pd.to_datetime(traj_df[name])

    # Coordinates of the two dimensions.
    coords = {
        "id": traj_df.index.to_numpy(),
        "time": obs_df.index.to_numpy().astype("datetime64[ns]"),
    }

    # Per-trajectory variables: rowsize first (forced to int64), then every
    # other column of the sensor table as-is.
    traj_columns = [name for name in traj_df.columns if name != "rowsize"]
    metadata = {"rowsize": traj_df["rowsize"].to_numpy(dtype="int64")}
    metadata.update({name: traj_df[name].to_numpy() for name in traj_columns})

    # Per-observation variables: every column of the observation table.
    obs_columns = list(obs_df.columns)
    data = {name: obs_df[name].to_numpy() for name in obs_columns}

    # Dimension of each variable, in the same order as they were collected.
    var_dims = {"rowsize": ["traj"]}
    var_dims.update({name: ["traj"] for name in traj_columns})
    var_dims.update({name: ["obs"] for name in obs_columns})

    global_attributes = {
        "title": "Multidisciplinary drifting Observatory for the Study of Arctic Climate (MOSAiC) expedition 2019 - 2021",
        "history": f"Dataset updated in {MOSAIC_VERSION}",
        "date_created": datetime.now().isoformat(),
        "publisher_name": "NSF Arctic Data Center",
        "publisher_url": "https://arcticdata.io/catalog/view/doi:10.18739/A2KP7TS83",
        "license": "Creative Commons Attribution 4.0 International License (http://creativecommons.org/licenses/by/4.0/)",
    }

    variable_attributes = {
        "id": {"long_name": "sensor identifier"},
        "time": {"long_name": "time"},
        "rowsize": {
            "long_name": "number of observations per trajectory",
            "sample_dimension": "obs",
            "units": "-",
        },
        "latitude": {
            "long_name": "latitude",
            "standard_name": "latitude",
            "units": "degrees_north",
        },
        "longitude": {
            "long_name": "longitude",
            "standard_name": "longitude",
            "units": "degrees_east",
        },
    }

    return RaggedArray(
        coords=coords,
        metadata=metadata,
        data=data,
        attrs_global=global_attributes,
        attrs_variables=variable_attributes,
        name_dims={"traj": "rows", "obs": "obs"},
        coord_dims={"id": "traj", "time": "obs"},
        var_dims=var_dims,
    )