class BinaryStream:
    """Kafka producer for equal size packages streaming.

    The dictionary streamed contains two objects: ``data`` and ``context``.

    **data:** Bytes with binary raw data.

    **context:** A dictionary with the following keys:

    * **created:** The `timestamp` for the exact moment when binary data was read.
    * **daisy:** `True` if Daisy board is attached, otherwise `False`.
    * **boardmode:** Can be `default`, `digital`, `analog`, `debug` or `marker`.
    * **montage:** A list means consecutive channels e.g.
      `['Fp1', 'Fp2', 'F3', 'Fz', 'F4']` and a dictionary means specific
      channels `{1: 'Fp1', 2: 'Fp2', 3: 'F3', 4: 'Fz', 5: 'F4'}`.
    * **connection:** Can be `serial` or `wifi`.
    * **gain:** Array with gains.

    e.g.

    >>> context = {'created': 1604196938.727064,
    ...            'daisy': False,
    ...            'boardmode': 'default',
    ...            'montage': ['Fp1', 'Fp2', 'F3', 'Fz', 'F4'],
    ...            'connection': 'wifi',
    ...            'gain': [24, 24, 24, 24, 24, 24, 24, 24],
    ...            }
    """

    # Class-level default kept for backward compatibility; each instance gets
    # its own buffer in __init__.  (bytes are immutable, so `+=` rebinds on the
    # instance rather than mutating shared state, but an explicit per-instance
    # attribute is clearer and safer.)
    accumulated = b''

    # ----------------------------------------------------------------------
    def __init__(self, streaming_package_size: int, board_id: str = '') -> None:
        """
        Parameters
        ----------
        streaming_package_size
            The package size for streaming packages.
        board_id
            Optional suffix appended to the Kafka topic name (`binary<board_id>`).
        """
        self.topic = f'binary{board_id}'
        # Fixed log-message typo: "Produser" -> "Producer".
        logging.info(f'Creating {self.topic} Producer')
        self.streaming_package_size = streaming_package_size
        self.accumulated = b''  # per-instance accumulation buffer
        # Payloads are pickled and gzip-compressed before hitting the broker.
        self.producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                      compression_type='gzip',
                                      value_serializer=pickle.dumps,
                                      )

    # ----------------------------------------------------------------------
    def stream(self, data: Dict[str, Any]) -> None:
        """Accumulate raw bytes and emit every complete fixed-size package.

        Over WiFi with Daisy attached, the sampling frequency is doubled in
        favor of getting all 16 channels, so the size of the packages is also
        doubled.

        Parameters
        ----------
        data
            Dictionary with context and raw binary data.
        """
        self.accumulated += data['data']

        # WiFi + Daisy doubles the effective sample rate, hence double-size packages.
        if data['context']['connection'] == 'wifi' and data['context']['daisy']:
            factor = 2
        else:
            factor = 1
        # 33 bytes per sample frame -- presumably the OpenBCI packet size; TODO confirm.
        size = self.streaming_package_size * 33 * factor

        # Fix: drain ALL complete packages.  The original `if` emitted at most
        # one package per call, so a single large burst of input would leave
        # the buffer lagging behind indefinitely.
        while len(self.accumulated) >= size:
            # Send shallow copies instead of mutating the caller's dict in place
            # (the original clobbered data['data'] and data['context']).
            package = dict(data)
            package['data'] = self.accumulated[:size]
            package['context'] = dict(data['context'])
            package['context']['timestamp.binary'] = datetime.now().timestamp()
            self.producer.send(self.topic, package)
            self.accumulated = self.accumulated[size:]

    # ----------------------------------------------------------------------
    def close(self) -> None:
        """Terminate the producer."""
        # Fixed log-message typos: "Clossing" -> "Closing", "Produser" -> "Producer".
        logging.info(f'Closing {self.topic} Producer')
        self.producer.close(timeout=0.3)