This repository has been archived on 2022-06-07. You can view files and clone it, but cannot push or open issues or pull requests.
robocar-pca9685-python/donkeycar/parts/pins.py

1044 lines
36 KiB
Python

"""
pins.py - high level ttl an pwm pin abstraction.
This is designed to allow drivers that use ttl and pwm
pins to be reusable with different underlying libaries
and techologies.
The abstract classes InputPin, OutputPin and PwmPin
provide an interface for starting, using and cleaning up the pins.
The factory functions input_pin_by_id(), output_pin_by_id()
and pwm_pin_by_id() construct pins given a string id
that specifies the underlying pin provider and it's attributes.
There are implementations for the Rpi.GPIO library and
for the PCA9685.
Pin id allows pins to be selected using a single string to
select from different underlying providers, numbering schemes and settings.
Use Rpi.GPIO library, GPIO.BOARD pin numbering scheme, pin number 13
pin = input_pin_by_id("RPI_GPIO.BOARD.13")
Use Rpi.GPIO library, GPIO.BCM broadcom pin numbering scheme, gpio pin number 33
pin = output_pin_by_id("RPI_GPIO.BCM.33")
Use PCA9685 on bus 0 at address 0x40, channel 7
pin = pwm_pin_by_id("PCA9685.0:40.7")
"""
from abc import ABC, abstractmethod
from typing import Any, Callable
import logging
logger = logging.getLogger(__name__)
class PinState:
LOW: int = 0
HIGH: int = 1
NOT_STARTED: int = -1
class PinEdge:
RISING: int = 1
FALLING: int = 2
BOTH: int = 3
class PinPull:
PULL_NONE: int = 1
PULL_UP: int = 2
PULL_DOWN: int = 3
class PinProvider:
RPI_GPIO = "RPI_GPIO"
PCA9685 = "PCA9685"
PIGPIO = "PIGPIO"
class PinScheme:
BOARD = "BOARD" # board numbering
BCM = "BCM" # broadcom gpio numbering
#
# #### Base interface for input/output/pwm pins
# #### Implementations derive from these abstact classes
#
class InputPin(ABC):
def __init__(self) -> None:
super().__init__()
@abstractmethod
def start(self, on_input=None, edge: int = PinEdge.RISING) -> None:
"""
Start the pin in input mode.
:param on_input: function to call when an edge is detected, or None to ignore
:param edge: type of edge(s) that trigger on_input; default is PinEdge.RISING
This raises a RuntimeError if the pin is already started.
You can check to see if the pin is started by calling
state() and checking for PinState.NOT_STARTED
"""
pass # subclasses should override this
@abstractmethod
def stop(self) -> None:
"""
Stop the pin and return it to PinState.NOT_STARTED
"""
pass # subclasses should override this
@abstractmethod
def state(self) -> int:
"""
Return most recent input state. This does not re-read the input pin,
it just returns that last value read by the input() method.
If the pin is not started or has been stopped,
this will return PinState:NOT_STARTED
"""
return PinState.NOT_STARTED # subclasses must override
@abstractmethod
def input(self) -> int:
"""
Read the input state from the pin.
:return: PinState.LOW/HIGH or PinState.NOT_STARTED if pin not started
"""
return PinState.NOT_STARTED # subclasses must override
class OutputPin(ABC):
def __init__(self) -> None:
super().__init__()
@abstractmethod
def start(self, state: int = PinState.LOW) -> None:
"""
Start the pin in output mode and with given starting state.
This raises and RuntimeError if the pin is already started.
You can check to see if the pin is started by calling
state() and checking for PinState.NOT_STARTED
"""
pass # subclasses should override this
@abstractmethod
def stop(self) -> None:
"""
Stop the pin and return it to PinState.NOT_STARTED
"""
pass # subclasses should override this
@abstractmethod
def state(self) -> int:
"""
Return most recent output state. This does not re-read the pin,
It just returns that last value set by the output() method.
If the pin is not started or has been stopped,
this will return PinState:NOT_STARTED
:return: most recent output state OR PinState.NOT_STARTED if pin not started.
"""
return PinState.NOT_STARTED # subclasses must override
@abstractmethod
def output(self, state: int) -> None:
"""
Set the output state of the pin to either
:param state: PinState.LOW or PinState.HIGH
:except: RuntimeError if pin not stated.
"""
pass # subclasses must override
class PwmPin(ABC):
def __init__(self) -> None:
super().__init__()
@abstractmethod
def start(self, duty: float = 0) -> None:
"""
Start the pin in output mode and with given starting state.
This raises and RuntimeError if the pin is already started.
You can check to see if the pin is started by calling
state() and checking for PinState.NOT_STARTED
:param duty: duty cycle in range 0 to 1
"""
pass # subclasses should override this
@abstractmethod
def stop(self) -> None:
"""
Stop the pin and return it to PinState.NOT_STARTED
"""
pass # subclasses should override this
@abstractmethod
def state(self) -> float:
"""
Return most recent output state. This does not re-read the pin,
It just returns that last value set by the output() method.
If the pin is not started or has been stopped,
this will return PinState:NOT_STARTED
:return: most recent output duty_cycle
"""
return PinState.NOT_STARTED # subclasses must override
@abstractmethod
def duty_cycle(self, duty: float) -> None:
"""
Set the output duty cycle of the pin
in range 0 to 1.0 (0% to 100%)
:param duty: duty cycle in range 0 to 1
:except: RuntimeError is pin is not started
"""
pass # subclasses must override
#
# ####### Factory Methods
#
#
# Pin id allows pins to be selected using a single string to
# select from different underlying providers, numbering schemes and settings.
#
# Use Rpi.GPIO library, GPIO.BOARD pin numbering scheme, pin number 13
# "RPI_GPIO.BOARD.13"
#
# Use Rpi.GPIO library, GPIO.BCM broadcom pin numbering scheme, gpio pin number 33
# "RPI_GPIO.BCM.33"
#
# Use PCA9685 on bus 0 at address 0x40, channel 7
# "PCA9685.0:40.7"
#
def output_pin_by_id(pin_id: str, frequency_hz: int = 60) -> OutputPin:
"""
Select a ttl output pin given a pin id.
:param pin_id: pin specifier string
:param frequency_hz: duty cycle frequency in hertz (only necessary for PCA9685)
:return: OutputPin
"""
parts = pin_id.split(".")
if parts[0] == PinProvider.PCA9685:
pin_provider = parts[0]
i2c_bus, i2c_address = parts[1].split(":")
i2c_bus = int(i2c_bus)
i2c_address = int(i2c_address, base=16)
frequency_hz = int(frequency_hz)
pin_number = int(parts[2])
return output_pin(pin_provider, pin_number, i2c_bus=i2c_bus, i2c_address=i2c_address, frequency_hz=frequency_hz)
if parts[0] == PinProvider.RPI_GPIO:
pin_provider = parts[0]
pin_scheme = parts[1]
pin_number = int(parts[2])
return output_pin(pin_provider, pin_number, pin_scheme=pin_scheme)
if parts[0] == PinProvider.PIGPIO:
pin_provider = parts[0]
if PinScheme.BCM != parts[1]:
raise ValueError("Pin scheme must be BCM for PIGPIO")
pin_number = int(parts[2])
return output_pin(pin_provider, pin_number, pin_scheme=PinScheme.BCM)
raise ValueError(f"Unknown pin provider {parts[0]}")
def pwm_pin_by_id(pin_id: str, frequency_hz: int = 60) -> PwmPin:
"""
Select a pwm output pin given a pin id.
:param pin_id: pin specifier string
:param frequency_hz: duty cycle frequency in hertz
:return: PwmPin
"""
parts = pin_id.split(".")
if parts[0] == PinProvider.PCA9685:
pin_provider = parts[0]
i2c_bus, i2c_address = parts[1].split(":")
i2c_bus = int(i2c_bus)
i2c_address = int(i2c_address, base=16)
pin_number = int(parts[2])
return pwm_pin(pin_provider, pin_number, i2c_bus=i2c_bus, i2c_address=i2c_address, frequency_hz=frequency_hz)
if parts[0] == PinProvider.RPI_GPIO:
pin_provider = parts[0]
pin_scheme = parts[1]
pin_number = int(parts[2])
return pwm_pin(pin_provider, pin_number, pin_scheme=pin_scheme, frequency_hz=frequency_hz)
if parts[0] == PinProvider.PIGPIO:
pin_provider = parts[0]
if PinScheme.BCM != parts[1]:
raise ValueError("Pin scheme must be BCM for PIGPIO")
pin_number = int(parts[2])
return pwm_pin(pin_provider, pin_number, pin_scheme=PinScheme.BCM, frequency_hz=frequency_hz)
raise ValueError(f"Unknown pin provider {parts[0]}")
def input_pin_by_id(pin_id: str, pull: int = PinPull.PULL_NONE) -> InputPin:
"""
Select a ttl input pin given a pin id.
"""
parts = pin_id.split(".")
if parts[0] == PinProvider.PCA9685:
raise RuntimeError("PinProvider.PCA9685 does not implement InputPin")
if parts[0] == PinProvider.RPI_GPIO:
pin_provider = parts[0]
pin_scheme = parts[1]
pin_number = int(parts[2])
return input_pin(pin_provider, pin_number, pin_scheme=pin_scheme, pull=pull)
if parts[0] == PinProvider.PIGPIO:
pin_provider = parts[0]
if PinScheme.BCM != parts[1]:
raise ValueError("Pin scheme must be BCM for PIGPIO")
pin_number = int(parts[2])
return input_pin(pin_provider, pin_number, pin_scheme=PinScheme.BCM, pull=pull)
raise ValueError(f"Unknown pin provider {parts[0]}")
def input_pin(
pin_provider: str,
pin_number: int,
pin_scheme: str = PinScheme.BOARD,
pull: int = PinPull.PULL_NONE) -> InputPin:
"""
construct an InputPin using the given pin provider.
Note that PCA9685 can NOT provide an InputPin.
:param pin_provider: PinProvider string
:param pin_number: zero based pin number
:param pin_scheme: PinScheme string
:param pull: PinPull value
:return: InputPin
:except: RuntimeError if pin_provider is not valid.
"""
if pin_provider == PinProvider.RPI_GPIO:
return InputPinGpio(pin_number, pin_scheme, pull)
if pin_provider == PinProvider.PCA9685:
raise RuntimeError("PinProvider.PCA9685 does not implement InputPin")
if pin_provider == PinProvider.PIGPIO:
if pin_scheme != PinScheme.BCM:
raise ValueError("Pin scheme must be PinScheme.BCM for PIGPIO")
return InputPinPigpio(pin_number, pull)
raise RuntimeError(f"UnknownPinProvider ({pin_provider})")
def output_pin(
pin_provider: str,
pin_number: int,
pin_scheme: str = PinScheme.BOARD,
i2c_bus: int = 0,
i2c_address: int = 40,
frequency_hz: int = 60) -> OutputPin:
"""
construct an OutputPin using the given pin provider
Note that PCA9685 can NOT provide an InputPin.
:param pin_provider: PinProvider string
:param pin_number: zero based pin number
:param pin_scheme: PinScheme string
:param i2c_bus: I2C bus number for I2C devices
:param i2c_address: I2C address for I2C devices
:param frequency_hz: duty cycle frequence in hertz (for PCA9685)
:return: InputPin
:except: RuntimeError if pin_provider is not valid.
"""
if pin_provider == PinProvider.RPI_GPIO:
return OutputPinGpio(pin_number, pin_scheme)
if pin_provider == PinProvider.PCA9685:
return OutputPinPCA9685(pin_number, pca9685(i2c_bus, i2c_address, frequency_hz))
if pin_provider == PinProvider.PIGPIO:
if pin_scheme != PinScheme.BCM:
raise ValueError("Pin scheme must be PinScheme.BCM for PIGPIO")
return OutputPinPigpio(pin_number)
raise RuntimeError(f"UnknownPinProvider ({pin_provider})")
def pwm_pin(
pin_provider: str,
pin_number: int,
pin_scheme: str = PinScheme.BOARD,
frequency_hz: int = 60,
i2c_bus: int = 0,
i2c_address: int = 40) -> PwmPin:
"""
construct a PwmPin using the given pin provider
:param pin_provider: PinProvider string
:param pin_number: zero based pin number
:param pin_scheme: PinScheme string
:param i2c_bus: I2C bus number for I2C devices
:param i2c_address: I2C address for I2C devices
:param frequency_hz: duty cycle frequence in hertz
:return: PwmPin
:except: RuntimeError if pin_provider is not valid.
"""
if pin_provider == PinProvider.RPI_GPIO:
return PwmPinGpio(pin_number, pin_scheme, frequency_hz)
if pin_provider == PinProvider.PCA9685:
return PwmPinPCA9685(pin_number, pca9685(i2c_bus, i2c_address, frequency_hz))
if pin_provider == PinProvider.PIGPIO:
if pin_scheme != PinScheme.BCM:
raise ValueError("Pin scheme must be PinScheme.BCM for PIGPIO")
return PwmPinPigpio(pin_number, frequency_hz)
raise RuntimeError(f"UnknownPinProvider ({pin_provider})")
#
# ----- RPi.GPIO/Jetson.GPIO implementations -----
#
try:
import RPi.GPIO as GPIO
# lookups to convert abstact api to GPIO values
gpio_pin_edge = [None, GPIO.RISING, GPIO.FALLING, GPIO.BOTH]
gpio_pin_pull = [None, GPIO.PUD_OFF, GPIO.PUD_DOWN, GPIO.PUD_UP]
gpio_pin_scheme = {PinScheme.BOARD: GPIO.BOARD, PinScheme.BCM: GPIO.BCM}
except ImportError:
logger.warn("RPi.GPIO was not imported.")
globals()["GPIO"] = None
def gpio_fn(pin_scheme:int, fn:Callable[[], Any]):
"""
Convenience method to enforce the desired GPIO pin scheme
before calling a GPIO function.
RPi.GPIO allows only a single scheme to be set at runtime.
If the pin scheme is already set to a different scheme, then
this will raise a RuntimeError to prevent erroneous pin outputs.
:param pin_scheme:int GPIO.BOARD or GPIO.BCM
:param fn:Callable[[], Any] no-arg function to call after setting pin scheme.
:return:any return value from called function
:exception:RuntimeError if pin scheme is already set to a different scheme.
"""
prev_scheme = GPIO.getmode()
if prev_scheme is None:
GPIO.setmode(pin_scheme)
elif prev_scheme != pin_scheme:
raise RuntimeError(f"Attempt to change GPIO pin scheme from ({prev_scheme}) to ({pin_scheme})"
" after it has been set. All RPi.GPIO user must use the same pin scheme.")
val = fn()
return val
class InputPinGpio(InputPin):
def __init__(self, pin_number: int, pin_scheme: str, pull: int = PinPull.PULL_NONE) -> None:
"""
Input pin ttl HIGH/LOW using RPi.GPIO/Jetson.GPIO
:param pin_number: GPIO.BOARD mode point number
:param pull: enable a pull up or down resistor on pin. Default is PinPull.PULL_NONE
"""
self.pin_number = pin_number
self.pin_scheme = gpio_pin_scheme[pin_scheme]
self.pull = pull
self.on_input = None
self._state = PinState.NOT_STARTED
super().__init__()
def _callback(self, pin_number):
if self.on_input is not None:
self.on_input(self.pin_number, self.input())
def start(self, on_input=None, edge=PinEdge.RISING) -> None:
"""
:param on_input: function to call when an edge is detected, or None to ignore
:param edge: type of edge(s) that trigger on_input; default is
"""
if self.state() != PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to start InputPinGpio({self.pin_number}) that is already started.")
gpio_fn(self.pin_scheme, lambda: GPIO.setup(self.pin_number, GPIO.IN, pull_up_down=gpio_pin_pull[self.pull]))
if on_input is not None:
self.on_input = on_input
gpio_fn(
self.pin_scheme,
lambda: GPIO.add_event_detect(self.pin_number, gpio_pin_edge[edge], callback=self._callback))
self.input() # read first state
def stop(self) -> None:
if self.state() != PinState.NOT_STARTED:
gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number))
self._state = PinState.NOT_STARTED
def state(self) -> int:
return self._state
def input(self) -> int:
self._state = gpio_fn(self.pin_scheme, lambda: GPIO.input(self.pin_number))
return self._state
class OutputPinGpio(OutputPin):
"""
Output pin ttl HIGH/LOW using Rpi.GPIO/Jetson.GPIO
"""
def __init__(self, pin_number: int, pin_scheme: str) -> None:
self.pin_number = pin_number
self.pin_scheme = gpio_pin_scheme[pin_scheme]
self._state = PinState.NOT_STARTED
def start(self, state: int = PinState.LOW) -> None:
if self.state() != PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to start OutputPinGpio({self.pin_number}) that is already started.")
gpio_fn(self.pin_scheme, lambda: GPIO.setup(self.pin_number, GPIO.OUT))
self.output(state)
def stop(self) -> None:
if self.state() != PinState.NOT_STARTED:
gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number))
self._state = PinState.NOT_STARTED
def state(self) -> int:
return self._state
def output(self, state: int) -> None:
gpio_fn(self.pin_scheme, lambda: GPIO.output(self.pin_number, state))
self._state = state
class PwmPinGpio(PwmPin):
"""
PWM output pin using Rpi.GPIO/Jetson.GPIO
"""
def __init__(self, pin_number: int, pin_scheme: str, frequency_hz: float = 50) -> None:
self.pin_number = pin_number
self.pin_scheme = gpio_pin_scheme[pin_scheme]
self.frequency = int(frequency_hz)
self.pwm = None
self._state = PinState.NOT_STARTED
def start(self, duty: float = 0) -> None:
if self.pwm is not None:
raise RuntimeError("Attempt to start PwmPinGpio that is already started.")
if duty < 0 or duty > 1:
raise ValueError("duty_cycle must be in range 0 to 1")
gpio_fn(self.pin_scheme, lambda: GPIO.setup(self.pin_number, GPIO.OUT))
self.pwm = gpio_fn(self.pin_scheme, lambda: GPIO.PWM(self.pin_number, self.frequency))
self.pwm.start(duty * 100) # takes duty in range 0 to 100
self._state = duty
def stop(self) -> None:
if self.pwm is not None:
self.pwm.stop()
gpio_fn(self.pin_scheme, lambda: GPIO.cleanup(self.pin_number))
self._state = PinState.NOT_STARTED
def state(self) -> float:
return self._state
def duty_cycle(self, duty: float) -> None:
if duty < 0 or duty > 1:
raise ValueError("duty_cycle must be in range 0 to 1")
self.pwm.ChangeDutyCycle(duty * 100) # takes duty of 0 to 100
self._state = duty
#
# ----- PCA9685 implementations -----
#
class PCA9685:
'''
Pin controller using PCA9685 boards.
This is used for most RC Cars. This
driver can output ttl HIGH or LOW or
produce a duty cycle at the given frequency.
'''
def __init__(self, busnum: int, address: int, frequency: int):
import Adafruit_PCA9685
if busnum is not None:
from Adafruit_GPIO import I2C
# monkey-patch I2C driver to use our bus number
def get_bus():
return busnum
I2C.get_default_bus = get_bus
self.pwm = Adafruit_PCA9685.PCA9685(address=address)
self.pwm.set_pwm_freq(frequency)
self._frequency = frequency
def get_frequency(self):
return self._frequency
def set_high(self, channel: int):
self.pwm.set_pwm(channel, 4096, 0)
def set_low(self, channel: int):
self.pwm.set_pwm(channel, 0, 4096)
def set_duty_cycle(self, channel: int, duty_cycle: float):
if duty_cycle < 0 or duty_cycle > 1:
raise ValueError("duty_cycle must be in range 0 to 1")
if duty_cycle == 1:
self.set_high(channel)
elif duty_cycle == 0:
self.set_low(channel)
else:
# duty cycle is fraction of the 12 bits
pulse = int(4096 * duty_cycle)
try:
self.pwm.set_pwm(channel, 0, pulse)
except Exception as e:
logger.error(f'Error on PCA9685 channel {channel}: {str(e)}')
#
# lookup map for PCA9685 singletons
# key is "busnum:address"
#
_pca9685 = {}
def pca9685(busnum: int, address: int, frequency: int = 60):
"""
pca9685 factory allocates driver for pca9685
at given bus number and i2c address.
If we have already created one for that bus/addr
pair then use that singleton. If frequency is
not the same, then error.
:param busnum: I2C bus number of PCA9685
:param address: address of PCA9685 on I2C bus
:param frequency: frequency in hertz of duty cycle
:except: PCA9685 has a single frequency for all channels,
so attempts to allocate a controller at a
given bus number and address with different
frequencies will raise a ValueError
"""
key = str(busnum) + ":" + hex(address)
pca = _pca9685.get(key)
if pca is None:
pca = PCA9685(busnum, address, frequency)
if pca.get_frequency() != frequency:
raise ValueError(
f"Frequency {frequency} conflicts with pca9685 at {key} "
f"with frequency {pca.pwm.get_pwm_freq()}")
return pca
class OutputPinPCA9685(ABC):
"""
Output pin ttl HIGH/LOW using PCA9685
"""
def __init__(self, pin_number: int, pca9685: PCA9685) -> None:
self.pin_number = pin_number
self.pca9685 = pca9685
self._state = PinState.NOT_STARTED
def start(self, state: int = PinState.LOW) -> None:
"""
Start the pin in output mode.
This raises a RuntimeError if the pin is already started.
You can check to see if the pin is started by calling
state() and checking for PinState.NOT_STARTED
:param state: PinState to start with
:except: RuntimeError if pin is already started.
"""
if self.state() != PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to start pin ({self.pin_number}) that is already started")
self._state = 0 # hack to allow first output to work
self.output(state)
def stop(self) -> None:
"""
Stop the pin and return it to PinState.NOT_STARTED
"""
if self.state() != PinState.NOT_STARTED:
self.output(PinState.LOW)
self._state = PinState.NOT_STARTED
def state(self) -> int:
"""
Return most recent output state.
If the pin is not started or has been stopped,
this will return PinState:NOT_STARTED
:return: PinState
"""
return self._state
def output(self, state: int) -> None:
"""
Write output state to the pin.
:param state: PinState.LOW or PinState.HIGH
"""
if self.state() == PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to use pin ({self.pin_number}) that is not started")
if state == PinState.HIGH:
self.pca9685.set_high(self.pin_number)
else:
self.pca9685.set_low(self.pin_number)
self._state = state
class PwmPinPCA9685(PwmPin):
"""
PWM output pin using PCA9685
"""
def __init__(self, pin_number: int, pca9685: PCA9685) -> None:
self.pin_number = pin_number
self.pca9685 = pca9685
self._state = PinState.NOT_STARTED
def start(self, duty: float = 0) -> None:
"""
Start pin with given duty cycle
:param duty: duty cycle in range 0 to 1
:except: RuntimeError if pin is already started.
"""
if self.state() != PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to start pin ({self.pin_number}) that is already started")
if duty < 0 or duty > 1:
raise ValueError("duty_cycle must be in range 0 to 1")
self._state = 0 # hack to allow first duty_cycle to work
self.duty_cycle(duty)
self._state = duty
def stop(self) -> None:
if self.state() != PinState.NOT_STARTED:
self.duty_cycle(0)
self._state = PinState.NOT_STARTED
def state(self) -> float:
"""
This returns the last set duty cycle.
:return: duty cycle in range 0 to 1 OR PinState.NOT_STARTED in not started
"""
return self._state
def duty_cycle(self, duty: float) -> None:
"""
Write a duty cycle to the output pin
:param duty: duty cycle in range 0 to 1
:except: RuntimeError if not started
"""
if self.state() == PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to use pin ({self.pin_number}) that is not started")
if duty < 0 or duty > 1:
raise ValueError("duty_cycle must be in range 0 to 1")
self.pca9685.set_duty_cycle(self.pin_number, duty)
self._state = duty
#
# ----- PIGPIO implementation -----
#
# pigpio is an optional install
try:
import pigpio
pigpio_pin_edge = [None, pigpio.RISING_EDGE, pigpio.FALLING_EDGE, pigpio.EITHER_EDGE]
pigpio_pin_pull = [None, pigpio.PUD_OFF, pigpio.PUD_DOWN, pigpio.PUD_UP]
except ImportError:
logger.warn("pigpio was not imported.")
globals()["pigpio"] = None
class InputPinPigpio(InputPin):
def __init__(self, pin_number: int, pull: int = PinPull.PULL_NONE, pgpio=None) -> None:
"""
Input pin ttl HIGH/LOW using PiGPIO library
:param pin_number: GPIO.BOARD mode pin number
:param pull: enable a pull up or down resistor on pin. Default is PinPull.PULL_NONE
:param pgpio: instance of pgpio to use or None to allocate a new one
"""
self.pgpio = pgpio
self.pin_number = pin_number
self.pull = pigpio_pin_pull[pull]
self.on_input = None
self._state = PinState.NOT_STARTED
def __del__(self):
self.stop()
def _callback(self, gpio, level, tock):
if self.on_input is not None:
self.on_input(gpio, level)
def start(self, on_input=None, edge=PinEdge.RISING) -> None:
"""
Start the input pin and optionally set callback.
:param on_input: function to call when an edge is detected, or None to ignore
:param edge: type of edge(s) that trigger on_input; default is PinEdge.RISING
"""
if self.state() != PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to start InputPinPigpio({self.pin_number}) that is already started.")
self.pgpio = self.pgpio or pigpio.pi()
self.pgpio.set_mode(self.pin_number, pigpio.INPUT)
self.pgpio.set_pull_up_down(self.pin_number, self.pull)
if on_input is not None:
self.on_input = on_input
self.pgpio.callback(self.pin_number, pigpio_pin_edge[edge], self._callback)
self._state = self.pgpio.read(self.pin_number) # read initial state
def stop(self) -> None:
if self.state() != PinState.NOT_STARTED:
self.pgpio.stop()
self.pgpio = None
self.on_input = None
self._state = PinState.NOT_STARTED
def state(self) -> int:
"""
Return last input() value. This does NOT read the input again;
it returns that last value that input() returned.
:return: PinState.LOW/HIGH OR PinState.NOT_STARTED if not started
"""
return self._state
def input(self) -> int:
"""
Read the input pins state.
:return: PinState.LOW/HIGH OR PinState.NOT_STARTED if not started
"""
if self.state() != PinState.NOT_STARTED:
self._state = self.pgpio.read(self.pin_number)
return self._state
class OutputPinPigpio(OutputPin):
"""
Output pin ttl HIGH/LOW using Rpi.GPIO/Jetson.GPIO
"""
def __init__(self, pin_number: int, pgpio=None) -> None:
self.pgpio = pgpio
self.pin_number = pin_number
self._state = PinState.NOT_STARTED
def start(self, state: int = PinState.LOW) -> None:
"""
Start the pin in output mode.
This raises a RuntimeError if the pin is already started.
You can check to see if the pin is started by calling
state() and checking for PinState.NOT_STARTED
:param state: PinState to start with
:except: RuntimeError if pin is already started.
"""
if self.state() != PinState.NOT_STARTED:
raise RuntimeError("Attempt to start OutputPin that is already started.")
self.pgpio = self.pgpio or pigpio.pi()
self.pgpio.set_mode(self.pin_number, pigpio.OUTPUT)
self.pgpio.write(self.pin_number, state) # set initial state
self._state = state
def stop(self) -> None:
if self.state() != PinState.NOT_STARTED:
self.pgpio.write(self.pin_number, PinState.LOW)
self.pgpio.stop()
self.pgpio = None
self._state = PinState.NOT_STARTED
def state(self) -> int:
"""
Return last output state
:return: PinState.LOW/HIGH or PinState.NOT_STARTED if pin not started.
"""
return self._state
def output(self, state: int) -> None:
"""
Write output state to the pin.
:param state: PinState.LOW or PinState.HIGH
"""
if self.state() != PinState.NOT_STARTED:
self.pgpio.write(self.pin_number, state)
self._state = state
class PwmPinPigpio(PwmPin):
"""
PWM output pin using Rpi.GPIO/Jetson.GPIO
"""
def __init__(self, pin_number: int, frequency_hz: float = 50, pgpio=None) -> None:
self.pgpio = pgpio
self.pin_number: int = pin_number
self.frequency: int = int(frequency_hz)
self._state: int = PinState.NOT_STARTED
def start(self, duty: float = 0) -> None:
"""
Start pin with given duty cycle.
:param duty: duty cycle in range 0 to 1
:except: RuntimeError if pin is already started.
"""
if self.state() != PinState.NOT_STARTED:
raise RuntimeError(f"Attempt to start InputPinPigpio({self.pin_number}) that is already started.")
if duty < 0 or duty > 1:
raise ValueError("duty_cycle must be in range 0 to 1")
self.pgpio = self.pgpio or pigpio.pi()
self.pgpio.set_mode(self.pin_number, pigpio.OUTPUT)
self.pgpio.set_PWM_frequency(self.pin_number, self.frequency)
self.pgpio.set_PWM_range(self.pin_number, 4095) # 12 bits, same as PCA9685
self.pgpio.set_PWM_dutycycle(self.pin_number, int(duty * 4095)) # set initial state
self._state = duty
def stop(self) -> None:
if self.state() != PinState.NOT_STARTED:
self.pgpio.write(self.pin_number, PinState.LOW)
self.pgpio.stop()
self.pgpio = None
self._state = PinState.NOT_STARTED
def state(self) -> float:
"""
This returns the last set duty cycle.
:return: duty cycle in range 0 to 1 OR PinState.NOT_STARTED in not started
"""
return self._state
def duty_cycle(self, duty: float) -> None:
"""
Write a duty cycle to the output pin
:param duty: duty cycle in range 0 to 1
:except: RuntimeError if not started
"""
if duty < 0 or duty > 1:
raise ValueError("duty_cycle must be in range 0 to 1")
if self.state() != PinState.NOT_STARTED:
self.pgpio.set_PWM_dutycycle(self.pin_number, int(duty * 4095))
self._state = duty
if __name__ == '__main__':
import argparse
import sys
import time
#
# output 50% duty cycle on Rpi board pin 33 (equivalent to BCM.13) for 10 seconds
# python pins.py --pwm-pin=RPI_GPIO.BOARD.33 --duty=0.5 --time=10
#
# input on Rpi board pin 35 (equivalend to BCM.19) for 10 seconds
# python pins.py --in-pin=RPI_GPIO.BOARD.35 --time=10
#
# output 50% duty cycle on Rpi board pin 33, input on Rpi board pin 35 using interrupt handler
# python pins.py --pwm-pin=RPI_GPIO.BOARD.33 --duty=0.5 --in-pin=RPI_GPIO.BOARD.35 -int=rising --time=10
#
# output on Rpi board pin 33, input on Rpi board pin 35 using interrupt handler
# python pins.py --out-pin=RPI_GPIO.BOARD.33 --duty=0.5 --in-pin=RPI_GPIO.BOARD.35 -int=rising --time=10
#
#
# parse arguments
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--pwm-pin", type=str, default=None,
help="pwm pin id, like 'PCA9685:1:60.13' or 'RPI_GPIO.BCM.13")
parser.add_argument("-hz", "--hertz", type=int, default=60,
help="PWM signal frequence in hertz. Default is 60hz")
parser.add_argument("-d", "--duty", type=float, default=0.5,
help="duty cycle in range 0 to 1. Default is 0.5")
parser.add_argument("-o", "--out-pin", type=str, default=None,
help="ttl output pin id, like 'PCA9685:1:60.13' or 'RPI_GPIO.BOARD.35' or 'RPI_GPIO.BCM.13'")
parser.add_argument("-i", "--in-pin", type=str, default=None,
help="ttl input pin id, like 'RPI_GPIO.BOARD.35' or 'RPI_GPIO.BCM.19'")
parser.add_argument("-pu", "--pull", type=str, choices=['up', 'down', 'none'], default='none',
help="input pin pullup, one of 'up', 'down', 'none'")
parser.add_argument("-int", "--interrupt", type=str, choices=['falling', 'rising', 'both', 'none'], default='none',
help="use interrupt routine on in-pin with given edge; 'falling', 'rising' or 'both'")
parser.add_argument("-tm", "--time", type=float, default=1, help="duration test in seconds")
parser.add_argument("-db", "--debug", action='store_true', help="show debug output")
parser.add_argument("-th", "--threaded", action='store_true', help="run in threaded mode")
# Read arguments from command line
args = parser.parse_args()
help = []
if args.hertz < 1:
help.append("-hz/--hertz: must be >= 1.")
if args.duty < 0 or args.duty > 1:
help.append("-d/--duty: must be in range 0 to 1")
if args.pwm_pin is None and args.out_pin is None and args.in_pin is None:
help.append("must have one of -o/--out-pin or -p/--pwm-pin or -i/--in-pin")
if args.pwm_pin is not None and args.out_pin is not None:
help.append("must have only one of -o/--out-pin or -p/--pwn-pin")
if args.time < 1:
help.append("-tm/--time: must be > 0.")
if len(help) > 0:
parser.print_help()
for h in help:
print(" " + h)
sys.exit(1)
pin_pull = {
'up': PinPull.PULL_UP,
'down': PinPull.PULL_DOWN,
'none': PinPull.PULL_NONE
}
pin_edge = {
'none': None,
'falling': PinEdge.FALLING,
'rising': PinEdge.RISING,
'both': PinEdge.BOTH
}
def on_input(pin_number, state):
if state == PinState.HIGH:
print("+", pin_number, time.time()*1000)
elif state == PinState.LOW:
print("-", pin_number, time.time()*1000)
pwm_out_pin: PwmPin = None
ttl_out_pin: OutputPin = None
ttl_in_pin: InputPin = None
try:
#
# construct a pin of the correct type
#
if args.in_pin is not None:
ttl_in_pin = input_pin_by_id(args.in_pin, pin_pull[args.pull])
if args.interrupt != 'none':
ttl_in_pin.start(on_input=on_input, edge=pin_edge[args.interrupt])
else:
ttl_in_pin.start()
if args.pwm_pin is not None:
pwm_out_pin = pwm_pin_by_id(args.pwm_pin, args.hertz)
pwm_out_pin.start(args.duty)
if args.out_pin is not None:
ttl_out_pin = output_pin_by_id(args.out_pin, args.hertz)
ttl_out_pin.start(PinState.LOW)
start_time = time.time()
end_time = start_time + args.time
while start_time < end_time:
if ttl_out_pin is not None:
if args.duty > 0:
ttl_out_pin.output(PinState.HIGH)
time.sleep(1 / args.hertz * args.duty)
if args.duty < 1:
ttl_out_pin.output(PinState.LOW)
time.sleep(1 / args.hertz * (1 - args.duty))
else:
# yield time to background threads
sleep_time = 1/args.hertz - (time.time() - start_time)
if sleep_time > 0.0:
time.sleep(sleep_time)
else:
time.sleep(0) # yield time to other threads
start_time = time.time()
except KeyboardInterrupt:
print('Stopping early.')
except Exception as e:
print(e)
exit(1)
finally:
if pwm_out_pin is not None:
pwm_out_pin.stop()
if ttl_out_pin is not None:
ttl_out_pin.stop()