Source code for sparseSpatialSampling.data

"""
Module containing classes for handling input and output of S^3 data.

- **Dataloader**: Class for loading data from the HDF5 output file of :math:`S^3` and assembling the data matrix.
- **Datawriter**: Class for writing data generated by :math:`S^3` to an HDF5 file.
- **XDMFwriter**: Class for generating an XDMF file based on a given HDF5 file from :math:`S^3`.
"""
import logging
import torch as pt

from h5py import File
from os.path import join, isfile
from typing import Union, List

from .const import DATA, GRID, CONST, CENTERS, VERTICES, FACES

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S',
                    force=True)


[docs] class Dataloader: def __init__(self, load_path: str, file_name: str, dtype: pt.dtype = pt.float32): """ Load data from the HDF5 output file of :math:`S^3` and assemble the data matrix. :param load_path: Path to the HDF5 file :type load_path: str :param file_name: Name of the HDF5 file :type file_name: str :param dtype: Data type of the tensor; default is ``float32`` (single precision) :type dtype: pt.dtype """ # create a file object self._load_path = load_path self._file_name = file_name self._dtype = dtype # load some values we may need, we always have to close the file after reading, otherwise we can't access it # later for writing (or sth. else) with File(join(self._load_path, self._file_name), "r") as f: self._n_cells = f.get(f"{GRID}/{CENTERS}")[()].shape[0] self._n_dimensions = f.get(f"{GRID}/{CENTERS}")[()].shape[1] self._size_initial_cell = f.get(f"{CONST}/size_initial_cell")[()] # placeholders for properties which are only loaded if requested self._write_times = None self._weights = None # cell areas (2D) / volumes (3D) self._levels = None self._field_names = None # properties of the grid self._vertices = None self._faces = None self._nodes = None @property def write_times(self) -> List[str]: """ Load all available time steps present in the HDF5 file. The time steps at which a specific field is present are stored in the ``field_names`` property. :return: List of available write times :rtype: list """ if self._write_times is None: with File(join(self._load_path, self._file_name), "r") as f: if DATA in f.keys(): self._write_times = list(f.get(f"{DATA}").keys()) return self._write_times @property def weights(self) -> pt.Tensor: """ Load and compute the cell areas (2D) or volumes (3D) of the grid. :return: Tensor containing cell areas (2D) or volumes (3D) :rtype: pt.Tensor """ if self._weights is None: self._compute_cell_area() return self._weights @property def vertices(self) -> pt.Tensor: """ Load the cell centers of the grid. :return: Tensor containing the cell centers of the grid :rtype: pt.Tensor """ if self._vertices is None: with File(join(self._load_path, self._file_name), "r") as f: self._vertices = pt.from_numpy(f.get(f"{GRID}/{CENTERS}")[()]) return self._vertices @property def nodes(self) -> pt.Tensor: """ Load the cell nodes of the grid. :return: Tensor containing the cell nodes of the grid :rtype: pt.Tensor """ if self._nodes is None: with File(join(self._load_path, self._file_name), "r") as f: self._nodes = pt.from_numpy(f.get(f"{GRID}/{VERTICES}")[()]) return self._nodes @property def faces(self) -> pt.Tensor: """ Load the cell faces of the grid. :return: Tensor containing the indices of the cell faces of the grid :rtype: pt.Tensor """ if self._faces is None: with File(join(self._load_path, self._file_name), "r") as f: self._faces = pt.from_numpy(f.get(f"{GRID}/{FACES}")[()]) return self._faces @property def field_names(self) -> dict: """ Create a dictionary of available fields for each time step. The dictionary keys are the available time steps, and the values are lists of fields available at each corresponding time step. :return: Dictionary mapping each time step to its available fields :rtype: dict[int, list] """ if self._field_names is None: # we can't use CENTERS, because CENTERS only stands for the cell centers of the grid; the location of the # field is marked as center or vertices with File(join(self._load_path, self._file_name), "r") as f: self._field_names = {k: [f.split("_")[0] for f in f[f"{DATA}/{k}"].keys() if f.endswith("center")] for k in f[DATA].keys()} return self._field_names @property def levels(self) -> pt.Tensor: """ Load the refinement levels of the cells in the grid. :return: Tensor containing the cell levels of the grid :rtype: pt.Tensor """ if self._levels is None: with File(join(self._load_path, self._file_name), "r") as f: self._levels = pt.from_numpy(f.get(f"{CONST}/levels")[()]).squeeze() return self._levels @property def load_path(self) -> str: """ Get the current load path. :return: Current load path :rtype: str """ return self._load_path @load_path.setter def load_path(self, value: str) -> None: """ Set a new load path and automatically reset related properties to avoid inconsistencies. :return: None :rtype: None """ self._load_path = value self._reset() @property def file_name(self) -> str: """ Get the current file name. :return: Current file name :rtype: str """ return self._file_name @file_name.setter def file_name(self, value: str) -> None: """ Set a new file name and automatically reset related properties to avoid inconsistencies. :return: None :rtype: None """ self._file_name = value self._reset() def _reset(self) -> None: """ Reset the class properties to avoid inconsistencies. :return: None :rtype: None """ with File(join(self._load_path, self._file_name), "r") as f: self._n_cells = f.get(f"{GRID}/{CENTERS}")[()].shape[0] self._n_dimensions = f.get(f"{GRID}/{CENTERS}")[()].shape[1] self._size_initial_cell = f.get(f"{CONST}/size_initial_cell")[()] self._write_times = None self._weights = None self._levels = None self._field_names = None self._vertices = None def _compute_cell_area(self) -> None: """ Compute the cell areas (2D) or volumes (3D) of the grid. :return: None :rtype: None """ self._weights = (pow(self._size_initial_cell / pow(2, self.levels), self._n_dimensions)).squeeze()
[docs] def load_snapshot(self, field_name: Union[List[str], str], write_times: Union[str, List[str]] = None) -> Union[List[pt.Tensor], pt.Tensor]: """ Load snapshots for the specified field(s) and time steps. :param field_name: Name of the field for which the data matrix should be created :type field_name: str :param write_times: Time steps for which the data matrix should be created. If ``None``, all available time steps will be used. :type write_times: Union[str, List[str], None] :return: - Data matrix containing snapshots: - ``[N_cells, N_dimensions, N_snapshots]`` for vector fields - ``[N_cells, N_snapshots]`` for scalar fields - Or a list of data matrices if multiple fields are loaded :rtype: Union[pt.Tensor, List[pt.Tensor]] """ if write_times is None: write_times = self.write_times if isinstance(write_times, str): write_times = [write_times] if isinstance(field_name, str): field_name = [field_name] # create a file object and empty list for storing the data matrices _file = File(join(self._load_path, self._file_name), "r") _dm = [] for f in field_name: # get the shape of the field (scalar / vector field) shape = _file.get(f"{DATA}/{write_times[0]}/{f}_center")[()].shape # allocate the data matrix accordingly if len(shape) == 1: _data_matrix = pt.zeros((self._n_cells, len(write_times)), dtype=self._dtype) else: _data_matrix = pt.zeros((shape[0], shape[1], len(write_times)), dtype=self._dtype) # assemble the data matrix for the given time steps for i, k in enumerate(write_times): if len(shape) == 1: _data_matrix[:, i] = pt.from_numpy(_file.get(f"{DATA}/{k}/{f}_center")[()]) else: _data_matrix[:, :, i] = pt.from_numpy(_file.get(f"{DATA}/{k}/{f}_center")[()]) _dm.append(_data_matrix) # close the file _file.close() return _dm[0] if len(_dm) == 1 else _dm
[docs] class Datawriter: def __init__(self, file_path: str, file_name: str, mode: str = "w", mixed: bool = False): """ Initialize a Datawriter for writing data generated by :math:`S^3` to an HDF5 file and optionally create an XDMF file for visualization in ParaView or other software. Inspired and partially adopted from the flowtorch Datawriter: https://github.com/FlowModelingControl/flowtorch/blob/main/flowtorch/data/hdf5_file.py :param file_path: Path where the HDF5 file should be saved :type file_path: str :param file_name: Name of the HDF5 file (including extension, e.g., ``data.h5``) :type file_name: str :param mode: File access mode: - ``'w'``: create a new file (overwriting any existing file with the same name) - ``'a'``: append to an existing HDF5 file The mode can be changed after instantiation. :type mode: str :param mixed: Flag indicating if the grid is of type 'Mixed' (for unstructured grids not created with :math:`S^3`); only relevant for writing the XDMF file :type mixed: bool """ self._file_name = file_name self._mode = mode self._mixed = mixed self._file_path = file_path self._file = File(join(self._file_path, self._file_name), self._mode) # allocate groups for DATA, GRID & CONST if they don't exist yet, otherwise load them self._data = None if DATA not in self._file.keys() else self._file[DATA] self._const = None if CONST not in self._file.keys() else self._file[CONST] self._grid = None if GRID not in self._file.keys() else self._file[GRID]
[docs] def close(self) -> None: """ Close the HDF5 writer :return: None :rtype: None """ self._file.close()
[docs] def write_grid(self, loader: Dataloader) -> None: """ Write the grid to a new HDF5 file based on a given Dataloader. :param loader: Dataloader instance containing the path to the HDF5 file from which the grid should be loaded :type loader: Dataloader :return: None :rtype: None """ self.write_data("centers", group="grid", data=loader.vertices) self.write_data("vertices", group="grid", data=loader.nodes) self.write_data("faces", group="grid", data=loader.faces)
[docs] def write_data(self, name: str, data: any, group: str = "constant", time_step: Union[int, float, str] = None) -> None: """ Write data to an HDF5 file. :param name: Name of the dataset to be created :type name: str :param data: Data to write :type data: any :param group: Target group for the data. Available groups are: - ``'constant'``: arbitrary values and fields without time dependency - ``'grid'``: grid datasets including ``'centers'``, ``'vertices'``, and ``'faces'`` from :math:`S^3` - ``'data'``: temporal data such as flow fields :type group: str :param time_step: Time step associated with the field when writing temporal data. If ``None`` and the group is ``'data'``, the data will be written to the zeroth time step (``'data/0'``) :type time_step: int | float | str | None :return: None :rtype: None """ # if we want to write something in data, but we don't provide a time step, just write it to t = 0 if group == DATA and time_step is None: logger.warning(f"No time step for group 'data' provided. Writing data to the zeroth time step '{DATA}/0'.") time_step = "0" # make sure that we write all snapshots to the temporal structure of DATA if a time step is given if time_step is not None or group == DATA: # if the group for data or the current time step doesn't exist yet, write it if self._data is None or str(time_step) not in self._file[DATA].keys(): self._data = self._file.create_group(f"{DATA}/{time_step}") # set the correct group (time step), because once a temporal series is written, the group is set to the # last time step. if we now want to write a new field, we have to set the group to the correct time step else: self._data = self._file[f"{DATA}/{time_step}"] # write the data to HDF self._data.create_dataset(name, data=data) # do the same for the constant properties... elif group == CONST: if self._const is None: self._const = self._file.create_group(CONST) else: self._const = self._file[CONST] self._const.create_dataset(name, data=data) # ... and the grid elif group == GRID: if self._grid is None: self._grid = self._file.create_group(GRID) else: self._grid = self._file[GRID] self._grid.create_dataset(name, data=data) else: logger.critical(f"Unknown group type, available types are '{DATA}', '{CONST}' and '{GRID}'.") exit()
[docs] def write_xdmf_file(self) -> None: """ Write an XDMF file based on the contents of a given HDF5 file. :return: None :rtype: None """ # check if an HDF file exists if not isfile(join(self._file_path, self._file_name)): logger.error(f"Could not find {join(self._file_path, self._file_name)}. Make sure the file exists and the " f"provided path is correct.") exit(0) # if yes, generate an XDMF file based on the contents of the HDF5 file logger.info(f"Writing XDMF file for file {self._file_name}") xdmf_writer = XDMFWriter(self._file_path, self._file_name, mixed=self._mixed) xdmf_writer.write_xdmf()
@property def mode(self) -> str: """ Get the current file access mode for the HDF5 file (e.g., ``'r'`` for read, ``'w'`` for write). :return: File access mode :rtype: str """ return self._mode @property def file_name(self) -> str: """ Get the name of the HDF5 file. :return: File name :rtype: str """ return self._file_name @mode.setter def mode(self, value) -> None: """ Set the file access mode for the HDF5 file and reopen it with the new mode. :param value: File access mode (e.g., ``'r'`` for read, ``'w'`` for write) :type value: str :return: None :rtype: None """ self._mode = value self._file = File(join(self._file_path, self._file_name), self._mode)
[docs] class XDMFWriter: def __init__(self, file_path: str, file_name: str, grid_name: str = "grid_s_cube", mixed: bool = False): """ Initialize the XDMF writer for a given HDF5 file from :math:`S^3`. :param file_path: Path to the HDF5 file for which the XDMF file should be created :type file_path: str :param file_name: Name of the HDF5 file (including extension, e.g., ``data.h5``) :type file_name: str :param grid_name: Name of the grid (used inside the XDMF file) :type grid_name: str :param mixed: Flag indicating if the grid is of type 'Mixed' (for unstructured grids not created with :math:`S^3`); only relevant for writing the XDMF file :type mixed: bool """ self._file_path = file_path self._grid_name = grid_name self._mixed = mixed self._hdf_file_name = file_name self._file = File(join(self._file_path, self._hdf_file_name), "r") self._header = '<?xml version="1.0"?>\n<!DOCTYPE Xdmf SYSTEM "Xdmf.dtd" []>\n<Xdmf Version="2.0">\n' self._temporal_grid = False self._const_attributes = False self._keys_const_attributes = [] # get the file name of the corresponding XDMF file self._xdmf_file_name = f"{self._hdf_file_name.split('.h5')[0]}.xdmf" # check if the grid is present and if all required keys exist, if not then we can't create the XDMF self._check_grid() # get the number of dimensions based on the coordinates of the grid self._n_dimensions = self._file.get(f"{GRID}/{CENTERS}")[()].shape[-1] self._n_cells = self._file.get(f"{GRID}/{CENTERS}")[()].shape[0] self._n_faces = self._file.get(f"{GRID}/{FACES}")[()].shape[0] self._n_vertices = self._file.get(f"{GRID}/{VERTICES}")[()].shape[0] # grid type and dimensions if self._mixed: self._grid_type = "Mixed" else: self._grid_type = "Quadrilateral" if self._n_dimensions == 2 else "Hexahedron" self._dims = "XY" if self._n_dimensions == 2 else "XYZ"
[docs] def write_xdmf(self) -> None: """ Wrapper function for writing the XDMF file. :return: None :rtype: None """ # 1. check if we have temporal data available self._temporal_grid = True if self._check_data() else False # 2. check what is available in const. to write ->> if we found something then it will be written into the # first time step of the temporal grid structure self._keys_const_attributes = self._get_const_keys() self._const_attributes = True if self._keys_const_attributes else False # write the XDMF file self._write_temporal_grid() if self._temporal_grid else self._write_const_grid()
def _write_temporal_grid(self) -> None: """ Write the XDMF file based on the temporal data in the HDF5 file. All time-independent fields are written into the first time step. :return: None :rtype: None """ # header for temporal grid structure _domain_header = f'<Domain>\n<Grid Name="{self._grid_name}" GridType="Collection" CollectionType="temporal">\n' # write the corresponding XDMF file with open(join(self._file_path, self._xdmf_file_name), "w") as f_out: # write global header f_out.write(self._header) f_out.write(_domain_header) # loop over all available time steps and write all specified data to XDMF & HDF5 files, since we have the # HDMF5 file written completely for all specified snapshots, we can now iterate over all time steps for i, t in enumerate(sorted(self._file.get(DATA).keys(), key=lambda x: float(x))): # write grid specific header tmp = f'<Grid Name="{self._grid_name} {t}" GridType="Uniform">\n<Time Value="{t}"/>\n' \ f'<Topology TopologyType="{self._grid_type}" NumberOfElements="{self._n_faces}">\n' \ f'<DataItem Format="HDF" DataType="Int" Dimensions="{self._n_faces}' # the number of dimensions depends on the grid type tmp += '">\n' if self._mixed else f' {pow(2, self._n_dimensions)}">\n' f_out.write(tmp) # include the grid data from the HDF5 file f_out.write(f"{self._hdf_file_name}:/{GRID}/{FACES}\n") # write geometry part f_out.write(f'</DataItem>\n</Topology>\n<Geometry GeometryType="{self._dims}">\n' f'<DataItem Rank="2" Dimensions="{self._n_vertices} {self._n_dimensions}" ' f'NumberType="Float" Precision="8" Format="HDF">\n') # write coordinates of vertices f_out.write(f"{self._hdf_file_name}:/{GRID}/{VERTICES}\n</DataItem>\n</Geometry>\n") # write everything we found in constant (fields) into the first time step if i == 0: f_out.write(self._write_attributes()) # loop over all fields available at each time step for k in self._file[f"{DATA}/{t}"].keys(): # assuming data is written out by s_cube as <field_name>_<position>, otherwise the name is taken # as it is _name = k.split("_")[0] if len(k.split("_")) > 1 else k # determine if scalar or vector field _shape = self._file.get(f"{DATA}/{t}/{k}")[()].shape _second_dim = 1 if len(_shape) == 1 else _shape[1] # in case our field is defined at the cell center if _shape[0] == self._n_cells: f_out.write(f'<Attribute Name="{_name}" AttributeType="Vector" Center="Cell">\n<DataItem ' f'NumberType="Float" Precision="8" Format="HDF" ' f'Dimensions="{self._n_cells} {_second_dim}">\n') f_out.write(f"{self._hdf_file_name}:/{DATA}/{t}/{k}\n</DataItem>\n</Attribute>\n") # if our field is located at the cell vertices elif _shape[0] == self._n_vertices: f_out.write(f'<Attribute Name="{_name}" AttributeType="Vector" Center="Node">\n<DataItem ' f'NumberType="Float" Precision="8" Format="HDF" ' f'Dimensions="{self._n_vertices} {_second_dim}">\n') f_out.write(f"{self._hdf_file_name}:/{DATA}/{t}/{k}\n</DataItem>\n</Attribute>\n") # we need to check if the field inside the temporal data matches our current field, because in # contrast to the data inside constant, we didn't check here else: logger.warning(f"Field in '{DATA}/{t}/{k}' with a size of {_shape} doesn't match the number " f"of cells with N_cells = {self._n_cells} or the number of vertices with " f"N_vertices = {self._n_vertices}. Skipping this field.") continue # write end tag of the current grid f_out.write('</Grid>\n') # write rest of file f_out.write('</Grid>\n</Domain>\n</Xdmf>') def _write_const_grid(self) -> None: """ Write the XDMF file when no temporal data is present in the HDF5 file. :return: None :rtype: None """ _grid_header = f'<Domain>\n<Grid Name="{self._grid_name}" GridType="Uniform">\n' \ f'<Topology TopologyType="{self._grid_type}" NumberOfElements="{self._n_faces}">\n' \ f'<DataItem Format="HDF" DataType="Int" Dimensions="{self._n_faces}' # the number of dimensions depends on the grid type _grid_header += '">\n' if self._mixed else f' {pow(2, self._n_dimensions)}">\n' # write the corresponding XDMF file with open(join(self._file_path, self._xdmf_file_name), "w") as f_out: # write global header f_out.write(self._header) f_out.write(_grid_header) # include the grid data from the HDF5 file f_out.write(f"{self._hdf_file_name}:/{GRID}/{FACES}\n") # write geometry part f_out.write(f'</DataItem>\n</Topology>\n<Geometry GeometryType="{self._dims}">\n' f'<DataItem Rank="2" Dimensions="{self._n_vertices} {self._n_dimensions}" ' f'NumberType="Float" Precision="8" Format="HDF">\n') # write coordinates of vertices f_out.write(f"{self._hdf_file_name}:/{GRID}/{VERTICES}\n") # write end tags f_out.write("</DataItem>\n</Geometry>\n") # write the attributes found in constant f_out.write(self._write_attributes()) # write rest of file f_out.write("</Grid>\n</Domain>\n</Xdmf>") def _write_attributes(self) -> str: """ Loop over all constant attributes and generate the corresponding XDMF content. :return: String containing all constant attributes formatted for writing to the XDMF file :rtype: str """ str_to_write = [] for k in self._keys_const_attributes: _shape = self._file.get(f"{CONST}/{k}")[()].shape _second_dim = 1 if len(_shape) == 1 else _shape[1] if _shape[0] == self._n_cells: str_to_write.append(f'<Attribute Name="{k}" AttributeType="Vector" Center="Cell">\n<DataItem ' f'NumberType="Float" Precision="8" Format="HDF" ' f'Dimensions="{self._n_cells} {_second_dim}">\n' f'{self._hdf_file_name}:/{CONST}/{k}\n</DataItem>\n</Attribute>\n') elif _shape[0] == self._n_vertices: str_to_write.append(f'<Attribute Name="{k}" AttributeType="Vector" Center="Node">\n<DataItem ' f'NumberType="Float" Precision="8" Format="HDF" ' f'Dimensions="{self._n_vertices} {_second_dim}">\n' f'{self._hdf_file_name}:/{CONST}/{k}\n</DataItem>\n</Attribute>\n') # since we only added fields which match n_cells or n_vertices, this condition shouldn't be happening, but # just in case we missed something... else: logger.warning(f"Field in '{CONST}/{k}' with a size of {_shape} doesn't match the number of cells " f"with N_cells = {self._n_cells} or the number of vertices with " f"N_vertices = {self._n_vertices}. Skipping this field.") continue return "".join(str_to_write) def _get_const_keys(self) -> list: """ Check whether any fields are available in the constant group of the HDF5 file. :return: List of all available fields; returns an empty list if none are found :rtype: list """ if CONST in self._file.keys(): # loop over all entries in const. and add everything, which has the same dimensions as we have number of # cells -> means we have some sort of field keys = [] for k in self._file[CONST].keys(): tmp = self._file.get(f"{CONST}/{k}")[()] if not tmp.shape: continue elif self._n_cells == tmp.shape[0] or self._n_vertices == tmp.shape[0]: keys.append(k) else: logger.info("Couldn't find any constant fields to write.") keys = [] return keys def _check_data(self) -> bool: """ Check whether temporal data is available in the HDF5 file. :return: True if temporal data is available, False otherwise :rtype: bool """ return DATA in self._file.keys() def _check_grid(self) -> None: """ Check whether a grid (and all its components) exists in the HDF5 file and verify that the keys match the expected keys for cell centers, vertices, and faces. :return: None :rtype: None """ if GRID in self._file.keys(): if FACES not in self._file[GRID].keys(): logger.error(f"Unable to find cell faces in group {GRID}. Make sure the key to the cell faces is " f"present and named {FACES}.") exit(0) if CENTERS not in self._file[GRID].keys(): logger.error(f"Unable to find cell centers in group {GRID}. Make sure the key to the cell centers is " f"present and named {CENTERS}.") exit(0) if VERTICES not in self._file[GRID].keys(): logger.error(f"Unable to find cell vertices in group {GRID}. Make sure the key to the cell vertices is" f"present and named {VERTICES}.") exit(0) else: logger.error("Found no grid in the provided HDF5 file. Unable to create XDMF file.without a grid.") exit(0)
if __name__ == "__main__": pass