Find tracks

Find tracks#

Hide code cell source
from collections.abc import Iterable
from datetime import datetime
from inspect import Signature

from boilercore.paths import ISOLIKE, dt_fromisolike
from devtools import pprint
from matplotlib.pyplot import subplot_mosaic, subplots
from more_itertools import one, only
from numpy import diff, gradient, linalg, log10, logspace, pi
from pandas import DataFrame, Series, melt, read_hdf
from seaborn import lineplot, scatterplot
from trackpy import link, quiet

from boilercv.correlations import GROUPS, beta
from boilercv.correlations import nusselt as correlations_nusselt
from boilercv.data import FRAME, TIME, VIDEO
from boilercv.dimensionless_params import (
    fourier,
    jakob,
    kinematic_viscosity,
    nusselt,
    prandtl,
    reynolds,
    thermal_diffusivity,
)
from boilercv.images import scale_bool
from boilercv_docs.nbs import init
from boilercv_pipeline.dfs import limit_group_size
from boilercv_pipeline.models.column import Col, LinkedCol, convert, rename
from boilercv_pipeline.models.deps import get_slices
from boilercv_pipeline.models.df import GBC
from boilercv_pipeline.models.subcool import const
from boilercv_pipeline.palettes import cat10, cool
from boilercv_pipeline.plotting import get_cat_colorbar
from boilercv_pipeline.sets import get_dataset2
from boilercv_pipeline.stages import find_objects, get_thermal_data
from boilercv_pipeline.stages.find_tracks import FindTracks as Params
from boilercv_pipeline.units import U

quiet()

PARAMS = None
Hide code cell source
if isinstance(PARAMS, str):
    params = Params.model_validate_json(PARAMS)
else:
    params = Params(
        context=init(),
        include_patterns=const.nb_include_patterns,
        slicer_patterns=const.nb_slicer_patterns,
    )
params.format.set_display_options()
data = params.data
C = params.cols
context = params.context

objects_path = one(params.objects)
objects = DataFrame(read_hdf(objects_path))
OC = find_objects.Cols()
thermal = read_hdf(params.deps.thermal)
TC = get_thermal_data.Cols()

slices = get_slices(one(params.filled_slicers))
frames = slices.get(FRAME, slice(None))
filled = scale_bool(get_dataset2(one(params.filled), slices=slices)[VIDEO])
dfs = only(params.dfs)

step_time = diff(filled.time.values)[0]
frames_per_step = one(params.slicer_patterns.values())[FRAME].step
time_per_frame = step_time / frames_per_step

cols_to_link = [OC.frame, OC.x_tp, OC.y_tp]

# ! To find (202 - 96)
# %matplotlib widget
# from boilercv_pipeline.sets import get_dataset
# _, ax = subplots()
# ax.imshow(get_dataset("2024-07-18T17-44-35", stage="large_sources")[VIDEO].sel(frame=0))

M_PER_PX = U.convert(3 / 8, "in", "m") / (202 - 96)
# PX_PER_M = 20997.3753
U.define(f"px = {M_PER_PX} m")
U.define(f"frames = {time_per_frame} s")

# Linking
SEARCH_RANGE = 5  # 35
"""Pixel range to search for the next bubble."""
MEMORY = 5  # * frames_per_step
"""Frames to remember a bubble."""

# Track tuning
Y_SURFACE_THRESHOLD = U.convert(280, "px", "m")
"""Vertical position of bubble centroids considered attached to the surface."""
Y_DEPARTURE_THRESHOLD = U.convert(300, "px", "m")
"""Vertical position of bubble centroids considered to have departed the surface."""
MINIMUM_LIFETIME = 0.010  # s
"""Minimum bubble lifetime to consider."""

# Physical parameters
LATENT_HEAT_OF_VAPORIZATION = 2.23e6  # J/kg
LIQUID_DENSITY = 960  # kg/m^3
LIQUID_DYNAMIC_VISCOSITY = 2.88e-4  # Pa-s
LIQUID_ISOBARIC_SPECIFIC_HEAT = 4213  # J/kg-K
LIQUID_THERMAL_CONDUCTIVITY = 0.676  # W/m-K
VAPOR_DENSITY = 0.804  # kg/m^3


# Plotting
CORRELATIONS_PALETTE = cat10
"""For plotting one approach."""
TRACKS_PALETTE = cool
"""For plotting the other approach."""
MAX_FOURIER = 0.005
"""Maximum Fourier number to plot."""
MAX_BETA = 1.05
"""Maximum dimensionless bubble diameter to plot."""
MAX_NUSSELT = 500
"""Maximum Nusselt number to plot."""
TRACKS_ALPHA = 0.3
"""Transparency of the tracks."""
TRACKS_SIZE = 10
"""Size of the tracks."""


def get_time() -> datetime:
    """Get time from path."""
    if match := ISOLIKE.search(objects_path.stem):
        return dt_fromisolike(match)
    else:
        raise ValueError("No time found in path.")


time = get_time()
subcooling = thermal.set_index(TC.time())[TC.subcool()][time]


def get_init(ser: "Series[float]") -> float:
    """Get initial value of a series."""
    return ser.head(int(MINIMUM_LIFETIME // step_time) or 1).median()


def get_delta(df: DataFrame, c: LinkedCol) -> "Series[float]":
    """Get position time delta across frames."""
    return df.groupby(C.bub(), **GBC)[[c.source()]].diff().fillna(0) / step_time


def query_lifetime(df: DataFrame) -> DataFrame:
    """Filter bubbles by lifetime."""
    return (
        df.rename(columns={C.bub_visible(): (temp_name := C.bub_visible.no_unit.name)})
        .query(f"`{temp_name}` > {MINIMUM_LIFETIME}")
        .rename(columns={temp_name: C.bub_visible()})
    )


def preview(
    df: DataFrame, cols: Iterable[Col] | None = None, index: Col | None = None
) -> DataFrame:
    """Preview a dataframe in the notebook."""
    df = params.format.preview(
        cols=cols,
        df=df,
        index=index,
        f=lambda df: df.groupby(C.bub(), **GBC).head(4).head(16),
    )
    return df


pprint(params)
Hide code cell source
data.dfs.tracks = preview(
    cols=[OC.frame, *C.tracks],
    df=link(
        # ? TrackPy expects certain column names
        f=objects.rename(columns={c(): c.source.raw for c in cols_to_link}),
        search_range=SEARCH_RANGE,
        memory=MEMORY,
    )
    .pipe(rename, cols_to_link)  # ? Back to our names
    .pipe(C.bub.rename)
    .assign(**{
        C.time_elapsed(): lambda df: filled.sel(frame=df[OC.frame()].values)[TIME],
        C.bub_visible_frames(): lambda df: (
            df.groupby(C.bub(), **GBC)[C.bub()].transform("count") * frames_per_step
        ),
    })
    .pipe(convert, [C.x, C.y, C.diameter, C.radius_of_gyration], U)
    .sort_values(
        [C.bub_visible_frames(), C.bub(), OC.frame()], ascending=[False, True, True]
    )
    .assign(**{
        C.bub(): (lambda df: df.groupby(C.bub(), **GBC).ngroup()),
        C.u(): lambda df: get_delta(df, C.u),
        C.v(): lambda df: get_delta(df, C.v),
        C.distance(): lambda df: linalg.norm(df[[C.u(), C.v()]].abs(), axis=1),
    })
    .pipe(convert, [C.bub_visible], U),
)
print(f"Found {data.dfs.tracks[C.bub()].nunique()} bubbles")
Hide code cell source
data.dfs.bubbles = preview(
    cols=C.bubbles,
    # ? Find rows corresponding to stagnant or invalid bubbles
    df=data.dfs.tracks.pipe(lambda df: df[df[C.bub_visible()] > MINIMUM_LIFETIME])
    .groupby(C.bub(), **GBC)[data.dfs.tracks.columns]
    .apply(
        # ? Don't assign any other columns until invalid rows have been filtered out
        lambda df: df.assign(**{
            "bubble_visible_y": lambda df: df[C.y()].pipe(get_init),
            # ? Initial y position is close to the surface
            "began": lambda df: df["bubble_visible_y"] > Y_SURFACE_THRESHOLD,
            # ? When the bubble gets far enough away from the surface
            "departed": lambda df: df[C.y()] < Y_DEPARTURE_THRESHOLD,
        })
    )
    # ? Filter out invalid rows
    .pipe(lambda df: df[df["began"] & df["departed"]])
    .pipe(limit_group_size, C.bub(), 1)
    # Groupby again after filtering out invalid rows
    .groupby(C.bub(), **GBC)[data.dfs.tracks.columns]
    # Now columns that depend on the initial row (*.iat[0]) can be assigned
    .apply(
        lambda df: df.assign(**{
            C.bub_time(): (
                lambda df: df[C.time_elapsed()] - df[C.time_elapsed()].iat[0]
            ),
            C.bub_lifetime(): lambda df: (
                df[C.bub_time()].iat[-1] - df[C.bub_time()].pipe(get_init)
            ),
            C.bub_t0(): lambda df: df[C.bub_time()].pipe(get_init),
            C.bub_x0(): lambda df: df[C.x()].pipe(get_init),
            C.bub_y0(): lambda df: df[C.y()].pipe(get_init),
            C.bub_d0(): lambda df: df[C.diameter()].pipe(get_init),
            C.bub_u0(): lambda df: df[C.u()].pipe(get_init),
            C.bub_v0(): lambda df: df[C.v()].pipe(get_init),
            C.max_diam(): lambda df: df[C.diameter()].max(),
            C.diam_rate_of_change(): lambda df: gradient(
                df[C.diameter()], df[C.bub_time()]
            ),
        })
    ),
)
print(f"{data.dfs.bubbles[C.bub()].nunique()} bubbles remain")
Hide code cell source
data.plots.multi, axs = subplot_mosaic([
    [C.y(), C.v()],
    [C.diameter(), C.diam_rate_of_change()],
])
data.plots.multi.set_figwidth(1.89 * data.plots.multi.get_figwidth())
data.plots.multi.set_figheight(1.30 * data.plots.multi.get_figheight())
for plot, ax in axs.items():
    palette, tracks_data = get_cat_colorbar(
        ax, C.bub(), TRACKS_PALETTE, data.dfs.bubbles
    )
    scatterplot(
        ax=ax,
        edgecolor="none",
        s=TRACKS_SIZE,
        alpha=TRACKS_ALPHA,
        x=C.bub_time(),
        y=plot,  # pyright: ignore[reportArgumentType] 1.1.356
        hue=C.bub(),
        legend=False,
        palette=palette,
        data=tracks_data,
    )
Hide code cell source
liquid_kinematic_viscosity = kinematic_viscosity(
    density=LIQUID_DENSITY, dynamic_viscosity=LIQUID_DYNAMIC_VISCOSITY
)
liquid_thermal_diffusivity = thermal_diffusivity(
    thermal_conductivity=LIQUID_THERMAL_CONDUCTIVITY,
    density=LIQUID_DENSITY,
    isobaric_specific_heat=LIQUID_ISOBARIC_SPECIFIC_HEAT,
)
data.dfs.dst = preview(
    cols=C.dests,
    df=data.dfs.bubbles.assign(**{
        C.bub_reynolds(): lambda df: reynolds(
            velocity=abs(df[C.v()]),
            characteristic_length=df[C.diameter()],
            kinematic_viscosity=liquid_kinematic_viscosity,
        ),
        C.bub_reynolds0(): lambda df: df.groupby(C.bub(), **GBC)[
            C.bub_reynolds()
        ].transform(get_init),
        C.bub_fourier(): fourier(
            initial_bubble_diameter=data.dfs.bubbles[C.max_diam()],
            liquid_thermal_diffusivity=liquid_thermal_diffusivity,
            time=data.dfs.bubbles[C.bub_time()],
        ),
        C.bub_nusselt(): nusselt(  # Nu_c
            heat_transfer_coefficient=-(
                2
                * VAPOR_DENSITY
                * LATENT_HEAT_OF_VAPORIZATION
                / subcooling
                * data.dfs.bubbles[C.diam_rate_of_change()]
            ),
            characteristic_length=data.dfs.bubbles[C.diameter()],
            thermal_conductivity=LIQUID_THERMAL_CONDUCTIVITY,
        ),
        C.bub_beta(): (lambda df: df[C.diameter()] / df[C.max_diam()]),
    }),
)
Hide code cell source
object_averages = data.dfs.dst.set_index(C.bub()).groupby(C.bub(), **GBC).mean().mean()
bubble_initial_reynolds = reynolds(
    velocity=abs(object_averages[C.bub_v0()]),
    characteristic_length=object_averages[C.bub_d0()],
    kinematic_viscosity=liquid_kinematic_viscosity,
)
liquid_prandtl = prandtl(
    dynamic_viscosity=LIQUID_DYNAMIC_VISCOSITY,
    isobaric_specific_heat=LIQUID_ISOBARIC_SPECIFIC_HEAT,
    thermal_conductivity=LIQUID_THERMAL_CONDUCTIVITY,
)
bubble_jakob = jakob(
    liquid_density=LIQUID_DENSITY,
    vapor_density=VAPOR_DENSITY,
    liquid_isobaric_specific_heat=LIQUID_ISOBARIC_SPECIFIC_HEAT,
    subcooling=subcooling,
    latent_heat_of_vaporization=LATENT_HEAT_OF_VAPORIZATION,
)
bubble_fourier_smooth = logspace(
    stop=log10(MAX_FOURIER), start=log10(MAX_FOURIER) - 4, num=int(1e4)
)
beta_correlations = beta.get_correlations()
nusselt_correlations = correlations_nusselt.get_correlations()
constants = {
    "Fo_0": bubble_fourier_smooth,
    "Ja": bubble_jakob,
    "Re_b0": bubble_initial_reynolds,
    "Pr": liquid_prandtl,
    "alpha": liquid_thermal_diffusivity,
    "pi": pi,
}
beta_empirical = DataFrame(index=bubble_fourier_smooth).assign(**{
    label: corr.expr(**{
        kwd: value
        for kwd, value in constants.items()
        if kwd in Signature.from_callable(corr.expr).parameters
    })
    for label, corr in beta_correlations.items()
})
beta_empirical_pivoted = (
    melt(
        beta_empirical.where(lambda s: s > 0.0),
        value_vars=list(beta_correlations.keys()),
        var_name="Correlation",
        ignore_index=False,
    )
    .assign(**{
        C.bub_fourier(): lambda df: df.index,
        C.bub_beta(): lambda df: df["value"],
        "Group": lambda df: df["Correlation"].map(GROUPS),
    })
    .reset_index(drop=True)
).sort_values("Group")
data.plots.corr, ax = subplots()
ax.set_xlim(0, MAX_FOURIER)
ax.set_ylim(0, MAX_BETA)
lineplot(
    ax=ax,
    x=C.bub_fourier(),
    y=C.bub_beta(),
    data=beta_empirical_pivoted,
    style="Correlation",
    hue="Group",
    palette=CORRELATIONS_PALETTE,
)
params.format.move_legend(ax)
palette, tracks_data = get_cat_colorbar(
    ax, palette=TRACKS_PALETTE, data=data.dfs.dst, col=C.bub(), alpha=TRACKS_ALPHA
)
scatterplot(
    ax=ax,
    s=TRACKS_SIZE,
    alpha=TRACKS_ALPHA,
    x=C.bub_fourier(),
    y=C.bub_beta(),
    hue=C.bub(),
    palette=TRACKS_PALETTE,
    legend=False,
    data=tracks_data,
)
data.plots.corr.set_figwidth(1.89 * data.plots.corr.get_figwidth())
data.plots.corr.set_figheight(1.30 * data.plots.corr.get_figheight())