Source code for fluidgym.envs.tcf.tcf_env

"""Environment for turbulent channel flow control."""

from functools import partial
from pathlib import Path
from typing import Any

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from gymnasium import spaces

from fluidgym.config import config as global_config
from fluidgym.envs.fluid_env import FluidEnv, Stats
from fluidgym.envs.tcf.grid import (
    get_van_driest_sqr,
    make_channel_flow_domain,
    set_dynamic_forcing,
)
from fluidgym.envs.util.obs_extraction import extract_moving_window_2d_x_z
from fluidgym.envs.util.visualization import (
    render_3d_iso,
)
from fluidgym.simulation.extensions import (
    PISOtorch,  # type: ignore[import-untyped,import-not-found]
)
from fluidgym.simulation.helpers import get_cell_centers, get_cell_size
from fluidgym.simulation.pict.data import TCF_tools
from fluidgym.simulation.pict.PISOtorch_simulation import (
    append_prep_fn,
)
from fluidgym.simulation.pict.util.output import _resample_block_data
from fluidgym.simulation.simulation import Simulation
from fluidgym.types import EnvMode

Q_CRITERION_ISOS = {
    np.pi / 2: {
        180: 0.05,
        330: 0.05,
        550: 0.05,
    },
    np.pi: {
        180: 0.05,
        330: 0.05,
        550: 0.05,
    },
}

VELOCITY_MAX = {
    np.pi / 2: {
        180: 0.9,
        330: 0.9,
        550: 0.9,
    },
    np.pi: {
        180: 0.9,
        330: 0.9,
        550: 0.9,
    },
}

SMALL_TCF_3D_DEFAULT_CONFIG = {
    "resolution_y": 65,
    "resolution_x_z": 64,
    "actor_size": 2,
    "L": np.pi,
    "D": np.pi / 2,
    "reynolds_number_wall": 180,
    "adaptive_cfl": 0.1,
    "step_length": 0.6,
    "episode_length": 1000,
    "local_obs_window": 1,
    "local_reward_weight": 0.0,
    "use_marl": True,
    "C_smag": 0.0,
    "use_van_driest": False,
    "init_with_noise": True,
    "dtype": torch.float32,
    "load_initial_domain": True,
    "load_domain_statistics": True,
    "randomize_initial_state": True,
    "enable_actions": True,
    "differentiable": False,
}

LARGE_TCF_3D_DEFAULT_CONFIG = {
    **SMALL_TCF_3D_DEFAULT_CONFIG,
    "resolution_x_z": 128,
    "L": 2 * np.pi,
    "D": np.pi,
}


[docs] class TCF3DBottomEnv(FluidEnv): """Environment for turbulent channel flow control. Parameters ---------- resolution_y: int The resolution of the simulation grid in the wall-normal direction. resolution_x_z: int The resolution of the simulation grid in the streamwise and spanwise directions. L: float The length of the domain in the streamwise direction. D: float The length of the domain in the spanwise direction. actor_size: int The size of each actor region in grid cells. reynolds_number_wall: float The Reynolds number based on the wall shear velocity and half channel height. adaptive_cfl: float Target CFL number for adaptive time stepping. step_length: float The non-dimensional time length of each environment step. episode_length: int The number of steps per episode. local_obs_window: int The size of the local observation window for each agent. local_reward_weight: float The weight of the local reward in the total reward. use_marl: bool Whether to enable multi-agent reinforcement learning mode. C_smag: float The Smagorinsky constant for the LES model. If 0, no LES model is used. Defaults to 0.0. use_van_driest: bool Whether to use Van Driest damping for the LES model. Defaults to False. init_with_noise: bool Whether to initialize the velocity field with added noise. Defaults to True. dtype: torch.dtype The data type to use for the simulation. Defaults to torch.float32. cuda_device: torch.device | None The CUDA device to use for the simulation. If None, the default cuda device is used. Defaults to None. debug: bool Whether to enable debug mode. Defaults to False. load_initial_domain: bool Whether to load initial domain states from disk. Defaults to True. load_domain_statistics: bool Whether to load domain statistics from disk. Defaults to True. randomize_initial_state: bool Whether to randomize the initial state on reset. Defaults to True. enable_actions: bool Whether to enable actions. If False, the environment will be run in uncontrolled mode. Defaults to True. differentiable: bool Whether to enable differentiable simulation mode. Defaults to False. References ---------- [1] L. Guastoni, J. Rabault, P. Schlatter, H. Azizpour, and R. Vinuesa, “Deep reinforcement learning for turbulent drag reduction in channel flows,” Eur. Phys. J. E, vol. 46, no. 4, p. 27, Apr. 2023, doi: 10.1140/epje/s10189-023-00285-8. [2] Z. Zhao et al., “Physics-informed Neural-operator Predictive Control for Drag Reduction in Turbulent Flows,” Oct. 03, 2025, arXiv: arXiv:2510.03360. doi: 10.48550/arXiv.2510.03360. """ _default_render_key: str = "3d_q_criterion" _actuation: str = "bottom" _supports_marl: bool = True # We need to be able to disable action scaling for opposition control _scale_actions: bool = True _action_smoothing_alpha: float = 0.1 _delta: float = 1.0 # half channel height _H: float = 2.0 * _delta # channel height _L: float _D: float _action_range = (-1.0, 1.0) _observation_range = (-2.5, 2.5) _y_obs_wall: float = 15.0 # See reference [1] _actor_size: int _local_obs_window: int _metrics: list[str] = ["wall_stress", "wall_stress_bottom", "wall_stress_top"] _vorticity_stats: Stats | None = None # Domain generation _initial_domain_ett: float = 50.0 _initial_domain_restart: bool = False _wall_distance: float | None = None def __init__( self, resolution_y: int, resolution_x_z: int, L: float, D: float, actor_size: int, reynolds_number_wall: float, adaptive_cfl: float, step_length: float, episode_length: int, local_obs_window: int, local_reward_weight: float, use_marl: bool, C_smag: float = 0.0, use_van_driest: bool = False, init_with_noise: bool = True, dtype: torch.dtype = torch.float32, cuda_device: torch.device | None = None, debug: bool = False, load_initial_domain: bool = True, load_domain_statistics: bool = True, randomize_initial_state: bool = True, enable_actions: bool = True, differentiable: bool = False, ): self._L = L self._D = D self._debug = debug self._re_wall = reynolds_number_wall self._re_center: float = TCF_tools.Re_wall_to_cl(self._re_wall) self._viscosity = torch.tensor([self._delta / self._re_center], dtype=dtype) self._u_wall = self._re_wall / self._re_center self._x = resolution_x_z self._y = resolution_y self._z = resolution_x_z self._grid_refinement_strength = 2 if resolution_x_z < 64 else 1 self._C_smag = C_smag self._use_van_driest = use_van_driest self._init_with_noise = init_with_noise self._actor_size = actor_size self._local_obs_window = local_obs_window self._local_reward_weight = local_reward_weight # We dt from wall units to physical units step_length = self._t_wall_to_t(step_length) # See https://doi.org/10.1017/jfm.2023.147 dt = step_length / 10 super().__init__( dt=dt, adaptive_cfl=adaptive_cfl, step_length=step_length, episode_length=episode_length, ndims=3, use_marl=use_marl, dtype=dtype, cuda_device=cuda_device, load_initial_domain=load_initial_domain, load_domain_statistics=load_domain_statistics, randomize_initial_state=randomize_initial_state, enable_actions=enable_actions, differentiable=differentiable, ) self._viscosity = self._viscosity.to(self._cpu_device) target_t = TCF_tools.ETT_to_t( self._initial_domain_ett, self._u_wall, self._delta, # type: ignore ) self._initial_domain_steps = round(target_t / self._step_length) # For the small domain with Re = 180, we need more initial steps to reach a # turbulent state if self._L < 3.0 and self._re_wall < 330: self._initial_domain_steps *= 2 @property def render_shape(self) -> tuple[int, ...]: """The shape of the rendered domain.""" x_render_size = 2 * self._x y_render_size = int(x_render_size / self._L * self._H) z_render_size = int(x_render_size / self._L * self._D) return (x_render_size, y_render_size, z_render_size) def _get_domain(self) -> PISOtorch.Domain: domain = make_channel_flow_domain( H=self._H, L=self._L, D=self._D, x=self._x, y=self._y, z=self._z, refinement_strength=self._grid_refinement_strength, n_dims=self._ndims, u_wall=self._u_wall, viscosity=self._viscosity, cuda_device=self._cuda_device, init_with_noise=self._init_with_noise, ) # finalize domain domain.PrepareSolve() return domain def _t_to_t_wall(self, t: float) -> float: return t / TCF_tools.t_star(self._viscosity.item(), self._u_wall) def _t_wall_to_t(self, t_wall: float) -> float: return t_wall * TCF_tools.t_star(self._viscosity.item(), self._u_wall) def _x_to_x_wall(self, pos: float) -> float: return pos * ((1 / self._viscosity.item()) * self._u_wall) def _x_wall_to_x(self, pos_wall: float) -> float: return pos_wall / ((1 / self._viscosity.item()) * self._u_wall) def _y_to_y_wall(self, pos: float) -> float: pos = pos + self._delta return pos * ((1 / self._viscosity.item()) * self._u_wall) def _y_wall_to_y(self, pos_wall: float) -> float: pos = pos_wall / ((1 / self._viscosity.item()) * self._u_wall) return -self._delta + pos def __get_y_obs_idx(self, y_wall: float) -> int: vertex_coords: torch.Tensor = self._domain.getVertexCoordinates()[0] cell_centers: torch.Tensor = get_cell_centers(vertex_coords) y_centers = cell_centers[1, 0, :, 0] y_obs = self._y_wall_to_y(y_wall) # find index of y center closedst to self._y_obs_slice center_distances = torch.abs(y_centers - y_obs) return int(torch.argmin(center_distances).item()) @property def _n_actors_x(self) -> int: return self._x // self._actor_size @property def _n_actors_z(self) -> int: return self._z // self._actor_size @property def n_agents(self) -> int: """The number of agents in the environment.""" return self._n_actors_x * self._n_actors_z def _get_action_space(self) -> spaces.Box: """Per-agent action space.""" shape: tuple[int, ...] if self.use_marl: shape = (1,) else: shape = ( self.n_agents, 1, ) return spaces.Box( low=-1.0, high=1.0, shape=shape, dtype=np.float32, ) def _get_observation_space(self) -> spaces.Dict: """Per-agent observation space.""" velocity_shape: tuple[int, ...] pressure_shape: tuple[int, ...] if self._use_marl: velocity_shape = ( self._local_obs_window, self._local_obs_window, 2, ) pressure_shape = ( self._local_obs_window, self._local_obs_window, ) else: velocity_shape = ( 2, self._z, self._x, ) pressure_shape = ( self._z, self._x, ) return spaces.Dict( { "velocity": spaces.Box( low=-np.inf, high=np.inf, shape=velocity_shape, dtype=np.float32, ), "pressure": spaces.Box( low=-np.inf, high=np.inf, shape=pressure_shape, dtype=np.float32, ), } ) @property def scale_actions(self) -> bool: r"""Whether actions are scaled by :math:`u_{\\mathrm{wall}}`.""" return self._scale_actions @scale_actions.setter def scale_actions(self, value: bool) -> None: self._scale_actions = value def _get_prep_fn(self, domain: PISOtorch.Domain) -> dict[str, Any]: prep_fn: dict[str, Any] = {} set_dynamic_forcing(self._ndims, domain, prep_fn) if self._C_smag != 0: block = domain.getBlock(0) SGS_coefficient = torch.tensor( [self._C_smag], dtype=self._dtype, device=self._cpu_device ) if self._use_van_driest: van_driest_scale_sqr = [ get_van_driest_sqr(block, domain, self._u_wall, self._cuda_device) ] def get_SGS_viscosity(domain): return PISOtorch.SGSviscosityIncompressibleSmagorinsky( domain, SGS_coefficient ) def add_block_SGS_viscosity(domain, **kwargs): domain.UpdateDomainData() SGS_viscosities = get_SGS_viscosity(domain) base_viscosity = domain.viscosity.to(self._cuda_device) for idx, (block, visc) in enumerate( zip(domain.getBlocks(), SGS_viscosities, strict=True) ): if self._use_van_driest: visc = visc * van_driest_scale_sqr[idx] visc = visc + base_viscosity block.setViscosity(visc) domain.UpdateDomainData() append_prep_fn(prep_fn, "PRE", add_block_SGS_viscosity) return prep_fn def _get_simulation( self, domain: PISOtorch.Domain, prep_fn: dict[str, Any], ) -> Simulation: sim = Simulation( domain=domain, prep_fn=prep_fn, substeps="ADAPTIVE", dt=self._dt, corrector_steps=2, advection_use_BiCG=True, advection_tol=1e-6, pressure_tol=1e-6, adaptive_CFL=self._adaptive_cfl, advect_non_ortho_steps=1, pressure_non_ortho_steps=1, pressure_return_best_result=True, velocity_corrector="FD", non_orthogonal=True, output_resampling_shape=self.render_shape[: self._ndims], output_resampling_fill_max_steps=16, differentiable=self._differentiable, ) sim.solver_double_fallback = False sim.preconditionBiCG = False sim.BiCG_precondition_fallback = True PISOtorch.CopyVelocityResultFromBlocks(self._domain) sim.make_divergence_free() return sim def _additional_initialization(self) -> None: self._block = self._domain.getBlock(0) self._bottom_plate = self._block.getBoundary("-y") self._inflow = self._block.getBoundary("-x") self._top_plate = self._block.getBoundary("+y") self._outflow = self._block.getBoundary("+x") self._y_obs_bottom_idx = self.__get_y_obs_idx(y_wall=self._y_obs_wall) def _action_to_control(self, action: torch.Tensor) -> torch.Tensor: # For opposition control, we do not scale the actions. For the RL case, # we ensure a zero mean and max abs. value of u_wall if self._scale_actions: # Ensure zero-net mass flux scaled_action = action - torch.mean(action) # Ensure max abs. value of u_wall scaled_action = ( self._u_wall * scaled_action / (torch.clamp(scaled_action.abs(), min=1.0)) ) scaled_action -= torch.mean(scaled_action) else: scaled_action = action velocity_profile = torch.zeros( (1, 3, self._z, 1, self._x), device=self._cuda_device ) v_expanded = scaled_action.repeat_interleave( self._actor_size, dim=0 ).repeat_interleave(self._actor_size, dim=1) velocity_profile[0, 1, :, 0, :] = v_expanded.transpose(1, 0) return velocity_profile def _apply_action(self, action: torch.Tensor) -> None: """Apply the given action to the simulation.""" reshaped_action = action.view(self._n_actors_x, self._n_actors_z) control = self._action_to_control(reshaped_action) self._bottom_plate.setVelocity(control) @property def tau_ref(self) -> float: """Reference bottom wall shear stress for normalization.""" if "wall_stress_bottom" in self._metrics_stats: return self._metrics_stats["wall_stress_bottom"].mean else: return 1.0 def _get_wall_stress(self) -> tuple[torch.Tensor, torch.Tensor]: """Compute the wall shear stress at both walls. Returns ------- tuple[torch.Tensor, torch.Tensor] The wall shear stress at the bottom and top walls. """ block = self._domain.getBlock(0) viscosity = self._domain.viscosity.to(self._domain.getDevice()) pos_y = torch.mean( self._domain.getBlock(0).getCellCoordinates()[0, 1], dim=(0, 2) ) d_y = (1 + pos_y[0].cpu().numpy(), 1 - pos_y[-1].cpu().numpy()) mean_vel_u = torch.mean(block.velocity[0, 0], dim=(0, 2)) tau_wall_bottom = viscosity * mean_vel_u[0] / d_y[0] tau_wall_top = viscosity * mean_vel_u[-1] / d_y[-1] return tau_wall_bottom, tau_wall_top def _get_q_criterion(self) -> torch.Tensor: """ Compute the Q-criterion of the domain. Returns ------- torch.Tensor The Q-criterion tensor. References ---------- [1] J. Jeong and F. Hussain, “On the identification of a vortex,” Journal of Fluid Mechanics, vol. 285, pp. 69-94, 1995. doi:10.1017/S0022112095000462 """ self._domain.UpdateDomainData() gradients = PISOtorch.ComputeSpatialVelocityGradients(self._domain) d_dx, d_dy, d_dz = gradients[0] du_dx = d_dx[0, 0, ...] du_dy = d_dy[0, 0, ...] du_dz = d_dz[0, 0, ...] dv_dy = d_dy[0, 1, ...] dv_dx = d_dx[0, 1, ...] dv_dz = d_dz[0, 1, ...] dw_dx = d_dx[0, 2, ...] dw_dy = d_dy[0, 2, ...] dw_dz = d_dz[0, 2, ...] grad_u = torch.stack( [ torch.stack([du_dx, du_dy, du_dz], dim=0), torch.stack([dv_dx, dv_dy, dv_dz], dim=0), torch.stack([dw_dx, dw_dy, dw_dz], dim=0), ], dim=0, ) # Compute the symmetric and antisymmetric parts S = 0.5 * (grad_u + grad_u.transpose(1, 0)) Omega = 0.5 * (grad_u - grad_u.transpose(1, 0)) # Compute the Frobenius norms S_norm_sq = torch.sum(S**2, dim=(0, 1)) Omega_norm_sq = torch.sum(Omega**2, dim=(0, 1)) # Compute Q Q = 0.5 * (Omega_norm_sq - S_norm_sq) Q = _resample_block_data( data_list=[Q[None, None, ...]], vertex_coord_list=self._sim.output_resampling_coords, resampling_out_shape=self._sim.output_resampling_shape, ndims=self._ndims, fill_max_steps=self._sim.output_resampling_fill_max_steps, ) return Q def _get_global_obs(self, y_idx: int | None = None) -> dict[str, torch.Tensor]: """Return the current observation.""" if y_idx is None: y_idx = self._y_obs_bottom_idx u: torch.Tensor = self._domain.getBlock(0).getVelocity(False) u = u.squeeze() p: torch.Tensor = self._domain.getBlock(0).pressure p = p.squeeze() cell_size = get_cell_size(self._domain.getBlock(0)) cell_size = cell_size.squeeze() # Compute spatial mean velocity mean_u = (u * cell_size[None, ...]).sum( dim=(1, 2, 3), keepdim=True ) / cell_size.sum() u_prime = u - mean_u u_slice = u_prime[ :2, # We only take u_x and u_y components :, y_idx, :, ] p_slice = p[:, y_idx, :] return { "velocity": u_slice, "pressure": p_slice, } def _get_render_data( self, render_3d: bool, output_path: Path | None = None, ) -> dict[str, np.ndarray]: y_wall = 150 y = self._y_wall_to_y(y_wall) y_shape_idx = round((y + self._delta) / self._H * self.render_shape[1]) q = self._get_q_criterion() q_arr = q.squeeze().detach().cpu().numpy() u = self.get_velocity() u = u.squeeze() u = torch.linalg.vector_norm(u, dim=0) # Flip x- and y-axis u = torch.flip(u, dims=[-2, -1]) vorticity = self.get_vorticity() vorticity = vorticity.squeeze() # Flip x- and y-axis vorticity = torch.flip(vorticity, dims=[-2, -1]) vorticity_arr = vorticity.detach().cpu().numpy() u_min = 0.0 u_max = VELOCITY_MAX[self._D][int(self._re_wall)] format_velocity = partial( self._format_render_data, v_min=u_min, v_max=u_max, cmap="viridis", ) if self._vorticity_stats: vort_min = self._vorticity_stats.min vort_max = self._vorticity_stats.p95 else: vort_min = vorticity_arr.min().item() vort_max = vorticity_arr.max().item() # We take the max abs value for symmetric colormap abs_max = max(abs(vort_min), abs(vort_max)) vort_min = -abs_max vort_max = abs_max format_vorticity = partial( self._format_render_data, v_min=vort_min, v_max=vort_max, cmap="icefire", ) render_data = {} u_xy = u[u.shape[0] // 2, :, :] u_xz = u[:, y_shape_idx // 2, :] u_yz = u[:, :, u.shape[2] // 2] render_data["x-y-velocity"] = format_velocity(data=u_xy.detach().cpu().numpy()) render_data["x-z-velocity"] = format_velocity(data=u_xz.detach().cpu().numpy()) render_data["y-z-velocity"] = format_velocity( data=u_yz.T.detach().cpu().numpy() ) vort_xy = vorticity_arr[2, vorticity_arr.shape[0] // 2, :, :] vort_xz = vorticity_arr[1, :, y_shape_idx // 2, :] vort_yz = vorticity_arr[0, :, :, vorticity_arr.shape[2] // 2] render_data["x-y-vorticity"] = format_vorticity(data=vort_xy) render_data["x-z-vorticity"] = format_vorticity(data=vort_xz) render_data["y-z-vorticity"] = format_vorticity(data=vort_yz.T) u_arr = u.detach().cpu().numpy() q_arr = q_arr.transpose(2, 1, 0) u_arr = u_arr.transpose(2, 1, 0) if render_3d: q_wall = q_arr[:, :y_shape_idx, :] u_wall = u_arr[:, :y_shape_idx, :] if output_path is not None: output_path = output_path / f"q_{self._n_episodes}_{self._n_steps}.png" q_iso_value = Q_CRITERION_ISOS[self._D][int(self._re_wall)] render_data["3d_q_criterion"] = render_3d_iso( iso_field=q_wall, iso=[q_iso_value], color_range=(u_min, u_max), output_path=output_path, color_field=u_wall, colormap="rainbow", extent=( (0, self._x_to_x_wall(self._L)), (self._y_to_y_wall(-self._delta), y_wall), (0, self._x_to_x_wall(self._D)), ), view_kwargs={"elev": 15, "azim": 60}, ) return render_data def _get_reward( self, tau_total: torch.Tensor, tau_bottom: torch.Tensor ) -> torch.Tensor: # For the bottom case, we only consider the bottom wall stress return 1 - tau_bottom / self.tau_ref def _step_impl( self, action: torch.Tensor ) -> tuple[dict[str, torch.Tensor], torch.Tensor, bool, dict[str, torch.Tensor]]: flat_action = action.squeeze() if self._enable_actions: self._apply_action(flat_action) tau_top_list = [] tau_bottom_list = [] for _ in range(self._n_sim_steps): self._sim.single_step() _tau_bottom, _tau_top = self._get_wall_stress() tau_bottom_list += [_tau_bottom] tau_top_list += [_tau_top] # Average over simulation steps tau_bottom = torch.stack(tau_bottom_list).mean() tau_top = torch.stack(tau_top_list).mean() tau_total = 0.5 * (tau_bottom + tau_top) reward = self._get_reward( tau_total=tau_total, tau_bottom=tau_bottom, ) obs = self._get_global_obs() info = { "wall_stress": tau_total, "wall_stress_bottom": tau_bottom, "wall_stress_top": tau_top, } return obs, reward, False, info
[docs] def plot(self, output_path: Path | None = None) -> None: """Plot the environments configuration. Parameters ---------- output_path: Path | None Path to save the plot. If None, the current directory is used. Defaults to None. """ if output_path is None: output_path = Path(".") y_sensor = self._y_wall_to_y(self._y_obs_wall) colors = global_config.palette plt.figure(figsize=(10, 5)) ax = plt.gca() # add vertical line for y_sensor as dotted line plt.axhline(y=y_sensor, color=colors[2], linestyle="dotted", linewidth=2) plt.axhline(y=-y_sensor, color=colors[2], linestyle="dotted", linewidth=2) plt.xlim(0, self._L) plt.ylim(-self._H / 2, self._H / 2) ax.set_yticks([-self._H / 2, 0, self._H / 2]) ax.set_yticklabels([f"{-self._H / 2:.1f}", "0.0", f"{self._H / 2:.1f}"]) ax.set_xticks([0, self._L]) aspect = round(self._L / torch.pi) aspect_str = "" if aspect == 1 else str(aspect) ax.set_xticklabels(["0", aspect_str + r"$\pi$"]) ax.set_xlabel("L") ax.set_ylabel("H") plt.savefig(output_path / f"{self.id}.pdf")
@property def initial_domain_id(self) -> str: """Unique identifier for the initial domain.""" return ( f"channel_flow3D_L{self._L:.2f}_Re{int(self._re_wall)}_Res{self._x}" f"_Ref{self._grid_refinement_strength}" ) @property def id(self) -> str: """Unique identifier for the environment.""" return f"ChannelFlow3D_Re{int(self._re_wall)}_L{self._L:.2f}" def _randomize_domain(self) -> None: velocity_noise = 0.01 pressure_noise = 0.01 max_n_steps = int(0.01 * self._episode_length) n_steps = self._np_rng.integers(int(0.5 * max_n_steps), max_n_steps) + 1 blocks = self._domain.getBlocks() for block in blocks: u = block.velocity p = block.pressure u += ( torch.normal( mean=0.0, std=1.0, size=u.shape, device=self._cuda_device, generator=self._torch_rng_cuda, ) * velocity_noise ) p += ( torch.normal( mean=0.0, std=1.0, size=p.shape, device=self._cuda_device, generator=self._torch_rng_cuda, ) * pressure_noise ) block.setVelocity(u) block.setPressure(p) for _ in range(n_steps): self._sim.single_step() def _get_local_obs( self, y_idx: int | None = None, flip_obs: bool = False ) -> dict[str, torch.Tensor]: if y_idx is None: y_idx = self._y_obs_bottom_idx u: torch.Tensor = self._domain.getBlock(0).getVelocity(False) u = u.squeeze() p: torch.Tensor = self._domain.getBlock(0).pressure p = p.squeeze() u_slice = u[ :2, # We only take u_x and u_y components :, y_idx, :, ] p_slice = p[:, y_idx, :] # Compute spatial mean velocity for the slice mean_u = u_slice.mean(dim=(1, 2), keepdim=True) u_prime = u_slice - mean_u u_x = u_prime[0, :, :] u_y = u_prime[1, :, :] local_obs_u_x = extract_moving_window_2d_x_z( field=u_x, n_agents_x=self._n_actors_x, n_agents_z=self._n_actors_z, agent_width=self._actor_size, n_agents_per_window_x=self._local_obs_window, n_agents_per_window_z=self._local_obs_window, pad_x=self._local_obs_window - 1, pad_z=self._local_obs_window // 2, ) if flip_obs: local_obs_u_x = torch.flip(local_obs_u_x, dims=[2]) local_obs_u_y = extract_moving_window_2d_x_z( field=u_y, n_agents_x=self._n_actors_x, n_agents_z=self._n_actors_z, agent_width=self._actor_size, n_agents_per_window_x=self._local_obs_window, n_agents_per_window_z=self._local_obs_window, pad_x=self._local_obs_window, pad_z=self._local_obs_window // 2, ) # This is for the top wall observation to have a consistent orientation if flip_obs: local_obs_u_y = torch.flip(local_obs_u_y, dims=[2]) local_obs_u_y *= -1 local_obs_p = extract_moving_window_2d_x_z( field=p_slice, n_agents_x=self._n_actors_x, n_agents_z=self._n_actors_z, agent_width=self._actor_size, n_agents_per_window_x=self._local_obs_window, n_agents_per_window_z=self._local_obs_window, pad_x=self._local_obs_window, pad_z=self._local_obs_window // 2, ) if flip_obs: local_obs_p = torch.flip(local_obs_p, dims=[1]) local_obs_u = torch.stack((local_obs_u_x, local_obs_u_y), dim=-1) return { "velocity": local_obs_u, "pressure": local_obs_p, } def _step_marl_impl( self, actions: torch.Tensor ) -> tuple[dict[str, torch.Tensor], torch.Tensor, bool, dict[str, torch.Tensor]]: if self._local_reward_weight is None: raise ValueError("local_reward_weight must be set for multi-agent step.") _, global_reward, terminated, info = self._step_impl(actions) local_obs = self._get_local_obs() agent_rewards = global_reward * torch.ones( self.n_agents, device=global_reward.device, dtype=global_reward.dtype, ) info["global_reward"] = global_reward return local_obs, agent_rewards, terminated, info def _load_domain_statistics(self) -> dict[str, dict[str, float]]: stats = super()._load_domain_statistics() self._vorticity_stats = Stats(**stats["vorticity_magnitude"]) return stats
[docs] def save_opposition_control_episode( self, idx: int, mode: EnvMode, df: pd.DataFrame ) -> None: """Save the opposition control episode data to a CSV file. Parameters ---------- idx: int The index of the episode. mode: EnvMode The mode of the environment (e.g., training, evaluation). df: pd.DataFrame The DataFrame containing the episode data to save. """ path = self._get_domain_dir(idx=idx) df.to_csv( path / f"{mode.value}_opposition_control_{self._actuation}_episode.csv", index=False, )
[docs] def load_opposition_control_episode(self, idx: int, mode: EnvMode) -> pd.DataFrame: """Load the opposition control episode data from a CSV file. Parameters ---------- idx: int The index of the episode. mode: EnvMode The mode of the environment (e.g., training, evaluation). Returns ------- pd.DataFrame The DataFrame containing the episode data. """ path = self._get_domain_dir(idx=idx) df = pd.read_csv( path / f"{mode.value}_opposition_control_{self._actuation}_episode.csv" ) return df
[docs] class TCF3DBothEnv(TCF3DBottomEnv): """Environment for turbulent channel flow control with both walls actuated. The first half of the agents control the bottom wall, while the second half control the top wall. References ---------- [1] T. R. Bewley, P. Moin, and R. Temam, “DNS-based predictive control of turbulence: an optimal benchmark for feedback algorithms,” J. Fluid Mech., vol. 447, pp. 179-225, Nov. 2001, doi: 10.1017/S0022112001005821. """ _actuation: str = "both" def _get_observation_space(self) -> spaces.Dict: """Per-agent observation space.""" velocity_shape: tuple[int, ...] pressure_shape: tuple[int, ...] if self._use_marl: velocity_shape = ( self._local_obs_window, self._local_obs_window, 2, ) pressure_shape = ( self._local_obs_window, self._local_obs_window, ) else: velocity_shape = ( 2, 2, self._z, self._x, ) pressure_shape = ( 2, self._z, self._x, ) return spaces.Dict( { "velocity": spaces.Box( low=-np.inf, high=np.inf, shape=velocity_shape, dtype=np.float32, ), "pressure": spaces.Box( low=-np.inf, high=np.inf, shape=pressure_shape, dtype=np.float32, ), } ) @property def n_agents(self) -> int: """The number of agents in the environment.""" return 2 * super().n_agents @property def tau_ref(self) -> float: """Reference overall wall shear stress for normalization.""" if "wall_stress" in self._metrics_stats: return self._metrics_stats["wall_stress"].mean else: return 1.0 def _additional_initialization(self) -> None: super()._additional_initialization() self._y_obs_top_idx = self._y - self._y_obs_bottom_idx def _apply_action(self, action: torch.Tensor) -> None: action_bottom = action[: self.n_agents // 2] action_top = action[self.n_agents // 2 :] action_bottom = action_bottom.view(self._n_actors_x, self._n_actors_z) action_top = action_top.view(self._n_actors_x, self._n_actors_z) control_bottom = self._action_to_control(action_bottom) control_top = -1 * self._action_to_control(action_top) self._bottom_plate.setVelocity(control_bottom) self._top_plate.setVelocity(control_top) def _get_reward( self, tau_total: torch.Tensor, tau_bottom: torch.Tensor ) -> torch.Tensor: # We take both the stress at both plates for the reward return 1 - tau_total / self.tau_ref def _get_local_rewards(self, full_wall_stress: torch.Tensor) -> torch.Tensor: # We take both the stress at both plates for local rewards return 1 - full_wall_stress.flatten() / self.tau_ref def _get_global_obs(self, y_idx: int | None = None) -> dict[str, torch.Tensor]: """Return the current observation.""" bottom_obs = super()._get_global_obs(y_idx=self._y_obs_bottom_idx) top_obs = super()._get_global_obs(y_idx=self._y_obs_top_idx) full_obs = { "velocity": torch.stack( (bottom_obs["velocity"], top_obs["velocity"]), dim=0 ), "pressure": torch.stack( (bottom_obs["pressure"], top_obs["pressure"]), dim=0 ), } return full_obs def _get_local_obs( self, y_idx: int | None = None, flip_obs: bool = False ) -> dict[str, torch.Tensor]: bottom_obs = super()._get_local_obs( y_idx=self._y_obs_bottom_idx, flip_obs=False ) top_obs = super()._get_local_obs(y_idx=self._y_obs_top_idx, flip_obs=True) full_obs = { "velocity": torch.cat((bottom_obs["velocity"], top_obs["velocity"]), dim=0), "pressure": torch.cat((bottom_obs["pressure"], top_obs["pressure"]), dim=0), } return full_obs