# Source code for openbci_stream.daemons.stream_eeg
"""
=============
Binary to EEG
=============
A transformer for Kafka that reads binary data and streams EEG data.
Binary -> Kafka-Transformer -> EEG
For examples and descriptions, refer to the documentation:
`Data storage handler <../A1-raw_cleaning.ipynb>`_
"""
import sys
import pickle
# import struct
# from functools import cached_property
import numpy as np
# from multiprocessing import Pool
# from datetime import datetime
# import rawutil
import logging
from kafka import KafkaConsumer, KafkaProducer
from typing import TypeVar # , List, Dict, Tuple, Any
from queue import LifoQueue as queue
from openbci_stream.utils import autokill_process
# Register this daemon under a fixed process name.
# NOTE(review): presumably this terminates any earlier instance running under
# the same name -- confirm against openbci_stream.utils.
autokill_process(name='binary_2_eeg_base')

# Passing ``--debug`` on the command line raises the root logger to DEBUG.
DEBUG = '--debug' in sys.argv
if DEBUG:
    logging.getLogger().setLevel(logging.DEBUG)
    # logging.getLogger('kafka').setLevel(logging.WARNING)

# Placeholder type used to annotate Kafka stream objects.
KafkaStream = TypeVar('kafka-stream')
########################################################################
class EEG:
    """Kafka transformer that merges per-board packets into one EEG stream.

    Records are consumed from the per-board topics ``eeg0`` ... ``eeg3``.
    Once every board announced by ``context['parallel_boards']`` has a pending
    record, their EEG arrays are concatenated along axis 0 and published on
    the ``eeg`` topic; non-empty auxiliary arrays are published the same way
    on the ``aux`` topic.

    Pending records are kept in LIFO queues, so when a board has several
    records waiting, the most recently received one is combined first.

    This script requires the Kafka daemon running (brokers are expected at
    ``localhost:9092``) and enables an `auto-kill process
    <openbci_stream.utils.pid_admin.rst#module-openbci_stream.utils.pid_admin>`_
    """

    # Per-board input topics, one per board index.
    TOPICS = ('eeg0', 'eeg1', 'eeg2', 'eeg3')

    # Context entries collected from every board into one list per key.
    _PER_BOARD_KEYS = (
        'timestamp.binary',
        'daisy',
        'timestamp.eeg',
        'timestamp.binary.consume',
        'sample_ids',
    )

    # ----------------------------------------------------------------------
    def __init__(self):
        """Create the Kafka consumer for the board topics and the producer."""
        # NOTE(security): ``pickle.loads`` executes arbitrary code embedded in
        # a message; only run this against brokers whose writers are trusted.
        self.consumer_eegn = KafkaConsumer(
            bootstrap_servers=['localhost:9092'],
            value_deserializer=pickle.loads,
            auto_offset_reset='latest',
        )
        self.consumer_eegn.subscribe(list(self.TOPICS))

        self.producer_eeg = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            compression_type='gzip',
            value_serializer=pickle.dumps,
            batch_size=2**16,
        )

    # ----------------------------------------------------------------------
    def consume(self) -> None:
        """Infinite loop that reads the board topics and publishes merged data.

        Every incoming record is queued under its topic. Whenever all boards
        listed by the record's ``parallel_boards`` have at least one pending
        record, one record per board is taken and published, repeating until
        some board's queue is empty.
        """
        eeg_historical = {topic: queue() for topic in self.TOPICS}

        while True:
            for record in self.consumer_eegn:
                logging.info(
                    f"processing {record.topic}:{len(record.value['data'])}"
                )
                eeg_historical[record.topic].put(record)

                count = record.value['context']['parallel_boards']
                boards = [eeg_historical[f'eeg{c}'] for c in range(count)]
                data = [board.qsize() for board in boards]

                if not data or not all(data):
                    logging.debug(f'Not enought data {data}')

                # ``data and all(data)`` also copes with ``count == 0``,
                # where ``min(data)`` would raise on the empty list.
                while data and all(data):
                    logging.debug(f'Preparing stream {data}')
                    stream = [board.get() for board in boards]
                    self._publish(stream, data)
                    data = [board.qsize() for board in boards]

    # ----------------------------------------------------------------------
    def _publish(self, stream, data) -> None:
        """Merge one record per board and send the result to Kafka.

        Parameters
        ----------
        stream
            One consumed record per board, ordered by board index. Each
            ``record.value`` holds a ``'context'`` mapping and a ``'data'``
            pair ``(eeg, aux)`` of arrays.
        data
            Queue sizes observed before the records were taken; stored in the
            outgoing context under ``'buffer'``.
        """
        contexts = [s.value['context'] for s in stream]
        eeg_data = [s.value['data'] for s in stream]

        # The first board's context is the template; work on a copy so the
        # consumed record itself is left untouched.
        context = contexts[0].copy()

        # montage
        logging.debug('Creating montage')
        montage = self._merge_montages([c['montage'] for c in contexts])

        # context
        logging.debug('Creating context')
        context['montage'] = montage
        for key in self._PER_BOARD_KEYS:
            context[key] = [c[key] for c in contexts]
        context['buffer'] = data
        logging.debug(f"IDs: {context['sample_ids']}")

        # NO CUT DATA
        logging.debug('Preparing EEG data')
        eeg = np.concatenate([d[0] for d in eeg_data], axis=0)

        # Only the first board decides whether auxiliary data is present.
        if eeg_data[0][1].size:
            logging.debug('Preparing AUX data')
            aux = np.concatenate([d[1] for d in eeg_data], axis=0)
        else:
            logging.debug('No Auxiliar data')
            aux = None

        # EEG
        logging.debug(f'Streaming EEG({eeg.shape})')
        context['samples'] = eeg.shape[1]
        self.producer_eeg.send(
            'eeg',
            {'context': context.copy(), 'data': eeg.copy()},
        )

        # Aux -- ``aux`` is None when no auxiliary data was delivered, so it
        # must be checked before reading ``.size``.
        if aux is not None and aux.size:
            logging.debug(f'Streaming AUX({aux.shape})')
            context['samples'] = aux.shape[1]
            self.producer_eeg.send(
                'aux',
                {'context': context.copy(), 'data': aux.copy()},
            )

    # ----------------------------------------------------------------------
    @staticmethod
    def _merge_montages(montages):
        """Combine per-board montages into a single mapping.

        The keys of each montage are shifted by the total number of entries
        in all preceding montages, so the boards never overwrite each other.

        Parameters
        ----------
        montages
            Montage mappings (integer key -> value), one per board, in board
            order.

        Returns
        -------
        dict
            The merged montage.
        """
        merged = {}
        offset = 0
        for montage in montages:
            for channel, name in montage.items():
                merged[channel + offset] = name
            # Cumulative offset: the next board starts after all earlier ones.
            offset += len(montage)
        return merged
if __name__ == '__main__':
    # Build the transformer (this connects to Kafka) and block forever in
    # its consume loop.
    EEG().consume()