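# NOTE: The lines below are repository web-page residue captured together with
# this file; they are not part of the program and are kept only as comments.
#   You cannot select more than 25 topics. Topics must start with a letter or
#   number, can include dashes ('-') and can be up to 35 characters long.
#   219 lines | 7.1 KiB | Python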

"""
COM Port Test Writer - Simulates data being sent to a COM port
"""
import serial
import time
import random
import logging
import sys
from typing import Optional
from config import (
COM_PORT, BAUD_RATE, TIMEOUT, WRITE_INTERVAL,
TEST_DATA_TYPE, SCALES_MIN, SCALES_MAX, SCALES_INCREMENT, DEBUG
)
# Logging configuration: DEBUG verbosity when the config DEBUG flag is set,
# INFO otherwise. Only the bare message is printed; a richer alternative is
# '%(asctime)s - %(levelname)s - %(message)s' if timestamps are wanted.
logging.basicConfig(
    format='%(message)s',
    level=(logging.DEBUG if DEBUG else logging.INFO),
)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
class ComPortTestWriter:
    """Write simulated test data to a serial (COM) port at a fixed interval.

    The writer opens the port with pyserial, repeatedly generates one line of
    text (scales weight, counter, random value or mixed sensor readings) and
    sends it UTF-8 encoded and newline-terminated.
    """

    def __init__(
        self,
        port: str,
        baudrate: int = 9600,
        timeout: int = 1,
        data_type: Optional[str] = None,
        write_interval: Optional[float] = None,
    ):
        """Store connection settings and reset all generator state.

        Args:
            port: Name of the serial port to open.
            baudrate: Baud rate passed to ``serial.Serial``.
            timeout: Timeout passed to ``serial.Serial``.
            data_type: One of 'scales', 'counter', 'random' or 'mixed'. Any
                other value behaves like 'scales'. ``None`` uses
                ``TEST_DATA_TYPE`` from config.
            write_interval: Seconds to sleep between writes. ``None`` uses
                ``WRITE_INTERVAL`` from config.
        """
        self.port = port
        self.baudrate = baudrate
        self.timeout = timeout
        # Resolve the config fallbacks once so run() and get_next_data() agree.
        self.data_type = TEST_DATA_TYPE if data_type is None else data_type
        self.write_interval = (
            WRITE_INTERVAL if write_interval is None else write_interval
        )
        self.serial_conn: Optional[serial.Serial] = None
        self.is_connected = False
        self.is_running = False
        self.data_counter = 0
        self.current_scales_value = SCALES_MIN
        # +1 while the simulated weight is rising, -1 while it is falling.
        self._direction = 1

    def connect(self) -> bool:
        """Open the serial port.

        Returns:
            True when the port was opened, False when pyserial rejected the
            port or its parameters (the failure is logged).
        """
        try:
            self.serial_conn = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                timeout=self.timeout
            )
        except (serial.SerialException, ValueError) as e:
            # ValueError is what pyserial raises for out-of-range parameters
            # (e.g. an invalid baud rate); treat it like any other failure.
            logger.error(f"✗ Failed to connect to {self.port}: {e}")
            logger.info("\nTip: Check that the COM port exists and is not in use.")
            logger.info("You can list available ports with: python -m serial.tools.list_ports")
            self.is_connected = False
            return False
        self.is_connected = True
        logger.info(f"✓ Connected to {self.port} at {self.baudrate} baud")
        return True

    def disconnect(self):
        """Close the serial port if it is open and mark the writer disconnected."""
        if self.serial_conn and self.serial_conn.is_open:
            self.serial_conn.close()
            logger.info(f"✓ Disconnected from {self.port}")
        # Always clear the flag, even if the port was already closed.
        self.is_connected = False

    def write_data(self, data: str) -> bool:
        """Send one line of text to the serial port.

        A trailing newline is appended when missing and the text is encoded
        as UTF-8.

        Returns:
            True when the data was written; False when the writer is not
            connected or the write raised (the error is logged and the writer
            is marked disconnected).
        """
        if not (self.is_connected and self.serial_conn):
            # Nothing to write to; report failure instead of returning None.
            return False
        if not data.endswith('\n'):
            data += '\n'
        try:
            self.serial_conn.write(data.encode('utf-8'))
        except Exception as e:
            # Deliberately broad: any write failure should stop the stream
            # rather than crash the tool.
            logger.error(f"Error writing to {self.port}: {e}")
            self.is_connected = False
            return False
        logger.info(f"→ Sent: {data.strip()}")
        return True

    def generate_scales_data(self) -> str:
        """Return the next simulated weight, formatted like ``"12.34 kg"``.

        The value moves by a randomized step (80%-120% of SCALES_INCREMENT),
        is clamped to [SCALES_MIN, SCALES_MAX], and reverses direction when it
        reaches either end of that range.
        """
        step = random.uniform(SCALES_INCREMENT * 0.8, SCALES_INCREMENT * 1.2)
        self.current_scales_value += step * self._direction
        if self.current_scales_value >= SCALES_MAX:
            self._direction = -1
        elif self.current_scales_value <= SCALES_MIN:
            self._direction = 1
        self.current_scales_value = max(
            SCALES_MIN, min(SCALES_MAX, self.current_scales_value)
        )
        return f"{self.current_scales_value:.2f} kg"

    def generate_counter_data(self) -> str:
        """Increment the counter and return it as ``"Count: N"``."""
        self.data_counter += 1
        return f"Count: {self.data_counter}"

    def generate_random_data(self) -> str:
        """Return a random value between 10000 and 50000 as ``"Value: X.XX"``."""
        value = random.uniform(10000, 50000)
        return f"Value: {value:.2f}"

    def generate_mixed_data(self) -> str:
        """Return random temperature, humidity and pressure readings in one line."""
        temperature = random.uniform(20, 30)
        humidity = random.uniform(30, 70)
        pressure = random.uniform(1010, 1020)
        return f"T:{temperature:.1f}C H:{humidity:.1f}% P:{pressure:.1f}hPa"

    def get_next_data(self) -> str:
        """Return the next line to send for the configured data type.

        Unknown data types fall back to scales data.
        """
        generators = {
            'scales': self.generate_scales_data,
            'counter': self.generate_counter_data,
            'random': self.generate_random_data,
            'mixed': self.generate_mixed_data,
        }
        return generators.get(self.data_type, self.generate_scales_data)()

    def run(self):
        """Connect, then write generated data until interrupted or a write fails.

        The port is always closed on the way out, whether the loop ends via
        Ctrl+C, a failed write or an unexpected error.
        """
        logger.info(f"\n{'='*50}")
        logger.info("COM Port Test Writer")
        logger.info(f"{'='*50}")
        logger.info(f"Port: {self.port}")
        logger.info(f"Baud Rate: {self.baudrate}")
        logger.info(f"Data Type: {self.data_type}")
        logger.info(f"Write Interval: {self.write_interval}s")
        logger.info(f"{'='*50}\n")
        if not self.connect():
            logger.error("Failed to connect to COM port. Exiting.")
            return
        self.is_running = True
        try:
            logger.info("Press Ctrl+C to stop writing data\n")
            while self.is_running:
                if not self.write_data(self.get_next_data()):
                    # A failed write marks the writer disconnected; looping on
                    # would only sleep forever without sending anything.
                    logger.error("Write failed. Stopping test writer.")
                    break
                time.sleep(self.write_interval)
        except KeyboardInterrupt:
            # Ctrl+C is the normal way to stop the writer.
            pass
        except Exception as e:
            logger.error(f"Error in write loop: {e}")
        finally:
            self.is_running = False
            self.disconnect()
            logger.info("\n✓ Test writer stopped")
def list_available_ports():
    """Log every serial port pyserial can detect, or a notice when none exist.

    Any error raised while importing the port-listing helper or enumerating
    ports is logged rather than propagated.
    """
    try:
        from serial.tools import list_ports

        detected = list_ports.comports()
        if not detected:
            logger.info("No COM ports found")
        else:
            logger.info("\nAvailable COM ports:")
            for entry in detected:
                logger.info(f" {entry.device} - {entry.description}")
    except Exception as err:
        logger.error(f"Error listing ports: {err}")
def main():
    """Parse command-line options and run the COM port test writer.

    Options: --port, --baud, --type, --interval and --list. With --list the
    available ports are logged and the function returns without writing.
    """
    # The writer reads these module-level settings, so rebinding them is what
    # makes --type and --interval take effect (previously they were parsed
    # and then ignored).
    global TEST_DATA_TYPE, WRITE_INTERVAL

    import argparse

    parser = argparse.ArgumentParser(
        description='Test COM port writer - continuously writes data to a COM port'
    )
    parser.add_argument(
        '--port',
        default=COM_PORT,
        help=f'COM port to write to (default: {COM_PORT})'
    )
    parser.add_argument(
        '--baud',
        type=int,
        default=BAUD_RATE,
        help=f'Baud rate (default: {BAUD_RATE})'
    )
    parser.add_argument(
        '--type',
        choices=['scales', 'counter', 'random', 'mixed'],
        default=TEST_DATA_TYPE,
        help=f'Type of test data (default: {TEST_DATA_TYPE})'
    )
    parser.add_argument(
        '--interval',
        type=float,
        default=WRITE_INTERVAL,
        help=f'Write interval in seconds (default: {WRITE_INTERVAL})'
    )
    parser.add_argument(
        '--list',
        action='store_true',
        help='List available COM ports and exit'
    )
    args = parser.parse_args()

    if args.list:
        list_available_ports()
        return

    # time.sleep() rejects negative values; fail early with a usage error.
    if args.interval < 0:
        parser.error('--interval must be non-negative')

    # Apply the CLI overrides before the writer is created.
    TEST_DATA_TYPE = args.type
    WRITE_INTERVAL = args.interval

    # Pass the configured TIMEOUT instead of the constructor's default.
    writer = ComPortTestWriter(args.port, args.baud, TIMEOUT)
    writer.run()


if __name__ == '__main__':
    main()