Source code for pyrfu.mms.list_files

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in imports
import bisect
import datetime
import json
import os
import re
from typing import Mapping, Optional, Union

# 3rd party imports
import numpy as np
from dateutil import parser
from dateutil.rrule import DAILY, rrule

from pyrfu.mms.db_init import MMS_CFG_PATH

# Local imports
from pyrfu.pyrf.datetime642iso8601 import datetime642iso8601
from pyrfu.pyrf.iso86012datetime64 import iso86012datetime64

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2024"
__license__ = "MIT"
__version__ = "2.4.13"
__status__ = "Prototype"


[docs]def list_files( tint: list[str], mms_id: Union[str, int], var: Mapping[str, str], data_path: Optional[str] = "", ) -> list[str]: r"""Find available files in the data directories for `var`. Parameters ---------- tint : list Time interval mms_id : str or int Index of the spacecraft var : dict Dictionary containing at least 4 keys * var["inst"] : name of the instrument * var["tmmode"] : data rate * var["lev"] : data level * var["dtype"] : data type data_path : str, Optional Path of MMS data. Default uses `pyrfu.mms.mms_config.py` Returns ------- file_names : list List of files corresponding to the parameters in the selected time interval """ # Check path if not data_path: # Read the current version of the MMS configuration file with open(MMS_CFG_PATH, "r", encoding="utf-8") as fs: config = json.load(fs) root_path = os.path.normpath(config["local"]) else: root_path = os.path.normpath(data_path) # Make sure that the data path exists assert os.path.exists(root_path), f"{root_path} doesn't exist!!" # Check time interval if isinstance(tint, list): tint_array = np.array(tint) else: raise TypeError("tint must be a list!!") # Convert time interval to ISO 8601 if isinstance(tint_array[0], str): tint_iso8601 = datetime642iso8601(iso86012datetime64(tint_array)) else: raise TypeError("Values must be in str!!") files_out = [] if not isinstance(mms_id, str): mms_id = str(mms_id) # directory and file name search patterns: # - assume directories are of the form: # (srvy, SITL): spacecraft/instrument/rate/level[/datatype]/year/month/ # (brst): spacecraft/instrument/rate/level[/datatype]/year/month/day/ # - assume file names are of the form: # spacecraft_instrument_rate_level[_datatype]_YYYYMMDD[hhmmss]_version.cdf file_name = ( f"mms{mms_id}_{var['inst']}_{var['tmmode']}_{var['lev']}" + r"(_)?.*_([0-9]{8,14})_v(\d+).(\d+).(\d+).cdf" ) d_start = parser.parse(parser.parse(tint_iso8601[0]).strftime("%Y-%m-%d")) until_ = parser.parse(tint_iso8601[1]) - datetime.timedelta(seconds=1) days = rrule(DAILY, dtstart=d_start, until=until_) if var["dtype"] == "" or var["dtype"] is None: level_and_dtype = var["lev"] else: level_and_dtype = os.sep.join([var["lev"], var["dtype"]]) for date in days: if var["tmmode"] == "brst": local_dir = os.sep.join( [ root_path, f"mms{mms_id}", var["inst"], var["tmmode"], level_and_dtype, date.strftime("%Y"), date.strftime("%m"), date.strftime("%d"), ], ) else: local_dir = os.sep.join( [ root_path, f"mms{mms_id}", var["inst"], var["tmmode"], level_and_dtype, date.strftime("%Y"), date.strftime("%m"), ], ) if os.name == "nt": full_path = os.sep.join([re.escape(local_dir) + os.sep, file_name]) else: full_path = os.sep.join([re.escape(local_dir), file_name]) regex = re.compile(full_path) for root, _, files in os.walk(local_dir): for file in files: file_path = os.sep.join([root, file]) matches = regex.match(file_path) if matches: this_time = parser.parse(matches.groups()[1]) if d_start <= this_time <= until_: this_file = { "file_name": file, "timetag": "", "full_name": file_path, "file_size": "", } if this_file not in files_out: files_out.append(this_file) in_files = files_out file_name = r"mms.*_([0-9]{8,14})_v(\d+).(\d+).(\d+).cdf" file_times = [] regex = re.compile(file_name) for in_file in in_files: matches = regex.match(in_file["file_name"]) if matches: file_times.append( ( in_file["file_name"], parser.parse(matches.groups()[0]).timestamp(), in_file["timetag"], in_file["file_size"], ), ) # sort in time sorted_files = sorted(file_times, key=lambda x: x[1]) times = [t[1] for t in sorted_files] idx_min = bisect.bisect_left(times, parser.parse(tint_iso8601[0]).timestamp()) # note: purposefully liberal here; include one extra file so that we # always get the burst mode # data if idx_min == 0: files_in_interval = [] for sorted_file in sorted_files[idx_min:]: files_in_interval.append( { "file_name": sorted_file[0], "timetag": sorted_file[2], "file_size": sorted_file[3], }, ) else: files_in_interval = [] for sorted_file in sorted_files[idx_min - 1 :]: files_in_interval.append( { "file_name": sorted_file[0], "timetag": sorted_file[2], "file_size": sorted_file[3], }, ) local_files = [] file_names = [f["file_name"] for f in files_in_interval] for file_out in files_out: if file_out["file_name"] in file_names: local_files.append(file_out["full_name"]) file_names = sorted(local_files) return file_names