P-vector fit comparison#

Hide code cell source
from __future__ import annotations

import logging
import os
import re
from collections import defaultdict
from collections.abc import Iterable, Mapping
from functools import lru_cache
from itertools import product
from pathlib import Path
from textwrap import dedent
from typing import Any

import ampform
import attrs
import graphviz
import jax.numpy as jnp
import matplotlib.pyplot as plt
import numpy as np
import qrules
import sympy as sp
from ampform.dynamics.builder import TwoBodyKinematicVariableSet
from ampform.helicity import HelicityModel
from ampform.io import aslatex, improve_latex_rendering
from ampform.kinematics.phasespace import Kallen
from ampform.sympy import perform_cached_doit, unevaluated
from attrs import define, field, frozen
from IPython.display import Markdown, Math, display
from qrules.particle import Particle, ParticleCollection
from qrules.transition import ReactionInfo
from sympy import Abs
from sympy.matrices.expressions.matexpr import MatrixElement
from tensorwaves.data import (
    IntensityDistributionGenerator,
    SympyDataTransformer,
    TFPhaseSpaceGenerator,
    TFUniformRealNumberGenerator,
    TFWeightedPhaseSpaceGenerator,
)
from tensorwaves.estimator import UnbinnedNLL
from tensorwaves.function.sympy import create_parametrized_function
from tensorwaves.interface import (
    DataSample,
    Estimator,
    FitResult,
    Function,
    ParameterValue,
)
from tensorwaves.optimizer import Minuit2

improve_latex_rendering()
logging.getLogger("absl").setLevel(logging.ERROR)
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
plt.rc("font", size=12)

Studied decay#

Hide code cell source
@lru_cache(maxsize=1)
def create_particle_database() -> ParticleCollection:
    particles = qrules.load_default_particles()
    for nstar in particles.filter(lambda p: p.name.startswith("N")):
        particles.remove(nstar)
    particles += create_nstar(mass=1.82, width=0.6, parity=+1, spin=1.5, idx=1)
    return particles


def create_nstar(
    mass: float, width: float, parity: int, spin: float, idx: int
) -> Particle:
    spin = sp.Rational(spin)
    parity_symbol = "⁺" if parity > 0 else "⁻"
    unicode_subscripts = list("₀₁₂₃₄₅₆₇₈₉")
    return Particle(
        name=f"N{unicode_subscripts[idx]}({spin}{parity_symbol})",
        latex=Rf"N_{idx}({spin.numerator}/{spin.denominator}^-)",
        pid=2024_05_00_00 + 100 * bool(parity + 1) + idx,
        mass=mass,
        width=width,
        baryon_number=1,
        charge=+1,
        isospin=(0.5, +0.5),
        parity=parity,
        spin=1.5,
    )
FINAL_STATES: list[tuple[str, ...]] = [
    ["K0", "Sigma+", "p~"],
    ["eta", "p", "p~"],
]
REACTIONS: list[ReactionInfo] = [
    qrules.generate_transitions(
        initial_state="J/psi(1S)",
        final_state=final_state,
        allowed_intermediate_particles=["N"],
        allowed_interaction_types=["strong"],
        formalism="helicity",
        particle_db=create_particle_database(),
    )
    for final_state in FINAL_STATES
]
Hide code cell source
for i, reaction in enumerate(REACTIONS, 1):
    src = qrules.io.asdot(reaction, collapse_graphs=True)
    graph = graphviz.Source(src)
    output_file = Path(f"032/graph{i}")
    output_file.parent.mkdir(exist_ok=True)
    graph.render(output_file, format="svg")
    output_file.unlink()
    display(graph)
    del reaction, src, output_file, graph

Amplitude builder#

Hide code cell source
@define
class DynamicsSymbolBuilder:
    collected_symbols: set[sp.Symbol, tuple[Particle, TwoBodyKinematicVariableSet]] = (
        field(factory=lambda: defaultdict(set))
    )

    def __call__(
        self, resonance: Particle, variable_pool: TwoBodyKinematicVariableSet
    ) -> tuple[sp.Expr, dict[sp.Symbol, float]]:
        jp = render_jp(resonance)
        charge = resonance.charge
        if variable_pool.angular_momentum is not None:
            L = sp.Rational(variable_pool.angular_momentum)
            X = sp.Symbol(Rf"X_{{{jp}, Q={charge:+d}}}^{{l={L}}}")
        else:
            X = sp.Symbol(Rf"X_{{{jp}, Q={charge:+d}}}")
        self.collected_symbols[X].add((resonance, variable_pool))
        parameter_defaults = {}
        return X, parameter_defaults


def render_jp(particle: Particle) -> str:
    spin = sp.Rational(particle.spin)
    j = (
        str(spin)
        if spin.denominator == 1
        else Rf"\frac{{{spin.numerator}}}{{{spin.denominator}}}"
    )
    if particle.parity is None:
        return f"J={j}"
    p = "-" if particle.parity < 0 else "+"
    return f"J^P={{{j}}}^{{{p}}}"
MODELS: list[HelicityModel] = []
for reaction in REACTIONS:
    builder = ampform.get_builder(reaction)
    builder.adapter.permutate_registered_topologies()
    builder.config.scalar_initial_state_mass = True
    builder.config.stable_final_state_ids = [0, 1, 2]
    create_dynamics_symbol = DynamicsSymbolBuilder()
    for resonance in reaction.get_intermediate_particles():
        builder.set_dynamics(resonance.name, create_dynamics_symbol)
    MODELS.append(builder.formulate())
    del builder, reaction, resonance
Hide code cell source
selected_amplitudes = {
    k: v for i, (k, v) in enumerate(MODELS[0].amplitudes.items()) if i == 0
}
Math(aslatex(selected_amplitudes, terms_per_line=1))
\[\begin{split}\displaystyle \begin{array}{rcl} A^{01}_{0, 0, - \frac{1}{2}, - \frac{1}{2}} &=& - C_{J/\psi(1S) \to {N_1(3/2^-)}_{+1/2} \overline{p}_{+1/2}; N_1(3/2^-) \to K^{0}_{0} \Sigma^{+}_{+1/2}} X_{J^P={\frac{3}{2}}^{+}, Q=+1} D^{1}_{0,0}\left(- \phi_{01},\theta_{01},0\right) D^{\frac{3}{2}}_{- \frac{1}{2},\frac{1}{2}}\left(- \phi^{01}_{0},\theta^{01}_{0},0\right) \\ &+& - C_{J/\psi(1S) \to {N_1(3/2^-)}_{+1/2} \overline{p}_{-1/2}; N_1(3/2^-) \to K^{0}_{0} \Sigma^{+}_{+1/2}} X_{J^P={\frac{3}{2}}^{+}, Q=+1} D^{1}_{0,1}\left(- \phi_{01},\theta_{01},0\right) D^{\frac{3}{2}}_{\frac{1}{2},\frac{1}{2}}\left(- \phi^{01}_{0},\theta^{01}_{0},0\right) \\ &+& - C_{J/\psi(1S) \to {N_1(3/2^-)}_{+3/2} \overline{p}_{+1/2}; N_1(3/2^-) \to K^{0}_{0} \Sigma^{+}_{+1/2}} X_{J^P={\frac{3}{2}}^{+}, Q=+1} D^{1}_{0,-1}\left(- \phi_{01},\theta_{01},0\right) D^{\frac{3}{2}}_{- \frac{3}{2},\frac{1}{2}}\left(- \phi^{01}_{0},\theta^{01}_{0},0\right) \\ \end{array}\end{split}\]
Hide code cell source
src = R"\begin{array}{cll}" "\n"
for symbol, resonances in create_dynamics_symbol.collected_symbols.items():
    src += Rf"  {symbol} \\" "\n"
    for p, _ in resonances:
        src += Rf"  {p.latex} & m={p.mass:g}\text{{ GeV}} & \Gamma={p.width:g}\text{{ GeV}} \\"
        src += "\n"
src += R"\end{array}"
Math(src)
\[\begin{split}\displaystyle \begin{array}{cll} X_{J^P={\frac{3}{2}}^{+}, Q=+1} \\ N_1(3/2^-) & m=1.82\text{ GeV} & \Gamma=0.6\text{ GeV} \\ \end{array}\end{split}\]

Dynamics parametrization#

Phasespace factor#

See also

TR-026 and TR-027 on analyticity and Riemann sheets.

Hide code cell source
@unevaluated(real=False)
class PhaseSpaceCM(sp.Expr):
    s: Any
    m1: Any
    m2: Any
    _latex_repr_ = R"\rho^\mathrm{{CM}}_{{{m1},{m2}}}\left({s}\right)"

    def evaluate(self) -> sp.Expr:
        s, m1, m2 = self.args
        return -16 * sp.pi * sp.I * ChewMandelstam(s, m1, m2)


@unevaluated(real=False)
class ChewMandelstam(sp.Expr):
    s: Any
    m1: Any
    m2: Any
    _latex_repr_ = R"\Sigma\left({s}\right)"

    def evaluate(self) -> sp.Expr:
        s, m1, m2 = self.args
        q = BreakupMomentum(s, m1, m2)
        return (
            (2 * q / sp.sqrt(s))
            * sp.log(Abs((m1**2 + m2**2 - s + 2 * sp.sqrt(s) * q) / (2 * m1 * m2)))
            - (m1**2 - m2**2) * (1 / s - 1 / (m1 + m2) ** 2) * sp.log(m1 / m2)
        ) / (16 * sp.pi**2)


@unevaluated(real=False)
class BreakupMomentum(sp.Expr):
    s: Any
    m1: Any
    m2: Any
    _latex_repr_ = R"q\left({s}\right)"

    def evaluate(self) -> sp.Expr:
        s, m1, m2 = self.args
        return sp.sqrt(Kallen(s, m1**2, m2**2)) / (2 * sp.sqrt(s))
Hide code cell source
s, m1, m2 = sp.symbols("s m1 m2", nonnegative=True)
exprs = [
    PhaseSpaceCM(s, m1, m2),
    ChewMandelstam(s, m1, m2),
    BreakupMomentum(s, m1, m2),
]
Math(aslatex({e: e.doit(deep=False) for e in exprs}))
\[\begin{split}\displaystyle \begin{array}{rcl} \rho^\mathrm{CM}_{m_{1},m_{2}}\left(s\right) &=& - 16 i \pi \Sigma\left(s\right) \\ \Sigma\left(s\right) &=& \frac{- \left(m_{1}^{2} - m_{2}^{2}\right) \left(- \frac{1}{\left(m_{1} + m_{2}\right)^{2}} + \frac{1}{s}\right) \log{\left(\frac{m_{1}}{m_{2}} \right)} + \frac{2 \log{\left(\frac{\left|{m_{1}^{2} + m_{2}^{2} + 2 \sqrt{s} q\left(s\right) - s}\right|}{2 m_{1} m_{2}} \right)} q\left(s\right)}{\sqrt{s}}}{16 \pi^{2}} \\ q\left(s\right) &=& \frac{\sqrt{\lambda\left(s, m_{1}^{2}, m_{2}^{2}\right)}}{2 \sqrt{s}} \\ \end{array}\end{split}\]

\(K\)-matrix formalism#

n_channels = len(REACTIONS)
I = sp.Identity(n_channels)
K = sp.MatrixSymbol("K", n_channels, n_channels)
P = sp.MatrixSymbol("P", n_channels, 1)
F = sp.MatrixSymbol("F", n_channels, 1)
rho = sp.MatrixSymbol("rho", n_channels, n_channels)
Hide code cell source
def get_decay_products(reaction: ReactionInfo) -> DecayProducts:
    some_transition, *_ = reaction.transitions
    decay_product_ids = some_transition.topology.get_edge_ids_outgoing_from_node(1)
    for transition in reaction.transitions:
        if decay_product_ids != transition.topology.get_edge_ids_outgoing_from_node(1):
            msg = "Reaction contains multiple sub-systems"
            raise ValueError(msg)
    child1_id, child2_id = sorted(decay_product_ids)
    return DecayProducts(
        child1=reaction.final_state[child1_id],
        child2=reaction.final_state[child2_id],
    )


@frozen
class DecayProducts:
    child1: Particle
    child2: Particle

    @property
    def children(self) -> tuple[Particle, Particle]:
        return self.child1, self.child2


DECAYS = tuple(get_decay_products(m.reaction_info) for m in MODELS)
PARAMETERS_DEFAULTS = {}
for model in MODELS:
    PARAMETERS_DEFAULTS.update(model.parameter_defaults)
    del model
PARAMETERS_DEFAULTS = {
    par: value
    for par, value in PARAMETERS_DEFAULTS.items()
    if not re.match(r"^m_\d+$", par.name)
}

\(K\)-matrix parametrization#

Hide code cell source
def formulate_k_matrix(
    resonances: list[tuple[Particle, int]], n_channels: int
) -> dict[MatrixElement, sp.Expr]:
    expressions = {}
    for i, j in product(range(n_channels), range(n_channels)):
        resonance_contributions = []
        for res, _ in resonances:
            s = sp.Symbol("m_01", real=True) ** 2
            g_Ri = sp.Symbol(Rf"g_{{{res.latex},{i}}}")
            g_Rj = sp.Symbol(Rf"g_{{{res.latex},{j}}}")
            m_R = sp.Symbol(Rf"m_{{{res.latex}}}")
            parameter_defaults = {
                m_R: res.mass,
                g_Ri: 1,
                g_Rj: 0.1,
            }
            PARAMETERS_DEFAULTS.update(parameter_defaults)
            expr = (g_Ri * g_Rj) / (m_R**2 - s)
            resonance_contributions.append(expr)
        expressions[K[i, j]] = sum(resonance_contributions)
    return expressions


K_expressions = formulate_k_matrix(resonances, n_channels=len(REACTIONS))
K_matrix = K.as_explicit()
K.as_explicit().xreplace(K_expressions)
\[\begin{split}\displaystyle \left[\begin{matrix}\frac{\left(g_{N_1(3/2^-),0}\right)^{2}}{- m_{01}^{2} + \left(m_{N_1(3/2^-)}\right)^{2}} & \frac{g_{N_1(3/2^-),0} g_{N_1(3/2^-),1}}{- m_{01}^{2} + \left(m_{N_1(3/2^-)}\right)^{2}}\\\frac{g_{N_1(3/2^-),0} g_{N_1(3/2^-),1}}{- m_{01}^{2} + \left(m_{N_1(3/2^-)}\right)^{2}} & \frac{\left(g_{N_1(3/2^-),1}\right)^{2}}{- m_{01}^{2} + \left(m_{N_1(3/2^-)}\right)^{2}}\end{matrix}\right]\end{split}\]

\(P\)-vector parametrization#

Hide code cell source
def formulate_p_vector(
    resonances: list[tuple[Particle, int]], n_channels: int
) -> dict[MatrixElement, sp.Expr]:
    expressions = {}
    for i in range(n_channels):
        resonance_contributions = []
        for res, _ in resonances:
            s = sp.Symbol("m_01", real=True) ** 2
            g_Ri = sp.Symbol(Rf"g_{{{res.latex},{i}}}")
            beta_R = sp.Symbol(Rf"\beta_{{{res.latex}}}")
            m_R = sp.Symbol(Rf"m_{{{res.latex}}}")
            parameter_defaults = {
                m_R: res.mass,
                beta_R: 1 + 0j,
                g_Ri: 1,
            }
            PARAMETERS_DEFAULTS.update(parameter_defaults)
            expr = (beta_R * g_Ri) / (m_R**2 - s)
            resonance_contributions.append(expr)
        expressions[P[i, 0]] = sum(resonance_contributions)
    return expressions


P_expressions = formulate_p_vector(resonances, n_channels=len(REACTIONS))
P_vector = P.as_explicit()
P.as_explicit().xreplace(P_expressions)
\[\begin{split}\displaystyle \left[\begin{matrix}\frac{\beta_{N_1(3/2^-)} g_{N_1(3/2^-),0}}{- m_{01}^{2} + \left(m_{N_1(3/2^-)}\right)^{2}}\\\frac{\beta_{N_1(3/2^-)} g_{N_1(3/2^-),1}}{- m_{01}^{2} + \left(m_{N_1(3/2^-)}\right)^{2}}\end{matrix}\right]\end{split}\]

Phase space factor parametrization#

Hide code cell source
def formulate_phsp_factor_matrix(n_channels: int) -> dict[sp.MatrixElement, sp.Expr]:
    expressions = {}
    for i in range(n_channels):
        for j in range(n_channels):
            if i == j:
                m_a_i = sp.Symbol(Rf"m_{{0,{i}}}")
                m_b_i = sp.Symbol(Rf"m_{{1,{i}}}")
                s = sp.Symbol("m_01", real=True) ** 2
                rho_i = PhaseSpaceCM(s, m_a_i, m_b_i)
                expressions[rho[i, j]] = rho_i
                parameter_defaults = {
                    m_a_i: DECAYS[i].child1.mass,
                    m_b_i: DECAYS[i].child2.mass,
                }
                PARAMETERS_DEFAULTS.update(parameter_defaults)
            else:
                expressions[rho[i, j]] = 0
    return expressions


rho_expressions = formulate_phsp_factor_matrix(n_channels=len(REACTIONS))
rho.as_explicit().xreplace(rho_expressions)
\[\begin{split}\displaystyle \left[\begin{matrix}\rho^\mathrm{CM}_{m_{0,0},m_{1,0}}\left(m_{01}^{2}\right) & 0\\0 & \rho^\mathrm{CM}_{m_{0,1},m_{1,1}}\left(m_{01}^{2}\right)\end{matrix}\right]\end{split}\]

\(F\)-vector construction#

Note

For some reason one has to leave out the multiplication of \(\rho\) by \(i\) within the calculation of the \(F\) vector

F = (I - sp.I * K * rho).inv() * P
F
\[\displaystyle \left(\mathbb{I} + - i K \rho\right)^{-1} P\]
F_vector = F.as_explicit()
parametrizations = {**K_expressions, **rho_expressions, **P_expressions}
F_exprs = F_vector.xreplace(parametrizations)
F_exprs[0].simplify(doit=False)
\[\displaystyle \frac{i \beta_{N_1(3/2^-)} g_{N_1(3/2^-),0}}{\left(g_{N_1(3/2^-),0}\right)^{2} \rho^\mathrm{CM}_{m_{0,0},m_{1,0}}\left(m_{01}^{2}\right) + \left(g_{N_1(3/2^-),1}\right)^{2} \rho^\mathrm{CM}_{m_{0,1},m_{1,1}}\left(m_{01}^{2}\right) - i m_{01}^{2} + i \left(m_{N_1(3/2^-)}\right)^{2}}\]

Create numerical functions#

Hide code cell source
F_unfolded_exprs = np.array([perform_cached_doit(expr) for expr in F_exprs])
DYNAMICS_EXPRESSIONS_FVECTOR = [
    {
        symbol: F_unfolded_exprs[i]
        for symbol, resonances in create_dynamics_symbol.collected_symbols.items()
    }
    for i in range(n_channels)
]
MODELS_FVECTOR = [
    attrs.evolve(
        model,
        parameter_defaults=PARAMETERS_DEFAULTS,
    )
    for model in MODELS
]
FULL_EXPRESSIONS_FVECTOR = [
    perform_cached_doit(MODELS_FVECTOR[i].expression).xreplace(
        DYNAMICS_EXPRESSIONS_FVECTOR[i]
    )
    for i in range(n_channels)
]
INTENSITY_FUNCS_FVECTOR = [
    create_parametrized_function(
        expression=perform_cached_doit(FULL_EXPRESSIONS_FVECTOR[i]),
        backend="jax",
        parameters=MODELS_FVECTOR[i].parameter_defaults,
    )
    for i in range(n_channels)
]

Generate data#

Phase space sample#

HELICITY_TRANSFORMERS = [
    SympyDataTransformer.from_sympy(model.kinematic_variables, backend="jax")
    for model in MODELS_FVECTOR
]
PHSP = []
ε = 1e-8
for i in range(n_channels):
    rng = TFUniformRealNumberGenerator(seed=0)
    phsp_generator = TFPhaseSpaceGenerator(
        initial_state_mass=REACTIONS[i].initial_state[-1].mass,
        final_state_masses={it: p.mass for it, p in REACTIONS[i].final_state.items()},
    )
    phsp_momenta = phsp_generator.generate(100_000, rng)
    phsp = HELICITY_TRANSFORMERS[i](phsp_momenta)
    phsp = {k: v.real for k, v in phsp.items()}
    phsp = {k: v + ε * 1j if re.match(r"^m_\d\d$", k) else v for k, v in phsp.items()}
    PHSP.append(phsp)

Set parameters for toy model#

Hide code cell source
def fast_histogram(
    data: jnp.ndarray,
    weights: jnp.ndarray | None = None,
    bins: int = 100,
    density: bool | None = None,
    fill: bool = True,
    ax=plt,
    **plot_kwargs,
) -> None:
    bin_values, bin_edges = jnp.histogram(
        data,
        bins=bins,
        density=density,
        weights=weights,
    )
    if fill:
        bin_rights = bin_edges[1:]
        ax.fill_between(bin_rights, bin_values, step="pre", **plot_kwargs)
    else:
        bin_mids = (bin_edges[:-1] + bin_edges[1:]) / 2
        ax.step(bin_mids, bin_values, **plot_kwargs)
Hide code cell source
def indicate_masses(ax, intensity_func, set_labels: bool = True):
    mass_pars = {
        k: v for k, v in intensity_func.parameters.items() if k.startswith("m_{N")
    }
    for i, (k, v) in enumerate(mass_pars.items()):
        label = f"${k}$" if set_labels else None
        ax.axvline(v, c=f"C{i + n_channels}", label=label, ls="dashed")


def indicate_thresholds(ax, set_labels: bool = True) -> None:
    for i, decay in enumerate(DECAYS):
        m_thr = sum(p.mass for p in decay.children)
        label = None
        if set_labels:
            label = f"${'+'.join(f'm_{{{p.latex}}}' for p in decay.children)}$"
        ax.axvline(m_thr, c=f"C{i}", label=label, ls="dotted")
toy_parameters = {
    R"\beta_{N_1(3/2^-)}": 1 + 0j,
    R"m_{N_1(3/2^-)}": 1.71,
    R"g_{N_1(3/2^-),0}": 3.2,
    R"g_{N_1(3/2^-),1}": 1.5,
}
for func in INTENSITY_FUNCS_FVECTOR:
    func.update_parameters(toy_parameters)
Hide code cell source
fig, ax = plt.subplots(figsize=(9, 4))
ax.set_title("Model rendering from phase space")
ax.set_xlabel(R"$m_{p\eta/K\Sigma}$ [GeV]")
for i in range(n_channels):
    intensity = np.real(INTENSITY_FUNCS_FVECTOR[i](PHSP[i]))
    fast_histogram(
        np.real(PHSP[i]["m_01"]),
        weights=intensity,
        alpha=0.5,
        bins=200,
        density=True,
        label=f"${' '.join(p.latex for p in DECAYS[i].children)}$",
        ax=ax,
    )
indicate_thresholds(ax)
indicate_masses(ax, INTENSITY_FUNCS_FVECTOR[i])
ax.legend()
ax.set_ylim(0, None)

output_file = Path("032/weighted-phsp.svg")
output_file.parent.mkdir(exist_ok=True)
fig.savefig(output_file, bbox_inches="tight")
fig.show()

Toy data sample#

DATA = []
for i in range(n_channels):
    weighted_phsp_generator = TFWeightedPhaseSpaceGenerator(
        initial_state_mass=MODELS[i].reaction_info.initial_state[-1].mass,
        final_state_masses={
            i: p.mass for i, p in MODELS[i].reaction_info.final_state.items()
        },
    )
    data_generator = IntensityDistributionGenerator(
        domain_generator=weighted_phsp_generator,
        function=INTENSITY_FUNCS_FVECTOR[i],
        domain_transformer=HELICITY_TRANSFORMERS[i],
    )
    data_momenta = data_generator.generate(50_000, rng)
    data = HELICITY_TRANSFORMERS[i](data_momenta)
    DATA.append(data)
Hide code cell source
fig, ax = plt.subplots(figsize=(9, 4))
ax.set_title("Toy data sample")
ax.set_xlabel(R"$m_{p\eta/K\Sigma}$ [GeV]")
for i in range(n_channels):
    fast_histogram(
        np.real(DATA[i]["m_01"]),
        alpha=0.5,
        bins=200,
        density=True,
        label=f"${' '.join(p.latex for p in DECAYS[i].children)}$",
        ax=ax,
    )
indicate_thresholds(ax)
indicate_masses(ax, INTENSITY_FUNCS_FVECTOR[i])
ax.legend()
ax.set_ylim(0, None)

output_file = Path("032/toy-date.svg")
output_file.parent.mkdir(exist_ok=True)
fig.savefig(output_file, bbox_inches="tight")
fig.show()

Perform fit#

Estimator definition#

class EstimatorSum(Estimator):
    def __init__(self, estimators: Iterable[Estimator]) -> None:
        self.__estimators = tuple(estimators)

    def __call__(self, parameters: Mapping[str, ParameterValue]) -> float:
        return sum(estimator(parameters) for estimator in self.__estimators)

    def gradient(
        self, parameters: Mapping[str, ParameterValue]
    ) -> dict[str, ParameterValue]:
        raise NotImplementedError
combined_estimators = EstimatorSum(
    UnbinnedNLL(
        INTENSITY_FUNCS_FVECTOR[i],
        data=DATA[i],
        phsp=PHSP[i],
        backend="jax",
    )
    for i in range(n_channels)
)

Initial parameters#

Hide code cell source
def compare_models(functions: list[Function], title: str, bins: int = 100):
    fig, axes = plt.subplots(figsize=(8.5, 4.5), nrows=2, sharex=True)
    axes[0].set_title(title)
    for ax in axes:
        ax.set_yticks([])
    for i in range(n_channels):
        _plot_comparison(
            axes[i],
            decay_id=i,
            variable_name="m_01",
            function=functions[i],
            bins=bins,
            color=f"C{i}",
            legend=(i == 1),
        )
    fig.legend()
    fig.tight_layout()

    output_file = Path(f"032/{title.lower().replace(' ', '-')}.svg")
    output_file.parent.mkdir(exist_ok=True)
    fig.savefig(output_file, bbox_inches="tight")
    fig.show()


def _plot_comparison(
    ax,
    decay_id: int,
    variable_name: str,
    function: Function[DataSample, np.ndarray],
    bins: int,
    color: str,
    legend: bool,
):
    phsp = PHSP[decay_id]
    fast_histogram(
        DATA[decay_id][variable_name].real,
        alpha=0.5,
        bins=bins,
        color=color,
        density=True,
        label=f"Data ${' '.join(p.latex for p in DECAYS[decay_id].children)}$",
        ax=ax,
    )
    fast_histogram(
        phsp[variable_name].real,
        weights=function(phsp),
        bins=bins,
        color="red",
        density=True,
        fill=False,
        label="Fit model" if legend else None,
        ax=ax,
    )
    indicate_thresholds(ax, set_labels=legend)
    indicate_masses(ax, function, set_labels=legend)
    ax.set_ylim(0, None)
initial_parameters = {
    R"m_{N_1(3/2^-)}": 1.9,
    R"g_{N_1(3/2^-),0}": 2.8,
    R"g_{N_1(3/2^-),1}": 1.6,
}
Hide code cell source
ORIGINAL_PARAMETERS_F = []
for i in range(n_channels):
    ORIGINAL_PARAMETERS_F.append(dict(INTENSITY_FUNCS_FVECTOR[i].parameters))
    INTENSITY_FUNCS_FVECTOR[i].update_parameters(initial_parameters)
compare_models(INTENSITY_FUNCS_FVECTOR, title="Model with starting parameters")

Optimize parameters#

minuit2 = Minuit2()
fit_result = minuit2.optimize(combined_estimators, initial_parameters)
assert fit_result.minimum_valid
fit_result
FitResult(
 minimum_valid=True,
 execution_time=2.1707212924957275,
 function_calls=130,
 estimator_value=-31725.142543423186,
 parameter_values={
  'm_{N_1(3/2^-)}': 1.7281071116110833,
  'g_{N_1(3/2^-),0}': 3.3232991703411034,
  'g_{N_1(3/2^-),1}': 1.4418247651741936,
 },
 parameter_errors={
  'm_{N_1(3/2^-)}': 0.011169368106137329,
  'g_{N_1(3/2^-),0}': 0.06518758826745548,
  'g_{N_1(3/2^-),1}': 0.04383431405848117,
 },
)
Hide code cell source
for i in range(n_channels):
    INTENSITY_FUNCS_FVECTOR[i].update_parameters(fit_result.parameter_values)
compare_models(INTENSITY_FUNCS_FVECTOR, title="Model with optimized parameters")

Fit quality check#

Hide code cell source
def compute_aic_bic(fit_result: FitResult) -> tuple[float, float]:
    n_real_par = fit_result.count_number_of_parameters(complex_twice=True)
    n_events = len(next(iter(data.values())))
    log_likelihood = -fit_result.estimator_value
    aic = 2 * n_real_par - 2 * log_likelihood
    bic = n_real_par * np.log(n_events) - 2 * log_likelihood
    return aic, bic


def compare_parameters(initial: dict, optimized: dict, expected: dict) -> Markdown:
    parameters = sorted(set(initial) | set(optimized))
    src = dedent("""
    |   | initial | fit result | expected | deviation |
    |---|--------:|-----------:|---------:|----------:|
    """).strip()
    for p in parameters:
        columns = (
            f"${p}$",
            f"{initial[p]:.3g}",
            f"{optimized[p]:.3g}",
            f"{expected[p]:.3g}",
            f"{100 * abs((optimized[p] - expected[p]) / expected[p]):.1f}%",
        )
        src += "\n| " + " | ".join(columns) + " |"
    return Markdown(src)
compute_aic_bic(fit_result)
(-63444.28508684637, -63417.82575199314)
Hide code cell source
compare_parameters(
    initial=initial_parameters,
    optimized=fit_result.parameter_values,
    expected={
        **ORIGINAL_PARAMETERS_F[0],
        **ORIGINAL_PARAMETERS_F[1],
    },
)

initial

fit result

expected

deviation

\(g_{N_1(3/2^-),0}\)

2.8

3.32

3.2

3.9%

\(g_{N_1(3/2^-),1}\)

1.6

1.44

1.5

3.9%

\(m_{N_1(3/2^-)}\)

1.9

1.73

1.71

1.1%