Source code for pymc.distributions.timeseries

#   Copyright 2024 - present The PyMC Developers
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
import abc
import warnings

from abc import ABCMeta
from collections.abc import Callable

import pytensor
import pytensor.tensor as pt

from pytensor.graph.basic import Apply
from pytensor.graph.replace import clone_replace
from pytensor.graph.traversal import ancestors
from pytensor.tensor import TensorVariable
from pytensor.tensor.random.op import RandomVariable
from pytensor.tensor.random.utils import normalize_size_param

from pymc.distributions.continuous import Normal, get_tau_sigma
from pymc.distributions.distribution import (
    Distribution,
    SymbolicRandomVariable,
    _support_point,
    support_point,
)
from pymc.distributions.multivariate import MvNormal, MvStudentT
from pymc.distributions.shape_utils import (
    _change_dist_size,
    change_dist_size,
    get_support_shape,
    get_support_shape_1d,
    rv_size_is_none,
)
from pymc.exceptions import NotConstantValueError
from pymc.logprob.abstract import _logprob
from pymc.logprob.basic import logp
from pymc.pytensorf import constant_fold
from pymc.util import check_dist_not_registered

__all__ = [
    "AR",
    "GARCH11",
    "EulerMaruyama",
    "GaussianRandomWalk",
    "MvGaussianRandomWalk",
    "MvStudentTRandomWalk",
    "RandomWalk",
]


class RandomWalkRV(SymbolicRandomVariable):
    """RandomWalk Variable."""

    _print_name = ("RandomWalk", "\\operatorname{RandomWalk}")

    @classmethod
    def rv_op(cls, init_dist, innovation_dist, steps, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        steps = pt.as_tensor(steps, dtype=int, ndim=0)

        dist_ndim_supp = init_dist.owner.op.ndim_supp
        init_dist_shape = tuple(init_dist.shape)
        init_dist_batch_shape = init_dist_shape[: len(init_dist_shape) - dist_ndim_supp]
        innovation_dist_shape = tuple(innovation_dist.shape)
        innovation_batch_shape = innovation_dist_shape[
            : len(innovation_dist_shape) - dist_ndim_supp
        ]
        ndim_supp = dist_ndim_supp + 1

        size = normalize_size_param(size)

        # If not explicit, size is determined by the shapes of the input distributions
        if rv_size_is_none(size):
            size = pt.broadcast_shape(
                init_dist_batch_shape, innovation_batch_shape, arrays_are_shapes=True
            )

        # Resize input distributions. We will size them to (T, B, S) in order
        # to safely take random draws. We later swap the steps dimension so
        # that the final distribution will follow (B, T, S)
        # init_dist must have shape (1, B, S)
        init_dist = change_dist_size(init_dist, (1, *size))
        # innovation_dist must have shape (T-1, B, S)
        innovation_dist = change_dist_size(innovation_dist, (steps, *size))

        # We can only infer the logp of a dimshuffled variable if the dimshuffle is
        # applied directly on top of a RandomVariable. Because of this we dimshuffle the
        # distributions first and only then concatenate them, instead of the other way around.
        # shape = (B, 1, S)
        init_dist_dimswapped = pt.moveaxis(init_dist, 0, -ndim_supp)
        # shape = (B, T-1, S)
        innovation_dist_dimswapped = pt.moveaxis(innovation_dist, 0, -ndim_supp)
        # shape = (B, T, S)
        grw = pt.concatenate([init_dist_dimswapped, innovation_dist_dimswapped], axis=-ndim_supp)
        grw = pt.cumsum(grw, axis=-ndim_supp)

        innov_supp_dims = [f"d{i}" for i in range(dist_ndim_supp)]
        innov_supp_str = ",".join(innov_supp_dims)
        out_supp_str = ",".join(["t", *innov_supp_dims])
        extended_signature = (
            f"({innov_supp_str}),({innov_supp_str}),(s),[rng]->({out_supp_str}),[rng]"
        )
        return RandomWalkRV(
            [init_dist, innovation_dist, steps],
            # We pass steps through just so we can keep a reference to it, even though
            # it's no longer needed at this point
            [grw],
            extended_signature=extended_signature,
        )(init_dist, innovation_dist, steps)


class RandomWalk(Distribution):
    r"""RandomWalk Distribution.

    TODO: Expand docstrings
    """

    rv_type = RandomWalkRV
    rv_op = RandomWalkRV.rv_op

    def __new__(cls, *args, innovation_dist, steps=None, **kwargs):
        steps = cls.get_steps(
            innovation_dist=innovation_dist,
            steps=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims"),
            observed=kwargs.get("observed"),
        )

        return super().__new__(cls, *args, innovation_dist=innovation_dist, steps=steps, **kwargs)

    @classmethod
    def dist(cls, init_dist, innovation_dist, steps=None, **kwargs) -> pt.TensorVariable:
        if not (
            isinstance(init_dist, pt.TensorVariable)
            and init_dist.owner is not None
            and isinstance(init_dist.owner.op, RandomVariable | SymbolicRandomVariable)
        ):
            raise TypeError("init_dist must be a distribution variable")
        check_dist_not_registered(init_dist)

        if not (
            isinstance(innovation_dist, pt.TensorVariable)
            and innovation_dist.owner is not None
            and isinstance(innovation_dist.owner.op, RandomVariable | SymbolicRandomVariable)
        ):
            raise TypeError("innovation_dist must be a distribution variable")
        check_dist_not_registered(innovation_dist)

        if init_dist.owner.op.ndim_supp != innovation_dist.owner.op.ndim_supp:
            raise TypeError(
                "init_dist and innovation_dist must have the same support dimensionality"
            )

        # We need to check this, because we clone the variables when we ignore their logprob next
        if init_dist in ancestors([innovation_dist]) or innovation_dist in ancestors([init_dist]):
            raise ValueError("init_dist and innovation_dist must be completely independent")

        steps = cls.get_steps(
            innovation_dist=innovation_dist,
            steps=steps,
            shape=kwargs.get("shape"),
            dims=None,
            observed=None,
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")
        steps = pt.as_tensor_variable(steps, dtype=int)

        return super().dist([init_dist, innovation_dist, steps], **kwargs)

    @classmethod
    def get_steps(cls, innovation_dist, steps, shape, dims, observed):
        # We need to know the ndim_supp of the innovation_dist
        if not (
            isinstance(innovation_dist, pt.TensorVariable)
            and innovation_dist.owner is not None
            and isinstance(innovation_dist.owner.op, RandomVariable | SymbolicRandomVariable)
        ):
            raise TypeError("innovation_dist must be a distribution variable")

        dist_ndim_supp = innovation_dist.owner.op.ndim_supp
        dist_shape = tuple(innovation_dist.shape)
        support_shape = None
        if steps is not None:
            support_shape = (steps,) + (dist_shape[len(dist_shape) - dist_ndim_supp :])
        support_shape = get_support_shape(
            support_shape=support_shape,
            shape=shape,
            dims=dims,
            observed=observed,
            support_shape_offset=1,
            ndim_supp=dist_ndim_supp + 1,
        )
        if support_shape is not None:
            steps = support_shape[-dist_ndim_supp - 1]
        return steps
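
# A minimal usage sketch of RandomWalk (assumes `import pymc as pm`; the
# variable names are illustrative):
#
#     with pm.Model():
#         init = pm.Normal.dist(0, 10)
#         innovation = pm.StudentT.dist(nu=4)
#         rw = pm.RandomWalk("rw", init_dist=init, innovation_dist=innovation, steps=100)
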


@_change_dist_size.register(RandomWalkRV)
def change_random_walk_size(op, dist, new_size, expand):
    init_dist, innovation_dist, steps = dist.owner.inputs
    if expand:
        old_shape = tuple(dist.shape)
        old_size = old_shape[: len(old_shape) - op.ndim_supp]
        new_size = tuple(new_size) + tuple(old_size)
    return RandomWalk.rv_op(init_dist, innovation_dist, steps, size=new_size)


@_support_point.register(RandomWalkRV)
def random_walk_support_point(op, rv, init_dist, innovation_dist, steps):
    # shape = (1, B, S)
    init_support_point = support_point(init_dist)
    # shape = (T-1, B, S)
    innovation_support_point = support_point(innovation_dist)
    # shape = (T, B, S)
    grw_support_point = pt.concatenate([init_support_point, innovation_support_point], axis=0)
    grw_support_point = pt.cumsum(grw_support_point, axis=0)
    # shape = (B, T, S)
    grw_support_point = pt.moveaxis(grw_support_point, 0, -op.ndim_supp)
    return grw_support_point


@_logprob.register(RandomWalkRV)
def random_walk_logp(op, values, *inputs, **kwargs):
    # Although we can derive the logprob of random walks automatically, the inferred
    # graph does not collapse what we consider the core dimension (steps), so we do
    # it manually here.
    (value,) = values
    # Recreate RV and obtain inner graph
    rv_node = op.make_node(*inputs)
    rv = clone_replace(op.inner_outputs, replace=dict(zip(op.inner_inputs, rv_node.inputs)))[
        op.default_output
    ]
    # Obtain logp of the inner graph and collapse steps dimension
    return logp(rv, value).sum(axis=-1)


class PredefinedRandomWalk(ABCMeta):
    """Base class for predefined RandomWalk distributions."""

    def __new__(cls, name, *args, **kwargs):
        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
        return RandomWalk(name, init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)

    @classmethod
    def dist(cls, *args, **kwargs) -> pt.TensorVariable:
        init_dist, innovation_dist, kwargs = cls.get_dists(*args, **kwargs)
        return RandomWalk.dist(init_dist=init_dist, innovation_dist=innovation_dist, **kwargs)

    @classmethod
    @abc.abstractmethod
    def get_dists(cls, *args, **kwargs):
        pass


class GaussianRandomWalk(PredefinedRandomWalk):
    r"""Random Walk with Normal innovations.

    Parameters
    ----------
    mu : tensor_like of float, default 0
        innovation drift
    sigma : tensor_like of float, default 1
        sigma > 0, innovation standard deviation.
    init_dist : unnamed_distribution
        Unnamed univariate distribution of the initial value. Unnamed refers to
        distributions created with the ``.dist()`` API.

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    steps : int, optional
        Number of steps in Gaussian Random Walk (steps > 0). Only needed if shape
        is not provided.
    """

    @classmethod
    def get_dists(cls, mu=0.0, sigma=1.0, *, init_dist=None, **kwargs):
        if init_dist is None:
            warnings.warn(
                "Initial distribution not specified, defaulting to `Normal.dist(0, 100)`. "
                "You can specify an init_dist manually to suppress this warning.",
                UserWarning,
            )
            init_dist = Normal.dist(0, 100)

        mu = pt.as_tensor_variable(mu)
        sigma = pt.as_tensor_variable(sigma)
        innovation_dist = Normal.dist(mu=mu, sigma=sigma)
        return init_dist, innovation_dist, kwargs
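
# A minimal usage sketch (assumes `import pymc as pm`; names are illustrative):
#
#     with pm.Model():
#         grw = pm.GaussianRandomWalk(
#             "grw", mu=0.1, sigma=0.5, init_dist=pm.Normal.dist(0, 1), steps=100
#         )
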
class MvGaussianRandomWalk(PredefinedRandomWalk):
    r"""Random Walk with Multivariate Normal innovations.

    Parameters
    ----------
    mu : tensor_like of float
        innovation drift
    cov : tensor_like of float
        pos def matrix, innovation covariance matrix
    tau : tensor_like of float
        pos def matrix, inverse covariance matrix
    chol : tensor_like of float
        Cholesky decomposition of covariance matrix
    lower : bool, default=True
        Whether the Cholesky factor is given as a lower triangular matrix.
    init_dist : unnamed_distribution
        Unnamed multivariate distribution of the initial value.

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    steps : int, optional
        Number of steps in Random Walk (steps > 0). Only needed if shape is not
        provided.

    Notes
    -----
    Only one of cov, tau or chol is required.
    """

    @classmethod
    def get_dists(cls, mu, *, cov=None, tau=None, chol=None, lower=True, init_dist=None, **kwargs):
        if init_dist is None:
            warnings.warn(
                "Initial distribution not specified, defaulting to `MvNormal.dist(0, I*100)`. "
                "You can specify an init_dist manually to suppress this warning.",
                UserWarning,
            )
            init_dist = MvNormal.dist(pt.zeros(mu.shape[-1]), pt.eye(mu.shape[-1]) * 100)

        innovation_dist = MvNormal.dist(mu=mu, cov=cov, tau=tau, chol=chol, lower=lower)
        return init_dist, innovation_dist, kwargs
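
# A sketch of a 3-dimensional walk (assumes `import numpy as np` and
# `import pymc as pm`; names are illustrative):
#
#     with pm.Model():
#         mu = np.zeros(3)
#         cov = np.eye(3)
#         mvrw = pm.MvGaussianRandomWalk(
#             "mvrw", mu=mu, cov=cov, init_dist=pm.MvNormal.dist(mu, cov), steps=50
#         )
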
class MvStudentTRandomWalk(PredefinedRandomWalk):
    r"""Multivariate Random Walk with StudentT innovations.

    Parameters
    ----------
    nu : int
        degrees of freedom
    mu : tensor_like of float
        innovation drift
    scale : tensor_like of float
        pos def matrix, innovation covariance matrix
    tau : tensor_like of float
        pos def matrix, inverse covariance matrix
    chol : tensor_like of float
        Cholesky decomposition of covariance matrix
    lower : bool, default=True
        Whether the Cholesky factor is given as a lower triangular matrix.
    init_dist : unnamed_distribution
        Unnamed multivariate distribution of the initial value.

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    steps : int, optional
        Number of steps in Random Walk (steps > 0). Only needed if shape is not
        provided.

    Notes
    -----
    Only one of scale, tau or chol is required.
    """

    @classmethod
    def get_dists(
        cls, *, nu, mu, scale=None, tau=None, chol=None, lower=True, init_dist=None, **kwargs
    ):
        if init_dist is None:
            warnings.warn(
                "Initial distribution not specified, defaulting to `MvNormal.dist(0, I*100)`. "
                "You can specify an init_dist manually to suppress this warning.",
                UserWarning,
            )
            init_dist = MvNormal.dist(pt.zeros(mu.shape[-1]), pt.eye(mu.shape[-1]) * 100)

        innovation_dist = MvStudentT.dist(
            nu=nu, mu=mu, scale=scale, tau=tau, chol=chol, lower=lower, cov=kwargs.pop("cov", None)
        )
        return init_dist, innovation_dist, kwargs
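
# The StudentT variant only adds the degrees-of-freedom parameter `nu` to the
# sketch above (names are illustrative):
#
#     mvt = pm.MvStudentTRandomWalk(
#         "mvt", nu=5, mu=mu, scale=cov, init_dist=pm.MvNormal.dist(mu, cov), steps=50
#     )
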
class AutoRegressiveRV(SymbolicRandomVariable):
    """A placeholder used to specify a distribution for an AR sub-graph."""

    extended_signature = "(o),(),(o),(s),[rng]->[rng],(t)"
    ar_order: int
    constant_term: bool
    _print_name = ("AR", "\\operatorname{AR}")

    def __init__(self, *args, ar_order, constant_term, **kwargs):
        self.ar_order = ar_order
        self.constant_term = constant_term
        super().__init__(*args, **kwargs)

    @classmethod
    def rv_op(cls, rhos, sigma, init_dist, steps, ar_order, constant_term, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        noise_rng = pt.random.shared_rng(seed=None)

        size = normalize_size_param(size)
        # Init dist should have shape (*size, ar_order)
        if rv_size_is_none(size):
            # In this case the size of the init_dist depends on the parameters shape
            # The last dimension of rho and init_dist does not matter
            batch_size = pt.broadcast_shape(
                tuple(sigma.shape),
                tuple(rhos.shape)[:-1],
                tuple(pt.atleast_1d(init_dist).shape)[:-1],
                arrays_are_shapes=True,
            )
        else:
            batch_size = size
        if init_dist.owner.op.ndim_supp == 0:
            init_dist_size = (*batch_size, ar_order)
        else:
            # In this case the support dimension must cover for ar_order
            init_dist_size = batch_size
        init_dist = change_dist_size(init_dist, init_dist_size)

        rhos_bcast_shape = init_dist.shape
        if constant_term:
            # In this case init shape is one unit smaller than rhos in the last dimension
            rhos_bcast_shape = (*rhos_bcast_shape[:-1], rhos_bcast_shape[-1] + 1)
        rhos_bcast = pt.broadcast_to(rhos, rhos_bcast_shape)

        def step(*args):
            *prev_xs, rng, reversed_rhos, sigma = args
            if constant_term:
                mu = reversed_rhos[-1] + pt.sum(prev_xs * reversed_rhos[:-1], axis=0)
            else:
                mu = pt.sum(prev_xs * reversed_rhos, axis=0)
            next_rng, new_x = Normal.dist(mu=mu, sigma=sigma, rng=rng, return_next_rng=True)
            return new_x, next_rng

        # We transpose inputs as scan iterates over first dimension
        innov, noise_next_rng = pytensor.scan(
            fn=step,
            outputs_info=[
                {"initial": init_dist.T, "taps": range(-ar_order, 0)},
                noise_rng,
            ],
            non_sequences=[rhos_bcast.T[::-1], sigma.T],
            n_steps=steps,
            strict=True,
            return_updates=False,
        )
        ar = pt.concatenate([init_dist, innov.T], axis=-1)

        return AutoRegressiveRV(
            inputs=[rhos, sigma, init_dist, steps, noise_rng],
            outputs=[noise_next_rng, ar],
            ar_order=ar_order,
            constant_term=constant_term,
        )(rhos, sigma, init_dist, steps, noise_rng)

    def update(self, node: Apply):
        """Return the update mapping for the noise RV."""
        return {node.inputs[-1]: node.outputs[0]}


class AR(Distribution):
    r"""Autoregressive process with p lags.

    .. math::

       x_t = \rho_0 + \rho_1 x_{t-1} + \ldots + \rho_p x_{t-p} + \epsilon_t,
       \quad \epsilon_t \sim N(0, \sigma^2)

    The innovation can be parameterized either in terms of precision or standard
    deviation. The link between the two parametrizations is given by

    .. math::

       \tau = \dfrac{1}{\sigma^2}

    Parameters
    ----------
    rho : tensor_like of float
        Tensor of autoregressive coefficients. The n-th entry in the last dimension is
        the coefficient for the n-th lag.
    sigma : tensor_like of float, default 1
        Standard deviation of innovation (sigma > 0). Only required if
        tau is not specified.
    tau : tensor_like of float, optional
        Precision of innovation (tau > 0).
    constant : bool, default False
        Whether the first element of rho should be used as a constant term in the AR
        process.
    init_dist : unnamed_distribution, optional
        Scalar or vector distribution for initial values. Distributions should have
        shape (*shape[:-1], ar_order). If not, it will be automatically resized.
        Defaults to pm.Normal.dist(0, 100, shape=...).

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    ar_order : int, optional
        Order of the AR process. Inferred from length of the last dimension of rho, if
        possible. ar_order = rho.shape[-1] if constant else rho.shape[-1] - 1
    steps : int, optional
        Number of steps in AR process (steps > 0). Only needed if shape is not provided.

    Notes
    -----
    The init distribution will be cloned, rendering it distinct from the one passed as
    input.

    Examples
    --------
    .. code-block:: python

        # Create an AR of order 3, with a constant term
        with pm.Model() as AR3:
            # The first coefficient will be the constant term
            coefs = pm.Normal("coefs", 0, size=4)
            # We need one init variable for each lag, hence size=3
            init = pm.Normal.dist(5, size=3)
            ar3 = pm.AR("ar3", coefs, sigma=1.0, init_dist=init, constant=True, steps=500)

    """

    rv_type = AutoRegressiveRV
    rv_op = AutoRegressiveRV.rv_op

    def __new__(cls, name, rho, *args, steps=None, constant=False, ar_order=None, **kwargs):
        rhos = pt.atleast_1d(pt.as_tensor_variable(rho))
        ar_order = cls._get_ar_order(rhos=rhos, constant=constant, ar_order=ar_order)
        steps = get_support_shape_1d(
            support_shape=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims", None),
            observed=kwargs.get("observed", None),
            support_shape_offset=ar_order,
        )
        return super().__new__(
            cls, name, rhos, *args, steps=steps, constant=constant, ar_order=ar_order, **kwargs
        )

    @classmethod
    def dist(
        cls,
        rho,
        sigma=None,
        tau=None,
        *,
        init_dist=None,
        steps=None,
        constant=False,
        ar_order=None,
        **kwargs,
    ):
        _, sigma = get_tau_sigma(tau=tau, sigma=sigma)
        sigma = pt.as_tensor_variable(sigma)
        rhos = pt.atleast_1d(pt.as_tensor_variable(rho))
        ar_order = cls._get_ar_order(rhos=rhos, constant=constant, ar_order=ar_order)
        steps = get_support_shape_1d(
            support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=ar_order
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")
        steps = pt.as_tensor_variable(steps, dtype=int, ndim=0)

        if init_dist is not None:
            if not isinstance(init_dist, TensorVariable) or not isinstance(
                init_dist.owner.op, RandomVariable | SymbolicRandomVariable
            ):
                raise ValueError(
                    f"Init dist must be a distribution created via the `.dist()` API, "
                    f"got {type(init_dist)}"
                )
            check_dist_not_registered(init_dist)
            if init_dist.owner.op.ndim_supp > 1:
                raise ValueError(
                    "Init distribution must have a scalar or vector support dimension, "
                    f"got ndim_supp={init_dist.owner.op.ndim_supp}."
                )
        else:
            warnings.warn(
                "Initial distribution not specified, defaulting to "
                "`Normal.dist(0, 100, shape=...)`. You can specify an init_dist "
                "manually to suppress this warning.",
                UserWarning,
            )
            init_dist = Normal.dist(0, 100, shape=(*sigma.shape, ar_order))

        return super().dist([rhos, sigma, init_dist, steps, ar_order, constant], **kwargs)

    @classmethod
    def _get_ar_order(cls, rhos: TensorVariable, ar_order: int | None, constant: bool) -> int:
        """Compute ar_order given inputs.

        If ar_order is not specified we do constant folding on the shape of rhos
        to retrieve it. For example, this will detect that
        Normal(size=(5, 3)).shape[-1] == 3, which is not known by PyTensor before.

        Raises
        ------
        ValueError
            If ar_order cannot be inferred from rhos or if it is less than 1
        """
        if ar_order is None:
            try:
                (folded_shape,) = constant_fold((rhos.shape[-1],))
            except NotConstantValueError:
                raise ValueError(
                    "Could not infer ar_order from last dimension of rho. Pass it "
                    "explicitly or make sure rho has a static shape"
                )
            ar_order = int(folded_shape) - int(constant)
            if ar_order < 1:
                raise ValueError(
                    "Inferred ar_order is smaller than 1. Increase the last dimension "
                    "of rho or remove constant_term"
                )
        return ar_order


@_change_dist_size.register(AutoRegressiveRV)
def change_ar_size(op, dist, new_size, expand=False):
    if expand:
        old_size = dist.shape[:-1]
        new_size = tuple(new_size) + tuple(old_size)

    return AR.rv_op(
        *dist.owner.inputs[:-1],
        ar_order=op.ar_order,
        constant_term=op.constant_term,
        size=new_size,
    )


@_logprob.register(AutoRegressiveRV)
def ar_logp(op, values, rhos, sigma, init_dist, steps, noise_rng, **kwargs):
    (value,) = values

    ar_order = op.ar_order
    constant_term = op.constant_term

    # Convolve rhos with values
    if constant_term:
        expectation = pt.add(
            rhos[..., 0, None],
            *(
                rhos[..., i + 1, None] * value[..., ar_order - (i + 1) : -(i + 1)]
                for i in range(ar_order)
            ),
        )
    else:
        expectation = pt.add(
            *(
                rhos[..., i, None] * value[..., ar_order - (i + 1) : -(i + 1)]
                for i in range(ar_order)
            )
        )

    # Compute and collapse logp across time dimension
    innov_logp = pt.sum(
        logp(Normal.dist(0, sigma[..., None]), value[..., ar_order:] - expectation), axis=-1
    )
    init_logp = logp(init_dist, value[..., :ar_order])
    if init_dist.owner.op.ndim_supp == 0:
        init_logp = pt.sum(init_logp, axis=-1)
    return init_logp + innov_logp


@_support_point.register(AutoRegressiveRV)
def ar_support_point(op, rv, rhos, sigma, init_dist, steps, noise_rng):
    # Use last entry of init_dist support_point as the moment for the whole AR
    return pt.full_like(rv, support_point(init_dist)[..., -1, None])


class GARCH11RV(SymbolicRandomVariable):
    """A placeholder used to specify a GARCH11 graph."""

    extended_signature = "(),(),(),(),(),(s),[rng]->[rng],(t)"
    _print_name = ("GARCH11", "\\operatorname{GARCH11}")

    @classmethod
    def rv_op(cls, omega, alpha_1, beta_1, initial_vol, init_dist, steps, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        steps = pt.as_tensor(steps, ndim=0)
        omega = pt.as_tensor(omega)
        alpha_1 = pt.as_tensor(alpha_1)
        beta_1 = pt.as_tensor(beta_1)
        initial_vol = pt.as_tensor(initial_vol)
        noise_rng = pt.random.shared_rng(seed=None)

        size = normalize_size_param(size)
        if rv_size_is_none(size):
            # In this case the size of the init_dist depends on the parameters shape
            batch_size = pt.broadcast_shape(omega, alpha_1, beta_1, initial_vol)
        else:
            batch_size = size
        init_dist = change_dist_size(init_dist, batch_size)

        # Create OpFromGraph representing random draws from GARCH11 process
        def step(prev_y, prev_sigma, rng, omega, alpha_1, beta_1):
            new_sigma = pt.sqrt(
                omega + alpha_1 * pt.square(prev_y) + beta_1 * pt.square(prev_sigma)
            )
            next_rng, new_y = Normal.dist(mu=0, sigma=new_sigma, rng=rng, return_next_rng=True)
            return new_y, new_sigma, next_rng

        y_t, _, noise_next_rng = pytensor.scan(
            fn=step,
            outputs_info=[
                init_dist,
                pt.broadcast_to(initial_vol.astype("floatX"), init_dist.shape),
                noise_rng,
            ],
            non_sequences=[omega, alpha_1, beta_1],
            n_steps=steps,
            strict=True,
            return_updates=False,
        )

        garch11 = pt.concatenate([init_dist[None, ...], y_t], axis=0).dimshuffle(
            (*range(1, y_t.ndim), 0)
        )

        return GARCH11RV(
            inputs=[omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng],
            outputs=[noise_next_rng, garch11],
        )(omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng)

    def update(self, node: Apply):
        """Return the update mapping for the noise RV."""
        return {node.inputs[-1]: node.outputs[0]}


class GARCH11(Distribution):
    r"""
    GARCH(1,1) with Normal innovations. The model is specified by

    .. math::
        y_t \sim N(0, \sigma_t^2)

    .. math::
        \sigma_t^2 = \omega + \alpha_1 y_{t-1}^2 + \beta_1 \sigma_{t-1}^2

    where :math:`\sigma_t^2` (the error variance) follows an ARMA(1, 1) model.

    Parameters
    ----------
    omega : tensor_like of float
        omega > 0, mean variance
    alpha_1 : tensor_like of float
        alpha_1 >= 0, autoregressive term coefficient
    beta_1 : tensor_like of float
        beta_1 >= 0, alpha_1 + beta_1 < 1, moving average term coefficient
    initial_vol : tensor_like of float
        initial_vol >= 0, initial volatility, sigma_0
    """

    rv_type = GARCH11RV
    rv_op = GARCH11RV.rv_op

    def __new__(cls, *args, steps=None, **kwargs):
        steps = get_support_shape_1d(
            support_shape=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims", None),
            observed=kwargs.get("observed", None),
            support_shape_offset=1,
        )
        return super().__new__(cls, *args, steps=steps, **kwargs)

    @classmethod
    def dist(cls, omega, alpha_1, beta_1, initial_vol, *, steps=None, **kwargs):
        steps = get_support_shape_1d(
            support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=1
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")

        init_dist = Normal.dist(0, initial_vol)
        return super().dist([omega, alpha_1, beta_1, initial_vol, init_dist, steps], **kwargs)
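
# A minimal usage sketch (assumes `import pymc as pm`; the parameter values are
# illustrative and satisfy the stationarity constraint alpha_1 + beta_1 < 1):
#
#     with pm.Model():
#         vol = pm.GARCH11(
#             "vol", omega=0.1, alpha_1=0.2, beta_1=0.7, initial_vol=1.0, steps=200
#         )
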
@_change_dist_size.register(GARCH11RV)
def change_garch11_size(op, dist, new_size, expand=False):
    if expand:
        old_size = dist.shape[:-1]
        new_size = tuple(new_size) + tuple(old_size)

    return GARCH11.rv_op(
        *dist.owner.inputs[:-1],
        size=new_size,
    )


@_logprob.register(GARCH11RV)
def garch11_logp(
    op, values, omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng, **kwargs
):
    (value,) = values
    # Move the time axis to the first dimension
    value_dimswapped = value.dimshuffle((value.ndim - 1, *range(0, value.ndim - 1)))
    initial_vol = initial_vol * pt.ones_like(value_dimswapped[0])

    def volatility_update(x, vol, w, a, b):
        return pt.sqrt(w + a * pt.square(x) + b * pt.square(vol))

    vol = pytensor.scan(
        fn=volatility_update,
        sequences=[value_dimswapped[:-1]],
        outputs_info=[initial_vol],
        non_sequences=[omega, alpha_1, beta_1],
        strict=True,
        return_updates=False,
    )
    sigma_t = pt.concatenate([[initial_vol], vol])
    # Compute and collapse logp across time dimension
    innov_logp = pt.sum(logp(Normal.dist(0, sigma_t), value_dimswapped), axis=0)
    return innov_logp


@_support_point.register(GARCH11RV)
def garch11_support_point(op, rv, omega, alpha_1, beta_1, initial_vol, init_dist, steps, noise_rng):
    # GARCH(1,1) mean is zero
    return pt.zeros_like(rv)


class EulerMaruyamaRV(SymbolicRandomVariable):
    """A placeholder used to specify a distribution for an EulerMaruyama sub-graph."""

    dt: float
    sde_fn: Callable
    _print_name = ("EulerMaruyama", "\\operatorname{EulerMaruyama}")

    def __init__(self, *args, dt: float, sde_fn: Callable, **kwargs):
        self.dt = dt
        self.sde_fn = sde_fn
        super().__init__(*args, **kwargs)

    @classmethod
    def rv_op(cls, init_dist, steps, sde_pars, dt, sde_fn, size=None):
        # We don't allow passing `rng` because we don't fully control the rng of the components!
        noise_rng = pt.random.shared_rng(seed=None)

        # Init dist should have shape (*size,)
        if size is not None:
            batch_size = size
        else:
            batch_size = pt.broadcast_shape(*sde_pars, init_dist)
        init_dist = change_dist_size(init_dist, batch_size)

        # Create OpFromGraph representing random draws from SDE process
        def step(*prev_args):
            prev_y, rng, *prev_sde_pars = prev_args
            f, g = sde_fn(prev_y, *prev_sde_pars)
            mu = prev_y + dt * f
            sigma = pt.sqrt(dt) * g
            next_rng, next_y = Normal.dist(mu=mu, sigma=sigma, rng=rng, return_next_rng=True)
            return next_y, next_rng

        y_t, noise_next_rng = pytensor.scan(
            fn=step,
            outputs_info=[init_dist, noise_rng],
            non_sequences=[*sde_pars],
            n_steps=steps,
            strict=True,
            return_updates=False,
        )

        sde_out = pt.concatenate([init_dist[None, ...], y_t], axis=0).dimshuffle(
            (*range(1, y_t.ndim), 0)
        )

        return EulerMaruyamaRV(
            inputs=[init_dist, steps, *sde_pars, noise_rng],
            outputs=[noise_next_rng, sde_out],
            dt=dt,
            sde_fn=sde_fn,
            extended_signature=f"(),(s),{','.join('()' for _ in sde_pars)},[rng]->[rng],(t)",
        )(init_dist, steps, *sde_pars, noise_rng)

    def update(self, node: Apply):
        """Return the update mapping for the noise RV."""
        return {node.inputs[-1]: node.outputs[0]}


class EulerMaruyama(Distribution):
    r"""
    Stochastic differential equation discretized with the Euler-Maruyama method.

    Parameters
    ----------
    dt : float
        time step of discretization
    sde_fn : callable
        function returning the drift and diffusion coefficients of SDE
    sde_pars : tuple
        parameters of the SDE, passed as ``*args`` to ``sde_fn``
    init_dist : unnamed_distribution, optional
        Scalar distribution for initial values. Distributions should have shape
        (*shape[:-1]). If not, it will be automatically resized.
        Defaults to pm.Normal.dist(0, 100, shape=...).

        .. warning:: init_dist will be cloned, rendering it independent of the one passed as input.

    """

    rv_type = EulerMaruyamaRV
    rv_op = EulerMaruyamaRV.rv_op

    def __new__(cls, name, dt, sde_fn, *args, steps=None, **kwargs):
        dt = pt.as_tensor_variable(dt)
        steps = get_support_shape_1d(
            support_shape=steps,
            shape=None,  # Shape will be checked in `cls.dist`
            dims=kwargs.get("dims", None),
            observed=kwargs.get("observed", None),
            support_shape_offset=1,
        )
        return super().__new__(cls, name, dt, sde_fn, *args, steps=steps, **kwargs)

    @classmethod
    def dist(cls, dt, sde_fn, sde_pars, *, init_dist=None, steps=None, **kwargs):
        steps = get_support_shape_1d(
            support_shape=steps, shape=kwargs.get("shape", None), support_shape_offset=1
        )
        if steps is None:
            raise ValueError("Must specify steps or shape parameter")
        steps = pt.as_tensor_variable(steps, dtype=int, ndim=0)

        dt = pt.as_tensor_variable(dt)
        sde_pars = [pt.as_tensor_variable(x) for x in sde_pars]

        if init_dist is not None:
            if not isinstance(init_dist, TensorVariable) or not isinstance(
                init_dist.owner.op, RandomVariable | SymbolicRandomVariable
            ):
                raise ValueError(
                    f"Init dist must be a distribution created via the `.dist()` API, "
                    f"got {type(init_dist)}"
                )
            check_dist_not_registered(init_dist)
            if init_dist.owner.op.ndim_supp > 0:
                raise ValueError(
                    "Init distribution must have a scalar support dimension, "
                    f"got ndim_supp={init_dist.owner.op.ndim_supp}."
                )
        else:
            warnings.warn(
                "Initial distribution not specified, defaulting to "
                "`Normal.dist(0, 100, shape=...)`. You can specify an init_dist "
                "manually to suppress this warning.",
                UserWarning,
            )
            init_dist = Normal.dist(0, 100, shape=sde_pars[0].shape)

        return super().dist([init_dist, steps, sde_pars, dt, sde_fn], **kwargs)
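
# A sketch of an `sde_fn` for an Ornstein-Uhlenbeck process,
# dX = theta * (mu - X) * dt + sigma * dW (assumes `import pymc as pm`;
# names and parameter values are illustrative):
#
#     def ou_sde(x, theta, mu, sigma):
#         # Return the drift and diffusion coefficients evaluated at state x
#         return theta * (mu - x), sigma
#
#     with pm.Model():
#         ou = pm.EulerMaruyama(
#             "ou", dt=0.1, sde_fn=ou_sde, sde_pars=(1.0, 0.0, 0.2), steps=100
#         )
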
@_change_dist_size.register(EulerMaruyamaRV)
def change_eulermaruyama_size(op, dist, new_size, expand=False):
    if expand:
        old_size = dist.shape[:-1]
        new_size = tuple(new_size) + tuple(old_size)

    init_dist, steps, *sde_pars, _ = dist.owner.inputs
    return EulerMaruyama.rv_op(
        init_dist,
        steps,
        sde_pars,
        dt=op.dt,
        sde_fn=op.sde_fn,
        size=new_size,
    )


@_logprob.register(EulerMaruyamaRV)
def eulermaruyama_logp(op, values, init_dist, steps, *sde_pars_noise_arg, **kwargs):
    (x,) = values
    # noise arg is unused, but is needed to make the logp signature match the rv_op signature
    *sde_pars, _ = sde_pars_noise_arg
    # sde_fn is user provided and likely not broadcastable to an additional time dimension;
    # since the input x is now [..., t], we broadcast each input to [..., None] below as a
    # best-effort attempt to make it work
    sde_pars_broadcast = [x[..., None] for x in sde_pars]
    xtm1 = x[..., :-1]
    xt = x[..., 1:]
    f, g = op.sde_fn(xtm1, *sde_pars_broadcast)
    mu = xtm1 + op.dt * f
    sigma = pt.sqrt(op.dt) * g
    # Compute and collapse logp across time dimension
    sde_logp = pt.sum(logp(Normal.dist(mu, sigma), xt), axis=-1)
    init_logp = logp(init_dist, x[..., 0])
    return init_logp + sde_logp