Find objects#

Find bubble size using two different approaches.

Hide code cell source
from collections.abc import Iterable

from devtools import pprint
from geopandas import GeoDataFrame, points_from_xy
from matplotlib.pyplot import subplots
from more_itertools import one, only
from numpy import pi, sqrt
from pandas import DataFrame, IndexSlice, NamedAgg, concat
from seaborn import scatterplot
from shapely import LinearRing, Polygon

from boilercv.data import FRAME, VIDEO
from boilercv.images import scale_bool
from boilercv_docs.nbs import init
from boilercv_pipeline.dfs import limit_group_size
from boilercv_pipeline.images import bounded_ax
from boilercv_pipeline.models.column import Col, rename
from boilercv_pipeline.models.deps import get_slices
from boilercv_pipeline.models.df import GBC, agg
from boilercv_pipeline.models.params import Format
from boilercv_pipeline.models.subcool import const
from boilercv_pipeline.sets import get_contours_df2, get_dataset2
from boilercv_pipeline.stages.find_objects import FindObjects as Params

# Optional JSON-serialized stage parameters. When this stays `None` (i.e. is not a
# string), the notebook builds default parameters in the following cell.
PARAMS = None
Hide code cell source
# Build the stage parameters: validate them from JSON when a string was supplied,
# otherwise construct notebook defaults with the Trackpy comparison enabled.
params = (
    Params.model_validate_json(PARAMS)
    if isinstance(PARAMS, str)
    else Params(
        context=init(),
        compare_with_trackpy=True,
        include_patterns=const.nb_include_patterns,
        slicer_patterns=const.nb_slicer_patterns,
    )
)
params.format.set_display_options()

# Short aliases used throughout the rest of the notebook.
data, C, context = params.data, params.cols, params.context

# Inputs. `one` requires exactly one item in each collection; `only` tolerates an
# empty collection as well.
contours = get_contours_df2(one(params.contours))
slices = get_slices(one(params.filled_slicers))
# Frame selection from the slicers, defaulting to all frames when none is given.
frames = slices.get(FRAME, slice(None))
filled = scale_bool(get_dataset2(one(params.filled), slices=slices)[VIDEO])
dfs = only(params.dfs)

# Marker color per approach label, and the display format used for image plots.
PALETTE = {C.approach_tp.val: "red", C.approach.val: "blue"}
IMSHOW_FORMAT = Format(scale=1.0)


def preview(
    df: DataFrame, cols: Iterable[Col] | None = None, index: Col | None = None
) -> DataFrame:
    """Preview a dataframe in the notebook.

    Delegates to `params.format.preview`, forwarding `df`, `cols`, and `index`
    unchanged and supplying a row-limiting callable as `f`.

    Args:
        df: Dataframe to preview.
        cols: Columns forwarded to the formatter, or `None`.
        index: Index column forwarded to the formatter, or `None`.

    Returns:
        Whatever `params.format.preview` returns for these arguments.
    """

    def first_rows(frame: DataFrame) -> DataFrame:
        # Up to three rows from each frame group, then at most six rows overall.
        return frame.groupby(C.frame(), **GBC).head(3).head(6)

    return params.format.preview(cols=cols, df=df, index=index, f=first_rows)


# Pretty-print the resolved parameters so the notebook records what it ran with.
pprint(params)

Data#

Load a video of filled contours and the contour loci and plot a composite of all frames to analyze.

Hide code cell source
# Collapse the filled-contour video along the frame dimension into one composite image.
composite_video = filled.max(FRAME).values
# Keep the figure on `data.plots.composite`; `ax` is reused by later cells.
data.plots.composite, ax = subplots()
with params.format.display_options(IMSHOW_FORMAT):
    with bounded_ax(composite_video, ax=ax):
        # Draw the inverted composite semi-transparently as a backdrop.
        ax.imshow(~composite_video, alpha=0.4)
        ax.set_xlabel(C.x.latex)
        ax.set_ylabel(C.y.latex)

Find size from filled contours using Trackpy#

Use Trackpy to find bubble size given the filled contours.

Hide code cell source
# Optional comparison: locate features in the filled-contour video with Trackpy
# and store a preview of the result on `data.dfs.trackpy`.
if params.compare_with_trackpy:
    # Imported here so Trackpy is only required when the comparison is enabled.
    from trackpy import batch, quiet

    # Reduce Trackpy's logging output.
    quiet()

    # ? Assign here to avoid duplicate columns in schema
    trackpy_cols = [*C.trackpy, C.x_tp, C.y_tp]

    data.dfs.trackpy = preview(
        cols=trackpy_cols,
        # Run feature finding over every frame, using the configured diameter guess.
        df=batch(
            frames=filled.values, diameter=params.guess_diameter, characterize=True
        )
        .pipe(C.frame.rename)
        # Replace positional frame indices (0, 1, ...) with the corresponding
        # values of `filled.frame`.
        .assign(**{
            C.frame(): lambda df: df[C.frame()].replace(
                dict(enumerate(filled.frame.values))
            )
        })
        # Rename to the notebook's column names, then keep only those columns, in order.
        .pipe(rename, trackpy_cols)[[c() for c in trackpy_cols]],
    )

Find size from contours#

The prior approach discards the contour data and operates on filled contours instead. Alternatively, use Shapely to find size directly from the contour data.

Prepare to find objects#

Prepare a dataframe with columns in a certain order, assign contour data to it, and demote the hierarchical indices to plain columns. Count the number of points in each contour and each frame, keeping only those which have enough points to describe a linear ring. Construct a GeoPandas geometry column and operate on it with Shapely to construct linear rings, returning centroids and the representative polygonal area. Also report the number of points in the loci of each contour per frame.

Hide code cell source
# Compute one centroid and one polygon area per (frame, contour) group.
data.dfs.centroids = preview(
    cols=C.centroids,
    # Select the requested frames and turn the index levels into plain columns.
    df=contours.loc[IndexSlice[frames, :], :]
    .reset_index()
    .pipe(rename, C.sources)
    # NOTE(review): presumably drops (frame, contour) groups with fewer than 3
    # points, the minimum needed for a ring — confirm against `limit_group_size`.
    .pipe(limit_group_size, [C.frame(), C.contour()], 3)
    # One point geometry per contour locus.
    .assign(**{
        C.geometry(): lambda loci: points_from_xy(loci[C.x()], loci[C.y()])
    })
    .groupby([C.frame(), C.contour()], **GBC)
    .pipe(
        agg,
        {
            # Each aggregation receives the points of a single contour.
            C.centroid(): NamedAgg(
                C.geometry(), lambda points: LinearRing(points).centroid
            ),
            C.area(): NamedAgg(C.geometry(), lambda points: Polygon(points).area),
        },
    ),
)

Split the centroid point objects into separate named columns that conform to the Trackpy convention. Report the centroids in each frame.

Hide code cell source
# Derive size measures from each contour's polygon area. The entries are evaluated
# in order, so each one may read the column assigned just before it.
data.dfs.geo = params.format.preview(
    cols=C.geo,
    df=data.dfs.centroids.assign(**{
        # Diameter of the circle whose area equals the polygon area.
        C.diameter(): lambda objs: sqrt(4 * objs[C.area()] / pi),
        # Radius of gyration taken as one quarter of that diameter.
        C.radius_of_gyration(): lambda objs: objs[C.diameter()] / 4,
        # Size is reported as the radius of gyration.
        C.size(): lambda objs: objs[C.radius_of_gyration()],
    }),
)
Hide code cell source
# Split each centroid point into separate x and y columns, then keep only the
# destination columns, in their declared order.
data.dfs.dst = preview(
    cols=C.dests,
    df=GeoDataFrame(data.dfs.geo).assign(**{
        C.x(): lambda geo: geo[C.centroid()].x,
        C.y(): lambda geo: geo[C.centroid()].y,
    })[[col() for col in C.dests]],
)

Compare approaches#

Compare Trackpy objects with contour objects. Here the guess diameter for Trackpy object finding and contour perimeter filtering are matched to produce the same number of objects from each algorithm. Trackpy features more intelligent filtering, but takes much longer. Trackpy’s approach for finding local maxima in grayscale images is applied even to binarized images, exhaustively searching for high points in the binary image, adding to execution time.

The percent difference between the approaches is relatively low for this subset, suggesting the contour centroid approach is reasonable.

A warm color palette is used to plot Trackpy objects, and a cool color palette is used to plot contour centroids.

Hide code cell source
# Display the Trackpy results as the cell output.
data.dfs.trackpy
Hide code cell source
# Overlay the object centers from each approach on the composite figure drawn
# earlier (`ax`), colored by approach.
with params.format.display_options(IMSHOW_FORMAT):
    # Contour centroids are always plotted.
    approaches = [data.dfs.dst[[C.y(), C.x()]].pipe(C.approach.assign)]
    if params.compare_with_trackpy:
        # Trackpy objects go first, matching the original concatenation order.
        # They are only included when the comparison is enabled, rather than
        # concatenating an empty placeholder frame (pandas deprecates its
        # special handling of empty entries in `concat`).
        approaches.insert(
            0, data.dfs.trackpy[[C.y_tp(), C.x_tp()]].pipe(C.approach_tp.assign)
        )
    scatterplot(
        ax=ax,
        alpha=0.5,
        x=C.x(),
        y=C.y(),
        hue=C.approach(),
        palette=PALETTE,
        data=concat(approaches),
        # A legend is only meaningful when two approaches are shown.
        legend=params.compare_with_trackpy,
    )
    if params.compare_with_trackpy:
        params.format.move_legend(ax, ncol=1)
# Display the updated composite figure as the cell output.
data.plots.composite