# Source code for simulation_framework.src.core.simulation

from __future__ import annotations

import logging
import time
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

from ..database.analytics_engine import AnalyticsEngine
from ..database.database import Database
from ..database.models import (
    ActionLog,
    AgentSnapshot,
    CombatLog,
    SimulationRun,
    WorldSnapshot,
)
from ..entities.agent import Agent
from ..entities.npc import NPC
from ..systems.fog_of_war import FogOfWar
from ..systems.respawn import RespawnManager
from ..systems.trading import Market, TradingSystem
from .config import SimulationConfig
from .time_manager import TimeManager
from .world import World

logger = logging.getLogger(__name__)


class Simulation:
    """Main simulation orchestrator that manages the game loop and all systems.

    A ``Simulation`` owns the world, the entity rosters (agents and NPCs), the
    supporting systems (fog of war, trading, respawns, market) and the
    database/analytics handles, and advances them one tick at a time through
    :meth:`step`.

    Typical use is :meth:`initialize_simulation` followed by one of the
    ``run*`` methods; the ``run*`` methods refuse to start until a run has
    been registered in the database.
    """

    # Number of recent tick durations kept for the rolling average.
    _TICK_HISTORY_SIZE = 100
    # Market prices are refreshed every this many ticks.
    _MARKET_UPDATE_INTERVAL = 50
    # Result event types that are mirrored into the combat log.
    _COMBAT_EVENT_TYPES = ("attack_hit", "attack_miss")
    # Frame-rate ceiling (and default) used by the visual run loop.
    _MAX_FPS = 60

    def __init__(self, config: SimulationConfig):
        """Build the world, systems and persistence handles from *config*.

        Args:
            config: Supplies the database path, world dimensions/seed and the
                tick, save and analytics settings read by the other methods.
        """
        self.config = config
        self.time_manager = TimeManager()

        # Initialize database and analytics
        self.db = Database(config.database_path)
        self.analytics = AnalyticsEngine(self.db)

        # Initialize world
        self.world = World(
            width=config.world_width,
            height=config.world_height,
            seed=config.world_seed,
        )

        # Initialize systems
        self.fog_of_war = FogOfWar(self.world.width, self.world.height)
        self.trading_system = TradingSystem()
        self.respawn_manager = RespawnManager(self.world.width, self.world.height)
        self.market = Market()

        # Entity containers
        self.agents: List[Agent] = []
        self.npcs: List[NPC] = []

        # Simulation state
        self.running = False
        self.paused = False
        self.simulation_run: Optional[SimulationRun] = None
        self.simulation_id: Optional[int] = None

        # Performance tracking
        self.tick_times: List[float] = []
        self.last_save_tick = 0

        # Event handlers
        self.on_tick_complete: Optional[Callable[[int], None]] = None
        self.on_simulation_complete: Optional[Callable[[Simulation], None]] = None

        # True once stop_simulation() has finalized the current stretch of
        # work; cleared again whenever a run starts or a tick executes.
        self._finalized = False

    # ------------------------------------------------------------------
    # Setup
    # ------------------------------------------------------------------

    def initialize_simulation(self, name: str, description: str = "") -> None:
        """Initialize a new simulation run in the database.

        Creates a ``SimulationRun`` record from the current configuration and
        agent count, stores it through the database handle and remembers the
        returned id in ``self.simulation_id``.

        Args:
            name: Name stored on the run record.
            description: Optional description stored on the run record.
        """
        self.simulation_run = SimulationRun(
            name=name,
            description=description,
            world_seed=self.config.world_seed,
            world_width=self.config.world_width,
            world_height=self.config.world_height,
            start_time=datetime.now(),
            total_agents=len(self.agents),
            config={
                "max_ticks": self.config.max_ticks,
                "save_interval": self.config.save_interval,
                "analytics_interval": self.config.analytics_interval,
                "tick_rate": self.config.tick_rate,
            },
        )

        self.simulation_id = self.db.create_simulation_run(self.simulation_run)
        self.simulation_run.id = self.simulation_id

        logger.info(f"Initialized simulation '{name}' with ID {self.simulation_id}")

    def add_agent(self, agent: Agent) -> None:
        """Add an agent to the simulation.

        The agent is wired to this simulation's world and fog of war, added to
        the agent roster and registered as a world entity.
        """
        agent.world = self.world
        agent.fog_of_war = self.fog_of_war
        self.agents.append(agent)

        # Add agent to world entities for threat detection and interactions
        self.world.add_entity(agent)

        # Update fog of war for new agent
        if hasattr(self.fog_of_war, "update_agent_vision"):
            self.fog_of_war.update_agent_vision(agent, self.world)

    def add_agents(self, agents: List[Agent]) -> None:
        """Add multiple agents to the simulation."""
        for agent in agents:
            self.add_agent(agent)

    def add_npc(self, npc: NPC) -> None:
        """Add an NPC to the simulation.

        The NPC is wired to this simulation's world, added to the NPC roster
        and registered as a world entity.
        """
        npc.world = self.world
        self.npcs.append(npc)

        # Add NPC to world entities for threat detection and interactions
        self.world.add_entity(npc)

        # Note: NPCs with respawn_delay will be handled by respawn manager
        # when they die. The RespawnManager.schedule_respawn method will be
        # called when needed.

    def add_npcs(self, npcs: List[NPC]) -> None:
        """Add multiple NPCs to the simulation."""
        for npc in npcs:
            self.add_npc(npc)

    # ------------------------------------------------------------------
    # Tick processing
    # ------------------------------------------------------------------

    def step(self) -> None:
        """Execute one simulation tick.

        Does nothing unless the simulation is running and not paused. The
        phases run in a fixed order (NPCs, perception, decisions, actions,
        systems, respawns, market, periodic tasks) before time advances.
        ``on_tick_complete`` receives the tick number that was just processed,
        i.e. the value before the clock advanced.

        Raises:
            Exception: Any error raised by a phase is logged, the simulation
                is stopped, and the error is re-raised.
        """
        if not self.running or self.paused:
            return

        started = time.perf_counter()
        current_tick = self.time_manager.current_tick

        # New work is about to happen, so the next stop must finalize again.
        self._finalized = False

        try:
            # 1. Update NPC controllers
            self._update_npcs()
            # 2. Agent perception updates
            self._update_agent_perception()
            # 3. Agent decision-making and action planning
            self._update_agent_decisions()
            # 4. Execute all queued actions
            self._execute_actions()
            # 5. Update game systems
            self._update_systems()
            # 6. Handle respawns
            self._process_respawns()
            # 7. Update market prices
            self._update_market()
            # 8. Periodic saves and analytics
            self._periodic_tasks()

            # Advance time
            self.time_manager.tick()
            self.world.current_tick = self.time_manager.current_tick

            # Track performance, keeping only the most recent durations for
            # the rolling average.
            tick_duration = time.perf_counter() - started
            self.tick_times.append(tick_duration)
            del self.tick_times[: -self._TICK_HISTORY_SIZE]

            # Call tick completion handler
            if self.on_tick_complete:
                self.on_tick_complete(current_tick)

            logger.debug(f"Tick {current_tick} completed in {tick_duration:.4f}s")

        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"Error in simulation step {current_tick}: {e}")
            self.stop_simulation()
            raise

    def _update_npcs(self) -> None:
        """Update NPC AI and behaviors for every living NPC."""
        for npc in self.npcs:
            if not npc.stats.is_alive():
                continue

            # Update NPC AI decision making
            if hasattr(npc, "update"):
                npc.update(self.world)

            # Handle aggro and combat for hostile NPCs
            if getattr(npc, "aggro_range", 0) > 0:
                self._check_npc_aggro(npc)

    def _check_npc_aggro(self, npc: NPC) -> None:
        """Give *npc* the nearest living agent within its aggro range as target.

        Leaves the NPC untouched when it already has a target, has no positive
        ``aggro_range``, or no living agent is within range.
        """
        # Only check aggro if NPC doesn't already have a target
        if getattr(npc, "target_id", None) is not None:
            return

        aggro_range = getattr(npc, "aggro_range", 0)
        if not aggro_range > 0:
            return

        # Find nearest agent within aggro range
        nearest_agent = None
        nearest_distance = float("inf")
        for agent in self.agents:
            if not agent.stats.is_alive():
                continue

            # Calculate distance manually since positions are tuples
            dx = npc.position[0] - agent.position[0]
            dy = npc.position[1] - agent.position[1]
            distance = (dx * dx + dy * dy) ** 0.5

            if distance <= aggro_range and distance < nearest_distance:
                nearest_agent = agent
                nearest_distance = distance

        # Set target to nearest agent in range
        if nearest_agent is not None:
            npc.target_id = nearest_agent.id
            logger.info(
                f"NPC {npc.name} acquired target: Agent {nearest_agent.name} at distance {nearest_distance:.1f}"
            )

    def _update_agent_perception(self) -> None:
        """Update fog-of-war vision for every living agent."""
        for agent in self.agents:
            if agent.stats.is_alive():
                self.fog_of_war.update_agent_vision(agent, self.world)

    def _update_agent_decisions(self) -> None:
        """Update agent AI decision making for every living agent."""
        for agent in self.agents:
            if agent.stats.is_alive():
                agent.update(self.world)

    def _execute_actions(self) -> None:
        """Execute the active action of every living agent and NPC.

        An action that fails or reports completion is logged (when a run id is
        set), cleared from the entity, and reported to the entity's first
        goal. Errors raised while handling one entity are logged and that
        entity's action is cleared so the remaining entities still run.
        """
        # Concatenation takes a snapshot, so roster changes made by actions do
        # not disturb this iteration.
        for entity in self.agents + self.npcs:
            if not entity.stats.is_alive():
                continue

            action = getattr(entity, "current_action", None)
            if not action or not action.is_active:
                continue

            try:
                # Execute the action (progressive execution)
                result = action.execute(entity, self.world)

                # Evaluate completion once and reuse it for logging/clearing.
                finished = not result.success or action.is_complete(
                    self.time_manager.current_tick
                )
                if not finished:
                    continue

                # NOTE(review): combat events are only persisted on the tick
                # an action finishes or fails - confirm mid-action events are
                # not expected in the combat log.
                if self.simulation_id:
                    self._log_action_result(entity, action, result)

                # Clear action if completed or failed
                entity.current_action = None
                self._notify_active_goal(entity, action, result)

            except Exception as e:
                logger.exception(
                    f"Error executing action {action} for entity {entity.id}: {e}"
                )
                entity.current_action = None

    def _log_action_result(self, entity: Any, action: Any, result: Any) -> None:
        """Write the action log row and any combat rows for a finished action."""
        tick = self.time_manager.current_tick
        action_name = action.__class__.__name__

        self.db.log_action(
            ActionLog(
                simulation_id=self.simulation_id,
                tick=tick,
                agent_id=entity.id,
                action_type=action_name,
                action_data=action.to_dict() if hasattr(action, "to_dict") else {},
                success=result.success,
                result_message=result.message,
                duration=action.get_duration(),
            )
        )

        # Log combat-specific events to combat_logs table
        for event in getattr(result, "events", None) or ():
            if event.event_type not in self._COMBAT_EVENT_TYPES:
                continue

            # Damage, critical and death details are only recorded for hits.
            is_hit = event.event_type == "attack_hit"
            self.db.log_combat(
                CombatLog(
                    simulation_id=self.simulation_id,
                    tick=tick,
                    attacker_id=event.actor_id,
                    target_id=event.target_id if event.target_id else 0,
                    damage_dealt=event.data.get("damage", 0) if is_hit else 0,
                    damage_type=event.data.get("damage_type", "physical"),
                    was_critical=(
                        event.data.get("is_critical", False) if is_hit else False
                    ),
                    weapon_used=action_name,
                    target_died=(
                        event.data.get("target_died", False) if is_hit else False
                    ),
                )
            )

    def _notify_active_goal(self, entity: Any, action: Any, result: Any) -> None:
        """Tell the entity's first goal, if any, that *action* has finished."""
        goals = getattr(entity, "current_goals", None)
        if not goals:
            return

        active_goal = goals[0]
        if hasattr(active_goal, "on_action_completed"):
            active_goal.on_action_completed(
                action, result.success, entity, self.world
            )

    def _update_systems(self) -> None:
        """Update the trading system and log any completed trades."""
        tick = self.time_manager.current_tick

        # Update trading system
        self.trading_system.update(tick)

        # Always fetch completed trades, even when nothing will be logged.
        completed_trades = self.trading_system.get_completed_trades()
        if not self.simulation_id or not completed_trades:
            return

        # Imported lazily, but once per call rather than once per trade.
        from ..database.models import TradeLog

        for trade in completed_trades:
            self.db.log_trade(
                TradeLog(
                    simulation_id=self.simulation_id,
                    tick=tick,
                    initiator_id=trade.get("initiator_id", 0),
                    target_id=trade.get("target_id", 0),
                    offered_items=dict(trade.get("offered_items", [])),
                    requested_items=dict(trade.get("requested_items", [])),
                    offered_gold=trade.get("offered_gold", 0),
                    requested_gold=trade.get("requested_gold", 0),
                    completed=True,
                )
            )

    def _process_respawns(self) -> None:
        """Re-register entities returned by the respawn manager and log them."""
        for entity in self.respawn_manager.process_respawns(self.world):
            if isinstance(entity, NPC):
                self._add_if_absent(self.npcs, entity)
                entity.world = self.world
                logger.info(f"Respawned NPC {entity.name} at {entity.position}")
                self._log_respawn(
                    entity,
                    {
                        "entity_type": "NPC",
                        "npc_type": getattr(entity, "npc_type", "unknown"),
                        "position": entity.position,
                    },
                )

            elif isinstance(entity, Agent):
                self._add_if_absent(self.agents, entity)
                entity.world = self.world
                entity.fog_of_war = self.fog_of_war
                logger.info(f"Respawned Agent {entity.name} at {entity.position}")
                self._log_respawn(
                    entity,
                    {
                        "entity_type": "Agent",
                        "character_class": getattr(
                            getattr(entity, "character_class", None),
                            "name",
                            "unknown",
                        ),
                        "position": entity.position,
                    },
                )

    @staticmethod
    def _add_if_absent(roster: list, entity: Any) -> None:
        """Append *entity* to *roster* unless that same object is already listed.

        Nothing in this class removes dead entities from the rosters, so a
        respawned object may still be present; listing it twice would make it
        update and snapshot twice per tick.
        """
        if not any(existing is entity for existing in roster):
            roster.append(entity)

    def _log_respawn(self, entity: Any, action_data: Dict[str, Any]) -> None:
        """Record a ``Respawn`` action row for *entity* when a run id is set."""
        if not self.simulation_id:
            return

        self.db.log_action(
            ActionLog(
                simulation_id=self.simulation_id,
                tick=self.time_manager.current_tick,
                agent_id=entity.id,
                action_type="Respawn",
                action_data=action_data,
                success=True,
                result_message=f"Respawned {entity.name} at {entity.position}",
                duration=0,
            )
        )

    def _update_market(self) -> None:
        """Refresh market prices on every ``_MARKET_UPDATE_INTERVAL``-th tick."""
        tick = self.time_manager.current_tick
        if tick % self._MARKET_UPDATE_INTERVAL == 0:
            self.market.update_prices(tick)

    def _periodic_tasks(self) -> None:
        """Handle periodic database saves and analytics.

        Snapshots are saved once ``save_interval`` ticks have passed since the
        last save. Analytics run on multiples of ``analytics_interval``; a
        missing run id or a non-positive interval disables them.
        """
        current_tick = self.time_manager.current_tick

        # Save snapshots periodically
        if current_tick - self.last_save_tick >= self.config.save_interval:
            self._save_snapshots()
            self.last_save_tick = current_tick

        # Calculate analytics periodically. The interval is validated before
        # the modulo so a zero interval cannot raise ZeroDivisionError.
        interval = self.config.analytics_interval
        if (
            self.simulation_id
            and interval
            and interval > 0
            and current_tick % interval == 0
        ):
            self.analytics.calculate_all_metrics(self.simulation_id, current_tick)

    def _save_snapshots(self) -> None:
        """Save agent, NPC and world snapshots to the database.

        Does nothing when no run id is set. NPCs are stored with the same
        snapshot model as agents.
        """
        if not self.simulation_id:
            return

        current_tick = self.time_manager.current_tick

        agent_snapshots = [
            self._agent_snapshot(agent, current_tick) for agent in self.agents
        ]
        # Also save NPC snapshots (treat as agents for tracking purposes)
        agent_snapshots.extend(
            self._npc_snapshot(npc, current_tick) for npc in self.npcs
        )

        if agent_snapshots:
            self.db.save_agent_snapshots_batch(agent_snapshots)

        # Save world snapshot
        self.db.save_world_snapshot(
            WorldSnapshot(
                simulation_id=self.simulation_id,
                tick=current_tick,
                total_entities=len(self.agents) + len(self.npcs),
                active_agents=self._count_alive(self.agents),
                active_npcs=self._count_alive(self.npcs),
                resource_nodes=0,  # TODO: Implement resource node counting
                world_events=[],  # TODO: Implement world events
                market_prices=self.market.get_current_prices(),
            )
        )

        logger.debug(f"Saved snapshots for tick {current_tick}")

    def _agent_snapshot(self, agent: Agent, tick: int) -> AgentSnapshot:
        """Build the snapshot row for one agent, defaulting absent attributes."""
        return AgentSnapshot(
            simulation_id=self.simulation_id,
            agent_id=agent.id,
            tick=tick,
            name=agent.name,
            position_x=agent.position[0],
            position_y=agent.position[1],
            health=agent.stats.health,
            max_health=agent.stats.max_health,
            stamina=agent.stats.stamina,
            max_stamina=agent.stats.max_stamina,
            personality=(
                agent.personality.to_dict() if hasattr(agent, "personality") else {}
            ),
            character_class=(
                agent.character_class.name
                if hasattr(agent, "character_class")
                else ""
            ),
            skills=getattr(agent, "skills", {}),
            current_goals=(
                [str(goal) for goal in agent.current_goals]
                if hasattr(agent, "current_goals")
                else []
            ),
            relationships=getattr(agent, "relationships", {}),
            inventory_items=(
                len(agent.inventory.items) if hasattr(agent, "inventory") else 0
            ),
            gold=0,  # TODO: Implement gold system
        )

    def _npc_snapshot(self, npc: NPC, tick: int) -> AgentSnapshot:
        """Build the snapshot row for one NPC using the agent snapshot model."""
        return AgentSnapshot(
            simulation_id=self.simulation_id,
            agent_id=npc.id,
            tick=tick,
            name=npc.name,
            position_x=npc.position[0],
            position_y=npc.position[1],
            health=npc.stats.health,
            max_health=npc.stats.max_health,
            stamina=npc.stats.stamina,
            max_stamina=npc.stats.max_stamina,
            personality={},  # NPCs don't have personality
            character_class=npc.npc_type,  # Use NPC type as class
            skills={},
            current_goals=[],
            relationships={},
            inventory_items=0,
            gold=0,
        )

    @staticmethod
    def _count_alive(entities: Any) -> int:
        """Return how many of *entities* report ``stats.is_alive()``."""
        return sum(1 for entity in entities if entity.stats.is_alive())

    # ------------------------------------------------------------------
    # Run loops
    # ------------------------------------------------------------------

    def _require_initialized(self) -> None:
        """Raise ``RuntimeError`` unless a simulation run id has been set."""
        if not self.simulation_id:
            raise RuntimeError(
                "Simulation not initialized. Call initialize_simulation() first."
            )

    def _reached_max_ticks(self) -> bool:
        """Return True, and log it, once the configured tick limit is reached."""
        if self.time_manager.current_tick >= self.config.max_ticks:
            logger.info(f"Reached maximum ticks ({self.config.max_ticks})")
            return True
        return False

    def _throttle(self) -> None:
        """Sleep for one tick period when a positive tick rate is configured."""
        if self.config.tick_rate > 0:
            time.sleep(1.0 / self.config.tick_rate)

    def _run_loop(self, should_stop: Callable[[], bool]) -> None:
        """Step until stopped, *should_stop* returns True, or max ticks is hit.

        ``KeyboardInterrupt`` ends the loop quietly; the simulation is always
        stopped on the way out.
        """
        try:
            while self.running and not should_stop():
                if self._reached_max_ticks():
                    break

                self.step()

                # Optional tick rate limiting
                self._throttle()

        except KeyboardInterrupt:
            logger.info("Simulation interrupted by user")
        finally:
            self.stop_simulation()

    def run(self, num_ticks: Optional[int] = None) -> None:
        """Run the simulation for a specified number of ticks.

        Args:
            num_ticks: Number of ticks to execute. ``None`` runs until the
                configured ``max_ticks`` or until stopped; ``0`` executes no
                ticks.

        Raises:
            RuntimeError: If ``initialize_simulation()`` has not been called.
        """
        self._require_initialized()

        self.running = True
        self._finalized = False

        # Compare against None so that 0 is honoured as "no ticks" rather than
        # being mistaken for "unlimited".
        target_tick = (
            None if num_ticks is None else self.time_manager.current_tick + num_ticks
        )
        tick_budget = "unlimited" if num_ticks is None else num_ticks
        logger.info(f"Starting simulation run for {tick_budget} ticks")

        self._run_loop(
            lambda: target_tick is not None
            and self.time_manager.current_tick >= target_tick
        )

    def run_until(self, condition: Callable[[Simulation], bool]) -> None:
        """Run the simulation until a condition is met.

        Args:
            condition: Called with this simulation before every tick; the run
                ends as soon as it returns a true value.

        Raises:
            RuntimeError: If ``initialize_simulation()`` has not been called.
        """
        self._require_initialized()

        self.running = True
        self._finalized = False
        logger.info("Starting simulation with custom condition")

        self._run_loop(lambda: condition(self))

    def run_with_visualizer(self, num_ticks: Optional[int] = None) -> None:
        """Run the simulation with pygame visualization.

        Returns without running (after logging an error) when the visualizer
        modules cannot be imported.

        Args:
            num_ticks: Number of ticks to execute. ``None`` runs until the
                configured ``max_ticks``, the window is closed, or the
                simulation is stopped; ``0`` executes no ticks.

        Raises:
            RuntimeError: If ``initialize_simulation()`` has not been called.
        """
        self._require_initialized()

        # Import visualizer here to avoid dependency issues if pygame is not
        # available
        try:
            import os
            import sys

            # Add the root directory to the Python path for visualizer imports
            root_dir = os.path.abspath(
                os.path.join(os.path.dirname(__file__), "../../..")
            )
            if root_dir not in sys.path:
                sys.path.insert(0, root_dir)

            from visualizer.pygame_visualizer import GameVisualizer
            from visualizer.ui import UIManager
        except ImportError as e:
            logger.error(f"Failed to import visualizer: {e}")
            logger.error("Make sure pygame is installed: pip install pygame>=2.0.0")
            return

        # Initialize visualizer
        visualizer = GameVisualizer(
            width=self.config.visualizer_width,
            height=self.config.visualizer_height,
            tile_size=self.config.visualizer_tile_size,
        )
        ui_manager = UIManager(
            self.config.visualizer_width, self.config.visualizer_height
        )

        self.running = True
        self._finalized = False

        # Compare against None so that 0 is honoured as "no ticks".
        target_tick = (
            None if num_ticks is None else self.time_manager.current_tick + num_ticks
        )
        tick_budget = "unlimited" if num_ticks is None else num_ticks
        logger.info(f"Starting visual simulation run for {tick_budget} ticks")

        try:
            # Import pygame once instead of on every frame; without it the
            # loop still runs but skips display flips and frame limiting.
            try:
                import pygame
            except ImportError:
                pygame = None
            clock = pygame.time.Clock() if pygame is not None else None

            while self.running and visualizer.running:
                # Handle visualizer events
                if not visualizer.handle_events(self):
                    break

                # Check tick limit
                if (
                    target_tick is not None
                    and self.time_manager.current_tick >= target_tick
                ):
                    break

                if self._reached_max_ticks():
                    break

                # Update UI manager
                ui_manager.update()

                # Handle agent selection from visualizer
                if visualizer.selected_agent != ui_manager.selected_entity:
                    ui_manager.show_entity_info(visualizer.selected_agent)

                # Step simulation
                if not self.paused:
                    self.step()

                # Render visualization
                visualizer.render(self)

                # Prepare simulation data for UI. is_alive must be *called*:
                # the bare bound method is always truthy.
                simulation_data = {
                    "current_tick": self.time_manager.current_tick,
                    "total_agents": len(self.agents),
                    "alive_agents": self._count_alive(self.agents),
                    "total_npcs": len(self.npcs),
                    "alive_npcs": self._count_alive(self.npcs),
                    "zoom": visualizer.camera.zoom,
                }

                # Render UI
                ui_manager.render(visualizer.screen, simulation_data)

                # Update display
                if pygame is not None:
                    pygame.display.flip()

                # Frame rate limiting for smooth visualization
                if clock is not None:
                    if self.config.tick_rate > 0:
                        clock.tick(min(self._MAX_FPS, self.config.tick_rate))
                    else:
                        clock.tick(self._MAX_FPS)  # Default to 60 FPS

        except KeyboardInterrupt:
            logger.info("Visual simulation interrupted by user")
        finally:
            # Finalize the run even if tearing down the window fails.
            try:
                visualizer.quit()
            finally:
                self.stop_simulation()

    # ------------------------------------------------------------------
    # Control and reporting
    # ------------------------------------------------------------------

    def pause_simulation(self) -> None:
        """Pause the simulation."""
        self.paused = True
        logger.info("Simulation paused")

    def resume_simulation(self) -> None:
        """Resume the simulation."""
        self.paused = False
        logger.info("Simulation resumed")

    def stop_simulation(self) -> None:
        """Stop the simulation and finalize.

        Finalizing updates the run record, saves final snapshots, computes
        final analytics and invokes ``on_simulation_complete``. Repeated calls
        without an intervening tick or ``run*`` call only clear ``running``;
        this prevents a failed :meth:`step` inside a run loop from finalizing
        twice (once from ``step`` and once from the loop's cleanup).
        """
        self.running = False

        if getattr(self, "_finalized", False):
            return
        # Marked up front so a handler that calls stop_simulation() again, or
        # a failure part-way through, cannot trigger a second finalization.
        self._finalized = True

        if self.simulation_run and self.simulation_id:
            # Update simulation end time
            self.simulation_run.end_time = datetime.now()
            self.simulation_run.current_tick = self.time_manager.current_tick
            self.db.update_simulation_run(self.simulation_run)

            # Final save
            self._save_snapshots()

            # Final analytics calculation
            self.analytics.calculate_all_metrics(
                self.simulation_id, self.time_manager.current_tick
            )

        # Call completion handler
        if self.on_simulation_complete:
            self.on_simulation_complete(self)

        logger.info(f"Simulation stopped at tick {self.time_manager.current_tick}")

    def get_statistics(self) -> Dict[str, Any]:
        """Get current simulation statistics.

        Returns:
            A dict with the current tick, total/active agent and NPC counts,
            the mean of the retained tick durations (0 when none have been
            recorded), the run id and the running/paused flags.
        """
        average_tick_time = (
            sum(self.tick_times) / len(self.tick_times) if self.tick_times else 0
        )
        return {
            "current_tick": self.time_manager.current_tick,
            "total_agents": len(self.agents),
            "active_agents": self._count_alive(self.agents),
            "total_npcs": len(self.npcs),
            "active_npcs": self._count_alive(self.npcs),
            "average_tick_time": average_tick_time,
            "simulation_id": self.simulation_id,
            "running": self.running,
            "paused": self.paused,
        }

    def get_analytics_report(self) -> Optional[Dict[str, Any]]:
        """Get comprehensive analytics report, or None when no run id is set."""
        if not self.simulation_id:
            return None

        return self.analytics.generate_simulation_report(self.simulation_id)