Source code for vent.io.devices.valves

from abc import ABC, abstractmethod
from vent.io.devices.pins import Pin, PWMOutput

import numpy as np


[docs]class SolenoidBase(ABC): """ An abstract baseclass that defines methods using valve terminology. Also allows configuring both normally _open and normally closed valves (called the "form" of the valve). """ _FORMS = {'Normally Closed': 0, 'Normally Open': 1}
[docs] def __init__(self, form='Normally Closed'): """ Args: form (str): The form of the solenoid; can be either `Normally Open` or `Normally Closed` """ self.form = form
@property def form(self) -> str: """ Returns the human-readable form of the valve.""" return dict(map(reversed, self._FORMS.items()))[self._form] @form.setter def form(self, form): """ Performs validation on requested form and then sets it. Args: form (str): The form of the solenoid; can be either `Normally Open` or `Normally Closed` """ if form not in self._FORMS.keys(): raise ValueError('form must be one of {}'.format(self._FORMS.keys())) else: self._form = self._FORMS[form]
[docs] @abstractmethod def open(self): """ Energizes valve if Normally Closed. De-energizes if Normally Open."""
[docs] @abstractmethod def close(self): """ De-energizes valve if Normally Closed. Energizes if Normally Open."""
@property @abstractmethod def is_open(self) -> bool: """ Returns True if valve is open, False if it is closed"""
[docs]class OnOffValve(SolenoidBase, Pin): """ An extension of vent.io.iobase.Pin which uses valve terminology for its methods. Also allows configuring both normally _open and normally closed valves (called the "form" of the valve). """ _FORMS = {'Normally Closed': 0, 'Normally Open': 1}
[docs] def __init__(self, pin, form='Normally Closed', pig=None): """ Args: pin (int): The number of the pin to use form (str): The form of the solenoid; can be either `Normally Open` or `Normally Closed` pig (PigpioConnection): pigpiod connection to use; if not specified, a new one is established """ self.form = form Pin.__init__(self, pin, pig) SolenoidBase.__init__(self, form=form)
[docs] def open(self): """ Energizes valve if Normally Closed. De-energizes if Normally Open.""" if self._form: self.write(0) else: self.write(1)
[docs] def close(self): """ De-energizes valve if Normally Closed. Energizes if Normally Open.""" if self.form == 'Normally Closed': self.write(0) else: self.write(1)
@property def is_open(self) -> bool: """ Implements parent's abstractmethod; returns True if valve is open, False if it is closed""" energized = True if self.read() else False if self.form == 'Normally Closed': return energized else: return not energized
[docs]class PWMControlValve(SolenoidBase, PWMOutput): """ An extension of PWMOutput which incorporates linear compensation of the valve's response. """
[docs] def __init__(self, pin, form='Normally Closed', frequency=None, response=None, pig=None): """ Args: pin (int): The number of the pin to use form (str): The form of the solenoid; can be either `Normally Open` or `Normally Closed` frequency (float): The PWM frequency to use. response (str): "/path/to/response/curve/file" pig (PigpioConnection): pigpiod connection to use; if not specified, a new one is established """ PWMOutput.__init__(self, pin=pin, initial_duty=0, frequency=frequency, pig=pig) SolenoidBase.__init__(self, form=form) '''if response is None: raise NotImplementedError('You need to implement a default response behavior')''' if form != 'Normally Closed': raise NotImplementedError('Normally Open PWM control valves have not been implemented') self._rising = True self._load_valve_response(response_path=response)
@property def is_open(self) -> bool: """ Implements parent's abstractmethod; returns True if valve is open, False if it is closed""" if self.setpoint > 0: return True else: return False
[docs] def open(self): """ Implements parent's abstractmethod; fully opens the valve""" self.setpoint = 1.0
[docs] def close(self): """ Implements parent's abstractmethod; fully closes the valve""" self.setpoint = 0.0
@property def setpoint(self) -> float: """ The linearized setpoint corresponding to the current duty cycle according to the valve's response curve Returns: float: A number between 0 and 1 representing the current flow as a proportion of maximum """ return self.inverse_response(self.duty, self._rising) @setpoint.setter def setpoint(self, setpoint): """Overridden to determine & write the duty cycle corresponding to the requested linearized setpoint according to the valve's response curve Args: setpoint (float): A number between 0 and 100 representing how much to open the valve """ if not 0 <= setpoint <= 100: raise ValueError('setpoint must be between 0 and 100 for an expiratory control valve') self._rising = setpoint > self.setpoint self.duty = self.response(setpoint, self._rising)
[docs] def response(self, setpoint, rising=True): """Setpoint takes a value in the range (0,100) so as not to confuse with duty cycle, which takes a value in the range (0,1). Response curves are specific to individual valves and are to be implemented by subclasses. Different curves are calibrated to 'rising = True' (valves opening) or'rising = False' (valves closing), as different characteristic flow behavior can be observed. Args: setpoint (float): A number between 0 and 1 representing how much to open the valve rising (bool): Whether or not the requested setpoint is higher than the last (rising = True), or the opposite (Rising = False) Returns: float: The PWM duty cycle corresponding to the requested setpoint """ idx = (np.abs(self._response_array[:, 0] - setpoint)).argmin() if rising: duty = self._response_array[idx, 1] else: duty = self._response_array[idx, 2] return duty
[docs] def inverse_response(self, duty_cycle, rising=True): """Inverse of response. Given a duty cycle in the range (0,1), returns the corresponding linear setpoint in the range (0,100). Args: duty_cycle: The PWM duty cycle rising (bool): Whether or not the requested setpoint is higher than the last (rising = True), or the opposite (Rising = False) Returns: float: The setpoint of the valve corresponding to `duty_cycle` """ if rising: idx = (np.abs(self._response_array[:, 1] - duty_cycle)).argmin() else: idx = (np.abs(self._response_array[:, 2] - duty_cycle)).argmin() return self._response_array[idx, 0]
[docs] def _load_valve_response(self, response_path): """ Loads and applies a response curve of the form `f(setpoint) = duty`. A response curve maps the underlying PWM duty cycle `duty` onto the normalized variable `setpoint` representing the flow through the valve as a percentage of its maximum. Flow through a proportional valve may be nonlinear with respect to [PWM] duty cycle, if the valve itself does not include its own electronics to linearize response wrt/ input. Absent on-board compensation of response, a proportional solenoid with likely not respond [flow] at all below some minimum threshold duty cycle. Above this threshold, the proportional valve begins to open and its response resembles a sigmoid: just past the threshold there is a region where flow increases exponentially wrt/ duty cycle, this is followed by a region of pseudo-linear response that begins to taper off, eventually approaching the valve's maximum flow asymptotically as the duty cycle approaches 100% and the valve opens fully. Args: response_path: 'path/to/binary/response/file' - if response_path is None, defaults to `setpoint = duty` """ if response_path is not None: response_array = np.load(response_path) else: response_array = np.linspace([0, 0, 0], [100, 1, 1], num=101) self._response_array = response_array
[docs]class SimOnOffValve(SolenoidBase): """ stub: a simulated on/off valve""" def __init__(self, pin=None, form='Normally Closed', pig=None): super().__init__(form=form) self.state = 0 if form == 'Normally Closed' else 1
[docs] def open(self): self.state = 1
[docs] def close(self): self.state = 0
@property def is_open(self) -> bool: return True if self.state == 1 else False
[docs]class SimControlValve(SolenoidBase): """stub: a simulated linear control valve"""
[docs] def __init__(self, pin=None, form='Normally Closed', frequency=None, response=None, pig=None): """ Args: pin (int): (unused for sim) form (str): The form of the solenoid; can be either `Normally Open` or `Normally Closed` frequency (float): (unused for sim) response (str): (unused for sim) # TODO implement this (requires refactor) pig (PigpioConnection): (unused for sim) """ if response: raise NotImplementedError('This sim is pretty basic - no fancy response for you') if form != 'Normally Closed': raise NotImplementedError('Normally Open sim control valves have not been implemented') super().__init__(form=form) self._setpoint = 0
@property def is_open(self) -> bool: """ Implements parent's abstractmethod; returns True if valve is open, False if it is closed FIXME: Needs refactor; duplicate property to PWMControlValve.is_open""" if self.setpoint > 0: return True else: return False
[docs] def open(self): """ Implements parent's abstractmethod; fully opens the valve FIXME: Needs refactor; duplicate method to PWMControlValve.open()""" self.setpoint = 100
[docs] def close(self): """ Implements parent's abstractmethod; fully closes the valve FIXME: Needs refactor; duplicate method to PWMControlValve.close()""" self.setpoint = 0
@property def setpoint(self): """ The requested linearized set-point of the valve. Returns: float: A number between 0 and 1 representing the current flow as a proportion of maximum """ return self._setpoint @setpoint.setter def setpoint(self, setpoint): """ Args: setpoint (float): Between 0 and 100; the requested set-point of the valve as a proportion of maximum """ if not 0 <= setpoint <= 100: raise ValueError('setpoint must be between 0 and 100 for an expiratory control valve') self._setpoint = setpoint