You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
219 lines
7.1 KiB
Python
219 lines
7.1 KiB
Python
"""
|
|
COM Port Test Writer - Simulates data being sent to a COM port
|
|
"""
|
|
import serial
|
|
import time
|
|
import random
|
|
import logging
|
|
import sys
|
|
from typing import Optional
|
|
from config import (
|
|
COM_PORT, BAUD_RATE, TIMEOUT, WRITE_INTERVAL,
|
|
TEST_DATA_TYPE, SCALES_MIN, SCALES_MAX, SCALES_INCREMENT, DEBUG
|
|
)
|
|
|
|
# Setup logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG if DEBUG else logging.INFO,
|
|
# format='%(asctime)s - %(levelname)s - %(message)s'
|
|
format='%(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class ComPortTestWriter:
|
|
def __init__(self, port: str, baudrate: int = 9600, timeout: int = 1):
|
|
self.port = port
|
|
self.baudrate = baudrate
|
|
self.timeout = timeout
|
|
self.serial_conn: Optional[serial.Serial] = None
|
|
self.is_connected = False
|
|
self.is_running = False
|
|
self.data_counter = 0
|
|
self.current_scales_value = SCALES_MIN
|
|
|
|
def connect(self) -> bool:
|
|
"""Connect to the serial port"""
|
|
try:
|
|
self.serial_conn = serial.Serial(
|
|
port=self.port,
|
|
baudrate=self.baudrate,
|
|
timeout=self.timeout
|
|
)
|
|
self.is_connected = True
|
|
logger.info(f"✓ Connected to {self.port} at {self.baudrate} baud")
|
|
return True
|
|
except serial.SerialException as e:
|
|
logger.error(f"✗ Failed to connect to {self.port}: {e}")
|
|
logger.info("\nTip: Check that the COM port exists and is not in use.")
|
|
logger.info("You can list available ports with: python -m serial.tools.list_ports")
|
|
self.is_connected = False
|
|
return False
|
|
|
|
def disconnect(self):
|
|
"""Disconnect from the serial port"""
|
|
if self.serial_conn and self.serial_conn.is_open:
|
|
self.serial_conn.close()
|
|
self.is_connected = False
|
|
logger.info(f"✓ Disconnected from {self.port}")
|
|
|
|
def write_data(self, data: str) -> bool:
|
|
"""Write data to the serial port"""
|
|
try:
|
|
if self.is_connected and self.serial_conn:
|
|
# Add newline if not present
|
|
if not data.endswith('\n'):
|
|
data += '\n'
|
|
|
|
self.serial_conn.write(data.encode('utf-8'))
|
|
logger.info(f"→ Sent: {data.strip()}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error writing to {self.port}: {e}")
|
|
self.is_connected = False
|
|
return False
|
|
|
|
def generate_scales_data(self) -> str:
|
|
"""Generate simulated scales data"""
|
|
# Step through the range and reverse at each end
|
|
if not hasattr(self, '_direction'):
|
|
self._direction = 1
|
|
step = random.uniform(SCALES_INCREMENT * 0.8, SCALES_INCREMENT * 1.2)
|
|
self.current_scales_value += step * self._direction
|
|
if self.current_scales_value >= SCALES_MAX:
|
|
self._direction = -1
|
|
elif self.current_scales_value <= SCALES_MIN:
|
|
self._direction = 1
|
|
self.current_scales_value = max(SCALES_MIN, min(SCALES_MAX, self.current_scales_value))
|
|
return f"{self.current_scales_value:.2f} kg"
|
|
|
|
def generate_counter_data(self) -> str:
|
|
"""Generate incrementing counter data"""
|
|
self.data_counter += 1
|
|
return f"Count: {self.data_counter}"
|
|
|
|
def generate_random_data(self) -> str:
|
|
"""Generate random numeric data"""
|
|
value = random.uniform(10000, 50000)
|
|
return f"Value: {value:.2f}"
|
|
|
|
def generate_mixed_data(self) -> str:
|
|
"""Generate mixed sensor data"""
|
|
temperature = random.uniform(20, 30)
|
|
humidity = random.uniform(30, 70)
|
|
pressure = random.uniform(1010, 1020)
|
|
return f"T:{temperature:.1f}C H:{humidity:.1f}% P:{pressure:.1f}hPa"
|
|
|
|
def get_next_data(self) -> str:
|
|
"""Get the next data to send based on test type"""
|
|
if TEST_DATA_TYPE == 'scales':
|
|
return self.generate_scales_data()
|
|
elif TEST_DATA_TYPE == 'counter':
|
|
return self.generate_counter_data()
|
|
elif TEST_DATA_TYPE == 'random':
|
|
return self.generate_random_data()
|
|
elif TEST_DATA_TYPE == 'mixed':
|
|
return self.generate_mixed_data()
|
|
else:
|
|
return self.generate_scales_data()
|
|
|
|
def run(self):
|
|
"""Run the test writer"""
|
|
logger.info(f"\n{'='*50}")
|
|
logger.info(f"COM Port Test Writer")
|
|
logger.info(f"{'='*50}")
|
|
logger.info(f"Port: {self.port}")
|
|
logger.info(f"Baud Rate: {self.baudrate}")
|
|
logger.info(f"Data Type: {TEST_DATA_TYPE}")
|
|
logger.info(f"Write Interval: {WRITE_INTERVAL}s")
|
|
logger.info(f"{'='*50}\n")
|
|
|
|
if not self.connect():
|
|
logger.error("Failed to connect to COM port. Exiting.")
|
|
return
|
|
|
|
self.is_running = True
|
|
|
|
try:
|
|
logger.info("Press Ctrl+C to stop writing data\n")
|
|
|
|
while self.is_running:
|
|
try:
|
|
data = self.get_next_data()
|
|
self.write_data(data)
|
|
time.sleep(WRITE_INTERVAL)
|
|
|
|
except KeyboardInterrupt:
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error in write loop: {e}")
|
|
break
|
|
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
self.is_running = False
|
|
self.disconnect()
|
|
logger.info("\n✓ Test writer stopped")
|
|
|
|
def list_available_ports():
|
|
"""List available COM ports"""
|
|
try:
|
|
from serial.tools import list_ports
|
|
ports = list_ports.comports()
|
|
if ports:
|
|
logger.info("\nAvailable COM ports:")
|
|
for port in ports:
|
|
logger.info(f" {port.device} - {port.description}")
|
|
else:
|
|
logger.info("No COM ports found")
|
|
except Exception as e:
|
|
logger.error(f"Error listing ports: {e}")
|
|
|
|
def main():
|
|
"""Main entry point"""
|
|
import argparse
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description='Test COM port writer - continuously writes data to a COM port'
|
|
)
|
|
parser.add_argument(
|
|
'--port',
|
|
default=COM_PORT,
|
|
help=f'COM port to write to (default: {COM_PORT})'
|
|
)
|
|
parser.add_argument(
|
|
'--baud',
|
|
type=int,
|
|
default=BAUD_RATE,
|
|
help=f'Baud rate (default: {BAUD_RATE})'
|
|
)
|
|
parser.add_argument(
|
|
'--type',
|
|
choices=['scales', 'counter', 'random', 'mixed'],
|
|
default=TEST_DATA_TYPE,
|
|
help=f'Type of test data (default: {TEST_DATA_TYPE})'
|
|
)
|
|
parser.add_argument(
|
|
'--interval',
|
|
type=float,
|
|
default=WRITE_INTERVAL,
|
|
help=f'Write interval in seconds (default: {WRITE_INTERVAL})'
|
|
)
|
|
parser.add_argument(
|
|
'--list',
|
|
action='store_true',
|
|
help='List available COM ports and exit'
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.list:
|
|
list_available_ports()
|
|
return
|
|
|
|
writer = ComPortTestWriter(args.port, args.baud)
|
|
writer.run()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|