Find tracks#
Show code cell source
from collections.abc import Iterable
from datetime import datetime
from inspect import Signature
from boilercore.paths import ISOLIKE, dt_fromisolike
from devtools import pprint
from matplotlib.pyplot import subplot_mosaic, subplots
from more_itertools import one, only
from numpy import diff, gradient, linalg, log10, logspace, pi
from pandas import DataFrame, Series, melt, read_hdf
from seaborn import lineplot, scatterplot
from trackpy import link, quiet
from boilercv.correlations import GROUPS, beta
from boilercv.correlations import nusselt as correlations_nusselt
from boilercv.data import FRAME, TIME, VIDEO
from boilercv.dimensionless_params import (
fourier,
jakob,
kinematic_viscosity,
nusselt,
prandtl,
reynolds,
thermal_diffusivity,
)
from boilercv.images import scale_bool
from boilercv_docs.nbs import init
from boilercv_pipeline.dfs import limit_group_size
from boilercv_pipeline.models.column import Col, LinkedCol, convert, rename
from boilercv_pipeline.models.deps import get_slices
from boilercv_pipeline.models.df import GBC
from boilercv_pipeline.models.subcool import const
from boilercv_pipeline.palettes import cat10, cool
from boilercv_pipeline.plotting import get_cat_colorbar
from boilercv_pipeline.sets import get_dataset2
from boilercv_pipeline.stages import find_objects, get_thermal_data
from boilercv_pipeline.stages.find_tracks import FindTracks as Params
from boilercv_pipeline.units import U
quiet()
PARAMS = None
Show code cell source
if isinstance(PARAMS, str):
params = Params.model_validate_json(PARAMS)
else:
params = Params(
context=init(),
include_patterns=const.nb_include_patterns,
slicer_patterns=const.nb_slicer_patterns,
)
params.format.set_display_options()
data = params.data
C = params.cols
context = params.context
objects_path = one(params.objects)
objects = DataFrame(read_hdf(objects_path))
OC = find_objects.Cols()
thermal = read_hdf(params.deps.thermal)
TC = get_thermal_data.Cols()
slices = get_slices(one(params.filled_slicers))
frames = slices.get(FRAME, slice(None))
filled = scale_bool(get_dataset2(one(params.filled), slices=slices)[VIDEO])
dfs = only(params.dfs)
step_time = diff(filled.time.values)[0]
frames_per_step = one(params.slicer_patterns.values())[FRAME].step
time_per_frame = step_time / frames_per_step
cols_to_link = [OC.frame, OC.x_tp, OC.y_tp]
# ! To find (202 - 96)
# %matplotlib widget
# from boilercv_pipeline.sets import get_dataset
# _, ax = subplots()
# ax.imshow(get_dataset("2024-07-18T17-44-35", stage="large_sources")[VIDEO].sel(frame=0))
M_PER_PX = U.convert(3 / 8, "in", "m") / (202 - 96)
# PX_PER_M = 20997.3753
U.define(f"px = {M_PER_PX} m")
U.define(f"frames = {time_per_frame} s")
# Linking
SEARCH_RANGE = 5 # 35
"""Pixel range to search for the next bubble."""
MEMORY = 5 # * frames_per_step
"""Frames to remember a bubble."""
# Track tuning
Y_SURFACE_THRESHOLD = U.convert(280, "px", "m")
"""Vertical position of bubble centroids considered attached to the surface."""
Y_DEPARTURE_THRESHOLD = U.convert(300, "px", "m")
"""Vertical position of bubble centroids considered to have departed the surface."""
MINIMUM_LIFETIME = 0.010 # s
"""Minimum bubble lifetime to consider."""
# Physical parameters
LATENT_HEAT_OF_VAPORIZATION = 2.23e6 # J/kg
LIQUID_DENSITY = 960 # kg/m^3
LIQUID_DYNAMIC_VISCOSITY = 2.88e-4 # Pa-s
LIQUID_ISOBARIC_SPECIFIC_HEAT = 4213 # J/kg-K
LIQUID_THERMAL_CONDUCTIVITY = 0.676 # W/m-K
VAPOR_DENSITY = 0.804 # kg/m^3
# Plotting
CORRELATIONS_PALETTE = cat10
"""For plotting one approach."""
TRACKS_PALETTE = cool
"""For plotting the other approach."""
MAX_FOURIER = 0.005
"""Maximum Fourier number to plot."""
MAX_BETA = 1.05
"""Maximum dimensionless bubble diameter to plot."""
MAX_NUSSELT = 500
"""Maximum Nusselt number to plot."""
TRACKS_ALPHA = 0.3
"""Transparency of the tracks."""
TRACKS_SIZE = 10
"""Size of the tracks."""
def get_time() -> datetime:
"""Get time from path."""
if match := ISOLIKE.search(objects_path.stem):
return dt_fromisolike(match)
else:
raise ValueError("No time found in path.")
time = get_time()
subcooling = thermal.set_index(TC.time())[TC.subcool()][time]
def get_init(ser: "Series[float]") -> float:
"""Get initial value of a series."""
return ser.head(int(MINIMUM_LIFETIME // step_time) or 1).median()
def get_delta(df: DataFrame, c: LinkedCol) -> "Series[float]":
"""Get position time delta across frames."""
return df.groupby(C.bub(), **GBC)[[c.source()]].diff().fillna(0) / step_time
def query_lifetime(df: DataFrame) -> DataFrame:
"""Filter bubbles by lifetime."""
return (
df.rename(columns={C.bub_visible(): (temp_name := C.bub_visible.no_unit.name)})
.query(f"`{temp_name}` > {MINIMUM_LIFETIME}")
.rename(columns={temp_name: C.bub_visible()})
)
def preview(
df: DataFrame, cols: Iterable[Col] | None = None, index: Col | None = None
) -> DataFrame:
"""Preview a dataframe in the notebook."""
df = params.format.preview(
cols=cols,
df=df,
index=index,
f=lambda df: df.groupby(C.bub(), **GBC).head(4).head(16),
)
return df
pprint(params)
Show code cell source
data.dfs.tracks = preview(
cols=[OC.frame, *C.tracks],
df=link(
# ? TrackPy expects certain column names
f=objects.rename(columns={c(): c.source.raw for c in cols_to_link}),
search_range=SEARCH_RANGE,
memory=MEMORY,
)
.pipe(rename, cols_to_link) # ? Back to our names
.pipe(C.bub.rename)
.assign(**{
C.time_elapsed(): lambda df: filled.sel(frame=df[OC.frame()].values)[TIME],
C.bub_visible_frames(): lambda df: (
df.groupby(C.bub(), **GBC)[C.bub()].transform("count") * frames_per_step
),
})
.pipe(convert, [C.x, C.y, C.diameter, C.radius_of_gyration], U)
.sort_values(
[C.bub_visible_frames(), C.bub(), OC.frame()], ascending=[False, True, True]
)
.assign(**{
C.bub(): (lambda df: df.groupby(C.bub(), **GBC).ngroup()),
C.u(): lambda df: get_delta(df, C.u),
C.v(): lambda df: get_delta(df, C.v),
C.distance(): lambda df: linalg.norm(df[[C.u(), C.v()]].abs(), axis=1),
})
.pipe(convert, [C.bub_visible], U),
)
print(f"Found {data.dfs.tracks[C.bub()].nunique()} bubbles")
Show code cell source
data.dfs.bubbles = preview(
cols=C.bubbles,
# ? Find rows corresponding to stagnant or invalid bubbles
df=data.dfs.tracks.pipe(lambda df: df[df[C.bub_visible()] > MINIMUM_LIFETIME])
.groupby(C.bub(), **GBC)[data.dfs.tracks.columns]
.apply(
# ? Don't assign any other columns until invalid rows have been filtered out
lambda df: df.assign(**{
"bubble_visible_y": lambda df: df[C.y()].pipe(get_init),
# ? Initial y position is close to the surface
"began": lambda df: df["bubble_visible_y"] > Y_SURFACE_THRESHOLD,
# ? When the bubble gets far enough away from the surface
"departed": lambda df: df[C.y()] < Y_DEPARTURE_THRESHOLD,
})
)
# ? Filter out invalid rows
.pipe(lambda df: df[df["began"] & df["departed"]])
.pipe(limit_group_size, C.bub(), 1)
# Groupby again after filtering out invalid rows
.groupby(C.bub(), **GBC)[data.dfs.tracks.columns]
# Now columns that depend on the initial row (*.iat[0]) can be assigned
.apply(
lambda df: df.assign(**{
C.bub_time(): (
lambda df: df[C.time_elapsed()] - df[C.time_elapsed()].iat[0]
),
C.bub_lifetime(): lambda df: (
df[C.bub_time()].iat[-1] - df[C.bub_time()].pipe(get_init)
),
C.bub_t0(): lambda df: df[C.bub_time()].pipe(get_init),
C.bub_x0(): lambda df: df[C.x()].pipe(get_init),
C.bub_y0(): lambda df: df[C.y()].pipe(get_init),
C.bub_d0(): lambda df: df[C.diameter()].pipe(get_init),
C.bub_u0(): lambda df: df[C.u()].pipe(get_init),
C.bub_v0(): lambda df: df[C.v()].pipe(get_init),
C.max_diam(): lambda df: df[C.diameter()].max(),
C.diam_rate_of_change(): lambda df: gradient(
df[C.diameter()], df[C.bub_time()]
),
})
),
)
print(f"{data.dfs.bubbles[C.bub()].nunique()} bubbles remain")
Show code cell source
data.plots.multi, axs = subplot_mosaic([
[C.y(), C.v()],
[C.diameter(), C.diam_rate_of_change()],
])
data.plots.multi.set_figwidth(1.89 * data.plots.multi.get_figwidth())
data.plots.multi.set_figheight(1.30 * data.plots.multi.get_figheight())
for plot, ax in axs.items():
palette, tracks_data = get_cat_colorbar(
ax, C.bub(), TRACKS_PALETTE, data.dfs.bubbles
)
scatterplot(
ax=ax,
edgecolor="none",
s=TRACKS_SIZE,
alpha=TRACKS_ALPHA,
x=C.bub_time(),
y=plot, # pyright: ignore[reportArgumentType] 1.1.356
hue=C.bub(),
legend=False,
palette=palette,
data=tracks_data,
)
Show code cell source
liquid_kinematic_viscosity = kinematic_viscosity(
density=LIQUID_DENSITY, dynamic_viscosity=LIQUID_DYNAMIC_VISCOSITY
)
liquid_thermal_diffusivity = thermal_diffusivity(
thermal_conductivity=LIQUID_THERMAL_CONDUCTIVITY,
density=LIQUID_DENSITY,
isobaric_specific_heat=LIQUID_ISOBARIC_SPECIFIC_HEAT,
)
data.dfs.dst = preview(
cols=C.dests,
df=data.dfs.bubbles.assign(**{
C.bub_reynolds(): lambda df: reynolds(
velocity=abs(df[C.v()]),
characteristic_length=df[C.diameter()],
kinematic_viscosity=liquid_kinematic_viscosity,
),
C.bub_reynolds0(): lambda df: df.groupby(C.bub(), **GBC)[
C.bub_reynolds()
].transform(get_init),
C.bub_fourier(): fourier(
initial_bubble_diameter=data.dfs.bubbles[C.max_diam()],
liquid_thermal_diffusivity=liquid_thermal_diffusivity,
time=data.dfs.bubbles[C.bub_time()],
),
C.bub_nusselt(): nusselt( # Nu_c
heat_transfer_coefficient=-(
2
* VAPOR_DENSITY
* LATENT_HEAT_OF_VAPORIZATION
/ subcooling
* data.dfs.bubbles[C.diam_rate_of_change()]
),
characteristic_length=data.dfs.bubbles[C.diameter()],
thermal_conductivity=LIQUID_THERMAL_CONDUCTIVITY,
),
C.bub_beta(): (lambda df: df[C.diameter()] / df[C.max_diam()]),
}),
)
Show code cell source
object_averages = data.dfs.dst.set_index(C.bub()).groupby(C.bub(), **GBC).mean().mean()
bubble_initial_reynolds = reynolds(
velocity=abs(object_averages[C.bub_v0()]),
characteristic_length=object_averages[C.bub_d0()],
kinematic_viscosity=liquid_kinematic_viscosity,
)
liquid_prandtl = prandtl(
dynamic_viscosity=LIQUID_DYNAMIC_VISCOSITY,
isobaric_specific_heat=LIQUID_ISOBARIC_SPECIFIC_HEAT,
thermal_conductivity=LIQUID_THERMAL_CONDUCTIVITY,
)
bubble_jakob = jakob(
liquid_density=LIQUID_DENSITY,
vapor_density=VAPOR_DENSITY,
liquid_isobaric_specific_heat=LIQUID_ISOBARIC_SPECIFIC_HEAT,
subcooling=subcooling,
latent_heat_of_vaporization=LATENT_HEAT_OF_VAPORIZATION,
)
bubble_fourier_smooth = logspace(
stop=log10(MAX_FOURIER), start=log10(MAX_FOURIER) - 4, num=int(1e4)
)
beta_correlations = beta.get_correlations()
nusselt_correlations = correlations_nusselt.get_correlations()
constants = {
"Fo_0": bubble_fourier_smooth,
"Ja": bubble_jakob,
"Re_b0": bubble_initial_reynolds,
"Pr": liquid_prandtl,
"alpha": liquid_thermal_diffusivity,
"pi": pi,
}
beta_empirical = DataFrame(index=bubble_fourier_smooth).assign(**{
label: corr.expr(**{
kwd: value
for kwd, value in constants.items()
if kwd in Signature.from_callable(corr.expr).parameters
})
for label, corr in beta_correlations.items()
})
beta_empirical_pivoted = (
melt(
beta_empirical.where(lambda s: s > 0.0),
value_vars=list(beta_correlations.keys()),
var_name="Correlation",
ignore_index=False,
)
.assign(**{
C.bub_fourier(): lambda df: df.index,
C.bub_beta(): lambda df: df["value"],
"Group": lambda df: df["Correlation"].map(GROUPS),
})
.reset_index(drop=True)
).sort_values("Group")
data.plots.corr, ax = subplots()
ax.set_xlim(0, MAX_FOURIER)
ax.set_ylim(0, MAX_BETA)
lineplot(
ax=ax,
x=C.bub_fourier(),
y=C.bub_beta(),
data=beta_empirical_pivoted,
style="Correlation",
hue="Group",
palette=CORRELATIONS_PALETTE,
)
params.format.move_legend(ax)
palette, tracks_data = get_cat_colorbar(
ax, palette=TRACKS_PALETTE, data=data.dfs.dst, col=C.bub(), alpha=TRACKS_ALPHA
)
scatterplot(
ax=ax,
s=TRACKS_SIZE,
alpha=TRACKS_ALPHA,
x=C.bub_fourier(),
y=C.bub_beta(),
hue=C.bub(),
palette=TRACKS_PALETTE,
legend=False,
data=tracks_data,
)
data.plots.corr.set_figwidth(1.89 * data.plots.corr.get_figwidth())
data.plots.corr.set_figheight(1.30 * data.plots.corr.get_figheight())