Source code for simulation_framework.src.entities.agent

from __future__ import annotations

import logging
import random
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple

from ..ai.character_class import CharacterClass, get_random_character_class
from ..ai.decision_maker import DecisionMaker
from ..ai.goal import Goal
from ..ai.personality import Personality
from ..ai.spatial_memory import SpatialMemory
from .base import Entity
from .stats import Stats

logger = logging.getLogger(__name__)


if TYPE_CHECKING:
    from ..actions.base import Action
    from ..core.world import World


[docs] class Agent(Entity): def __init__( self, position: Tuple[int, int], name: str = "Agent", personality: Optional[Personality] = None, character_class: Optional[CharacterClass] = None, stats: Optional[Stats] = None, ): super().__init__(position, name, stats or Stats()) self.personality = personality or Personality.randomize() self.character_class = character_class or get_random_character_class() # Apply class bonuses to stats self._apply_class_bonuses() # AI state self.skills: Dict[str, int] = {} self.decision_maker = DecisionMaker() self.current_goals: List[Goal] = [] self.current_action: Optional[Action] = None self.current_action_goal: Optional[Goal] = ( None # Track which goal generated current action ) self.action_queue: List[Action] = [] # Experience and relationships self.total_experience = 0 self.relationships: Dict[int, float] = {} # entity_id -> relationship (-1 to 1) # Memory and awareness self.spatial_memory = SpatialMemory(memory_duration=100) self.known_entities: Dict[int, Dict] = {} # Recent entity sightings self.memory_duration = 50 # How long to remember entity positions # Behavioral state self.last_action_tick = 0 self.idle_ticks = 0 self.max_idle_ticks = 10 def _apply_class_bonuses(self) -> None: """Apply character class stat bonuses""" for stat_name, bonus in self.character_class.starting_stats_bonus.items(): if hasattr(self.stats, stat_name): current_value = getattr(self.stats, stat_name) setattr(self.stats, stat_name, current_value + bonus) # Give starting equipment for item in self.character_class.get_starting_equipment(): self.inventory.add_item(item, 1) # Equip weapons and tools automatically if item.item_type == "weapon": self.inventory.equip_weapon(item) elif item.item_type == "tool": tool_type = item.get_property("tool_type") if tool_type: self.inventory.equip_tool(item, tool_type)
[docs] def update(self, world: World) -> None: """Main update loop for agent AI""" self.update_status_effects() self._update_memory(world) if not self.stats.is_alive(): return # Three-phase AI update self.perceive(world) self.decide(world) self.act(world)
[docs] def perceive(self, world: World) -> None: """Update agent's knowledge of the world""" self._scan_for_entities(world) self._update_relationships(world) self._update_spatial_memory(world)
[docs] def decide(self, world: World) -> None: """Make decisions about what to do next""" # Clean up completed/invalid goals self.current_goals = [ goal for goal in self.current_goals if not goal.is_complete(self, world) and not goal.should_abandon(self, world) ] # Select new goal if needed if not self.current_goals or random.random() < 0.1: # 10% chance to reconsider new_goal = self.decision_maker.select_goal(self, world, self.current_goals) if new_goal and new_goal not in self.current_goals: self.current_goals.append(new_goal) # Sort goals by priority self.current_goals.sort(key=lambda g: g.priority, reverse=True)
[docs] def act(self, world: World) -> None: """Execute actions based on current goals""" # Check if current action should be interrupted by higher priority goal if ( self.current_action and self.current_action.is_active and not self.current_action.is_complete(world.current_tick) and self.current_goals and self.current_action_goal ): highest_priority_goal = self.current_goals[0] # If highest priority goal is different and has higher priority, interrupt if ( highest_priority_goal != self.current_action_goal and highest_priority_goal.priority > self.current_action_goal.priority and self.current_action.can_interrupt() ): # Interrupt current action self.current_action.interrupt() logger.info( f"Agent {self.name} interrupted {type(self.current_action).__name__} for {highest_priority_goal}" ) # Notify the old goal that its action was interrupted if hasattr(self.current_action_goal, "on_action_completed"): self.current_action_goal.on_action_completed( self.current_action, False, self, world ) # Clear interrupted action self.current_action = None self.current_action_goal = None # Check if current action is still running (and wasn't interrupted) if ( self.current_action and hasattr(self.current_action, "is_active") and self.current_action.is_active and not self.current_action.is_complete(world.current_tick) ): return # Still executing current action # Get next action from highest priority goal new_action = None action_goal = None if self.action_queue: # Execute queued actions first new_action = self.action_queue.pop(0) # Queued actions don't have associated goals elif self.current_goals: # Get action from current goal active_goal = self.current_goals[0] new_action = active_goal.get_next_action(self, world) action_goal = active_goal if new_action: # Start the action if hasattr(new_action, "start"): new_action.start(world.current_tick) # Execute immediately for single-tick actions if new_action.get_duration() <= 1: result = new_action.execute(self, world) active_goal.on_action_completed( new_action, result.success, self, world ) # Gain experience for successful actions if result.success: self._gain_experience(new_action, 1) else: # Multi-tick action - store for later completion self.current_action = new_action self.current_action_goal = action_goal if new_action: self.last_action_tick = world.current_tick self.idle_ticks = 0 else: self.idle_ticks += 1 # If idle too long, add a wander goal if self.idle_ticks > self.max_idle_ticks: from ..ai.goal import ExploreGoal self.current_goals.append(ExploreGoal(priority=2)) self.idle_ticks = 0
def _scan_for_entities(self, world: World) -> None: """Scan for nearby entities and update knowledge""" agent_x, agent_y = self.position for entity in world.entities.values(): if entity.id == self.id: continue if not entity.stats.is_alive(): continue distance = self.distance_to(entity) if distance <= self.vision_range: # Update knowledge of this entity self.known_entities[entity.id] = { "position": entity.position, "last_seen_tick": world.current_tick, "type": type(entity).__name__, "hostile": getattr(entity, "npc_type", "neutral") == "aggressive", "health_percentage": entity.stats.get_health_percentage(), } def _update_memory(self, world: World) -> None: """Clean up old memory entries""" current_tick = world.current_tick entities_to_remove = [] for entity_id, info in self.known_entities.items(): if current_tick - info["last_seen_tick"] > self.memory_duration: entities_to_remove.append(entity_id) for entity_id in entities_to_remove: del self.known_entities[entity_id] def _update_relationships(self, world: World) -> None: """Update relationships with other agents""" # Simplified relationship system - could be expanded for entity_id, info in self.known_entities.items(): if info["type"] == "Agent": # Neutral decay towards 0 if entity_id in self.relationships: current_rel = self.relationships[entity_id] self.relationships[entity_id] = current_rel * 0.99 def _update_spatial_memory(self, world: World) -> None: """Update agent's spatial memory with visible tiles and resources""" vision_range = self.vision_range agent_x, agent_y = self.position for dy in range(-vision_range, vision_range + 1): for dx in range(-vision_range, vision_range + 1): x, y = agent_x + dx, agent_y + dy if 0 <= x < world.width and 0 <= y < world.height: tile = world.get_tile(x, y) if tile: # Remember the tile self.spatial_memory.remember_tile( position=(x, y), terrain_type=tile.terrain_type.value, has_resources=len(tile.resources) > 0, is_passable=tile.can_pass(), current_tick=world.current_tick, ) # Remember resources on this tile for resource in tile.resources: self.spatial_memory.remember_resource( resource_type=resource.resource_type, position=(x, y), quantity=resource.quantity, current_tick=world.current_tick, ) # Clean up old memories periodically if world.current_tick % 50 == 0: self.spatial_memory.forget_old_memories(world.current_tick) def _gain_experience(self, action: Action, amount: int) -> None: """Gain experience for performing actions""" self.total_experience += amount # Gain skill experience based on action type skill_name = self._get_skill_for_action(action) if skill_name: multiplier = self.character_class.get_skill_modifier(skill_name) skill_gain = int(amount * multiplier) current_skill = self.skills.get(skill_name, 0) self.skills[skill_name] = current_skill + skill_gain def _get_skill_for_action(self, action: Action) -> Optional[str]: """Map action types to skill names""" action_type = type(action).__name__ skill_mapping = { "MeleeAttack": "combat", "RangedAttack": "archery", "MagicAttack": "magic", "GatherAction": "gathering", "FishAction": "fishing", "MineAction": "mining", "WoodcutAction": "woodcutting", "ForageAction": "foraging", "CraftAction": "crafting", "PathfindAction": "exploration", } return skill_mapping.get(action_type)
[docs] def add_relationship(self, entity_id: int, relationship_change: float) -> None: """Modify relationship with another entity""" current = self.relationships.get(entity_id, 0.0) new_relationship = max(-1.0, min(1.0, current + relationship_change)) self.relationships[entity_id] = new_relationship
[docs] def get_relationship(self, entity_id: int) -> float: """Get relationship level with another entity (-1 to 1)""" return self.relationships.get(entity_id, 0.0)
[docs] def is_hostile_to(self, entity: Entity) -> bool: """Check if this agent is hostile to another entity""" if hasattr(entity, "npc_type") and entity.npc_type == "aggressive": return True # Always hostile to aggressive NPCs if entity.id in self.relationships: return self.relationships[entity.id] < -0.3 return False
[docs] def get_skill_level(self, skill_name: str) -> int: """Get current level in a skill""" return self.skills.get(skill_name, 0)
[docs] def get_dominant_skills(self, threshold: int = 5) -> List[Tuple[str, int]]: """Get skills above the threshold""" return [ (skill, level) for skill, level in self.skills.items() if level >= threshold ]
[docs] def get_agent_summary(self) -> Dict: """Get a summary of the agent's state for debugging/analysis""" return { "id": self.id, "name": self.name, "position": self.position, "personality": self.personality.to_dict(), "character_class": self.character_class.name, "skills": self.skills.copy(), "current_goals": [str(goal) for goal in self.current_goals], "current_action": str(self.current_action) if self.current_action else None, "total_experience": self.total_experience, "relationships": len(self.relationships), "known_entities": len(self.known_entities), "health": f"{self.stats.health}/{self.stats.max_health}", "stamina": f"{self.stats.stamina}/{self.stats.max_stamina}", "inventory_items": len(self.inventory.get_all_items()), }
[docs] def on_death(self, killer: Optional[Entity] = None) -> None: """Handle agent death""" if killer and killer.id != self.id: # Negative relationship with killer if hasattr(killer, "add_relationship"): killer.add_relationship(self.id, -0.5) # Clear current state self.current_goals.clear() self.current_action = None self.action_queue.clear()
def __repr__(self) -> str: status = "alive" if self.stats.is_alive() else "dead" goals = len(self.current_goals) action = "acting" if self.current_action else "idle" return ( f"Agent(id={self.id}, name='{self.name}', class={self.character_class.name}, " f"{status}, {action}, {goals} goals)" )
[docs] def create_random_agent(position: Tuple[int, int], name: Optional[str] = None) -> Agent: """Create an agent with randomized personality and class""" if not name: # Generate a random name first_names = [ "Alex", "Sam", "Jordan", "Casey", "Riley", "Avery", "Quinn", "Sage", "Blake", "Robin", ] last_names = [ "Smith", "Jones", "Brown", "Davis", "Wilson", "Clark", "Lewis", "Walker", "Hall", "Young", ] name = f"{random.choice(first_names)} {random.choice(last_names)}" personality = Personality.randomize() character_class = get_random_character_class() # Adjust stats based on personality stats = Stats() # Brave agents get more health if personality.bravery > 0.7: stats.max_health += 10 stats.health += 10 # Industrious agents get more stamina if personality.industriousness > 0.7: stats.max_stamina += 15 stats.stamina += 15 return Agent(position, name, personality, character_class, stats)
[docs] def create_agent_with_archetype( position: Tuple[int, int], archetype: str, name: Optional[str] = None ) -> Agent: """Create an agent based on a personality archetype""" personality = Personality.create_archetype(archetype) # Choose appropriate class for archetype class_mapping = { "explorer": "explorer", "warrior": "warrior", "trader": "trader", "crafter": "blacksmith", "hermit": "alchemist", "bandit": "hunter", } from ..ai.character_class import get_character_class character_class = get_character_class(class_mapping.get(archetype, "warrior")) if not name: name = f"{archetype.title()} Agent" return Agent(position, name, personality, character_class)