# BoARIO : The Adaptative Regional Input Output model in python.
# Copyright (C) 2022 Samuel Juhel
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Simulation module
This module defines the Simulation object, which represent a BoARIO simulation environment.
"""
from __future__ import annotations
from functools import cached_property, partial
import json
import logging
import warnings
import math
import pathlib
import tempfile
from pprint import pformat
from typing import Callable, Literal, Optional, Union, overload
import numpy as np
import numpy.typing as npt
import pandas as pd
import progressbar
from boario import DEBUG_TRACE, DEBUGFORMATTER, logger
from boario.event import (
Event,
EventArbitraryProd,
EventKapitalDestroyed,
EventKapitalRebuild,
EventKapitalRecover,
RegionsList,
SectorsList,
)
from boario.extended_models import ARIOPsiModel
from boario.model_base import ARIOBaseModel
from boario.utils.misc import CustomNumpyEncoder, TempMemmap, print_summary, sizeof_fmt
__all__ = ["Simulation"]
[docs]
class Simulation:
"""Defines a simulation object with a set of parameters and an IOSystem.
This class wraps a :class:`~boario.model_base.ARIOBaseModel` or descendant, and create the context for
simulations using this model. It stores execution parameters as well as events perturbing
the model.
"""
__possible_records = [
"production_realised",
"production_capacity",
"final_demand",
"intermediate_demand",
"rebuild_demand",
"overproduction",
"final_demand_unmet",
"rebuild_prod",
"inputs_stocks",
"limiting_inputs",
"productive_capital_to_recover",
]
__file_save_array_specs = {
"production_realised": (
"float64",
"_production_evolution",
"industries",
np.nan,
),
"production_capacity": (
"float64",
"_production_cap_evolution",
"industries",
np.nan,
),
"final_demand": ("float64", "_final_demand_evolution", "industries", np.nan),
"intermediate_demand": (
"float64",
"_io_demand_evolution",
"industries",
np.nan,
),
"rebuild_demand": (
"float64",
"_rebuild_demand_evolution",
"industries",
np.nan,
),
"overproduction": (
"float64",
"_overproduction_evolution",
"industries",
np.nan,
),
"final_demand_unmet": (
"float64",
"_final_demand_unmet_evolution",
"industries",
np.nan,
),
"rebuild_prod": (
"float64",
"_rebuild_production_evolution",
"industries",
np.nan,
),
"inputs_stocks": ("float64", "_inputs_evolution", "stocks", np.nan),
"limiting_inputs": ("byte", "_limiting_inputs_evolution", "stocks", -1),
"productive_capital_to_recover": (
"float64",
"_regional_sectoral_productive_capital_destroyed_evolution",
"industries",
np.nan,
),
}
def __init__(
self,
model: Union[ARIOBaseModel, ARIOPsiModel],
register_stocks: bool = False,
n_temporal_units_to_sim: int = 365,
events_list: Optional[list[Event]] = None,
save_events: bool = False,
save_params: bool = False,
save_index: bool = False,
save_records: list | str = [],
boario_output_dir: str | pathlib.Path = tempfile.mkdtemp(prefix="boario"),
results_dir_name: Optional[str] = None,
show_progress: bool = False,
) -> None:
"""A Simulation instance can be initialized with the following parameters:
Parameters
----------
model : Union[ARIOBaseModel, ARIOPsiModel, ARIOClimadaModel]
The model to run the simulation with.
register_stocks : bool, default False
A boolean stating if stocks evolution should be registered in a file.
Be aware that such arrays have timesteps*sectors*sectors*regions size
which can rapidly lead to very large files.
n_temporal_units_to_sim : int, default 365
The number of temporal units to simulates.
events_list : list[Event], optional
An optional list of events to run the simulation with [WIP].
save_events: bool, default False
If True, saves a json file of the list of events to simulate when starting the model loop.
save_params: bool, default False
If True, saves a json file of the parameters used when starting the model loop.
save_index: bool, default False
If True, saves a json file of the list of industries in the model when starting the model loop (convenience).
save_records: list | str, default []
The list of simulation variable records to save in corresponding files.
boario_output_dir : str | pathlib.Path
An optional directory where to save files generated by the simulation. Defaults to a temporary directory prefixed by "boario".
results_dir_name : str, default 'results'
The name of the folder where simulation results will be stored.
show_progress: bool, default: False
If True, shows a progress bar in the console during the simulation.
"""
self.output_dir = pathlib.Path(boario_output_dir)
"""pathlib.Path, optional: Optional path to the directory where output are stored."""
self.results_storage = (
self.output_dir.resolve()
if not results_dir_name
else self.output_dir.resolve() / results_dir_name
)
"""str: Name of the folder in `output_dir` where the results will be stored if saved."""
if not self.results_storage.exists():
self.results_storage.mkdir(parents=True)
tmp = logging.FileHandler(self.results_storage / "simulation.log")
tmp.setLevel(logging.INFO)
tmp.setFormatter(DEBUGFORMATTER)
logger.addHandler(tmp)
if events_list is None:
events_list = []
logger.info("Initializing new simulation instance")
self._save_events = save_events
self._save_params = save_params
self._save_index = save_index
self._register_stocks = register_stocks
self._show_progress = show_progress
# Pre-init record variables
self._production_evolution = np.array([])
self._production_cap_evolution = np.array([])
self._final_demand_evolution = np.array([])
self._io_demand_evolution = np.array([])
self._rebuild_demand_evolution = np.array([])
self._overproduction_evolution = np.array([])
self._final_demand_unmet_evolution = np.array([])
self._rebuild_production_evolution = np.array([])
self._inputs_evolution = np.array([])
self._limiting_inputs_evolution = np.array([])
self._regional_sectoral_productive_capital_destroyed_evolution = np.array([])
if save_records != [] or save_events or save_params or save_index:
self.output_dir.resolve().mkdir(parents=True, exist_ok=True)
if save_records != []:
if not self.results_storage.exists():
self.results_storage.mkdir()
self.model = model
"""Union[ARIOBaseModel, ARIOPsiModel] : The model to run the simulation with.
See :class:`~boario.model_base.ARIOBaseModel`."""
self._event_tracking: list[EventTracker] = []
self._events_to_rebuild = 0
self.all_events: list[Event] = []
"""list[Event]: A list containing all events associated with the simulation."""
if events_list is not None:
self.add_events(events_list)
self.n_temporal_units_to_sim = n_temporal_units_to_sim
"""int: The total number of `temporal_units` to simulate."""
self.current_temporal_unit = 0
"""int: Tracks the number of `temporal_units` elapsed since simulation start.
This may differs from the number of `steps` if the parameter `n_temporal_units_by_step`
differs from 1 temporal_unit as `current_temporal_unit` is actually `step` * `n_temporal_units_by_step`."""
self.equi = {
(int(0), int(0), "production"): "equi",
(int(0), int(0), "stocks"): "equi",
(int(0), int(0), "rebuilding"): "equi",
}
self.n_temporal_units_simulated = 0
self._n_checks = 0
self._monotony_checker = 0
self.scheme = "proportional"
self.has_crashed = False
# RECORDS FILES
self.records_storage: pathlib.Path = self.results_storage / "records"
"""pathlib.Path: Place where records are stored if stored"""
self._vars_to_record = []
self._files_to_record = []
if save_records != []:
if isinstance(save_records, str):
if save_records == "all":
save_records = self.__possible_records
else:
raise ValueError(
f'save_records argument has to be either "all" or a sublist of {self.__possible_records}'
)
impossible_records = set(save_records).difference(
set(self.__possible_records)
)
if not len(impossible_records) == 0:
raise ValueError(
f"{impossible_records} are not possible records ({self.__possible_records})"
)
logger.info(f"Will save {save_records} records")
logger.info("Records storage is: {self.records_storage}")
self.records_storage.mkdir(parents=True, exist_ok=True)
self._save_index = True
self._save_events = True
self._save_params = True
self._init_records(save_records)
self.params_dict = {
"n_temporal_units_to_sim": self.n_temporal_units_to_sim,
"output_dir": (
str(self.output_dir) if hasattr(self, "output_dir") else "none"
),
"results_storage": (
self.results_storage.stem
if hasattr(self, "results_storage")
else "none"
),
"model_type": self.model.__class__.__name__,
"psi_param": (
self.model.psi if isinstance(self.model, ARIOPsiModel) else None
),
"order_type": self.model.order_type,
"n_temporal_units_by_step": self.model.n_temporal_units_by_step,
"year_to_temporal_unit_factor": self.model.iotable_year_to_temporal_unit_factor,
"inventory_restoration_tau": (
list(np.reciprocal(self.model.restoration_tau))
if isinstance(self.model, ARIOPsiModel)
else None
),
"alpha_base": self.model.overprod_base,
"alpha_max": self.model.overprod_max,
"alpha_tau": self.model.overprod_tau
* self.model.iotable_year_to_temporal_unit_factor,
"rebuild_tau": self.model.rebuild_tau,
}
"""dict: A dictionary saving the parameters the simulation was run with."""
logger.info("Initialized !")
formatted_params_dict = {
key: print_summary(value) if key == "inventory_restoration_tau" else value
for key, value in self.params_dict.items()
}
logger.info(
f"Simulation parameters:\n{pformat(formatted_params_dict, compact=True)}"
)
[docs]
def loop(self):
r"""Launch the simulation loop.
This method launch the simulation for the number of steps to simulate
described by the attribute ``self.n_temporal_units_to_sim``, calling the
:meth:`next_step` method. For convenience, it dumps the
parameters used in the logs just before running the loop. Once the loop
is completed, it flushes the different memmaps generated.
"""
logger.info(
f"Starting model loop for at most {self.n_temporal_units_to_sim // self.model.n_temporal_units_by_step + 1} steps"
)
logger.info(
f"One step is {self.model.n_temporal_units_by_step}/{self.model.iotable_year_to_temporal_unit_factor} of a year"
)
logger.info("Events : {self.all_events}")
run_range = range(
0,
self.n_temporal_units_to_sim,
math.floor(self.model.n_temporal_units_by_step),
)
if self._save_events:
(pathlib.Path(self.results_storage) / "jsons").mkdir(
parents=True, exist_ok=True
)
with (
pathlib.Path(self.results_storage) / "jsons" / "simulated_events.json"
).open("w") as ffile:
event_dicts = [ev.event_dict for ev in self.all_events]
json.dump(event_dicts, ffile, indent=4, cls=CustomNumpyEncoder)
if self._show_progress:
widgets = [
"Processed: ",
progressbar.Counter("Step: %(value)d"),
" ~ ",
progressbar.Percentage(),
" ",
progressbar.ETA(),
]
bar = progressbar.ProgressBar(widgets=widgets, redirect_stdout=True)
for _ in bar(run_range):
step_res = self.next_step()
self.n_temporal_units_simulated = self.current_temporal_unit
if step_res == 1:
self.has_crashed = True
warnings.warn(
f"""Economy seems to have crashed.
- At step : {self.current_temporal_unit}
"""
)
break
elif self._monotony_checker > 3:
warnings.warn(
f"""Economy seems to have found an equilibrium
- At step : {self.current_temporal_unit}
"""
)
bar.finish()
else:
for _ in run_range:
step_res = self.next_step()
self.n_temporal_units_simulated = self.current_temporal_unit
if step_res == 1:
self.has_crashed = True
warnings.warn(
f"""Economy or model seems to have crashed.
- At step : {self.current_temporal_unit}
"""
)
break
elif self._monotony_checker > 3:
warnings.warn(
f"""Economy seems to have found an equilibrium
- At step : {self.current_temporal_unit}
"""
)
break
if self._files_to_record:
self._flush_memmaps()
if self._save_index:
self.model.write_index(self.results_storage / "jsons" / "indexes.json")
self.params_dict["n_temporal_units_simulated"] = self.n_temporal_units_simulated
self.params_dict["has_crashed"] = self.has_crashed
if self._save_params:
with (
pathlib.Path(self.results_storage) / "jsons" / "simulated_params.json"
).open("w") as ffile:
json.dump(self.params_dict, ffile, indent=4, cls=CustomNumpyEncoder)
with (
pathlib.Path(self.results_storage) / "jsons" / "equilibrium_checks.json"
).open("w") as ffile:
json.dump(
{str(k): v for k, v in self.equi.items()},
ffile,
indent=4,
cls=CustomNumpyEncoder,
)
if self.has_crashed:
logger.info("Loop crashed before completion.")
else:
logger.info("Loop complete")
[docs]
def next_step(
self,
check_period: int = 182,
min_steps_check: Optional[int] = None,
min_failing_regions: Optional[int] = None,
):
"""Advance the model to the next temporal step.
This method wraps all computation required to advance to the next step
of the simulation.
First it checks if an event is planned to
occur at the current step and if so, shocks the model with the
corresponding event. Then:
0) If at least one step elapsed, it computes the new overproduction vector for the next step (using :meth:`~boario.model_base.ARIOBaseModel.calc_overproduction`)
1) Computes production for the current step. (See :meth:`~boario.model_base.ARIOBaseModel.calc_production`)
2) Distribute the `realised` production towards the different demands (intermediate, final, rebuilding) and compute the changes in the inputs stock matrix (see :meth:`~boario.model_base.ARIOBaseModel.distribute_production`)
Note that it is during this step that the model checks if an event is completely rebuild/recovered and removes it from the list the case being.
3) Computes the orders matrix (i.e. the intermediate demand) for the next step (see :meth:`~boario.model_base.ARIOBaseModel.calc_orders`)
Additionally, once every `check_period` steps elapsed, it checks for crash or equilibrium of the economy (see :meth:`~boario.simulation.check_equilibrium`).
Parameters
----------
check_period : int
The time period in number of temporal units to wait between each "crash/equilibrium" check.
min_steps_check : Optional[int]
The minimum wait before the first check.
min_failing_regions : Optional[int]
The minimum number of failing regions required to consider economy has crashed.
"""
try:
if min_steps_check is None:
min_steps_check = self.n_temporal_units_to_sim // 5
if min_failing_regions is None:
min_failing_regions = self.model.n_regions * self.model.n_sectors // 3
# Check if there are new events to add,
# if some happening events can start rebuilding (if rebuildable),
# and updates the internal model production_cap decrease and rebuild_demand
self._check_happening_events()
if ("_inputs_evolution" in self._files_to_record) or (
"_inputs_evolution" in self._vars_to_record
):
self._write_stocks()
# 0)
if self.current_temporal_unit > 1:
self.model.calc_overproduction()
if ("_overproduction_evolution" in self._files_to_record) or (
"_overproduction_evolution" in self._files_to_record
):
self._write_overproduction()
if ("_rebuild_demand_evolution" in self._files_to_record) or (
"_rebuild_demand_evolution" in self._vars_to_record
):
self._write_rebuild_demand()
if ("_final_demand_evolution" in self._files_to_record) or (
"_final_demand_evolution" in self._vars_to_record
):
self._write_final_demand()
if ("_io_demand_evolution" in self._files_to_record) or (
"_io_demand_evolution" in self._vars_to_record
):
self._write_io_demand()
# 1)
constraints = self.model.calc_production(self.current_temporal_unit)
if ("_limiting_inputs_evolution" in self._files_to_record) or (
"_limiting_inputs_evolution" in self._vars_to_record
):
self._write_limiting_stocks(constraints)
if ("_production_evolution" in self._files_to_record) or (
"_production_evolution" in self._vars_to_record
):
self._write_production()
if ("_production_cap_evolution" in self._files_to_record) or (
"_production_cap_evolution" in self._vars_to_record
):
self._write_production_max()
if (
"_regional_sectoral_productive_capital_destroyed_evolution"
in self._files_to_record
) or (
"_regional_sectoral_productive_capital_destroyed_evolution"
in self._vars_to_record
):
self._write_productive_capital_lost()
# 2)
try:
self.model.distribute_production(self.scheme)
if ("_final_demand_unmet_evolution" in self._files_to_record) or (
"_final_demand_unmet_evolution" in self._vars_to_record
):
self._write_final_demand_unmet()
if ("_rebuild_production_evolution" in self._files_to_record) or (
"_rebuild_production_evolution" in self._vars_to_record
):
self._write_rebuild_prod()
self.rebuild_events()
self.recover_events()
except RuntimeError:
logger.exception("An exception happened: ")
self.model.inputs_stock.dump(
self.results_storage / "matrix_stock_dump.pkl"
)
logger.error(
f"Negative values in the stocks, matrix has been dumped in the results dir : \n {self.results_storage / 'matrix_stock_dump.pkl'}"
)
return 1
self.model.calc_orders()
n_checks = self.current_temporal_unit // check_period
if n_checks > self._n_checks:
self.check_equilibrium(n_checks)
self._n_checks += 1
self.current_temporal_unit += self.model.n_temporal_units_by_step
return 0
except Exception as err:
logger.exception("The following exception happened:")
raise RuntimeError("An exception happened:") from err
[docs]
def check_equilibrium(self, n_checks: int):
"""Checks the status of production, stocks and rebuilding demand.
This methods checks and store the status of production, inputs stocks
and rebuilding demand and store the information in ``self.equi``.
At the moment, the following status are implemented:
- `production` and `stocks` can be `greater` (ie all industries are producing more), `equi` (ie all industries produce almost the same as at initial equilibrium (0.01 atol)), `not equi` (ie neither of the previous case)
- `rebuilding demand` can be `finished` or `not finished` depending if some events still have some rebuilding demand unanswered or if all are completely rebuilt.
Parameters
----------
n_checks : int
The number of checks counter.
"""
if np.greater_equal(self.model.production, self.model.X_0).all():
self.equi[(n_checks, self.current_temporal_unit, "production")] = "greater"
elif np.allclose(self.model.production, self.model.X_0, atol=0.01):
self.equi[(n_checks, self.current_temporal_unit, "production")] = "equi"
else:
self.equi[(n_checks, self.current_temporal_unit, "production")] = "not equi"
if np.greater_equal(self.model.inputs_stock, self.model.inputs_stock_0).all():
self.equi[(n_checks, self.current_temporal_unit, "stocks")] = "greater"
elif np.allclose(self.model.production, self.model.X_0, atol=0.01):
self.equi[(n_checks, self.current_temporal_unit, "stocks")] = "equi"
else:
self.equi[(n_checks, self.current_temporal_unit, "stocks")] = "not equi"
if (
self.model.rebuild_demand_tot is None
or self.model.rebuild_demand is None
or not self.model.rebuild_demand.any()
):
self.equi[(n_checks, self.current_temporal_unit, "rebuilding")] = "finished"
else:
self.equi[(n_checks, self.current_temporal_unit, "rebuilding")] = (
"not finished"
)
[docs]
def add_events(self, events: list[Event]):
"""Add a list of events to the simulation.
Parameters
----------
events : list[Event]
The events to add.
"""
if not isinstance(events, list):
raise TypeError(f"list[Event] expected, {type(events)} received.")
for ev in events:
self.add_event(ev)
[docs]
def add_event(self, ev: Event):
"""Add an event to the simulation.
Parameters
----------
ev : Event
The event to add.
"""
if not isinstance(ev, Event):
raise ValueError(f"Event expected, {type(ev)} received.")
self.event_compatibility(ev)
self.all_events.append(ev)
self._event_tracking.append(EventTracker(self, ev))
[docs]
def event_compatibility(self, ev: Event):
"""Checks if an event is compatible with current simulation environment
Parameters
----------
ev : Event
The event to checks.
Raises
------
ValueError
If one attribute of the event is not consistent with the simulation context.
"""
if not 0 < ev.occurrence <= self.n_temporal_units_to_sim:
raise ValueError(
f"Occurrence of event is not in the range of simulation steps (cannot be 0) : {ev.occurrence} not in ]0-{self.n_temporal_units_to_sim}]"
)
if not 0 < ev.occurrence + ev.duration <= self.n_temporal_units_to_sim:
raise ValueError(
f"Occurrence + duration of event is not in the range of simulation steps (cannot be 0) : {ev.occurrence} not in ]0-{self.n_temporal_units_to_sim}]"
)
if (impossible_regions := self.regions_compatible(ev.aff_regions)).size > 0:
raise ValueError(
"Some affected sectors of the event are not in the model : {}".format(
impossible_regions
)
)
if (impossible_sectors := self.sectors_compatible(ev.aff_sectors)).size > 0:
raise ValueError(
"Some affected sectors of the event are not in the model : {}".format(
impossible_sectors
)
)
if isinstance(ev, EventKapitalRebuild):
if (
impossible_sectors := self.sectors_compatible(
ev.rebuilding_sectors.index
)
).size > 0:
raise ValueError(
"Some affected sectors of the event are not in the model : {}".format(
impossible_sectors
)
)
if isinstance(ev, EventKapitalDestroyed):
if ev.event_monetary_factor != self.model.monetary_factor:
warnings.warn(
f"Event monetary factors ({ev.event_monetary_factor}), differs from model monetary factor ({self.model.monetary_factor}). Will automatically adjust."
)
[docs]
def regions_compatible(self, regions: RegionsList):
"""Checks if given regions are all present in the simulation context.
Parameters
----------
regions : RegionsList
The regions to checks.
Returns
-------
set
The set of regions not present in the model's regions.
"""
return np.setdiff1d(regions, self.model.regions)
[docs]
def sectors_compatible(self, sectors: SectorsList):
"""Checks if given sectors are all present in the simulation context.
Parameters
----------
sectors : SectorsList
The sectors to checks.
Returns
-------
set
The set of sectors not present in the model's regions.
"""
return np.setdiff1d(sectors, self.model.sectors)
def _check_happening_events(self) -> None:
"""Updates the status of all events.
Check the `all_events` attribute and `current_temporal_unit` and
updates the events accordingly (ie if they happened, if they can start
rebuild/recover)
"""
new_reb_event = False
for event_tracker in self._event_tracking:
if event_tracker.status == "pending":
if (
(self.current_temporal_unit - self.model.n_temporal_units_by_step)
<= event_tracker.event.occurrence
<= self.current_temporal_unit
):
logger.info(
f"Temporal_Unit : {self.current_temporal_unit} ~ Shocking model with new event"
)
logger.info(
f"Affected regions are : {event_tracker.event.aff_regions.to_list()}"
)
event_tracker._status = "happening"
for event_tracker in self._event_tracking:
if event_tracker.status == "happening":
if self.current_temporal_unit >= (
event_tracker.event.occurrence + event_tracker.event.duration
):
if isinstance(event_tracker.event, EventKapitalRebuild):
new_reb_event = True
self.model._n_rebuilding_events += 1
event_tracker._rebuild_id = self.model._n_rebuilding_events - 1
event_tracker._status = "rebuilding"
logger.info(
"Temporal_Unit : {} ~ Event named {} that occurred at {} in {} for {} damages has started rebuilding".format(
self.current_temporal_unit,
event_tracker.event.name,
event_tracker.event.occurrence,
event_tracker.event.aff_regions.to_list(),
event_tracker.impact_vector.sum(),
)
)
elif isinstance(
event_tracker.event, (EventKapitalRecover, EventArbitraryProd)
):
event_tracker._status = "recovering"
logger.info(
"Temporal_Unit : {} ~ Event named {} that occurred at {} in {} for {} damages has started recovering".format(
self.current_temporal_unit,
event_tracker.event.name,
event_tracker.event.occurrence,
event_tracker.event.aff_regions.to_list(),
event_tracker.impact_vector.sum(),
)
)
else:
event_tracker._status = "finished"
logger.info(
"Temporal_Unit : {} ~ Event named {} that occurred at {} in {} is now considered finished".format(
self.current_temporal_unit,
event_tracker.event.name,
event_tracker.event.occurrence,
event_tracker.event.aff_regions.to_list(),
)
)
self.update_prod_cap_delta_tot()
if new_reb_event:
self.model._chg_events_number()
self.update_rebuild_demand()
[docs]
def rebuild_events(self):
"""Updates rebuilding events from model's production dedicated to rebuilding.
This method loops through the event trackers and update the events depending
on their allocated rebuilding production. If the received production is sufficient,
then the events are flagged as "finished", and removed from the tracker.
Raises
------
RuntimeError
Raised if an event tracker has no rebuid_id (should not happen).
"""
events_rebuilt_ids = []
for event_tracker in self._event_tracking:
if event_tracker.status == "rebuilding":
if event_tracker._rebuild_id is None:
raise RuntimeError(
"Rebuilding event has no rebuilding id, which should not happen."
)
assert self.model.rebuild_prod is not None
event_tracker.receive_indus_rebuilding(
self.model.rebuild_prod_indus_event(event_tracker._rebuild_id)
)
if event_tracker.rebuild_demand_house is not None:
event_tracker.receive_house_rebuilding(
self.model.rebuild_prod_house_event(event_tracker._rebuild_id)
)
if (
event_tracker._house_dmg is None
and event_tracker._indus_dmg is None
):
event_tracker._status = "finished"
events_rebuilt_ids.append(event_tracker._rebuild_id)
self._events_to_rebuild -= 1
if len(events_rebuilt_ids) > 0:
events_rebuilt_ids = sorted(events_rebuilt_ids)
non_reb_events = sorted([ev for ev in self._event_tracking if ev._rebuild_id is not None and ev._rebuild_id not in events_rebuilt_ids], key=lambda x: x._rebuild_id) # type: ignore # Because lsp cannot understand rebuild_id is not none here.
for ev_to_rm in events_rebuilt_ids:
for evnt_trck in non_reb_events:
if evnt_trck._rebuild_id > ev_to_rm:
evnt_trck._rebuild_id -= 1
[docs]
def recover_events(self):
"""Updates recovering events with their recovery function.
This method loops through the event trackers and update the events depending
on their recovery function.
Raises
------
RuntimeError
Raised if an event tracker has no recovery function (should not happen).
"""
for event_tracker in self._event_tracking:
if event_tracker.status == "recovering":
if event_tracker.event.recovery_function is None:
raise RuntimeError(
"Recovering event has no recovery function, which should not happen."
)
event_tracker.recover()
[docs]
def update_rebuild_demand(self):
r"""Computes and updates total rebuilding demand based on a list of events.
Computes and updates the model rebuilding demand from the event tracker. Only events
tagged as rebuildable are accounted for. Both `house_rebuild_demand` and
`indus_rebuild_demand` are updated.
"""
if not isinstance(self._event_tracking, list):
ValueError(
f"Setting tot_rebuild_demand can only be done with a list of events self._event_tracking, not a {type(self._event_tracking)}"
)
if "rebuilding" in [
event_tracker.status for event_tracker in self._event_tracking
]:
_rebuilding_demand = np.zeros(
shape=(
self.model.n_regions * self.model.n_sectors,
(
self.model.n_regions * self.model.n_sectors
+ self.model.n_regions * self.model.n_fd_cat
)
* self.model._n_rebuilding_events,
)
)
for evnt_trck in self._event_tracking:
if evnt_trck._rebuild_id is not None:
_rebuilding_demand[
:,
(self.model.n_regions * self.model.n_sectors)
* (evnt_trck._rebuild_id) : (
self.model.n_regions * self.model.n_sectors
)
* (evnt_trck._rebuild_id + 1),
] = evnt_trck.distributed_reb_dem_indus_tau
if evnt_trck.households_damages is not None:
_rebuilding_demand[
:,
(self.model.n_regions * self.model.n_sectors)
* self.model._n_rebuilding_events : (
self.model.n_regions * self.model.n_sectors
+ self.model.n_regions * self.model.n_fd_cat
)
* (evnt_trck._rebuild_id + 1),
] = evnt_trck.distributed_reb_dem_house_tau
self.model.rebuild_demand = _rebuilding_demand
[docs]
def update_productive_capital_lost(self):
r"""Computes current capital lost and updates production delta accordingly.
Computes and sets the current stock of capital lost by each industry of
the model due to the events. Also update the production
capacity lost accordingly, by computing the ratio of capital lost to
capital stock.
"""
if DEBUG_TRACE:
logger.debug("Updating productive_capital lost from list of events")
source = [
ev
for ev in self._event_tracking
if (ev.status in ["happening", "rebuilding", "recovering"])
]
productive_capital_lost = np.add.reduce(
np.array([e._indus_dmg for e in source if e is not None])
)
if not isinstance(productive_capital_lost, np.number):
self.model.productive_capital_lost = productive_capital_lost
else:
self.model.productive_capital_lost = np.zeros_like(self.model.X_0)
[docs]
def update_prod_cap_delta_arb(self):
r"""Computes and sets the loss of production capacity from "arbitrary" sources.
.. warning::
If multiple events impact the same industry, only the maximum loss is
accounted for.
"""
source = [
ev
for ev in self._event_tracking
if (ev.status in ["happening", "recovering"])
]
event_arb = np.array(
[
ev._prod_delta_from_arb
for ev in source
if ev._prod_delta_from_arb is not None
]
)
if event_arb.size == 0:
self.model._prod_cap_delta_arbitrary = np.zeros_like(self.model.X_0)
else:
self.model._prod_cap_delta_arbitrary = np.maximum.reduce(event_arb)
[docs]
def update_prod_cap_delta_tot(self):
r"""Computes and sets the loss of production capacity from both "arbitrary" sources and
capital destroyed sources.
"""
if DEBUG_TRACE:
logger.debug("Updating total production delta")
self.update_productive_capital_lost()
self.update_prod_cap_delta_arb()
@property
def production_realised(self) -> pd.DataFrame:
"""Returns the evolution of the production realised by each industry (region,sector) as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the production realised, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._production_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def production_capacity(self) -> pd.DataFrame:
"""Returns the evolution of the production capacity of each industry (region,sector) as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the production capacity, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._production_cap_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def final_demand(self) -> pd.DataFrame:
"""Return the evolution of final demand asked of each industry as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the final demand asked, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._final_demand_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def intermediate_demand(self) -> pd.DataFrame:
"""Returns the evolution of intermediate demand asked of each industry (Total orders) as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the intermediate demand asked, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._io_demand_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def rebuild_demand(self) -> pd.DataFrame:
"""Returns the evolution of rebuild demand asked of each industry (Total orders) as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the rebuild demand asked, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._rebuild_demand_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def overproduction(self) -> pd.DataFrame:
"""Returns the evolution of the overproduction factor of each industry (region,sector) as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the overproduction factor, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._overproduction_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def final_demand_unmet(self) -> pd.DataFrame:
"""Returns the evolution of the final demand that could not be answered by industries as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the final demand not met, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._final_demand_unmet_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def rebuild_prod(self) -> pd.DataFrame:
"""Returns the production allocated for the rebuilding demand by each industry (region,sector) as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the production allocated, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._rebuild_production_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
@property
def inputs_stocks(self) -> pd.DataFrame:
"""Returns the evolution of the inventory amount of each input for each industry (region,sector) as a DataFrame. Not this is not available if "record_stocks" is not set to True,
as the DataFrame can be quite large for "classic" MRIOTs.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the amount in inventory, the columns are the industries
and the index are the step and input considered (MultiIndex).
"""
return pd.DataFrame(
self._inputs_evolution.reshape(
self.n_temporal_units_to_sim * self.model.n_sectors, -1
),
columns=self.model.industries,
copy=True,
index=pd.MultiIndex.from_product(
[list(range(self.n_temporal_units_to_sim)), self.model.sectors],
names=["step", "input"],
),
)
@property
def limiting_inputs(self) -> pd.DataFrame:
"""Returns the evolution of the inputs lacking for each industry (region,sector) as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is a boolean set to 1 if considered input constrains production, the columns are the industries
and the index are the step and input considered (MultiIndex).
"""
return pd.DataFrame(
self._limiting_inputs_evolution.reshape(
self.n_temporal_units_to_sim * self.model.n_sectors, -1
),
columns=self.model.industries,
copy=True,
index=pd.MultiIndex.from_product(
[list(range(self.n_temporal_units_to_sim)), self.model.sectors],
names=["step", "input"],
),
)
@property
def productive_capital_to_recover(self) -> pd.DataFrame:
"""Retursn the evolution of remaining capital destroyed/to recover for each industry (region,sector) if it exists as a DataFrame.
Returns
-------
pd.DataFrame: A pandas DataFrame where the value is the amount of capital (still) destroyed, the columns are the industries
and the index is the step considered.
"""
return pd.DataFrame(
self._regional_sectoral_productive_capital_destroyed_evolution,
columns=self.model.industries,
copy=True,
).rename_axis("step")
[docs]
def reset_sim_with_same_events(self):
"""Resets the model to its initial status (without removing the events). [WIP]"""
raise NotImplementedError(
"This methods has not been reimplemented for the updated version. Simplest way to reset is to recreate the Simulation and Model objects."
)
logger.info("Resetting model to initial status (with same events)")
self.current_temporal_unit = 0
self._monotony_checker = 0
self._n_checks = 0
self.n_temporal_units_simulated = 0
self.has_crashed = False
self.equi = {
(int(0), int(0), "production"): "equi",
(int(0), int(0), "stocks"): "equi",
(int(0), int(0), "rebuilding"): "equi",
}
self._reset_records()
self.model.reset_module()
[docs]
def reset_sim_full(self):
"""Resets the model to its initial status and remove all events."""
raise NotImplementedError(
"This methods has not been reimplemented for the updated version. Simplest way to reset is to recreate the Simulation and Model objects."
)
self.reset_sim_with_same_events()
logger.info("Resetting events")
self.all_events = []
self.currently_happening_events = []
self.events_timings = set()
[docs]
def write_index(self, index_file: Union[str, pathlib.Path]):
"""Writes the index of the dataframes used in the model in a json file.
See :meth:`~boario.model_base.ARIOBaseModel.write_index` for a more detailed documentation.
Parameters
----------
index_file : Union[str, pathlib.Path]
name of the file to save the indexes to.
"""
self.model.write_index(index_file)
def _write_production(self) -> None:
"""Saves the current production vector to the memmap."""
self._production_evolution[self.current_temporal_unit] = self.model.production
def _write_rebuild_prod(self) -> None:
"""Saves the current rebuilding production vector to the memmap."""
to_write = np.full(self.model.n_regions * self.model.n_sectors, 0.0)
if self.model.rebuild_prod_tot is not None:
self._rebuild_production_evolution[self.current_temporal_unit] = (
self.model.rebuild_prod_tot
)
else:
self._rebuild_production_evolution[self.current_temporal_unit] = to_write
def _write_productive_capital_lost(self) -> None:
"""Saves the current remaining productive_capital to rebuild vector to the memmap."""
self._regional_sectoral_productive_capital_destroyed_evolution[
self.current_temporal_unit
] = self.model.productive_capital_lost
def _write_production_max(self) -> None:
"""Saves the current production capacity vector to the memmap."""
self._production_cap_evolution[self.current_temporal_unit] = (
self.model.production_cap
)
def _write_io_demand(self) -> None:
"""Saves the current (total per industry) intermediate demand vector to the memmap."""
self._io_demand_evolution[self.current_temporal_unit] = (
self.model.intermediate_demand_tot
)
def _write_final_demand(self) -> None:
"""Saves the current (total per industry) final demand vector to the memmap."""
self._final_demand_evolution[self.current_temporal_unit] = (
self.model.final_demand_tot
)
def _write_rebuild_demand(self) -> None:
"""Saves the current (total per industry) rebuilding demand vector to the memmap."""
to_write = np.full(self.model.n_regions * self.model.n_sectors, 0.0)
if len(r_dem := self.model.rebuild_demand_tot) > 0:
self._rebuild_demand_evolution[self.current_temporal_unit] = r_dem # type: ignore
else:
self._rebuild_demand_evolution[self.current_temporal_unit] = to_write # type: ignore
def _write_overproduction(self) -> None:
"""Saves the current overproduction vector to the memmap."""
self._overproduction_evolution[self.current_temporal_unit] = self.model.overprod
def _write_final_demand_unmet(self) -> None:
"""Saves the unmet final demand (for this step) vector to the memmap."""
self._final_demand_unmet_evolution[self.current_temporal_unit] = (
self.model.final_demand_not_met
)
def _write_stocks(self) -> None:
"""Saves the current inputs stock matrix to the memmap."""
self._inputs_evolution[self.current_temporal_unit] = self.model.inputs_stock
def _write_limiting_stocks(self, limiting_stock: np.ndarray) -> None:
"""Saves the current limiting inputs matrix to the memmap."""
self._limiting_inputs_evolution[self.current_temporal_unit] = limiting_stock # type: ignore
def _flush_memmaps(self) -> None:
"""Saves files to records."""
for attr in self._files_to_record:
if not hasattr(self, attr):
raise RuntimeError(
f"{attr} should be a member yet it isn't. This shouldn't happen."
)
else:
getattr(self, attr).flush()
def _init_records(self, save_records):
for rec in self.__possible_records:
if rec == "inputs_stocks" and not self._register_stocks:
logger.debug("Will not save inputs stocks")
else:
if rec == "inputs_stocks":
logger.info(
f"Simulation will save inputs stocks. Estimated size is {sizeof_fmt(self.n_temporal_units_to_sim * self.model.n_sectors * self.model.n_sectors * self.model.n_regions * 64)}"
)
save = rec in save_records
filename = rec
dtype, attr_name, shapev, fillv = self.__file_save_array_specs[rec]
if shapev == "industries":
shape = (
self.n_temporal_units_to_sim,
self.model.n_sectors * self.model.n_regions,
)
elif shapev == "stocks":
shape = (
self.n_temporal_units_to_sim,
self.model.n_sectors,
self.model.n_sectors * self.model.n_regions,
)
else:
raise RuntimeError(f"shapev {shapev} unrecognised")
if save:
memmap_array = TempMemmap(
filename=(self.records_storage / filename),
dtype=dtype,
mode="w+",
shape=shape,
save=save,
)
self._files_to_record.append(attr_name)
else:
memmap_array = np.ndarray(shape=shape, dtype=dtype, order="C")
self._vars_to_record.append(attr_name)
memmap_array.fill(fillv)
setattr(self, attr_name, memmap_array)
def _reset_records(
self,
):
for rec in self.__possible_records:
_, attr_name, _, fillv = self.__file_save_array_specs[rec]
if rec == "input_stocks" and not self._register_stocks:
pass
else:
memmap_array = getattr(self, attr_name)
memmap_array.fill(fillv)
setattr(self, attr_name, memmap_array)
@overload
def _thin_to_wide(
thin: pd.Series, long_index: pd.Index, long_columns: None = None
) -> pd.Series: ...
@overload
def _thin_to_wide(
thin: pd.DataFrame,
long_index: pd.Index,
long_columns: pd.Index,
) -> pd.DataFrame: ...
def _thin_to_wide(
thin: pd.Series | pd.DataFrame,
long_index: pd.Index,
long_columns: pd.Index | None = None,
) -> pd.Series | pd.DataFrame:
"""
Converts a "thin" (sparse) DataFrame or Series into a "wide" (dense) format with a specified index
and optional column structure, filling missing values with zeros.
Parameters
----------
thin : pd.Series or pd.DataFrame
The sparse data to convert to a wide format. If `thin` is a DataFrame, `long_columns`
must be specified.
long_index : pd.Index
The index to use for the resulting wide-format data. All values in `thin` are
realigned to this index.
long_columns : pd.Index, optional
The columns to use for the resulting wide-format DataFrame. This parameter is
required if `thin` is a DataFrame and ignored if `thin` is a Series.
Returns
-------
pd.Series or pd.DataFrame
The wide-format representation of `thin`, with `long_index` as its index and
`long_columns` as its columns (if applicable). Missing values are filled with zeros.
Raises
------
ValueError
If `thin` is a DataFrame and `long_columns` is not provided.
Examples
--------
>>> thin_series = pd.Series([1, 2], index=[0, 1])
>>> long_index = pd.Index([0, 1, 2, 3])
>>> _thin_to_wide(thin_series, long_index)
0 1.0
1 2.0
2 0.0
3 0.0
dtype: float64
>>> thin_df = pd.DataFrame([[1, 2], [3, 4]], index=[0, 1], columns=['A', 'B'])
>>> long_columns = pd.Index(['A', 'B', 'C'])
>>> _thin_to_wide(thin_df, long_index, long_columns)
A B C
0 1.0 2.0 0.0
1 3.0 4.0 0.0
2 0.0 0.0 0.0
3 0.0 0.0 0.0
"""
if isinstance(thin, pd.Series):
wide = pd.Series(index=long_index, dtype=thin.dtype)
elif isinstance(thin, pd.DataFrame):
if long_columns is None:
raise ValueError(
"long_columns argument cannot be None when widening a DataFrame."
)
wide = pd.DataFrame(
index=long_index, columns=long_columns, dtype=thin.dtypes.iloc[0]
)
wide.fillna(0, inplace=True)
if isinstance(thin, pd.DataFrame):
wide.loc[thin.index, thin.columns] = thin.values
else:
wide.loc[thin.index] = thin.values
return wide
def _equal_distribution(
affected: pd.Index | None, addressed_to: pd.Index
) -> pd.DataFrame:
"""
Creates a DataFrame representing an equal distribution of values across
specified indices.
Parameters
----------
affected : pd.Index or None
The index for the columns, representing the entities affected by the distribution.
addressed_to : pd.Index
The index for the rows, representing the entities receiving the distribution.
Returns
-------
pd.DataFrame
A DataFrame where each element in `addressed_to` receives an equal share
of 1 divided by the length of `addressed_to` for each `affected` column.
Examples
--------
>>> affected = pd.Index(['A', 'B'])
>>> addressed_to = pd.Index(['X', 'Y', 'Z'])
>>> _equal_distribution(affected, addressed_to)
A B
X 0.333333 0.333333
Y 0.333333 0.333333
Z 0.333333 0.333333
"""
if addressed_to.empty:
raise ValueError("addressed_to index cannot be empty.")
ret = pd.DataFrame(0.0, index=addressed_to, columns=affected)
ret.loc[addressed_to, affected] = 1 / len(addressed_to)
return ret
def _normalize_distribution(
dist: pd.DataFrame | pd.Series, affected: pd.Index | None, addressed_to: pd.Index
) -> pd.DataFrame:
"""
Normalizes a distribution so that values across specified indices sum to 1 for each
entity in `addressed_to`.
Parameters
----------
dist : pd.DataFrame or pd.Series
The initial distribution to normalize. If a DataFrame, it should align with both
`affected` and `addressed_to`. If a Series, it should align with `addressed_to`.
affected : pd.Index or None
The index for the columns, representing entities affected by the distribution.
addressed_to : pd.Index
The index for the rows, representing entities receiving the distribution.
Returns
-------
pd.DataFrame
A DataFrame where values across `addressed_to` entities are normalized to sum to 1
across each `affected` column.
Raises
------
ValueError
If `dist` is not a Series or DataFrame.
Examples
--------
>>> dist = pd.Series([2, 3, 5], index=['X', 'Y', 'Z'])
>>> affected = pd.Index(['A'])
>>> addressed_to = pd.Index(['X', 'Y', 'Z'])
>>> _normalize_distribution(dist, affected, addressed_to)
A
X 0.2
Y 0.3
Z 0.5
"""
ret = pd.DataFrame(0.0, index=addressed_to, columns=affected)
if isinstance(dist, pd.DataFrame):
dist_sq = dist.squeeze()
else:
dist_sq = dist
if isinstance(dist_sq, pd.Series):
ret.loc[addressed_to, :] = (
dist_sq.loc[addressed_to].transform(lambda x: x / sum(x)).values[:, None]
)
return ret
elif isinstance(dist_sq, pd.DataFrame):
ret.loc[addressed_to, affected] = dist_sq.loc[addressed_to, affected].transform(
lambda x: x / sum(x)
)
return ret
else:
raise ValueError("given distribution should be a Series or a DataFrame.")
[docs]
class EventTracker:
"""
Tracks the state and progression of an event within a simulation, including
damages, recovery, and rebuilding demands.
Parameters
----------
sim : Simulation
The simulation object where the event is tracked.
source_event : Event
The source event to be tracked.
indus_rebuild_distribution : pd.DataFrame, None, or Literal["equal"], optional
Specifies the distribution of rebuilding demand across industries, default is None.
house_rebuild_distribution : pd.DataFrame, None, or Literal["equal"], optional
Specifies the distribution of rebuilding demand across households, default is None.
Attributes
----------
sim : Simulation
Reference to the simulation where the event takes place.
event : Event
The event being tracked.
_status : {"pending", "happening", "recovering", "rebuilding", "finished"}
Current status of the event.
_prod_delta_from_arb_0 : pd.Series or None
Initial production delta for arbitrary production events.
_prod_delta_from_arb : pd.Series or None
Current production delta for arbitrary production events.
_indus_dmg_0 : pd.Series or None
Initial industrial damages caused by the event.
_indus_dmg : pd.Series or None
Current industrial damages from the event.
_rebuildable : bool
Indicates if the event is eligible for rebuilding.
_rebuild_id : int or None
Unique identifier for rebuilding actions.
_rebuild_shares : pd.Series or None
Distribution shares for rebuilding demands.
_rebuild_demand_indus_0 : pd.DataFrame or None
Initial rebuilding demand for industries.
_rebuild_demand_house_0 : pd.DataFrame or None
Initial rebuilding demand for households.
_rebuild_demand_indus : pd.DataFrame or None
Current rebuilding demand for industries.
_distributed_reb_dem_indus : pd.DataFrame or None
Distributed demand for rebuilding industries.
_rebuild_demand_house : pd.DataFrame or None
Current rebuilding demand for households.
_distributed_reb_dem_house : pd.DataFrame or None
Distributed demand for rebuilding households.
_recovery_function : Callable or None
Function to compute recovery over time.
_house_dmg_0 : pd.Series or None
Initial household damages from the event.
_house_dmg : pd.Series or None
Current household damages.
"""
def __init__(
self,
sim: Simulation,
source_event: Event,
indus_rebuild_distribution: pd.DataFrame | None | Literal["equal"] = None,
house_rebuild_distribution: pd.DataFrame | None | Literal["equal"] = None,
):
self.sim: Simulation = sim
self.event = source_event
self._status: (
Literal["pending"]
| Literal["happening"]
| Literal["recovering"]
| Literal["rebuilding"]
| Literal["finished"]
) = "pending"
self._prod_delta_from_arb_0: pd.Series | None = None
self._prod_delta_from_arb: pd.Series | None = None
self._indus_dmg_0: pd.Series | None = None
self._indus_dmg: pd.Series | None = None
self._rebuildable: bool = False
self._rebuild_id: int | None = None
self._rebuild_shares: pd.Series | None = None
self._rebuild_demand_indus_0: pd.DataFrame | None = None
self._rebuild_demand_house_0: pd.DataFrame | None = None
self._rebuild_demand_indus: pd.DataFrame | None = None
# self._reb_dem_indus_distribution: npt.NDArray | None = None
self._distributed_reb_dem_indus: pd.DataFrame | None = None
self._rebuild_demand_house: pd.DataFrame | None = None
# self._reb_dem_house_distribution: pd.DataFrame | None = None
self._distributed_reb_dem_house: pd.DataFrame | None = None
self._recovery_function: Callable | None = None
self._house_dmg_0: pd.Series | None = None
self._house_dmg: pd.Series | None = None
impact = source_event.impact.copy()
if isinstance(source_event, EventKapitalDestroyed):
if source_event.event_monetary_factor != sim.model.monetary_factor:
impact = impact * (
source_event.event_monetary_factor / sim.model.monetary_factor
)
if isinstance(source_event, EventKapitalDestroyed):
self._indus_dmg_0 = _thin_to_wide(impact, self.sim.model.industries)
self._indus_dmg = self._indus_dmg_0.copy()
if source_event.impact_households is not None:
impact_house = source_event.impact_households.copy()
if source_event.event_monetary_factor != sim.model.monetary_factor:
impact_house = impact_house * (
source_event.event_monetary_factor / sim.model.monetary_factor
)
self._house_dmg_0 = _thin_to_wide(
impact_house, self.sim.model.all_regions_fd
)
self._house_dmg = self._house_dmg_0.copy()
if isinstance(source_event, EventKapitalRebuild):
self._init_distrib("indus", indus_rebuild_distribution)
assert self._indus_dmg_0 is not None
self._rebuild_demand_indus_0 = pd.DataFrame(
source_event.rebuilding_sectors.values[:, None]
* source_event.impact.values,
index=source_event.rebuilding_sectors.index,
columns=source_event.impact.index,
)
self._rebuild_demand_indus_0.rename_axis(
index="rebuilding sector", inplace=True
)
self._rebuild_demand_indus_0 *= source_event.rebuilding_factor
self._rebuild_demand_indus = self._rebuild_demand_indus_0.copy()
self.rebuild_dem_indus_distribution = self._reb_dem_indus_distribution
if source_event.impact_households is not None:
self._init_distrib("house", house_rebuild_distribution)
self._rebuild_demand_house_0 = pd.DataFrame(
source_event.rebuilding_sectors.values[:, None]
* impact_house.values,
index=source_event.rebuilding_sectors.index,
columns=source_event.impact_households.index,
)
self._rebuild_demand_house_0.rename_axis(
index="rebuilding sector", inplace=True
)
self._rebuild_demand_house_0 *= source_event.rebuilding_factor
self._rebuild_demand_house = self._rebuild_demand_house_0.copy()
self.rebuild_dem_house_distribution = self._reb_dem_house_distribution
if isinstance(self.event, EventKapitalRecover):
self._recovery_function_indus = partial(
self.event.recovery_function,
init_impact_stock=self._indus_dmg_0,
recovery_tau=self.event.recovery_tau,
)
if self._house_dmg_0 is not None:
self._recovery_function_house = partial(
self.event.recovery_function,
init_impact_stock=self._house_dmg_0,
recovery_tau=self.event.recovery_tau,
)
if isinstance(self.event, EventArbitraryProd):
self._prod_delta_from_arb_0 = _thin_to_wide(
source_event.impact.copy(), self.sim.model.industries
)
self._prod_delta_from_arb = self._prod_delta_from_arb_0.copy()
self._recovery_function_arb_delta = partial(
self.event.recovery_function,
init_impact_stock=self._prod_delta_from_arb_0,
recovery_tau=self.event.recovery_tau,
)
def _init_distrib(
self,
dtype: Literal["indus"] | Literal["house"],
distrib: pd.DataFrame | None | Literal["equal"],
):
"""
Initializes the distribution for rebuilding demand based on type.
Parameters
----------
dtype : {"indus", "house"}
Specifies the type of distribution to initialize, either "indus" for industries or "house" for households.
distrib : pd.DataFrame, None, or "equal"
The distribution data for rebuilding demand. If None, distribution is based on intermediate demand; if "equal," distribution is equal across all sectors.
"""
if dtype == "indus":
if distrib is None:
self._reb_dem_indus_distribution = _normalize_distribution(
self.sim.model.mriot.Z,
affected=self.event.aff_industries,
addressed_to=pd.MultiIndex.from_product(
[self.sim.model.regions, self.event.rebuilding_sectors.index],
names=["region", "rebuilding sector"],
),
)
elif distrib == "equal":
self._reb_dem_indus_distribution = _equal_distribution(
affected=self.event.aff_industries,
addressed_to=pd.MultiIndex.from_product(
[self.sim.model.regions, self.event.rebuilding_sectors.index],
names=["region", "rebuilding sector"],
),
)
else:
self._reb_dem_indus_distribution = _normalize_distribution(
distrib,
affected=self.event.aff_industries,
addressed_to=pd.MultiIndex.from_product(
[self.sim.model.regions, self.event.rebuilding_sectors.index],
names=["region", "rebuilding sector"],
),
)
if dtype == "house":
if distrib is None:
self._reb_dem_house_distribution = _normalize_distribution(
self.sim.model.mriot.Y,
affected=self.event._aff_final_demands,
addressed_to=pd.MultiIndex.from_product(
[self.sim.model.regions, self.event.rebuilding_sectors.index],
names=["region", "rebuilding sector"],
),
)
elif distrib == "equal":
self._reb_dem_house_distribution = _equal_distribution(
affected=self.event._aff_final_demands,
addressed_to=pd.MultiIndex.from_product(
[self.sim.model.regions, self.event.rebuilding_sectors.index],
names=["region", "rebuilding sector"],
),
)
else:
self._reb_dem_house_distribution = _normalize_distribution(
distrib,
affected=self.event._aff_final_demands,
addressed_to=pd.MultiIndex.from_product(
[self.sim.model.regions, self.event.rebuilding_sectors.index],
names=["region", "rebuilding sector"],
),
)
@cached_property
def impact_vector(self) -> pd.Series:
"""
Returns the impact vector of the event.
Returns
-------
pd.Series
The event impact, formatted as a series with all industries as index.
"""
return _thin_to_wide(self.event.impact, self.sim.model.industries)
@property
def productive_capital_dmg_init(self) -> pd.Series | None:
"""
Gets the initial productive capital damage from the event.
Returns
-------
pd.Series or None
Series of initial damages to productive capital; None if not available.
"""
return self._indus_dmg_0
@property
def productive_capital_dmg(self) -> pd.Series | None:
"""
Gets the current productive capital damage.
Returns
-------
pd.Series or None
Series of current damages to productive capital; None if not available.
"""
return self._indus_dmg
@property
def households_damages_init(self) -> pd.Series | None:
"""
Gets the initial household damage for the event.
Returns
-------
pd.Series or None
Series of initial damages to households; None if not available.
"""
return self._house_dmg_0
@property
def households_damages(self) -> pd.Series | None:
"""
Gets the current household damage for the event.
Returns
-------
pd.Series or None
Series of initial damages to households; None if not available.
"""
return self._house_dmg
@property
def rebuild_demand_indus(self) -> pd.DataFrame | None:
"""
Gets the current rebuilding demand for industries.
Returns
-------
pd.DataFrame or None
DataFrame of rebuilding demands for industries; None if not applicable.
"""
return self._rebuild_demand_indus
@property
def rebuild_demand_house(self) -> pd.DataFrame | None:
"""
Gets the current rebuilding demand for households.
Returns
-------
pd.DataFrame or None
DataFrame of rebuilding demands for households; None if not applicable.
"""
return self._rebuild_demand_house
@property
def prod_delta_arbitrary(self) -> pd.Series | None:
"""
Gets the current production delta for arbitrary production loss events.
Returns
-------
pd.Series or None
Series representing production changes; None if not available.
"""
return self._prod_delta_from_arb
@prod_delta_arbitrary.setter
def prod_delta_arbitrary(self, value: pd.Series | None):
self._prod_delta_from_arb = value
@property
def status(
self,
) -> (
Literal["pending"]
| Literal["happening"]
| Literal["recovering"]
| Literal["rebuilding"]
| Literal["finished"]
):
"""
Gets the current status of the event.
Returns
-------
{"pending", "happening", "recovering", "rebuilding", "finished"}
The status indicating the stage of the event.
"""
return self._status
def _compute_distributed_demand(
self, demand_by_sectors, distribution, rebuilding_industries
):
"""
Computes the distributed demand based on sector demand and distribution.
Parameters
----------
demand_by_sectors : pd.DataFrame
The demand data for sectors.
distribution : pd.DataFrame
Distribution weights for sectors.
rebuilding_industries : pd.Index
Index of rebuilding industries.
Returns
-------
pd.DataFrame
DataFrame of distributed demand across regions and sectors.
"""
multi_index = pd.MultiIndex.from_product(
[self.sim.model.regions, rebuilding_industries],
names=["region", "rebuilding sector"],
)
demand_by_sectors = demand_by_sectors.reindex(multi_index, level=1)
if (distribution.index.sort_values() != multi_index.sort_values()).any():
distribution = distribution.reindex(multi_index, level=0)
return demand_by_sectors.mul(distribution)
@property
def rebuild_dem_indus_distribution(self) -> pd.DataFrame | None:
"""
Gets the distribution for rebuilding industry demand.
Returns
-------
pd.DataFrame or None
Distribution DataFrame for rebuilding industry demand; None if not set.
"""
return self._reb_dem_indus_distribution
@rebuild_dem_indus_distribution.setter
def rebuild_dem_indus_distribution(self, value: pd.DataFrame):
self._rebuild_dem_indus_distribution = value
if self.rebuild_demand_indus is not None:
self._distributed_reb_dem_indus = _thin_to_wide(
self._compute_distributed_demand(
self.rebuild_demand_indus,
self._rebuild_dem_indus_distribution,
self.event.rebuilding_sectors.index,
),
long_index=self.sim.model.industries,
long_columns=self.sim.model.industries,
)
@property
def distributed_reb_dem_indus_tau(self) -> pd.DataFrame | None:
"""
Gets the current rebuilding demand for industries.
Returns
-------
pd.DataFrame or None
DataFrame of rebuilding demands for industries; None if not applicable.
"""
if self.event.rebuild_tau:
reb_tau = self.event.rebuild_tau
else:
reb_tau = self.sim.model.rebuild_tau
return self._distributed_reb_dem_indus * (
self.sim.model.n_temporal_units_by_step / reb_tau
)
@property
def distributed_reb_dem_house_tau(self) -> pd.DataFrame | None:
"""
Gets the current rebuilding demand for households.
Returns
-------
pd.DataFrame or None
DataFrame of rebuilding demands for households; None if not applicable.
"""
if self.event.rebuild_tau:
reb_tau = self.event.rebuild_tau
else:
reb_tau = self.sim.model.rebuild_tau
return self._distributed_reb_dem_house * (
self.sim.model.n_temporal_units_by_step / reb_tau
)
@property
def distributed_reb_dem_indus(self) -> pd.DataFrame | None:
"""
Gets the distributed rebuilding demand for industries.
Returns
-------
pd.DataFrame or None
Distributed DataFrame for rebuilding industry demand; None if not set.
"""
return self._distributed_reb_dem_indus
[docs]
def receive_indus_rebuilding(self, reb_prod: pd.DataFrame | npt.ArrayLike | None):
"""
Processes and updates the industry rebuilding demand based on received production.
Parameters
----------
reb_prod : pd.DataFrame or npt.ArrayLike
Rebuilding production to apply to industry demands.
Raises
------
ValueError
If reb_prod is None, or if rebuilding demand does not exist or event is not a rebuilding event.
"""
if reb_prod is None:
raise ValueError("Trying to rebuild with None rebuilding prod.")
if self._distributed_reb_dem_indus is None:
raise ValueError("The rebuilding demand of this event does not exist.")
if not isinstance(self.event, EventKapitalRebuild):
raise ValueError("The event is not a rebuilding event.")
self._distributed_reb_dem_indus -= reb_prod
precision = int(math.log10(self.event.event_monetary_factor)) + 1
self._distributed_reb_dem_indus = self._distributed_reb_dem_indus.round(
precision
)
self._distributed_reb_dem_indus[self._distributed_reb_dem_indus < 0] = 0.0
if not self._distributed_reb_dem_indus.values.any():
self._distributed_reb_dem_indus = None
self._rebuild_demand_indus = None
self._indus_dmg = None
else:
self._rebuild_demand_indus = self._distributed_reb_dem_indus.groupby(
"region"
).sum()
self._indus_dmg = (
self._rebuild_demand_indus.sum(axis=0) / self.event.rebuilding_factor
)
@property
def rebuild_dem_house_distribution(self) -> pd.DataFrame | None:
"""
Gets the distribution for rebuilding household demand.
Returns
-------
pd.DataFrame or None
Distribution DataFrame for rebuilding household demand; None if not set.
"""
return self._reb_dem_house_distribution
@rebuild_dem_house_distribution.setter
def rebuild_dem_house_distribution(self, value: pd.DataFrame):
self._rebuild_dem_house_distribution = value
if self.rebuild_demand_house is not None:
self._distributed_reb_dem_house = _thin_to_wide(
self._compute_distributed_demand(
self.rebuild_demand_house,
self._rebuild_dem_house_distribution,
self.event.rebuilding_sectors.index,
),
long_index=self.sim.model.industries,
long_columns=self.sim.model.all_regions_fd,
)
@property
def distributed_reb_dem_house(self) -> pd.DataFrame | None:
"""
Gets the distributed rebuilding demand for households.
Returns
-------
pd.DataFrame or None
Distributed DataFrame for rebuilding household demand; None if not set.
"""
return self._distributed_reb_dem_house
[docs]
def receive_house_rebuilding(self, reb_prod: pd.DataFrame | npt.ArrayLike | None):
"""
Processes and updates the household rebuilding demand based on received production.
Parameters
----------
reb_prod : pd.DataFrame or npt.ArrayLike
Rebuilding production to apply to household demands.
Raises
------
ValueError
If reb_prod is None, or if household rebuilding demand does not exist or event is not a rebuilding event.
"""
if reb_prod is None:
raise ValueError("Trying to rebuild with None rebuilding prod.")
if self._distributed_reb_dem_house is None:
raise ValueError(
"The household rebuilding demand of this event does not exist."
)
if not isinstance(self.event, EventKapitalRebuild):
raise ValueError("The event is not a rebuilding event.")
self._distributed_reb_dem_house -= reb_prod
precision = int(math.log10(self.event.event_monetary_factor)) + 1
self._distributed_reb_dem_house = self._distributed_reb_dem_house.round(
precision
)
self._distributed_reb_dem_house[self._distributed_reb_dem_house < 0] = 0.0
if not self._distributed_reb_dem_house.values.any():
self._distributed_reb_dem_house = None
self._rebuild_demand_house = None
self._house_dmg = None
else:
self._rebuild_demand_house = self._distributed_reb_dem_house.groupby(
"region"
).sum()
self._house_dmg = (
self._rebuild_demand_house.sum(axis=0) / self.event.rebuilding_factor
)
[docs]
def recover(self):
"""
Applies the recovery function to update the damages over time based on the recovery rate of the event.
Raises
------
ValueError
If the event is not a recoverable type (i.e., not a capital or production recovery event).
"""
if not isinstance(self.event, (EventKapitalRecover, EventArbitraryProd)):
raise ValueError("The event is not a recoverable event.")
if isinstance(self.event, EventKapitalRecover):
precision = int(math.log10(self.event.event_monetary_factor)) + 1
if self._indus_dmg is not None:
self._indus_dmg = self._recovery_function_indus(
self.sim.current_temporal_unit
- (self.event.occurrence + self.event.duration)
).round(precision)
if not self._indus_dmg.any():
self._indus_dmg = None
if self._house_dmg is not None:
self._house_dmg = self._recovery_function_house(
self.sim.current_temporal_unit
- (self.event.occurrence + self.event.duration)
).round(precision)
if not self._house_dmg.any():
self._house_dmg = None
if self._prod_delta_from_arb is not None:
self._prod_delta_from_arb = self._recovery_function_arb_delta(
self.sim.current_temporal_unit
- (self.event.occurrence + self.event.duration)
).round(6)
if not self._prod_delta_from_arb.any():
self._prod_delta_from_arb = None
if (
self._indus_dmg is None
and self._house_dmg is None
and self._prod_delta_from_arb is None
):
self._status = "finished"
@property
def recovery_function(self):
"""
Gets the recovery function associated with the event.
Returns
-------
Callable or None
Function to calculate recovery over time, if applicable.
"""
return self._recovery_function
@cached_property
def affected_sectors_idx(self) -> npt.NDArray:
"""
Gets the index of sectors affected by the event.
Returns
-------
npt.NDArray
Array of indices representing affected sectors.
"""
return np.searchsorted(self.sim.model.sectors, self.event.aff_sectors)
@cached_property
def affected_regions_idx(self) -> npt.NDArray:
"""
Gets the index of regions affected by the event.
Returns
-------
npt.NDArray
Array of indices representing affected regions.
Raises
------
ValueError
If some affected regions are not found in the model.
"""
impossible_regions = np.setdiff1d(
self.event.aff_regions, self.sim.model.regions
)
if impossible_regions.size > 0:
raise ValueError(
"Some affected regions of the event are not in the model : {}".format(
impossible_regions
)
)
return np.searchsorted(self.sim.model.regions, self.event.aff_regions)
@cached_property
def affected_industries_idx(self) -> npt.NDArray:
"""
Gets the index of industries affected by the event.
Returns
-------
npt.NDArray
Array of indices representing affected industries.
"""
return np.array(
[
np.size(self.sim.model.sectors) * ri + si
for ri in self.affected_regions_idx
for si in self.affected_sectors_idx
]
)
@cached_property
def rebuilding_sectors_idx(self) -> npt.NDArray:
"""
Gets the index of sectors involved in rebuilding for the event.
Returns
-------
npt.NDArray
Array of indices representing rebuilding sectors.
Raises
------
ValueError
If the event is not a rebuilding event or if some rebuilding sectors are not found in the model.
"""
if not isinstance(self.event, EventKapitalRebuild):
raise ValueError(
"This event is not a rebuilding event and has no rebuilding sectors."
)
else:
impossible_sectors = np.setdiff1d(
self.event.rebuilding_sectors, self.sim.model.sectors
)
if impossible_sectors.size > 0:
raise ValueError(
"Some rebuilding sectors of the event are not in the model : {}".format(
impossible_sectors
)
)
return np.searchsorted(self.sim.model.sectors, self.event.rebuilding_sectors)
@cached_property
def rebuilding_industries_idx_impacted(self) -> npt.NDArray:
"""
Gets the index of rebuilding industries that were impacted by the event.
Returns
-------
npt.NDArray
Array of indices representing rebuilding industries impacted by the event.
"""
rebuilding_sectors_idx = self.rebuilding_sectors_idx
affected_region_idx = self.affected_regions_idx
return np.array(
[
np.size(self.sim.model.sectors) * ri + si
for ri in affected_region_idx
for si in rebuilding_sectors_idx
],
dtype="int64",
)
@cached_property
def rebuilding_industries_idx_not_impacted(self) -> npt.NDArray:
"""
Gets the index of rebuilding industries not impacted by the event.
Returns
-------
npt.NDArray
Array of indices representing rebuilding industries not impacted by the event.
"""
rebuilding_sectors_idx = self.rebuilding_sectors_idx
affected_region_idx = self.affected_regions_idx
return np.array(
[
np.size(self.sim.model.sectors) * ri + si
for ri in range(np.size(self.sim.model.regions))
if ri not in affected_region_idx
for si in rebuilding_sectors_idx
],
dtype="int64",
)
@cached_property
def rebuilding_industries_idx_all(self) -> npt.NDArray:
"""
Gets the index of all rebuilding industries involved in the event.
Returns
-------
npt.NDArray
Array of indices representing all rebuilding industries for the event.
"""
rebuilding_sectors_idx = self.rebuilding_sectors_idx
return np.array(
[
np.size(self.sim.model.sectors) * ri + si
for ri in range(np.size(self.sim.model.regions))
for si in rebuilding_sectors_idx
],
dtype="int64",
)