"""clock_trigger.py - Emits signals based on the state of a clock."""
from __future__ import annotations
from typing import Callable
import numpy as np
import fridom.framework as fr
[docs]
class ClockTrigger:
"""
Emits signals based on the state of a clock.
Description
-----------
Some modules should not be active all the time. For example you may want
a module that is only active every 10 time steps. This class provides some
basic functionality to check if a start condition is met, if a stop condition
is met, and if the module should advance.
Parameters
----------
start_date : np.datetime64, float, optional
The start date of the module. If provided, the module will start when
the model time is greater or equal to this date.
start_step : int, optional
The start step of the module. If provided, the module will start when
the model step is greater or equal to this step.
stop_date : np.datetime64, float, optional
The stop date of the module. If provided, the module will stop when
the model time is greater or equal to this date.
stop_step : int, optional
The stop step of the module. If provided, the module will stop when
the model step is greater or equal to this step.
time_interval : np.timedelta64, float, optional
The time interval between each advance of the module. If provided, the
object will trigger an advance every `time_interval` seconds.
step_number : int, optional
The number of steps between each advance of the module. If provided,
the object will trigger an advance every `step_number` steps.
"""
[docs]
def __init__(self, # noqa: PLR0913
start_date: np.datetime64 | float | None = None,
start_step: int | None = None,
stop_date: np.datetime64 | float | None = None,
stop_step: int | None = None,
time_interval: np.timedelta64 | float | None = None,
step_size: int | None = None) -> None:
# check for too many arguments
fr.exceptions.TooManyArgumentsError.check(
max_args=1, time_interval=time_interval, step_number=step_size)
self._start_date = start_date
self._start_step = start_step
self._stop_date = stop_date
self._stop_step = stop_step
self._time_interval = time_interval
self._step_size = step_size
self._start_callback = self._set_start_callback(start_date, start_step)
self._stop_callback = self._set_stop_callback(stop_date, stop_step)
self._number_of_advanced_steps = 0
self._started = False
self._stopped = False
# convert the time interval to seconds
if time_interval is not None and isinstance(time_interval, np.timedelta64):
time_interval = fr.utils.to_seconds(time_interval)
self._trigger_on_first_step = True
self._time_interval = time_interval
self._step_size = step_size
self._start_time = None
self._start_it = None
def _set_start_callback(self,
start_date: np.datetime64 | float | None = None,
start_step: int | None = None,
) -> Callable[[fr.Clock], bool]:
fr.exceptions.TooManyArgumentsError.check(
max_args=1, start_date=start_date, start_step=start_step)
# create a trigger callback for the start condition
if start_date is None and start_step is None:
# No start condition
def _start_callback(_clock: fr.Clock) -> bool:
return True
elif isinstance(start_date, np.datetime64):
# convert the start date to a float in seconds
start_time = fr.utils.to_seconds(start_date)
def _start_callback(clock: fr.Clock) -> bool:
return clock.time >= start_time
elif start_date is not None:
# use the start date as a float in seconds
def _start_callback(clock: fr.Clock) -> bool:
return clock.time >= start_date
elif start_step is not None:
# use the start step
def _start_callback(clock: fr.Clock) -> bool:
return clock.it >= start_step
else:
msg = "Invalid start condition."
raise ValueError(msg)
return _start_callback
def _set_stop_callback(self,
stop_date: np.datetime64 | float | None = None,
stop_step: int | None = None,
) -> Callable[[fr.Clock], bool]:
fr.exceptions.TooManyArgumentsError.check(
max_args=1, stop_date=stop_date, stop_step=stop_step)
if stop_date is None and stop_step is None:
# No stop condition
def _stop_callback(_clock: fr.Clock) -> bool:
return False
elif isinstance(stop_date, np.datetime64):
# convert the stop date to a float in seconds
stop_time = fr.utils.to_seconds(stop_date)
def _stop_callback(clock: fr.Clock) -> bool:
return clock.time >= stop_time
elif stop_date is not None:
# use the stop date as a float in seconds
def _stop_callback(clock: fr.Clock) -> bool:
return clock.time >= stop_date
elif stop_step is not None:
# use the stop step
def _stop_callback(clock: fr.Clock) -> bool:
return clock.it >= stop_step
else:
msg = "Invalid stop condition."
raise ValueError(msg)
return _stop_callback
[docs]
def should_start(self, clock: fr.Clock) -> bool:
"""
Check if the module should start.
Description
-----------
This method checks if a start requirement is met. And returns `True` if
this requirement is met and the start condition has not been met before.
Hence, this method will only return `True` once.
Parameters
----------
clock : fr.Clock
The clock of the model.
Returns
-------
bool
`True` if the module should start, `False` otherwise.
"""
should_start = not self._started and self._start_callback(clock)
if should_start:
self._started = True
self._start_it = clock.it
self._start_time = clock.time
return should_start
[docs]
def should_stop(self, clock: fr.Clock) -> bool:
"""
Check if the module should stop.
Description
-----------
This method checks if a stop requirement is met. And returns `True` if
this requirement is met and the stop condition has not been met before.
Hence, this method will only return `True` once.
Parameters
----------
clock : fr.Clock
The clock of the model.
Returns
-------
bool
`True` if the module should stop, `False` otherwise.
"""
should_stop = not self._stopped and self._stop_callback(clock)
if should_stop:
self._stopped = True
return should_stop
[docs]
def should_advance(self, clock: fr.Clock) -> bool:
"""
Check if the module should advance.
Description
-----------
This method checks if the module should advance. It returns `True` if
the module should advance and `False` otherwise.
Parameters
----------
clock : fr.Clock
The clock of the model.
Returns
-------
bool
`True` if the module should advance, `False` otherwise.
"""
if not self._started or self._stopped:
return False
n_steps = self._number_of_advanced_steps
if self._time_interval is not None:
target_time = self._start_time + n_steps * self._time_interval
should_advance = clock.time >= target_time
elif self._step_size is not None:
target_it = self._start_it + n_steps * self._step_size
should_advance = clock.it >= target_it
else:
should_advance = True
if should_advance:
self._number_of_advanced_steps += 1
if not self.trigger_on_first_step and self._number_of_advanced_steps == 1:
should_advance = False
return should_advance
[docs]
def check(self, clock: fr.Clock) -> bool:
"""
Check if the module should advance with automatic start and stop.
Description
-----------
This method checks if the module should advance. It will automatically
start and stop the module if the start and stop conditions are met.
Parameters
----------
clock : fr.Clock
The clock of the model.
Returns
-------
bool
`True` if the module should advance, `False` otherwise.
"""
self.should_start(clock)
self.should_stop(clock)
return self.should_advance(clock)
[docs]
def reset(self) -> None:
"""Reset the module."""
self._number_of_advanced_steps = 0
self._started = False
self._stopped = False
def __repr__(self) -> str:
res = "ClockTrigger("
if self._start_date is not None:
res += f"start_date={self._start_date}, "
elif self._start_step is not None:
res += f"start_step={self._start_step}, "
else:
res += "start_date=None, "
if self._time_interval is not None:
res += f"time_interval={self._time_interval}, "
elif self._step_size is not None:
res += f"step_size={self._step_size}, "
else:
res += "time_interval=None, "
if self._stop_date is not None:
res += f"stop_date={self._stop_date})"
elif self._stop_step is not None:
res += f"stop_step={self._stop_step})"
else:
res += "stop_date=None)"
return res
@property
def trigger_on_first_step(self) -> bool:
"""Check if the module should trigger on the first step."""
return self._trigger_on_first_step
@trigger_on_first_step.setter
def trigger_on_first_step(self, value: bool) -> None:
self._trigger_on_first_step = value