"""
==========
Cyton Base
==========
"""
import re
import time
import logging
import pickle
from threading import Thread
from datetime import datetime
from functools import cached_property
from abc import ABCMeta, abstractmethod
from multiprocessing import Process, Manager
from typing import Optional, Union, Literal, Dict, List, TypeVar, Any
import numpy as np
from kafka import KafkaConsumer
from .binary_stream import BinaryStream
from ..utils import HDF5Writer, interpolate_datetime
DAISY = Literal['auto', True, False]
QUEUE = TypeVar('Queue')
########################################################################
class CytonConstants:
    """Command bytes and lookup tables from the `Cyton SDK <https://docs.openbci.com/Cyton/CytonSDK/>`_.

    The class carries no instance state; every member is a class-level
    constant that `CytonBase` sends to (or uses to interpret) the board.
    """

    VREF = 4.5
    BIN_HEADER = 0xa0

    # Basic commands
    VERSION = b'V'
    START_STREAM = b'b'
    STOP_STREAM = b's'

    # Per-channel command characters, indexed by (channel - 1)
    DEACTIVATE_CHANEL = b'12345678qwertyui'
    ACTIVATE_CHANEL = b'!@#$%^&*QWERTYUI'
    CHANNEL_SETTING = b'12345678QWERTYUI'

    # Test signals
    TEST_GND = b'0'      # Connect to internal GND (VDD - VSS)
    TEST_1X_SLOW = b'-'  # Connect to test signal 1xAmplitude, slow pulse
    TEST_1X_FAST = b'='  # Connect to test signal 1xAmplitude, fast pulse
    TEST_DC = b'p'       # Connect to DC signal
    TEST_2X_SLOW = b'['  # Connect to test signal 2xAmplitude, slow pulse
    TEST_2X_FAST = b']'  # Connect to test signal 2xAmplitude, fast pulse

    # Channel settings: power down
    POWER_DOWN_ON = b'0'
    POWER_DOWN_OFF = b'1'

    # Channel settings: gain
    GAIN_1 = b'0'
    GAIN_2 = b'1'
    GAIN_4 = b'2'
    GAIN_6 = b'3'
    GAIN_8 = b'4'
    GAIN_12 = b'5'
    GAIN_24 = b'6'

    # Channel settings: ADC input source
    ADSINPUT_NORMAL = b'0'
    ADSINPUT_SHORTED = b'1'
    ADSINPUT_BIAS_MEAS = b'2'
    ADSINPUT_MVDD = b'3'
    ADSINPUT_TEMP = b'4'
    ADSINPUT_TESTSIG = b'5'
    ADSINPUT_BIAS_DRP = b'6'
    ADSINPUT_BIAS_DRN = b'7'

    # Channel settings: bias and SRB
    BIAS_REMOVE = b'0'
    BIAS_INCLUDE = b'1'
    SRB2_DISCONNECT = b'0'
    SRB2_CONNECT = b'1'
    SRB1_DISCONNECT = b'0'
    SRB1_CONNECT = b'1'

    DEFAULT_CHANNELS_SETTINGS = b'd'
    DEFAULT_CHANNELS_SETTINGS_REPORT = b'D'

    # Lead-off impedance
    TEST_SIGNAL_NOT_APPLIED = b'0'
    TEST_SIGNAL_APPLIED = b'1'

    # SD card logging
    SD_DATA_LOGGING_5MIN = b'A'
    SD_DATA_LOGGING_15MIN = b'S'
    SD_DATA_LOGGING_30MIN = b'F'
    SD_DATA_LOGGING_1HR = b'G'
    SD_DATA_LOGGING_2HR = b'H'
    SD_DATA_LOGGING_4HR = b'J'
    SD_DATA_LOGGING_12HR = b'K'
    SD_DATA_LOGGING_24HR = b'L'
    SD_DATA_LOGGING_14S = b'a'
    SD_DATA_LOGGING_STOP = b'j'

    # Miscellaneous
    QUERY_REGISTER = b'?'
    SOFT_RESET = b'v'
    USE_8CH_ONLY = b'c'
    USE_16CH_ONLY = b'C'
    TIME_STAMP_ON = b'<'
    TIME_STAMP_OFF = b'>'

    # Sample rate
    SAMPLE_RATE_16KSPS = b'~0'
    SAMPLE_RATE_8KSPS = b'~1'
    SAMPLE_RATE_4KSPS = b'~2'
    SAMPLE_RATE_2KSPS = b'~3'
    SAMPLE_RATE_1KSPS = b'~4'
    SAMPLE_RATE_500SPS = b'~5'
    SAMPLE_RATE_250SPS = b'~6'
    SAMPLE_RATE_GET = b'~~'
    # Sample-rate command -> samples per second
    SAMPLE_RATE_VALUE = {
        SAMPLE_RATE_16KSPS: 16e3,
        SAMPLE_RATE_8KSPS: 8e3,
        SAMPLE_RATE_4KSPS: 4e3,
        SAMPLE_RATE_2KSPS: 2e3,
        SAMPLE_RATE_1KSPS: 1e3,
        SAMPLE_RATE_500SPS: 500,
        SAMPLE_RATE_250SPS: 250,
    }

    # Board mode
    BOARD_MODE_DEFAULT = b'/0'  # Sends accelerometer data in aux bytes
    BOARD_MODE_DEBUG = b'/1'    # Sends serial output over the external serial
                                # port which is helpful for debugging.
    BOARD_MODE_ANALOG = b'/2'   # Reads from analog pins A5(D11), A6(D12) and
                                # if no wifi shield is present, then A7(D13)
                                # as well.
    BOARD_MODE_DIGITAL = b'/3'  # Reads from analog pins D11, D12 and D17.
                                # If no wifi present then also D13 and D18.
    BOARD_MODE_MARKER = b'/4'   # Turns accel off and injects markers
                                # into the stream by sending `X where X is any
                                # char to add to the first AUX byte.
    BOARD_MODE_GET = b'//'      # Get current board mode
    # Board-mode command -> mode name
    BOARD_MODE_VALUE = {
        BOARD_MODE_DEFAULT: 'default',
        BOARD_MODE_DEBUG: 'debug',
        BOARD_MODE_ANALOG: 'analog',
        BOARD_MODE_DIGITAL: 'digital',
        BOARD_MODE_MARKER: 'marker',
    }

    # WiFi shield
    WIFI_SHIELD_ATTACH = b'{'
    WIFI_SHIELD_DEATTACH = b'}'
    WIFI_SHIELD_STATUS = b':'
    WIFI_SHIELD_RESET = b';'

    # Gain register bit pattern -> gain factor
    AD1299_GAIN_REGISTER = {
        '000': 1,
        '001': 2,
        '010': 4,
        '011': 6,
        '100': 8,
        '101': 12,
        '110': 24,
    }

    # ----------------------------------------------------------------------
    def __init__(self) -> None:
        """Nothing to initialize: the class only groups constants."""
########################################################################
[docs]class CytonBase(CytonConstants, metaclass=ABCMeta):
"""
The Cyton data format and SDK define all interactions and capabilities of
the board, the full instructions can be found in the official documentation.
* https://docs.openbci.com/Cyton/CytonDataFormat/
* https://docs.openbci.com/Cyton/CytonSDK/
daisy
The Daisy board can be detected at runtime (`'auto'`) or declared
explicitly (`True` or `False`).
montage
A list means consecutive channels, e.g. `['Fp1', 'Fp2', 'F3', 'Fz',
'F4']`, and a dictionary means specific channels, e.g. `{1: 'Fp1',
2: 'Fp2', 3: 'F3', 4: 'Fz', 5: 'F4'}`.
streaming_package_size
The streamer will try to send packages of this size; this is NOT the
sampling rate for data acquisition.
capture_stream
Indicates whether the data from the stream will be captured in
asynchronous mode.
"""
# ----------------------------------------------------------------------
def __init__(self, daisy: DAISY, montage: Optional[Union[list, dict]],
             streaming_package_size: int, capture_stream: bool,
             board_id: str = '', parallel_boards: int = 1) -> None:
    """Initialize the board state shared by every Cyton interface.

    Parameters
    ----------
    daisy
        `True` or `False` to declare the Daisy module explicitly, or
        `'auto'` to ask the board through `daisy_attached`.
    montage
        Channel names as a list or a dictionary; `None` selects the
        default names generated by the `montage` setter.
    streaming_package_size
        Package size stored for the `BinaryStream` created in
        `start_stream`.
    capture_stream
        When true, `start_stream` also calls `capture_stream`.
    board_id
        Identifier stored and forwarded to `BinaryStream`.
    parallel_boards
        Stored as given; it is not used by this class.

    Raises
    ------
    ValueError
        If `daisy` is not `True`, `False` or `'auto'`.
    """
    # Default values
    self.sample_rate = 250
    self.boardmode = 'default'
    self.closed = False
    self.board_id = board_id
    self.parallel_boards = parallel_boards

    # Daisy must be resolved before the montage: the default montage
    # depends on `self.daisy`.
    if daisy in [True, False]:
        self.daisy = daisy
    elif daisy == 'auto':
        self.daisy = self.daisy_attached()
    else:
        # Previously `self.daisy` was silently left undefined and failed
        # later with an unrelated AttributeError.
        raise ValueError(
            f"`daisy` must be True, False or 'auto', got {daisy!r}")

    # Gain
    self._gain = None

    # Montage
    self.montage = montage

    # Queues that can live across processes. A single manager (one server
    # process) serves all four; the queue proxies keep it alive.
    manager = Manager()
    self._data_eeg = manager.Queue()
    self._data_aux = manager.Queue()
    self._data_markers = manager.Queue()
    self._data_timestamp = manager.Queue()

    self.reset_input_buffer()

    self._auto_capture_stream = capture_stream
    self.streaming_package_size = streaming_package_size
# ----------------------------------------------------------------------
@property
def montage(self) -> Union[List, Dict]:
    """Mapping of 1-based channel index to electrode name."""
    return self._montage

# ----------------------------------------------------------------------
@montage.setter
def montage(self, montage: Union[List, Dict, None]) -> None:
    """Store the electrode names as a 1-based dictionary.

    A list (or tuple/range) is taken as consecutive channels, e.g.
    `['Fp1', 'Fp2', 'F3']`. For a dictionary only the values are used, in
    iteration order, and they are renumbered from 1. Any other value
    (typically `None`) generates the default names `ch1`..`ch16` when
    `self.daisy` is truthy and `ch1`..`ch8` otherwise.

    Parameters
    ----------
    montage
        List, dictionary, pickled bytes of either, or `None`.
    """
    # NOTE(review): `pickle.loads` runs arbitrary code when fed untrusted
    # bytes; only pass pickled montages coming from a trusted peer.
    if isinstance(montage, bytes):
        montage = pickle.loads(montage)

    if isinstance(montage, dict):
        names = montage.values()
    elif isinstance(montage, (list, tuple, range)):
        names = montage
    else:
        # Default montage, sized by the presence of the Daisy module
        total = 16 if self.daisy else 8
        names = [f'ch{number}' for number in range(1, total + 1)]

    self._montage = dict(enumerate(names, start=1))
# ----------------------------------------------------------------------
def deactivate_channel(self, channels: List[int]) -> None:
    """Send the "turn off" command character of every listed channel.

    Parameters
    ----------
    channels
        1-based channel numbers; each one selects a byte of
        `DEACTIVATE_CHANEL`.
    """
    # The table holds only ASCII bytes, so picking the raw byte values is
    # the same as the chr()/encode() round trip.
    codes = bytes(self.DEACTIVATE_CHANEL[number - 1] for number in channels)
    self.command(codes)
# ----------------------------------------------------------------------
def activate_channel(self, channels: List[int]) -> None:
    """Send the "turn on" command character of every listed channel.

    Parameters
    ----------
    channels
        1-based channel numbers; each one selects a byte of
        `ACTIVATE_CHANEL`.
    """
    # The table holds only ASCII bytes, so picking the raw byte values is
    # the same as the chr()/encode() round trip.
    codes = bytes(self.ACTIVATE_CHANEL[number - 1] for number in channels)
    self.command(codes)
# ----------------------------------------------------------------------
def command(self, c: Union[str, bytes]) -> bytes:
    """Send a command to the device and return its response.

    The input buffer is cleared before writing, and the response is read
    after waiting 300 ms. The command can be raw bytes, a
    `CytonConstants` attribute or the name of that attribute, e.g.

    >>> command(b'~4')
    >>> command(CytonConstants.SAMPLE_RATE_1KSPS)
    >>> command('SAMPLE_RATE_1KSPS')

    Sample-rate and board-mode commands also update `self.sample_rate`
    and `self.boardmode`.

    Parameters
    ----------
    c
        Command to send.

    Returns
    -------
    bytes
        The response read from the device. `str` and `list` responses are
        converted to bytes; an empty response is returned unchanged.
    """
    # Work out the attribute name the command may refer to. Previously
    # `c.decode()` was called unconditionally, which failed for `str`
    # names and for bytes that are not valid UTF-8.
    if isinstance(c, str):
        name = c
    else:
        c = bytes(c)
        try:
            name = c.decode()
        except UnicodeDecodeError:
            name = ''

    if name and hasattr(self, name):
        c = getattr(self, name)
    elif isinstance(c, str):
        # Plain text that is not a constant name is sent as-is
        c = c.encode()

    if isinstance(c, bytes):
        # The default is assumed to be 250; new values come from commands
        if c in self.SAMPLE_RATE_VALUE:
            self.sample_rate = int(self.SAMPLE_RATE_VALUE[c])
        # The default is assumed to be `default`; new values come from
        # commands
        if c in self.BOARD_MODE_VALUE:
            self.boardmode = self.BOARD_MODE_VALUE[c]

    self.reset_input_buffer()
    self.write(c)
    time.sleep(0.3)  # give the board time to answer
    response = self.read(2**11)

    logging.info(f'Writing: {c}')
    if response and len(response) > 100:
        logging.info(f'Response: {response[:100]}...')
    else:
        logging.info(f'Response: {response}')

    # Normalize the response to bytes
    if response and hasattr(response, 'encode'):
        response = response.encode()
    elif response and isinstance(response, list):
        response = ''.join([chr(r) for r in response]).encode()

    return response
# ----------------------------------------------------------------------
def channel_settings(self, channels: List[int],
                     power_down: Optional[bytes] = CytonConstants.POWER_DOWN_ON,
                     gain: Optional[bytes] = CytonConstants.GAIN_24,
                     input_type: Optional[bytes] = CytonConstants.ADSINPUT_NORMAL,
                     bias: Optional[bytes] = CytonConstants.BIAS_INCLUDE,
                     srb2: Optional[bytes] = CytonConstants.SRB2_CONNECT,
                     srb1: Optional[bytes] = CytonConstants.SRB1_DISCONNECT) -> None:
    """Send the channel-settings command for each listed channel.

    Every channel produces one `x...X` command. Commands are packed
    together and flushed through `command` whenever adding the next one
    would exceed 31 characters.

    Parameters
    ----------
    channels
        1-based channel numbers that will share the configuration.
    power_down
        `POWER_DOWN_ON` (default), `POWER_DOWN_OFF`.
    gain
        `GAIN_24` (default), `GAIN_12`, `GAIN_8`, `GAIN_6`, `GAIN_4`,
        `GAIN_2`, `GAIN_1`.
    input_type
        ADC channel input source: `ADSINPUT_NORMAL` (default),
        `ADSINPUT_SHORTED`, `ADSINPUT_BIAS_MEAS`, `ADSINPUT_MVDD`,
        `ADSINPUT_TEMP`, `ADSINPUT_TESTSIG`, `ADSINPUT_BIAS_DRP`,
        `ADSINPUT_BIAS_DRN`.
    bias
        Include the channel input in BIAS generation: `BIAS_INCLUDE`
        (default), `BIAS_REMOVE`.
    srb2
        Connect the channel's P input to the SRB2 pin: `SRB2_CONNECT`
        (default), `SRB2_DISCONNECT`. A single value applies to every
        channel; a sequence is paired with `channels` one by one.
    srb1
        Connect all channels' N inputs to SRB1: `SRB1_DISCONNECT`
        (default), `SRB1_CONNECT`.

    Notes
    -----
    Board responses are only logged by `command`; nothing is returned.
    According to the SDK, when not streaming the board answers
    `Success: Channel set for 3$$$` on success, or one of
    `Failure: too few chars$$$`, `Failure: 9th char not X$$$`,
    `Failure: Err: too many chars$$$` or a timeout message on failure.
    WiFi shields always answer, without the trailing `$$$`.
    """
    self.reset_input_buffer()

    # One SRB2 value for all channels unless a per-channel list was given
    if isinstance(srb2, (bytes, str)):
        srb2 = [srb2] * len(channels)

    batch = b''
    for channel, channel_srb2 in zip(channels, srb2):
        selector = bytes([self.CHANNEL_SETTING[channel - 1]])
        setting = (b'x' + selector + power_down + gain + input_type
                   + bias + channel_srb2 + srb1 + b'X')
        if len(batch) + len(setting) > 31:  # Over WiFi there is a limit of 31 chars
            self.command(batch)
            batch = setting
        else:
            batch += setting

    self.command(batch)
# ----------------------------------------------------------------------
def leadoff_impedance(self, channels: List[int],
                      pchan: Optional[bytes] = CytonConstants.TEST_SIGNAL_NOT_APPLIED,
                      nchan: Optional[bytes] = CytonConstants.TEST_SIGNAL_APPLIED) -> None:
    """Send the lead-off impedance command for each listed channel.

    Every channel produces one `z...Z` command. Commands are packed
    together and flushed through `command` whenever adding the next one
    would exceed 31 characters. The value returned by the last `command`
    call is passed back to the caller.

    Parameters
    ----------
    channels
        1-based channel numbers that will share the configuration.
    pchan
        `TEST_SIGNAL_NOT_APPLIED` (default), `TEST_SIGNAL_APPLIED`.
    nchan
        `TEST_SIGNAL_APPLIED` (default), `TEST_SIGNAL_NOT_APPLIED`.

    Notes
    -----
    According to the SDK, when not streaming the board answers
    `Success: Lead off set for 4$$$` on success, or one of
    `Failure: too few chars$$$`, `Failure: 5th char not Z$$$`,
    `Failure: Err: too many chars$$$` or a timeout message on failure.
    WiFi shields always answer, without the trailing `$$$`.
    """
    self.reset_input_buffer()

    batch = b''
    for channel in channels:
        selector = bytes([self.CHANNEL_SETTING[channel - 1]])
        setting = b'z' + selector + pchan + nchan + b'Z'
        if len(batch) + len(setting) > 31:  # Over WiFi there is a limit of 31 chars
            self.command(batch)
            batch = setting
        else:
            batch += setting

    return self.command(batch)
# ----------------------------------------------------------------------
def send_marker(self, marker: Union[str, bytes, int], burst: int = 4) -> None:
    """Send a marker to the device.

    The marker is written as a backtick followed by the marker character,
    repeated `burst` times. The OpenBCI markers do not work as well as
    expected, so a burst is sent but only one is read; this adds a
    limitation: no more than one marker every 300 ms is permitted.

    Any invalid marker (wrong type, wrong length or outside `A`-`Z`) is
    reported with a warning and nothing is written.

    Parameters
    ----------
    marker
        A single capital letter (as `str` or `bytes`) or an integer
        between 65 and 90. These limitations are imposed by this library
        and not by the SDK.
    burst
        How many times the marker will be sent.
    """
    if isinstance(marker, bytes):
        marker = marker.decode(errors='ignore')

    # Reduce every accepted form to a single code point; -1 marks input
    # that `ord` would have rejected with a TypeError.
    if isinstance(marker, str):
        code = ord(marker) if len(marker) == 1 else -1
    elif isinstance(marker, int):
        code = marker
    else:
        code = -1

    if ord('A') <= code <= ord('Z'):
        self.write(f'`{chr(code)}'.encode() * burst)
    else:
        logging.warning(
            f'Marker must be between {ord("A")} and {ord("Z")}')
# ----------------------------------------------------------------------
def daisy_attached(self) -> bool:
    """Check if a Daisy module is attached.

    Sends `USE_16CH_ONLY`, which activates the Daisy module if it is
    available, and inspects the response: the module is considered absent
    when the response contains `no daisy to attach` or the character `8`.

    Returns
    -------
    bool
        `True` if the Daisy module was activated.
    """
    response = self.command(self.USE_16CH_ONLY)
    if not response:
        # The board did not answer: ask again
        return self.daisy_attached()

    text = response.decode(errors='ignore')
    daisy = not ('no daisy to attach' in text or '8' in text)

    if daisy:
        logging.info('Daisy detected.')
    else:
        logging.info('Daisy not detected.')

    # The result was previously dropped, so `daisy='auto'` always
    # produced `None`.
    return daisy
# ----------------------------------------------------------------------
def capture_stream(self) -> None:
    """Start a process that copies the consumed stream into the buffers.

    The buffers are reset first. The process is stored in
    `self.persistent_process` and routes each message by topic: `eeg`
    data and its timestamps, `aux` data, and `marker` events.
    """
    # Imported here to prevent circular imports
    from .consumer import OpenBCIConsumer

    self.reset_buffers()

    def bulk_buffer():
        """Consume the stream and queue every message by topic."""
        with OpenBCIConsumer() as stream:
            for message in stream:
                topic = message.topic

                if topic == 'eeg':
                    self._data_eeg.put(message.value['data'])
                    context = message.value['context']
                    # One timestamp per package, stored in the last slot;
                    # the remaining slots stay at zero.
                    stamps = np.zeros(context['samples'])
                    stamps[-1] = context['timestamp.binary']
                    self._data_timestamp.put(stamps)

                elif topic == 'aux':
                    self._data_aux.put(message.value['data'])

                elif topic == 'marker':
                    # Message timestamp divided by 1000
                    when = message.timestamp / 1000
                    label = message.value['marker']
                    self._data_markers.put((when, label))

    self.persistent_process = Process(target=bulk_buffer)
    self.persistent_process.start()
# ----------------------------------------------------------------------
def start_stream(self) -> None:
    """Create the binary stream channel.

    The `BinaryStream` is created only once and reused on later calls.
    Buffers and the input buffer are reset, and `capture_stream` is
    started when it was requested on instantiation.
    """
    # The check used to be `hasattr(set, ...)`, i.e. against the builtin
    # `set` type, so it was always false and the stream was recreated on
    # every call.
    if not hasattr(self, 'binary_stream'):
        self.binary_stream = BinaryStream(
            self.streaming_package_size, self.board_id)

    self.reset_buffers()
    self.reset_input_buffer()

    if self._auto_capture_stream:
        self.capture_stream()
# ----------------------------------------------------------------------
def stop_stream(self) -> None:
    """Terminate the capture process, if one was started."""
    if not hasattr(self, 'persistent_process'):
        return

    process = self.persistent_process
    process.terminate()
    process.join()
# ----------------------------------------------------------------------
@abstractmethod
def close(self):
    """Mark the board as closed.

    Abstract: concrete interfaces must override it. This base
    implementation only sets `self.closed` to `True`.
    """
    self.closed = True
# ----------------------------------------------------------------------
@abstractmethod
def write(self):
    """Write bytes to the device.

    Abstract: concrete interfaces must override it. `command` and
    `send_marker` call it with the bytes to send.
    """
# ----------------------------------------------------------------------
@abstractmethod
def read(self):
    """Read binary data from the device.

    Abstract: concrete interfaces must override it. `command` calls it
    with the maximum number of bytes to read.
    """
# ----------------------------------------------------------------------
# ----------------------------------------------------------------------
@property
def eeg_buffer(self) -> QUEUE:
    """Queue holding the EEG packages captured from the stream."""
    return self._data_eeg
# ----------------------------------------------------------------------
@property
def timestamp_buffer(self) -> QUEUE:
    """Queue holding the timestamp packages captured from the stream."""
    return self._data_timestamp
# ----------------------------------------------------------------------
@property
def markers_buffer(self) -> QUEUE:
    """Queue holding the `(timestamp, marker)` pairs captured from the stream."""
    return self._data_markers
# ----------------------------------------------------------------------
@property
def aux_buffer(self) -> QUEUE:
    """Queue holding the auxiliary packages captured from the stream."""
    return self._data_aux
# ----------------------------------------------------------------------
@cached_property
def eeg_time_series(self) -> np.ndarray:
    """Return the EEG data acquired, in shape (`channels, time`).

    The EEG queue is drained and its packages are joined along axis 1.
    When nothing was captured a warning is logged and an empty array is
    returned. The result is cached until `reset_buffers` is called.
    """
    pending = self._data_eeg.qsize()
    packages = [self._data_eeg.get() for _ in range(pending)]

    if not packages:
        logging.warning(
            f'No EEG data captured, make sure to activate `capture_stream` in the {self} instantiation\n'
            f'This too could be because the `stream_eeg` daemon was not running.')
        return np.array([])

    return np.concatenate(packages, axis=1)
# ----------------------------------------------------------------------
@cached_property
def aux_time_series(self) -> np.ndarray:
    """Return the auxiliary data acquired, in shape (`AUX, time`).

    The auxiliary queue is drained. Packages with two or more dimensions
    are joined along axis 1, one-dimensional packages along axis 0. When
    nothing was captured an empty array is returned instead of raising.
    The result is cached until `reset_buffers` is called.
    """
    pending = self._data_aux.qsize()
    packages = [self._data_aux.get() for _ in range(pending)]

    if not packages:
        # `np.concatenate` rejects an empty sequence with a ValueError
        return np.array([])

    # Choose the axis from the data itself instead of catching
    # `np.AxisError`, which newer NumPy releases expose under
    # `numpy.exceptions`.
    axis = 1 if np.ndim(packages[0]) > 1 else 0
    return np.concatenate(packages, axis=axis)
# ----------------------------------------------------------------------
@cached_property
def timestamp_time_series(self) -> np.ndarray:
    """Return the timestamps acquired.

    The timestamp queue is drained and its packages are joined. Since
    there is only one timestamp per package, the other values are
    interpolated with `interpolate_datetime` to the length of
    `eeg_time_series`. When nothing was captured an empty array is
    returned instead of raising. The result is cached until
    `reset_buffers` is called.
    """
    pending = self._data_timestamp.qsize()
    packages = [self._data_timestamp.get() for _ in range(pending)]

    if not packages:
        # `np.concatenate` rejects an empty sequence with a ValueError
        return np.array([])

    timestamp = np.concatenate(packages, axis=0)
    length = self.eeg_time_series.shape[1]
    return interpolate_datetime(timestamp, length)
# ----------------------------------------------------------------------
@cached_property
def markers(self) -> Dict[str, list]:
    """Return the captured markers grouped by label.

    The markers queue is drained; each label maps to the list of its
    timestamps in arrival order, e.g.

    >>> {'LEFT': [1603150888.752062, 1603150890.752062, 1603150892.752062],
         'RIGHT': [1603150889.752062, 1603150891.752062, 1603150893.752062],}
    """
    pending = self._data_markers.qsize()
    events = [self._data_markers.get() for _ in range(pending)]

    grouped = {}
    for when, label in events:
        if label not in grouped:
            grouped[label] = []
        grouped[label].append(when)
    return grouped
# ----------------------------------------------------------------------
def reset_buffers(self) -> None:
    """Discard the buffered data and the cached time series.

    Every queue is drained, and the values stored by the cached
    properties are removed so they are recomputed from new data.
    """
    # The former `with buffer.mutex: buffer.clear()` attempt is dropped:
    # it always ended in the AttributeError fallback, because
    # `queue.Queue` has no `clear()`. Draining is what actually ran.
    for buffer in (self._data_eeg,
                   self._data_aux,
                   self._data_markers,
                   self._data_timestamp,
                   ):
        while not buffer.empty():
            buffer.get()

    # The markers cache is stored under `markers` (the cached property
    # name); only `markers_time_series` was removed before, so stale
    # markers survived a reset. The old key is kept for compatibility.
    for cached in ('eeg_time_series',
                   'aux_time_series',
                   'timestamp_time_series',
                   'markers',
                   'markers_time_series',
                   ):
        self.__dict__.pop(cached, None)
# ----------------------------------------------------------------------
def stream(self, duration: int) -> None:
    """Stream data synchronously for a fixed time.

    The call blocks until `duration` has elapsed. `stop_stream` is called
    even if the wait is interrupted (for example by `KeyboardInterrupt`),
    so the capture process is not left running.

    Parameters
    ----------
    duration
        Seconds of data collection.
    """
    self.start_stream()
    try:
        time.sleep(duration)
    finally:
        self.stop_stream()
# ----------------------------------------------------------------------
def listen_stream_markers(self, host: Optional[str] = 'localhost',
                          topics: Optional[List[str]] = None) -> None:
    """Redirect markers from a Kafka stream to the board.

    A Kafka consumer is subscribed to `topics` on `host:9092`, and a
    thread, stored in `self.stream_markers`, forwards every received
    value to `send_marker`. This feature needs the `marker` boardmode
    configured.

    Parameters
    ----------
    host
        Host name of the Kafka server.
    topics
        Topics to subscribe to; `['markers']` when omitted.
    """
    # A fresh list per call instead of a shared mutable default
    if topics is None:
        topics = ['markers']

    bootstrap_servers = [f'{host}:9092']

    # NOTE(review): `pickle.loads` runs arbitrary code when fed untrusted
    # payloads; the Kafka server must be a trusted source.
    markers_consumer = KafkaConsumer(
        bootstrap_servers=bootstrap_servers,
        value_deserializer=pickle.loads,
        auto_offset_reset='latest',
    )
    markers_consumer.subscribe(topics)

    def _send_marker():
        """Forward each consumed marker; a failure is printed, not raised."""
        for message in markers_consumer:
            marker = message.value
            try:
                self.send_marker(marker)
                print(f"MARKER {marker}")
            except Exception as e:
                print(f"MARKER {e}")

    self.stream_markers = Thread(target=_send_marker)
    self.stream_markers.start()
# ----------------------------------------------------------------------
def save(self, filename: str, montage_name: str,
         sample_rate: Optional[int] = None) -> None:
    """Create an HDF file with the acquired data.

    The header, EEG, auxiliary data and markers are written through
    `HDF5Writer`, and the shape of each written series is logged.

    Parameters
    ----------
    filename
        Path with the destination of the HDF file.
    montage_name
        Montage name for MNE, e.g. 'standard_1020'.
    sample_rate
        The sampling rate for the acquired data; `self.sample_rate` when
        omitted.
    """
    header = {
        'sample_rate': self.sample_rate if sample_rate is None else sample_rate,
        'datetime': datetime.now().timestamp(),
        'montage': montage_name,
        'channels': self.montage,
    }

    with HDF5Writer(filename) as writer:
        writer.add_header(header)

        eeg = self.eeg_time_series
        timestamps = self.timestamp_time_series
        aux = self.aux_time_series

        writer.add_eeg(eeg, timestamps)
        writer.add_aux(aux, timestamps)

        markers = self.markers
        writer.add_markers(markers)

        logging.info(
            f'Writed a vector of shape ({eeg.shape}) for EEG data')
        logging.info(
            f'Writed a vector of shape ({timestamps.shape}) for time data')
        logging.info(
            f'Writed a vector of shape ({aux.shape}) for aux data')
        if markers:
            logging.info(f'Writed {markers.keys()} markers')
# ----------------------------------------------------------------------
def __getattribute__(self, attr: str) -> Any:
    """Delegate attribute access to the remote host when there is one.

    When `remote_host` is truthy every attribute is read from it;
    otherwise the regular lookup is used. The instance must define
    `remote_host`, or every attribute access raises `AttributeError`.
    """
    remote = super().__getattribute__('remote_host')
    if remote:
        # Some attributes must be accessed from RPyC
        return getattr(remote, attr)
    return super().__getattribute__(attr)