Source code for eva.data.jedi_log
# (C) Copyright 2021-2023 NOAA/NWS/EMC
#
# (C) Copyright 2021-2023 United States Government as represented by the Administrator of the
# National Aeronautics and Space Administration. All Rights Reserved.
#
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# --------------------------------------------------------------------------------------------------
import numpy as np
import xarray as xr

from eva.data.eva_dataset_base import EvaDatasetBase
# --------------------------------------------------------------------------------------------------
# Parameters
space = ' '
# --------------------------------------------------------------------------------------------------
def get_data_from_line(jedi_log_line, search_term, separator, position):
"""
Extracts data from a line in a Jedi log based on the specified search term,
separator, and position.
Args:
jedi_log_line (str): Line from the Jedi log.
search_term (str): Search term to look for in the line.
separator (str): Separator used to split the line.
position (int): Position of the desired data after splitting.
Returns:
str: Extracted data value or None if not found.
"""
if search_term in jedi_log_line:
return jedi_log_line.split(separator)[position]
# --------------------------------------------------------------------------------------------------
class JediLog(EvaDatasetBase):
"""
A class for handling Jedi log data.
"""
    def execute(self, dataset_config, data_collections, timing):
"""
Executes the processing of Jedi log data.
Args:
dataset_config (dict): Configuration dictionary for the dataset.
data_collections (DataCollections): Object for managing data collections.
timing (Timing): Timing object for tracking execution time.
"""
# Get name of the log file to parse
jedi_log_to_parse = dataset_config.get('jedi_log_to_parse')
# Collection name to use
collection_name = dataset_config.get('collection_name')
# Read log file into a string
with open(jedi_log_to_parse) as jedi_log_to_parse_open:
jedi_log_text = jedi_log_to_parse_open.read()
# Split log into list of lines
jedi_log_lines = jedi_log_text.split('\n')
        # Check if this was a ctest and if so determine the test prepend string
        test_string = ''
        for jedi_log_line in jedi_log_lines:
            if jedi_log_line.startswith('test'):
                test_string = jedi_log_line.split(space)[1] + ': '
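
        # For example, a ctest log containing the line 'test 1' gives test_string = '1: ',
        # which is then stripped from every prefixed line below.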
# Clean up lines
self.jedi_log_lines = []
for jedi_log_line in jedi_log_lines:
# Replace test number
new_line = jedi_log_line.replace(test_string, '')
# If new line is just spaces then set to empty string
if new_line.isspace():
new_line = ''
# Assemble new list of strings
self.jedi_log_lines.append(new_line)
        # Split the log into a list of strings. Each element holds all the lines between two
        # empty lines in the log file.
chunk_start_points = [-1]
for jedi_log_line_ind, jedi_log_line in enumerate(self.jedi_log_lines):
if jedi_log_line == '':
chunk_start_points.append(jedi_log_line_ind)
chunk_start_points.append(len(self.jedi_log_lines)+1)
self.log_chunks = []
        for i in range(len(chunk_start_points)-1):
            chunk = self.jedi_log_lines[chunk_start_points[i]+1:chunk_start_points[i+1]]
            if chunk:
                self.log_chunks.append('\n'.join(chunk))
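
        # For example, the lines ['line a', 'line b', '', 'line c'] produce the two chunks
        # 'line a\nline b' and 'line c'.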
# Get list of things to parse from the dictionary
data_to_parse = dataset_config.get('data_to_parse')
# Loop and add to dataset
for metric in data_to_parse:
if metric == 'convergence' and data_to_parse[metric]:
convergence_ds = self.parse_convergence()
# Add to the Eva dataset
data_collections.create_or_add_to_collection(collection_name, convergence_ds)
        # Display the contents of all the collections
data_collections.display_collections()
# ----------------------------------------------------------------------------------------------
    def get_from_log(self, search_term, separator, position, custom_log=None):
"""
Searches the Jedi log for a specified term and extracts the corresponding data.
Args:
search_term (str): Search term to look for in the Jedi log.
separator (str): Separator used to split the log line.
position (int): Position of the desired data after splitting.
custom_log: Custom log to search in (optional).
Returns:
str: Extracted data value or None if not found.
"""
if custom_log is None:
log = self.jedi_log_lines
else:
log = custom_log
        # Loop over the lines of the log
for jedi_log_line in log:
data_val = get_data_from_line(jedi_log_line, search_term, separator, position)
if data_val is not None:
return data_val
return None
# ----------------------------------------------------------------------------------------------
    def get_matching_chunks(self, search_terms):
"""
Finds log chunks that match a list of search terms.
Args:
search_terms (list): List of search terms to match in log chunks.
Returns:
list: List of matching log chunks.
"""
# Create array to hold chunks that match
matching_chunks = []
        # Loop over the log chunks
for log_chunk in self.log_chunks:
# Build an array to check each match
search_terms_match = []
for search_term in search_terms:
search_terms_match.append(search_term in log_chunk)
# Append if all search terms are in the chunk
if all(search_terms_match):
matching_chunks.append(log_chunk)
return matching_chunks
# ----------------------------------------------------------------------------------------------
    def parse_convergence(self):
"""
Parses convergence data from the Jedi log.
Returns:
xr.Dataset: Dataset containing the parsed convergence data.
"""
# Get the name of the minimizer
minimizer_algorithm = self.get_from_log('Minimizer algorithm', '=', 1)
# Get the chunks for the minimizer part (Norm reduction etc)
minimizer_chunks_strings = [f'{minimizer_algorithm} Starting Iteration',
f'{minimizer_algorithm} end of iteration']
minimizer_chunks = self.get_matching_chunks(minimizer_chunks_strings)
# Get the chunks for the J, Jb, JoJc part
        j_chunks_strings = ['Quadratic cost function: J ',
                            'Quadratic cost function: Jb']
j_chunks = self.get_matching_chunks(j_chunks_strings)
# Total number of inner iterations
total_iter = len(minimizer_chunks)
# Check that some minimizer chunks were found
if total_iter == 0:
            self.logger.abort('The number of iterations found in the log is zero. Check that ' +
                              'the log is being parsed correctly.')
# Create list of variables that need to be built
var_names = []
var_search_criteria = []
var_split = []
var_position = []
var_dtype = []
if minimizer_chunks:
# Inner iteration number
var_names.append('inner_iteration')
var_search_criteria.append(f'{minimizer_algorithm} Starting Iteration')
var_split.append('Iteration')
var_position.append(1)
var_dtype.append('int32')
# Gradient reduction
var_names.append('gradient_reduction')
var_search_criteria.append('Gradient reduction (')
var_split.append('=')
var_position.append(1)
var_dtype.append('float32')
# Norm reduction
var_names.append('norm_reduction')
var_search_criteria.append('Norm reduction (')
var_split.append('=')
var_position.append(1)
var_dtype.append('float32')
if j_chunks:
            # Quadratic cost function J
var_names.append('j')
var_search_criteria.append('Quadratic cost function: J ')
var_split.append('=')
var_position.append(1)
var_dtype.append('float32')
            # Quadratic cost function Jb
var_names.append('jb')
var_search_criteria.append('Quadratic cost function: Jb')
var_split.append('=')
var_position.append(1)
var_dtype.append('float32')
            # Quadratic cost function JoJc
var_names.append('jojc')
var_search_criteria.append('Quadratic cost function: JoJc')
var_split.append('=')
var_position.append(1)
var_dtype.append('float32')
# Create a dataset to hold the convergence data
convergence_ds = xr.Dataset()
# Add array for all iterations
        gn = 'convergence::total_iteration'
convergence_ds[gn] = xr.DataArray(np.zeros(total_iter, dtype='int32'))
convergence_ds[gn].data[:] = range(1, total_iter+1)
# Concatenate chunks to simplify search algorithm
min_and_j_chunks = minimizer_chunks + j_chunks
for var_ind, var in enumerate(var_names):
var_array = []
for min_and_j_chunk in min_and_j_chunks:
min_and_j_chunk_split = min_and_j_chunk.split('\n')
var_found = self.get_from_log(var_search_criteria[var_ind], var_split[var_ind],
var_position[var_ind], min_and_j_chunk_split)
                if var_found is not None:
var_array.append(var_found)
# Add to the dataset if there is something to add
if var_array:
gn = f'convergence::{var_names[var_ind]}' # group::variable name
convergence_ds[gn] = xr.DataArray(np.zeros(total_iter, dtype=var_dtype[var_ind]))
convergence_ds[gn].data[:] = var_array
# Create special case variables
# Outer iteration
# ---------------
outer_iteration = 0
outer_iterations = []
if 'convergence::inner_iteration' in convergence_ds:
inner_iterations = convergence_ds['convergence::inner_iteration'].data[:]
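            # For example, inner iteration numbers [1, 2, 3, 1, 2] map to outer iteration
            # numbers [1, 1, 1, 2, 2]: the counter increments whenever the inner count resets to 1.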
# Set outer iteration number
for inner_iteration in inner_iterations:
if inner_iteration == 1:
outer_iteration = outer_iteration + 1
                # Append to the vector of outer iterations
outer_iterations.append(outer_iteration)
            gn = 'convergence::outer_iteration'
convergence_ds[gn] = xr.DataArray(np.zeros(total_iter, dtype='int32'))
convergence_ds[gn].data[:] = outer_iterations
# Normalized versions of data
# ---------------------------
normalize_var_names = ['gradient_reduction', 'norm_reduction', 'j', 'jb', 'jojc']
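
        # Each variable is scaled by its maximum value, e.g. j values [100.0, 50.0, 10.0]
        # normalize to [1.0, 0.5, 0.1].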
for normalize_var_name in normalize_var_names:
if normalize_var_name in var_names:
# Index in lists for the variable being normalized
var_ind = var_names.index(normalize_var_name)
# Extract existing data
gn = f'convergence::{var_names[var_ind]}'
var_array = convergence_ds[gn].data[:]
# Normalize and add back to the data
gn_nz = f'convergence::{var_names[var_ind]}_normalized'
var_array_nz = var_array / np.max(var_array)
convergence_ds[gn_nz] = xr.DataArray(np.zeros(total_iter, dtype=var_dtype[var_ind]))
convergence_ds[gn_nz].data[:] = var_array_nz
return convergence_ds
# ----------------------------------------------------------------------------------------------
    def generate_default_config(self, filenames, collection_name):
"""
Generates a default configuration for Jedi log data ingest.
Args:
filenames (list): List of file names.
collection_name (str): Name of the data collection.
Returns:
dict: Default configuration dictionary.
"""
eva_dict = {'datasets': [{'jedi_log_to_parse': filenames[0],
'collection_name': collection_name,
'data_to_parse': {'convergence': 'true'}}]}
return eva_dict
# ----------------------------------------------------------------------------------------------