import os
from composabl import (
Agent, Skill, Sensor, Scenario, Perceptor,
MaintainGoal, MinimizeGoal,
SkillController, SkillSelector,
Trainer
)
# Configure environment: both settings are exported through the process
# environment in a single call, before any SDK object is constructed.
# NOTE(review): the package imported above is `composabl`, while these
# variables carry an `AMESA_` prefix -- confirm these are the names the SDK reads.
os.environ.update(
    {
        "AMESA_LICENSE": "your-license-key",
        "AMESA_EULA_AGREED": "1",
    }
)
# Create perceptor for derivative calculation
class DerivativePerceptor(Perceptor):
    """Estimate the rate of change of the ``temperature`` observation.

    The instance remembers the previous temperature and the clock reading
    taken when it was observed; each call to :meth:`compute` divides the
    temperature difference by the elapsed time between the two calls.

    NOTE(review): this class subclasses ``Perceptor`` and is also passed as
    the implementation argument to ``Perceptor(...)`` elsewhere in this
    file -- confirm that this is the base class the SDK expects.
    NOTE(review): elapsed time is measured on the host clock, not in
    simulated time; with a simulator running faster than real time the
    resulting rate is per host second -- confirm this is intended.
    """

    def __init__(self):
        super().__init__()
        # Previous temperature reading; None until the first compute() call.
        self.last_value = None
        # Clock reading (seconds) taken together with last_value.
        self.last_time = None

    async def compute(self, obs_spec, obs):
        """Return the temperature rate of change since the previous call.

        Args:
            obs_spec: Not used by this perceptor.
            obs: Mapping that provides the current ``"temperature"`` value.

        Returns:
            ``{"temperature_rate": rate}``, where ``rate`` is the change in
            temperature per second of elapsed time. The rate is ``0`` on the
            first call and whenever no measurable time has elapsed.
        """
        import time

        temperature = obs["temperature"]
        # A monotonic clock cannot step backwards, so the interval is never
        # negative (a wall clock can be adjusted between two calls).
        current_time = time.monotonic()

        derivative = 0
        if self.last_value is not None:
            dt = current_time - self.last_time
            # Two calls inside one clock tick give dt == 0; report "no change"
            # rather than raising ZeroDivisionError.
            if dt > 0:
                derivative = (temperature - self.last_value) / dt

        self.last_value = temperature
        self.last_time = current_time
        return {"temperature_rate": derivative}
# Create the agent
agent = Agent()

# Add sensors. Each sensor is given a name, a human-readable description and
# a function that picks its value out of the raw observation mapping.
temperature_sensor = Sensor(
    "temperature",
    "Current temperature (°C)",
    lambda raw: raw["temp"],
)
pressure_sensor = Sensor(
    "pressure",
    "Current pressure (bar)",
    lambda raw: raw["pressure"],
)
flow_rate_sensor = Sensor(
    "flow_rate",
    "Flow rate (L/min)",
    lambda raw: raw["flow"],
)
energy_sensor = Sensor(
    "energy_consumption",
    "Energy usage (kW)",
    lambda raw: raw["energy"],
)
agent.add_sensors(
    [temperature_sensor, pressure_sensor, flow_rate_sensor, energy_sensor]
)

# Add perceptor: wraps the DerivativePerceptor class under the name
# "temp-derivative" and registers it with the agent.
derivative_perceptor = Perceptor("temp-derivative", DerivativePerceptor)
agent.add_perceptor(derivative_perceptor)
# Create skills

# 1. Temperature control: a skill built around a goal on the "temperature"
#    sensor with a target of 75.0 and a stop distance of 2.0.
reactor_temperature_goal = MaintainGoal(
    "temperature",
    "Maintain reactor temperature",
    target=75.0,
    stop_distance=2.0,
)
temp_control = Skill("temperature-control", reactor_temperature_goal)

# 2. Energy optimization: a skill built around a goal that minimizes the
#    "energy_consumption" sensor.
energy_usage_goal = MinimizeGoal(
    "energy_consumption",
    "Minimize energy usage",
)
energy_optimization = Skill("energy-optimization", energy_usage_goal)
# 3. Emergency shutdown controller
class EmergencyShutdown(SkillController):
    """Programmed controller that commands a shutdown on unsafe readings.

    A shutdown action is returned when the temperature or the pressure
    exceeds its trip limit. The trip limits equal the thresholds at which
    ProcessCoordinator routes to the "emergency-shutdown" skill (95 and 9),
    so the controller always produces an action when it is selected that
    way. With the previous limits (100 and 10) a reading between the two
    sets of thresholds selected this skill but yielded ``None``.

    The limits are class attributes so a subclass can override them.
    """

    # A shutdown is commanded when either reading is strictly above its limit.
    TRIP_TEMPERATURE = 95
    TRIP_PRESSURE = 9
    # The skill reports success once both readings are strictly below these.
    SAFE_TEMPERATURE = 90
    SAFE_PRESSURE = 8

    async def compute_action(self, obs, action):
        """Return the shutdown action when a trip limit is exceeded.

        Args:
            obs: Mapping that provides ``"temperature"`` and ``"pressure"``.
            action: Not used by this controller.

        Returns:
            ``[0, 0, 0]`` when either reading is above its trip limit,
            otherwise ``None``.
        """
        over_temperature = obs["temperature"] > self.TRIP_TEMPERATURE
        over_pressure = obs["pressure"] > self.TRIP_PRESSURE
        if over_temperature or over_pressure:
            return [0, 0, 0]  # Shutdown action
        # NOTE(review): None is meant to let other skills handle the step;
        # confirm the framework accepts None as a controller action.
        return None

    async def compute_success_criteria(self, obs, action):
        """Return True once temperature and pressure are both below the safe limits."""
        return (
            obs["temperature"] < self.SAFE_TEMPERATURE
            and obs["pressure"] < self.SAFE_PRESSURE
        )

    async def filtered_sensor_space(self):
        """Return the names of the sensors this controller reads."""
        return ["temperature", "pressure"]


emergency = Skill("emergency-shutdown", EmergencyShutdown)
# 4. Coordinator selector
class ProcessCoordinator(SkillSelector):
    """Choose which child skill should act on the current observation."""

    def select_skill(self, obs):
        """Return the name of the skill to run for ``obs``.

        Args:
            obs: Mapping that provides ``"temperature"``, ``"pressure"`` and
                ``"temperature_rate"``.

        Returns:
            ``"emergency-shutdown"`` when temperature is above 95 or pressure
            is above 9; otherwise ``"temperature-control"`` when the
            temperature is changing faster than 5 in either direction;
            otherwise ``"energy-optimization"``.
        """
        # Emergency takes priority
        if obs["temperature"] > 95 or obs["pressure"] > 9:
            return "emergency-shutdown"

        # Rapid temperature change: compare the magnitude of the rate so that
        # a fast drop is handled the same way as a fast rise.
        if abs(obs["temperature_rate"]) > 5:
            return "temperature-control"

        # Normal operation
        return "energy-optimization"


coordinator = SkillSelector(
    "process-coordinator",
    ProcessCoordinator,
    children=[
        "temperature-control",
        "energy-optimization",
        "emergency-shutdown",
    ],
)
# Build agent hierarchy: the three child skills are added in one call,
# followed by the coordinator that lists them as its children.
child_skills = [temp_control, energy_optimization, emergency]
agent.add_skills(child_skills)
agent.add_skill(coordinator)
# Define training scenarios

# First scenario: temperature, pressure and energy are given as min/max
# ranges, flow_rate as a single fixed value.
ranged_scenario = Scenario(
    {
        "temperature": {"min": 70, "max": 80},
        "pressure": {"min": 5, "max": 7},
        "flow_rate": 100,
        "energy": {"min": 10, "max": 50},
    }
)

# Second scenario: every variable is pinned to a single value.
high_temp_scenario = Scenario(
    {
        "temperature": 90,  # High temp scenario
        "pressure": 8,
        "flow_rate": 150,
        "energy": 75,
    }
)

scenarios = [ranged_scenario, high_temp_scenario]
# Configure training
# The nested sections are written out one by one and then combined into
# `config` with the same keys, key order and values as a single literal.
target_section = {
    "docker": {
        "image": "composabl/sim-reactor:latest",
        "environment": {"SCENARIO_MODE": "variable"},
    },
}

env_section = {
    "name": "reactor-control",
    "init": {
        "control_frequency": 10,  # Hz
        "simulation_speed": 100,  # 100x real-time
    },
}

algorithm_section = {
    "name": "PPO",
    "config": {
        "lr": 0.0003,
        "gamma": 0.99,
        "lambda": 0.95,
        "clip_param": 0.2,
    },
}

resources_section = {"sim_count": 8, "num_workers": 4}

model_section = {
    "fcnet_hiddens": [256, 256],
    "fcnet_activation": "relu",
}

rollout_section = {"num_rollout_workers": 4, "num_envs_per_worker": 2}

post_processing_section = {
    "record": {
        "enabled": True,
        "file_path": "./recordings",
        "gif_file_name": "reactor_control.gif",
    },
}

config = {
    "target": target_section,
    "env": env_section,
    "algorithm": algorithm_section,
    "resources": resources_section,
    "model": model_section,
    "rollout": rollout_section,
    "scenarios": scenarios,
    "post_processing": post_processing_section,
}

# Train the agent: the trainer is constructed from the assembled config.
trainer = Trainer(config)
# Train with callbacks: this reporter is registered under the
# "on_cycle_complete" key of the callbacks passed to trainer.train().
def on_cycle_complete(cycle, metrics):
    """Print the mean episode reward for a training cycle.

    Args:
        cycle: Identifier of the cycle; interpolated into the message as-is.
        metrics: Mapping from which ``"episode_reward_mean"`` is read.

    The reward is printed with two decimals. If the metric is missing or
    cannot be formatted as a number, ``n/a`` is printed instead, so that a
    reporting problem does not raise out of the callback.
    """
    try:
        reward_text = f"{metrics['episode_reward_mean']:.2f}"
    except (KeyError, TypeError, ValueError):
        # KeyError: metric absent; TypeError/ValueError: value is None or
        # otherwise not formattable with a float format spec.
        reward_text = "n/a"
    print(f"Cycle {cycle}: Reward = {reward_text}")
# Train, evaluate, export and package the agent. All steps run inside
# try/finally so that trainer.close() is reached even when one of them raises.
try:
    trainer.train(
        agent,
        train_cycles=100,
        callbacks={"on_cycle_complete": on_cycle_complete},
    )

    # Evaluate the trained agent
    print("Evaluating trained agent...")
    eval_results = trainer.evaluate(agent, num_episodes=10)
    print(f"Average reward: {eval_results['episode_reward_mean']:.2f}")
    print(f"Success rate: {eval_results['custom_metrics']['success_rate']:.2%}")

    # Export the trained agent
    agent.export("./trained_agents/reactor_controller.json")

    # Package for deployment
    deployed_agent = trainer.package(agent)
    print("Agent ready for deployment!")
finally:
    # Clean up
    trainer.close()