Source code for openbci_stream.acquisition.tcp_server

TCP server

The module WiFi for OpenBCI receive the instructions to connect with a TCP
server, before that happens, the server must be running and waiting for clients.


import socket
import logging
import asyncore
from datetime import datetime
from typing import Dict, Any, Callable

[docs]class WiFiShieldTCPServer(asyncore.dispatcher): """Create a TCP server that handles the connexión of the WiFi module. Parameters ---------- host IP address of the machine that WiFi module will connect. binary_stream Function that return a kafka producer, this producer could not exist in the very moment of creation of this class instance. kafka_context Funtion that return the information from the acquisition side useful for deserializing and will be packaged back in the stream. """ # ---------------------------------------------------------------------- def __init__(self, host, binary_stream: Callable, kafka_context: Callable) -> None: """""" asyncore.dispatcher.__init__(self) self.create_socket(socket.AF_INET, socket.SOCK_STREAM) self.set_reuse_addr() self.bind((host, 0)) self.listen(1) # = data_queue self.kafka_context_ = kafka_context self.binary_stream = binary_stream self._gain = None # ----------------------------------------------------------------------
[docs] def handle_accept(self) -> None: """Redirect the client connection.""" pair = self.accept() if pair is not None: sock, addr = pair'Incoming connection from {addr}') self.handler = asyncore.dispatcher_with_send(sock) self.handler.handle_read = self._handle_read self.handler.handle_error = self._handle_error
# ---------------------------------------------------------------------- def set_gain(self, gain) -> None: """""" self._gain = gain # ---------------------------------------------------------------------- @property def kafka_context(self): """""" return self.kafka_context_() # ----------------------------------------------------------------------
[docs] def _handle_read(self) -> None: """Write the input streaming into the binary stream. There is a maximum of 3000 bytes that can read at once, so it is set to 2970, a 33 multiple. But this not means that read this amount of data on all events, the module WiFi will send with their own consideration. """ kafka_context = self.kafka_context # kafka_context.update({'created':}) if self._gain: kafka_context.update({'gain': self._gain}) else: if kafka_context['daisy']: kafka_context.update({'gain': [24] * 16}) else: kafka_context.update({'gain': [24] * 8}) data = {'context': kafka_context, 'data': self.handler.recv(33 * 90), } self.binary_stream().stream(data)
# ---------------------------------------------------------------------- def _handle_error(self) -> None: """"""
# I'm feeling dirty for this "solution"