Source code for boario.model_base

# BoARIO : The Adaptative Regional Input Output model in python.
# Copyright (C) 2022  Samuel Juhel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""This module defines the core mechanisms of the model."""

from __future__ import annotations

import json
import pathlib
import typing
from typing import Optional
import warnings

import numpy as np
import numpy.typing as npt
import pandas as pd
import pymrio
from pymrio.core.mriosystem import IOSystem

from boario import DEBUG_TRACE, logger
from boario.utils.misc import lexico_reindex, _fast_sum, _divide_arrays_ignore

__all__ = [
    "ARIOBaseModel",
    "INV_THRESHOLD",
    "VALUE_ADDED_NAMES",
    "VA_idx",
    "lexico_reindex",
]

INV_THRESHOLD = 0  # 20 #temporal_units

TECHNOLOGY_THRESHOLD = (
    0.00001  # Do not care about input if producing requires less than this value
)

VALUE_ADDED_NAMES = [
    "VA",
    "Value Added",
    "value added",
    "factor inputs",
    "factor_inputs",
    "Factors Inputs",
    "Satellite Accounts",
    "satellite accounts",
    "satellite_accounts",
    "satellite",
]

VA_idx = np.array(
    [
        "Taxes less subsidies on products purchased: Total",
        "Other net taxes on production",
        "Compensation of employees; wages, salaries, & employers' social contributions: Low-skilled",
        "Compensation of employees; wages, salaries, & employers' social contributions: Medium-skilled",
        "Compensation of employees; wages, salaries, & employers' social contributions: High-skilled",
        "Operating surplus: Consumption of fixed capital",
        "Operating surplus: Rents on land",
        "Operating surplus: Royalties on resources",
        "Operating surplus: Remaining net operating surplus",
    ],
    dtype=object,
)


[docs] class ARIOBaseModel: r"""The core of an ARIO model. Handles the different arrays containing the mrio tables. `ARIOBaseModel` wrap all the data and functions used in the core of the most basic version of the ARIO model (based on :cite:`2013:hallegatte` and :cite:`2020:guan`). An ARIOBaseModel instance based is build on the given IOSystem. Most default parameters are the same as in :cite:`2013:hallegatte` and default order mechanisms comes from :cite:`2020:guan`. By default each industry capital stock is 4 times its value added (:cite:`2008:hallegatte`). Parameters ---------- pym_mrio : IOSystem The IOSystem to base the model on. order_type : str, default "alt" The type of orders mechanism to use. Currently, can be "alt" or "noalt". See :ref:`boario-math` alpha_base : float, default 1.0 Base value of overproduction factor :math:`\alpha^{b}` (Default to 1.0). alpha_max : float, default 1.25 Maximum factor of overproduction :math:`\alpha^{\textrm{max}}` (default should be 1.25). alpha_tau : int, default 365 Characteristic time of overproduction :math:`\tau_{\alpha}` in ``n_temporal_units_by_step`` (default should be 365 days). rebuild_tau : int, default 60 Rebuilding characteristic time :math:`\tau_{\textrm{REBUILD}}` (see :ref:`boario-math`). Overwritten by per event value if it exists. main_inv_dur : int, default 90 The default numbers of days for inputs inventory to use if it is not defined for an input. monetary_factor : int, default 10**6 Monetary unit factor (i.e. if the tables unit is 10^6 € instead of 1 €, it should be set to 10^6). temporal_units_by_step: int, default 1 The number of temporal_units between each step. (Current version of the model showed inconsistencies with values other than `1`). iotable_year_to_temporal_unit_factor : int, default 365 The (inverse of the) factor by which MRIO data should be multiplied in order to get "per temporal unit value", assuming IO data is yearly. infinite_inventories_sect: list, optional List of inputs (sector) considered never constraining for production. inventory_dict: dict, optional Dictionary in the form input:initial_inventory_size, (where input is a sector, and initial_inventory_size is in "temporal_unit" (defaults to a day)) productive_capital_vector: ArrayLike, optional Array of capital per industry if you need to give it exogenously. productive_capital_to_VA_dict: dict, optional Dictionary in the form sector:ratio. Where ratio is used to estimate capital stock based on the value added of the sector. Notes ----- It is recommended to use ``productive_capital_to_VA_dict`` if you have a more precise estimation of the ratio of (Capital Stock / Value Added) per sectors than the default 4/1 ratio. You may also feed in directly a ``productive_capital_vector`` if you did your estimation before-hand. (This is especially useful if you have events based of an exposure layer for instance) Regarding inventories, they default to 90 days for all inputs (ie sectors). You can set some inputs to be never constraining for production by listing them in ``infinite_inventories_sect`` or directly feed in a dictionary of the inventory duration for each input. """ def __init__( self, pym_mriot: IOSystem, *, order_type: str = "alt", alpha_base: float = 1.0, alpha_max: float = 1.25, alpha_tau: int = 365, rebuild_tau: int = 60, main_inv_dur: int = 90, monetary_factor: int = 10**6, temporal_units_by_step: int = 1, iotable_year_to_temporal_unit_factor: int = 365, infinite_inventories_sect: Optional[list] = None, inventory_dict: Optional[dict[str, int]] = None, productive_capital_vector: Optional[ pd.Series | npt.NDArray | pd.DataFrame ] = None, productive_capital_to_VA_dict: Optional[dict] = None, ) -> None: logger.debug("Initiating new ARIOBaseModel instance") # ############### Parameters variables ####################### try: logger.info( "IO system metadata :\n\t{}\n\t{}\n\t{}\n\t{}".format( str(pym_mriot.meta.description), str(pym_mriot.meta.name), str(pym_mriot.meta.system), str(pym_mriot.meta.version), ) ) except AttributeError: warnings.warn( "It seems the MRIOT you loaded doesn't have metadata to print." ) source_mriot = lexico_reindex(pym_mriot) self.mriot = source_mriot self._check_mriot() self._set_indexes(source_mriot) if hasattr(source_mriot, "monetary_factor"): warnings.warn( f"Custom monetary factor found in the IOSystem, continuing with this one ({getattr(source_mriot,'monetary_factor')})" ) self.monetary_factor = getattr(source_mriot, "monetary_factor") else: self.monetary_factor = monetary_factor r"""int, default 10^6: Monetary unit factor (i.e. if the tables unit is 10^6 € instead of 1 €, it should be set to 10^6).""" self.n_temporal_units_by_step = temporal_units_by_step r"""int, default 1: The number of temporal_units between each step. (Current version of the model was not tested with values other than `1`).""" self.iotable_year_to_temporal_unit_factor = iotable_year_to_temporal_unit_factor r"""int, default 365: The (inverse of the) factor by which MRIO data should be multiplied in order to get "per temporal unit value", assuming IO data is yearly.""" if self.iotable_year_to_temporal_unit_factor != 365: warnings.warn( "iotable_to_daily_step_factor is not set to 365 (days). This should probably not be the case if the IO tables you use are on a yearly basis." ) self.steply_factor = ( self.n_temporal_units_by_step / self.iotable_year_to_temporal_unit_factor ) self.rebuild_tau = rebuild_tau r"""int: Rebuilding characteristic time :math:`\tau_{\textrm{REBUILD}}` (see :ref:`boario-math`).""" self.overprod_max = alpha_max r"""float: Maximum factor of overproduction :math:`\alpha^{\textrm{max}}` (default should be 1.25).""" self.overprod_tau = self.n_temporal_units_by_step / alpha_tau r"""float: Characteristic time of overproduction :math:`\tau_{\alpha}` in ``n_temporal_units_by_step`` (default should be 365 days).""" self.overprod_base = alpha_base r"""float: Base value of overproduction factor :math:`\alpha^{b}` (Default to 1.0).""" self._init_input_stocks(main_inv_dur, inventory_dict, infinite_inventories_sect) ################################################################# # ####### INITIAL MRIO STATE (in step temporality) ############### self._matrix_id = np.eye(self.n_sectors) self._matrix_I_sum = np.tile(self._matrix_id, self.n_regions) self.Z_0 = source_mriot.Z.to_numpy() # type: ignore r"""numpy.ndarray of float: 2-dim square matrix array :math:`\ioz` of size :math:`(n \times m, n \times m)` representing the daily intermediate (transaction) matrix (see :ref:`boario-math-init`).""" self.Z_C = self._matrix_I_sum @ self.Z_0 r"""numpy.ndarray of float: 2-dim matrix array :math:`\ioz^{\sectorsset}` of size :math:`(n, n \times m)` representing the intermediate (transaction) matrix aggregated by inputs (see :ref:`here <boario-math-z-agg>`).""" self.Z_distrib = _divide_arrays_ignore( self.Z_0, (np.tile(self.Z_C, (self.n_regions, 1))) ) r"""numpy.ndarray of float: math:`\ioz` normalised by :math:`\ioz^{\sectorsset}`, i.e. representing for each input the share of the total ordered transiting from an industry to another (see :ref:`here <boario-math-z-agg>`).""" self.Z_0 = source_mriot.Z.to_numpy() * self.steply_factor self.Y_0 = source_mriot.Y.to_numpy() self.Y_C = self._matrix_I_sum @ self.Y_0 r"""numpy.ndarray of float: 2-dim matrix array :math:`\ioy^{\sectorsset}` of size :math:`(n, m \times \text{number of final demand categories})` representing the final demand matrix aggregated by inputs (see :ref:`here <boario-math-z-agg>`).""" self.Y_distrib = _divide_arrays_ignore( self.Y_0, (np.tile(self.Y_C, (self.n_regions, 1))) ) r"""numpy.ndarray of float: math:`\ioy` normalised by :math:`\ioy^{\sectorsset}`, i.e. representing for each input the share of the total ordered transiting from an industry to final demand (see :ref:`here <boario-math-z-agg>`).""" self.Y_0 = source_mriot.Y.to_numpy() * self.steply_factor r"""numpy.ndarray of float: 2-dim array :math:`\ioy` of size :math:`(n \times m, m \times \text{number of final demand categories})` representing the daily final demand matrix.""" self.X_0 = np.array(source_mriot.x.T).copy().flatten() * self.steply_factor r"""numpy.ndarray of float: Array :math:`\iox(0)` of size :math:`n \times m` representing the initial daily gross production.""" value_added: pd.DataFrame = source_mriot.x.T - source_mriot.Z.sum(axis=0) # type: ignore # value_added = value_added.reindex(sorted(value_added.index), axis=0) # value_added = value_added.reindex(sorted(value_added.columns), axis=1) if (value_added < 0).any().any(): # type: ignore tmp = (value_added[value_added < 0].dropna(axis=1)).columns # type: ignore warnings.warn( f"""Found negative values in the value added, will set to 0. Note that industries with null value added will have a null productive capital if it is defined from value added. industries with negative VA: {tmp} """ ) value_added[value_added < 0] = 0.0 self.gva_df = value_added.T.groupby("region").sum().T r"""pandas.DataFrame: Dataframe of the total GDP of each region of the model""" self.VA_0 = value_added.to_numpy().flatten() r"""numpy.ndarray of float: Array :math:`\iov` of size :math:`n \times m` representing the total value added for each sectors.""" self.tech_mat = (self._matrix_I_sum @ source_mriot.A).to_numpy() # type: ignore #to_numpy is not superfluous ! r"""numpy.ndarray of float: 2-dim array :math:`\ioa` of size :math:`(n \times m, n \times m)` representing the technical coefficients matrix.""" if productive_capital_vector is not None: self.productive_capital = productive_capital_vector """numpy.ndarray of float: Array of size :math:`n \times m` representing the estimated stock of capital of each industry.""" if isinstance(self.productive_capital, pd.DataFrame): self.productive_capital = ( self.productive_capital.squeeze().sort_index().to_numpy() ) elif productive_capital_to_VA_dict is None: warnings.warn("No capital to VA dictionary given, considering 4/1 ratio") self.capital_to_VA_ratio = 4 self.productive_capital = self.VA_0 * self.capital_to_VA_ratio else: kratio = productive_capital_to_VA_dict kratio_ordered = [kratio[k] for k in sorted(kratio.keys())] self.capital_to_VA_ratio = np.tile(np.array(kratio_ordered), self.n_regions) self.productive_capital = self.VA_0 * self.capital_to_VA_ratio # Currently not used, and dubious definition # if value_added.ndim > 1: # self.regional_production_share = ( # self.VA_0 # / value_added.sum(axis=0).groupby("region").transform("sum").to_numpy() # ) # else: # self.regional_production_share = ( # self.VA_0 / value_added.groupby("region").transform("sum").to_numpy() # ) # self.regional_production_share = self.regional_production_share.flatten() # r"""numpy.ndarray of float: Array of size :math:`n \times m` representing the estimated share of a sector in its regional economy.""" self.threshold_not_input = ( self.Z_C > np.tile(self.X_0, (self.n_sectors, 1)) * TECHNOLOGY_THRESHOLD ) # [n_sectors, n_regions*n_sectors] r"""numpy.ndarray of float: 2-dim square matrix array of size :math:`(n , n \times m)` representing the threshold under which an input is not considered being an input (0.00001).""" ################################################################# # ###### SIMULATION VARIABLES #################################### self._entire_demand = np.zeros( shape=( self.n_regions * self.n_sectors, self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat, ) ) self.overprod = np.full( (self.n_regions * self.n_sectors), self.overprod_base, dtype=np.float64 ) r"""numpy.ndarray of float: Array of size :math:`n \times m` representing the overproduction coefficients vector :math:`\mathbf{\alpha}(t)`.""" with np.errstate(divide="ignore", invalid="ignore"): self.inputs_stock = ( np.tile(self.X_0, (self.n_sectors, 1)) * self.tech_mat ) * self.inv_duration[:, np.newaxis] self.inputs_stock = np.nan_to_num(self.inputs_stock, nan=np.inf, posinf=np.inf) r"""numpy.ndarray of float: 2-dim square matrix array :math:`\ioinv` of size :math:`(n \times m, n \times m)` representing the stock of inputs (see :ref:`boario-math-init`).""" self.inputs_stock_0 = self.inputs_stock.copy() self.intermediate_demand = self.Z_0.copy() r"""numpy.ndarray of float: 2-dim square matrix array :math:`\ioorders` of size :math:`(n \times m, n \times m)` representing the matrix of orders.""" self.production = self.X_0.copy() r"""numpy.ndarray of float: Array :math:`\iox(t)` of size :math:`n \times m` representing the current gross production.""" self.final_demand = self.Y_0.copy() self.rebuilding_demand = None self._rebuild_demand_tot = np.zeros_like(self.X_0) self._prod_cap_delta_arbitrary = ( None # Required to init productive_capital_lost ) self.productive_capital_lost = None r"""numpy.ndarray of float: Array of size :math:`n \times m` representing the estimated stock of capital currently destroyed for each industry.""" self.prod_cap_delta_arbitrary = None r"""numpy.ndarray of float: Array of size :math:`n \times m` representing an arbitrary reduction of production capacity to each industry.""" self.order_type = order_type ################################################################# # ################# SIMULATION TRACKING VARIABLES ################ self.in_shortage = False r"""Boolean stating if at least one industry is in shortage (i.e.) if at least one of its inputs inventory is low enough to reduce production.""" self.had_shortage = False r"""Boolean stating if at least one industry was in shortage at some point.""" self.rebuild_prod = None r"""numpy.ndarray of float: Array of size :math:`n \times m` representing the remaining stock of rebuilding demand asked of each industry.""" self.final_demand_not_met = np.zeros(self.Y_0.shape) r"""numpy.ndarray of float: Array of size :math:`n \times m` representing the final demand that could not be met at current step for each industry.""" ################################################################# # ## Internals self._prod_delta_type = None self._n_rebuilding_events = 0 def _check_mriot(self): # Mapping of attributes to error messages logger.debug("Checking validity of MRIOT") required_attributes = { "x": "Production vector of MRIOT is not set (mriot.x).", "Z": "Intermediate demand of MRIOT is not set (mriot.Z).", "Y": "Final demand of MRIOT is not set (mriot.Y).", "A": "Technical coefficients of MRIOT are not set (mriot.A).", } # Check for missing attributes for attr, error_msg in required_attributes.items(): if getattr(self.mriot, attr) is None: raise ValueError(error_msg) # Consistency check for technical coefficients calculated_A = pymrio.calc_A(self.mriot.Z, self.mriot.x) if not np.allclose(self.mriot.A, calculated_A, atol=1e-8): raise ValueError( "Technical coefficients (mriot.A) are inconsistent with Z and x." ) def _init_input_stocks( self, main_inv_dur: int, inventory_dict: dict[str, int] | None, infinite_inventories_sect: list[str] | None, ): self.main_inv_dur: int = main_inv_dur r"""int, default 90: The default numbers of days for inputs inventory to use if it is not defined for an input.""" if inventory_dict is None: infinite_inventories_sect = ( [] if infinite_inventories_sect is None else infinite_inventories_sect ) self.inventories = [ np.inf if sector in infinite_inventories_sect else main_inv_dur for sector in self.sectors ] r"""numpy.ndarray of int: Array :math:`\mathbf{s}` of size :math:`n` (sectors), setting for each input the initial number of ``n_temporal_units_by_step`` of stock for the input. (see :ref:`boario-math`).""" else: infinite_inventories_sect = ( [] if infinite_inventories_sect is None else infinite_inventories_sect ) self.inventories = [ ( np.inf if inventory_dict[k] in ["inf", "Inf", "Infinity", "infinity"] or k in infinite_inventories_sect else inventory_dict[k] ) for k in sorted(inventory_dict.keys()) ] logger.debug(f"inventories: {self.inventories}") logger.debug(f"n_temporal_units_by_step: {self.n_temporal_units_by_step}") self.inv_duration = np.array(self.inventories) / self.n_temporal_units_by_step # Note that this creates inconsistencies between inventories and inv_duration if (self.inv_duration <= 1).any(): warnings.warn( f"At least one product has inventory duration lower than the numbers of temporal units in one step ({self.n_temporal_units_by_step}), model will set it to 2 by default, but you should probably check this !" ) self.inv_duration[self.inv_duration <= 1] = 2 def _set_indexes(self, pym_mrio: IOSystem) -> None: reg = pym_mrio.get_regions() reg = typing.cast("pd.Index", reg) self.regions = reg.sort_values() r"""numpy.ndarray of str : An array of the regions of the model.""" self.n_regions = len(reg) r"""int : The number :math:`m` of regions.""" sec = pym_mrio.get_sectors() sec = typing.cast("pd.Index", sec) self.sectors = sec.sort_values() r"""numpy.ndarray of str: An array of the sectors of the model.""" self.n_sectors = len(sec) r"""int : The number :math:`n` of sectors.""" self.industries = pd.MultiIndex.from_product( [self.regions, self.sectors], names=["region", "sector"] ) r"""pandas.MultiIndex : A pandas MultiIndex of the industries (region,sector) of the model.""" self.n_industries = len(self.industries) r"""int : The number :math:`m * n` of industries.""" # try: self.final_demand_cat = np.array(sorted(list(pym_mrio.get_Y_categories()))) # type: ignore r"""numpy.ndarray of str: An array of the final demand categories of the model (``["Final demand"]`` if there is only one)""" self.n_fd_cat = len(pym_mrio.get_Y_categories()) # type: ignore r"""int: The numbers of final demand categories.""" self.all_regions_fd = pd.MultiIndex.from_product( [self.regions, self.final_demand_cat], names=["region", "category"] ) # Not required anymore? # except (KeyError,IndexError): # self.n_fd_cat = 1 # self.final_demand_cat = pd.Index(["Final demand"], name="category") ##### PRODUCTION CAPACITY CHANGES ##### @property def prod_cap_delta_tot(self) -> npt.NDArray: r"""Computes and return total current production delta. Returns ------- npt.NDArray The total production delta (ie share of production capacity lost) for each industry. """ return self._prod_cap_delta_tot def update_prod_delta(self): tmp = [] if self.prod_cap_delta_productive_capital is not None: tmp.append(self.prod_cap_delta_productive_capital) if self.prod_cap_delta_arbitrary is not None: tmp.append(self.prod_cap_delta_arbitrary) if tmp: self._prod_cap_delta_tot = np.amax(np.stack(tmp, axis=-1), axis=1) else: self._prod_cap_delta_tot = np.zeros_like(self.X_0) @property def productive_capital_lost(self) -> npt.NDArray | None: r"""Returns current stock of destroyed capital Returns ------- npt.NDArray An array of same shape as math:`\iox`, containing the "stock" of capital currently destroyed for each industry. """ return self._productive_capital_lost @productive_capital_lost.setter def productive_capital_lost(self, value: npt.NDArray | None): r"""Returns current stock of destroyed capital Returns ------- npt.NDArray An array of same shape as math:`\iox`, containing the "stock" of capital currently destroyed for each industry. """ self._productive_capital_lost = value if self._productive_capital_lost is not None: if (self._productive_capital_lost > self.productive_capital).any(): raise ValueError( "Total capital lost for events is higher than productive capital for at least one industry." ) tmp = np.zeros_like(self.productive_capital, dtype=float) np.divide( self._productive_capital_lost, self.productive_capital, where=self.productive_capital != 0, out=tmp, ) self._prod_cap_delta_productive_capital = tmp else: self._prod_cap_delta_productive_capital = None self.update_prod_delta() @property def prod_cap_delta_productive_capital(self) -> npt.NDArray | None: r"""Return the possible production capacity lost due to capital destroyed vector if it was set. Returns ------- npt.NDArray An array of same shape as math:`\iox`, stating the amount of production capacity lost due to capital destroyed. """ return self._prod_cap_delta_productive_capital @property def prod_cap_delta_arbitrary(self) -> npt.NDArray | None: r"""Return the possible "arbitrary" production capacity lost vector if it was set. Returns ------- npt.NDArray An array of same shape as math:`\iox`, stating the amount of production capacity lost arbitrarily (ie exogenous). """ return self._prod_cap_delta_arbitrary @prod_cap_delta_arbitrary.setter def prod_cap_delta_arbitrary(self, value: npt.NDArray | None): if value is not None: if value.shape != self.X_0.shape: raise ValueError( f"Incorrect shape: {self.X_0.shape} expected, got {value.shape}" ) self._prod_cap_delta_arbitrary = value self.update_prod_delta() @property def production_cap(self) -> npt.NDArray: r"""Compute and update production capacity. Compute and update production capacity from current total production delta and overproduction. .. math:: x^{Cap}_{f}(t) = \alpha_{f}(t) (1 - \Delta_{f}(t)) x_{f}(t) Raises ------ ValueError Raised if any industry has negative production. """ production_cap = self.X_0.copy() production_cap = production_cap * (1 - self.prod_cap_delta_tot) production_cap = production_cap * self.overprod if (production_cap < 0).any(): raise ValueError( "Production capacity was found negative for at least on industry. It may be caused by an impact being to important for a sector." ) return production_cap @property def entire_demand(self) -> npt.NDArray: r"""Returns the entire demand matrix, including intermediate demand (orders), final demand, and possible rebuilding demand.""" return self._entire_demand def _chg_events_number(self): new_entire_demand = np.zeros( shape=( self.n_regions * self.n_sectors, self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat + (self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat) * self._n_rebuilding_events, ) ) # Only copying intermediate and final demand new_entire_demand[ :, : self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat ] = self.entire_demand[ :, : self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat ] self._entire_demand = new_entire_demand.copy() @property def entire_demand_tot(self) -> npt.NDArray: r"""Returns the entire demand matrix, including intermediate demand (orders), final demand, and possible rebuilding demand.""" return self._entire_demand_tot @property def intermediate_demand(self) -> npt.NDArray: """Returns the entire intermediate demand matrix (orders)""" return self._entire_demand[:, : (self.n_regions * self.n_sectors)] @intermediate_demand.setter def intermediate_demand(self, value: npt.NDArray | pd.DataFrame | None): if value is None: value = np.zeros( shape=(self.n_regions * self.n_sectors, self.n_regions * self.n_sectors) ) if isinstance(value, pd.DataFrame): value = value.values np.copyto(self._entire_demand[:, : (self.n_regions * self.n_sectors)], value) self._intermediate_demand_tot = _fast_sum(self.intermediate_demand, axis=1) self._entire_demand_tot = _fast_sum(self.entire_demand, axis=1) @property def intermediate_demand_tot(self) -> npt.NDArray: """Returns the total intermediate demand addressed to each industry""" return self._intermediate_demand_tot @property def final_demand(self) -> npt.NDArray: """Returns the entire intermediate demand matrix (orders)""" return self._entire_demand[ :, (self.n_regions * self.n_sectors) : ( self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat ), ] @final_demand.setter def final_demand(self, value: npt.NDArray | pd.DataFrame | None): if value is None: value = np.zeros( shape=(self.n_regions * self.n_sectors, self.n_regions * self.n_fd_cat) ) if isinstance(value, pd.DataFrame): value = value.values np.copyto( self._entire_demand[ :, (self.n_regions * self.n_sectors) : ( self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat ), ], value, ) self._final_demand_tot = _fast_sum(self.final_demand, axis=1) self._entire_demand_tot = _fast_sum(self.entire_demand, axis=1) @property def final_demand_tot(self) -> npt.NDArray: """Returns the total final demand addressed to each industry""" return self._final_demand_tot @property def rebuild_demand(self) -> npt.NDArray | None: """Returns the entire intermediate demand matrix (orders)""" ret = self._entire_demand[ :, (self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat) : ] if ret.size > 0: return ret else: return None @rebuild_demand.setter def rebuild_demand(self, value: npt.NDArray): if self._n_rebuilding_events < 1 and value is not None: raise RuntimeError( "Cannot set a non-null rebuilding demand if the number of events is 0." ) try: np.copyto( self._entire_demand[ :, ( self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat ) :, ], value, ) except ValueError as err: raise ValueError("Unable to assign rebuilding demand.") from err if self.rebuild_demand is None: raise RuntimeError("There was a problem assigning rebuilding demand") self._rebuild_demand_tot = _fast_sum(self.rebuild_demand, axis=1) indus = _fast_sum( self.rebuild_demand[ :, : self.n_regions * self.n_sectors * self._n_rebuilding_events ], axis=1, ) house = _fast_sum( self.rebuild_demand[ :, self.n_regions * self.n_sectors * self._n_rebuilding_events : ], axis=1, ) self._rebuild_demand_indus_tot = ( indus if indus.size > 0 else np.zeros((1, self.n_regions * self.n_sectors)) ) self._rebuild_demand_house_tot = ( house if house.size > 0 else np.zeros((1, self.n_regions * self.n_sectors)) ) self._entire_demand_tot = _fast_sum(self.entire_demand, axis=1) @property def rebuild_demand_tot(self) -> npt.NDArray: """Returns the total rebuild demand addressed to each industry""" return self._rebuild_demand_tot @property def rebuild_demand_house(self) -> npt.NDArray | None: r"""Returns household rebuilding demand matrix. Returns ------- npt.NDArray An array of same shape as math:`\ioy`, containing the sum of all currently rebuildable final demand stock. """ return self._entire_demand[ :, ( self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat ) # Intermediate and final demand + ( self.n_regions * self.n_sectors * self._n_rebuilding_events ) :, # indus demand * n_events ] @property def rebuild_demand_house_tot(self) -> npt.NDArray | None: r"""Returns total household rebuilding demand vector. Returns ------- npt.NDArray An array of same shape as math:`\iox`, containing the sum of all currently rebuildable households demands. """ return self._rebuild_demand_house_tot @property def rebuild_demand_indus(self) -> npt.NDArray | None: r"""Returns industrial rebuilding demand matrix. Returns ------- npt.NDArray An array of same shape as math:`\ioz`, containing the sum of all currently rebuildable intermediate demand stock. """ return self._entire_demand[ :, ( self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat ) : self.n_regions * self.n_sectors + self.n_regions * self.n_fd_cat # After intermediate and final demand + # up to (self.n_regions * self.n_sectors) # indus demand * self._n_rebuilding_events, # times events ] @property def rebuild_demand_indus_tot(self) -> npt.NDArray | None: r"""Returns total industrial rebuilding demand vector. Returns ------- npt.NDArray An array of same shape as math:`\iox`, containing the sum of all currently rebuildable intermediate demands. """ return self._rebuild_demand_indus_tot @property def rebuild_prod(self) -> npt.NDArray | None: return self._rebuild_prod @property def rebuild_prod_indus(self) -> npt.NDArray | None: if self._rebuild_prod is not None: return self._rebuild_prod[ :, : (self.n_regions * self.n_sectors * (self._n_rebuilding_events)), ] else: return None @property def rebuild_prod_house(self) -> npt.NDArray | None: if self._rebuild_prod is not None: return self._rebuild_prod[ :, (self.n_regions * self.n_sectors * self._n_rebuilding_events) : ] else: return None def rebuild_prod_indus_event(self, ev_id) -> npt.NDArray | None: indus = self.rebuild_prod_indus if indus is not None: return indus[ :, (self.n_regions * self.n_sectors) * ev_id : (self.n_regions * self.n_sectors) * (ev_id + 1), ] else: return None def rebuild_prod_house_event(self, ev_id) -> npt.NDArray | None: house = self.rebuild_prod_house if house is not None: return house[ :, (self.n_regions * self.n_fd_cat) * ev_id : (self.n_regions * self.n_fd_cat) * (ev_id + 1), ] else: return None @property def rebuild_prod_tot(self) -> npt.NDArray | None: return self._rebuild_prod_tot @rebuild_prod.setter def rebuild_prod(self, value: npt.NDArray | None): self._rebuild_prod = value if value is not None: self._rebuild_prod_tot = _fast_sum(value, axis=1) else: self._rebuild_prod_tot = None @property def production_opt(self) -> npt.NDArray: r"""Computes and returns "optimal production" :math:`\iox^{textrm{Opt}}`, as the per industry minimum between total demand and production capacity. """ return np.fmin(self.entire_demand_tot, self.production_cap) @property def inventory_constraints_opt(self) -> npt.NDArray: r"""Computes and returns inventory constraints for "optimal production" (see :meth:`calc_inventory_constraints`)""" return self.calc_inventory_constraints(self.production_opt) @property def inventory_constraints_act(self) -> npt.NDArray: r"""Computes and returns inventory constraints for "actual production" (see :meth:`calc_inventory_constraints`)""" return self.calc_inventory_constraints(self.production)
[docs] def calc_production(self, current_temporal_unit: int) -> npt.NDArray: r"""Computes and updates actual production. See :ref:`boario-math-prod`. 1. Computes ``production_opt`` and ``inventory_constraints`` 2. If stocks do not meet ``inventory_constraints`` for any inputs, then decrease production accordingly. Also warns in logs if such shortages happen. Parameters ---------- current_temporal_unit : int current step number """ # 1. production_opt = self.production_opt.copy() inventory_constraints = self.inventory_constraints_opt.copy() # 2. if ( stock_constraint := (self.inputs_stock < inventory_constraints) * self.threshold_not_input ).any(): if not self.in_shortage: logger.info( f"At least one industry entered shortage regime. (step:{current_temporal_unit})" ) self.in_shortage = True self.had_shortage = True production_ratio_stock = np.ones(shape=self.inputs_stock.shape) np.divide( self.inputs_stock, inventory_constraints, out=production_ratio_stock, where=(self.threshold_not_input * (inventory_constraints != 0)), ) production_ratio_stock[production_ratio_stock > 1] = 1 production_max = ( np.tile(production_opt, (self.n_sectors, 1)) * production_ratio_stock ) assert not (np.min(production_max, axis=0) < 0).any() self.production = np.min(production_max, axis=0) else: if self.in_shortage: self.in_shortage = False logger.info( f"All industries exited shortage regime. (step:{current_temporal_unit})" ) assert not (production_opt < 0).any() self.production = production_opt return stock_constraint
[docs] def calc_inventory_constraints(self, production: npt.NDArray) -> npt.NDArray: r"""Compute inventory constraints (no psi parameter, for the psi version, the recommended one, see :meth:`~boario.extended_models.ARIOPsiModel.calc_inventory_constraints`) See :meth:`calc_production` for how inventory constraints are computed. Parameters ---------- production : npt.NDArray The production vector to consider. Returns ------- npt.NDArray For each input, for each industry, the size of the inventory required to produce at `production` level for the duration goal (`inv_duration`). """ inventory_constraints = np.tile(production, (self.n_sectors, 1)) * self.tech_mat tmp = np.tile( np.nan_to_num(self.inv_duration, posinf=0.0)[:, np.newaxis], (1, self.n_regions * self.n_sectors), ) return inventory_constraints * tmp
[docs] def distribute_production( self, general_distribution_scheme: str = "proportional", ): r"""Production distribution module #. Computes rebuilding demand for each rebuildable events (applying the `rebuild_tau` characteristic time) #. Creates/Computes total demand matrix (Intermediate + Final + Rebuild) #. Assesses if total demand is greater than realized production, hence requiring rationing #. Distributes production proportionally to demand. #. Updates stocks matrix. (Only if `np.allclose(stock_add, stock_use).all()` is false) #. Computes final demand not met due to rationing and write it. #. Updates rebuilding demand for each event (by substracting distributed production) Parameters ---------- rebuildable_events : 'list[Event]' List of rebuildable events scheme : str Placeholder for future distribution scheme Raises ------ RuntimeError If negative values are found in places there's should not be any NotImplementedError If an attempt to run an unimplemented distribution scheme is tried """ if general_distribution_scheme != "proportional": raise NotImplementedError( f"Scheme {general_distribution_scheme} is currently not implemented" ) # list_of_demands = [self.matrix_orders, self.final_demand] # # 1. Calc demand from rebuilding requirements (with characteristic time rebuild_tau) # # 2. Concat to have total demand matrix (Intermediate + Final + Rebuild) # # 3. Does production meet total demand if DEBUG_TRACE: logger.debug(f"entire_demand_tot shape : {self.entire_demand.shape}") # rationing_required = (self.production - self.entire_demand_tot) < ( # -1 / self.monetary_factor # ) # rationning_mask = np.tile( # rationing_required[:, np.newaxis], (1, self.entire_demand.shape[1]) # ) demand_shares = np.full(self.entire_demand.shape, 0.0) tot_dem_summed = np.expand_dims( np.sum( self.entire_demand, axis=1, # where=rationning_mask ), 1, ) # Get demand share np.divide( self.entire_demand, tot_dem_summed, where=(tot_dem_summed != 0), out=demand_shares, ) distributed_production = np.zeros_like(self.entire_demand) # 4. distribute production proportionally to demand np.multiply( demand_shares, np.expand_dims(self.production, 1), out=distributed_production, # where=rationning_mask, ) intmd_distribution = distributed_production[ :, : self.n_sectors * self.n_regions ] # Stock use is simply production times technical coefs stock_use = np.tile(self.production, (self.n_sectors, 1)) * self.tech_mat if (stock_use < 0).any(): raise RuntimeError( "Stock use contains negative values, this should not happen" ) # 5. Restock is the production from each supplier, summed. stock_add = self._matrix_I_sum @ intmd_distribution if (stock_add < 0).any(): raise RuntimeError( "stock_add (restocking) contains negative values, this should not happen" ) if not np.allclose(stock_add, stock_use): self.inputs_stock = self.inputs_stock - stock_use + stock_add if (self.inputs_stock < 0).any(): raise RuntimeError( "matrix_stock contains negative values, this should not happen" ) # 6. Compute final demand not met due to rationing final_demand_not_met = ( self.final_demand - distributed_production[ :, self.n_sectors * self.n_regions : ( self.n_sectors * self.n_regions + self.n_fd_cat * self.n_regions ), ] ) final_demand_not_met = _fast_sum(final_demand_not_met, axis=1) # avoid -0.0 (just in case) final_demand_not_met[final_demand_not_met == 0.0] = 0.0 self.final_demand_not_met = final_demand_not_met.copy() # 7. Compute production delivered to rebuilding if DEBUG_TRACE: logger.debug(f"distributed prod shape : {distributed_production.shape}") self.rebuild_prod = distributed_production[ :, (self.n_sectors * self.n_regions + self.n_fd_cat * self.n_regions) : ].copy() if self.rebuild_demand is not None: self.rebuild_demand = self.rebuild_demand - self.rebuild_prod
[docs] def calc_matrix_stock_gap(self, matrix_stock_goal) -> npt.NDArray: """Computes and returns inputs stock gap matrix The gap is simply the difference between the goal (given as argument) and the current stock. Parameters ---------- matrix_stock_goal : npt.NDArray of float The target inventories. Returns ------- npt.NDArray The (only positive) gap between goal and current inventories. Raises ------ RuntimeError If NaN are found in the result. """ matrix_stock_gap = np.zeros(matrix_stock_goal.shape) if DEBUG_TRACE: logger.debug("matrix_stock_goal: {}".format(matrix_stock_goal.shape)) logger.debug("matrix_stock: {}".format(self.inputs_stock.shape)) logger.debug( "matrix_stock_goal_finite: {}".format( matrix_stock_goal[np.isfinite(matrix_stock_goal)].shape ) ) logger.debug( "matrix_stock_finite: {}".format( self.inputs_stock[np.isfinite(self.inputs_stock)].shape ) ) matrix_stock_gap[np.isfinite(matrix_stock_goal)] = ( matrix_stock_goal[np.isfinite(matrix_stock_goal)] - self.inputs_stock[np.isfinite(self.inputs_stock)] ) if np.isnan(matrix_stock_gap).any(): raise RuntimeError("NaN in matrix stock gap") matrix_stock_gap[matrix_stock_gap < 0] = 0 return matrix_stock_gap
[docs] def calc_orders(self) -> None: r"""Computes and sets the orders (intermediate demand) for the next step. See :ref:`Order module <boario-math-orders>` Raises ------ RuntimeError If negative orders are found, which shouldn't happen. """ # total_demand = self.total_demand production_opt = self.production_opt.copy() matrix_stock_goal = np.tile(production_opt, (self.n_sectors, 1)) * self.tech_mat # Check this ! with np.errstate(invalid="ignore"): matrix_stock_goal *= self.inv_duration[:, np.newaxis] if np.allclose(self.inputs_stock, matrix_stock_goal): matrix_stock_gap = np.zeros(matrix_stock_goal.shape) else: matrix_stock_gap = self.calc_matrix_stock_gap(matrix_stock_goal) matrix_stock_gap += ( np.tile(self.production, (self.n_sectors, 1)) * self.tech_mat ) if self.order_type == "alt": prod_ratio = np.ones(shape=self.X_0.shape) np.divide( self.production_cap, self.X_0, out=prod_ratio, where=self.X_0 != 0 ) Z_prod = self.Z_0 * prod_ratio[:, np.newaxis] Z_Cprod = np.tile(self._matrix_I_sum @ Z_prod, (self.n_regions, 1)) out = np.zeros(shape=Z_prod.shape) np.divide(Z_prod, Z_Cprod, out=out, where=Z_Cprod != 0) tmp = np.tile(matrix_stock_gap, (self.n_regions, 1)) * out else: tmp = np.tile(matrix_stock_gap, (self.n_regions, 1)) * self.Z_distrib if (tmp < 0).any(): raise RuntimeError("Negative orders computed") self.intermediate_demand = tmp
[docs] def calc_overproduction(self) -> None: r"""Computes and updates the overproduction vector. See :ref:`Overproduction module <boario-math-overprod>` """ scarcity = np.full(self.production.shape, 0.0) total_demand = self.entire_demand_tot scarcity[total_demand != 0] = ( total_demand[total_demand != 0] - self.production[total_demand != 0] ) / total_demand[total_demand != 0] scarcity[np.isnan(scarcity)] = 0 overprod_chg = ( ((self.overprod_max - self.overprod) * scarcity * self.overprod_tau) + ( (self.overprod_base - self.overprod) * (scarcity == 0) * self.overprod_tau ) ).flatten() self.overprod += overprod_chg self.overprod[self.overprod < 1.0] = 1.0
[docs] def write_index(self, index_file: str | pathlib.Path) -> None: """Write the indexes of the different dataframes of the model in a json file. In order to easily rebuild the dataframes from the 'raw' data, this method create a JSON file with all columns and indexes names, namely : * regions names * sectors names * final demand categories * number of regions, sectors and industries (regions * sectors) Parameters ---------- index_file : pathlib.Path Path to the file to save the indexes. """ indexes = { "regions": list(self.regions), "sectors": list(self.sectors), "fd_cat": list(self.final_demand_cat), "n_sectors": self.n_sectors, "n_regions": self.n_regions, "n_industries": self.n_sectors * self.n_regions, } if isinstance(index_file, str): index_file = pathlib.Path(index_file) index_file.parent.mkdir(parents=True, exist_ok=True) with index_file.open("w") as ffile: json.dump(indexes, ffile)