Source code for sparseSpatialSampling.export

"""
Interpolate CFD data for the specified fields and time steps onto the coarse grid
sampled using the :math:`S^3` algorithm. Export the interpolated data to HDF5 and XDMF
to enable visualization in ParaView.
"""
import logging
import torch as pt

from time import time
from typing import Union
from os import makedirs, path
from sklearn.neighbors import KNeighborsRegressor

from .data import Datawriter
from .sparse_spatial_sampling import SparseSpatialSampling
from .const import GRID, CONST, FACES, CENTERS, VERTICES, DATA

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S',
                    force=True)

pt.set_default_dtype(pt.float64)


[docs] class Fields: def __init__(self, centers: pt.Tensor = None, vertices: pt.Tensor = None): """ Initialize a container for storing interpolated field values at cell centers and vertices. :param centers: Interpolated field values at cell centers :type centers: pt.Tensor | None :param vertices: Interpolated field values at cell vertices :type vertices: pt.Tensor | None """ self.centers = centers self.vertices = vertices
[docs] class ExportData: def __init__(self, s_cube: SparseSpatialSampling, write_new_file_for_each_field: bool = False, n_jobs: int = None, n_neighbors: int = None, interpolate_at_vertices: bool = False): """ Initialize an Export object for interpolating original snapshots onto the grid generated by :math:`S^3` and exporting them to HDF5. :param s_cube: :class:`SparseSpatialSampling` object containing the sampled grid :type s_cube: SparseSpatialSampling :param write_new_file_for_each_field: If True, each field is written to a separate HDF5 file; if False, all fields are written to a single file :type write_new_file_for_each_field: bool :param n_jobs: Number of CPUs used for interpolation. If None, the same number of CPUs used for executing :math:`S^3` will be used :type n_jobs: int | None :param n_neighbors: Number of neighbors for the KNN. If None, defaults are 8 for 2D and 26 for 3D :type n_neighbors: int | None :param interpolate_at_vertices: If True, interpolate a solution at cell vertices in addition to cell centers :type interpolate_at_vertices: bool """ self._interpolate_at_vertices = interpolate_at_vertices self._new_file = write_new_file_for_each_field # properties we get from the s_cube object self.n_dimensions = s_cube.n_dimensions self._face_id = s_cube.faces self._centers = s_cube.centers self._vertices = s_cube.vertices self._levels = s_cube.levels self._metric = s_cube.metric self._size_initial_cell = s_cube.size_initial_cell self._save_dir = s_cube.save_path self._save_name = s_cube.save_name self._grid_name = s_cube.grid_name self._write_times = s_cube.write_times if s_cube.write_times[0] is not None else None # properties we need for interpolating and exporting the fields self._interpolated_fields = Fields() self._field_name = None self._datawriter = None if n_neighbors is None: n_neighbors = 8 if self.n_dimensions == 2 else 26 self._knn = KNeighborsRegressor(n_neighbors=n_neighbors, weights="distance", n_jobs=s_cube.n_jobs if n_jobs is None else n_jobs) self._snapshot_counter = 0 self._field_name = None self._initialized = False self._finished = False self._n_snapshots_total = None self._t_start = time()
[docs] def export(self, coordinates: pt.Tensor, data: pt.Tensor, field_name: str, n_snapshots_total: int = None) -> None: """ Interpolate the provided CFD data onto the grid generated by :math:`S^3` and export it to HDF5 and XDMF files for all specified time steps. Note: The field data from CFD must have dimensions ``[N_cells, N_dimensions, N_snapshots]``. For scalar fields, ``N_dimensions = 1``. :param coordinates: Coordinates of the original CFD grid :type coordinates: pt.Tensor :param data: Original field data with dimensions ``[N_cells, N_dimensions, N_snapshots]``. - ``N_snapshots`` can represent all snapshots, a batch of snapshots, or a single snapshot :type data: pt.Tensor :param field_name: Name of the field to export (e.g., ``'p'`` for the pressure field) :type field_name: str :param n_snapshots_total: Total number of snapshots to export - If ``None``, it is assumed that all snapshots are included in ``data`` :type n_snapshots_total: int | None :return: None :rtype: None """ # check if new write times were provided before loading any fields, if not exit if self._write_times is None: logger.error("Couldn't find any write_times for export. Make sure to pass the write times to S^3 on " "instantiation or set a value for the write times before calling the export method.") # make sure the field name is always up to date self._field_name = field_name self._fit_data(coordinates, data, field_name, n_snapshots_total) self._write_data_to_hdf5()
def _fit_data(self, _coord: pt.Tensor, _data: pt.Tensor, _field_name: str, _n_snapshots_total: int = None) -> None: """ Interpolate CFD data from the original grid onto the coarser grid generated by :math:`S^3`. The field is interpolated at both cell centers and cell nodes (if ``interpolate_at_vertices = True``). Note: The variables ``centers`` and ``vertices`` in this method denote the field values at the cell centers and cell nodes, respectively (not the node coordinates of the generated mesh). :param _coord: Coordinates of the original CFD grid :type _coord: pt.Tensor :param _data: Original field data with dimensions ``[N_cells, N_dimensions, N_snapshots]`` :type _data: pt.Tensor :param _field_name: Name of the field to export (e.g., ``'p'`` for the pressure field) :type _field_name: str :param _n_snapshots_total: Total number of snapshots to export. If ``None``, it is assumed that all snapshots are included in ``_data`` :type _n_snapshots_total: int | None :return: None :rtype: None """ # check if the field has the correct shape -> scalar fields need to be unsqueezed at dim=1 as: # [N_cells, N_dimensions, N_snapshots] (vector field) or [N_cells, 1, N_snapshots] (scalar field) assert len(_data.size()) == 3, "The provided field must have the shape '[N_cells, N_dimensions, N_snapshots]'" \ "for a vector field and '[N_cells, 1, N_snapshots]' for a scalar field" if not self._initialized: logger.info(f"Starting interpolation and export of field {self._field_name}.") # fit the metric at some point. If the number of coordinates is still the same as the CFD grid, then we haven't # fitted it yet. if self._metric.size(0) == _coord.size(0): self._knn.fit(_coord, self._metric) # overwrite the metric with the interpolated one self._metric = pt.from_numpy(self._knn.predict(self._centers)) # determine the required size of the data matrix self._n_snapshots_total = _n_snapshots_total if _n_snapshots_total is not None else _data.size(-1) # create empty tensors for the field values at centers and vertices with dimensions # [N_cells, N_dimensions, N_snapshots_currently] each call to allow variable batch sizes self._interpolated_fields.centers = pt.zeros((self._centers.size(0), _data.size(1), _data.size(2))) # optionally interpolate solution at the cell vertices if self._interpolate_at_vertices: self._interpolated_fields.vertices = pt.zeros((self._vertices.size(0), _data.size(1), _data.size(2))) # fit the KNN and interpolate the data, we need to predict each dimension separately (otherwise dim. mismatch) for dimension in range(_data.size(1)): self._knn.fit(_coord, _data[:, dimension, :]) self._interpolated_fields.centers[:, dimension, :] = pt.from_numpy(self._knn.predict(self._centers)) if self._interpolate_at_vertices: self._interpolated_fields.vertices[:, dimension, :] = pt.from_numpy(self._knn.predict(self._vertices)) # update the number of snapshots we already interpolated self._snapshot_counter += _data.size(-1) def _write_data_to_hdf5(self) -> None: """ Write the generated grid and interpolated fields (at cell centers and nodes) to an HDF5 file for the specified number of snapshots. :return: None :rtype: None """ # create a writer and datasets for the grid if on initial call if not self._initialized: logger.info(f"Writing HDF5 file for field {self._field_name}.") # create datawriter instance if self._new_file: self._datawriter = Datawriter(self._save_dir, f"{self._save_name}_{self._field_name}.h5") else: self._datawriter = Datawriter(self._save_dir, f"{self._save_name}.h5") # write the grid self._datawriter.write_data(FACES, group=GRID, data=self._face_id) self._datawriter.write_data(VERTICES, group=GRID, data=self._vertices) self._datawriter.write_data(CENTERS, group=GRID, data=self._centers) # add field for the cell levels, metric and initial cell size (used for computing cell areas or volumes) self._datawriter.write_data("levels", group=CONST, data=self._levels) self._datawriter.write_data("metric", group=CONST, data=self._metric) self._datawriter.write_data("size_initial_cell", group=CONST, data=self._size_initial_cell) self._initialized = True else: # once the grid is written, we can append the field data self._datawriter.mode = "a" # write the datasets for each given time step, we already updated the snapshot counter after fitting the # field, so we need to subtract it to get the starting dt t_start = self._snapshot_counter - self._interpolated_fields.centers.size(-1) t_end = self._snapshot_counter # create a group for each specified field for i, t in enumerate(self._write_times[t_start:t_end]): # in case we have a scalar, we need to remove the additional dimension we created for fitting the data if self._interpolated_fields.centers.size(1) == 1: self._datawriter.write_data(f"{self._field_name}_center", group=DATA, time_step=str(t), data=self._interpolated_fields.centers.squeeze(1)[:, i]) if self._interpolate_at_vertices: self._datawriter.write_data(f"{self._field_name}_vertices", group=DATA, time_step=str(t), data=self._interpolated_fields.vertices.squeeze(1)[:, i]) # in case we have a vector else: self._datawriter.write_data(f"{self._field_name}_center", group=DATA, time_step=str(t), data=self._interpolated_fields.centers[:, :, i]) if self._interpolate_at_vertices: self._datawriter.write_data(f"{self._field_name}_vertices", group=DATA, time_step=str(t), data=self._interpolated_fields.vertices[:, :, i]) # check if we have written all snapshots, if yes, then write the XDMF file if self._snapshot_counter == self._n_snapshots_total: # close hdf file after writing self._datawriter.close() # the XDMF writer updates XDMF every time we are finished writing a field, so each new gets added # automatically if we specified to write everything in the same file self._datawriter.write_xdmf_file() # reset properties for the next field, we don't need to reset the computed cell areas since the grid remains # the same for all fields self._interpolated_fields = Fields() self._snapshot_counter = 0 if self._new_file: self._initialized = False logger.info(f"Finished export of field {self._field_name} in {round(time() - self._t_start, 3)}s.") self._t_start = time() @property def write_times(self) -> list: """ Get the list of available write times :return: List of time steps :rtype: list """ return self._write_times @write_times.setter def write_times(self, value: Union[str, list]) -> None: """ Set the write times for the output data. :param value: A single time step (str, int, or float) or a list of time steps :type value: Union[str, int, float, list] :return: None :rtype: None """ self._write_times = value if isinstance(value, list) else [value] @property def new_file(self) -> bool: """ Flag indicating whether a new HDF5 file will be created for each field. :return: True if a new file is created for each field, False otherwise :rtype: bool """ return self._new_file @property def save_name(self) -> str: """ Get the base name of the output files. :return: Name of the file used for saving the grid and data :rtype: str """ return self._save_name @save_name.setter def save_name(self, new_name: str) -> None: """ Set a new base name for the output files and reset initialization to ensure consistency. :param new_name: New file name :type new_name: str :return: None :rtype: None """ self._save_name = new_name self._initialized = False @property def save_dir(self) -> str: """ Get the directory where output files will be saved. :return: Path of the save directory :rtype: str """ return self._save_dir @save_dir.setter def save_dir(self, new_path: str) -> None: """ Set the directory where output files will be saved. :param new_path: Path of the save directory :type new_path: str :return: None :rtype: None """ self._save_dir = new_path self._initialized = False # make sure that the directory exists when we change the save path if not path.exists(self._save_dir): makedirs(self._save_dir)
if __name__ == "__main__": pass