Source code for eva.data.mon_data_space

# (C) Copyright 2021-2023 NOAA/NWS/EMC
#
# (C) Copyright 2021-2023 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.


# --------------------------------------------------------------------------------------------------


import os
import numpy as np

from xarray import Dataset, concat
from scipy.io import FortranFile
from datetime import datetime

from eva.data.eva_dataset_base import EvaDatasetBase
from eva.utilities.config import get
from eva.utilities.utils import parse_channel_list, is_number

# --------------------------------------------------------------------------------------------------


class MonDataSpace(EvaDatasetBase):

    """
    A class for handling MonDataSpace dataset configuration and processing.
    """
    def execute(self, dataset_config, data_collections, timing):

        """
        Executes the processing of MonDataSpace dataset.

        Args:
            dataset_config (dict): Configuration dictionary for the dataset.
            data_collections (DataCollections): Object for managing data collections.
            timing (Timing): Timing object for tracking execution time.
        """

        # Set the collection name
        # -----------------------
        collection_name = get(dataset_config, self.logger, 'name')

        # Get control file and parse
        # --------------------------
        control_file = get(dataset_config, self.logger, 'control_file')
        coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict = (
            self.get_ctl_dict(control_file[0]))
        ndims_used, dims_arr = self.get_ndims_used(dims)

        # Get the groups to be read
        # -------------------------
        groups = get(dataset_config, self.logger, 'groups')

        # Trim coordinates
        # ----------------
        coord_dict = {
            0: ['channels', 'Channel'],
            1: ['regions', 'Region'],
            2: ['levels', 'Level']
        }
        drop_coord = [False, False, False]
        requested_coord = [None, None, None]

        for x in range(len(coord_dict)):
            str_or_list = get(dataset_config, self.logger, coord_dict[x][0],
                              abort_on_failure=False)
            if str_or_list is not None:
                requested_coord[x] = parse_channel_list(str(str_or_list), self.logger)
                drop_coord[x] = True

        # Set coordinate ranges
        # ---------------------
        channo = chans_dict["chan_nums"] if chans_dict is not None else None
        x_range, y_range, z_range = self.get_dim_ranges(coords, dims, channo)

        # Filenames to be read into this collection
        # -----------------------------------------
        filenames = get(dataset_config, self.logger, 'filenames')
        ds_list = []

        # Get missing value threshold
        # ---------------------------
        threshold = float(get(dataset_config, self.logger, 'missing_value_threshold', 1.0e30))

        for filename in filenames:

            # read data file
            darr, cycle_tm = self.read_ieee(filename, coords, dims, ndims_used,
                                            dims_arr, nvars, vars)

            # add cycle as a variable to data array
            cyc_darr = self.var_to_np_array(dims, ndims_used, dims_arr, cycle_tm)

            # create dataset from file contents
            timestep_ds = None

            timestep_ds = self.load_dset(vars, nvars, coords, darr, dims, ndims_used,
                                         dims_arr, x_range, y_range, z_range, cyc_darr, channo)

            if attribs['sat']:
                timestep_ds.attrs['satellite'] = attribs['sat']
            if attribs['sensor']:
                timestep_ds.attrs['sensor'] = attribs['sensor']

            # add cycle_tm dim for concat
            timestep_ds['Time'] = cycle_tm.strftime("%Y%m%d%H")

            # Add this dataset to ds_list
            ds_list.append(timestep_ds)

        # Concatenate datasets from ds_list into a single dataset
        ds = concat(ds_list, dim='Time')

        # Group name and variables
        # ------------------------
        for group in groups:
            group_name = get(group, self.logger, 'name')
            group_vars = get(group, self.logger, 'variables', 'all')

            # Drop coordinates not in requested list
            # --------------------------------------
            for x in range(len(coord_dict)):
                if drop_coord[x]:
                    ds, chans_dict = \
                        self.subset_coordinate(ds, coord_dict[x][1], requested_coord[x],
                                               chans_dict)

            # Conditionally add channel, level, and scan related variables
            # ------------------------------------------------------------
            ds = self.loadConditionalItems(ds, chans_dict, levs_dict, scanpo)

            # Rename variables with group
            rename_dict = {}
            for var in list(ds.data_vars):
                rename_dict[var] = group_name + '::' + var
            ds = ds.rename(rename_dict)

            # Assert that the collection contains at least one variable
            if not ds.keys():
                self.logger.abort('Collection \'' + dataset_config['name'] + '\', group \'' +
                                  group_name + '\' in file ' + filename +
                                  ' does not have any variables.')

        # Add the dataset to the collections
        data_collections.create_or_add_to_collection(collection_name, ds, 'cycle')

        # Nan out unphysical values
        data_collections.nan_float_values_outside_threshold(threshold)

        # Display the contents of the collections for helping the user with making plots
        data_collections.display_collections()
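
    # A minimal sketch of the yaml-derived dataset_config consumed by execute(),
    # using hypothetical file names; only keys actually read above are shown:
    #
    #   dataset_config = {
    #       'name': 'experiment',
    #       'control_file': ['radmon_time.ctl'],            # parsed by get_ctl_dict()
    #       'filenames': ['radmon_time.2023010100.ieee_d'],  # cycle time in the name
    #       'channels': '1-5',                               # optional coordinate trim
    #       'groups': [{'name': 'GsiIeee', 'variables': 'all'}],
    #       'missing_value_threshold': 1.0e30,               # optional
    #   }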

    # ----------------------------------------------------------------------------------------------

    def generate_default_config(self, filenames, collection_name, control_file):

        """
        Generates a default configuration for MonDataSpace dataset.

        Args:
            filenames (list): List of file names.
            collection_name (str): Name of the data collection.
            control_file (str): Path to the control file.

        Returns:
            dict: Default configuration dictionary.
        """

        eva_dict = {'satellite': None,
                    'sensor': None,
                    'control_file': control_file,
                    'filenames': filenames,
                    'channels': [],
                    'regions': [],
                    'levels': [],
                    'groups': [],
                    'name': collection_name}
        return eva_dict
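
    # For reference, a hypothetical call such as
    #
    #   >>> generate_default_config(['f.ieee_d'], 'exp', 'f.ctl')
    #
    # returns a dictionary with empty channel/region/level/group selections that
    # can be filled in before running execute().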

    # ----------------------------------------------------------------------------------------------

    def subset_coordinate(self, ds, coordinate, requested_subset, chans_dict):

        """
        Subset the input dataset along the specified coordinate dimension and update
        channel information.

        Args:
            ds (xarray.Dataset): Input dataset to be subset.
            coordinate (str): Name of the coordinate dimension to subset.
            requested_subset (list): List of values to keep along the specified coordinate.
            chans_dict (dict): Dictionary of channel components.

        Returns:
            xarray.Dataset: Subset of the input dataset.
            chans_dict (dict): Updated dictionary of channel components.
        """

        if coordinate in list(ds.dims):

            # Number of entries in requested_subset
            num_coord_to_use = len(requested_subset)

            # Number of specified coordinate in the ds
            num_specified_coordinate_in_ds = len(ds.coords[coordinate])

            if all(x in ds[coordinate] for x in requested_subset):
                ds = ds.sel(**{coordinate: requested_subset})
            else:
                bad_coordinates = [x for x in requested_subset if x not in ds[coordinate]]

                self.logger.abort(f"{', '.join(str(i) for i in bad_coordinates)} was given" +
                                  " as a selected value of the coordinate " + str(coordinate) +
                                  ", but that is not a valid value. \n" +
                                  "Valid values for " + str(coordinate) + " are: \n" +
                                  f"{', '.join(str(i) for i in ds[coordinate].data)}")

            # If the Channel coordinate has been reduced from "all" (by the yaml
            # file) then reset chan_assim/nassim and channo accordingly.
            if coordinate == 'Channel':
                new_chan_assim = []
                new_chan_nassim = []

                for x in requested_subset:
                    if x in chans_dict['chans_assim']:
                        new_chan_assim.append(x)
                    elif x in chans_dict['chans_nassim']:
                        new_chan_nassim.append(x)

                for x in range(len(new_chan_assim), len(requested_subset)):
                    new_chan_assim.append(0)
                for x in range(len(new_chan_nassim), len(requested_subset)):
                    new_chan_nassim.append(0)

                chans_dict['chan_nums'] = requested_subset
                chans_dict['chans_assim'] = new_chan_assim
                chans_dict['chans_nassim'] = new_chan_nassim

        else:
            self.logger.info('Warning: requested coordinate, ' + str(coordinate) +
                             ' is not in dataset dimensions. Valid dimensions include ' +
                             str(ds.dims))

        return ds, chans_dict
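
    # A small sketch of the Channel bookkeeping above, using hypothetical values:
    # with chans_dict = {'chan_nums': [1, 2, 3, 4], 'chans_assim': [1, 3, 0, 0],
    # 'chans_nassim': [2, 4, 0, 0]} and requested_subset = [2, 3], the loops keep
    # 3 in new_chan_assim and 2 in new_chan_nassim, then zero-pad both to
    # len(requested_subset), leaving 'chans_assim': [3, 0] and 'chans_nassim': [2, 0].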

    # ----------------------------------------------------------------------------------------------
    # Parse control file and return elements in dictionaries

    def get_ctl_dict(self, control_file):

        """
        Parse the control file and extract information into dictionaries.

        Args:
            control_file (str): Path to the control file.

        Returns:
            dict: Dictionary containing various coordinates and information.
            dict: Dictionary containing dimension sizes.
            dict: Dictionary containing sensor and satellite attributes.
            int: Number of variables.
            list: List of variable names.
            list: List of scan positions.
            dict: Dictionary containing level information.
            dict: Dictionary containing channel information.
        """

        coords = {'xdef': None, 'ydef': None, 'zdef': None}
        coord_list = []
        dims = {'xdef': 0, 'ydef': 0, 'zdef': 0}
        dim_list = []
        attribs = {'sensor': None, 'sat': None}
        vars = []
        nvars = 0
        chans_dict = None
        channo = []
        chan_assim = []
        chan_nassim = []
        scanpo = None
        scan_info = []
        levs_dict = None
        levs = []
        level_assim = []
        level_nassim = []

        coord_dict = {
            'channel': 'Channel',
            'scan': 'Scan',
            'pressure': 'Level',
            'region': 'Region',
            'iter': 'Iteration'
        }

        with open(control_file, 'r') as fp:
            lines = fp.readlines()

            for line in lines:

                # Locate the coordinates using coord_dict.  There will be 1-3
                # coordinates specified as XDEF, YDEF, and ZDEF.
                for item in list(coord_dict.keys()):
                    if 'DEF' in line and item in line:
                        coord_list.append(coord_dict[item])

                # In most cases xdef, ydef, and zdef specify the size of
                # the corresponding coordinate.
                #
                # Scan is different.  If xdef corresponds to Scan then the xdef line
                # specifies the number of scan steps, starting position, and step size.
                # The last 2 are floats.  Add all to scan_info for later use.
                if line.find('xdef') != -1:
                    strs = line.split()
                    for st in strs:
                        if st.isdigit():
                            dim_list.append(int(st))
                        if is_number(st):
                            scan_info.append(st)

                if line.find('ydef') != -1:
                    strs = line.split()
                    for st in strs:
                        if st.isdigit():
                            dim_list.append(int(st))

                if line.find('zdef') != -1:
                    strs = line.split()
                    for st in strs:
                        if st.isdigit():
                            dim_list.append(int(st))

                if line.find('vars') != -1:
                    strs = line.split()
                    for st in strs:
                        if st.isdigit():
                            nvars = int(st)

                if line.find('title') != -1:
                    if line.find('conventional') == -1 and line.find('gsistat') == -1:
                        strs = line.split()
                        attribs['sensor'] = strs[1]
                        attribs['sat'] = strs[2]

                # Note we need to extract the actual channel numbers.  We have the
                # number of channels via the xdef line, but they are not necessarily
                # ordered consecutively.
                #
                # If channel is used then assign channel numbers to the chan_assim and
                # chan_nassim arrays based on the channel's iuse setting in the control
                # file.  In the file 1 = assimilated, -1 = not assimilated.
                if line.find('channel=') != -1:
                    strs = line.split()
                    if strs[4].isdigit():
                        channo.append(int(strs[4]))
                    if strs[7] == '1':
                        chan_assim.append(int(strs[4]))
                    if strs[7] == '-1':
                        chan_nassim.append(int(strs[4]))

                if line.find('level=') != -1:
                    strs = line.split()
                    tlev = strs[2].replace(',', '')
                    if tlev.isdigit():
                        levs.append(int(tlev))
                    if strs[7] == '1':
                        level_assim.append(int(tlev))
                    if strs[7] == '-1':
                        level_nassim.append(int(tlev))

            # The list of variables is at the end of the file between the lines
            # "vars" and "end vars".
            start = len(lines) - (nvars + 1)
            for x in range(start, start + nvars):
                strs = lines[x].split()
                vars.append(strs[-1])

            # Ignore any coordinates in the control file that have a size of 2 or less.
            used = 0
            mydef = ["xdef", "ydef", "zdef"]
            for x in range(3):
                if dim_list[x] > 2:
                    coords[mydef[used]] = coord_list[x]
                    dims[mydef[used]] = dim_list[x]
                    used += 1

            # If Scan is in the coords calculate the scan positions.
            # scan_info[0] = num steps, [1] = start position, [2] = step size
            if 'Scan' in coords.values():
                scanpo = [(float(scan_info[1]))]
                for x in range(1, int(scan_info[0])):
                    scanpo.append(float(scan_info[1]) + (float(scan_info[2]) * x))

            # If Channel is in the coords then pad out the chan_assim and chan_nassim
            # arrays with zeros.  They need to be the correct length to use 'Channel' as
            # the dimension.  Also the yaml file can use the 'select where' transform
            # to drop all values < 1 and plot only the assim/nassim channel markers.
            if 'Channel' in coords.values():
                for x in range(len(chan_assim), len(channo)):
                    chan_assim.append(0)
                for x in range(len(chan_nassim), len(channo)):
                    chan_nassim.append(0)
                chans_dict = {'chan_nums': channo,
                              'chans_assim': chan_assim,
                              'chans_nassim': chan_nassim}

            if 'Level' in coords.values():
                for x in range(len(level_assim), len(levs)):
                    level_assim.append(0)
                for x in range(len(level_nassim), len(levs)):
                    level_nassim.append(0)
                levs_dict = {'levels': levs,
                             'levels_assim': level_assim,
                             'levels_nassim': level_nassim}

        return coords, dims, attribs, nvars, vars, scanpo, levs_dict, chans_dict
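
    # A worked example of the scan-position arithmetic in get_ctl_dict(), with a
    # hypothetical control file line "xdef 3 linear 1.0 1.5": scan_info becomes
    # ['3', '1.0', '1.5'], so scanpo = [1.0, 1.0 + 1.5, 1.0 + 3.0] = [1.0, 2.5, 4.0].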

    # ----------------------------------------------------------------------------------------------

    def read_ieee(self, file_name, coords, dims, ndims_used, dims_arr, nvars, vars,
                  file_path=None):

        """
        Read data from an IEEE file and arrange it into a numpy array.

        Args:
            file_name (str): Name of the IEEE file to read.
            coords (dict): Dictionary of coordinates.
            dims (dict): Dictionary of dimension sizes.
            ndims_used (int): Number of dimensions used.
            dims_arr (list): List of dimension names used.
            nvars (int): Number of variables.
            vars (list): List of variable names.
            file_path (str, optional): Path to the directory containing the file.
                Defaults to None.

        Returns:
            numpy.ndarray: Numpy array containing the read data.
            datetime.datetime: Cycle time extracted from the filename.
        """

        # find cycle time in filename and create cycle_tm as datetime object
        cycle_tm = None
        cycstrs = file_name.replace('/', '.').split('.')

        for cycstr in cycstrs:
            if cycstr.isnumeric() and len(cycstr) == 10:
                cycle_tm = datetime(int(cycstr[0:4]), int(cycstr[4:6]),
                                    int(cycstr[6:8]), int(cycstr[8:]))

        filename = os.path.join(file_path, file_name) if file_path else file_name

        load_data = True
        if os.path.isfile(filename):
            f = FortranFile(filename, 'r', '>u4')
        else:
            self.logger.info(f"WARNING: file {filename} is missing")
            load_data = False

        if ndims_used == 1:    # MinMon
            rtn_array = np.empty((0, dims[dims_arr[0]]), dtype=float)
            if not load_data:
                zarray = np.zeros((dims[dims_arr[0]]), float)
            dimensions = [dims[dims_arr[0]]]

        if ndims_used == 2:    # RadMon time, bcoef, bcor, OznMon time
            rtn_array = np.empty((0, dims[dims_arr[0]], dims[dims_arr[1]]), float)
            if not load_data:
                zarray = np.zeros((dims[dims_arr[0]], dims[dims_arr[1]]), float)
            dimensions = [dims[dims_arr[1]], dims[dims_arr[0]]]

        if ndims_used == 3:    # RadMon angle
            rtn_array = np.empty((0, dims[dims_arr[0]], dims[dims_arr[1]],
                                  dims[dims_arr[2]]), float)
            zarray = np.zeros((dims[dims_arr[0]], dims[dims_arr[1]], dims[dims_arr[2]]), float)
            dimensions = [dims[dims_arr[0]], dims[dims_arr[1]]]

            for x in range(nvars):
                if load_data:
                    # satang variable is not used and a non-standard size
                    if vars[x] == 'satang':
                        skip = f.read_reals(dtype=np.dtype('>f4')).reshape(dims['xdef'],
                                                                           dims['ydef'])
                        tarr = zarray
                    else:
                        mylist = []
                        for z in range(5):
                            arr = f.read_reals(dtype=np.dtype('>f4')).reshape(dims['ydef'],
                                                                              dims['xdef'])
                            mylist.append(np.transpose(arr))
                        tarr = np.dstack(mylist)
                else:
                    tarr = zarray
                rtn_array = np.append(rtn_array, [tarr], axis=0)

        else:    # ndims_used == 1|2
            for x in range(nvars):
                if load_data:
                    if ndims_used == 1:
                        arr = f.read_reals(dtype=np.dtype('>f4'))
                    else:
                        arr = np.transpose(f.read_reals(dtype=np.dtype('>f4')).reshape(dimensions))
                else:
                    arr = zarray
                rtn_array = np.append(rtn_array, [arr], axis=0)

        if load_data:
            f.close()

        return rtn_array, cycle_tm

    # ----------------------------------------------------------------------------------------------

    def var_to_np_array(self, dims, ndims_used, dims_arr, var):

        """
        Create a numpy array with specified dimensions and fill it with a given value.

        Args:
            dims (dict): Dictionary of dimension sizes.
            ndims_used (int): Number of dimensions used.
            dims_arr (list): List of dimension names used.
            var: Value to fill the array with.

        Returns:
            numpy.ndarray: Numpy array with the requested dimensions and filled with
            the given value.
        """

        # build numpy array with requested dimensions
        d = {
            1: np.reshape([[var] * dims[dims_arr[0]]], (dims[dims_arr[0]])),
            2: np.reshape([[var] * dims[dims_arr[0]]] * dims[dims_arr[1]],
                          (dims[dims_arr[0]], dims[dims_arr[1]])),
            3: np.reshape([[var] * dims[dims_arr[0]] * dims[dims_arr[1]] * dims[dims_arr[2]]],
                          (dims[dims_arr[0]], dims[dims_arr[1]], dims[dims_arr[2]]))
        }

        try:
            cycle_arr = d[ndims_used]
        except KeyError:
            self.logger.abort(f'ndims_used must be in range of 1-3, value is {ndims_used}')

        return cycle_arr
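
    # A shape sketch with hypothetical dimensions: for an existing instance
    # `space`, ndims_used == 2 with dims = {'xdef': 3, 'ydef': 2, 'zdef': 0}
    # fills a (3, 2) array with var:
    #
    #   >>> space.var_to_np_array({'xdef': 3, 'ydef': 2, 'zdef': 0}, 2,
    #   ...                       ['xdef', 'ydef', 'zdef'], 7.0)
    #   array([[7., 7.],
    #          [7., 7.],
    #          [7., 7.]])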

    # ----------------------------------------------------------------------------------------------

    def get_dim_ranges(self, coords, dims, channo):

        """
        Get the valid ranges for each dimension based on the specified coordinates
        and channel numbers.

        Args:
            coords (dict): Dictionary of coordinates.
            dims (dict): Dictionary of dimension sizes.
            channo (list): List of channel numbers.

        Returns:
            numpy.ndarray or None: Valid x coordinate range or None.
            numpy.ndarray or None: Valid y coordinate range or None.
            numpy.ndarray or None: Valid z coordinate range or None.
        """

        x_range = None
        y_range = None
        z_range = None

        # - Return None for a coordinate that has value 0 or 1.
        # - "Channel" can be either the x or y coordinate and can be
        #   numbered non-consecutively, which has been captured in channo.
        # - The z coordinate is never used for channel.
        if dims['xdef'] > 1:
            x_range = channo if coords['xdef'] == 'Channel' else np.arange(1, dims['xdef'] + 1)
        if dims['ydef'] > 1:
            y_range = channo if coords['ydef'] == 'Channel' else np.arange(1, dims['ydef'] + 1)
        if dims['zdef'] > 1:
            z_range = np.arange(1, dims['zdef'] + 1)

        return x_range, y_range, z_range
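
    # A minimal sketch (hypothetical values): with coords['xdef'] == 'Channel',
    # dims = {'xdef': 5, 'ydef': 4, 'zdef': 0}, and channo = [1, 2, 3, 4, 6],
    # the non-consecutive channel numbers are preserved:
    #
    #   x_range -> [1, 2, 3, 4, 6]       (channo, not np.arange)
    #   y_range -> array([1, 2, 3, 4])   (np.arange(1, 5))
    #   z_range -> None                  (zdef is not > 1)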

    # ----------------------------------------------------------------------------------------------

    def get_ndims_used(self, dims):

        """
        Determine the number of dimensions used based on the provided dimension sizes.

        Args:
            dims (dict): Dictionary of dimension sizes.

        Returns:
            int: Number of dimensions used.
            list: List of dimension names used.
        """

        # Some ieee files (ozn) can be 1 or 2 dimensions depending on the
        # number of levels used.  Levels is the xdef, Regions is the ydef.
        # All Ozn files use ydef, but many have xdef = 1.  The dims_arr[]
        # will return the name(s) of the dimensions actually used.

        # Ignore dims with values of 0 or 1
        ndims = len(dims)
        dims_arr = []
        for x in range(ndims):
            if list(dims.values())[x] <= 1:
                ndims -= 1
            else:
                dims_arr.append(list(dims)[x])

        for x in range(ndims, 3):
            dims_arr.append(list(dims)[2])

        return ndims, dims_arr
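
    # Example with hypothetical ozn-style dims (`space` is an existing instance):
    # a file with 22 levels and 5 regions but no zdef gives
    #
    #   >>> space.get_ndims_used({'xdef': 22, 'ydef': 5, 'zdef': 0})
    #   (2, ['xdef', 'ydef', 'zdef'])
    #
    # ndims drops to 2, but dims_arr is padded back to three entries so callers
    # can index it safely.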

    # ----------------------------------------------------------------------------------------------

    def load_dset(self, vars, nvars, coords, darr, dims, ndims_used,
                  dims_arr, x_range, y_range, z_range, cyc_darr, channo):

        """
        Create a dataset from various components.

        Args:
            vars (list): List of variable names.
            nvars (int): Number of variables.
            coords (dict): Dictionary of coordinates.
            darr (numpy.ndarray): Numpy array of data.
            dims (dict): Dictionary of dimension sizes.
            ndims_used (int): Number of dimensions used.
            dims_arr (list): List of dimension names used.
            x_range (numpy.ndarray or None): Valid x coordinate range.
            y_range (numpy.ndarray or None): Valid y coordinate range.
            z_range (numpy.ndarray or None): Valid z coordinate range.
            cyc_darr (numpy.ndarray): Numpy array of cycle data.
            channo (list): List of channel numbers.

        Returns:
            xarray.Dataset: Created dataset.
        """

        # create dataset from file components
        rtn_ds = None

        for x in range(0, nvars):
            if ndims_used == 1:
                d = {
                    vars[x]: {"dims": (coords[dims_arr[0]]), "data": darr[x, :]}
                }
            if ndims_used == 2:
                d = {
                    vars[x]: {"dims": (coords[dims_arr[0]], coords[dims_arr[1]]),
                              "data": darr[x, :, :]}
                }
            if ndims_used == 3:
                d = {
                    vars[x]: {"dims": (coords[dims_arr[0]], coords[dims_arr[1]],
                                       coords[dims_arr[2]]),
                              "data": darr[x, :, :, :]}
                }
            if 'Channel' in coords.values():
                d.update({"Channel": {"dims": ("Channel"), "data": channo}})

            new_ds = Dataset.from_dict(d)
            rtn_ds = new_ds if rtn_ds is None else rtn_ds.merge(new_ds)

        # tack on 'cycle' as a variable
        # -----------------------------
        if ndims_used == 1:
            new_cyc = Dataset(
                {
                    'cycle': ((coords[dims_arr[0]]), cyc_darr),
                },
                coords={coords[dims_arr[0]]: x_range},
            )
        if ndims_used == 2:
            new_cyc = Dataset(
                {
                    'cycle': ((coords[dims_arr[0]], coords[dims_arr[1]]), cyc_darr),
                },
                coords={coords[dims_arr[0]]: x_range,
                        coords[dims_arr[1]]: y_range},
            )
        if ndims_used == 3:
            new_cyc = Dataset(
                {
                    'cycle': ((coords[dims_arr[0]], coords[dims_arr[1]],
                               coords[dims_arr[2]]), cyc_darr),
                },
                coords={coords[dims_arr[0]]: x_range,
                        coords[dims_arr[1]]: y_range,
                        coords[dims_arr[2]]: z_range},
            )
        rtn_ds = rtn_ds.merge(new_cyc)

        return rtn_ds

    # ----------------------------------------------------------------------------------------------

    def loadConditionalItems(self, dataset, chans_dict, levs_dict, scanpo):

        """
        Add channel, level, and scan related variables to the dataset.

        Args:
            dataset (xarray.Dataset): Dataset to which variables will be added.
            chans_dict (dict): Dictionary of channel components.
            levs_dict (dict): Dictionary of level components.
            scanpo (list): List of scan positions.

        Returns:
            xarray.Dataset: Dataset with added scan-related variables.
        """

        if chans_dict is not None:
            dataset['channel'] = (['Channel'], chans_dict["chan_nums"])
            dataset['chan_yaxis_z'] = (['Channel'], [0.0] * len(chans_dict["chan_nums"]))

            if len(chans_dict["chans_assim"]) > 0:
                dataset['chan_assim'] = (['Channel'], chans_dict["chans_assim"])
            if len(chans_dict["chans_nassim"]) > 0:
                dataset['chan_nassim'] = (['Channel'], chans_dict["chans_nassim"])

        if levs_dict is not None:
            dataset['level'] = (['Level'], levs_dict["levels"])
            dataset['level_yaxis_z'] = (['Level'], [0.0] * len(levs_dict["levels"]))

            if len(levs_dict["levels_assim"]) > 0:
                dataset['level_assim'] = (['Level'], levs_dict["levels_assim"])
            if len(levs_dict["levels_nassim"]) > 0:
                dataset['level_nassim'] = (['Level'], levs_dict["levels_nassim"])

        if scanpo is not None:
            nscan = dataset.dims.get('Scan')
            nchan = dataset.dims.get('Channel')    # 'Channel' is always present with 'Scan'

            scan_array = np.zeros(shape=(nscan, nchan))
            for x in range(nchan):
                scan_array[:, x] = np.array([scanpo])
            dataset['scan'] = (['Scan', 'Channel'], scan_array)

        return dataset
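
    # Sketch of the scan broadcast above (hypothetical sizes): with nscan = 3,
    # nchan = 2, and scanpo = [1.0, 2.5, 4.0], scan_array becomes
    #
    #   [[1.0, 1.0],
    #    [2.5, 2.5],
    #    [4.0, 4.0]]
    #
    # i.e. the same scan positions repeated for every channel.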

    # ----------------------------------------------------------------------------------------------