Source code for simulation_framework.src.ai.decision_maker

from __future__ import annotations

import random
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from .goal import (
    AttackEnemyGoal,
    CraftItemGoal,
    ExploreGoal,
    FleeFromDangerGoal,
    GatherResourceGoal,
    Goal,
    RestGoal,
)

if TYPE_CHECKING:
    from ..core.world import World
    from ..entities.base import Entity


[docs] class DecisionMaker: def __init__(self): self.goal_history: Dict[str, int] = {} # Track how often goals are selected self.last_decision_tick = 0 self.decision_cooldown = 3 # Ticks between decisions
[docs] def evaluate_all_goals( self, agent: Entity, world: World, current_goals: List[Goal] ) -> List[Tuple[Goal, float]]: """Evaluate utility of all possible goals""" goal_utilities = [] # Evaluate existing goals for goal in current_goals: if not goal.should_abandon(agent, world): utility = self._calculate_goal_utility(goal, agent, world) goal_utilities.append((goal, utility)) # Generate new potential goals new_goals = self._generate_potential_goals(agent, world) for goal in new_goals: utility = self._calculate_goal_utility(goal, agent, world) goal_utilities.append((goal, utility)) return goal_utilities
[docs] def select_goal( self, agent: Entity, world: World, current_goals: List[Goal], selection_method: str = "utility_weighted", ) -> Optional[Goal]: """Select the best goal for the agent""" if world.current_tick < self.last_decision_tick + self.decision_cooldown: # Return current goal if still valid if current_goals: for goal in current_goals: if not goal.should_abandon(agent, world): return goal return None self.last_decision_tick = world.current_tick goal_utilities = self.evaluate_all_goals(agent, world, current_goals) if not goal_utilities: # Emergency goal - rest if nothing else return RestGoal(priority=1) # Sort by utility goal_utilities.sort(key=lambda x: x[1], reverse=True) if selection_method == "highest_utility": return goal_utilities[0][0] elif selection_method == "utility_weighted": return self._weighted_selection(goal_utilities) else: return goal_utilities[0][0]
def _calculate_goal_utility(self, goal: Goal, agent: Entity, world: World) -> float: """Calculate the total utility of a goal""" base_utility = goal.get_utility(agent, world) # Personality modifiers personality_modifier = self._get_personality_modifier(goal, agent) # Class modifiers class_modifier = self._get_class_modifier(goal, agent) # Context modifiers context_modifier = self._get_context_modifier(goal, agent, world) # History modifier (avoid repeated goals) history_modifier = self._get_history_modifier(goal) total_utility = ( base_utility * personality_modifier * class_modifier * context_modifier * history_modifier ) return max(0.0, min(1.0, total_utility)) def _get_personality_modifier(self, goal: Goal, agent: Entity) -> float: """Get personality-based utility modifier""" if not hasattr(agent, "personality"): return 1.0 personality = agent.personality modifier = 1.0 if isinstance(goal, ExploreGoal): modifier = personality.curiosity * 1.5 + personality.bravery * 0.5 elif isinstance(goal, GatherResourceGoal): modifier = personality.industriousness * 1.3 + personality.greed * 0.4 elif isinstance(goal, CraftItemGoal): modifier = personality.industriousness * 1.2 + personality.patience * 0.8 elif isinstance(goal, AttackEnemyGoal): modifier = personality.aggression * 1.5 + personality.bravery * 0.8 elif isinstance(goal, FleeFromDangerGoal): modifier = personality.caution * 1.5 + (1.0 - personality.bravery) * 0.5 elif isinstance(goal, RestGoal): modifier = personality.patience * 0.8 + personality.caution * 0.4 return max(0.2, min(2.0, modifier)) def _get_class_modifier(self, goal: Goal, agent: Entity) -> float: """Get character class-based utility modifier""" if not hasattr(agent, "character_class"): return 1.0 character_class = agent.character_class modifier = 1.0 if isinstance(goal, ExploreGoal): if character_class.name in ["Explorer", "Hunter"]: modifier = 1.4 elif isinstance(goal, GatherResourceGoal): if character_class.name in ["Hunter", "Alchemist", "Blacksmith", "Farmer"]: modifier = 1.3 elif isinstance(goal, CraftItemGoal): if character_class.name in ["Blacksmith", "Alchemist", "Mage"]: modifier = 1.5 elif isinstance(goal, AttackEnemyGoal): if character_class.name in ["Warrior", "Hunter"]: modifier = 1.4 return modifier def _get_context_modifier(self, goal: Goal, agent: Entity, world: World) -> float: """Get context-based utility modifier""" modifier = 1.0 # Health-based modifiers health_ratio = agent.stats.get_health_percentage() stamina_ratio = agent.stats.get_stamina_percentage() if health_ratio < 0.3 or stamina_ratio < 0.2: if isinstance(goal, RestGoal): modifier *= 2.0 elif isinstance(goal, (AttackEnemyGoal, ExploreGoal)): modifier *= 0.3 # Inventory space modifiers inventory_full = ( agent.inventory.get_total_items() >= agent.inventory.capacity * 0.9 ) if inventory_full: if isinstance(goal, GatherResourceGoal): modifier *= 0.2 elif isinstance(goal, CraftItemGoal): modifier *= 1.5 # Crafting uses materials # Time of day / environmental modifiers could be added here return modifier def _get_history_modifier(self, goal: Goal) -> float: """Get modifier based on goal selection history""" goal_type = type(goal).__name__ recent_selections = self.goal_history.get(goal_type, 0) if recent_selections > 3: return 0.7 # Reduce likelihood of repeated goals elif recent_selections > 1: return 0.85 else: return 1.0 def _weighted_selection(self, goal_utilities: List[Tuple[Goal, float]]) -> Goal: """Select goal using weighted random selection""" if not goal_utilities: return RestGoal() # Normalize utilities total_utility = sum(utility for _, utility in goal_utilities) if total_utility <= 0: return goal_utilities[0][0] # Create weighted list weights = [utility / total_utility for _, utility in goal_utilities] # Random selection rand = random.random() cumulative = 0.0 for i, weight in enumerate(weights): cumulative += weight if rand <= cumulative: selected_goal = goal_utilities[i][0] # Update history goal_type = type(selected_goal).__name__ self.goal_history[goal_type] = self.goal_history.get(goal_type, 0) + 1 return selected_goal # Fallback to highest utility return goal_utilities[0][0] def _generate_potential_goals(self, agent: Entity, world: World) -> List[Goal]: """Generate potential new goals based on agent state and world""" potential_goals = [] # Always consider resting if tired if ( agent.stats.get_stamina_percentage() < 0.5 or agent.stats.get_health_percentage() < 0.7 ): potential_goals.append(RestGoal()) # Exploration goals if hasattr(agent, "personality") and agent.personality.curiosity > 0.4: potential_goals.append(ExploreGoal()) # Gathering goals based on needs and preferences needed_resources = self._identify_needed_resources(agent) for resource in needed_resources: potential_goals.append(GatherResourceGoal(resource, target_quantity=5)) # Combat goals - look for nearby enemies enemies = self._find_nearby_enemies(agent, world, range_limit=10) for enemy in enemies: if self._should_consider_combat(agent, enemy): potential_goals.append(AttackEnemyGoal(enemy.id)) # Flee goals - check for immediate threats threats = self._find_immediate_threats(agent, world) for threat in threats: potential_goals.append(FleeFromDangerGoal(threat.id)) return potential_goals def _identify_needed_resources(self, agent: Entity) -> List[str]: """Identify what resources the agent currently needs""" needed = [] # Basic needs if agent.inventory.get_item_count("Bread") < 3: needed.append("berries") # For food if not agent.inventory.get_equipped_weapon(): needed.append("wood") # For crafting weapons # Class-specific needs if hasattr(agent, "character_class"): if agent.character_class.name == "Blacksmith": if agent.inventory.get_item_count("Iron Ore") < 5: needed.append("iron_ore") elif agent.character_class.name == "Alchemist": if agent.inventory.get_item_count("Herbs") < 3: needed.append("herbs") return needed[:2] # Limit to 2 most important needs def _find_nearby_enemies( self, agent: Entity, world: World, range_limit: int = 10 ) -> List: """Find potential enemies within range""" enemies = [] agent_x, agent_y = agent.position for entity in world.entities.values(): if entity.id == agent.id: continue if not entity.stats.is_alive(): continue # Check if it's an NPC that could be hostile if hasattr(entity, "npc_type") and entity.npc_type == "aggressive": distance = agent.distance_to(entity) if distance <= range_limit: enemies.append(entity) return enemies def _should_consider_combat(self, agent: Entity, enemy: Entity) -> bool: """Determine if agent should consider fighting this enemy""" if not hasattr(agent, "personality"): return False # Check relative strength agent_power = agent.stats.attack_power + agent.stats.defense enemy_power = enemy.stats.attack_power + enemy.stats.defense strength_ratio = agent_power / max(enemy_power, 1) # Factor in personality combat_willingness = ( agent.personality.bravery * 0.4 + agent.personality.aggression * 0.4 + (1.0 - agent.personality.caution) * 0.2 ) # Adjust for strength if strength_ratio > 1.5: combat_willingness *= 1.3 elif strength_ratio < 0.8: combat_willingness *= 0.5 return random.random() < combat_willingness def _find_immediate_threats( self, agent: Entity, world: World, range_limit: int = 5 ) -> List: """Find immediate threats that agent should flee from""" threats = [] for entity in world.entities.values(): if entity.id == agent.id: continue if not entity.stats.is_alive(): continue # Check if entity is hostile and nearby if ( hasattr(entity, "npc_type") and entity.npc_type == "aggressive" and hasattr(entity, "target_id") and entity.target_id == agent.id ): distance = agent.distance_to(entity) if distance <= range_limit: # Check if we should flee if self._should_flee_from(agent, entity): threats.append(entity) return threats def _should_flee_from(self, agent: Entity, threat: Entity) -> bool: """Determine if agent should flee from this threat""" if not hasattr(agent, "personality"): return True # Default to caution # Check relative strength agent_power = ( agent.stats.attack_power + agent.stats.defense + agent.stats.health ) threat_power = ( threat.stats.attack_power + threat.stats.defense + threat.stats.health ) strength_ratio = agent_power / max(threat_power, 1) # Factor in personality and health flee_tendency = ( agent.personality.caution * 0.5 + (1.0 - agent.personality.bravery) * 0.3 + (1.0 - agent.stats.get_health_percentage()) * 0.2 ) # Adjust for strength difference if strength_ratio < 0.7: # Much weaker flee_tendency *= 1.5 elif strength_ratio > 1.3: # Much stronger flee_tendency *= 0.5 # For very high flee tendency (>0.6), make it deterministic for testing if flee_tendency > 0.6: return True return random.random() < flee_tendency
[docs] def reset_decision_cooldown(self) -> None: """Reset the decision cooldown (for testing or special cases)""" self.last_decision_tick = 0
[docs] def get_decision_summary(self, agent: Entity) -> Dict[str, any]: """Get summary of decision-making state for debugging""" return { "last_decision_tick": self.last_decision_tick, "goal_history": self.goal_history.copy(), "decision_cooldown": self.decision_cooldown, "agent_personality": ( agent.personality.to_dict() if hasattr(agent, "personality") else None ), "agent_class": ( str(agent.character_class) if hasattr(agent, "character_class") else None ), }