Get thermal data

Get thermal data

Hide code cell source
from collections.abc import Iterable

from boilercore.fits import fit_from_params
from boilercore.models.geometry import GEOMETRY
from boilercore.paths import ISOLIKE, dt_fromisolike
from devtools import pprint
from matplotlib.pyplot import subplots
from pandas import DataFrame, Series, concat, read_csv, read_hdf
from seaborn import lineplot, scatterplot

from boilercv_docs.nbs import init
from boilercv_pipeline.models.column import Col, convert, rename
from boilercv_pipeline.stages.find_objects import FindObjects
from boilercv_pipeline.stages.get_thermal_data import GetThermalData as Params
from boilercv_pipeline.units import U

# Serialized parameters for this notebook. When this is a JSON string, the
# parameters are validated from it; otherwise they are constructed directly.
PARAMS = None
Hide code cell source
# Validate the parameters from JSON when a string was supplied; otherwise build
# them around a context from `init()`.
params = (
    Params.model_validate_json(PARAMS)
    if isinstance(PARAMS, str)
    else Params(context=init(), load_src_from_outs=True)
)

params.format.set_display_options()

# Short aliases used by the rest of the notebook
data = params.data
C = params.cols
context = params.context


def fit(df: DataFrame, flux: Col, sample_temps: Iterable[Col]) -> DataFrame:
    """Fit the model function to the sample temperatures of every row.

    Args:
        df: Data holding one column for each entry of `sample_temps`.
        flux: Column under whose name the fitted `"q_s"` value is stored.
        sample_temps: Columns whose row values are passed to the fit.

    Returns:
        The result of assigning the fitted column to `df`.
    """
    models = params.fit.get_models(params.deps.modelfunctions)[0]

    def fit_row(temps: "Series[float]") -> float:
        # `fit_from_params` returns a pair of dicts; only the `"q_s"` entry of
        # the first one is kept.
        fitted = fit_from_params(models, params.fit, GEOMETRY.rods["R"], temps)[0]
        return fitted["q_s"]

    def fit_rows(frame: DataFrame) -> "Series[float]":
        temp_names = [col() for col in sample_temps]
        return frame.loc[:, temp_names].apply(fit_row, axis="columns")

    return df.assign(**{flux(): fit_rows})


# Pretty-print the resolved parameters
pprint(params)
Hide code cell source
# Raw names of the source columns and of the time column
sources = [col.source.raw for col in C.sources]
raw_time = C.time.source.raw

# Read each thermal data file with the time column parsed as dates and used as
# the index, then stack the files and rename the source columns.
thermal_frames = [
    read_csv(
        path,
        usecols=[raw_time, *sources],
        parse_dates=[raw_time],
        index_col=raw_time,
    )
    for path in params.deps.thermal_paths
]
stacked = concat(thermal_frames).reset_index().pipe(rename, C.sources)

# Seconds elapsed since the first entry of the index column
stamps = stacked[C.index()]
elapsed = (stamps - stamps[0]).dt.total_seconds()

data.dfs.src = params.format.preview(
    cols=C.sources, df=stacked.assign(**{C.time_elapsed(): elapsed})
)
Hide code cell source
# The time column followed by the destination columns that are not sources
only_dests = [C.time, *(c for c in C.dests if c not in C.sources)]


def flag_video_times(df):
    """Flag index entries nearest to the timestamps found in contours path stems."""
    stamps = [
        dt_fromisolike(found)
        for path in FindObjects(context=context).contours
        if (found := ISOLIKE.search(path.stem))
    ]
    nearest = df.index.get_indexer(stamps, method="nearest")
    return df.index.isin(df.index[nearest])


if params.load_src_from_outs and params.outs.df.exists():
    # Reuse the previously written output
    dst = DataFrame(read_hdf(params.outs.df))
else:
    # Median over one-second bins, flag the video rows, then forward-fill
    binned = data.dfs.src.set_index(C.index()).resample("s").median()
    filled = binned.assign(**{C.video(): flag_video_times}).reset_index().ffill()
    # Later entries read the columns created by earlier ones, so order matters
    derived = filled.assign(**{
        C.water_temp(): lambda df: df[[c() for c in C.water_temps]].mean(
            axis="columns"
        ),
        C.boiling(): lambda df: df[C.water_temp()].max(),
        C.superheat(): lambda df: df[C.surface_temp()] - df[C.boiling()],
        C.subcool(): lambda df: df[C.boiling()] - df[C.water_temp()],
    })
    fitted = fit(derived, flux=C.flux.source, sample_temps=C.sample_temps)
    dst = convert(fitted, cols=[C.time_elapsed_min, C.flux], ureg=U)

data.dfs.dst = params.format.preview(
    cols=only_dests, df=dst[[c() for c in C.dests]]
)
Hide code cell source
# Aggregate every destination column except the index by its median, but keep
# the video flag set if any row in the bin had it set.
aggregations = {c(): "median" for c in C.dests if c != C.index}
aggregations[C.video()] = "max"

resampled = (
    data.dfs.dst.set_index(C.index()).resample("20s").agg(aggregations).reset_index()
)
data.dfs.resampled = params.format.preview(cols=only_dests, df=resampled)
Hide code cell source
fig, ax = subplots()
data.plots.subcool_superheat = fig

# Subcooling and superheat against elapsed minutes
temps_over_time = data.dfs.resampled.set_index(C.time_elapsed_min())[
    [C.subcool(), C.superheat()]
]
lineplot(ax=ax, zorder=1, data=temps_over_time, dashes=False, errorbar=None)

# Red stars on both curves at the rows flagged as video; the legend is
# suppressed for the second overlay only.
for temp, extra in ((C.superheat(), {}), (C.subcool(), {"legend": False})):
    at_videos = data.dfs.resampled.assign(**{
        C.video(): lambda df, temp=temp: df[df[C.video()]][temp]
    }).set_index(C.time_elapsed_min())[[C.video()]]
    scatterplot(
        ax=ax,
        s=10 * params.format.marker_scale,
        markers={C.video(): "*"},
        palette={C.video(): "red"},
        data=at_videos,
        **extra,
    )

ax.set_ylabel(C.subcool.no_sub())
params.format.move_legend(ax)
Hide code cell source
fig, ax = subplots()
data.plots.subcool = fig

# Flux against subcooling, colored by elapsed minutes
scatterplot(
    ax=ax,
    data=data.dfs.resampled,
    x=C.subcool(),
    y=C.flux(),
    hue=C.time_elapsed_min(),
)

# Red stars at the flux values of the rows flagged as video
video_flux = data.dfs.resampled.assign(**{
    C.video(): lambda df: df[df[C.video()]][C.flux()]
}).set_index(C.subcool())[[C.video()]]
scatterplot(
    ax=ax,
    s=10 * params.format.marker_scale,
    markers={C.video(): "*"},
    palette={C.video(): "red"},
    data=video_flux,
)

ax.get_legend().set_title(C.time_elapsed_min())  # pyright: ignore[reportOptionalMemberAccess]
params.format.move_legend(ax, ncol=4)
Hide code cell source
fig, ax = subplots()
data.plots.superheat = fig

# Flux against superheat, colored by elapsed minutes
scatterplot(
    ax=ax, data=data.dfs.resampled, x=C.superheat(), y=C.flux(), hue=C.time_elapsed_min()
)

# Red stars at the flux values of the rows flagged as video
video_flux = data.dfs.resampled.assign(**{
    C.video(): lambda df: df[df[C.video()]][C.flux()]
}).set_index(C.superheat())[[C.video()]]
scatterplot(
    ax=ax,
    s=10 * params.format.marker_scale,
    markers={C.video(): "*"},
    palette={C.video(): "red"},
    data=video_flux,
)

ax.get_legend().set_title(C.time_elapsed_min())  # pyright: ignore[reportOptionalMemberAccess]
params.format.move_legend(ax, ncol=4)