# BoARIO : The Adaptative Regional Input Output model in python.
# Copyright (C) 2022 Samuel Juhel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
import inspect
import math
import warnings
from abc import ABC, abstractmethod
from functools import partial
from typing import Callable, List, Optional, Tuple, Union
import numpy as np
import numpy.typing as npt
import pandas as pd
from pandas.api.types import is_numeric_dtype
from boario import logger
from boario.utils.recovery_functions import (
concave_recovery,
convexe_recovery,
convexe_recovery_scaled,
linear_recovery,
)
__all__ = [
"Event",
"EventKapitalDestroyed",
"EventArbitraryProd",
"EventKapitalRecover",
"EventKapitalRebuild",
"Impact",
"IndustriesList",
"SectorsList",
"RegionsList",
]
VectorImpact = Union[list, dict, np.ndarray, pd.DataFrame, pd.Series]
ScalarImpact = Union[int, float, np.integer]
Impact = Union[VectorImpact, ScalarImpact]
IndustriesList = Union[List[Tuple[str, str]], pd.MultiIndex]
SectorsList = Union[List[str], pd.Index, str]
RegionsList = Union[List[str], pd.Index, str]
FinalCatList = Union[List[str], pd.Index, str]
REBUILDING_FINALDEMAND_CAT_REGEX = (
r"(?i)(?=.*household)(?=.*final)(?!.*NPISH|profit).*|HFCE"
)
LOW_DEMAND_THRESH = 10
[docs]
class Event(ABC):
r"""An Event object stores all information about a unique shock during simulation
such as time of occurrence, duration, type of shock, amount of damages.
Computation of recovery or initially requested rebuilding demand is also
done in this class.
.. warning::
The Event class is abstract and cannot be instantiated directly. Only its non-abstract subclasses can be instantiated.
.. note::
Events should be constructed using :py:meth:`~Event.from_series()`, :py:meth:`~Event.from_dataframe()`, :py:meth:`~Event.from_scalar_industries()` or from :py:meth:`~Event.from_scalar_regions_sectors()`.
Depending on the type of event chosen, these constructors require additional keyword arguments, that are documented for each instantiable Event subclass.
For instance, :py:class:`EventKapitalRebuild` additionally requires `rebuild_tau` and `rebuilding_sectors`.
.. seealso::
Tutorial :ref:`boario-events`
"""
possible_sectors: pd.Index = pd.Index([])
r"""List of sectors present in the MRIOT used by the model"""
possible_regions: pd.Index = pd.Index([])
r"""List of regions present in the MRIOT used by the model"""
possible_final_demand_cat: pd.Index = pd.Index([])
r"""List of final demand categories present in the MRIOT used by the model"""
temporal_unit_range: int = 0
r"""Maximum temporal unit simulated"""
z_shape: tuple[int, int] = (0, 0)
r"""Shape of the Z (intermediate consumption) matrix in the model"""
y_shape: tuple[int, int] = (0, 0)
r"""Shape of the Y (final demand) matrix in the model"""
x_shape: tuple[int, int] = (0, 0)
r"""Shape of the x (production) vector in the model"""
regions_idx: npt.NDArray = np.array([])
r"""lexicographic region indexes"""
sectors_idx: npt.NDArray = np.array([])
r"""lexicographic sector indexes"""
model_monetary_factor: int = 1
r"""Amount of unitary currency used in the MRIOT (e.g. 1000000 if in € millions)"""
gva_df: pd.Series = pd.Series([], dtype="float64")
r"""GVA per (region,sector)"""
sectors_gva_shares: npt.NDArray = np.array([])
r"""Fraction of total (regional) GVA for each sectors"""
Z_distrib: npt.NDArray = np.array([])
r"""Normalized intermediate consumption matrix"""
Y_distrib: npt.NDArray = np.array([])
r"""Normalized final consumption matrix"""
mrio_name: str = ""
r"""MRIOT identification"""
@abstractmethod
def __init__(
self,
*,
impact: pd.Series,
name: str | None = "Unnamed",
occurrence: int = 1,
duration: int = 1,
) -> None:
logger.info("Initializing new Event")
logger.debug("Checking required Class attributes are defined")
if np.size(self.possible_regions) == 0 or np.size(self.possible_sectors) == 0:
raise AttributeError(
"It appears that no model has been instantiated as some class attributes are not initialized (possible_regions, possible_sectors). Events require to instantiate a model and a simulation context before they can be instantiated"
)
if self.temporal_unit_range == 0:
raise AttributeError(
"It appears that no simulation context has been instantiated as some class attributes are not initialized (temporal_unit_range). Events require to instantiate a model and a simulation context before they can be instantiated"
)
self.name: str = name if name else "unnamed"
r"""An identifying name for the event (for convenience mostly)"""
self.occurrence = occurrence
self.duration = duration
self.impact_df = impact
self.happened: bool = False
r"""States if the event happened"""
self.over: bool = False
r"""States if the event is over"""
self.event_dict: dict = {
"name": str(self.name),
"occurrence": self.occurrence,
"duration": self.duration,
"aff_regions": list(self.aff_regions),
"aff_sectors": list(self.aff_sectors),
"impact": self.total_impact,
"impact_industries_distrib": list(self.impact_industries_distrib),
"impact_regional_distrib": list(self.impact_regional_distrib),
"globals_vars": {
"possible_sectors": list(self.possible_sectors),
"possible_regions": list(self.possible_regions),
"temporal_unit_range": self.temporal_unit_range,
"z_shape": self.z_shape,
"y_shape": self.y_shape,
"x_shape": self.x_shape,
"model_monetary_factor": self.model_monetary_factor,
"mrio_used": self.mrio_name,
},
}
r"""Store relevant information about the event"""
@classmethod
@abstractmethod
def _instantiate(
cls,
impact: pd.Series,
*,
occurrence: int = 1,
duration: int = 1,
name: Optional[str] = None,
**_,
):
return cls(impact=impact, occurrence=occurrence, duration=duration, name=name)
[docs]
@classmethod
def from_series(
cls,
impact: pd.Series,
*,
occurrence: int = 1,
duration: int = 1,
name: Optional[str] = None,
**kwargs,
) -> Event:
"""Create an event for an impact given as a pd.Series.
Parameters
----------
impact : pd.Series
A pd.Series defining the impact per (region, sector)
occurrence : int
The ordinal of occurrence of the event (requires to be > 0). Defaults to 1.
duration : int
The duration of the event (entire impact applied during this number of steps). Defaults to 1.
name : Optional[str]
A possible name for the event, for convenience. Defaults to None.
**kwargs :
Keyword arguments keyword arguments to pass to the instantiating method
(depends on the type of event).
Returns
-------
Event
An Event object or one of its subclass
Raises
------
ValueError
Raised if impact is empty of contains negative values.
Examples
--------
>>> import pandas as pd
>>> import pymrio as pym
>>> from boario.simulation import Simulation
>>> from boario.extended_model import ARIOPsiModel
>>> from boario.event import EventKapitalRecover
>>>
>>> mriot = pym.load_test()
>>> mriot.calc_all()
>>>
>>> impact_series = pd.Series({('reg1', 'electricity'): 100000.0, ('reg1', 'mining'): 50000.0})
>>> model = ARIOPsiModel(mriot)
>>> sim = Simulation(model)
>>> event = EventKapitalRecover.from_series(impact_series, occurrence=5, duration=10, recovery_time=30, name="Event 1")
>>> sim.add_event(event)
"""
if impact.size == 0:
raise ValueError(
"Empty impact Series at init, did you not set the impact correctly ?"
)
impact = impact[impact != 0]
if np.less_equal(impact, 0).any():
logger.debug(f"Impact has negative values:\n{impact}\n{impact[impact<0]}")
raise ValueError("Impact has negative values")
return cls._instantiate(
impact=impact,
occurrence=occurrence,
duration=duration,
name=name,
**kwargs,
)
[docs]
@classmethod
def from_dataframe(
cls,
impact: pd.DataFrame,
*,
occurrence: int = 1,
duration: int = 1,
name: Optional[str] = None,
**kwargs,
) -> Event:
"""Convenience function for DataFrames. See :meth:`~boario.event.Event.from_series`. This constructor only apply ``.squeeze()`` to the given DataFrame.
Parameters
----------
impact : pd.DataFrame
A pd.DataFrame defining the impact per (region, sector)
occurrence : int
The ordinal of occurrence of the event (requires to be > 0). Defaults to 1.
duration : int
The duration of the event (entire impact applied during this number of steps). Defaults to 1.
name : Optional[str]
A possible name for the event, for convenience. Defaults to None.
**kwargs :
Keyword arguments
Other keyword arguments to pass to the instantiate method (depends on the type of event)
Raises
------
ValueError
If impact cannot be squeezed to a Series
Returns
-------
Event
An Event object (or one of its subclass).
"""
impact = impact.squeeze()
if not isinstance(impact, pd.Series):
raise ValueError("Could not squeeze impact dataframe to a serie.")
return cls.from_series(
impact=impact,
occurrence=occurrence,
duration=duration,
name=name,
**kwargs,
)
[docs]
@classmethod
def distribute_impact_by_gva(cls, impact_vec: pd.Series) -> pd.Series:
"""Distribute a vector of impact by the GVA of affected industries.
Each values of the given impact are mutliplied by the share of the GVA
the industry has over the GVA of all affected industries.
Parameters
----------
impact_vec : pd.Series
The impact values to be reweigthed. Current use-case assumes all values are the total impact.
Returns
-------
pd.Series
The impact where each value was multiplied by the share of GVA of each affected industry (over total GVA affected).
"""
gva = cls.gva_df.loc[impact_vec.index]
gva = gva.transform(lambda x: x / sum(x))
return impact_vec * gva
[docs]
@classmethod
def distribute_impact_equally(cls, impact_vec: pd.Series) -> pd.Series:
"""Distribute an impact equally between all affected regions.
Assume impact is given as a vector with all value being the
total impact to distribute.
Parameters
----------
impact_vec : pd.Series
The impact to distribute.
Returns
-------
pd.Series
The impact vector equally distributed among affected industries.
"""
dfg = impact_vec.groupby("region")
return dfg.transform(lambda x: x / (dfg.ngroups * x.count()))
[docs]
@classmethod
def from_scalar_industries(
cls,
impact: ScalarImpact,
*,
industries: IndustriesList,
impact_industries_distrib: Optional[npt.ArrayLike] = None,
gva_distrib: Optional[bool] = False,
occurrence: Optional[int] = 1,
duration: Optional[int] = 1,
name: Optional[str] = None,
**kwargs,
) -> Event:
"""Creates an Event from a scalar and a list of industries affected.
The scalar impact is distributed evenly by default. Otherwise it can
be distributed proportionnaly to the GVA of affected industries, or to
a custom distribution.
Parameters
----------
impact : ScalarImpact
The scalar impact.
industries : IndustriesList
The list of industries affected by the impact.
impact_industries_distrib : Optional[npt.ArrayLike]
A vector of equal size to the list of industries, stating the
share of the impact each industry should receive. Defaults to None.
gva_distrib : Optional[bool]
A boolean stating if the impact should be distributed proportionnaly to GVA. Defaults to False.
occurrence : Optional[int]
The ordinal of occurrence of the event (requires to be > 0). Defaults to 1.
duration : Optional[int]
The duration of the event (entire impact applied during this number of steps). Defaults to 1.
name : Optional[str]
A possible name for the event, for convenience. Defaults to None.
**kwargs :
Keyword arguments
Other keyword arguments to pass to the instantiate method (depends on the type of event)
Raises
------
ValueError
Raised if Impact is null, if len(industries) < 1 or if the sum of impact_industries_distrib differs from 1.0.
Returns
-------
Event
An Event object or one of its subclass.
"""
if impact <= 0:
raise ValueError("Impact is null")
if len(industries) < 1:
raise ValueError("Null sized affected industries ?")
if isinstance(industries, list):
industries = pd.MultiIndex.from_tuples(
industries, names=["region", "sector"]
)
impact_vec = pd.Series(impact, dtype="float64", index=industries)
if impact_industries_distrib:
if np.sum(impact_industries_distrib) != 1.0:
raise ValueError("Impact distribution doesn't sum up to 1.0")
else:
impact_vec *= impact_industries_distrib # type: ignore
elif gva_distrib:
impact_vec = cls.distribute_impact_by_gva(impact_vec)
else:
impact_vec = cls.distribute_impact_equally(impact_vec)
return cls.from_series(
impact=impact_vec,
occurrence=occurrence,
duration=duration,
name=name,
**kwargs,
)
[docs]
@classmethod
def from_scalar_regions_sectors(
cls,
impact: ScalarImpact,
*,
regions: RegionsList,
sectors: SectorsList,
impact_regional_distrib: Optional[npt.ArrayLike] = None,
impact_sectoral_distrib: Optional[Union[str, npt.ArrayLike]] = None,
occurrence: int = 1,
duration: int = 1,
name: Optional[str] = None,
**kwargs,
) -> Event:
"""Creates an Event from a scalar, a list of regions and a list of sectors affected.
Parameters
----------
impact : ScalarImpact
The scalar impact.
regions : RegionsList
The list of regions affected.
sectors : SectorsList
The list of sectors affected in each region.
impact_regional_distrib : Optional[npt.ArrayLike], optional
A vector of equal size to the list of regions affected, stating the
share of the impact each industry should receive. Defaults to None.
impact_sectoral_distrib : Optional[Union[str, npt.ArrayLike]], optional
Either:
* ``\"gdp\"``, the impact is then distributed using the gross value added of each sector as a weight.
* A vector of equal size to the list of sectors affected, stating the share of the impact each industry should receive. Defaults to None.
occurrence : int, optional
The ordinal of occurrence of the event (requires to be > 0). Defaults to 1.
duration : int, optional
The duration of the event (entire impact applied during this number of steps). Defaults to 1.
name : Optional[str], optional
A possible name for the event, for convenience. Defaults to None.
**kwargs :
Keyword arguments
Other keyword arguments to pass to the instantiate method (depends on the type of event)
Raises
------
ValueError
Raise if Impact is null, if len(regions) or len(sectors) < 1,
Returns
-------
Event
An Event object or one of its subclass.
"""
if not isinstance(impact, (int, float)):
raise ValueError("Impact is not scalar.")
if impact <= 0:
raise ValueError("Impact is null.")
if isinstance(regions, str):
regions = [regions]
if isinstance(sectors, str):
sectors = [sectors]
_regions = pd.Index(regions, name="region")
_sectors = pd.Index(sectors, name="sector")
if len(_regions) < 1:
raise ValueError("Null sized affected regions ?")
if len(_sectors) < 1:
raise ValueError("Null sized affected sectors ?")
if _sectors.duplicated().any():
warnings.warn(
UserWarning(
"Multiple presence of the same sector in affected sectors. (Will remove duplicate)"
)
)
_sectors = _sectors.drop_duplicates()
if _regions.duplicated().any():
warnings.warn(
UserWarning(
"Multiple presence of the same region in affected region. (Will remove duplicate)"
)
)
_regions = _regions.drop_duplicates()
industries = pd.MultiIndex.from_product(
[_regions, _sectors], names=["region", "sector"]
)
impact_vec = pd.Series(impact, dtype="float64", index=industries)
assert isinstance(_regions, pd.Index)
if impact_regional_distrib is None:
regional_distrib = pd.Series(1.0 / len(_regions), index=_regions)
elif not isinstance(impact_regional_distrib, pd.Series):
impact_regional_distrib = pd.Series(impact_regional_distrib, index=_regions)
regional_distrib = pd.Series(0.0, index=_regions)
regional_distrib.loc[impact_regional_distrib.index] = (
impact_regional_distrib
)
else:
regional_distrib = pd.Series(0.0, index=_regions)
try:
regional_distrib.loc[impact_regional_distrib.index] = (
impact_regional_distrib
)
except KeyError:
regional_distrib.loc[_regions] = impact_regional_distrib.values
assert isinstance(_sectors, pd.Index)
if impact_sectoral_distrib is None:
sectoral_distrib = pd.Series(1.0 / len(_sectors), index=_sectors)
elif (
isinstance(impact_sectoral_distrib, str)
and impact_sectoral_distrib == "gdp"
):
gva = cls.gva_df.loc[(_regions, _sectors)]
sectoral_distrib = gva.groupby("region").transform(lambda x: x / sum(x))
elif not isinstance(impact_sectoral_distrib, pd.Series):
impact_sectoral_distrib = pd.Series(impact_sectoral_distrib, index=_sectors)
sectoral_distrib = pd.Series(0.0, index=_sectors)
sectoral_distrib.loc[impact_sectoral_distrib.index] = (
impact_sectoral_distrib
)
else:
sectoral_distrib = pd.Series(0.0, index=_sectors)
try:
sectoral_distrib.loc[impact_sectoral_distrib.index] = (
impact_sectoral_distrib
)
except KeyError:
sectoral_distrib.loc[_sectors] = impact_sectoral_distrib.values
logger.debug(f"{sectoral_distrib}")
logger.debug(f"{regional_distrib}")
if isinstance(sectoral_distrib.index, pd.MultiIndex):
industries_distrib = sectoral_distrib * regional_distrib
else:
industries_distrib = pd.Series(
np.outer(regional_distrib.values, sectoral_distrib.values).flatten(), # type: ignore
index=pd.MultiIndex.from_product(
[regional_distrib.index, sectoral_distrib.index]
),
)
impact_vec *= industries_distrib
return cls.from_series(
impact=impact_vec,
occurrence=occurrence,
duration=duration,
name=name,
**kwargs,
)
@property
def impact_df(self) -> pd.Series:
r"""A pandas Series with all possible industries as index, holding the impact vector of the event. The impact is defined for each sectors in each region."""
return self._impact_df
@impact_df.setter
def impact_df(self, value: pd.Series):
self._impact_df = pd.Series(
0,
dtype="float64",
index=pd.MultiIndex.from_product(
[self.possible_regions, self.possible_sectors],
names=["region", "sector"],
),
)
self._impact_df[value.index] = value
logger.debug("Sorting impact Series")
self._impact_df.sort_index(inplace=True)
self.aff_industries = self.impact_df.loc[self.impact_df > 0].index # type: ignore
self.impact_industries_distrib = self.impact_df.transform(lambda x: x / sum(x))
self.total_impact = self.impact_df.sum()
self.impact_vector = self.impact_df.values
@property
def aff_industries(self) -> pd.MultiIndex:
r"""The industries affected by the event."""
return self._aff_industries
@aff_industries.setter
def aff_industries(self, index: pd.MultiIndex):
if not isinstance(index, pd.MultiIndex):
raise ValueError(
"Given impact vector does not have a (region,sector) MultiIndex"
)
if index.names != ["region", "sector"]:
raise ValueError("MultiIndex must have levels named 'region' and 'sector'")
regions = index.get_level_values("region").unique()
sectors = index.get_level_values("sector").unique()
if not set(regions).issubset(self.possible_regions):
raise ValueError(
f"Regions {set(regions) - set(self.possible_regions)} are not in the possible regions list"
)
if not set(sectors).issubset(self.possible_sectors):
raise ValueError(
f"Sectors {set(sectors) - set(self.possible_sectors)} are not in the possible sectors list"
)
self.aff_regions = regions
self.aff_sectors = sectors
logger.debug(
f"Setting _aff_industries. There are {np.size(index)} affected industries"
)
self._aff_industries = index
self._aff_industries_idx = np.array(
[
np.size(self.possible_sectors) * ri + si
for ri in self._aff_regions_idx
for si in self._aff_sectors_idx
]
)
@property
def occurrence(self) -> int:
r"""The temporal unit of occurrence of the event."""
return self._occur
@occurrence.setter
def occurrence(self, value: int):
if not 0 < value <= self.temporal_unit_range:
raise ValueError(
"Occurrence of event is not in the range of simulation steps (cannot be 0) : {} not in ]0-{}]".format(
value, self.temporal_unit_range
)
)
else:
logger.debug(f"Setting occurence to {value}")
self._occur = value
@property
def duration(self) -> int:
r"""The duration of the event."""
return self._duration
@duration.setter
def duration(self, value: int):
if not 0 <= self.occurrence + value <= self.temporal_unit_range:
raise ValueError(
"Occurrence + duration of event is not in the range of simulation steps : {} + {} not in [0-{}]".format(
self.occurrence, value, self.temporal_unit_range
)
)
else:
logger.debug(f"Setting duration to {value}")
self._duration = value
@property
def aff_regions(self) -> pd.Index:
r"""The array of regions affected by the event"""
return self._aff_regions
@property
def aff_regions_idx(self) -> npt.NDArray:
r"""The array of lexicographically ordered affected region indexes"""
return self._aff_regions_idx
@aff_regions.setter
def aff_regions(self, value: npt.ArrayLike | str):
if isinstance(value, str):
value = [value]
value = pd.Index(value) # type: ignore
impossible_regions = np.setdiff1d(value, self.possible_regions)
if impossible_regions.size > 0:
raise ValueError(
"These regions are not in the model : {}".format(impossible_regions)
)
else:
self._aff_regions = pd.Index(value, name="region")
logger.debug(f"Setting _aff_regions to {self._aff_regions}")
self._aff_regions_idx = np.searchsorted(self.possible_regions, value)
@property
def aff_sectors(self) -> pd.Index:
r"""The array of affected sectors by the event"""
return self._aff_sectors
@property
def aff_sectors_idx(self) -> npt.NDArray:
r"""The array of lexicographically ordered affected sectors indexes"""
return self._aff_sectors_idx
@aff_sectors.setter
def aff_sectors(self, value: npt.ArrayLike | str):
if isinstance(value, str):
value = [value]
value = pd.Index(value, name="sector") # type: ignore
impossible_sectors = np.setdiff1d(value, self.possible_sectors)
if impossible_sectors.size > 0:
raise ValueError(
"These sectors are not in the model : {}".format(impossible_sectors)
)
else:
self._aff_sectors = pd.Index(value, name="sector")
logger.debug(f"Setting _aff_sectors to {self._aff_sectors}")
self._aff_sectors_idx = np.searchsorted(self.possible_sectors, value)
@property
def impact_regional_distrib(self) -> pd.Series:
r"""The series specifying how damages are distributed among affected regions"""
return self._impact_regional_distrib
@property
def impact_industries_distrib(self) -> pd.Series:
r"""The series specifying how damages are distributed among affected industries (regions,sectors)"""
return self._impact_industries_distrib
@impact_industries_distrib.setter
def impact_industries_distrib(self, value: pd.Series):
self._impact_industries_distrib = value
self._impact_regional_distrib = self._impact_industries_distrib.groupby(
"region",
observed=False,
).sum()
def __repr__(self):
# TODO: find ways to represent long lists
return f"""[WIP]
Event(
name = {self.name},
occur = {self.occurrence},
duration = {self.duration}
aff_regions = {self.aff_regions.to_list()},
aff_sectors = {self.aff_sectors.to_list()},
)
"""
[docs]
class EventArbitraryProd(Event):
r"""An EventArbitraryProd object holds an event with arbitrary impact on production capacity.
Such events can be used to represent temporary loss of production capacity in a completely exogenous way (e.g., loss of working hours from a heatwave).
.. warning::
This type of event suffers from a problem with the recovery and does not function properly at the moment.
.. note::
For this type of event, the impact value represent the share of production capacity lost of an industry.
.. note::
In addition to the base arguments of an Event, EventArbitraryProd requires a ``recovery_time`` (1 step by default) and a ``recovery_function`` (linear by default).
.. seealso::
Tutorial :ref:`boario-events`
"""
def __init__(
self,
*,
impact: pd.Series,
recovery_time: int = 1,
recovery_function: str = "linear",
name: str | None = "Unnamed",
occurrence: int = 1,
duration: int = 1,
) -> None:
raise NotImplementedError(
"This type of Event suffers from a major bug and cannot be used at the moment."
)
if (impact > 1.0).any():
raise ValueError(
"Impact is greater than 100% (1.) for at least an industry."
)
super().__init__(
impact=impact,
name=name,
occurrence=occurrence,
duration=duration,
)
self._prod_cap_delta_arbitrary_0 = (
self.impact_vector.copy()
) # np.zeros(shape=len(self.possible_sectors))
self.prod_cap_delta_arbitrary = (
self.impact_vector.copy()
) # type: ignore # np.zeros(shape=len(self.possible_sectors))
self.recovery_time = recovery_time
r"""The characteristic recovery duration after the event is over"""
self.recovery_function = recovery_function
logger.info("Initialized")
@classmethod
def _instantiate(
cls,
impact: pd.Series,
*,
recovery_time: int = 1,
recovery_function: str = "linear",
occurrence: int = 1,
duration: int = 1,
name: Optional[str] = None,
**_,
):
return cls(
impact=impact,
occurrence=occurrence,
duration=duration,
name=name,
recovery_time=recovery_time,
recovery_function=recovery_function,
)
@property
def prod_cap_delta_arbitrary(self) -> npt.NDArray:
r"""The optional array storing arbitrary (as in not related to productive_capital destroyed) production capacity loss"""
return self._prod_cap_delta_arbitrary
@prod_cap_delta_arbitrary.setter
def prod_cap_delta_arbitrary(self, value: npt.NDArray):
self._prod_cap_delta_arbitrary = value
@property
def recoverable(self) -> bool:
r"""A boolean stating if an event can start recover"""
return self._recoverable
@recoverable.setter
def recoverable(self, current_temporal_unit: int):
reb = (self.occurrence + self.duration) <= current_temporal_unit
if reb and not self.recoverable:
logger.info(
"Temporal_Unit : {} ~ Event named {} that occured at {} in {} has started recovering (arbitrary production capacity loss)".format(
current_temporal_unit,
self.name,
self.occurrence,
self.aff_regions.to_list(),
)
)
self._recoverable = reb
@property
def recovery_function(self) -> Callable:
r"""The recovery function used for recovery (`Callable`)"""
return self._recovery_fun
@recovery_function.setter
def recovery_function(self, r_fun: str | Callable | None):
if r_fun is None:
r_fun = "instant"
if self.recovery_time is None:
raise AttributeError(
"Impossible to set recovery function if no recovery time is given."
)
if isinstance(r_fun, str):
if r_fun == "linear":
fun = linear_recovery
elif r_fun == "convexe":
fun = convexe_recovery_scaled
elif r_fun == "convexe noscale":
fun = convexe_recovery
elif r_fun == "concave":
fun = concave_recovery
else:
raise NotImplementedError(
"No implemented recovery function corresponding to {}".format(r_fun)
)
elif callable(r_fun):
r_fun_argsspec = inspect.getfullargspec(r_fun)
r_fun_args = r_fun_argsspec.args + r_fun_argsspec.kwonlyargs
if not all(
args in r_fun_args
for args in [
"init_impact_stock",
"elapsed_temporal_unit",
"recovery_time",
]
):
raise ValueError(
"Recovery function has to have at least the following keyword arguments: {}".format(
[
"init_impact_stock",
"elapsed_temporal_unit",
"recovery_time",
]
)
)
fun = r_fun
else:
raise ValueError("Given recovery function is not a str or callable")
r_fun_partial = partial(
fun,
init_impact_stock=self._prod_cap_delta_arbitrary_0,
recovery_time=self.recovery_time,
)
self._recovery_fun = r_fun_partial
[docs]
def recovery(self, current_temporal_unit: int):
"""Apply the recovery function to the capital destroyed for the current temporal unit.
Parameters
----------
current_temporal_unit : int
The current temporal unit
Raises
------
RuntimeError
Raised if no recovery function has been set.
"""
elapsed = current_temporal_unit - (self.occurrence + self.duration)
if elapsed < 0:
raise RuntimeError("Trying to recover before event is over")
if self.recovery_function is None:
raise RuntimeError(
"Trying to recover event while recovery function isn't set yet"
)
res = self.recovery_function(elapsed_temporal_unit=elapsed)
precision = int(math.log10(self.model_monetary_factor)) + 1
res = np.around(res, precision)
if not np.any(res):
self.over = True
self._prod_cap_delta_arbitrary = res
[docs]
class EventKapitalDestroyed(Event, ABC):
r"""EventKapitalDestroyed is an abstract class to hold events with where some capital (from industries or households) is destroyed. See :py:class:`EventKapitalRecover` and :py:class:`EventKapitalRebuild` for its instantiable classes.
.. note::
For this type of event, the impact value represent the amount of capital destroyed in monetary terms.
.. note::
We distinguish between impacts on household and industrial (productive) capital. We assume destruction of the former not to reduce production capacity contrary to the latter (but possibly induce reconstruction demand). Impacts on household capital is null by default, but can be set via the ``households_impacts`` argument in the constructor. The amount of production capacity lost is computed as the share of capital lost over total capital of the industry.
.. note::
The user can specify a monetary factor via the ``event_monetary_factor`` argument for the event if it differs from the monetary factor of the MRIOT used. By default the constructor assumes the two factors to be the same (i.e., if the MRIOT is in €M, the so is the impact).
.. seealso::
Tutorial :ref:`boario-events`
"""
def __init__(
self,
*,
impact: pd.Series,
households_impact: Optional[Impact] = None,
name: str | None = "Unnamed",
occurrence: int = 1,
duration: int = 1,
event_monetary_factor: Optional[int] = None,
) -> None:
if event_monetary_factor is None:
logger.info(
f"No event monetary factor given. Assuming it is the same as the model ({self.model_monetary_factor})"
)
self.event_monetary_factor = self.model_monetary_factor
r"""The monetary factor for the impact of the event (e.g. 10**6, 10**3, ...)"""
else:
self.event_monetary_factor = event_monetary_factor
if self.event_monetary_factor != self.model_monetary_factor:
logger.warning(
f"Event monetary factor ({self.event_monetary_factor}) differs from model monetary factor ({self.model_monetary_factor}). Be careful to define your impact with the correct unit (ie in event monetary factor)."
)
if (impact < LOW_DEMAND_THRESH / self.event_monetary_factor).all():
raise ValueError(
"Impact is too small to be considered by the model. Check you units perhaps ?"
)
super().__init__(
impact=impact,
name=name,
occurrence=occurrence,
duration=duration,
)
# The only thing we have to do is affecting/computing the regional_sectoral_productive_capital_destroyed
self.impact_df = self.impact_df * (
self.event_monetary_factor / self.model_monetary_factor
)
self.total_productive_capital_destroyed = self.total_impact
logger.info(
f"Total impact on productive capital is {self.total_productive_capital_destroyed} (in model unit)"
)
self.remaining_productive_capital_destroyed = (
self.total_productive_capital_destroyed
)
self._regional_sectoral_productive_capital_destroyed_0 = (
self.impact_vector.copy()
)
self.regional_sectoral_productive_capital_destroyed = self.impact_vector.copy() # type: ignore
self.households_impact_df: pd.Series = pd.Series(
0,
dtype="float64",
index=pd.MultiIndex.from_product(
[self.possible_regions, self.possible_final_demand_cat],
names=["region", "final_demand_cat"],
),
)
r"""A pandas Series with all possible (regions, final_demand_cat) as index, holding the households impacts vector of the event. The impact is defined for each region and each final demand category."""
if households_impact:
try:
rebuilding_demand_idx = self.possible_final_demand_cat[
self.possible_final_demand_cat.str.match(
REBUILDING_FINALDEMAND_CAT_REGEX
)
] # .values[0]
if len(rebuilding_demand_idx) > 1:
raise ValueError(
f"The rebuilding demand index ({rebuilding_demand_idx}) contains multiple values which is a problem. Contact the dev to update the matching regexp."
)
except IndexError:
logger.warning(
f"No final demand category matched common rebuilding final demand category, hence we will put it in the first available ({self.possible_final_demand_cat[0]})."
)
rebuilding_demand_idx = self.possible_final_demand_cat[0]
if isinstance(households_impact, pd.Series):
logger.debug("Given household impact is a pandas Series")
self.households_impact_df.loc[
households_impact.index, rebuilding_demand_idx
] = households_impact # type: ignore
elif isinstance(households_impact, dict):
logger.debug("Given household impact is a dict")
households_impact = pd.Series(households_impact, dtype="float64")
self.households_impact_df.loc[
households_impact.index, rebuilding_demand_idx
] = households_impact # type: ignore
elif isinstance(households_impact, pd.DataFrame):
logger.debug("Given household impact is a dataframe")
households_impact = households_impact.squeeze()
if not isinstance(households_impact, pd.Series):
raise ValueError(
"The given households_impact DataFrame is not a Series after being squeezed"
)
self.households_impact_df.loc[
households_impact.index, rebuilding_demand_idx
] = households_impact # type: ignore
elif isinstance(households_impact, (list, np.ndarray)):
if np.size(households_impact) != np.size(self.aff_regions):
raise ValueError(
f"Length mismatch between households_impact ({np.size(households_impact)} and affected regions ({np.size(self.aff_regions)}))"
)
else:
self.households_impact_df.loc[
self.aff_regions, rebuilding_demand_idx
] = households_impact # type: ignore
elif isinstance(households_impact, (float, int)):
if self.impact_regional_distrib is not None:
logger.warning(
f"households impact ({households_impact}) given as scalar, distributing among region following `impact_regional_distrib` ({self.impact_regional_distrib}) to {self.aff_regions}"
)
logger.debug(f"{rebuilding_demand_idx}")
self.households_impact_df.loc[:, rebuilding_demand_idx] = (
households_impact * self.impact_regional_distrib
).to_numpy() # type: ignore
self.households_impact_df *= (
self.event_monetary_factor / self.model_monetary_factor
)
logger.info(
f"Total impact on households is {self.households_impact_df.sum()} (in model unit)"
)
@property
def regional_sectoral_productive_capital_destroyed(self) -> npt.NDArray:
r"""The (optional) array of productive_capital destroyed per industry (ie region x sector)"""
return self._regional_sectoral_productive_capital_destroyed
@regional_sectoral_productive_capital_destroyed.setter
def regional_sectoral_productive_capital_destroyed(self, value: npt.ArrayLike):
value = np.array(value)
logger.debug(
f"Changing regional_sectoral_productive_capital_destroyed with {value}\n Sum is {value.sum()}"
)
if value.shape != self.x_shape:
raise ValueError(
"Incorrect shape give for regional_sectoral_productive_capital_destroyed: {} given and {} expected".format(
value.shape, self.x_shape
)
)
self._regional_sectoral_productive_capital_destroyed = value
[docs]
class EventKapitalRebuild(EventKapitalDestroyed):
r"""EventKapitalRebuild holds a :py:class:`EventKapitalDestroyed` event where the destroyed capital requires to be rebuilt, and creates a reconstruction demand.
This subclass requires and enables new arguments to pass to the constructor:
* A characteristic time for reconstruction (``tau_rebuild``)
* A set of sectors responsible for the reconstruction (``rebuilding_sectors``)
* A ``rebuilding_factor`` in order to modulate the reconstruction demand. By default, this factor is 1, meaning that the entire impact value is translated as an additional demand.
.. note::
The ``tau_rebuild`` of an event takes precedence over the one defined for a model.
.. seealso::
Tutorial :ref:`boario-events`
"""
def __init__(
self,
*,
impact: pd.Series,
households_impact: Impact | None = None,
name: str | None = "Unnamed",
occurrence: int = 1,
duration: int = 1,
event_monetary_factor: Optional[int] = None,
rebuild_tau: int,
rebuilding_sectors: dict[str, float] | pd.Series,
rebuilding_factor: float = 1.0,
) -> None:
super().__init__(
impact=impact,
households_impact=households_impact,
name=name,
occurrence=occurrence,
duration=duration,
event_monetary_factor=event_monetary_factor,
)
self._rebuildable = False
self.rebuild_tau = rebuild_tau
self.rebuilding_sectors = rebuilding_sectors
self.rebuilding_factor = rebuilding_factor
industrial_rebuilding_demand = np.zeros(shape=self.z_shape)
tmp = np.zeros(self.z_shape, dtype="float")
mask = np.ix_(
np.union1d(
self._rebuilding_industries_RoW_idx, self._rebuilding_industries_idx
),
self._aff_industries_idx,
)
industrial_rebuilding_demand = np.outer(
self.rebuilding_sectors_shares,
self.regional_sectoral_productive_capital_destroyed,
)
industrial_rebuilding_demand = industrial_rebuilding_demand * rebuilding_factor
tmp[mask] = self.Z_distrib[mask] * industrial_rebuilding_demand[mask]
precision = int(math.log10(self.model_monetary_factor)) + 1
np.around(tmp, precision, tmp)
self.rebuilding_demand_indus = tmp
households_rebuilding_demand = np.zeros(shape=self.y_shape)
tmp = np.zeros(self.y_shape, dtype="float")
mask = np.ix_( # type: ignore
np.union1d( # type: ignore
self._rebuilding_industries_RoW_idx, self._rebuilding_industries_idx
),
list(range(self.y_shape[1])),
)
households_rebuilding_demand = np.outer(
self.rebuilding_sectors_shares, self.households_impact_df.to_numpy()
)
households_rebuilding_demand = households_rebuilding_demand * rebuilding_factor
tmp[mask] = self.Y_distrib[mask] * households_rebuilding_demand[mask]
np.around(tmp, precision, tmp)
self.rebuilding_demand_house = tmp
logger.debug(
f"house rebuilding demand is: {pd.DataFrame(self.rebuilding_demand_house, index=pd.MultiIndex.from_product([self.possible_regions,self.possible_sectors]))}"
)
self.event_dict["rebuilding_sectors"] = {
sec: share
for sec, share in zip(
self.rebuilding_sectors, self.rebuilding_sectors_shares
)
}
@classmethod
def _instantiate(
cls,
impact: pd.Series,
*,
households_impact: Optional[Impact] = None,
occurrence: int = 1,
duration: int = 1,
name: str | None = None,
event_monetary_factor: Optional[int] = None,
rebuild_tau: int,
rebuilding_sectors: dict[str, float] | pd.Series,
rebuilding_factor: float = 1.0,
**_,
):
return cls(
impact=impact,
households_impact=households_impact,
occurrence=occurrence,
duration=duration,
name=name,
event_monetary_factor=event_monetary_factor,
rebuild_tau=rebuild_tau,
rebuilding_sectors=rebuilding_sectors,
rebuilding_factor=rebuilding_factor,
)
@property
def rebuild_tau(self) -> int:
r"""The characteristic time for rebuilding."""
return self._rebuild_tau
@rebuild_tau.setter
def rebuild_tau(self, value: int):
if not isinstance(value, int) or value < 1:
raise ValueError(
f"``rebuild_tau`` should be a strictly positive integer. Value given is {value}."
)
else:
self._rebuild_tau = value
@property
def rebuilding_sectors(self) -> pd.Index:
r"""The (optional) array of rebuilding sectors"""
return self._rebuilding_sectors
@property
def rebuilding_sectors_idx(self) -> npt.NDArray:
r"""The (optional) array of indexes of the rebuilding sectors (in lexicographic order)"""
return self._rebuilding_sectors_idx
@property
def rebuilding_sectors_shares(self) -> npt.NDArray:
r"""The array specifying how rebuilding demand is distributed along the rebuilding sectors"""
return self._rebuilding_sectors_shares
@rebuilding_sectors.setter
def rebuilding_sectors(self, value: dict[str, float] | pd.Series):
if isinstance(value, dict):
reb_sectors = pd.Series(value)
else:
reb_sectors = value
if not is_numeric_dtype(reb_sectors):
raise TypeError(
"Rebuilding sectors should be given as ``dict[str, float] | pd.Series``."
)
if not np.isclose(reb_sectors.sum(), 1.0):
raise ValueError(f"Reconstruction shares among sectors do not sum up to 1.")
impossible_sectors = np.setdiff1d(reb_sectors.index, self.possible_sectors)
if impossible_sectors.size > 0:
raise ValueError(
"These sectors are not in the model : {}".format(impossible_sectors)
)
else:
self._rebuilding_sectors = reb_sectors.index
self._rebuilding_sectors_idx = np.searchsorted(
self.possible_sectors, reb_sectors.index
)
self._rebuilding_sectors_shares = np.zeros(self.x_shape)
self._rebuilding_industries_idx = np.array(
[
np.size(self.possible_sectors) * ri + si
for ri in self._aff_regions_idx
for si in self._rebuilding_sectors_idx
],
dtype="int64",
)
self._rebuilding_industries_RoW_idx = np.array(
[
np.size(self.possible_sectors) * ri + si
for ri in range(np.size(self.possible_regions))
if ri not in self._aff_regions_idx
for si in self._rebuilding_sectors_idx
],
dtype="int64",
)
self._rebuilding_sectors_shares[self._rebuilding_industries_idx] = np.tile(
np.array(reb_sectors.values), np.size(self.aff_regions)
)
if self._rebuilding_industries_RoW_idx.size != 0:
self._rebuilding_sectors_shares[self._rebuilding_industries_RoW_idx] = (
np.tile(
np.array(reb_sectors.values),
(np.size(self.possible_regions) - np.size(self.aff_regions)),
)
)
@property
def rebuilding_demand_house(self) -> npt.NDArray:
r"""The optional array of rebuilding demand from households"""
return self._rebuilding_demand_house
@rebuilding_demand_house.setter
def rebuilding_demand_house(self, value: npt.ArrayLike):
value = np.array(value)
if value.shape != self.y_shape:
raise ValueError(
"Incorrect shape give for rebuilding_demand_house: {} given and {} expected".format(
value.shape, self.y_shape
)
)
value[value < LOW_DEMAND_THRESH / self.model_monetary_factor] = 0.0
self._rebuilding_demand_house = value
@property
def rebuilding_demand_indus(self) -> npt.NDArray:
r"""The optional array of rebuilding demand from industries (to rebuild their productive_capital)"""
return self._rebuilding_demand_indus
@rebuilding_demand_indus.setter
def rebuilding_demand_indus(self, value: npt.ArrayLike):
value = np.array(value)
if value.shape != self.z_shape:
raise ValueError(
"Incorrect shape give for rebuilding_demand_indus: {} given and {} expected".format(
value.shape, self.z_shape
)
)
value[value < LOW_DEMAND_THRESH / self.model_monetary_factor] = 0.0
self._rebuilding_demand_indus = value
# Also update productive_capital destroyed
self.regional_sectoral_productive_capital_destroyed = value.sum(axis=0) * (
1 / self.rebuilding_factor
)
@property
def rebuildable(self) -> bool:
r"""A boolean stating if an event can start rebuild"""
return self._rebuildable
@rebuildable.setter
def rebuildable(self, current_temporal_unit: int):
reb = (self.occurrence + self.duration) <= current_temporal_unit
if reb and not self.rebuildable:
logger.info(
"Temporal_Unit : {} ~ Event named {} that occured at {} in {} for {} damages has started rebuilding".format(
current_temporal_unit,
self.name,
self.occurrence,
self.aff_regions.to_list(),
self.total_productive_capital_destroyed,
)
)
self._rebuildable = reb
[docs]
class EventKapitalRecover(EventKapitalDestroyed):
r"""EventKapitalRecover holds a :py:class:`EventKapitalDestroyed` event where the destroyed capital does not create a reconstruction demand.
This subclass requires and enables new arguments to pass to the constructor:
* A characteristic time for the recovery (``recovery_time``)
* Optionally a ``recovery_function`` (linear by default).
.. seealso::
Tutorial :ref:`boario-events`
"""
def __init__(
self,
*,
impact: pd.Series,
recovery_time: int,
recovery_function: str = "linear",
households_impact: Optional[Impact] = None,
name: str | None = "Unnamed",
occurrence=1,
duration=1,
event_monetary_factor=None,
) -> None:
super().__init__(
impact=impact,
households_impact=households_impact,
name=name,
occurrence=occurrence,
duration=duration,
event_monetary_factor=event_monetary_factor,
)
self.recovery_time = recovery_time
self.recovery_function = recovery_function
self._recoverable = False
@classmethod
def _instantiate(
cls,
impact: pd.Series,
*,
households_impact: Optional[Impact] = None,
occurrence: int = 1,
duration: int = 1,
name: str | None = None,
event_monetary_factor: Optional[int] = None,
recovery_time: int,
recovery_function: str = "linear",
**_,
):
return cls(
impact=impact,
households_impact=households_impact,
occurrence=occurrence,
duration=duration,
name=name,
event_monetary_factor=event_monetary_factor,
recovery_time=recovery_time,
recovery_function=recovery_function,
)
@property
def recoverable(self) -> bool:
r"""A boolean stating if an event can start recover"""
return self._recoverable
@recoverable.setter
def recoverable(self, current_temporal_unit: int):
reb = (self.occurrence + self.duration) <= current_temporal_unit
if reb and not self.recoverable:
logger.info(
"Temporal_Unit : {} ~ Event named {} that occured at {} in {} for {} damages has started recovering (no rebuilding demand)".format(
current_temporal_unit,
self.name,
self.occurrence,
self.aff_regions.to_list(),
self.total_productive_capital_destroyed,
)
)
self._recoverable = reb
@property
def recovery_function(self) -> Callable:
r"""The recovery function used for recovery (`Callable`)"""
return self._recovery_fun
@recovery_function.setter
def recovery_function(self, r_fun: str | Callable | None):
if r_fun is None:
r_fun = "linear"
if self.recovery_time is None:
raise AttributeError(
"Impossible to set recovery function if no recovery time is given."
)
if isinstance(r_fun, str):
if r_fun == "linear":
fun = linear_recovery
elif r_fun == "convexe":
fun = convexe_recovery_scaled
elif r_fun == "convexe noscale":
fun = convexe_recovery
elif r_fun == "concave":
fun = concave_recovery
else:
raise NotImplementedError(
"No implemented recovery function corresponding to {}".format(r_fun)
)
elif callable(r_fun):
r_fun_argsspec = inspect.getfullargspec(r_fun)
r_fun_args = r_fun_argsspec.args + r_fun_argsspec.kwonlyargs
if not all(
args in r_fun_args
for args in [
"init_impact_stock",
"elapsed_temporal_unit",
"recovery_time",
]
):
raise ValueError(
"Recovery function has to have at least the following keyword arguments: {}".format(
[
"init_impact_stock",
"elapsed_temporal_unit",
"recovery_time",
]
)
)
fun = r_fun
else:
raise ValueError("Given recovery function is not a str or callable")
r_fun_partial = partial(
fun,
init_impact_stock=self._regional_sectoral_productive_capital_destroyed_0,
recovery_time=self.recovery_time,
)
self._recovery_fun = r_fun_partial
def recovery(self, current_temporal_unit: int):
elapsed = current_temporal_unit - (self.occurrence + self.duration)
if elapsed < 0:
raise RuntimeError("Trying to recover before event is over")
if self.recovery_function is None:
raise RuntimeError(
"Trying to recover event while recovery function isn't set yet"
)
res = self.recovery_function(elapsed_temporal_unit=elapsed)
precision = int(math.log10(self.model_monetary_factor)) + 1
res = np.around(res, precision)
res[res < 0] = 0.0
if not np.any(res):
self.over = True
self.regional_sectoral_productive_capital_destroyed = res