Stream markers with Kafka

There are two ways to create markers: (i) using the markers board mode and (ii) writing directly on the Kafka stream. . In this section we are to use the second one, using kafka-python to create a produser with a specific topic. This is the recomended way to stream marker. This is the recommended way to stream marker for OpenBCI-Stream.

To create a Kafka produser with Python is very simple:

[1]:
from kafka import KafkaProducer
import pickle

marker_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                compression_type='gzip',
                                value_serializer=pickle.dumps,
                               )

The bootstrap_servers are a list of IP:PORT where is running Kafka (OpenBCI-Stream no need more than one server), and value_serializer is the method for serialize data, since the consumer will deserialize others kind of data is a good practice serialize all of them with this method.

The individual markers, are streamed with:

[ ]:
marker = 'LEFT'
marker_producer.send('marker', marker)

The timestamps are registered automatically in background.

Read streamed markers

[9]:
from openbci_stream.acquisition import OpenBCIConsumer

with OpenBCIConsumer() as stream:
    for message in stream:
        if message.topic == 'marker':

            if message.value['marker'] == 'stop':
                break
            else:
                print(message.value)
WARNING:kafka.coordinator.consumer:group_id is None: disabling auto-commit.
{'timestamp': 1603138253.3387, 'marker': 'Left'}
{'timestamp': 1603138254.796022, 'marker': 'Right'}
{'timestamp': 1603138256.704934, 'marker': 'Right'}
{'timestamp': 1603138258.143958, 'marker': 'Left'}

Redirect markers into the OpenBCI board

The package Cyton uses the compatible board mode for create markers, but is possible to redirect the streamed markers from Kafka to the board, the methods listen_stream_markers automatically create a producer and redirect markers from kafka to OpenBCI.

[ ]:
from openbci_stream.acquisition import Cyton

openbci = Cyton('serial', capture_stream=True)

openbci.listen_stream_markers(host='localhost:9092')

...