Source code for vent.io.devices.base

""" Base classes & functions used throughout vent.io.devices
"""
from collections import OrderedDict
from vent.common.fashion import pigpio_command

import pigpio
import time


class PigpioConnection(pigpio.pi):
    """ Subclass that extends pigpio.pi to throw an exception if there are issues connecting to the pigpio daemon."""

    def __init__(self, *args, **kwargs):
        """ Calls superclass init and checks if a connection was established; throws a RuntimeError if not.

        Args:
            *args: parameters to pass through like: pigpio.pi().__init__(*args)
            **kwargs: parameters to pass through like: pigpio.pi().__init__(**kwargs)
        """
        super().__init__(*args, **kwargs)
        if not self.connected:
            raise RuntimeError('Could not establish connection with pigpio daemon')


[docs]class IODeviceBase: """ Abstract base Class for pigpio handles (or whatever other GPIO library we end up using) Note: pigpio commands return -144 if an error is encountered while attempting to communicate with the demon. TODO would be to recognize when that occurs and handle it gracefully, i.e. kill the daemon, restart it, and reopen the python interface(s) """
[docs] def __init__(self, pig: PigpioConnection = None): """ Initializes the pigpio python bindings object if necessary, and checks that it is actually running. Args: pig (PigpioConnection): pigpiod connection to use; if not specified, a new one is established """ self._pig = pig if pig is not None else PigpioConnection(show_errors=False) self._handle = -1
@property def pig(self) -> PigpioConnection: """ The pigpio python bindings object""" return self._pig @property def handle(self) -> int: """ Pigpiod handle associated with device (only for i2c/spi)""" return self._handle @property def pigpiod_ok(self) -> bool: """ Returns True if pigpiod is running and False if not""" return self.pig.connected
[docs] def _close(self): """ Closes an I2C/SPI (or potentially Serial) connection""" if not self.pigpiod_ok or self.handle <= 0: return
[docs]class I2CDevice(IODeviceBase): """ A class wrapper for pigpio I2C handles. Defines several methods used for reading from and writing to device registers. Defines helper classes Register and ValueField for handling the manipulation of arbitrary registers. Note: The Raspberry Pi uses LE byte-ordering, while the outside world tends to use BE (at least, the sensors in use so far all do). Thus, bytes need to be swapped from native (LE) ordering to BE prior to being written to an i2c device, and bytes recieved need to be swapped from BE into native (LE). All methods except read_device and write_device perform this automatically. The methods read_device and write_device do NOT byteswap and return bytearrays rather than the unsigned 16-bit int used by the other read/write methods. """
[docs] def __init__(self, i2c_address, i2c_bus, pig=None): """ Initializes pigpio bindings and opens i2c connection. Args: i2c_address (int): I2C address of the device. (e.g., `i2c_address=0x50`) i2c_bus (int): The I2C bus to use. Should probably be set to 1 on Raspberry Pi. pig (PigpioConnection): pigpiod connection to use; if not specified, a new one is established """ super().__init__(pig) self._i2c_bus = i2c_bus self._open(i2c_bus, i2c_address)
[docs] @pigpio_command def _open(self, i2c_bus, i2c_address): """ Opens i2c connection given i2c bus and address.""" self._handle = self.pig.i2c_open(i2c_bus, i2c_address)
[docs] @pigpio_command def _close(self): """ Extends superclass method. Checks that pigpiod is connected and if a handle has been set - if so, closes an i2c connection. """ super()._close() self.pig.i2c_close(self.handle)
[docs] @pigpio_command def read_device(self, count=2) -> tuple: """ Read a specified number of bytes directly from the the device without specifying or changing the register. Does NOT perform LE/BE conversion. Args: count (int): The number of bytes to read from the device. Returns: tuple: a tuple of the number of bytes read and a bytearray containing the bytes. If there was an error the number of bytes read will be less than zero (and will contain the error code). """ return self.pig.i2c_read_device(self.handle, count)
[docs] @pigpio_command def write_device(self, word, signed=False): """ Write 2 bytes to the device without specifying register. DOES perform LE/BE conversion. Args: word (int): The integer representation of the data to write. signed (bool): Whether or not `word` is signed. """ self.pig.i2c_write_device( self.handle, native16_to_be(word, signed=signed) )
[docs] @pigpio_command def read_register(self, register, signed=False) -> int: """ Read 2 bytes from the specified register and byteswap the result. Args: register (int): The index of the register to read. signed (bool): Whether or not the data to read is expected to be signed. Returns: int: integer representation of 16 bit register contents. """ return be16_to_native(self.pig.i2c_read_i2c_block_data( self.handle, register, count=2 ), signed=signed)
[docs] @pigpio_command def write_register(self, register, word, signed=False): """ Write 2 bytes to the specified register. Byteswaps. Args: register (int): The index of the register to write to word (int): The unsigned 16 bit integer to write to the register (must be consistent with 'signed') signed (bool): Whether or not 'word' is signed """ self.pig.i2c_write_i2c_block_data( self.handle, register, native16_to_be(word, signed=signed) )
[docs] class Register: """ Describes a writable configuration register. Has dynamically defined attributes corresponding to the fields described by the passed arguments. Takes as arguments two tuples of equal length, the first of which names each field and the second being a tuple of tuples containing the (human readable) possible settings & values for each field. Note: The initializer reverses the fields & their values because a human reads the register, as drawn in the datasheet, from left to right - however, the fields furthest to the left are the most significant bits of the register. """
[docs] def __init__(self, fields, values): """ Initializer which loads (dynamically defined) attributes from tuples. Args: fields (tuple): A tuple containing the names of the register's value fields values (tuple): A tuple of tuples containing the possible values for each value field. Length must match the length of fields. If there are redundant values for a field specified in the datasheet, be sure to include them. (e.g., a field takes values `A: 0b00`, `B: 0b01`, and `C: 0b10`; but the value for `0b11` is either not specified by the datasheet or is listed redundantly as `C: 0b11` -> `values` should list both the 3rd and 4th possible values as 'C' like so: ('A', 'B', 'C', 'C') """ if len(fields) != len(values): raise ValueError('fields and values must contain the same number of elements') self.fields = fields offset = 0 for fld, val in zip(reversed(fields), reversed(values)): setattr( self, fld, self.ValueField( offset, len(val) - 1, OrderedDict(zip(val, range(len(val)))) ) ) offset += (len(val) - 1).bit_length()
[docs] def unpack(self, cfg) -> OrderedDict: """ Given the contents of a register in integer form, returns a dict of fields and their current settings. Args: cfg (int): An integer representing a possible configuration value for the register """ return OrderedDict(zip( self.fields, (getattr( getattr(self, field), 'unpack')(cfg) for field in self.fields) ))
[docs] def pack(self, cfg, **kwargs) -> int: """ Given an initial integer representation of a register and an arbitrary number of field=value settings, returns an integer representation of the register incorporating the new settings. Args: cfg (int): An integer representing a possible configuration value for the register **kwargs: The register fields & values to patch into cfg. Takes keyword arguments of the form: `field=value` """ for field, value in kwargs.items(): if hasattr(self, field) and value is not None: cfg = getattr(getattr(self, field), 'insert')(cfg, value) return cfg
[docs] class ValueField: """ Describes a configurable value field in a writable register."""
[docs] def __init__(self, offset, mask, values): """ Instantiates a value field of a register given the bit offset, mask, and list of possible values. Args: offset (int): The offset bits of the value field in the register, i.e. the distance from LSB mask (int): integer representation of the value field mask (w/o offset) values (OrderedDict): The possible values that the field can take. """ self._offset = offset self._mask = mask self._values = values self._reversed_values = OrderedDict(map(reversed, self._values.items()))
[docs] def unpack(self, cfg): """ Extracts the ValueField's setting from cfg & returns the result in a human readable form. Args: cfg (int): An integer representing a possible configuration value for the register """ return self._reversed_values[self.extract(cfg)]
[docs] def extract(self, cfg) -> int: """ Extracts setting from passed 16-bit config & returns integer representation. Args: cfg (int): An integer representing a possible configuration value for the register """ return (cfg & (self._mask << self._offset)) >> self._offset
[docs] def pack(self, value) -> int: """ Takes a human-readable ValueField setting and returns the corresponding bit-shifted integer. Args: value (int): The integer representation of `value` bit-shifted by the ValueField's offset Returns: int: The integer representation of the ValueField setting according to `value` """ if value not in self._values.keys(): raise ValueError("ValueField must be one of: {}".format(self._values.keys())) return self._values[value] << self._offset
[docs] def insert(self, cfg, value) -> int: """ Validates and performs bitwise replacement with the human-readable ValueField setting and integer representation of the register configuration. Args: cfg (int): An integer representing a possible configuration value for the register value (object): The human readable representation of the desired ValueField setting. Must match a value in ValueField._values; if not, throws a ValueError Returns: int: The integer representation of the Register's configuration with the value of ValueField patched according the `value` """ if value not in self._values.keys(): raise ValueError("ValueField must be one of: {}".format(self._values.keys())) return (cfg & ~(self._mask << self._offset)) | (self._values[value] << self._offset)
[docs]class SPIDevice(IODeviceBase): """ A class wrapper for pigpio SPI handles. Not really implemented. """
[docs] def __init__(self, channel, baudrate, pig=None): """ Instantiates an SPIDevice on SPI `channel` with `baudrate` and, optionally, `pigpio.pi = pig`. Args: channel (int): The SPI channel baudrate (int): SPI baudrate pig (PigpioConnection): pigpiod connection to use; if not specified, a new one is established """ super().__init__(pig=pig) self._open(channel, baudrate)
[docs] @pigpio_command def _open(self, channel, baudrate): """ Opens an SPI connection and sets the pigpiod handle. Args: channel (int): The SPI channel baudrate (int): SPI baudrate """ self._handle = self.pig.spi_open(channel, baudrate)
[docs] @pigpio_command def _close(self): """ Extends superclass method. Checks that pigpiod is connected and if a handle has been set - if so, closes an SPI connection. """ super()._close() self.pig.spi_close(self.handle)
[docs]class ADS1115(I2CDevice): """ ADS1115 16 bit, 4 Channel Analog to Digital Converter. Datasheet: http://www.ti.com/lit/ds/symlink/ads1114.pdf?ts=1587872241912 Default Values: Default configuration for vent: 0xC3E3 Default configuration on power-up: 0x8583 """ _DEFAULT_ADDRESS = 0x48 _DEFAULT_VALUES = {'MUX': 0, 'PGA': 4.096, 'MODE': 'SINGLE', 'DR': 860} _TIMEOUT = 1 """ Address Pointer Register (write-only) """ _POINTER_FIELDS = ('P',) _POINTER_VALUES = ( ( 'CONVERSION', 'CONFIG', 'LO_THRESH', 'HIGH_THRESH' ), ) """ Config Register (R/W) """ _CONFIG_FIELDS = ( 'OS', 'MUX', 'PGA', 'MODE', 'DR', 'COMP_MODE', 'COMP_POL', 'COMP_LAT', 'COMP_QUE' ) _CONFIG_VALUES = ( ('NO_EFFECT', 'START_CONVERSION'), ((0, 1), (0, 3), (1, 3), (2, 3), 0, 1, 2, 3), (6.144, 4.096, 2.048, 1.024, 0.512, 0.256, 0.256, 0.256), ('CONTINUOUS', 'SINGLE'), (8, 16, 32, 64, 128, 250, 475, 860), ('TRADIONAL', 'WINDOW'), ('ACTIVE_LOW', 'ACTIVE_HIGH'), ('NONLATCHING', 'LATCHING'), (1, 2, 3, 'DISABLE') ) USER_CONFIGURABLE_FIELDS = ('MUX', 'PGA', 'MODE', 'DR') """ Note: The Conversion Register is read-only and contains a 16bit representation of the requested value (provided the conversion is ready). The Lo-thresh & Hi-thresh Registers are not Utilized here. However, their function and usage are described in the datasheet. Should you want to extend the functionality implemented here. """
[docs] def __init__(self, address=_DEFAULT_ADDRESS, i2c_bus=1, pig=None): """ Initializes registers: Pointer register is write only, config is R/W. Sets initial value of _last_cfg to what is actually on the ADS.Packs default settings into _cfg, but does not actually write to ADC - that occurs when read_conversion() is called. Args: address (int): I2C address of the device. (e.g., `i2c_address=0x48`) i2c_bus (int): The I2C bus to use. Should probably be set to 1 on Raspberry Pi. pig (PigpioConnection): pigpiod connection to use; if not specified, a new one is established """ super().__init__(address, i2c_bus, pig) self.pointer = self.Register(self._POINTER_FIELDS, self._POINTER_VALUES) self._config = self.Register(self._CONFIG_FIELDS, self._CONFIG_VALUES) self._last_cfg = self._read_last_cfg() self._cfg = self._config.pack(cfg=self._last_cfg, **self._DEFAULT_VALUES)
[docs] def read_conversion(self, **kwargs) -> float: """ Returns a voltage (expressed as a float) corresponding to a channel on the ADC. The channel to read from, along with the gain, mode, and sample rate of the conversion may be may be specified as optional parameters. If read_conversion() is called with no parameters, the resulting voltage corresponds to the channel last read from and the same conversion settings. Args: MUX: The pin to read from in single channel mode: e.g., `0, 1, 2, 3` or, a tuple of pins over which to make a differential reading. e.g., `(0, 1), (0, 3), (1, 3), (2, 3)` PGA: The full scale voltage (FSV) corresponding to a programmable gain setting. e.g., `(6.144, 4.096, 2.048, 1.024, 0.512, 0.256, 0.256, 0.256)` MODE: Whether to set the ADC to continuous conversion mode, or operate in single-shot mode. e.g., `'CONTINUOUS', 'SINGLE'` DR: The data rate to make the conversion at; units: samples per second. e.g., `8, 16, 32, 64, 128, 250, 475, 860` """ return ( self._read_conversion(**kwargs) * self.config.PGA.unpack(self.cfg) / 32767 )
[docs] def print_config(self) -> OrderedDict: """ Returns the human-readable configuration for the next read. Returns: OrderedDict: an ordered dictionary of the form {field: value}, ordered from MSB -> LSB """ return self.config.unpack(self.cfg)
@property def config(self): """ Returns the Register object of the config register. Returns: vent.io.devices.I2CDevice.Register: The Register object initialized for the ADS1115. """ return self._config @property def cfg(self) -> int: """ Returns the contents (as a 16-bit unsigned integer) of the configuration that will be written to the config register when read_conversion() is next called. """ return self._cfg
[docs] def _read_conversion(self, **kwargs) -> int: """ Backend for read_conversion. Returns the contents of the 16-bit conversion register as an unsigned integer. If no parameters are passed, one of two things can happen: 1) If the ADC is in single-shot (mode='SINGLE') conversion mode, _last_cfg is written to the config register; once the ADC indicates it is ready, the contents of the conversion register are read and the result is returned. 2) If the ADC is in CONTINUOUS mode, the contents of the conversion register are read immediately and returned. If any of channel, gain, mode, or data_rate are specified as parameters, a new _cfg is packed and written to the config register; once the ADC indicates it is ready, the contents of the conversion register are read and the result is returned. Note: In continuous mode, data can be read from the conversion register of the ADS1115 at any time and always reflects the most recently completed conversion. So says the datasheet. Args: **kwargs: see documentation of vent.io.devices.ADS1115.read_conversion """ self._cfg = self._config.pack(cfg=self.cfg, **kwargs) mode = self.print_config()['MODE'] if self._cfg != self._last_cfg or mode == 'SINGLE': self.write_register(self.pointer.P.pack('CONFIG'), self.cfg) self._last_cfg = self.cfg data_rate = self._config.DR.unpack(self.cfg) while not (self._ready() or mode == 'CONTINUOUS'): # TODO: Needs timout tick = time.time() while (time.time() - tick) < (1 / data_rate): pass # TODO: implement asyncio.sleep() return self.read_register(self.pointer.P.pack('CONVERSION'), signed=True)
[docs] def _read_last_cfg(self) -> int: """ Reads the config register and returns the contents as a 16-bit unsigned integer; updates internal record _last_cfg. """ self._last_cfg = self.read_register(self.pointer.P.pack('CONFIG')) return self._last_cfg
[docs] def _ready(self) -> bool: """ Return status of ADC conversion; True indicates the conversion is complete and the results ready to be read. """ return bool(self.read_register(self.pointer.P.pack('CONFIG')) >> 15)
[docs]class ADS1015(ADS1115): """ ADS1015 16 bit, 4 Channel Analog to Digital Converter. Datasheet: http://www.ti.com/lit/ds/symlink/ads1015.pdf?&ts=1589228228921 Basically the same device as the ADS1115, except has 12 bit resolution instead of 16, and has different (faster) data rates. The difference in data rates is handled by overloading _CONFIG_VALUES. The difference in resolution is irrelevant for implementation. """ _DEFAULT_ADDRESS = 0x48 _DEFAULT_VALUES = {'MUX': 0, 'PGA': 4.096, 'MODE': 'SINGLE', 'DR': 3300} """ Address Pointer Register (write-only) """ _POINTER_FIELDS = ('P',) _POINTER_VALUES = ( ( 'CONVERSION', 'CONFIG', 'LO_THRESH', 'HIGH_THRESH' ), ) """ Config Register (R/W) """ _CONFIG_FIELDS = ( 'OS', 'MUX', 'PGA', 'MODE', 'DR', 'COMP_MODE', 'COMP_POL', 'COMP_LAT', 'COMP_QUE' ) _CONFIG_VALUES = ( ('NO_EFFECT', 'START_CONVERSION'), ((0, 1), (0, 3), (1, 3), (2, 3), 0, 1, 2, 3), (6.144, 4.096, 2.048, 1.024, 0.512, 0.256, 0.256, 0.256), ('CONTINUOUS', 'SINGLE'), (128, 250, 490, 920, 1600, 2400, 3300, 3300), # This one is different ('TRADIONAL', 'WINDOW'), ('ACTIVE_LOW', 'ACTIVE_HIGH'), ('NONLATCHING', 'LATCHING'), (1, 2, 3, 'DISABLE') ) USER_CONFIGURABLE_FIELDS = ('MUX', 'PGA', 'MODE', 'DR')
[docs] def __init__(self, address=_DEFAULT_ADDRESS, i2c_bus=1, pig=None): """ See: vent.io.devices.ADS1115.__init__ """ super().__init__(address=address, i2c_bus=i2c_bus, pig=pig)
[docs]def be16_to_native(data, signed=False) -> int: """ Unpacks a bytes-like object respecting big-endianness of outside world and returns an int according to signed. Args: data: bytes-like object. The data to be unpacked & converted signed (bool): Whether or not `data` is signed """ return int.from_bytes(data[1][:2], 'big', signed=signed)
[docs]def native16_to_be(word, signed=False) -> bytes: """ Packs an int into bytes after swapping endianness. Args: signed (bool): Whether or not `data` is signed word (int): The integer representation to converted and packed into bytes """ return word.to_bytes(2, 'big', signed=signed)