"""
Interpolate CFD data for the specified fields and time steps onto the coarse grid
sampled using the :math:`S^3` algorithm. Export the interpolated data to HDF5 and XDMF
to enable visualization in ParaView.
"""
import logging
import torch as pt
from time import time
from typing import Union
from os import makedirs, path
from multiprocessing import cpu_count
from sklearn.neighbors import NearestNeighbors
from .data import Datawriter
from .sparse_spatial_sampling import SparseSpatialSampling
from .const import GRID, CONST, FACES, CENTERS, VERTICES, DATA
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)-8s %(message)s', datefmt='%Y-%m-%d %H:%M:%S',
force=True)
pt.set_default_dtype(pt.float64)
[docs]
class Fields:
def __init__(self, centers: pt.Tensor = None, vertices: pt.Tensor = None):
"""
Initialize a container for storing interpolated field values at cell centers and vertices.
:param centers: Interpolated field values at cell centers
:type centers: pt.Tensor | None
:param vertices: Interpolated field values at cell vertices
:type vertices: pt.Tensor | None
"""
self.centers = centers
self.vertices = vertices
[docs]
class ExportData:
def __init__(self, s_cube: SparseSpatialSampling, write_new_file_for_each_field: bool = False, n_jobs: int = None,
n_neighbors: int = None, interpolate_at_vertices: bool = False, write_times: Union[list, str] = None,
append_existing: bool = False):
"""
Initialize an Export object for interpolating original snapshots onto the grid generated by :math:`S^3`
and exporting them to HDF5.
:param s_cube: :class:`SparseSpatialSampling` object containing the sampled grid
:type s_cube: SparseSpatialSampling
:param write_new_file_for_each_field: If True, each field is written to a separate HDF5 file; if False,
all fields are written to a single file. Gets disables if `append_existing = True`
:type write_new_file_for_each_field: bool
:param n_jobs: Number of CPUs used for interpolation. If None, the same number of CPUs used for executing
:math:`S^3` will be used
:type n_jobs: int | None
:param n_neighbors: Number of neighbors for the KNN. If None, defaults are 8 for 2D and 26 for 3D
:type n_neighbors: int | None
:param interpolate_at_vertices: If True, interpolate a solution at cell vertices in addition to cell centers
:type interpolate_at_vertices: bool
:param write_times: Numerical time steps of the simulation to be exported
:type write_times: str | list[int | float | str] | None
:param append_existing: flag to append a field to an existing HFD5 file.
.. note::
The `s_cube` objects and therefore the meshes must be identical in order to append the field.
The mesh consistency is not checked.
:type append_existing: bool
"""
self._interpolate_at_vertices = interpolate_at_vertices
self._new_file = write_new_file_for_each_field
# properties we get from the s_cube object
self.n_dimensions = s_cube.n_dimensions
self._face_id = s_cube.faces
self._centers = s_cube.centers
self._vertices = s_cube.vertices
self._levels = s_cube.levels
self._metric = s_cube.metric
self._size_initial_cell = s_cube.size_initial_cell
self._save_dir = s_cube.save_path
self._save_name = s_cube.save_name
self._grid_name = s_cube.grid_name
# set the write_times if passed, else print warning msg
if write_times is not None:
self._write_times = write_times if isinstance(write_times, list) else [write_times]
else:
self._write_times = None
logger.warning("Argument ``write_times`` is ``None``. Make sure to set the ``write_times`` before calling the"
" ``export()`` method.")
# properties we need for interpolating and exporting the fields
self._interpolated_fields = Fields()
self._field_name = None
self._datawriter = None
self._snapshot_counter = 0
self._field_name = None
self._initialized_hdf5 = False if not append_existing else True
self._interpolated_metric = False if not append_existing else True
# we still need to re-compute the weights of the KNN if we add a field to an existing file
self._initialized_weights = False
self._finished = False
self._n_snapshots_total = None
self._t_start = time()
# perform some checks
if append_existing:
logger.info(f"Appending fields to file {path.join(self._save_dir, self._save_name)}.h5")
if self._new_file:
# otherwise conflict, we can't append field to existing once but say we want to have it in a new file
logger.warning("Setting `write_new_file_for_each_field = False` since `append_existing` is given as "
"`True`")
self._new_file = False
# initialize everything regarding the KNN interpolation
if n_neighbors is None:
n_neighbors = 8 if self.n_dimensions == 2 else 26
self._n_jobs = n_jobs if n_jobs is not None else cpu_count()
self._knn = NearestNeighbors(n_neighbors=n_neighbors, n_jobs=self._n_jobs)
self._knn_idx_centers = None
self._knn_w_centers = None
self._knn_idx_vertices = None
self._knn_w_vertices = None
self._coord_shape = None
self._chunk_size = None
[docs]
def export(self, coordinates: pt.Tensor, data: pt.Tensor, field_name: str, n_snapshots_total: int = None,
chunk_size: int = 100000) -> None:
"""
Interpolate the provided CFD data onto the grid generated by :math:`S^3` and
export it to HDF5 and XDMF files for all specified time steps.
Note:
The field data from CFD must have dimensions ``[N_cells, N_dimensions, N_snapshots]``.
For scalar fields, ``N_dimensions = 1``.
:param coordinates: Coordinates of the original CFD grid
:type coordinates: pt.Tensor
:param data: Original field data with dimensions ``[N_cells, N_dimensions, N_snapshots]``.
- ``N_snapshots`` can represent all snapshots, a batch of snapshots, or a single snapshot
:type data: pt.Tensor
:param field_name: Name of the field to export (e.g., ``'p'`` for the pressure field)
:type field_name: str
:param n_snapshots_total: Total number of snapshots to export
- If ``None``, it is assumed that all snapshots are included in ``data``
:type n_snapshots_total: int | None
:param chunk_size: Number of cells to interpolate at once. Helpful if the memory is the restricting factor and
the batch size is already 1. The memory requirements scale linearly with the chunks size, so decreasing it saves memory.
:type chunk_size: int
:return: None
:rtype: None
"""
# check if new write times were provided before loading any fields, if not exit
if self._write_times is None:
raise ValueError("Couldn't find any ``write_times`` for export. Make sure to pass the write times "
"when instantiating the export object or set it before calling the ``export method``.")
# no GPU support
if data.is_cuda:
raise RuntimeError("ExportData currently requires CPU tensors. Detected GPU tensor for ``data``.")
# make sure the field name is always up to date
self._chunk_size = int(chunk_size)
self._field_name = field_name
self._fit_data(coordinates, data, field_name, n_snapshots_total)
self._write_data_to_hdf5()
def _fit_data(self, _coord: pt.Tensor, _data: pt.Tensor, _field_name: str, _n_snapshots_total: int = None) -> None:
"""
Interpolate CFD data from the original grid onto the coarser grid generated by :math:`S^3`.
The field is interpolated at both cell centers and cell nodes (if ``interpolate_at_vertices = True``).
Note:
The variables ``centers`` and ``vertices`` in this method denote the field values at
the cell centers and cell nodes, respectively (not the node coordinates of the generated mesh).
:param _coord: Coordinates of the original CFD grid
:type _coord: pt.Tensor
:param _data: Original field data with dimensions ``[N_cells, N_dimensions, N_snapshots]``
:type _data: pt.Tensor
:param _field_name: Name of the field to export (e.g., ``'p'`` for the pressure field)
:type _field_name: str
:param _n_snapshots_total: Total number of snapshots to export.
If ``None``, it is assumed that all snapshots are included in ``_data``
:type _n_snapshots_total: int | None
:return: None
:rtype: None
"""
# check if the field has the correct shape -> scalar fields need to be unsqueezed at dim=1 as:
# [N_cells, N_dimensions, N_snapshots] (vector field) or [N_cells, 1, N_snapshots] (scalar field)
# if the dimensions don't match, autocorrect them if possible
if len(_data.size()) < 2:
raise ValueError("The provided field must have the shape '[N_cells, N_dimensions, N_snapshots]'"
"for a vector field and '[N_cells, 1, N_snapshots]' for a scalar field. "
f"Found a dimension of {len(_data.size())} for parameter 'data'.")
elif len(_data.size()) == 2:
logger.warning(f"Detected a scalar field of the dimension of {len(_data.size())} as input. Reshaping to the"
f" dimension of '[N_cells, 1, N_snapshots]'.")
_data = _data.unsqueeze(1)
# initialize KNN weights if this is the first call
if not self._initialized_weights:
self._build_knn_cache(_coord)
# we don't need the CFD grid after computing the KNN weights anymore, so delete them
del _coord
if self._snapshot_counter == 0:
logger.info(f"Starting interpolation and export of field {self._field_name}.")
# fit the metric at some point. If the number of coordinates is still the same as the CFD grid, then we haven't
# fitted it yet.
if not self._interpolated_metric:
self._metric = (self._knn_w_centers * self._metric[self._knn_idx_centers]).sum(dim=1)
self._interpolated_metric = True
# determine the required size of the data matrix if not yet initialized
if self._snapshot_counter == 0:
self._n_snapshots_total = _n_snapshots_total if _n_snapshots_total is not None else _data.size(-1)
# fit the KNN and interpolate the data
self._interpolated_fields.centers = interpolate_data(self._knn_w_centers, self._knn_idx_centers, _data,
self._chunk_size)
if self._interpolate_at_vertices:
self._interpolated_fields.vertices = interpolate_data(self._knn_w_vertices, self._knn_idx_vertices, _data,
self._chunk_size)
# update the number of snapshots we already interpolated
self._snapshot_counter += _data.size(-1)
def _write_data_to_hdf5(self) -> None:
"""
Write the generated grid and interpolated fields (at cell centers and nodes) to an HDF5 file
for the specified number of snapshots.
:return: None
:rtype: None
"""
# create a writer and datasets for the grid if on initial call
if not self._initialized_hdf5:
logger.info(f"Writing HDF5 file for field {self._field_name}.")
# create datawriter instance
if self._new_file:
self._datawriter = Datawriter(self._save_dir, f"{self._save_name}_{self._field_name}.h5")
else:
self._datawriter = Datawriter(self._save_dir, f"{self._save_name}.h5")
# write the grid
self._datawriter.write_data(FACES, group=GRID, data=self._face_id)
self._datawriter.write_data(VERTICES, group=GRID, data=self._vertices)
self._datawriter.write_data(CENTERS, group=GRID, data=self._centers)
# add field for the cell levels, metric and initial cell size (used for computing cell areas or volumes)
self._datawriter.write_data("levels", group=CONST, data=self._levels)
self._datawriter.write_data("metric", group=CONST, data=self._metric)
self._datawriter.write_data("size_initial_cell", group=CONST, data=self._size_initial_cell)
self._initialized_hdf5 = True
# since we wrote the level and metric fields, we can free up some memory
self._levels = None
self._metric = None
self._size_initial_cell = None
else:
# if we append to existing file we have to instantiate the Datawrite first (only necessary if
# _new_file = False since otherwise we would export the field in a separate file anyway)
# once the grid is written, we can append the field data
if not self._new_file and self._datawriter is None:
logger.info(f"Writing HDF5 file for field {self._field_name}.")
self._datawriter = Datawriter(self._save_dir, f"{self._save_name}.h5", mode="a")
else:
self._datawriter.mode = "a"
# write the datasets for each given time step, we already updated the snapshot counter after fitting the
# field, so we need to subtract it to get the starting dt
t_start = self._snapshot_counter - self._interpolated_fields.centers.size(-1)
t_end = self._snapshot_counter
# create a group for each specified field
for i, t in enumerate(self._write_times[t_start:t_end]):
# in case we have a scalar, we need to remove the additional dimension we created for fitting the data
if self._interpolated_fields.centers.size(1) == 1:
self._datawriter.write_data(f"{self._field_name}_center", group=DATA, time_step=str(t),
data=self._interpolated_fields.centers.squeeze(1)[:, i])
if self._interpolate_at_vertices:
self._datawriter.write_data(f"{self._field_name}_vertices", group=DATA, time_step=str(t),
data=self._interpolated_fields.vertices.squeeze(1)[:, i])
# in case we have a vector
else:
self._datawriter.write_data(f"{self._field_name}_center", group=DATA, time_step=str(t),
data=self._interpolated_fields.centers[:, :, i])
if self._interpolate_at_vertices:
self._datawriter.write_data(f"{self._field_name}_vertices", group=DATA, time_step=str(t),
data=self._interpolated_fields.vertices[:, :, i])
# check if we have written all snapshots, if yes, then write the XDMF file
if self._snapshot_counter == self._n_snapshots_total:
# close hdf file after writing
self._datawriter.close()
# the XDMF writer updates XDMF every time we are finished writing a field, so each new gets added
# automatically if we specified to write everything in the same file
self._datawriter.write_xdmf_file()
# reset properties for the next field, we don't need to reset the computed cell areas since the grid remains
# the same for all fields
self._interpolated_fields = Fields()
self._snapshot_counter = 0
if self._new_file:
self._initialized_hdf5 = False
logger.info(f"Finished export of field {self._field_name} in {round(time() - self._t_start, 3)}s.")
self._t_start = time()
@property
def write_times(self) -> list:
"""
Get the list of available write times
:return: List of time steps
:rtype: list
"""
return self._write_times
@write_times.setter
def write_times(self, value: Union[str, list]) -> None:
"""
Set the write times for the output data.
:param value: A single time step (str, int, or float) or a list of time steps
:type value: Union[str, int, float, list]
:return: None
:rtype: None
"""
self._write_times = value if isinstance(value, list) else [value]
@property
def new_file(self) -> bool:
"""
Flag indicating whether a new HDF5 file will be created for each field.
:return: True if a new file is created for each field, False otherwise
:rtype: bool
"""
return self._new_file
@property
def save_name(self) -> str:
"""
Get the base name of the output files.
:return: Name of the file used for saving the grid and data
:rtype: str
"""
return self._save_name
@save_name.setter
def save_name(self, new_name: str) -> None:
"""
Set a new base name for the output files and reset initialization to ensure consistency.
:param new_name: New file name
:type new_name: str
:return: None
:rtype: None
"""
self._save_name = new_name
self._initialized_hdf5 = False
@property
def save_dir(self) -> str:
"""
Get the directory where output files will be saved.
:return: Path of the save directory
:rtype: str
"""
return self._save_dir
@save_dir.setter
def save_dir(self, new_path: str) -> None:
"""
Set the directory where output files will be saved.
:param new_path: Path of the save directory
:type new_path: str
:return: None
:rtype: None
"""
self._save_dir = new_path
self._initialized_hdf5 = False
# make sure that the directory exists when we change the save path
if not path.exists(self._save_dir):
makedirs(self._save_dir)
def _build_knn_cache(self, _coord: pt.Tensor) -> None:
"""
Compute the interpolation weights base on an inverse distance for the KNN. Caches the weights for later usage.
:param _coord: Coordinate tensor containing the original grid from CFD
:type _coord: pt.Tensor
:return: None
:rtype: None
"""
logger.info(f"Initializing KNN and computing interpolation weights.")
# check and detect if CFD grid changes, this means our computed weights become invalid
if self._coord_shape is not None and _coord.shape != self._coord_shape:
logger.warning(f"CFD grid change detected. Re-computing interpolation weights of the KNN.")
# if we call the fit method the first time, we have to compute the interpolation weights. Since both the CFD
# and the S^3 mesh are static in time, we can re-use these weights for all snapshots and fields to export
self._coord_shape = _coord.shape
# fit the KNN
self._knn.fit(_coord.cpu().numpy())
_distance_centers, _idx_centers = self._knn.kneighbors(self._centers.cpu().numpy())
# inverse distance weighting
_w_c = 1.0 / pt.clamp(pt.from_numpy(_distance_centers), min=1e-12)
_w_c /= _w_c.sum(axis=1, keepdim=True)
self._knn_idx_centers = pt.from_numpy(_idx_centers)
self._knn_w_centers = _w_c
# set the corresponding flag
self._initialized_weights = True
if self._interpolate_at_vertices:
_distance_vertices, _idx_vertices = self._knn.kneighbors(self._vertices.cpu().numpy())
_w_v = 1.0 / pt.clamp(pt.from_numpy(_distance_vertices), min=1e-12)
_w_v /= _w_v.sum(axis=1, keepdim=True)
self._knn_idx_vertices = pt.from_numpy(_idx_vertices)
self._knn_w_vertices = _w_v
[docs]
def interpolate_data(weights: pt.Tensor, idx_weights: pt.Tensor, data: pt.Tensor, chunk_size: int = 100000) -> pt.Tensor:
"""
Interpolate the data matrix from CFD onto the :math:`S^3` grid. To avoid memory issues, we loop over chunks of cells
:param weights: computed KNN weights for each point in the CFD grid
:type weights: pt.Tensor
:param idx_weights: indices of the k nearest neighbors for each point in the CFD grid
:type idx_weights: pt.Tensor
:param data: data matrix from CFD, which should be interpolated
:type data: pt.Tensor
:param chunk_size: number of cells to be interpolated at once
:type chunk_size: int
:return: interpolated datamatrix
:rtype: pt.Tensor
"""
# data.shape = [N_snapshots_CFD, N_dims, N_snapshots], weights.shape = [N_cells_s_cube, N_nb]
Nc = weights.shape[0]
_interpolated = pt.empty((Nc, data.shape[1], data.shape[2]), dtype=weights.dtype, device="cpu")
for start in range(0, Nc, chunk_size):
end = min(start + chunk_size, Nc)
_interpolated[start:end] = (weights[start:end, :, None, None] * data[idx_weights[start:end]]).sum(dim=1)
return _interpolated
if __name__ == "__main__":
pass