mesa_frames.model

  1from copy import deepcopy
  2from time import time
  3from warnings import warn
  4
  5import geopandas as gpd
  6import numpy as np
  7import pandas as pd
  8
  9from mesa_frames.agent import AgentDF
 10
 11
 12class ModelDF:
 13    """The base class for all models
 14
 15    Attributes
 16    ----------
 17    unique_id : int
 18        The unique_id of the model.
 19    running : bool
 20        Indicates if the model is running or not.
 21    agents : pd.DataFrame | gpd.GeoDataFrame | None
 22        The dataframe containing the agents of the model.
 23    agent_types : list[tuple[type[AgentDF], float]] | None
 24        The list of agent types and their proportions.
 25    p_agents : dict[type[AgentDF], float] | None
 26        The dictionary of agents to create. The keys are the types of agents,
 27        the values are the percentages of each agent type. The sum of the values should be 1.
 28    space
 29        The space where the agents will be placed. Can be None if model does not have a space.
 30    """
 31
 32    def __new__(cls, *args, **kwargs):
 33        """Create a new model object and instantiate its RNG automatically
 34        (adds supports to numpy with respect to base model)."""
 35        obj = object.__new__(cls)
 36        obj._seed = kwargs.get("seed")
 37        if obj._seed is None:
 38            # We explicitly specify the seed here so that we know its value in
 39            # advance.
 40            obj._seed = np.random.SeedSequence().entropy
 41        # Use default_rng to get a new Generator instance
 42        obj.random = np.random.default_rng(obj._seed)
 43        return obj
 44
 45    def __init__(self, unique_id: int | None = None, space=None):
 46        """Create a new model. Overload this method with the actual code to
 47        start the model. Always start with super().__init__() to initialize the
 48        model object properly.
 49
 50        Parameters
 51        ----------
 52        unique_id : int | None
 53            The unique_id of the model.
 54            If None, a random unique_id is assigned using a 64-bit random integer.
 55        space
 56            The space where the agents will be placed. Can be None if model does not have a space.
 57        """
 58        # Initialize default attributes
 59        self.running: bool = True
 60        self.agents: pd.DataFrame | gpd.GeoDataFrame | None = None
 61        self.agent_types: list[tuple[type[AgentDF], float]] | None = None
 62        self.p_agents: dict[type[AgentDF], float] | None = None
 63        # self.schedule : BaseScheduler = None
 64
 65        # Initialize optional parameters
 66        if not unique_id:
 67            self.unique_id = np.random.randint(
 68                low=-9223372036854775808, high=9223372036854775807, dtype="int64"
 69            )
 70        else:
 71            self.unique_id = unique_id
 72        self.space = space
 73
 74        # Initialize data collection
 75        # self.initialize_data_collector(data_collection)
 76
 77    def get_agents_of_type(self, agent_type: type[AgentDF]) -> pd.Series:
 78        """Returns a boolean mask of the agents dataframe of the model which corresponds to the agent_type.
 79
 80        Parameters
 81        ----------
 82        agent_type : type[AgentDF]
 83            The agent_type to get the mask for.
 84        """
 85        if self.agents is None:
 86            raise RuntimeError(
 87                "You must create agents before getting their masks. Use create_agents() method."
 88            )
 89        return self.agents["type"].str.contains(agent_type.__name__)  # type: ignore
 90
 91    def run_model(self, n_steps: int | None = None, merged_mro: bool = False) -> None:
 92        """If n_steps are specified, executes model.step() until n_steps are reached.
 93        Otherwise, until self.running is false (as the default mesa.Model.run_model).
 94
 95        Parameters
 96        ----------
 97        n_steps : int | None
 98            The number of steps which the model will execute.
 99            Can be None if a running condition turning false is used.
100        merged_mro: bool
101            If False, the model will execute one step for each class in p_agent. This is the default behaviour.
102            If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO.
103            This may increase performance if there are multiple and complex inheritance as each agent_type (even if parents of different classes),
104            will be executed only once. Not a viable option if the behavior of a class depends on another.
105        """
106        if n_steps:
107            if not (isinstance(n_steps, int) and n_steps > 0):
108                raise TypeError(
109                    "n_steps should be an integer greater than 0 or None if a running condition is used"
110                )
111            for _ in range(n_steps):
112                self.step(merged_mro)
113        else:
114            while self.running:
115                self.step(merged_mro)
116
117    def step(self, merged_mro: bool = False) -> None:
118        """Executes one step of the model.
119
120        Parameters
121        ----------
122        merged_mro : bool
123            If False, the model will execute one step for each class in p_agent. This is the default behaviour.
124            If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO.
125            This may increase performance if there are multiple and complex inheritance as each agent_type (even if parents of different classes),
126            will be executed only once. Not a viable option if the behavior of a class depends on another.
127        """
128        if self.agent_types is None or self.p_agents is None:
129            raise RuntimeError(
130                "You must create agents before running the model. Use create_agents() method."
131            )
132        if merged_mro:
133            for agent_type in self.agent_types:
134                agent_type[0].step()
135        else:
136            for agent in self.p_agents:
137                agent.step()
138
139    def reset_randomizer(self, seed: int | None = None) -> None:
140        """Reset the model random number generator.
141
142        Parameters
143        ----------
144        seed : int | None
145            A new seed for the RNG; if None, reset using the current seed
146        """
147        if seed is None:
148            seed = self._seed
149        self.random = np.random.default_rng(seed)
150        self._seed = seed
151
152    def create_agents(
153        self, n_agents: int, p_agents: dict[type[AgentDF], float]
154    ) -> None:
155        """Populate the self.agents dataframe.
156
157        Parameters
158        ----------
159        n_agents : int | None
160            The number of agents which the model will create.
161        p_agents : dict[type[AgentDF], float]
162            The dictionary of agents to create. The keys are the types of agents,
163            the values are the percentages of each agent type. The sum of the values should be 1.
164        """
165
166        # Verify parameters
167        if not (isinstance(n_agents, int) and n_agents > 0):
168            raise TypeError("n_agents should be an integer greater than 0")
169        if sum(p_agents.values()) != 1:
170            raise ValueError("Sum of proportions of agents should be 1")
171        if any(p < 0 or p > 1 for p in p_agents.values()):
172            raise ValueError("Proportions of agents should be between 0 and 1")
173
174        self.p_agents = p_agents
175
176        start_time = time()
177        print("Creating agents: ...")
178
179        mros = [[agent.__mro__[:-1], p] for agent, p in p_agents.items()]
180        mros_copy = deepcopy(mros)
181        agent_types = []
182
183        # Create a "merged MRO" (inspired by C3 linearization algorithm)
184        while True:
185            candunique_idate_added = False
186            # if all mros are empty, the merged mro is done
187            if not any(mro[0] for mro in mros):
188                break
189            for mro in mros:
190                # If mro is empty, continue
191                if not mro[0]:
192                    continue
193                # candunique_idate = head
194                candunique_idate = mro[0][0]
195                # If candunique_idate appears in the tail of another MRO, skip it for now (because other agent_types depend on it, will be added later)
196                if any(
197                    candunique_idate in other_mro[0][1:]
198                    for other_mro in mros
199                    if other_mro is not mro
200                ):
201                    continue
202                else:
203                    p = 0
204                    for i, other_mro in enumerate(mros):
205                        if other_mro[0][0] == candunique_idate:
206                            p += other_mro[1]
207                            mros[i][0] = other_mro[0][1:]
208                        else:
209                            continue
210                    agent_types.append((candunique_idate, p))  # Safe to add it
211                    candunique_idate_added = True
212            # If there wasn't any good head, there is an inconsistent hierarchy
213            if not candunique_idate_added:
214                raise ValueError("Inconsistent hierarchy")
215        self.agent_types = list(agent_types)
216
217        # Create a single DF using vars and values for every class
218        columns: set[str] = set()
219        dtypes: dict[str, str] = {}
220        for agent_type in self.agent_types:
221            for key, val in agent_type[0].dtypes.items():
222                if key not in columns:
223                    columns.add(key)
224                    dtypes[key] = val
225
226        if "geometry" in columns:
227            if not (self.space and hasattr(self.space, "crs")):
228                raise ValueError(
229                    "You must specify a space with a crs attribute if you want to create GeoAgents"
230                )
231            self.agents = gpd.GeoDataFrame(
232                index=pd.RangeIndex(0, n_agents),
233                columns=list(columns),
234                crs=self.space.crs,
235            )
236        else:
237            self.agents = pd.DataFrame(
238                index=pd.RangeIndex(0, n_agents), columns=list(columns)
239            )
240
241        # Populate agents type
242        start_index = 0
243        for i, (_, p) in enumerate(p_agents.items()):
244            self.agents.loc[
245                start_index : start_index + int(n_agents * p) - 1, "type"
246            ] = str(mros_copy[i][0])
247            start_index += int(n_agents * p)
248
249        # Initialize agents
250        AgentDF.model = self
251        self.update_agents_masks()
252        for agent in p_agents:
253            agent.__init__()
254
255        # Set dtypes
256        for col, dtype in dtypes.items():
257            if "int" in dtype and self.agents[col].isna().sum() > 0:  # type: ignore
258                warn(
259                    f"Pandas does not support NaN values for int{dtype[-2:]} dtypes. Changing dtype to float{dtype[-2:]} for {col}",
260                    RuntimeWarning,
261                )
262                dtypes[col] = "float" + dtype[-2:]
263        self.agents = self.agents.astype(dtypes)
264
265        # Set agents' unique_id as index (Have to reassign masks because index changed)
266        self.agents.set_index("id", inplace=True)
267        self.update_agents_masks()
268
269        print("Created agents: " + "--- %s seconds ---" % (time() - start_time))
270
271    def _initialize_data_collection(self, how="2d") -> None:
272        """Initializes the data collection of the model.
273
274        Parameters
275        ----------
276        how : str
277            The frequency of the data collection. It can be 'xd', 'xd', 'xh', 'weekly', 'daily', 'hourly'.
278        """
279        # TODO: finish implementation of different data collections
280        if how == "2d":
281            return
282
283    def update_agents_masks(self) -> None:
284        """Updates the masks attributes of each agent in self.agent_types.
285        Useful after agents are created/deleted or index changes.
286        """
287        if self.agent_types is None:
288            raise RuntimeError(
289                "You must create agents before updating their masks. Use create_agents() method."
290            )
291        for agent_type in self.agent_types:
292            agent_type[0].mask = self.get_agents_of_type(agent_type[0])
293
294    # TODO: implement different data collection frequencies (xw, xd, xh, weekly, daily, hourly, per every step):
295    """def initialize_data_collector(
296        self,
297        model_reporters=None,
298        agent_reporters=None,
299        tables=None,
300    ) -> None:
301        if not hasattr(self, "schedule") or self.schedule is None:
302            raise RuntimeError(
303                "You must initialize the scheduler (self.schedule) before initializing the data collector."
304            )
305        if self.schedule.get_agent_count() == 0:
306            raise RuntimeError(
307                "You must add agents to the scheduler before initializing the data collector."
308            )
309        self.datacollector = DataCollector(
310            model_reporters=model_reporters,
311            agent_reporters=agent_reporters,
312            tables=tables,
313        )
314        # Collect data for the first time during initialization.
315        self.datacollector.collect(self)"""
class ModelDF:
 13class ModelDF:
 14    """The base class for all models
 15
 16    Attributes
 17    ----------
 18    unique_id : int
 19        The unique_id of the model.
 20    running : bool
 21        Indicates if the model is running or not.
 22    agents : pd.DataFrame | gpd.GeoDataFrame | None
 23        The dataframe containing the agents of the model.
 24    agent_types : list[tuple[type[AgentDF], float]] | None
 25        The list of agent types and their proportions.
 26    p_agents : dict[type[AgentDF], float] | None
 27        The dictionary of agents to create. The keys are the types of agents,
 28        the values are the percentages of each agent type. The sum of the values should be 1.
 29    space
 30        The space where the agents will be placed. Can be None if model does not have a space.
 31    """
 32
 33    def __new__(cls, *args, **kwargs):
 34        """Create a new model object and instantiate its RNG automatically
 35        (adds supports to numpy with respect to base model)."""
 36        obj = object.__new__(cls)
 37        obj._seed = kwargs.get("seed")
 38        if obj._seed is None:
 39            # We explicitly specify the seed here so that we know its value in
 40            # advance.
 41            obj._seed = np.random.SeedSequence().entropy
 42        # Use default_rng to get a new Generator instance
 43        obj.random = np.random.default_rng(obj._seed)
 44        return obj
 45
 46    def __init__(self, unique_id: int | None = None, space=None):
 47        """Create a new model. Overload this method with the actual code to
 48        start the model. Always start with super().__init__() to initialize the
 49        model object properly.
 50
 51        Parameters
 52        ----------
 53        unique_id : int | None
 54            The unique_id of the model.
 55            If None, a random unique_id is assigned using a 64-bit random integer.
 56        space
 57            The space where the agents will be placed. Can be None if model does not have a space.
 58        """
 59        # Initialize default attributes
 60        self.running: bool = True
 61        self.agents: pd.DataFrame | gpd.GeoDataFrame | None = None
 62        self.agent_types: list[tuple[type[AgentDF], float]] | None = None
 63        self.p_agents: dict[type[AgentDF], float] | None = None
 64        # self.schedule : BaseScheduler = None
 65
 66        # Initialize optional parameters
 67        if not unique_id:
 68            self.unique_id = np.random.randint(
 69                low=-9223372036854775808, high=9223372036854775807, dtype="int64"
 70            )
 71        else:
 72            self.unique_id = unique_id
 73        self.space = space
 74
 75        # Initialize data collection
 76        # self.initialize_data_collector(data_collection)
 77
 78    def get_agents_of_type(self, agent_type: type[AgentDF]) -> pd.Series:
 79        """Returns a boolean mask of the agents dataframe of the model which corresponds to the agent_type.
 80
 81        Parameters
 82        ----------
 83        agent_type : type[AgentDF]
 84            The agent_type to get the mask for.
 85        """
 86        if self.agents is None:
 87            raise RuntimeError(
 88                "You must create agents before getting their masks. Use create_agents() method."
 89            )
 90        return self.agents["type"].str.contains(agent_type.__name__)  # type: ignore
 91
 92    def run_model(self, n_steps: int | None = None, merged_mro: bool = False) -> None:
 93        """If n_steps are specified, executes model.step() until n_steps are reached.
 94        Otherwise, until self.running is false (as the default mesa.Model.run_model).
 95
 96        Parameters
 97        ----------
 98        n_steps : int | None
 99            The number of steps which the model will execute.
100            Can be None if a running condition turning false is used.
101        merged_mro: bool
102            If False, the model will execute one step for each class in p_agent. This is the default behaviour.
103            If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO.
104            This may increase performance if there are multiple and complex inheritance as each agent_type (even if parents of different classes),
105            will be executed only once. Not a viable option if the behavior of a class depends on another.
106        """
107        if n_steps:
108            if not (isinstance(n_steps, int) and n_steps > 0):
109                raise TypeError(
110                    "n_steps should be an integer greater than 0 or None if a running condition is used"
111                )
112            for _ in range(n_steps):
113                self.step(merged_mro)
114        else:
115            while self.running:
116                self.step(merged_mro)
117
118    def step(self, merged_mro: bool = False) -> None:
119        """Executes one step of the model.
120
121        Parameters
122        ----------
123        merged_mro : bool
124            If False, the model will execute one step for each class in p_agent. This is the default behaviour.
125            If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO.
126            This may increase performance if there are multiple and complex inheritance as each agent_type (even if parents of different classes),
127            will be executed only once. Not a viable option if the behavior of a class depends on another.
128        """
129        if self.agent_types is None or self.p_agents is None:
130            raise RuntimeError(
131                "You must create agents before running the model. Use create_agents() method."
132            )
133        if merged_mro:
134            for agent_type in self.agent_types:
135                agent_type[0].step()
136        else:
137            for agent in self.p_agents:
138                agent.step()
139
140    def reset_randomizer(self, seed: int | None = None) -> None:
141        """Reset the model random number generator.
142
143        Parameters
144        ----------
145        seed : int | None
146            A new seed for the RNG; if None, reset using the current seed
147        """
148        if seed is None:
149            seed = self._seed
150        self.random = np.random.default_rng(seed)
151        self._seed = seed
152
153    def create_agents(
154        self, n_agents: int, p_agents: dict[type[AgentDF], float]
155    ) -> None:
156        """Populate the self.agents dataframe.
157
158        Parameters
159        ----------
160        n_agents : int | None
161            The number of agents which the model will create.
162        p_agents : dict[type[AgentDF], float]
163            The dictionary of agents to create. The keys are the types of agents,
164            the values are the percentages of each agent type. The sum of the values should be 1.
165        """
166
167        # Verify parameters
168        if not (isinstance(n_agents, int) and n_agents > 0):
169            raise TypeError("n_agents should be an integer greater than 0")
170        if sum(p_agents.values()) != 1:
171            raise ValueError("Sum of proportions of agents should be 1")
172        if any(p < 0 or p > 1 for p in p_agents.values()):
173            raise ValueError("Proportions of agents should be between 0 and 1")
174
175        self.p_agents = p_agents
176
177        start_time = time()
178        print("Creating agents: ...")
179
180        mros = [[agent.__mro__[:-1], p] for agent, p in p_agents.items()]
181        mros_copy = deepcopy(mros)
182        agent_types = []
183
184        # Create a "merged MRO" (inspired by C3 linearization algorithm)
185        while True:
186            candunique_idate_added = False
187            # if all mros are empty, the merged mro is done
188            if not any(mro[0] for mro in mros):
189                break
190            for mro in mros:
191                # If mro is empty, continue
192                if not mro[0]:
193                    continue
194                # candunique_idate = head
195                candunique_idate = mro[0][0]
196                # If candunique_idate appears in the tail of another MRO, skip it for now (because other agent_types depend on it, will be added later)
197                if any(
198                    candunique_idate in other_mro[0][1:]
199                    for other_mro in mros
200                    if other_mro is not mro
201                ):
202                    continue
203                else:
204                    p = 0
205                    for i, other_mro in enumerate(mros):
206                        if other_mro[0][0] == candunique_idate:
207                            p += other_mro[1]
208                            mros[i][0] = other_mro[0][1:]
209                        else:
210                            continue
211                    agent_types.append((candunique_idate, p))  # Safe to add it
212                    candunique_idate_added = True
213            # If there wasn't any good head, there is an inconsistent hierarchy
214            if not candunique_idate_added:
215                raise ValueError("Inconsistent hierarchy")
216        self.agent_types = list(agent_types)
217
218        # Create a single DF using vars and values for every class
219        columns: set[str] = set()
220        dtypes: dict[str, str] = {}
221        for agent_type in self.agent_types:
222            for key, val in agent_type[0].dtypes.items():
223                if key not in columns:
224                    columns.add(key)
225                    dtypes[key] = val
226
227        if "geometry" in columns:
228            if not (self.space and hasattr(self.space, "crs")):
229                raise ValueError(
230                    "You must specify a space with a crs attribute if you want to create GeoAgents"
231                )
232            self.agents = gpd.GeoDataFrame(
233                index=pd.RangeIndex(0, n_agents),
234                columns=list(columns),
235                crs=self.space.crs,
236            )
237        else:
238            self.agents = pd.DataFrame(
239                index=pd.RangeIndex(0, n_agents), columns=list(columns)
240            )
241
242        # Populate agents type
243        start_index = 0
244        for i, (_, p) in enumerate(p_agents.items()):
245            self.agents.loc[
246                start_index : start_index + int(n_agents * p) - 1, "type"
247            ] = str(mros_copy[i][0])
248            start_index += int(n_agents * p)
249
250        # Initialize agents
251        AgentDF.model = self
252        self.update_agents_masks()
253        for agent in p_agents:
254            agent.__init__()
255
256        # Set dtypes
257        for col, dtype in dtypes.items():
258            if "int" in dtype and self.agents[col].isna().sum() > 0:  # type: ignore
259                warn(
260                    f"Pandas does not support NaN values for int{dtype[-2:]} dtypes. Changing dtype to float{dtype[-2:]} for {col}",
261                    RuntimeWarning,
262                )
263                dtypes[col] = "float" + dtype[-2:]
264        self.agents = self.agents.astype(dtypes)
265
266        # Set agents' unique_id as index (Have to reassign masks because index changed)
267        self.agents.set_index("id", inplace=True)
268        self.update_agents_masks()
269
270        print("Created agents: " + "--- %s seconds ---" % (time() - start_time))
271
272    def _initialize_data_collection(self, how="2d") -> None:
273        """Initializes the data collection of the model.
274
275        Parameters
276        ----------
277        how : str
278            The frequency of the data collection. It can be 'xd', 'xd', 'xh', 'weekly', 'daily', 'hourly'.
279        """
280        # TODO: finish implementation of different data collections
281        if how == "2d":
282            return
283
284    def update_agents_masks(self) -> None:
285        """Updates the masks attributes of each agent in self.agent_types.
286        Useful after agents are created/deleted or index changes.
287        """
288        if self.agent_types is None:
289            raise RuntimeError(
290                "You must create agents before updating their masks. Use create_agents() method."
291            )
292        for agent_type in self.agent_types:
293            agent_type[0].mask = self.get_agents_of_type(agent_type[0])
294
295    # TODO: implement different data collection frequencies (xw, xd, xh, weekly, daily, hourly, per every step):
296    """def initialize_data_collector(
297        self,
298        model_reporters=None,
299        agent_reporters=None,
300        tables=None,
301    ) -> None:
302        if not hasattr(self, "schedule") or self.schedule is None:
303            raise RuntimeError(
304                "You must initialize the scheduler (self.schedule) before initializing the data collector."
305            )
306        if self.schedule.get_agent_count() == 0:
307            raise RuntimeError(
308                "You must add agents to the scheduler before initializing the data collector."
309            )
310        self.datacollector = DataCollector(
311            model_reporters=model_reporters,
312            agent_reporters=agent_reporters,
313            tables=tables,
314        )
315        # Collect data for the first time during initialization.
316        self.datacollector.collect(self)"""

The base class for all models

Attributes
  • unique_id (int): The unique_id of the model.
  • running (bool): Indicates if the model is running or not.
  • agents (pd.DataFrame | gpd.GeoDataFrame | None): The dataframe containing the agents of the model.
  • agent_types (list[tuple[type[AgentDF], float]] | None): The list of agent types and their proportions.
  • p_agents (dict[type[AgentDF], float] | None): The dictionary of agents to create. The keys are the types of agents, the values are the percentages of each agent type. The sum of the values should be 1.
  • space: The space where the agents will be placed. Can be None if model does not have a space.
ModelDF(unique_id: int | None = None, space=None)
46    def __init__(self, unique_id: int | None = None, space=None):
47        """Create a new model. Overload this method with the actual code to
48        start the model. Always start with super().__init__() to initialize the
49        model object properly.
50
51        Parameters
52        ----------
53        unique_id : int | None
54            The unique_id of the model.
55            If None, a random unique_id is assigned using a 64-bit random integer.
56        space
57            The space where the agents will be placed. Can be None if model does not have a space.
58        """
59        # Initialize default attributes
60        self.running: bool = True
61        self.agents: pd.DataFrame | gpd.GeoDataFrame | None = None
62        self.agent_types: list[tuple[type[AgentDF], float]] | None = None
63        self.p_agents: dict[type[AgentDF], float] | None = None
64        # self.schedule : BaseScheduler = None
65
66        # Initialize optional parameters
67        if not unique_id:
68            self.unique_id = np.random.randint(
69                low=-9223372036854775808, high=9223372036854775807, dtype="int64"
70            )
71        else:
72            self.unique_id = unique_id
73        self.space = space
74
75        # Initialize data collection
76        # self.initialize_data_collector(data_collection)

Create a new model. Override this method with the actual code to start the model. Always start with super().__init__() to initialize the model object properly.

Parameters
  • unique_id (int | None): The unique_id of the model. If None, a random unique_id is assigned using a 64-bit random integer.
  • space: The space where the agents will be placed. Can be None if model does not have a space.
def get_agents_of_type( self, agent_type: type[mesa_frames.agent.AgentDF]) -> pandas.core.series.Series:
78    def get_agents_of_type(self, agent_type: type[AgentDF]) -> pd.Series:
79        """Returns a boolean mask of the agents dataframe of the model which corresponds to the agent_type.
80
81        Parameters
82        ----------
83        agent_type : type[AgentDF]
84            The agent_type to get the mask for.
85        """
86        if self.agents is None:
87            raise RuntimeError(
88                "You must create agents before getting their masks. Use create_agents() method."
89            )
90        return self.agents["type"].str.contains(agent_type.__name__)  # type: ignore

Returns a boolean mask of the agents dataframe of the model which corresponds to the agent_type.

Parameters
  • agent_type (type[AgentDF]): The agent_type to get the mask for.
def run_model(self, n_steps: int | None = None, merged_mro: bool = False) -> None:
 92    def run_model(self, n_steps: int | None = None, merged_mro: bool = False) -> None:
 93        """If n_steps are specified, executes model.step() until n_steps are reached.
 94        Otherwise, until self.running is false (as the default mesa.Model.run_model).
 95
 96        Parameters
 97        ----------
 98        n_steps : int | None
 99            The number of steps which the model will execute.
100            Can be None if a running condition turning false is used.
101        merged_mro: bool
102            If False, the model will execute one step for each class in p_agent. This is the default behaviour.
103            If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO.
104            This may increase performance if there are multiple and complex inheritance as each agent_type (even if parents of different classes),
105            will be executed only once. Not a viable option if the behavior of a class depends on another.
106        """
107        if n_steps:
108            if not (isinstance(n_steps, int) and n_steps > 0):
109                raise TypeError(
110                    "n_steps should be an integer greater than 0 or None if a running condition is used"
111                )
112            for _ in range(n_steps):
113                self.step(merged_mro)
114        else:
115            while self.running:
116                self.step(merged_mro)

If n_steps are specified, executes model.step() until n_steps are reached. Otherwise, until self.running is false (as the default mesa.Model.run_model).

Parameters
  • n_steps (int | None): The number of steps which the model will execute. Can be None if a running condition turning false is used.
  • merged_mro (bool): If False, the model will execute one step for each class in p_agents. This is the default behaviour. If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO. This may improve performance when there is multiple or complex inheritance, as each agent_type (even if it is a parent of different classes) will be executed only once. Not a viable option if the behavior of a class depends on another.
def step(self, merged_mro: bool = False) -> None:
118    def step(self, merged_mro: bool = False) -> None:
119        """Executes one step of the model.
120
121        Parameters
122        ----------
123        merged_mro : bool
124            If False, the model will execute one step for each class in p_agent. This is the default behaviour.
125            If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO.
126            This may increase performance if there are multiple and complex inheritance as each agent_type (even if parents of different classes),
127            will be executed only once. Not a viable option if the behavior of a class depends on another.
128        """
129        if self.agent_types is None or self.p_agents is None:
130            raise RuntimeError(
131                "You must create agents before running the model. Use create_agents() method."
132            )
133        if merged_mro:
134            for agent_type in self.agent_types:
135                agent_type[0].step()
136        else:
137            for agent in self.p_agents:
138                agent.step()

Executes one step of the model.

Parameters
  • merged_mro (bool): If False, the model will execute one step for each class in p_agents. This is the default behaviour. If True, the model will execute one step for each inherited agent type in the order of a "merged" MRO. This may increase performance when there are multiple, complex inheritance hierarchies, as each agent type (even if it is a parent of several different classes) will be executed only once. Not a viable option if the behavior of a class depends on another.
def reset_randomizer(self, seed: int | None = None) -> None:
140    def reset_randomizer(self, seed: int | None = None) -> None:
141        """Reset the model random number generator.
142
143        Parameters
144        ----------
145        seed : int | None
146            A new seed for the RNG; if None, reset using the current seed
147        """
148        if seed is None:
149            seed = self._seed
150        self.random = np.random.default_rng(seed)
151        self._seed = seed

Reset the model random number generator.

Parameters
  • seed (int | None): A new seed for the RNG; if None, reset using the current seed
def create_agents( self, n_agents: int, p_agents: dict[type[mesa_frames.agent.AgentDF], float]) -> None:
153    def create_agents(
154        self, n_agents: int, p_agents: dict[type[AgentDF], float]
155    ) -> None:
156        """Populate the self.agents dataframe.
157
158        Parameters
159        ----------
160        n_agents : int | None
161            The number of agents which the model will create.
162        p_agents : dict[type[AgentDF], float]
163            The dictionary of agents to create. The keys are the types of agents,
164            the values are the percentages of each agent type. The sum of the values should be 1.
165        """
166
167        # Verify parameters
168        if not (isinstance(n_agents, int) and n_agents > 0):
169            raise TypeError("n_agents should be an integer greater than 0")
170        if sum(p_agents.values()) != 1:
171            raise ValueError("Sum of proportions of agents should be 1")
172        if any(p < 0 or p > 1 for p in p_agents.values()):
173            raise ValueError("Proportions of agents should be between 0 and 1")
174
175        self.p_agents = p_agents
176
177        start_time = time()
178        print("Creating agents: ...")
179
180        mros = [[agent.__mro__[:-1], p] for agent, p in p_agents.items()]
181        mros_copy = deepcopy(mros)
182        agent_types = []
183
184        # Create a "merged MRO" (inspired by C3 linearization algorithm)
185        while True:
186            candunique_idate_added = False
187            # if all mros are empty, the merged mro is done
188            if not any(mro[0] for mro in mros):
189                break
190            for mro in mros:
191                # If mro is empty, continue
192                if not mro[0]:
193                    continue
194                # candunique_idate = head
195                candunique_idate = mro[0][0]
196                # If candunique_idate appears in the tail of another MRO, skip it for now (because other agent_types depend on it, will be added later)
197                if any(
198                    candunique_idate in other_mro[0][1:]
199                    for other_mro in mros
200                    if other_mro is not mro
201                ):
202                    continue
203                else:
204                    p = 0
205                    for i, other_mro in enumerate(mros):
206                        if other_mro[0][0] == candunique_idate:
207                            p += other_mro[1]
208                            mros[i][0] = other_mro[0][1:]
209                        else:
210                            continue
211                    agent_types.append((candunique_idate, p))  # Safe to add it
212                    candunique_idate_added = True
213            # If there wasn't any good head, there is an inconsistent hierarchy
214            if not candunique_idate_added:
215                raise ValueError("Inconsistent hierarchy")
216        self.agent_types = list(agent_types)
217
218        # Create a single DF using vars and values for every class
219        columns: set[str] = set()
220        dtypes: dict[str, str] = {}
221        for agent_type in self.agent_types:
222            for key, val in agent_type[0].dtypes.items():
223                if key not in columns:
224                    columns.add(key)
225                    dtypes[key] = val
226
227        if "geometry" in columns:
228            if not (self.space and hasattr(self.space, "crs")):
229                raise ValueError(
230                    "You must specify a space with a crs attribute if you want to create GeoAgents"
231                )
232            self.agents = gpd.GeoDataFrame(
233                index=pd.RangeIndex(0, n_agents),
234                columns=list(columns),
235                crs=self.space.crs,
236            )
237        else:
238            self.agents = pd.DataFrame(
239                index=pd.RangeIndex(0, n_agents), columns=list(columns)
240            )
241
242        # Populate agents type
243        start_index = 0
244        for i, (_, p) in enumerate(p_agents.items()):
245            self.agents.loc[
246                start_index : start_index + int(n_agents * p) - 1, "type"
247            ] = str(mros_copy[i][0])
248            start_index += int(n_agents * p)
249
250        # Initialize agents
251        AgentDF.model = self
252        self.update_agents_masks()
253        for agent in p_agents:
254            agent.__init__()
255
256        # Set dtypes
257        for col, dtype in dtypes.items():
258            if "int" in dtype and self.agents[col].isna().sum() > 0:  # type: ignore
259                warn(
260                    f"Pandas does not support NaN values for int{dtype[-2:]} dtypes. Changing dtype to float{dtype[-2:]} for {col}",
261                    RuntimeWarning,
262                )
263                dtypes[col] = "float" + dtype[-2:]
264        self.agents = self.agents.astype(dtypes)
265
266        # Set agents' unique_id as index (Have to reassign masks because index changed)
267        self.agents.set_index("id", inplace=True)
268        self.update_agents_masks()
269
270        print("Created agents: " + "--- %s seconds ---" % (time() - start_time))

Populate the self.agents dataframe.

Parameters
  • n_agents (int): The number of agents which the model will create. Must be an integer greater than 0.
  • p_agents (dict[type[AgentDF], float]): The dictionary of agents to create. The keys are the types of agents, the values are the percentages of each agent type. The sum of the values should be 1.
def update_agents_masks(self) -> None:
284    def update_agents_masks(self) -> None:
285        """Updates the masks attributes of each agent in self.agent_types.
286        Useful after agents are created/deleted or index changes.
287        """
288        if self.agent_types is None:
289            raise RuntimeError(
290                "You must create agents before updating their masks. Use create_agents() method."
291            )
292        for agent_type in self.agent_types:
293            agent_type[0].mask = self.get_agents_of_type(agent_type[0])

Updates the masks attributes of each agent in self.agent_types. Useful after agents are created/deleted or index changes.