Source code for clouddrift.adapters.mosaic

"""
This module defines functions used to adapt the MOSAiC sea-ice drift dataset as
a ragged-array dataset.

The dataset is hosted at https://doi.org/10.18739/A2KP7TS83.

Reference: Angela Bliss, Jennifer Hutchings, Philip Anderson, Philipp Anhaus,
Hans Jakob Belter, Jørgen Berge, Vladimir Bessonov, Bin Cheng, Sylvia Cole,
Dave Costa, Finlo Cottier, Christopher J Cox, Pedro R De La Torre, Dmitry V Divine,
Gilbert Emzivat, Ying-Chih Fang, Steven Fons, Michael Gallagher, Maxime Geoffrey,
Mats A Granskog, ... Guangyu Zuo. (2022). Sea ice drift tracks from the Distributed
Network of autonomous buoys deployed during the Multidisciplinary drifting Observatory
for the Study of Arctic Climate (MOSAiC) expedition 2019 - 2021. Arctic Data Center.
doi:10.18739/A2KP7TS83.

Example
-------
>>> from clouddrift.adapters import mosaic
>>> ra = mosaic.to_raggedarray()
"""

import xml.etree.ElementTree as ET
from datetime import datetime
from io import BytesIO

import numpy as np
import pandas as pd
import requests

from clouddrift.adapters.utils import download_with_progress, standard_retry_protocol
from clouddrift.raggedarray import RaggedArray

# Version label of the MOSAiC dataset; to_raggedarray() interpolates it into
# the "history" global attribute of the resulting ragged array.
MOSAIC_VERSION = "2022"


def get_dataframes() -> tuple[pd.DataFrame, pd.DataFrame]:
    """Get the MOSAiC data and metadata as pandas DataFrames.

    The observation table (obs dimension in the target Dataset) is the
    concatenation of all per-sensor CSV files, ordered like the rows of the
    sensor list. The sensor table (traj dimension in the target Dataset) is
    the repository's buoy list with an added ``rowsize`` column.

    Returns
    -------
    obs_df : pd.DataFrame
        Observations of all sensors, indexed by the ``datetime`` column.
    sensors : pd.DataFrame
        One row per sensor, indexed by ``Sensor ID``, with a ``rowsize``
        column holding the number of observations read for that sensor.

    Raises
    ------
    IndexError
        If the repository lists no file whose name contains ``buoy_list``.
    KeyError
        If a data file's sensor ID does not appear in the sensor list.
    """
    xml = get_repository_metadata()
    filenames, urls = get_file_urls(xml)

    # Summary and listing files are not per-sensor observation files.
    exclude_patterns = ("site_buoy_summary", "buoy_list")
    data_files = [
        (name, url)
        for name, url in zip(filenames, urls)
        if not any(pattern in name for pattern in exclude_patterns)
    ]
    data_urls = [url for _, url in data_files]

    # The sensor ID is the last underscore-separated token of the file name.
    # removesuffix() drops exactly ".csv"; rstrip(".csv") would strip any
    # trailing run of the characters ".", "c", "s", "v" and could eat into
    # the ID itself.
    sensor_ids = [
        name.split("_")[-1].removesuffix(".csv") for name, _ in data_files
    ]

    # Use the last file whose name mentions "buoy_list" as the sensor list.
    buoy_list_name = [name for name in filenames if "buoy_list" in name][-1]
    sensor_list_url = urls[filenames.index(buoy_list_name)]
    sensors = pd.read_csv(sensor_list_url)

    # Sort the urls by the order of sensor IDs in the sensor list
    order_index = {
        sensor_id: position
        for position, sensor_id in enumerate(sensors["Sensor ID"])
    }
    sorted_indices = sorted(
        range(len(sensor_ids)), key=lambda k: order_index[sensor_ids[k]]
    )
    sorted_data_urls = [data_urls[i] for i in sorted_indices]

    # One in-memory buffer per file, passed alongside its URL to the project
    # download helper. The list is deliberately not named ``requests`` so the
    # imported ``requests`` module is not shadowed.
    buffers = [BytesIO() for _ in sorted_data_urls]
    download_requests = list(zip(sorted_data_urls, buffers))
    download_with_progress(download_requests, desc="Downloading data")

    # Rewind every buffer so read_csv starts at the beginning.
    for buffer in buffers:
        buffer.seek(0)

    dfs = [pd.read_csv(buffer) for buffer in buffers]
    obs_df = pd.concat(dfs)

    # Number of observations per sensor, in sensor-list order. Taking the
    # length of each frame directly stays correct for a file without data
    # rows and avoids scanning the concatenated index in Python.
    sensors["rowsize"] = [len(df) for df in dfs]

    # Make the time column the index of the DataFrame, which will make it a
    # coordinate in the xarray Dataset.
    obs_df.set_index("datetime", inplace=True)
    sensors.set_index("Sensor ID", inplace=True)

    return obs_df, sensors
def get_file_urls(xml: bytes) -> tuple[list[str], list[str]]:
    """Extract data file names and their download URLs from the MOSAiC XML.

    Parameters
    ----------
    xml : bytes
        Repository metadata document, as returned by
        ``get_repository_metadata``.

    Returns
    -------
    filenames : list[str]
        Text of each ``objectName`` element found under
        ``./dataset/dataTable/physical``.
    urls : list[str]
        Text of the first ``distribution/online/url`` element of the same
        ``physical`` entry, so ``urls[i]`` always belongs to
        ``filenames[i]``.

    Notes
    -----
    A ``physical`` entry lacking either a non-empty name or a non-empty URL
    is skipped as a whole. Collecting names and URLs independently would
    shift every later pair out of alignment in that case.
    """
    # Parse the document once and walk the entries that own both fields.
    root = ET.fromstring(xml)

    filenames: list[str] = []
    urls: list[str] = []
    for physical in root.findall("./dataset/dataTable/physical"):
        name = physical.findtext("objectName")
        url = physical.findtext("distribution/online/url")
        if name and url:
            filenames.append(name)
            urls.append(url)

    return filenames, urls
def get_repository_metadata() -> bytes:
    """Get the MOSAiC repository metadata as raw XML bytes.

    Pass the returned bytes to other get_* functions to extract the data
    you need.

    Returns
    -------
    bytes
        Body of the HTTP response from the Arctic Data Center object
        endpoint for doi:10.18739/A2KP7TS83.

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx or 5xx status code.
    """
    url = "https://arcticdata.io/metacat/d1/mn/v2/object/doi:10.18739/A2KP7TS83"

    # The GET is issued through the project's standard retry wrapper.
    response = standard_retry_protocol(lambda: requests.get(url))()

    # Fail here on an HTTP error status instead of handing an error page to
    # the XML parser further down the pipeline.
    response.raise_for_status()
    return response.content
def to_raggedarray() -> RaggedArray:
    """Build a RaggedArray from the MOSAiC sea-ice drift dataset.

    Returns
    -------
    RaggedArray
        MOSAiC sea-ice drift trajectories as a ragged array.
    """
    # Fetch observations (obs dimension) and sensor metadata (traj dimension).
    observations, trajectories = get_dataframes()

    # Timestamps arrive as strings; CloudDrift's analysis functions expect
    # datetime64 values, so convert the index and the known date columns.
    observations.index = pd.to_datetime(observations.index)
    date_columns = (
        "Deployment Date",
        "Deployment Datetime",
        "First Data Datetime",
        "Last Data Datetime",
    )
    for name in date_columns:
        if name not in trajectories.columns:
            continue
        trajectories[name] = pd.to_datetime(trajectories[name])

    # Coordinates and the per-trajectory observation counts.
    coords = {
        "id": trajectories.index.to_numpy(),
        "time": observations.index.to_numpy().astype("datetime64[ns]"),
    }
    rowsize = trajectories["rowsize"].to_numpy(dtype="int64")

    attrs_global = {
        "title": "Multidisciplinary drifting Observatory for the Study of Arctic Climate (MOSAiC) expedition 2019 - 2021",
        "history": f"Dataset updated in {MOSAIC_VERSION}",
        "date_created": datetime.now().isoformat(),
        "publisher_name": "NSF Arctic Data Center",
        "publisher_url": "https://arcticdata.io/catalog/view/doi:10.18739/A2KP7TS83",
        "license": "Creative Commons Attribution 4.0 International License (http://creativecommons.org/licenses/by/4.0/)",
    }

    attrs_variables = {
        "id": {"long_name": "sensor identifier"},
        "time": {"long_name": "time"},
        "rowsize": {
            "long_name": "number of observations per trajectory",
            "sample_dimension": "obs",
            "units": "-",
        },
        "latitude": {
            "long_name": "latitude",
            "standard_name": "latitude",
            "units": "degrees_north",
        },
        "longitude": {
            "long_name": "longitude",
            "standard_name": "longitude",
            "units": "degrees_east",
        },
    }

    # Every sensor-table column except rowsize becomes traj-level metadata;
    # rowsize is placed first so it keeps its position in the mapping.
    traj_columns = [name for name in trajectories.columns if name != "rowsize"]
    metadata = {"rowsize": rowsize}
    metadata.update(
        {name: trajectories[name].to_numpy() for name in traj_columns}
    )

    # Every observation-table column becomes an obs-level data variable.
    obs_columns = list(observations.columns)
    data = {name: observations[name].to_numpy() for name in obs_columns}

    # Dimension of each variable, in the order rowsize, traj-level, obs-level.
    var_dims = {"rowsize": ["traj"]}
    var_dims.update({name: ["traj"] for name in traj_columns})
    var_dims.update({name: ["obs"] for name in obs_columns})

    return RaggedArray(
        coords=coords,
        metadata=metadata,
        data=data,
        attrs_global=attrs_global,
        attrs_variables=attrs_variables,
        name_dims={"traj": "rows", "obs": "obs"},
        coord_dims={"id": "traj", "time": "obs"},
        var_dims=var_dims,
    )