Get thermal data#
Show code cell source
from collections.abc import Iterable
from boilercore.fits import fit_from_params
from boilercore.models.geometry import GEOMETRY
from boilercore.paths import ISOLIKE, dt_fromisolike
from devtools import pprint
from matplotlib.pyplot import subplots
from pandas import DataFrame, Series, concat, read_csv, read_hdf
from seaborn import lineplot, scatterplot
from boilercv_docs.nbs import init
from boilercv_pipeline.models.column import Col, convert, rename
from boilercv_pipeline.stages.find_objects import FindObjects
from boilercv_pipeline.stages.get_thermal_data import GetThermalData as Params
from boilercv_pipeline.units import U
PARAMS = None
Show code cell source
if isinstance(PARAMS, str):
params = Params.model_validate_json(PARAMS)
else:
params = Params(context=init(), load_src_from_outs=True)
params.format.set_display_options()
data = params.data
C = params.cols
context = params.context
def fit(df: DataFrame, flux: Col, sample_temps: Iterable[Col]) -> DataFrame:
"""Fit model function across sample temperatures."""
models = params.fit.get_models(params.deps.modelfunctions)[0]
def apply_fit(df: DataFrame) -> "Series[float]":
return df.loc[:, [c() for c in sample_temps]].apply(
lambda ser: fit(ser)[0]["q_s"], axis="columns"
)
def fit(ser: "Series[float]") -> tuple[dict[str, float], dict[str, float]]:
return fit_from_params(models, params.fit, GEOMETRY.rods["R"], ser)
return df.assign(**{flux(): apply_fit})
pprint(params)
Show code cell source
sources = [c.source.raw for c in C.sources]
data.dfs.src = params.format.preview(
cols=C.sources,
df=concat([
read_csv(
p,
usecols=[C.time.source.raw, *sources],
parse_dates=[C.time.source.raw],
index_col=C.time.source.raw,
)
for p in params.deps.thermal_paths
])
.reset_index()
.pipe(rename, C.sources)
.assign(**{
C.time_elapsed(): lambda df: (
(df[C.index()] - df[C.index()][0]).dt.total_seconds()
)
}),
)
Show code cell source
only_dests = [C.time, *[c for c in C.dests if c not in C.sources]]
data.dfs.dst = params.format.preview(
cols=only_dests,
df=(
DataFrame(read_hdf(params.outs.df))
if params.load_src_from_outs and params.outs.df.exists()
else (
data.dfs.src.set_index(C.index())
.resample("s")
.median()
.assign(**{
C.video(): lambda df: df.index.isin(
df.index[
df.index.get_indexer(
[
dt_fromisolike(match)
for p in FindObjects(context=context).contours
if (match := ISOLIKE.search(p.stem))
],
method="nearest",
)
]
)
})
.reset_index()
.ffill()
.assign(**{
C.water_temp(): lambda df: df[[c() for c in C.water_temps]].mean(
axis="columns"
),
C.boiling(): lambda df: df[C.water_temp()].max(),
C.superheat(): lambda df: df[C.surface_temp()] - df[C.boiling()],
C.subcool(): lambda df: df[C.boiling()] - df[C.water_temp()],
})
.pipe(fit, flux=C.flux.source, sample_temps=C.sample_temps)
.pipe(convert, cols=[C.time_elapsed_min, C.flux], ureg=U)
)
)[[c() for c in C.dests]],
)
Show code cell source
data.dfs.resampled = params.format.preview(
cols=only_dests,
df=(
data.dfs.dst.set_index(C.index())
.resample("20s")
.agg({
**dict.fromkeys([c() for c in C.dests if c != C.index], "median"),
C.video(): "max",
})
.reset_index()
),
)
Show code cell source
data.plots.subcool_superheat, ax = subplots()
lineplot(
ax=ax,
zorder=1,
data=data.dfs.resampled.set_index(C.time_elapsed_min())[
[C.subcool(), C.superheat()]
],
dashes=False,
errorbar=None,
)
scatterplot(
ax=ax,
s=10 * params.format.marker_scale,
markers={C.video(): "*"},
palette={C.video(): "red"},
data=data.dfs.resampled.assign(**{
C.video(): lambda df: df[df[C.video()]][C.superheat()]
}).set_index(C.time_elapsed_min())[[C.video()]],
)
scatterplot(
ax=ax,
s=10 * params.format.marker_scale,
markers={C.video(): "*"},
palette={C.video(): "red"},
data=data.dfs.resampled.assign(**{
C.video(): lambda df: df[df[C.video()]][C.subcool()]
}).set_index(C.time_elapsed_min())[[C.video()]],
legend=False,
)
ax.set_ylabel(C.subcool.no_sub())
params.format.move_legend(ax)
Show code cell source
data.plots.subcool, ax = subplots()
scatterplot(
ax=ax, data=data.dfs.resampled, x=C.subcool(), y=C.flux(), hue=C.time_elapsed_min()
)
scatterplot(
ax=ax,
s=10 * params.format.marker_scale,
markers={C.video(): "*"},
palette={C.video(): "red"},
data=data.dfs.resampled.assign(**{
C.video(): lambda df: df[df[C.video()]][C.flux()]
}).set_index(C.subcool())[[C.video()]],
)
ax.get_legend().set_title(C.time_elapsed_min()) # pyright: ignore[reportOptionalMemberAccess]
params.format.move_legend(ax, ncol=4)
Show code cell source
data.plots.superheat, ax = subplots()
scatterplot(
ax=ax,
data=data.dfs.resampled,
x=C.superheat(),
y=C.flux(),
hue=C.time_elapsed_min(),
)
scatterplot(
ax=ax,
s=10 * params.format.marker_scale,
markers={C.video(): "*"},
palette={C.video(): "red"},
data=data.dfs.resampled.assign(**{
C.video(): lambda df: df[df[C.video()]][C.flux()]
}).set_index(C.superheat())[[C.video()]],
)
ax.get_legend().set_title(C.time_elapsed_min()) # pyright: ignore[reportOptionalMemberAccess]
params.format.move_legend(ax, ncol=4)