Stream markers with Kafka¶
There are two ways to create markers: (i) using the markers board mode and (ii) writing directly on the Kafka stream
. . In this section we are to use the second one, using kafka-python to create a produser with a specific topic. This is the recomended way to stream marker. This is the recommended way to stream marker for OpenBCI-Stream.
To create a Kafka produser with Python is very simple:
[1]:
from kafka import KafkaProducer
import pickle
marker_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
compression_type='gzip',
value_serializer=pickle.dumps,
)
The bootstrap_servers
are a list of IP:PORT
where is running Kafka (OpenBCI-Stream no need more than one server), and value_serializer
is the method for serialize data, since the consumer will deserialize others kind of data is a good practice serialize all of them with this method.
The individual markers, are streamed with:
[ ]:
marker = 'LEFT'
marker_producer.send('marker', marker)
The timestamps
are registered automatically in background.
Read streamed markers¶
[9]:
from openbci_stream.acquisition import OpenBCIConsumer
with OpenBCIConsumer() as stream:
for message in stream:
if message.topic == 'marker':
if message.value['marker'] == 'stop':
break
else:
print(message.value)
WARNING:kafka.coordinator.consumer:group_id is None: disabling auto-commit.
{'timestamp': 1603138253.3387, 'marker': 'Left'}
{'timestamp': 1603138254.796022, 'marker': 'Right'}
{'timestamp': 1603138256.704934, 'marker': 'Right'}
{'timestamp': 1603138258.143958, 'marker': 'Left'}
Redirect markers into the OpenBCI board¶
The package Cyton uses the compatible board mode for create markers, but is possible to redirect the streamed markers from Kafka
to the board, the methods listen_stream_markers
automatically create a producer and redirect markers from kafka to OpenBCI.
[ ]:
from openbci_stream.acquisition import Cyton
openbci = Cyton('serial', capture_stream=True)
openbci.listen_stream_markers(host='localhost:9092')
...