"""
Serial Port Reader
Handles reading data from COM ports
"""

import logging
import threading
import time
from datetime import datetime
from typing import Callable, Optional, Union

import serial

logger = logging.getLogger(__name__)


class SerialPortReader:
    """Read newline-terminated text from a serial port on a background thread.

    Each non-empty line received is decoded as UTF-8 (undecodable bytes are
    dropped), stripped of surrounding whitespace and passed to the registered
    callback as a dict with the keys ``'port'``, ``'data'`` and
    ``'timestamp'`` (``datetime.now().isoformat()``).

    The callback is invoked on the reader thread, not on the thread that
    called :meth:`start_reading`.
    """

    # Seconds to sleep when no bytes are waiting, so the reader thread does
    # not spin at full CPU while the line is idle.
    _POLL_INTERVAL = 0.01

    # Seconds to wait for the reader thread to finish when stopping.
    _JOIN_TIMEOUT = 2

    def __init__(self, port: str, baudrate: int = 9600, timeout: float = 1):
        """Store the connection settings; the port is not opened here.

        Args:
            port: Device name passed to ``serial.Serial`` (e.g. ``"COM3"``).
            baudrate: Baud rate passed to ``serial.Serial``.
            timeout: Read timeout passed to ``serial.Serial``.
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.serial_conn: Optional[serial.Serial] = None
        self.is_connected = False
        self.reader_thread: Optional[threading.Thread] = None
        self.is_running = False
        self.data_callback: Optional[Callable] = None

    def connect(self) -> bool:
        """Connect to the serial port.

        Any handle left open by a previous connection is closed first, so
        reconnecting after a read error neither leaks the old handle nor
        fails because the port is still held by this object.

        Returns:
            True if the port was opened, False if ``serial.SerialException``
            was raised (the error is logged).
        """
        self._close_port()
        try:
            self.serial_conn = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=self.timeout,
            )
        except serial.SerialException as e:
            logger.error("Failed to connect to %s: %s", self.port, e)
            self.is_connected = False
            return False
        self.is_connected = True
        logger.info("Connected to %s at %s baud", self.port, self.baudrate)
        return True

    def disconnect(self):
        """Stop the reader thread (if any) and close the serial port."""
        self.is_running = False
        self._join_reader()
        self._close_port()
        self.is_connected = False
        logger.info("Disconnected from %s", self.port)

    def set_data_callback(self, callback: Callable):
        """Set callback function for received data.

        The callback receives one dict per line with the keys ``'port'``,
        ``'data'`` and ``'timestamp'``.
        """
        self.data_callback = callback

    def start_reading(self):
        """Start reading from serial port in a background thread.

        Connects first if not already connected; returns without starting
        when the connection attempt fails. Does nothing (apart from logging
        a warning) if a reader thread is still alive, so repeated calls do
        not spawn several threads reading the same port.
        """
        if self.reader_thread is not None and self.reader_thread.is_alive():
            logger.warning("Serial reader already running on %s", self.port)
            return
        if not self.is_connected and not self.connect():
            return
        self.is_running = True
        self.reader_thread = threading.Thread(
            target=self._read_loop, daemon=True
        )
        self.reader_thread.start()
        logger.info("Serial reader started")

    def stop_reading(self):
        """Stop reading from serial port (the port itself stays open)."""
        self.is_running = False
        self._join_reader()
        logger.info("Serial reader stopped")

    def _join_reader(self) -> None:
        """Wait up to ``_JOIN_TIMEOUT`` seconds for the reader thread to end."""
        thread = self.reader_thread
        if thread is None:
            return
        if thread is threading.current_thread():
            # Reached when the data callback calls stop_reading() or
            # disconnect(): a thread cannot join itself (RuntimeError). The
            # loop exits on its own once the callback returns because
            # is_running is already False.
            return
        thread.join(timeout=self._JOIN_TIMEOUT)
        if not thread.is_alive():
            self.reader_thread = None

    def _close_port(self) -> None:
        """Close the underlying port if it is open; log instead of raising."""
        conn = self.serial_conn
        if conn is None or not conn.is_open:
            return
        try:
            conn.close()
        except (serial.SerialException, OSError) as e:
            logger.error("Error closing %s: %s", self.port, e)

    def _read_loop(self):
        """Main reading loop, run on the reader thread.

        Runs while both ``is_running`` and ``is_connected`` are true. Any
        exception - including one raised by the data callback - is logged,
        marks the reader as disconnected and not running, and ends the loop.
        """
        while self.is_running and self.is_connected:
            try:
                conn = self.serial_conn
                if conn is None or conn.in_waiting <= 0:
                    # Nothing to read yet: yield the CPU instead of spinning.
                    time.sleep(self._POLL_INTERVAL)
                    continue
                data = conn.readline().decode('utf-8', errors='ignore').strip()
                if data and self.data_callback:
                    self.data_callback({
                        'port': self.port,
                        'data': data,
                        'timestamp': datetime.now().isoformat(),
                    })
            except Exception as e:
                # Thread top-level boundary: log and shut the loop down.
                logger.error("Error reading from %s: %s", self.port, e)
                self.is_connected = False
                # The thread is about to exit, so status must not keep
                # reporting a running reader.
                self.is_running = False
                break

    def write_data(self, data: Union[str, bytes]) -> bool:
        """Write data to the serial port.

        Args:
            data: Text (encoded as UTF-8 before sending) or raw bytes.

        Returns:
            True if the data was handed to the port; False when not
            connected or when encoding/writing raised (the error is logged).
        """
        if not self.is_connected or not self.serial_conn:
            return False
        try:
            payload = data.encode('utf-8') if isinstance(data, str) else data
            self.serial_conn.write(payload)
        except Exception as e:
            # Callers rely on a boolean result rather than an exception.
            logger.error("Error writing to %s: %s", self.port, e)
            return False
        return True

    def get_status(self) -> dict:
        """Get current status as a dict with port, connected, running, baudrate."""
        return {
            'port': self.port,
            'connected': self.is_connected,
            'running': self.is_running,
            'baudrate': self.baudrate,
        }