Source code for openbci_stream.acquisition.tcp_server
"""
==========
TCP server
==========
The module WiFi for OpenBCI receive the instructions to connect with a TCP
server, before that happens, the server must be running and waiting for clients.
"""
import socket
import logging
import asyncore
from datetime import datetime
from typing import Dict, Any, Callable
########################################################################
[docs]class WiFiShieldTCPServer(asyncore.dispatcher):
"""Create a TCP server that handles the connexión of the WiFi module.
Parameters
----------
host
IP address of the machine that WiFi module will connect.
binary_stream
Function that return a kafka producer, this producer could not exist in
the very moment of creation of this class instance.
kafka_context
Funtion that return the information from the acquisition side useful for
deserializing and will be packaged back in the stream.
"""
# ----------------------------------------------------------------------
def __init__(self, host, binary_stream: Callable, kafka_context: Callable) -> None:
""""""
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.set_reuse_addr()
self.bind((host, 0))
self.listen(1)
# self.data = data_queue
self.kafka_context_ = kafka_context
self.binary_stream = binary_stream
self._gain = None
# ----------------------------------------------------------------------
[docs] def handle_accept(self) -> None:
"""Redirect the client connection."""
pair = self.accept()
if pair is not None:
sock, addr = pair
logging.info(f'Incoming connection from {addr}')
self.handler = asyncore.dispatcher_with_send(sock)
self.handler.handle_read = self._handle_read
self.handler.handle_error = self._handle_error
# ----------------------------------------------------------------------
def set_gain(self, gain) -> None:
""""""
self._gain = gain
# ----------------------------------------------------------------------
@property
def kafka_context(self):
""""""
return self.kafka_context_()
# ----------------------------------------------------------------------
[docs] def _handle_read(self) -> None:
"""Write the input streaming into the binary stream.
There is a maximum of 3000 bytes that can read at once, so it is set to
2970, a 33 multiple. But this not means that read this amount of data
on all events, the module WiFi will send with their own consideration.
"""
kafka_context = self.kafka_context
# kafka_context.update({'created': datetime.now().timestamp()})
if self._gain:
kafka_context.update({'gain': self._gain})
else:
if kafka_context['daisy']:
kafka_context.update({'gain': [24] * 16})
else:
kafka_context.update({'gain': [24] * 8})
data = {'context': kafka_context,
'data': self.handler.recv(33 * 90),
}
self.binary_stream().stream(data)
# ----------------------------------------------------------------------
def _handle_error(self) -> None:
""""""
# I'm feeling dirty for this "solution"