"""
COM Port Test Writer - Simulates data being sent to a COM port.

Opens a serial port with pySerial and writes one newline-terminated line of
generated test data per interval until interrupted with Ctrl+C. Defaults come
from ``config``; the port, baud rate, data type and interval can be overridden
on the command line.
"""

import argparse
import logging
import random
import sys
import time
from typing import Optional

import serial

from config import (
    COM_PORT,
    BAUD_RATE,
    TIMEOUT,
    WRITE_INTERVAL,
    TEST_DATA_TYPE,
    SCALES_MIN,
    SCALES_MAX,
    SCALES_INCREMENT,
    DEBUG
)

# Setup logging. Only the bare message is printed so the console output reads
# like a plain transcript of what was sent.
logging.basicConfig(
    level=logging.DEBUG if DEBUG else logging.INFO,
    format='%(message)s'
)
logger = logging.getLogger(__name__)


class ComPortTestWriter:
    """Writes generated test data to a serial (COM) port at a fixed interval."""

    def __init__(
        self,
        port: str,
        baudrate: int = 9600,
        timeout: float = 1,
        *,
        data_type: str = TEST_DATA_TYPE,
        write_interval: float = WRITE_INTERVAL,
    ):
        """Store the connection settings; the port is not opened here.

        Args:
            port: Name of the serial port to open.
            baudrate: Baud rate passed to ``serial.Serial``.
            timeout: Timeout passed to ``serial.Serial``.
            data_type: Which generator ``get_next_data`` uses: 'scales',
                'counter', 'random' or 'mixed'. Any other value falls back
                to 'scales'.
            write_interval: Seconds to sleep between writes in ``run``.
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        self.data_type = data_type
        self.write_interval = write_interval
        self.serial_conn: Optional[serial.Serial] = None
        self.is_connected = False
        self.is_running = False
        self.data_counter = 0
        self.current_scales_value = SCALES_MIN

    def connect(self) -> bool:
        """Connect to the serial port.

        Returns:
            True if the port was opened, False if ``serial.SerialException``
            was raised (the failure and a troubleshooting tip are logged).
        """
        try:
            self.serial_conn = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=self.timeout
            )
        except serial.SerialException as e:
            logger.error(f"✗ Failed to connect to {self.port}: {e}")
            logger.info("\nTip: Check that the COM port exists and is not in use.")
            logger.info("You can list available ports with: python -m serial.tools.list_ports")
            self.is_connected = False
            return False

        self.is_connected = True
        logger.info(f"✓ Connected to {self.port} at {self.baudrate} baud")
        return True

    def disconnect(self):
        """Disconnect from the serial port.

        Closes the connection if it is open and always clears
        ``is_connected``. The confirmation is logged only when a port was
        actually closed.
        """
        was_open = bool(self.serial_conn and self.serial_conn.is_open)
        if was_open:
            self.serial_conn.close()
        self.is_connected = False
        if was_open:
            logger.info(f"✓ Disconnected from {self.port}")

    def write_data(self, data: str) -> bool:
        """Write data to the serial port.

        A trailing newline is appended when missing and the text is sent
        UTF-8 encoded.

        Args:
            data: The text to send.

        Returns:
            True if the data was written; False if the port is not connected
            or the write failed. A failed write is logged and marks the
            writer as disconnected.
        """
        if not (self.is_connected and self.serial_conn):
            return False

        # Add newline if not present
        if not data.endswith('\n'):
            data += '\n'

        try:
            self.serial_conn.write(data.encode('utf-8'))
        except (serial.SerialException, OSError) as e:
            logger.error(f"Error writing to {self.port}: {e}")
            self.is_connected = False
            return False

        logger.info(f"→ Sent: {data.strip()}")
        return True

    def generate_scales_data(self) -> str:
        """Generate simulated scales data.

        Nudges the current value by a random amount in [-0.1, 0.1], clamps it
        to [SCALES_MIN, SCALES_MAX] and formats it with two decimals and a
        ' kg' suffix.
        """
        # NOTE(review): SCALES_INCREMENT is imported from config but unused;
        # the fluctuation is hard-coded to +/-0.1 - confirm intent.
        variation = random.uniform(-0.1, 0.1)
        self.current_scales_value += variation

        # Keep within bounds
        self.current_scales_value = max(
            SCALES_MIN, min(SCALES_MAX, self.current_scales_value)
        )
        return f"{self.current_scales_value:.2f} kg"

    def generate_counter_data(self) -> str:
        """Generate incrementing counter data (the first value is 1)."""
        self.data_counter += 1
        return f"Count: {self.data_counter}"

    def generate_random_data(self) -> str:
        """Generate random numeric data between 10000 and 50000."""
        value = random.uniform(10000, 50000)
        return f"Value: {value:.2f}"

    def generate_mixed_data(self) -> str:
        """Generate mixed sensor data (temperature, humidity, pressure)."""
        temperature = random.uniform(20, 30)
        humidity = random.uniform(30, 70)
        pressure = random.uniform(1010, 1020)
        return f"T:{temperature:.1f}C H:{humidity:.1f}% P:{pressure:.1f}hPa"

    def get_next_data(self) -> str:
        """Get the next data to send based on this writer's data type.

        Unknown data types fall back to scales data.
        """
        generators = {
            'scales': self.generate_scales_data,
            'counter': self.generate_counter_data,
            'random': self.generate_random_data,
            'mixed': self.generate_mixed_data,
        }
        generate = generators.get(self.data_type, self.generate_scales_data)
        return generate()

    def run(self):
        """Run the test writer.

        Logs a settings banner, connects, then writes one generated line per
        ``write_interval`` seconds until Ctrl+C is pressed, a write fails, or
        an unexpected error occurs. The port is always disconnected on exit.
        """
        banner = '=' * 50
        logger.info(f"\n{banner}")
        logger.info("COM Port Test Writer")
        logger.info(banner)
        logger.info(f"Port: {self.port}")
        logger.info(f"Baud Rate: {self.baudrate}")
        logger.info(f"Data Type: {self.data_type}")
        logger.info(f"Write Interval: {self.write_interval}s")
        logger.info(f"{banner}\n")

        if not self.connect():
            logger.error("Failed to connect to COM port. Exiting.")
            return

        self.is_running = True
        try:
            logger.info("Press Ctrl+C to stop writing data\n")
            while self.is_running:
                # write_data logs its own failure and marks the writer as
                # disconnected; stop instead of looping on a dead port.
                if not self.write_data(self.get_next_data()):
                    break
                time.sleep(self.write_interval)
        except KeyboardInterrupt:
            pass
        except Exception as e:
            # Top-level boundary of the write loop: report and shut down.
            logger.error(f"Error in write loop: {e}")
        finally:
            self.is_running = False
            self.disconnect()
            logger.info("\n✓ Test writer stopped")


def list_available_ports():
    """List available COM ports by logging each device and its description."""
    try:
        from serial.tools import list_ports

        ports = list_ports.comports()
        if ports:
            logger.info("\nAvailable COM ports:")
            for port in ports:
                logger.info(f"  {port.device} - {port.description}")
        else:
            logger.info("No COM ports found")
    except Exception as e:
        logger.error(f"Error listing ports: {e}")


def main():
    """Main entry point: parse the command line and run the writer."""
    parser = argparse.ArgumentParser(
        description='Test COM port writer - continuously writes data to a COM port'
    )
    parser.add_argument(
        '--port',
        default=COM_PORT,
        help=f'COM port to write to (default: {COM_PORT})'
    )
    parser.add_argument(
        '--baud',
        type=int,
        default=BAUD_RATE,
        help=f'Baud rate (default: {BAUD_RATE})'
    )
    parser.add_argument(
        '--type',
        choices=['scales', 'counter', 'random', 'mixed'],
        default=TEST_DATA_TYPE,
        help=f'Type of test data (default: {TEST_DATA_TYPE})'
    )
    parser.add_argument(
        '--interval',
        type=float,
        default=WRITE_INTERVAL,
        help=f'Write interval in seconds (default: {WRITE_INTERVAL})'
    )
    parser.add_argument(
        '--list',
        action='store_true',
        help='List available COM ports and exit'
    )

    args = parser.parse_args()

    if args.list:
        list_available_ports()
        return

    # time.sleep() raises ValueError for negative values; reject them up front.
    if args.interval < 0:
        parser.error('--interval must be non-negative')

    writer = ComPortTestWriter(
        args.port,
        args.baud,
        TIMEOUT,
        data_type=args.type,
        write_interval=args.interval,
    )
    writer.run()


if __name__ == '__main__':
    main()