# Source code for responsepixx

from pypixxlib import _libdpx as dp
import sys
from numpy import inf

class ButtonListener():
    """
    A convenient class to create button listeners for RESPONSEPixx button boxes.

    Arguments:
        RPxName (str): uniquely identifies which RESPONSEPixx device is being used.
            This is used to define a dictionary that maps button numbers to button
            color strings. Accepted values (not case sensitive):
                1) 'handheld'
                2) 'handheld - mri'
                3) 'dual - mri'
                4) 'dual handheld'
                5) 'mri 10 button'
        buffBaseAddr (int): the memory address of the digital input log buffer in
            DATAPixx RAM. see pypixxlib._libdpx.DPxSetDinLog() (default = 12000000)
        numBuffFrames (int): the digital input log buffer size in frames.
            see pypixxlib._libdpx.DPxSetDinLog() (default = 1000)

    Attributes:
        __logStatus__ (dict): the logStatus dictionary created by
            pypixxlib._libdpx.DPxSetDinLog()
        __newData__ (int): number of new frames reported by the most recent
            status query of the circular buffer (0 until the first query)
        __logs__ (list): all unique button events found by the most recent call
            to updateLogs()
        __logsTotal__ (list): all unique button events since instantiation.
            Grows indefinitely
        __states__ (dict): maps button numbers (int) to Booleans indicating
            whether each button (i.e., digital input pin) is currently in the
            'on' state. These are initialized using the digital input values.
        __buttonDict__ (dict): maps button numbers (int) to button colors (str)
        __mri__ (bool): flag indicating whether an MRI safe RESPONSEPixx box is
            being used. This is necessary as MRI safe boxes have the opposite
            ground state
        __startTime__ (float): time of ButtonListener instantiation in VPixx
            box time
        __RPxName__ (str): the (lower-cased) type of RESPONSEPixx device being
            used

    Exceptions:
        1) no connection to a DATAPixx device could be established
        2) misspecified RPxName (see Arguments)
    """

    # Maps each accepted (lower-cased) device name to a tuple of
    # (button number -> button color, whether the device is MRI safe).
    __DEVICES__ = {
        "handheld": (
            {0: 'red', 1: 'yellow', 2: 'green', 3: 'blue', 4: 'white'}, False),
        "handheld - mri": (
            {0: 'red', 1: 'yellow', 2: 'green', 3: 'blue'}, True),
        "dual - mri": (
            {0: 'red', 1: 'yellow', 2: 'green', 3: 'blue'}, True),
        "dual handheld": (
            {0: 'red', 1: 'yellow', 2: 'green', 3: 'blue'}, False),
        "mri 10 button": (
            {0: 'red left', 1: 'yellow left', 2: 'green left', 3: 'blue left',
             4: 'white left', 5: 'red right', 6: 'yellow right', 7: 'green right',
             8: 'blue right', 9: 'white right'}, True),
    }

    def __init__(self, RPxName, buffBaseAddr=12000000, numBuffFrames=1000):
        # check for handshake between pypixxlib and the DATAPixx hardware
        dp.DPxOpen()
        if not dp.DPxIsReady():
            raise Exception(
                "ERROR in %s\nNo established connection to a DATAPixx device! "
                "You must call pypixxlib._libdpx.DPxOpen() before instantiating ButtonListener"
                % sys.argv[0])

        dp.DPxEnableDinDebounce()
        dp.DPxSetDinDataOut(0)  # initialize LEDs to off

        # initialize the logStatus dictionary, where new Din logs are accessed
        self.__logStatus__ = dp.DPxSetDinLog(buffBaseAddr, numBuffFrames)
        dp.DPxStartDinLog()  # start logging state changes in the digital input channels

        # nothing has been read from the hardware log yet
        self.__newData__ = 0
        self.__logs__ = []
        self.__logsTotal__ = []

        # look up the button layout and MRI flag for this device type
        RPxName = RPxName.lower()  # not case sensitive
        device = self.__DEVICES__.get(RPxName)
        if device is None:
            raise Exception(
                "ERROR in %s\nInvalid RESPONSEPixx device name. Valid devices:"
                "\n\tHandheld\n\tHandheld - MRI\n\tDual - MRI\n\tDual Handheld\n\tMRI 10 Button"
                % sys.argv[0])
        buttons, self.__mri__ = device
        self.__buttonDict__ = dict(buttons)  # private copy; the class table stays untouched

        # initialize button states from the current digital input value
        dp.DPxUpdateRegCache()
        base10State = dp.DPxGetDinValue()
        self.__states__ = {}
        for buttonNum in self:
            self.__states__[buttonNum] = self.__getState__(base10State, buttonNum)

        # the DPx time at which this listener was instantiated
        self.__startTime__ = dp.DPxGetTime()

        # the name of this device
        self.__RPxName__ = RPxName

    def __del__(self):
        """
        Ensure the Din logs terminate if listener object is deleted.
        """
        self.pause()

    def __iter__(self):
        """
        Iterating over a ButtonListener is akin to iterating over its button
        numbers, so that

            for buttonNum in self: ...

        can be used instead of iterating over self.__buttonDict__ explicitly.
        """
        return iter(self.__buttonDict__)

    def __getState__(self, base10State, pinNum):
        """
        Report whether one pin is active given the state of all pins.

        Arguments:
            base10State (int): integer whose bit n holds the level of pin n
            pinNum (int): the pin to inspect

        Returns:
            bool: True if the pin counts as active. For non-MRI devices a pin is
            active when its bit is 0; for MRI devices it is active when its bit
            is 1 (opposite ground state).
        """
        bitIsSet = bool(base10State & (1 << pinNum))
        return bitIsSet == self.__mri__  # (flip for MRI buttons)

    def __update_newData_logStatus__(self):
        """
        Perform a register update, refresh __logStatus__ from the hardware and
        store the number of new log frames in __newData__.

        Returns:
            bool: True if there are new entries in the circular buffer
        """
        dp.DPxUpdateRegCache()
        dp.DPxGetDinStatus(self.__logStatus__)
        self.__newData__ = self.__logStatus__['newLogFrames']
        return bool(self.__newData__)

    def __ensureButtonSubsetIntegrity__(self, buttonSubset):
        """
        Ensure that a user-defined subset of RESPONSEPixx button(s) is in the
        proper format to be used by other functions in this class.

        Arguments:
            buttonSubset (list): is expected as either
                1) omitted (defaulted to None), which specifies to use all buttons
                2) a single integer
                3) a list of integers

        Returns:
            list: the requested button numbers (int), with repeated values
            removed and the caller's ordering preserved, so that per-button
            results line up with the buttons as they were requested

        Exceptions:
            1) buttonSubset is not in an expected format (see Arguments)
            2) buttonSubset contains buttons that don't exist in buttonDict
        """
        def isButtonNumber(value):
            # bool is a subclass of int but is never a valid button number
            return isinstance(value, int) and not isinstance(value, bool)

        if buttonSubset is None:
            return list(self.__buttonDict__)  # no further data checks required

        if isButtonNumber(buttonSubset):
            buttonSubset = [buttonSubset]
        elif not isinstance(buttonSubset, list) or not all(isButtonNumber(b) for b in buttonSubset):
            raise Exception("ERROR in %s\nInvalid RESPONSEPixx button number(s). buttonSubset must be\n\t1) omitted (return activity for all button)\n\t2) a single integer\n\t3) a list of integer(s)" % sys.argv[0])

        # discard repeated values; unlike set(), dict keys keep first-seen order
        buttonSubset = list(dict.fromkeys(buttonSubset))

        # ensure that these buttons exist on the button box
        if not set(buttonSubset).issubset(self.__buttonDict__):
            raise Exception("ERROR in %s\nOne or more elements in buttonSubset are not active buttons" % sys.argv[0])

        return buttonSubset

    def __filterLogs__(self, logs, buttonSubset, push, release):
        """
        Return the entries of 'logs' that belong to one of the (already
        validated) buttons in buttonSubset and whose event type was requested.
        Each log is [timestamp, button number, 'Push' or 'Release'].
        """
        wanted = set(buttonSubset)
        return [log for log in logs
                if log[1] in wanted
                and ((log[2] == "Push" and push) or (log[2] == "Release" and release))]

    def updateLogs(self):
        """
        Read any new entries from the digital input log and convert them into
        button events. Because reading is a separate step, the resulting events
        can be queried multiple times without being overwritten.

        __logs__ is replaced with the events found by this call (empty if there
        were none) and those events are also appended to __logsTotal__.
        __states__ is updated to reflect the latest state of each button.
        """
        # [time stamp, button number, push/release] for all events in this update
        self.__logs__ = []

        # ensures register update and logStatus check
        if not self.__update_newData_logStatus__():
            return

        # check for underflow
        if self.__logStatus__['numLogUnderflows']:
            print("\nWARNING in %s\nUnderflows detected in digital input buffer. Recommend increasing 'numBufferFrames'. Current buffer size is %d frames.\n" % (sys.argv[0], self.__logStatus__['numBufferFrames']))

        for log in dp.DPxReadDinLog(self.__logStatus__, self.__newData__):
            # log[0] is used as the time stamp and log[1] as the pin states
            for buttonNum in self:
                isActive = self.__getState__(log[1], buttonNum)
                if isActive == self.__states__[buttonNum]:
                    continue  # no state change for this button
                # inactive -> active is a push, active -> inactive is a release
                self.__logs__.append([log[0], buttonNum, 'Push' if isActive else 'Release'])
                self.__states__[buttonNum] = isActive

        # update total log of all button events
        self.__logsTotal__.extend(self.__logs__)

    def getNewButtonActivity(self, buttonSubset=None, push=True, release=False):
        """
        Query all unique RESPONSEPixx button events found by the most recent
        call to updateLogs(). This function does not update the Din logs.

        Arguments:
            buttonSubset (list): indicates which buttons to return from the
                activity log. Validated by __ensureButtonSubsetIntegrity__().
                Can be passed as either
                1) omitted (defaulted to None), which specifies to use all buttons
                2) a single integer
                3) a list of integers
            push (bool): whether to include button pushes. (default = True)
            release (bool): whether to include button releases. (default = False)

        Returns:
            list: Each element is a unique, new RESPONSEPixx button event, itself
            a list with 3 elements:
            [timestamp (float), button number (int), push/release (str)]
        """
        buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
        return self.__filterLogs__(self.__logs__, buttonSubset, push, release)

    def getAllButtonActivity(self, buttonSubset=None, numLogs=None, push=True, release=False):
        """
        Query all unique RESPONSEPixx button events since the instantiation of
        the ButtonListener object. This function does not update the Din logs.

        Arguments:
            buttonSubset (list): indicates which buttons to return from the
                activity log. Validated by __ensureButtonSubsetIntegrity__().
                Can be passed as either
                1) omitted (defaulted to None), which specifies to use all buttons
                2) a single integer
                3) a list of integers
            numLogs (int): how many of the most recent entries of the total log
                to consider. The button and push/release filters are applied
                after this cut, so fewer than numLogs events may be returned.
                (default = None, meaning all logs)
            push (bool): whether to include button pushes. (default = True)
            release (bool): whether to include button releases. (default = False)

        Returns:
            list: Each element is a unique RESPONSEPixx button event, itself a
            list with 3 elements:
            [timestamp (float), button number (int), push/release (str)]

        Exceptions:
            1) if numLogs is not a single, positive integer
        """
        buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)

        n = len(self.__logsTotal__)  # number of total logs

        # ensure numLogs is sensible
        if numLogs is None:
            numLogs = n
        elif not isinstance(numLogs, int) or isinstance(numLogs, bool) or numLogs <= 0:
            raise Exception("ERROR in %s\nInvalid numLogs data type. numLog must a single positive integer" % sys.argv[0])
        elif numLogs > n:
            print("\nWARNING in %s\nRequested number of logs (%d) exceeds available number of logs (%d). "
                  "Returning all available logs.\n" % (sys.argv[0], numLogs, n))
            numLogs = n

        recentLogs = self.__logsTotal__[(n - numLogs):n]
        return self.__filterLogs__(recentLogs, buttonSubset, push, release)

    def waitForButton(self, buttonSubset=None, timeOut=inf):
        """
        Block until one or more of the requested RESPONSEPixx button(s) are
        active, or until the time out elapses. This function does update the
        Din logs on every pass of its polling loop.

        Arguments:
            buttonSubset (list): indicates which buttons to wait for. Validated
                by __ensureButtonSubsetIntegrity__(). Can be passed as either
                1) omitted (defaulted to None), which specifies to use all buttons
                2) a single integer
                3) a list of integers
            timeOut (float): time in seconds after which this blocking function
                terminates. (default = inf)

        Returns:
            rt (float): seconds elapsed (in DPx time) between the call and the
            moment a requested button was found active or the time out expired

        Exceptions:
            1) if argument 'timeOut' cannot be casted to float data type, is
               less than zero, or is NaN.
        """
        buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)

        # ensure validity of timeOut
        try:
            timeOut = float(timeOut)
        except (TypeError, ValueError):
            timeOut = None
        # 'not >=' (rather than '<') also rejects NaN, which would never time out
        if timeOut is None or not timeOut >= 0:
            raise Exception("ERROR in %s\nInvalid 'timeOut' argument. 'timeOut' must a single number greater than or equal to 0" % sys.argv[0])

        dp.DPxUpdateRegCache()
        startTime = dp.DPxGetTime()
        while True:
            dp.DPxUpdateRegCache()
            rt = dp.DPxGetTime() - startTime
            self.updateLogs()
            if any(self.__states__[b] for b in buttonSubset) or rt >= timeOut:
                return rt

    def isButtonActive(self, buttonSubset=None):
        """
        Return whether RESPONSEPixx button(s) were active as of the most recent
        call to updateLogs(). This function does not update the Din logs.

        Arguments:
            buttonSubset (list): indicates which buttons to report. Validated by
                __ensureButtonSubsetIntegrity__(). Can be passed as either
                1) omitted (defaulted to None), which specifies to use all buttons
                2) a single integer
                3) a list of integers

        Returns:
            list: Each element is a bool, where the ith element indicates whether
            the ith (distinct) button in the buttonSubset is active
        """
        buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
        return [self.__states__[i] for i in buttonSubset]

    def getButtonString(self, buttonSubset=None):
        """
        Return the descriptions stored in __buttonDict__ for the RESPONSEPixx
        button(s) indicated by buttonSubset.

        Arguments:
            buttonSubset (list): indicates which buttons to describe. Validated
                by __ensureButtonSubsetIntegrity__(). Can be passed as either
                1) omitted (defaulted to None), which specifies to use all buttons
                2) a single integer
                3) a list of integers

        Returns:
            list: Each element is a string, where the ith element describes the
            ith (distinct) button in the buttonSubset (e.g., "white")
        """
        buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)
        return [self.__buttonDict__[i] for i in buttonSubset]

    def setButtonLEDs(self, buttonSubset=None, state=None, strength=1.0):
        """
        Set or flip the state of the RESPONSEPixx button LED(s) indicated by
        buttonSubset. Devices flagged as MRI safe only get a warning printed.

        Arguments:
            buttonSubset (list): indicates which button LEDs to change.
                Validated by __ensureButtonSubsetIntegrity__(). Can be passed as
                either
                1) omitted (defaulted to None), which specifies to use all buttons
                2) a single integer
                3) a list of integers
            state (any): how to adjust the state of the LEDs. If omitted (None),
                each LED is flipped with respect to its current state. Otherwise
                the value is casted to bool: True turns the LEDs on and False
                turns them off, regardless of their current state.
                (default = None)
            strength (float): the brightness of the LEDs, between 0 and 1.
                Values outside this range elicit a warning; values less than 0
                are defaulted to 0 and values greater than 1 are defaulted to 1.
                (default = 1.0)

        Exceptions:
            1) if argument 'state' cannot be casted to bool data type.
            2) if argument 'strength' cannot be casted to float data type.
        """
        # check whether this button box has LEDs
        if self.__mri__:
            print("\nWARNING in %s\nResponsePixx device '%s' does not have LED lights.\n" % (sys.argv[0], self.__RPxName__))
            return

        buttonSubset = self.__ensureButtonSubsetIntegrity__(buttonSubset)

        # check validity of state argument (None means "flip")
        if state is not None:
            try:
                state = bool(state)
            except (TypeError, ValueError) as err:
                raise Exception(
                    "ERROR in %s\nInvalid 'state' argument. "
                    "'state' must a single value that can be casted to bool"
                    % sys.argv[0]) from err

        # check validity of strength argument
        try:
            strength = float(strength)
        except (TypeError, ValueError) as err:
            raise Exception("ERROR in %s\nInvalid 'strength' argument. 'strength' must a single positive float between 0 and 1" % sys.argv[0]) from err
        if strength < 0 or strength > 1:
            clamped = float(strength > 1)  # 0.0 when too low, 1.0 when too high
            # %g keeps the fractional part of the rejected value in the message
            print("\nWARNING in %s\nLED strength of '%g' will be defaulted to %d. LED values must be between 0 and 1.\n" % (sys.argv[0], strength, clamped))
            strength = clamped

        # start from the current output state and adjust only the requested LEDs
        requiredDinDataOutState = dp.DPxGetDinDataOut()
        for pinNum in buttonSubset:
            ledBit = 1 << (pinNum + 16)  # the LED of button n is addressed at bit n + 16
            if state is None:
                requiredDinDataOutState ^= ledBit   # flip
            elif state:
                requiredDinDataOutState |= ledBit   # turn on
            else:
                requiredDinDataOutState &= ~ledBit  # turn off

        # set Din data out values
        dp.DPxSetDinDataDir(requiredDinDataOutState)
        dp.DPxSetDinDataOut(requiredDinDataOutState)
        dp.DPxSetDinDataOutStrength(strength)
        dp.DPxUpdateRegCache()

    def getStartTime(self):
        """
        Return the time at which this ButtonListener was instantiated.

        Returns:
            float: the value of dp.DPxGetTime() recorded at the end of __init__
        """
        return self.__startTime__

    def getRPixxName(self):
        """
        Return the name of the RESPONSEPixx type that was passed to
        ButtonListener upon instantiation.

        Returns:
            str: the lower-cased name of the RESPONSEPixx device type
        """
        return self.__RPxName__

    def isListening(self):
        """
        Query whether the DATAPixx digital inputs are currently being logged.

        Returns:
            bool: the result of dp.DPxIsDinLogEvents() casted to bool
        """
        return bool(dp.DPxIsDinLogEvents())

    def pause(self):
        """
        Stop the current Din log.
        """
        dp.DPxStopDinLog()
        dp.DPxUpdateRegCache()

    def resume(self):
        """
        Start the current Din log.
        """
        dp.DPxStartDinLog()
        dp.DPxUpdateRegCache()