You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
111 lines
3.8 KiB
Python
111 lines
3.8 KiB
Python
"""
|
|
Serial Port Reader
|
|
Handles reading data from COM ports
|
|
"""
|
|
import serial
|
|
import threading
|
|
import logging
|
|
from typing import Callable, Optional
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class SerialPortReader:
|
|
def __init__(self, port: str, baudrate: int = 9600, timeout: int = 1):
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.timeout = timeout
|
|
self.serial_conn: Optional[serial.Serial] = None
|
|
self.is_connected = False
|
|
self.reader_thread: Optional[threading.Thread] = None
|
|
self.is_running = False
|
|
self.data_callback: Optional[Callable] = None
|
|
|
|
def connect(self) -> bool:
|
|
"""Connect to the serial port"""
|
|
try:
|
|
self.serial_conn = serial.Serial(
|
|
port=self.port,
|
|
baudrate=self.baudrate,
|
|
timeout=self.timeout
|
|
)
|
|
self.is_connected = True
|
|
logger.info(f"Connected to {self.port} at {self.baudrate} baud")
|
|
return True
|
|
except serial.SerialException as e:
|
|
logger.error(f"Failed to connect to {self.port}: {e}")
|
|
self.is_connected = False
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""Disconnect from the serial port"""
|
|
self.is_running = False
|
|
if self.reader_thread:
|
|
self.reader_thread.join(timeout=2)
|
|
|
|
if self.serial_conn and self.serial_conn.is_open:
|
|
self.serial_conn.close()
|
|
self.is_connected = False
|
|
logger.info(f"Disconnected from {self.port}")
|
|
|
|
def set_data_callback(self, callback: Callable):
|
|
"""Set callback function for received data"""
|
|
self.data_callback = callback
|
|
|
|
def start_reading(self):
|
|
"""Start reading from serial port in a background thread"""
|
|
if not self.is_connected:
|
|
if not self.connect():
|
|
return
|
|
|
|
self.is_running = True
|
|
self.reader_thread = threading.Thread(target=self._read_loop, daemon=True)
|
|
self.reader_thread.start()
|
|
logger.info("Serial reader started")
|
|
|
|
def stop_reading(self):
|
|
"""Stop reading from serial port"""
|
|
self.is_running = False
|
|
if self.reader_thread:
|
|
self.reader_thread.join(timeout=2)
|
|
logger.info("Serial reader stopped")
|
|
|
|
def _read_loop(self):
|
|
"""Main reading loop"""
|
|
while self.is_running and self.is_connected:
|
|
try:
|
|
if self.serial_conn and self.serial_conn.in_waiting > 0:
|
|
data = self.serial_conn.readline().decode('utf-8', errors='ignore').strip()
|
|
|
|
if data and self.data_callback:
|
|
self.data_callback({
|
|
'port': self.port,
|
|
'data': data,
|
|
'timestamp': datetime.now().isoformat()
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Error reading from {self.port}: {e}")
|
|
self.is_connected = False
|
|
break
|
|
|
|
def write_data(self, data: str) -> bool:
|
|
"""Write data to the serial port"""
|
|
try:
|
|
if self.is_connected and self.serial_conn:
|
|
if isinstance(data, str):
|
|
data = data.encode('utf-8')
|
|
self.serial_conn.write(data)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error writing to {self.port}: {e}")
|
|
return False
|
|
|
|
def get_status(self) -> dict:
|
|
"""Get current status"""
|
|
return {
|
|
'port': self.port,
|
|
'connected': self.is_connected,
|
|
'running': self.is_running,
|
|
'baudrate': self.baudrate
|
|
}
|