# BoARIO : The Adaptative Regional Input Output model in python.
# Copyright (C) 2022  Samuel Juhel
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <>.

from __future__ import annotations

from typing import Dict, Optional

import numpy as np
import pandas as pd
from pymrio.core.mriosystem import IOSystem

from boario import logger

from boario.model_base import ARIOBaseModel, INV_THRESHOLD

__all__ = ["ARIOPsiModel"]

[docs] class ARIOPsiModel(ARIOBaseModel): def __init__( self, pym_mrio: IOSystem, *, order_type="alt", alpha_base=1.0, alpha_max=1.25, alpha_tau=365, rebuild_tau=60, main_inv_dur=90, monetary_factor=10**6, temporal_units_by_step: int = 1, iotable_year_to_temporal_unit_factor: int = 365, infinite_inventories_sect: Optional[list] = None, inventory_dict: Optional[dict] = None, productive_capital_vector: Optional[ pd.Series | np.ndarray | pd.DataFrame ] = None, productive_capital_to_VA_dict: Optional[dict] = None, psi_param=0.80, inventory_restoration_tau: int | Dict[str, int] = 60, ) -> None: """An ARIO model with some additional features. Added feature are parameter psi of production adjustment inventories constraint threshold, as well as a characteristic time of inventories resupplying. See :ref:`model_type`. """ super().__init__( pym_mrio, order_type=order_type, alpha_base=alpha_base, alpha_max=alpha_max, alpha_tau=alpha_tau, rebuild_tau=rebuild_tau, main_inv_dur=main_inv_dur, monetary_factor=monetary_factor, temporal_units_by_step=temporal_units_by_step, iotable_year_to_temporal_unit_factor=iotable_year_to_temporal_unit_factor, infinite_inventories_sect=infinite_inventories_sect, inventory_dict=inventory_dict, productive_capital_vector=productive_capital_vector, productive_capital_to_VA_dict=productive_capital_to_VA_dict, ) logger.debug("Model is an ARIOPsiModel") if isinstance(psi_param, str): self.psi = float(psi_param.replace("_", ".")) """float: Value of the psi parameter. (see :ref:`boario-math`).""" elif isinstance(psi_param, float): self.psi = psi_param elif isinstance(psi_param, int): self.psi = float(psi_param) else: raise ValueError( "'psi_param' parameter is neither a str rep of a float or a float" ) if self.psi > 1.0: raise ValueError("'psi_param' parameter must be less or equal than 1.") if isinstance(inventory_restoration_tau, int): restoration_tau = [ ( (self.n_temporal_units_by_step / inventory_restoration_tau) if v >= INV_THRESHOLD else v ) for v in self.inventories ] # for sector with no inventory TODO: reflect on that. elif isinstance(inventory_restoration_tau, dict): if not set(self.sectors).issubset(inventory_restoration_tau.keys()): sect_set = set(self.sectors) keys_sect = set(inventory_restoration_tau.keys()) raise NotImplementedError( f"""The given dict for inventory restoration tau does not contains all sectors as keys. Current implementation only allows dict with ALL sectors or just one integer value: Sectors not in keys: {sect_set - keys_sect} """ ) for _, value in inventory_restoration_tau.items(): if isinstance(value, float): if not value.is_integer(): raise ValueError( f"Invalid value: {value} ({type(value)}) in inventory_restoration_tau, values should be integer: {inventory_restoration_tau}" ) elif not isinstance(value, int): raise ValueError( f"Invalid value: {value} ({type(value)}) in inventory_restoration_tau, values should be integer: {inventory_restoration_tau}" ) inventory_restoration_tau = { key: int(val) for key, val in inventory_restoration_tau.items() } inventory_restoration_tau = dict(sorted(inventory_restoration_tau.items())) restoration_tau = [ (self.n_temporal_units_by_step / v) if v >= INV_THRESHOLD else v for _, v in inventory_restoration_tau.items() ] else: raise ValueError( f"Invalid inventory_restoration_tau: expected dict or int got {type(inventory_restoration_tau)}" ) self.restoration_tau = np.array(restoration_tau) r"""numpy.ndarray of int: Array of size :math:`n` setting for each inputs its characteristic restoration time :math:`\tau_{\textrm{INV}}` in ``n_temporal_units_by_step``. (see :ref:`boario-math`).""" ################################################################# @property def inventory_constraints_opt(self) -> np.ndarray: return self.calc_inventory_constraints(self.production_opt) @property def inventory_constraints_act(self) -> np.ndarray: return self.calc_inventory_constraints(self.production) # This is just so that theres a docstring for this one, it actually does the same as its parent method, # the difference is in the inventory constraints computation.
[docs] def calc_production(self, current_temporal_unit: int) -> np.ndarray: r"""Computes and updates actual production. The difference with :class:`ARIOBaseModel` is in the way inventory constraints are computed. See :ref:`boario-math-prod`. Parameters ---------- current_temporal_unit : int current step number """ return super().calc_production(current_temporal_unit)
[docs] def calc_inventory_constraints(self, production: np.ndarray) -> np.ndarray: r"""Compute inventory constraints (with psi parameter, for the non psi version, see :meth:`~boario.model_base.ARIOBaseModel.calc_inventory_constraints`) Parameters ---------- production : np.ndarray The production vector to consider. Returns ------- np.ndarray For each input, for each industry, the size of the inventory required to produce at `production` level for the duration goal (`inv_duration`) times the psi parameter. """ inventory_constraints = ( np.tile(production, (self.n_sectors, 1)) * self.tech_mat ) * self.psi np.multiply( inventory_constraints, np.tile( np.nan_to_num(self.inv_duration, posinf=0.0)[:, np.newaxis], (1, self.n_regions * self.n_sectors), ), out=inventory_constraints, ) return inventory_constraints
[docs] def calc_matrix_stock_gap(self, matrix_stock_goal) -> np.ndarray: matrix_stock_gap = super().calc_matrix_stock_gap(matrix_stock_goal) return np.expand_dims(self.restoration_tau, axis=1) * matrix_stock_gap