Source code for responsepixx
from pypixxlib import _libdpx as dp
import sys
from numpy import inf
[docs]class ButtonListener():
""" A convenient class to create button listeners for RESPONSEPixx button boxes.
Arguments:
RPxName (str): uniquely identifies which RESPONSEPixx device is being used. This is used to
define a dictionary that maps button numbers to button color strings. Accepted values (not case sensitive):
- 1) 'handheld'
- 2) 'handheld - mri'
- 3) 'dual - mri'
- 4) 'dual handheld'
- 5) 'mri 10 button'
buffBaseAddr (int): the memory address of the digital input log buffer in
DATAPixx RAM. see pypixxlib._libdpx.DPxSetDinLog() (default = 12000000)
numBuffFrames (int): the digital input log buffer size in frames.
see pypixxlib._libdpx.DPxSetDinLog() (default = 1000)
Attributes:
__logStatus__ (dict): the logStatus dictionary created by pypixxlib._libdpx.DPxSetDinLog()
__newData__ (int): the number of new log frames reported by the most recent query of the logStatus circular buffer (an empty list until the first query)
__logs__ (list): all unique button events since most recent update. May be updated indefinitely
__logsTotal__ (list): all unique button events since instantiation. May be updated indefinitely
__states__ (dict): maps each button number (int) to a Boolean indicating whether that button (i.e., digital input pin) is currently in the 'on' state. These are initialized using the digital input values.
__buttonDict__ (dict): maps button numbers (int) to button colors (str)
__mri__ (bool): flag indicating whether an MRI safe RESPONSEPixx box is being used. This is necessary as MRI safe boxes have the opposite ground state
__startTime__ (float): time of ButtonListener instantiation in VPixx box time
__RPxName__ (str): the type of RESPONSEPixx device being used
Exceptions:
1) misspecified RPxName (see Arguments)
"""
def __init__(self, RPxName, buffBaseAddr=12000000, numBuffFrames=1000):
    """
    Open the DATAPixx connection, start the digital input log and set up
    the button bookkeeping for the requested RESPONSEPixx device.

    Arguments:
        RPxName (str): device type, matched case-insensitively against
            'handheld', 'handheld - mri', 'dual - mri', 'dual handheld'
            and 'mri 10 button'.
        buffBaseAddr (int): forwarded to dp.DPxSetDinLog() (default = 12000000)
        numBuffFrames (int): forwarded to dp.DPxSetDinLog() (default = 1000)

    Exceptions:
        1) dp.DPxIsReady() reports false after dp.DPxOpen()
        2) RPxName is not one of the accepted device names
    """
    # Handshake with the hardware before any digital input register is used.
    dp.DPxOpen()
    if not dp.DPxIsReady():
        raise Exception("ERROR in %s\nNo established connection to a DATAPixx device! You must call pypixxlib._libdpx.DPxOpen() before instantiating ButtonListener" % sys.argv[0])
    dp.DPxEnableDinDebounce()
    dp.DPxSetDinDataOut(0)  # every LED starts switched off

    # Configure the Din log buffer, then ask the hardware to start logging.
    self.__logStatus__ = dp.DPxSetDinLog(buffBaseAddr, numBuffFrames)
    dp.DPxStartDinLog()

    # Nothing has been read from the log yet.
    self.__newData__ = []
    self.__logs__ = []
    self.__logsTotal__ = []

    # Device name -> (button labels in pin order, MRI flag).
    fourButtons = ('red', 'yellow', 'green', 'blue')
    fiveButtons = ('red', 'yellow', 'green', 'blue', 'white')
    tenButtons = ('red left', 'yellow left', 'green left', 'blue left',
                  'white left', 'red right', 'yellow right', 'green right',
                  'blue right', 'white right')
    deviceLayouts = {
        "handheld": (fiveButtons, False),
        "handheld - mri": (fourButtons, True),
        "dual - mri": (fourButtons, True),
        "dual handheld": (fourButtons, False),
        "mri 10 button": (tenButtons, True),
    }
    RPxName = RPxName.lower()  # device names are not case sensitive
    if RPxName not in deviceLayouts:
        raise Exception("ERROR in %s\nInvalid RESPONSEPixx device name. Valid devices:\n\tHandheld\n\tHandheld - MRI\n\tDual - MRI\n\tDual Handheld\n\tMRI 10 Button" % sys.argv[0])
    buttonLabels, usesMriPolarity = deviceLayouts[RPxName]
    self.__buttonDict__ = dict(enumerate(buttonLabels))
    self.__mri__ = usesMriPolarity

    # Seed every button state from the current digital input value.
    self.__states__ = {}
    dp.DPxUpdateRegCache()
    currentDinValue = dp.DPxGetDinValue()
    self.__states__.update(
        (buttonNum, self.__getState__(currentDinValue, buttonNum))
        for buttonNum in self
    )

    # Remember when (in DPx time) and for which device this listener was made.
    self.__startTime__ = dp.DPxGetTime()
    self.__RPxName__ = RPxName
def __del__(self):
"""
Ensure the Din logs terminate if listener object is deleted.
"""
self.pause()
def __iter__(self):
"""
The iter() function is defined such that iterating over ButtonListener
objects is akin to iterating over the list of button numbers. Now, instead of
for buttonNum in list(self.__buttonDict__.keys()):
...
we can simply use
for buttonNum in self:
...
"""
return iter(list(self.__buttonDict__.keys()))
def __getState__(self,base10State,pinNum):
"""
This function receives a base 10 representation of all pin states across digital input channels and
returns a Boolean indicating whether the pin specified by 'pinNum' is currently active.
"""
return bool( base10State & (2**pinNum) ) == self.__mri__ # (flip for MRI buttons)
def __update_newData_logStatus__(self):
    """
    Refresh the register cache, re-query the digital input log status and
    record how many new log frames it reports.

    Updates __logStatus__ (through dp.DPxGetDinStatus) and __newData__
    (the 'newLogFrames' entry of that status).

    Returns:
        bool: True when 'newLogFrames' is non-zero, i.e. there are new
        entries to read.
    """
    dp.DPxUpdateRegCache()
    logStatus = self.__logStatus__
    dp.DPxGetDinStatus(logStatus)
    newFrameCount = logStatus['newLogFrames']
    self.__newData__ = newFrameCount
    return bool(newFrameCount)
def __ensureButtonSubsetIntegrity__(self, buttonSubset):
"""
This function ensures that a user-defined subset of RESPONSEPixx button(s)
is in the proper format to be used by other functions in this class.
Arguments:
buttonSubset (list): is expected as either
1) omitted (defaulted to None), which
specifies to use all buttons
2) a single integer
3) a list of integers
if buttonSubset is not in any of these 3 formats, this
function will raise a detailed Exception detailing the exact
format that is expected for buttonSubset
Returns:
list: Each element is an integer
Exceptions:
1) buttonSubset is not in an expected format (see Arguments)
2) buttonSubset contains buttons that don't exist in buttonDict
"""
if buttonSubset is None:
return list(self.__buttonDict__.keys()) # no further data checks required
elif type(buttonSubset) is int:
buttonSubset = [buttonSubset]
elif type(buttonSubset) is not list or not all([ (type(i) is int) for i in buttonSubset]):
raise Exception("ERROR in %s\nInvalid RESPONSEPixx button number(s). buttonSubset must be\n\t1) omitted (return activity for all button)\n\t2) a single integer\n\t3) a list of integer(s)" % sys.argv[0])
buttonSubset = set(buttonSubset) # discard repeated values
if not buttonSubset.issubset(self.__buttonDict__.keys()): # ensure that these buttons exist on the button box
raise Exception("ERROR in %s\nOne or more elements in buttonSubset are not active buttons" % sys.argv[0])
# buttonSubset passes all integrity checks
return list(buttonSubset)
def updateLogs(self):
    """
    Pull any new frames from the digital input log and convert them into
    button events.

    __logs__ is reset on every call and then filled with one
    [time stamp, button number, 'Push' or 'Release'] entry for each button
    whose state differs from the state previously stored in __states__.
    The same entries are appended to __logsTotal__, and __states__ is
    brought up to date as the frames are processed.
    """
    newEvents = []
    self.__logs__ = newEvents  # events found by this call only

    # Refreshes the register cache and log status; nothing to do without new frames.
    if not self.__update_newData_logStatus__():
        return

    logStatus = self.__logStatus__
    if logStatus['numLogUnderflows']:
        print("\nWARNING in %s\nUnderflows detected in digital input buffer. Recommend increasing 'numBufferFrames'. Current buffer size is %d frames.\n" % (sys.argv[0], logStatus['numBufferFrames']) )

    for frame in dp.DPxReadDinLog(logStatus, self.__newData__):
        timeStamp, dinValue = frame[0], frame[1]
        for buttonNum in self:  # every pin that has a button assigned
            isActive = self.__getState__(dinValue, buttonNum)
            wasActive = self.__states__[buttonNum]
            if bool(isActive) == bool(wasActive):
                continue  # no state change for this button in this frame
            eventName = 'Push' if isActive else 'Release'
            newEvents.append([timeStamp, buttonNum, eventName])
            self.__states__[buttonNum] = bool(isActive)

    # Keep the running history of all button events in step.
    self.__logsTotal__.extend(newEvents)
[docs] def getNewButtonActivity(self, buttonSubset=None, push=True, release=False):
"""
This function allows the user to easily query all unique RESPONSEPixx button events
since the most recent call to updateLogs(). This function does not actually update
the Din logs.
Arguments:
buttonSubset (list): indicates which buttons to return from the activity log.
buttonSubset is passed through __ensureButtonSubsetIntegrity__(), which strictly ensures that it is a list of integers
buttonSubset can be passed as either
1) omitted (defaulted to None), which specifies to use all buttons
2) a single integer
3) a list of integers
push (bool): indicates whether to include button pushes in the activity log. (default = True)
release (bool): indicates whether to include button releases in the activity log. (default = True)
Returns:
list: Each element of the list is a unique, new RESPONSEPixx button event.
Each element is itself a list with 3 elements:
[timestamp (float), button number (int), push/release (str)]
"""
# ensure data hygiene
buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
# create empty list
buttonActivity = []
for log in self.__logs__: # for each sample in the log
for button in buttonSubset: # for all requested buttons
if log[1]==button and ((log[2]=="Push" and push) or (log[2]=="Release" and release)): # if this is the correct button, and it's a push when pushes are requested _or_ it's a release when releases are requested
buttonActivity.append(log) # add to the activity log
return buttonActivity
[docs] def getAllButtonActivity(self, buttonSubset=None, numLogs=None, push=True, release=False):
""" This function allows the user to easily query all unique RESPONSEPixx button events
since the instantiation of the RPixxListener object. This function does not actually update
the Din logs.
Arguments:
buttonSubset (list): indicates which buttons to return from the activity log.
buttonSubset is passed through __ensureButtonSubsetIntegrity__(), which strictly ensures that it is a list of integers
buttonSubset can be passed as either
1) omitted (defaulted to None), which specifies to use all buttons
2) a single integer
3) a list of integers
numLogs (int): indicates how many of the total list of logs to return starting
backwards from the most recent log
push (bool): indicates whether to include button pushes in the activity log. (default = True)
release (bool): indicates whether to include button releases in the activity log. (default = True)
Returns:
list: Each element of the list is a unique, new RESPONSEPixx button event.
Each element is itself a list with 3 elements: [timestamp (float), button number (int), push/release (str)]
Exceptions:
1) if numLogs is not a single, positive integer
"""
# ensure data hygiene
buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
n = len(self.__logsTotal__) # get the number of total logs
# ensure numLogs is sensible
if numLogs is None:
numLogs = n
elif type(numLogs) is not int or numLogs<=0:
raise Exception("ERROR in %s\nInvalid numLogs data type. numLog must a single positive integer" % sys.argv[0])
elif numLogs > n:
print("\nWARNING in %s\nRequested number of logs (%d) exceeds available number of logs (%d). Returning all available logs.\n" % (sys.argv[0], numLogs, len(self.__logsTotal__)) )
numLogs = n
# create empty list
buttonActivity = []
for log in self.__logsTotal__[(n-numLogs):n]: # for each sample in the requested logs
for button in buttonSubset: # for all requested buttons
if log[1]==button and ((log[2]=="Push" and push) or (log[2]=="Release" and release)): # if this is the correct button, and it's a push when pushes are requested _or_ it's a release when releases are requested
buttonActivity.append(log) # add to the activity log
return buttonActivity
[docs] def waitForButton(self, buttonSubset=None, timeOut=inf):
""" This function allows the user to drop a blocking command whereby execution pauses
until one or more unique RESPONSEPixx button(s) are activated. This function does
update the Din logs. These updates cannot create conflicts, as this is a blocking
function and no other queries can occur concurrently.
Arguments:
buttonSubset (list): indicates which buttons to return from the activity log.
buttonSubset is passed through __ensureButtonSubsetIntegrity__(), which strictly ensures that it is a list of integers
buttonSubset can be passed as either
1) omitted (defaulted to None), which specifies to use all buttons
2) a single integer
3) a list of integers
timeOut (float): reaction time in seconds after which this blocking
function terminates
Returns:
rt (float): the reaction time of the button press in seconds
Exceptions:
1) if argument 'timeOut' cannot be casted to float data type or is less than zero.
Detailed feedback is given.
"""
# ensure data hygiene
buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
# ensure validity of timeOut
if type(float(timeOut)) is not float or timeOut < 0:
raise Exception("ERROR in %s\nInvalid 'timeOut' argument. 'timeOut' must a single number greater than or equal to 0" % sys.argv[0])
dp.DPxUpdateRegCache()
startTime = dp.DPxGetTime()
while True:
dp.DPxUpdateRegCache()
rt = dp.DPxGetTime() - startTime
self.updateLogs()
if any(self.isButtonActive(buttonSubset)) or rt >= timeOut:
break
return rt
[docs] def isButtonActive(self,buttonSubset=None):
"""
A fast and simple method for returning a list of Booleans indicating whether RESPONSEPixx button(s)
are currently active since the most recent call to updateLogs(). This function
does not update the Din logs.
Arugments:
buttonSubset (list): indicates which buttons to return from the activity log.
buttonSubset is passed through __ensureButtonSubsetIntegrity__(), which strictly ensures that it is a list of integers
buttonSubset can be passed as either
1) omitted (defaulted to None), which specifies to use all buttons
2) a single integer
3) a list of integers
Returns:
list: Each element in the list is a bool, where the ith element indicates whether
the ith button in the buttonSubset is active
"""
# ensure data hygiene
buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
return [self.__states__[i] for i in buttonSubset]
[docs] def getButtonString(self,buttonSubset=None):
"""
A method for returning a list of strings that give a description of all the
RESPONSEPixx button(s) indicated by buttonSubset. The buttons specified in
buttonSubset are used to retrieve the button strings from the
__buttonDict__ attribute of this class.
Arugments:
buttonSubset (list): indicates which buttons to return from the activity log.
buttonSubset is passed through __ensureButtonSubsetIntegrity__(), which strictly ensures that it is a list of integers
buttonSubset can be passed as either
1) omitted (defaulted to None), which specifies to use all buttons
2) a single integer
3) a list of integers
Returns:
list: Each element in the list is a string, where the ith element describes
the ith button in the buttonSubset (e.g., "white button")
"""
buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
return [self.__buttonDict__[i] for i in buttonSubset]
[docs] def setButtonLEDs(self, buttonSubset=None, state=None, strength=1.0):
"""
A method for flipping the state of the RESPONSEPixx button LED(s) indicated by buttonSubset.
Arugments:
buttonSubset (list): indicates which buttons to return from the activity log.
buttonSubset is passed through __ensureButtonSubsetIntegrity__(), which strictly ensures that it is a list of integers
buttonSubset can be passed as either
1) omitted (defaulted to None), which specifies to use all buttons
2) a single integer
3) a list of integers
state (any): indicates the how to adjust the state of the LED lights. If omitted, it is
defaulted to None, which will flip the state of the LEDs with respect to their
current state. If this argument is provided, it will be casted to bool. If it
cannot be casted to bool, this will raise an Exception. If state is True, this
will ensure that the LEDs are turned on, regardless of their current state.
If state is False, this will ensure that the LEDs are turned off, regardless of
their current state. (default = None)
strength (float): indicates the brightness of the LED lights. Must be between
0 and 1. Values out side this range will elicit a warning.
Values less than 0 will be defaulted to 0.
Values greater than 1 will be defaulted to 1. (default = 1.0)
Exceptions:
1) if argument 'state' cannot be casted to bool data type.
2) if argument 'strength' cannot be casted to float data type.
These would raise errors in DPxSetDinDataOutStrength() anyways, so may as well catch them
here and give detailed feedback.
"""
# check whether this button box has LEDs
if self.__mri__:
print("\nWARNING in %s\nResponsePixx device '%s' does not have LED lights.\n" % (sys.argv[0],self.__RPxName__) )
return
# validate button list
buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
# check validity of state argument
if type(bool(state)) is not bool:
raise Exception("ERROR in %s\nInvalid 'state' argument. 'state' must a single value that can be casted to bool" % sys.argv[0])
elif state is not None: # cast it to bool
state = bool(state)
# check validity of strength argument
if type(float(strength)) is not float:
raise Exception("ERROR in %s\nInvalid 'strength' argument. 'strength' must a single positive float between 0 and 1" % sys.argv[0])
elif strength < 0 or strength > 1:
print("\nWARNING in %s\nLED strength of '%d' will be defaulted to %d. LED values must be between 0 and 1.\n" % (sys.argv[0],strength,strength>1) )
strength = float(strength>1)
# ensure the LED states of the buttons are initially the same
requiredDinDataOutState = dp.DPxGetDinDataOut() # Current state
# now perform the necessary operations to the state of buttons in buttonSubset
for pinNum in buttonSubset:
nth_currentState = self.__getState__(requiredDinDataOutState, pinNum+16)==0 # gives the current state
if nth_currentState and ((state is None) or (not state)): # is on, needs to be off
requiredDinDataOutState -= 2**(pinNum+16) # turn off
elif (not nth_currentState) and ((state is None) or state): # is off, and needs to be on
requiredDinDataOutState += 2**(pinNum+16) # turn on
# set Din data out values
dp.DPxSetDinDataDir(requiredDinDataOutState)
dp.DPxSetDinDataOut(requiredDinDataOutState)
dp.DPxSetDinDataOutStrength(strength)
dp.DPxUpdateRegCache()
[docs] def getStartTime(self):
"""
Simply returns the time of this ButtonListener was instantiated. Time stamp is grabbed from the DPx box.
Returns:
self.__startTime__ (float): time stamp for when this object was instantiated in DPx time.
"""
return self.__startTime__
[docs] def getRPixxName(self):
"""
Simply returns the name of the RESPONSEPixx type. This was passed to ButtonListener upon instantiation.
Returns:
self.__RPxName__ (str): name of the RESPONSEPixx device type.
"""
return self.__RPxName__
def isListening(self):
    """
    Ask the DATAPixx whether its digital inputs are currently being logged.

    Returns:
        (bool): the result of dp.DPxIsDinLogEvents(), cast to bool
    """
    logEventsFlag = dp.DPxIsDinLogEvents()
    return bool(logEventsFlag)
def resume(self):
    """
    (Re)start the Din log: calls dp.DPxStartDinLog() and then refreshes the
    register cache with dp.DPxUpdateRegCache().
    """
    dp.DPxStartDinLog()
    dp.DPxUpdateRegCache()