barebone app with django and react, sse, jwt token, comport reader, test comport writer, requires com0com, users with groups, sample table vehicles, tokens for access and refresh
This commit is contained in:
@@ -0,0 +1,16 @@
|
||||
# COM Port Test Writer configuration
|
||||
COM_PORT=COM1
|
||||
BAUD_RATE=9600
|
||||
TIMEOUT=1
|
||||
|
||||
# Test Data Settings
|
||||
WRITE_INTERVAL=1.0
|
||||
TEST_DATA_TYPE=scales
|
||||
|
||||
# Scales data settings
|
||||
SCALES_MIN=0
|
||||
SCALES_MAX=100
|
||||
SCALES_INCREMENT=0.5
|
||||
|
||||
# Debug
|
||||
DEBUG=True
|
||||
@@ -0,0 +1,165 @@
|
||||
# COM Port Test Writer - Simulates device sending data
|
||||
|
||||
This utility simulates a physical device (like scales) by continuously writing test data to a COM port.
|
||||
|
||||
## Features
|
||||
|
||||
- ✅ Writes test data continuously to COM port
|
||||
- ✅ Multiple data types: scales, counter, random, mixed
|
||||
- ✅ Configurable write interval
|
||||
- ✅ Easy start/stop with Ctrl+C
|
||||
- ✅ List available COM ports
|
||||
- ✅ Command-line arguments
|
||||
|
||||
## Setup
|
||||
|
||||
1. **Install dependencies:**
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
2. **Configure environment (optional):**
|
||||
```bash
|
||||
copy .env.example .env
|
||||
# Edit .env with your settings
|
||||
```
|
||||
|
||||
## Usage
|
||||
|
||||
### List Available COM Ports
|
||||
```bash
|
||||
python test_writer.py --list
|
||||
```
|
||||
|
||||
### Basic Usage (using configured port)
|
||||
```bash
|
||||
python test_writer.py
|
||||
```
|
||||
|
||||
### Specify COM Port
|
||||
```bash
|
||||
python test_writer.py --port COM3
|
||||
```
|
||||
|
||||
### Different Data Types
|
||||
|
||||
**Scales Data (default)** - Simulates scale readings
|
||||
```bash
|
||||
python test_writer.py --type scales
|
||||
```
|
||||
|
||||
**Counter Data** - Incrementing numbers
|
||||
```bash
|
||||
python test_writer.py --type counter
|
||||
```
|
||||
|
||||
**Random Data** - Random numeric values
|
||||
```bash
|
||||
python test_writer.py --type random
|
||||
```
|
||||
|
||||
**Mixed Sensor Data** - Temperature, humidity, pressure
|
||||
```bash
|
||||
python test_writer.py --type mixed
|
||||
```
|
||||
|
||||
### Custom Write Interval
|
||||
```bash
|
||||
python test_writer.py --interval 2.0 # Send every 2 seconds
|
||||
python test_writer.py --interval 0.5 # Send every 0.5 seconds
|
||||
```
|
||||
|
||||
### Custom Baud Rate
|
||||
```bash
|
||||
python test_writer.py --baud 115200
|
||||
```
|
||||
|
||||
## Testing Workflow
|
||||
|
||||
1. **Start Serial Bridge** (reads COM port)
|
||||
```bash
|
||||
cd serial_bridge
|
||||
python app.py
|
||||
```
|
||||
|
||||
2. **Start Test Writer** (writes test data)
|
||||
```bash
|
||||
cd test_comport_writer
|
||||
python test_writer.py --type scales --interval 1.0
|
||||
```
|
||||
|
||||
3. **View in React Frontend**
|
||||
- Open http://localhost:3000
|
||||
- Data should appear in real-time
|
||||
|
||||
## Configuration
|
||||
|
||||
Edit `.env` for default settings:
|
||||
|
||||
```env
|
||||
COM_PORT=COM1
|
||||
BAUD_RATE=9600
|
||||
WRITE_INTERVAL=1.0
|
||||
TEST_DATA_TYPE=scales
|
||||
SCALES_MIN=0
|
||||
SCALES_MAX=100
|
||||
```
|
||||
|
||||
## Example Output
|
||||
|
||||
```
|
||||
==================================================
|
||||
COM Port Test Writer
|
||||
==================================================
|
||||
Port: COM1
|
||||
Baud Rate: 9600
|
||||
Data Type: scales
|
||||
Write Interval: 1.0s
|
||||
==================================================
|
||||
|
||||
✓ Connected to COM1 at 9600 baud
|
||||
Press Ctrl+C to stop writing data
|
||||
|
||||
→ Sent: 45.23 kg
|
||||
→ Sent: 45.67 kg
|
||||
→ Sent: 46.12 kg
|
||||
→ Sent: 46.45 kg
|
||||
...
|
||||
(Press Ctrl+C to stop)
|
||||
|
||||
✓ Disconnected from COM1
|
||||
✓ Test writer stopped
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
- **"COM port not found"**: Check `COM_PORT` setting and ensure port exists
|
||||
- **"Port already in use"**: Another application is using the port (close Serial Bridge temporarily)
|
||||
- **No data appearing**: Ensure Serial Bridge is running and pointing to same COM port
|
||||
- **Baud rate mismatch**: Ensure test writer and serial bridge use same baud rate
|
||||
|
||||
## Complete System Test
|
||||
|
||||
To test the complete flow:
|
||||
|
||||
```bash
|
||||
# Terminal 1: Django Backend
|
||||
cd backend
|
||||
venv\Scripts\activate
|
||||
python manage.py runserver
|
||||
|
||||
# Terminal 2: React Frontend
|
||||
cd frontend
|
||||
npm start
|
||||
|
||||
# Terminal 3: Serial Bridge
|
||||
cd serial_bridge
|
||||
venv\Scripts\activate
|
||||
python app.py
|
||||
|
||||
# Terminal 4: Test Writer
|
||||
cd test_comport_writer
|
||||
python test_writer.py --type scales --interval 1.0
|
||||
```
|
||||
|
||||
Then visit http://localhost:3000 to see real-time data flow.
|
||||
@@ -0,0 +1,24 @@
|
||||
"""
|
||||
Configuration for COM Port Test Writer
|
||||
"""
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
# COM Port Settings
|
||||
COM_PORT = os.getenv('COM_PORT', 'COM4')
|
||||
BAUD_RATE = int(os.getenv('BAUD_RATE', 9600))
|
||||
TIMEOUT = int(os.getenv('TIMEOUT', 1))
|
||||
|
||||
# Test Data Settings
|
||||
WRITE_INTERVAL = float(os.getenv('WRITE_INTERVAL', 1.0)) # seconds between writes
|
||||
TEST_DATA_TYPE = os.getenv('TEST_DATA_TYPE', 'scales') # 'scales', 'counter', 'random'
|
||||
|
||||
# Scales data
|
||||
SCALES_MIN = int(os.getenv('SCALES_MIN', 5000))
|
||||
SCALES_MAX = int(os.getenv('SCALES_MAX', 20000))
|
||||
SCALES_INCREMENT = float(os.getenv('SCALES_INCREMENT', 5.0))
|
||||
|
||||
# Application Settings
|
||||
DEBUG = os.getenv('DEBUG', 'True') == 'True'
|
||||
@@ -0,0 +1,2 @@
|
||||
pyserial==3.5
|
||||
python-dotenv==1.0.0
|
||||
@@ -0,0 +1,215 @@
|
||||
"""
|
||||
COM Port Test Writer - Simulates data being sent to a COM port
|
||||
"""
|
||||
import serial
|
||||
import time
|
||||
import random
|
||||
import logging
|
||||
import sys
|
||||
from typing import Optional
|
||||
from config import (
|
||||
COM_PORT, BAUD_RATE, TIMEOUT, WRITE_INTERVAL,
|
||||
TEST_DATA_TYPE, SCALES_MIN, SCALES_MAX, SCALES_INCREMENT, DEBUG
|
||||
)
|
||||
|
||||
# Setup logging
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG if DEBUG else logging.INFO,
|
||||
# format='%(asctime)s - %(levelname)s - %(message)s'
|
||||
format='%(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ComPortTestWriter:
|
||||
def __init__(self, port: str, baudrate: int = 9600, timeout: int = 1):
|
||||
self.port = port
|
||||
self.baudrate = baudrate
|
||||
self.timeout = timeout
|
||||
self.serial_conn: Optional[serial.Serial] = None
|
||||
self.is_connected = False
|
||||
self.is_running = False
|
||||
self.data_counter = 0
|
||||
self.current_scales_value = SCALES_MIN
|
||||
|
||||
def connect(self) -> bool:
|
||||
"""Connect to the serial port"""
|
||||
try:
|
||||
self.serial_conn = serial.Serial(
|
||||
port=self.port,
|
||||
baudrate=self.baudrate,
|
||||
timeout=self.timeout
|
||||
)
|
||||
self.is_connected = True
|
||||
logger.info(f"✓ Connected to {self.port} at {self.baudrate} baud")
|
||||
return True
|
||||
except serial.SerialException as e:
|
||||
logger.error(f"✗ Failed to connect to {self.port}: {e}")
|
||||
logger.info("\nTip: Check that the COM port exists and is not in use.")
|
||||
logger.info("You can list available ports with: python -m serial.tools.list_ports")
|
||||
self.is_connected = False
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from the serial port"""
|
||||
if self.serial_conn and self.serial_conn.is_open:
|
||||
self.serial_conn.close()
|
||||
self.is_connected = False
|
||||
logger.info(f"✓ Disconnected from {self.port}")
|
||||
|
||||
def write_data(self, data: str) -> bool:
|
||||
"""Write data to the serial port"""
|
||||
try:
|
||||
if self.is_connected and self.serial_conn:
|
||||
# Add newline if not present
|
||||
if not data.endswith('\n'):
|
||||
data += '\n'
|
||||
|
||||
self.serial_conn.write(data.encode('utf-8'))
|
||||
logger.info(f"→ Sent: {data.strip()}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing to {self.port}: {e}")
|
||||
self.is_connected = False
|
||||
return False
|
||||
|
||||
def generate_scales_data(self) -> str:
|
||||
"""Generate simulated scales data"""
|
||||
# Simulate scales value fluctuating slightly
|
||||
variation = random.uniform(-0.1, 0.1)
|
||||
self.current_scales_value += variation
|
||||
|
||||
# Keep within bounds
|
||||
self.current_scales_value = max(SCALES_MIN, min(SCALES_MAX, self.current_scales_value))
|
||||
|
||||
return f"{self.current_scales_value:.2f} kg"
|
||||
|
||||
def generate_counter_data(self) -> str:
|
||||
"""Generate incrementing counter data"""
|
||||
self.data_counter += 1
|
||||
return f"Count: {self.data_counter}"
|
||||
|
||||
def generate_random_data(self) -> str:
|
||||
"""Generate random numeric data"""
|
||||
value = random.uniform(10000, 50000)
|
||||
return f"Value: {value:.2f}"
|
||||
|
||||
def generate_mixed_data(self) -> str:
|
||||
"""Generate mixed sensor data"""
|
||||
temperature = random.uniform(20, 30)
|
||||
humidity = random.uniform(30, 70)
|
||||
pressure = random.uniform(1010, 1020)
|
||||
return f"T:{temperature:.1f}C H:{humidity:.1f}% P:{pressure:.1f}hPa"
|
||||
|
||||
def get_next_data(self) -> str:
|
||||
"""Get the next data to send based on test type"""
|
||||
if TEST_DATA_TYPE == 'scales':
|
||||
return self.generate_scales_data()
|
||||
elif TEST_DATA_TYPE == 'counter':
|
||||
return self.generate_counter_data()
|
||||
elif TEST_DATA_TYPE == 'random':
|
||||
return self.generate_random_data()
|
||||
elif TEST_DATA_TYPE == 'mixed':
|
||||
return self.generate_mixed_data()
|
||||
else:
|
||||
return self.generate_scales_data()
|
||||
|
||||
def run(self):
|
||||
"""Run the test writer"""
|
||||
logger.info(f"\n{'='*50}")
|
||||
logger.info(f"COM Port Test Writer")
|
||||
logger.info(f"{'='*50}")
|
||||
logger.info(f"Port: {self.port}")
|
||||
logger.info(f"Baud Rate: {self.baudrate}")
|
||||
logger.info(f"Data Type: {TEST_DATA_TYPE}")
|
||||
logger.info(f"Write Interval: {WRITE_INTERVAL}s")
|
||||
logger.info(f"{'='*50}\n")
|
||||
|
||||
if not self.connect():
|
||||
logger.error("Failed to connect to COM port. Exiting.")
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
|
||||
try:
|
||||
logger.info("Press Ctrl+C to stop writing data\n")
|
||||
|
||||
while self.is_running:
|
||||
try:
|
||||
data = self.get_next_data()
|
||||
self.write_data(data)
|
||||
time.sleep(WRITE_INTERVAL)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
except Exception as e:
|
||||
logger.error(f"Error in write loop: {e}")
|
||||
break
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
self.is_running = False
|
||||
self.disconnect()
|
||||
logger.info("\n✓ Test writer stopped")
|
||||
|
||||
def list_available_ports():
|
||||
"""List available COM ports"""
|
||||
try:
|
||||
from serial.tools import list_ports
|
||||
ports = list_ports.comports()
|
||||
if ports:
|
||||
logger.info("\nAvailable COM ports:")
|
||||
for port in ports:
|
||||
logger.info(f" {port.device} - {port.description}")
|
||||
else:
|
||||
logger.info("No COM ports found")
|
||||
except Exception as e:
|
||||
logger.error(f"Error listing ports: {e}")
|
||||
|
||||
def main():
|
||||
"""Main entry point"""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Test COM port writer - continuously writes data to a COM port'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
default=COM_PORT,
|
||||
help=f'COM port to write to (default: {COM_PORT})'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--baud',
|
||||
type=int,
|
||||
default=BAUD_RATE,
|
||||
help=f'Baud rate (default: {BAUD_RATE})'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--type',
|
||||
choices=['scales', 'counter', 'random', 'mixed'],
|
||||
default=TEST_DATA_TYPE,
|
||||
help=f'Type of test data (default: {TEST_DATA_TYPE})'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--interval',
|
||||
type=float,
|
||||
default=WRITE_INTERVAL,
|
||||
help=f'Write interval in seconds (default: {WRITE_INTERVAL})'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--list',
|
||||
action='store_true',
|
||||
help='List available COM ports and exit'
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.list:
|
||||
list_available_ports()
|
||||
return
|
||||
|
||||
writer = ComPortTestWriter(args.port, args.baud)
|
||||
writer.run()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user