Perceptors API

Perceptors API

Process sensor data before it reaches skills.

Basic Perceptor

from composabl import Perceptor, PerceptorImpl

class NoiseFilter(PerceptorImpl):
    def __init__(self, alpha=0.1):
        self.alpha = alpha
        self.filtered_value = None
        
    async def compute(self, obs_spec, obs):
        """Exponential moving average filter"""
        raw_value = obs["noisy_sensor"]
        
        if self.filtered_value is None:
            self.filtered_value = raw_value
        else:
            # EMA filter
            self.filtered_value = (
                self.alpha * raw_value + 
                (1 - self.alpha) * self.filtered_value
            )
        
        return {"filtered_sensor": self.filtered_value}
    
    def filtered_sensor_space(self, obs):
        """Specify input sensors"""
        return ["noisy_sensor"]

# Create and add to agent
filter_perceptor = Perceptor("noise-filter", NoiseFilter(alpha=0.2))
agent.add_perceptor(filter_perceptor)

Advanced Perceptors

Last updated