Дабавлен скрипт на python для апи

parent 8df5e7e1
"""
lta_cli_api.py
Python API для CLI протокола LTA27 (v1.2.1)
С автоматической синхронизацией с платой
"""
import serial
import time
import struct
from typing import Dict, Optional, Any, Callable, List
class LTACLIError(Exception):
"""Ошибка протокола CLI."""
pass
class LTAClient:
"""
Клиент для работы с устройством LTA по CLI протоколу.
"""
CMD_OPEN = 0x00
CMD_CLOSE = 0x01
CMD_PING = 0x02
CMD_GET_INFO = 0x0A
CMD_READ_MEM = 0x14
CMD_WRITE_MEM = 0x15
def __init__(self, port: str = "COM3", baudrate: int = 115200,
board_id: int = 0x01, password: str = "1357", debug: bool = True):
self.port = port
self.baudrate = baudrate
self.board_id = board_id
self.password = password
self.debug = debug
self._serial: Optional[serial.Serial] = None
self._connected = False
self._commands: Dict[int, tuple] = {}
# ------------------------------------------------------------------
# Регистрация команд
# ------------------------------------------------------------------
def register_command(self, code: int, name: str):
def decorator(func: Callable) -> Callable:
self._commands[code] = (name, func)
if self.debug:
print(f"[DEBUG] Команда 0x{code:02X} '{name}' зарегистрирована")
return func
return decorator
def call(self, code: int, *args, **kwargs) -> Any:
if code not in self._commands:
raise LTACLIError(f"Команда 0x{code:02X} не зарегистрирована")
_, func = self._commands[code]
return func(*args, **kwargs)
# ------------------------------------------------------------------
# Низкоуровневые методы
# ------------------------------------------------------------------
def _log(self, direction: str, data: str) -> None:
if self.debug:
print(f"[{direction}] {data}")
def _read_response(self) -> str:
"""Читает ответ до \r (с таймаутом)."""
resp = self._serial.read_until(b'\r') # стандартный метод pyserial
if not resp:
raise LTACLIError("Таймаут при чтении ответа")
resp = resp.rstrip(b'\r\n')
self._log("RX", resp.decode('ascii', errors='replace'))
return resp.decode('ascii', errors='replace')
def _wait_for_prompt(self, timeout: float = 5.0) -> None:
"""
Ожидает появления приглашения '|> '.
Если в течение timeout не получено, отправляет \r и повторяет ожидание.
"""
start = time.time()
buffer = ""
while True:
# Читаем доступные данные
if self._serial.in_waiting:
chunk = self._serial.read(self._serial.in_waiting).decode('ascii', errors='replace')
print(chunk, end='', flush=True)
buffer += chunk
if "|> " in buffer:
break
else:
time.sleep(0.01)
# Если таймаут вышел, отправляем \r для пробуждения платы
if time.time() - start > timeout:
self._serial.write(b'\r')
self._serial.flush()
print("[пробуждение платы]")
start = time.time() # сбросить таймер
buffer = "" # начать новый поиск приглашения
# Приглашение получено, дадим плате допечатать и очистим буфер (чтобы старые данные не попали в ответ)
time.sleep(0.05)
self._serial.reset_input_buffer()
def send_command(self, cmd: int, payload: bytes = b'') -> bytes:
hex_data = payload.hex().upper()
cmd_str = f"T{cmd:02X}{self.board_id:02X}{hex_data}"
self._log("TX", cmd_str)
self._serial.write(f"{cmd_str}\r".encode())
self._serial.flush()
response = self._read_response()
if response[0] == 'e':
err_code = int(response[5:7], 16) if len(response) >= 7 else 0
errors = {
0x01: "неизвестная команда",
0x02: "неверный формат или значение аргумента",
0x03: "внутренняя ошибка устройства"
}
raise LTACLIError(f"Ошибка: {errors.get(err_code, f'код 0x{err_code:02X}')}")
if response[0] != 't':
raise LTACLIError(f"Неверный ответ: {response}")
return bytes.fromhex(response[5:]) if len(response) > 5 else b''
# ------------------------------------------------------------------
# Управление соединением
# ------------------------------------------------------------------
def connect(self) -> bool:
print("\n" + "=" * 60)
print("ПОДКЛЮЧЕНИЕ К ПЛАТЕ LTA27")
print("=" * 60)
self._serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1.0
)
print(f"Порт {self.port} открыт")
# Даём плате время и отправляем \r, чтобы вызвать вывод приглашения
time.sleep(0.5)
self._serial.write(b'\r')
self._serial.flush()
print("\nОжидание приглашения от платы...")
print("-" * 60)
self._wait_for_prompt()
print("-" * 60)
print("\nАутентификация...")
pwd_bytes = self.password.encode('ascii')
try:
self.send_command(self.CMD_OPEN, pwd_bytes)
self._connected = True
print("✓ Аутентификация успешна")
except LTACLIError as e:
print(f"✗ Ошибка аутентификации: {e}")
return False
# Ждём следующее приглашение (после успешного открытия)
self._wait_for_prompt()
print("✓ Плата готова к работе")
return True
def close(self) -> None:
if self._connected:
print("\nЗакрытие соединения...")
try:
self.send_command(self.CMD_CLOSE)
self._wait_for_prompt()
print("✓ Соединение закрыто")
except LTACLIError:
pass
self._connected = False
if self._serial and self._serial.is_open:
self._serial.close()
print(f"Порт {self.port} закрыт")
# ------------------------------------------------------------------
# Базовые команды
# ------------------------------------------------------------------
def ping(self) -> bool:
print("\n" + "-" * 40)
print("PING")
print("-" * 40)
self.send_command(self.CMD_PING)
self._wait_for_prompt()
print("✓ Ping OK")
return True
def get_info(self) -> Dict[str, Any]:
print("\n" + "=" * 60)
print("ИНФОРМАЦИЯ ОБ УСТРОЙСТВЕ")
print("=" * 60)
data = self.send_command(self.CMD_GET_INFO)
self._wait_for_prompt()
idx = 0
board_id = data[idx]; idx += 1
def read_str():
nonlocal idx
length = data[idx]; idx += 1
s = data[idx:idx+length].decode('ascii', errors='replace')
idx += length
return s
result = {
'board_id': board_id,
'name': read_str(),
'serial': read_str(),
'version': read_str()
}
print(f" Имя: {result['name']}")
print(f" Серийный: {result['serial']}")
print(f" Версия: {result['version']}")
print(f" Board ID: 0x{result['board_id']:02X}")
return result
def read_mem(self, address: int, size: int) -> bytes:
if not (1 <= size <= 64):
raise ValueError("Размер должен быть 1..64 байт")
print(f"\nЧтение памяти: 0x{address:08X} ({size} байт)")
payload = struct.pack('>I', address) + bytes([size])
data = self.send_command(self.CMD_READ_MEM, payload)
self._wait_for_prompt()
print(f" Данные: {data.hex().upper()}")
return data
def write_mem(self, address: int, data: bytes) -> bool:
if not (1 <= len(data) <= 64):
raise ValueError("Длина данных должна быть 1..64 байт")
print(f"\nЗапись в память: 0x{address:08X} <- {data.hex().upper()}")
payload = struct.pack('>I', address) + data
self.send_command(self.CMD_WRITE_MEM, payload)
self._wait_for_prompt()
print(" ✓ Записано")
return True
def scan_boards(self, start: int = 0, end: int = 254, timeout: float = 0.1) -> List[int]:
if not self._serial or not self._serial.is_open:
raise LTACLIError("Порт не открыт")
print("\n" + "=" * 60)
print("ПОИСК ПЛАТ НА ШИНЕ")
print("=" * 60)
found = []
original_timeout = self._serial.timeout
original_board_id = self.board_id
self._serial.timeout = timeout
for bid in range(start, end + 1):
self.board_id = bid
try:
saved_connected = self._connected
self._connected = True
self.send_command(self.CMD_PING)
found.append(bid)
print(f" Найдена плата: board_id=0x{bid:02X}")
self._connected = saved_connected
except LTACLIError:
pass
self._serial.reset_input_buffer()
self.board_id = original_board_id
self._serial.timeout = original_timeout
print(f"\nНайдено плат: {len(found)}")
return found
def __enter__(self):
self.connect()
return self
def __exit__(self, *args):
self.close()
# ==================================================================
# Пример использования
# ==================================================================
if __name__ == "__main__":
cli = LTAClient(port="COM3", board_id=0x01, password="1357", debug=True)
@cli.register_command(0x30, "read_fpga_reg")
def read_fpga_reg(reg_addr: int) -> int:
payload = struct.pack('>H', reg_addr)
resp = cli.send_command(0x30, payload)
return struct.unpack('>I', resp)[0]
try:
if not cli.connect():
print("\nНе удалось подключиться к плате!")
exit(1)
dev_info = cli.get_info()
cli.ping()
print("\n" + "=" * 60)
print("ТЕСТЫ ПАМЯТИ")
print("=" * 60)
cli.read_mem(0x20000000, 4)
cli.read_mem(0x08000000, 16)
test_bytes = b'\xDE\xAD\xBE\xEF'
cli.write_mem(0x20000000, test_bytes)
read_back = cli.read_mem(0x20000000, 4)
if read_back == test_bytes:
print("\n✓ Тест записи/чтения пройден успешно!")
else:
print(f"\n✗ Ошибка: записано {test_bytes.hex().upper()}, прочитано {read_back.hex().upper()}")
print("\n" + "=" * 60)
print("ВСЕ ТЕСТЫ ЗАВЕРШЕНЫ УСПЕШНО")
print("=" * 60)
except LTACLIError as error:
print(f"\n[ОШИБКА ПРОТОКОЛА] {error}")
except KeyboardInterrupt:
print("\n\nПрервано пользователем")
except Exception as error:
print(f"\n[НЕИЗВЕСТНАЯ ОШИБКА] {error}")
finally:
cli.close()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment