Initial commit

This commit is contained in:
2026-01-14 21:33:17 +03:00
commit 40d80ef55e
17 changed files with 2573 additions and 0 deletions

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2023 RAA80
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

55
README.md Normal file
View File

@@ -0,0 +1,55 @@
# python-owen
Библиотека для работы с приборами фирмы [Овен] по протоколам Modbus и ОВЕН
## 📋 Поддерживаемые модели
| Модель | Модель | Модель |
| :--------: | :--------: | :--------: |
| [ТРМ101] | [ТРМ151] | [ТРМ210] |
| [ТРМ136] | [ТРМ200] | [ТРМ212] |
| [ТРМ138] | [ТРМ201] | [ТРМ251] |
| [ТРМ148] | [ТРМ202] | [2ТРМ1] |
| [ПР100] | | |
> Возможна поддержка других моделей путем добавления настроек в файл `owen/device.py`
> Не со всеми моделями проверена работа библиотеки
## ⚠️ Важная информация
### Требования к версии pymodbus
**Для корректной работы python-owen необходимо установить старую версию библиотеки pymodbus:**
```bash
pip3 install pymodbus==2.5.3
```
⚠️ **При использовании более новых версий pymodbus библиотека работать не будет!**
### Особенности Овен ПР100
**Овен ПР100 не поддерживает протокол ОВЕН — только Modbus!**
## 📦 О проекте
Это форк репозитория: https://github.com/RAA80/python-owen
---
## 📚 Ссылки на документацию приборов
[Овен]: https://owen.ru
[ТРМ101]: https://owen.ru/product/trm101
[ТРМ136]: https://owen.ru/product/trm136
[ТРМ138]: https://owen.ru/product/trm138
[ТРМ148]: https://owen.ru/product/trm148
[ТРМ151]: https://owen.ru/product/trm151
[ТРМ200]: https://owen.ru/product/trm200
[ТРМ201]: https://owen.ru/product/trm201
[ТРМ202]: https://owen.ru/product/trm202
[ТРМ210]: https://owen.ru/product/trm210
[ТРМ212]: https://owen.ru/product/trm212
[ТРМ251]: https://owen.ru/product/trm251
[2ТРМ1]: https://owen.ru/product/2trm1
Р100]: https://owen.ru/product/pr100

54
example/example.py Normal file
View File

@@ -0,0 +1,54 @@
#! /usr/bin/env python3
"""Пример использования библиотеки."""
import logging
from pymodbus.client.sync import ModbusSerialClient
from serial import Serial
from owen.client import OwenModbusClient, OwenSerialClient
from owen.device import TRM201
logging.basicConfig(level=logging.INFO)
if __name__ == "__main__":
transport = Serial(port="COM5",
baudrate=115200,
stopbits=1,
parity="N",
bytesize=8,
timeout=0.1)
trm = OwenSerialClient(transport=transport, device=TRM201, unit=1, addr_len_8=True) # длина адреса в битах: True=8, False=11
# transport = ModbusSerialClient(method="rtu",
# port="COM5",
# baudrate=115200,
# stopbits=2,
# parity="N",
# bytesize=8, # "rtu": bytesize=8, "ascii": bytesize=7
# timeout=0.1,
# retry_on_empty=True)
# trm = OwenModbusClient(transport=transport, device=TRM201, unit=1)
print(trm)
""" !!!
Если параметр не использует индекс, т.е. index=None,
либо прибор одноканальный и у параметра index=0,
то индекс указывать необязательно
Для многоканальных приборов (например ТРМ202) и протокола Modbus названия
параметров (например IN.T1, IN.T2) для совместимости с протоколом ОВЕН
преобразуются в:
IN.T1 --> name="IN.T", index=0
IN.T2 --> name="IN.T", index=1
"""
name = "SP" # Остальные названия параметров в файле 'device.py' для конкретного устройства
value = trm.get_param(name=name, index=0)
print(f"{name} = {value}")
result = trm.set_param(name=name, index=0, value=value)
print(f"{name} = {result}")

272
main.py Normal file
View File

@@ -0,0 +1,272 @@
#! /usr/bin/env python3
"""Пример работы с программируемым логическим реле Овен ПР100"""
import logging
import time
import sys
from datetime import datetime
from pymodbus.client.sync import ModbusSerialClient
from pymodbus.exceptions import ModbusException
from owen.client import OwenModbusClient
from owen.device import PR100
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def read_digital_inputs(pr100: OwenModbusClient, num_inputs: int = 4):
"""Чтение состояния дискретных входов"""
logger.info("Чтение дискретных входов")
try:
# Читаем все входы одним регистром
di_value = pr100.get_param(name="DI")
logger.info(f"Регистр дискретных входов: {di_value} (0x{di_value:04X})")
# Декодируем отдельные входы
for i in range(num_inputs):
state = (di_value >> i) & 1
logger.info(f" DI{i+1}: {'ВКЛ' if state else 'ВЫКЛ'}")
return True
except Exception as e:
logger.error(f"Ошибка при чтении дискретных входов: {e}")
return False
def read_analog_inputs(pr100: OwenModbusClient):
"""Чтение аналоговых входов"""
logger.info("Чтение аналоговых входов")
success_count = 0
for i in range(1, 5):
try:
# Читаем как float
value_float = pr100.get_param(name=f"AI{i}")
logger.info(f"AI{i} (float): {value_float:.3f}")
# Читаем как int с учетом dp
value_int = pr100.get_param(name=f"AI{i}.INT")
dp = pr100.get_param(name=f"AI{i}.DP")
logger.info(f"AI{i} (int): {value_int}, точность: {dp} знаков")
success_count += 1
except Exception as e:
logger.error(f"Ошибка при чтении AI{i}: {e}")
return success_count > 0
def control_outputs(pr100: OwenModbusClient, outputs: dict):
"""
Управление дискретными выходами
Args:
pr100: Экземпляр клиента ПР100
outputs: Словарь {номер_выхода: состояние}, например {1: True, 2: False}
"""
logger.info("Управление дискретными выходами")
try:
# Читаем текущее состояние
current_value = pr100.get_param(name="DO")
logger.info(f"Текущее состояние выходов: 0x{current_value:04X}")
# Формируем новое значение
new_value = current_value
for output_num, state in outputs.items():
bit_pos = output_num - 1
if state:
new_value |= (1 << bit_pos) # Установить бит
else:
new_value &= ~(1 << bit_pos) # Сбросить бит
# Записываем новое состояние
logger.info(f"Запись нового состояния: 0x{new_value:04X}")
pr100.set_param(name="DO", value=new_value)
# Показываем, что изменилось
for output_num, state in outputs.items():
logger.info(f" Q{output_num}: {'ВКЛ' if state else 'ВЫКЛ'}")
return True
except Exception as e:
logger.error(f"Ошибка при управлении выходами: {e}")
return False
def read_system_time(pr100: OwenModbusClient):
"""Чтение системного времени ПР100"""
logger.info("Чтение системного времени")
try:
sec = pr100.get_param(name="TIME.SEC")
min_val = pr100.get_param(name="TIME.MIN")
hour = pr100.get_param(name="TIME.HOUR")
day = pr100.get_param(name="TIME.DAY")
month = pr100.get_param(name="TIME.MONTH")
year = pr100.get_param(name="TIME.YEAR")
dow = pr100.get_param(name="TIME.DOW")
days_of_week = ["Пн", "Вт", "Ср", "Чт", "Пт", "Сб", "Вс"]
logger.info(f"Дата: {day:02d}.{month:02d}.20{year:02d}")
logger.info(f"Время: {hour:02d}:{min_val:02d}:{sec:02d}")
logger.info(f"День недели: {days_of_week[dow]}")
return True
except Exception as e:
logger.error(f"Ошибка при чтении системного времени: {e}")
return False
def set_system_time(pr100: OwenModbusClient):
"""Установка системного времени ПР100 на текущее"""
logger.info("Установка системного времени")
try:
now = datetime.now()
pr100.set_param(name="TIME.SEC", value=now.second)
pr100.set_param(name="TIME.MIN", value=now.minute)
pr100.set_param(name="TIME.HOUR", value=now.hour)
pr100.set_param(name="TIME.DAY", value=now.day)
pr100.set_param(name="TIME.MONTH", value=now.month)
pr100.set_param(name="TIME.YEAR", value=now.year % 100)
logger.info(f"Время установлено: {now.strftime('%d.%m.%Y %H:%M:%S')}")
return True
except Exception as e:
logger.error(f"Ошибка при установке времени: {e}")
return False
def monitoring_loop(pr100: OwenModbusClient, duration: int = 10):
"""
Цикл мониторинга входов и выходов
Args:
pr100: Экземпляр клиента ПР100
duration: Длительность мониторинга в секундах
"""
logger.info(f"Запуск мониторинга на {duration} секунд")
start_time = time.time()
error_count = 0
max_errors = 5
try:
while time.time() - start_time < duration:
try:
# Читаем входы
di = pr100.get_param(name="DI")
ai1 = pr100.get_param(name="AI1")
do = pr100.get_param(name="DO")
logger.info(f"DI: 0x{di:04X}, AI1: {ai1:.2f}, DO: 0x{do:04X}")
error_count = 0 # Сброс счетчика при успешном чтении
except Exception as e:
error_count += 1
logger.error(f"Ошибка чтения (#{error_count}): {e}")
if error_count >= max_errors:
logger.error(f"Достигнуто максимальное количество ошибок ({max_errors}). Остановка мониторинга.")
return False
time.sleep(1)
return True
except KeyboardInterrupt:
logger.info("Мониторинг прерван пользователем")
return True
def test_connection(transport: ModbusSerialClient, unit_id: int) -> bool:
"""
Проверка соединения с устройством
Args:
transport: Клиент Modbus
unit_id: Адрес устройства
Returns:
True если соединение успешно, False в противном случае
"""
try:
if not transport.connect():
logger.error("Не удалось установить соединение с портом")
return False
# Пробуем прочитать регистр для проверки связи
result = transport.read_holding_registers(address=0, count=1, unit=unit_id)
if result.isError():
logger.error(f"Устройство не отвечает на адресе {unit_id}")
return False
logger.info("Соединение установлено успешно")
return True
except Exception as e:
logger.error(f"Ошибка при проверке соединения: {e}")
return False
def main():
logger.info("=" * 60)
logger.info("Начало работы с Овен ПР100")
logger.info("=" * 60)
# Параметры подключения
PORT = "COM6"
BAUDRATE = 115200
UNIT_ID = 1 # Адрес устройства Modbus
transport = None
try:
logger.info(f"Настройка подключения: {PORT}, {BAUDRATE} бод, адрес {UNIT_ID}")
# Создание клиента Modbus
transport = ModbusSerialClient(
method="rtu",
port=PORT,
baudrate=BAUDRATE,
stopbits=1,
parity="N",
bytesize=8,
timeout=0.5,
retry_on_empty=True
)
# Проверка соединения
if not test_connection(transport, UNIT_ID):
logger.error("=" * 60)
logger.error("КРИТИЧЕСКАЯ ОШИБКА: Не удалось подключиться к устройству")
logger.error(f"Проверьте:")
logger.error(f" 1. Правильность имени порта ({PORT})")
logger.error(f" 2. Подключение кабеля")
logger.error(f" 3. Питание устройства")
logger.error(f" 4. Адрес устройства ({UNIT_ID})")
logger.error(f" 5. Скорость обмена ({BAUDRATE})")
logger.error("=" * 60)
return 1
# Создание клиента ПР100
pr100 = OwenModbusClient(transport=transport, device=PR100, unit=UNIT_ID)
logger.info("-" * 60)
# Чтение дискретных входов
read_digital_inputs(pr100, num_inputs=4)
logger.info("-" * 60)
# Чтение аналоговых входов
read_analog_inputs(pr100)
logger.info("-" * 60)
# Чтение системного времени
read_system_time(pr100)
logger.info("-" * 60)
# Установка системного времени (закомментировано по умолчанию)
# if input("Установить текущее время? (y/N): ").lower() == 'y':
# set_system_time(pr100)
# logger.info("-" * 60)
# Управление выходами
# ВНИМАНИЕ: Записывается только когда переключатель в положении СТОП!
logger.warning("ВНИМАНИЕ: Управление выходами работает только в режиме СТОП!")
control_outputs(pr100, {1: True, 2: False, 3: True})
logger.info("-" * 60)
# Запуск мониторинга (закомментировано по умолчанию)
# monitoring_loop(pr100, duration=30)
logger.info("=" * 60)
logger.info("Работа завершена успешно")
logger.info("=" * 60)
return 0
except FileNotFoundError as e:
logger.error("=" * 60)
logger.error(f"ОШИБКА: Порт {PORT} не найден")
logger.error("Доступные порты можно посмотреть с помощью команды:")
logger.error(" python -m serial.tools.list_ports")
logger.error("=" * 60)
return 1
except ModbusException as e:
logger.error("=" * 60)
logger.error(f"ОШИБКА Modbus: {e}")
logger.error("=" * 60)
return 1
except KeyboardInterrupt:
logger.info("=" * 60)
logger.info("Работа прервана пользователем")
logger.info("=" * 60)
return 0
except Exception as e:
logger.error("=" * 60)
logger.error(f"НЕОЖИДАННАЯ ОШИБКА: {e}", exc_info=True)
logger.error("=" * 60)
return 1
finally:
if transport is not None:
try:
transport.close()
logger.info("Соединение закрыто")
except Exception as e:
logger.error(f"Ошибка при закрытии соединения: {e}")
if __name__ == "__main__":
sys.exit(main())

0
owen/__init__.py Normal file
View File

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

209
owen/client.py Normal file
View File

@@ -0,0 +1,209 @@
#! /usr/bin/env python3
"""Реализация класса клиента для управления контроллером ОВЕН."""
from __future__ import annotations
from operator import mul, truediv
from typing import TYPE_CHECKING, Callable
from pymodbus.constants import Endian
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
from owen.protocol import Owen, OwenError
if TYPE_CHECKING:
from pymodbus.client.sync import ModbusSerialClient
from pymodbus.pdu import ModbusResponse
from serial import Serial
from owen.device import MODBUS_PARAMS, OWEN_DEVICE, OWEN_PARAMS
class ClientMixin:
"""Класс-примесь клиента."""
@staticmethod
def check_index(name: str, dev: OWEN_PARAMS | MODBUS_PARAMS,
index: int | None) -> int | None:
"""Проверка индекса."""
if not index:
index = None if None in dev["index"] else 0
if index not in dev["index"]:
msg = f"'{name}' does not support index '{index}'"
raise OwenError(msg)
return index
@staticmethod
def check_value(name: str, dev: OWEN_PARAMS | MODBUS_PARAMS,
value: float | str | None) -> None:
"""Проверка данных."""
if all([value is None, dev["min"] is not None, dev["max"] is not None]) or \
all([value is not None, dev["min"] == dev["max"] is None or
value < dev["min"] or value > dev["max"]]): # type: ignore
msg = f"An '{name}' value of '{value}' is out of range"
raise OwenError(msg)
class OwenSerialClient(ClientMixin):
"""Класс клиента для взаимодействия с устройством по протоколу ОВЕН."""
def __init__(self, transport: Serial, device: OWEN_DEVICE, unit: int, *,
addr_len_8: bool = True) -> None:
"""Инициализация класса клиента с указанными параметрами."""
self.socket = transport
self.unit = unit
self.device = device
self.addr_len_8 = addr_len_8
self._owen = Owen(self.unit, addr_len_8)
def __repr__(self) -> str:
"""Строковое представление объекта."""
return (f"{type(self).__name__}(transport={self.socket}, unit={self.unit}, "
f"addr_len_8={self.addr_len_8})")
def __del__(self) -> None:
"""Закрытие соединения с устройством при удалении объекта."""
if self.socket:
self.socket.close()
def bus_exchange(self, packet: bytes) -> bytes:
"""Обмен по интерфейсу."""
self.socket.reset_input_buffer()
self.socket.reset_output_buffer()
self.socket.write(packet)
return self.socket.read_until(b"\r")
def send_message(self, flag: int, name: str, index: int | None,
data: bytes = b"") -> bytes:
"""Подготовка данных для обмена."""
packet = self._owen.make_packet(flag, name, index, data)
answer = self.bus_exchange(packet)
return self._owen.parse_response(packet, answer)
def get_param(self, name: str, index: int | None = None) -> float | str:
"""Чтение данных из устройства."""
dev = self.device["Owen"][name.upper()]
index = self.check_index(name, dev, index)
result = self.send_message(1, name, index)
return self._owen.unpack_value(dev["type"], result, index)
def set_param(self, name: str, index: int | None = None,
value: float | str | None = None) -> bool:
"""Запись данных в устройство."""
dev = self.device["Owen"][name.upper()]
index = self.check_index(name, dev, index)
self.check_value(name, dev, value)
data = self._owen.pack_value(dev["type"], value)
return bool(self.send_message(0, name, index, data))
class OwenModbusClient(ClientMixin):
"""Класс клиента для взаимодействия с устройством по протоколу Modbus."""
def __init__(self, transport: ModbusSerialClient, device: OWEN_DEVICE,
unit: int) -> None:
"""Инициализация класса клиента с указанными параметрами."""
self.socket = transport
self.unit = unit
self.device = device
self.socket.connect()
def __repr__(self) -> str:
"""Строковое представление объекта."""
return f"{type(self).__name__}(transport={self.socket}, unit={self.unit})"
def __del__(self) -> None:
"""Закрытие соединения с устройством при удалении объекта."""
if self.socket:
self.socket.close()
@staticmethod
def check_error(retcode: ModbusResponse) -> bool:
"""Проверка возвращаемого значения на ошибку."""
if retcode.isError():
raise OwenError(retcode)
return True
def _read(self, dev: MODBUS_PARAMS, index: int | None) -> float | str:
"""Чтение данных из регистра Modbus."""
count = {"U16": 1, "I16": 1, "U32": 2, "I32": 2, "F32": 2, "STR": 4,
}[dev["type"]]
result = self.socket.read_holding_registers(address=dev["index"][index],
count=count,
unit=self.unit)
self.check_error(result)
decoder = BinaryPayloadDecoder.fromRegisters(result.registers, Endian.Big)
return {"U16": decoder.decode_16bit_uint,
"I16": decoder.decode_16bit_int,
"U32": decoder.decode_32bit_uint,
"I32": decoder.decode_32bit_int,
"F32": decoder.decode_32bit_float,
"STR": lambda: decoder.decode_string(8),
}[dev["type"]]()
def modify_value(self, func: Callable[[float, float], float], dev: MODBUS_PARAMS,
index: int | None, value: float) -> float:
"""Преобразование значения к нужной точности."""
if dev["dp"]:
dp_dev = self.device["Modbus"][dev["dp"]]
dp = self._read(dp_dev, index)
value = func(value, 10.0**int(dp))
prec = dev["precision"]
return func(value, 10.0**prec) if prec else value
def get_param(self, name: str, index: int | None = None) -> float | str:
"""Чтение данных из устройства."""
dev = self.device["Modbus"][name.upper()]
index = self.check_index(name, dev, index)
value = self._read(dev, index)
return self.modify_value(truediv, dev, index, value)
def set_param(self, name: str, index: int | None = None,
value: float | str | None = None) -> bool:
"""Запись данных в устройство."""
dev = self.device["Modbus"][name.upper()]
index = self.check_index(name, dev, index)
self.check_value(name, dev, value)
value = self.modify_value(mul, dev, index, value)
builder = BinaryPayloadBuilder(None, Endian.Big)
{"U16": lambda value: builder.add_16bit_uint(int(value)),
"I16": lambda value: builder.add_16bit_int(int(value)),
"U32": lambda value: builder.add_32bit_uint(int(value)),
"I32": lambda value: builder.add_32bit_int(int(value)),
"F32": lambda value: builder.add_32bit_float(float(value)),
"STR": lambda value: builder.add_string(str(value)),
}[dev["type"]](value)
result = self.socket.write_registers(address=dev["index"][index],
values=builder.build(),
skip_encode=True,
unit=self.unit)
return self.check_error(result)
__all__ = ["OwenModbusClient", "OwenSerialClient"]

229
owen/converter.py Normal file
View File

@@ -0,0 +1,229 @@
#! /usr/bin/env python3
"""Функции для упаковки и распаковки разных типов данных."""
from binascii import hexlify, unhexlify
from decimal import Decimal
from struct import pack, unpack
def pack_str(value: str) -> bytes:
"""Упаковка данных типа STR."""
return bytes(value[::-1], encoding="cp1251")
def unpack_str(value: bytes, index: int) -> str:
"""Распаковка данных типа STR."""
return bytes(value[::-1]).decode("cp1251")
def pack_u24(value: tuple[int, int]) -> bytes:
"""Упаковка данных типа U24."""
return pack(">BH", *value)[:3]
def unpack_u24(value: bytes, index: int) -> tuple[int, int]:
"""Распаковка данных типа U24."""
return unpack(">BH", value[:3])
def pack_i8(value: int) -> bytes:
"""Упаковка данных типа I8."""
return pack(">b", value)[:1]
def unpack_i8(value: bytes, index: int) -> int:
"""Распаковка данных типа I8."""
return unpack(">b", value[:1])[0]
def pack_u8(value: int) -> bytes:
"""Упаковка данных типа U8."""
return pack(">B", value)[:1]
def unpack_u8(value: bytes, index: int) -> int:
"""Распаковка данных типа U8."""
return unpack(">B", value[:1])[0]
def pack_i16(value: int) -> bytes:
"""Упаковка данных типа I16."""
return pack(">h", value)[:2]
def unpack_i16(value: bytes, index: int) -> int:
"""Распаковка данных типа I16."""
return unpack(">h", value[:2])[0]
def pack_u16(value: int) -> bytes:
"""Упаковка данных типа U16."""
return pack(">H", value)[:2]
def unpack_u16(value: bytes, index: int) -> int:
"""Распаковка данных типа U16."""
return unpack(">H", value[:2])[0]
def pack_f24(value: float) -> bytes:
"""Упаковка данных типа F24."""
return pack(">f", value)[:3]
def unpack_f24(value: bytes, index: int) -> float:
"""Распаковка данных типа F24."""
return unpack(">f", value[:3] + b"\x00")[0]
def pack_f32(value: float) -> bytes:
"""Упаковка данных типа F32."""
return pack(">f", value)[:4]
def unpack_f32(value: bytes, index: int) -> float:
"""Распаковка данных типа F32."""
return unpack(">f", value[:4])[0]
def pack_f32t(value: tuple[float, int]) -> bytes:
"""Упаковка данных типа F32+T."""
return pack(">fH", *value)[:6]
def unpack_f32t(value: bytes, index: int) -> tuple[float, int]:
"""Распаковка данных типа F32+T."""
return unpack(">fH", value[:6])
def pack_i32(value: int) -> bytes:
"""Упаковка данных типа I32."""
return pack(">i", value)[:4]
def unpack_i32(value: bytes, index: int) -> int:
"""Распаковка данных типа I32."""
return unpack(">i", value[:4])[0]
def pack_u32(value: int) -> bytes:
"""Упаковка данных типа U32."""
return pack(">I", value)[:4]
def unpack_u32(value: bytes, index: int) -> int:
"""Распаковка данных типа U32."""
return unpack(">I", value[:4])[0]
def pack_sdot(value: float) -> bytes:
"""Упаковка данных типа STORED_DOT."""
sign, digits, exponent = Decimal(str(value)).as_tuple()
mantissa = int("".join(map(str, digits)))
frmt, size, chunk = {mantissa < 16: (">B", 4, slice(1)),
mantissa >= 4096: (">I", 20, slice(1, 4)),
}.get(True, (">H", 12, slice(2)))
bin_str = f"{sign:1b}{abs(exponent):03b}{mantissa:0{size}b}"
return pack(frmt, int(bin_str, 2))[chunk]
def unpack_sdot(value: bytes, index: int) -> float:
"""Распаковка данных типа STORED_DOT."""
if index is not None:
value = value[:-2]
data = int.from_bytes(value, "big")
shift = len(value) * 8 - 4
sign = data >> (shift + 3) & 1
exponent = data >> shift & 7
mantissa = data & (2 ** shift - 1)
return (-1) ** sign * 10 ** (-exponent) * mantissa
def _encode_dot_u(value: float) -> bytes:
"""Упаковка данных типа DEC_DOTi."""
value = int(value)
length = len(str(value))
length += length % 2
hexstr = f"{value:0{length}d}"
return unhexlify(hexstr)
def pack_dot0(value: float) -> bytes:
"""Упаковка данных типа DEC_DOT0."""
return _encode_dot_u(value)
def pack_dot3(value: float) -> bytes:
"""Упаковка данных типа DEC_DOT3."""
return _encode_dot_u(value * 1000)
def _decode_dot_u(value: bytes, index: int) -> int:
"""Распаковка данных типа DEC_DOTi."""
if index is not None:
value = value[:-2]
return int(hexlify(value).decode())
def unpack_dot0(value: bytes, index: int) -> int:
"""Распаковка данных типа DEC_DOT0."""
return _decode_dot_u(value, index)
def unpack_dot3(value: bytes, index: int) -> float:
"""Распаковка данных типа DEC_DOT3."""
return _decode_dot_u(value, index) / 1000.0
OWEN_TYPE = {"F32+T": {"pack": pack_f32t, "unpack": unpack_f32t},
"SDOT": {"pack": pack_sdot, "unpack": unpack_sdot},
"DOT0": {"pack": pack_dot0, "unpack": unpack_dot0},
"DOT3": {"pack": pack_dot3, "unpack": unpack_dot3},
"F32": {"pack": pack_f32, "unpack": unpack_f32},
"F24": {"pack": pack_f24, "unpack": unpack_f24},
"U16": {"pack": pack_u16, "unpack": unpack_u16},
"I16": {"pack": pack_i16, "unpack": unpack_i16},
"U32": {"pack": pack_u32, "unpack": unpack_u32},
"I32": {"pack": pack_i32, "unpack": unpack_i32},
"U8": {"pack": pack_u8, "unpack": unpack_u8},
"I8": {"pack": pack_i8, "unpack": unpack_i8},
"U24": {"pack": pack_u24, "unpack": unpack_u24}, # для N.err
"STR": {"pack": pack_str, "unpack": unpack_str}}

1244
owen/device.py Normal file

File diff suppressed because it is too large Load Diff

151
owen/protocol.py Normal file
View File

@@ -0,0 +1,151 @@
#! /usr/bin/env python3
"""Реализация протокола взаимодействия ОВЕН."""
from __future__ import annotations
import logging
from functools import reduce
from struct import error, unpack
from owen.converter import OWEN_TYPE
_logger = logging.getLogger(__name__)
_logger.addHandler(logging.NullHandler())
HEADER = ord("#")
FOOTER = ord("\r")
OWEN_ASCII = {"0": 0, "1": 2, "2": 4, "3": 6, "4": 8,
"5": 10, "6": 12, "7": 14, "8": 16, "9": 18,
"A": 20, "B": 22, "C": 24, "D": 26, "E": 28,
"F": 30, "G": 32, "H": 34, "I": 36, "J": 38,
"K": 40, "L": 42, "M": 44, "N": 46, "O": 48,
"P": 50, "Q": 52, "R": 54, "S": 56, "T": 58,
"U": 60, "V": 62, "W": 64, "X": 66, "Y": 68,
"Z": 70, "-": 72, "_": 74, "/": 76, " ": 78}
class OwenError(Exception):
pass
class Owen:
"""Класс, описывающий протокол ОВЕН."""
def __init__(self, unit: int, addr_len_8: bool) -> None:
"""Инициализация класса клиента с указанными параметрами."""
self.unit = unit
self.addr_len_8 = addr_len_8
@staticmethod
def fast_calc(value: int, crc: int, bits: int) -> int:
"""Вычисление значения полинома."""
return reduce(lambda crc, i: crc << 1 & 0xFFFF ^ (0x8F57
if (value << i ^ crc >> 8) & 0x80 else 0), range(bits), crc)
def owen_crc16(self, packet: tuple[int, ...]) -> int:
"""Вычисление контрольной суммы."""
return reduce(lambda crc, val: self.fast_calc(val, crc, 8), packet, 0)
def owen_hash(self, packet: tuple[int, ...]) -> int:
"""Вычисление hash-функции."""
return reduce(lambda crc, val: self.fast_calc(val << 1, crc, 7), packet, 0)
@staticmethod
def name2code(name: str) -> tuple[int, ...]:
"""Преобразование локального идентификатора в числовой код."""
code: list[int] = reduce(lambda x, ch: [*x[:-1], x[-1] + 1] if ch == "."
else [*x, OWEN_ASCII[ch]], name.upper(), [])
return (*code, *[OWEN_ASCII[" "]] * (4 - len(code)))
@staticmethod
def encode_frame(frame: tuple[int, ...]) -> bytes:
"""Преобразование пакета из числового вида в строковый."""
chars = ([71 + (num >> 4), 71 + (num & 0xF)] for num in frame)
return bytes([HEADER, *sum(chars, []), FOOTER])
@staticmethod
def decode_frame(frame: bytes) -> tuple[int, ...]:
"""Преобразование пакета из строкового вида в числовой."""
pairs = zip(*[iter(frame[1:-1])] * 2)
return tuple((i - 71 << 4) + (j - 71 & 0xF) for i, j in pairs)
@staticmethod
def pack_value(frmt: str, value: float | str | None) -> bytes:
"""Упаковка данных заданного формата."""
return b"" if value is None else OWEN_TYPE[frmt]["pack"](value)
@staticmethod
def unpack_value(frmt: str, value: bytes, index: int | None) -> float | str:
"""Распаковка данных заданного формата."""
try:
return OWEN_TYPE[frmt]["unpack"](value, index)
except error:
errcode = OWEN_TYPE["U8"]["unpack"](value, index)
msg = f"Device error={errcode:02X}"
raise OwenError(msg) from None
def make_packet(self, flag: int, name: str, index: int | None,
data: bytes) -> bytes:
"""Формирование пакета для записи."""
addr0, addr1 = (self.unit & 0xFF, 0) if self.addr_len_8 else \
(self.unit >> 3 & 0xFF, (self.unit & 0x07) << 5)
if index is not None:
data = bytes([*data, *index.to_bytes(2, "big")])
cmd = self.owen_hash(self.name2code(name))
frame = (addr0, addr1 | flag << 4 | len(data), *cmd.to_bytes(2, "big"), *data)
crc = self.owen_crc16(frame)
packet = self.encode_frame((*frame, *crc.to_bytes(2, "big")))
_logger.debug("Send param: address=%d, flag=%d, size=%d, cmd=%04X, "
"index=%s, data=%s, crc=%04X", self.unit, flag, len(data),
cmd, index, tuple(data), crc)
_logger.debug("Send frame: %r, size=%d", packet, len(packet))
return packet
def parse_response(self, packet: bytes, answer: bytes) -> bytes:
"""Расшифровка прочитанного пакета."""
_logger.debug("Recv frame: %r, size=%d", answer, len(answer))
if not answer or answer[0] != HEADER or answer[-1] != FOOTER:
msg = "Invalid message format"
raise OwenError(msg)
frame = self.decode_frame(answer)
address = frame[0] if self.addr_len_8 else frame[0] << 3 | frame[1] >> 5
flag = frame[1] >> 4 & 1
size = frame[1] & 0xF
cmd, *data, crc = unpack(f">H{size}BH", bytes(frame[2:]))
_logger.debug("Recv param: address=%d, flag=%d, size=%d, cmd=%04X, data=%s, "
"crc=%04X", address, flag, size, cmd, tuple(data), crc)
if self.owen_crc16(frame[:-2]) != crc:
msg = "Checksum error"
raise OwenError(msg)
if address != self.unit:
msg = "Sender and receiver addresses mismatch"
raise OwenError(msg)
if packet[7:9] != answer[7:9]: # hash mismatch
msg = "Network error={:02X}, hash={:02X}{:02X}".format(*data)
raise OwenError(msg)
return bytes(data)
__all__ = ["Owen"]

27
setup.py Normal file
View File

@@ -0,0 +1,27 @@
#! /usr/bin/env python3
from setuptools import setup
setup(name="python-owen",
version="0.4.2",
description="OWEN controllers library",
url="https://github.com/RAA80/python-owen",
author="Alexey Ryadno",
author_email="aryadno@mail.ru",
license="MIT",
packages=["owen"],
install_requires=["pymodbus < 3", "pyserial >= 3.4"],
platforms=["Linux", "Windows"],
classifiers=["Development Status :: 4 - Beta",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: Microsoft :: Windows",
"Operating System :: POSIX :: Linux",
"Operating System :: POSIX",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
)

153
test/test_client.py Normal file
View File

@@ -0,0 +1,153 @@
#! /usr/bin/env python3
import unittest
from unittest.mock import MagicMock, patch
from pymodbus.client.sync import ModbusSerialClient
from pymodbus.pdu import ModbusResponse
from pymodbus.register_write_message import WriteMultipleRegistersResponse
from serial import Serial
from owen.client import ClientMixin, OwenError, OwenModbusClient, OwenSerialClient
from owen.device import TRM201
class FakeOwenSerialClient(OwenSerialClient):
def bus_exchange(self, packet: bytes) -> bytes:
return {b"#GHHGTMOHHRTO\r": b"#GHGMTMOHJHJGJISSTGTIPLKK\r", # чтение параметра "DEV" тип "STR"
b"#GHHGHUTIKGJI\r": b"#GHGHHUTIGGJKGK\r", # чтение параметра "A.LEN" тип "U8" без индекса
b"#GHHIRJURGGGGHQIV\r": b"#GHGJRJURGHGGGGQROU\r", # чтение параметра "DP" тип "U8" с индексом
b"#GHHGPVMIJIMK\r": b"#GHGIPVMIGGGHNHIR\r", # чтение параметра "ADDR" тип "U16" без индекса
b"#GHHGROTVJNPQ\r": b"#GHGJROTVKIQJIOOJKN\r", # чтение параметра "PV" тип "F24" без индекса
b"#GHHIUSIGGGGGTJIT\r": b"#GHGLUSIGKKJROGGGGGPVUS\r", # чтение параметра "SL.H" тип "F24" с индексом
b"#GHHGGIJJRIQN\r": b"#GHGJGIJJKNRKMLLNJK\r", # чтение параметра "N.ERR" тип "U24"
b"#GHGHHUTIGGJKGK\r": b"#GHGHHUTIGGJKGK\r", # запись параметра "A.LEN" тип "U8" без индекса
b"#GHGJQLQRGHGGGGPNOJ\r": b"#GHGJQLQRGHGGGGPNOJ\r", # запись параметра "CMP" тип "U8" с индексом
b"#GHGIPVMIGGGHNHIR\r": b"#GHGIPVMIGGGHNHIR\r", # запись параметра "ADDR" тип "U16" без индекса
b"#GHGJPPKMGGGGGGQMGJ\r": b"#GHGJPPKMGGGGGGQMGJ\r", # запись параметра "R.OUT" тип "F24" без индекса
b"#GHGLUSIGKKJROGGGGGPVUS\r": b"#GHGLUSIGKKJROGGGGGPVUS\r", # запись параметра "SL.H" тип "F24" с индексом
b"#GHGGGGUPJSUL\r": b"", # запись параметра "INIT" тип "U8" без индекса
}[packet]
class TestClientMixin(unittest.TestCase):
"""The unittest for ClientMixin."""
def setUp(self) -> None:
self.client = ClientMixin()
def tearDown(self) -> None:
del self.client
def test_check_index(self) -> None:
name = "A.LEN"
dev = TRM201["Owen"][name]
# correct index
self.assertEqual(None, self.client.check_index(name=name, dev=dev, index=None))
# invalid index
self.assertRaises(OwenError, lambda: self.client.check_index(name=name, dev=dev, index=1))
def test_check_value(self) -> None:
name = "DEV"
dev = TRM201["Owen"][name]
# correct value
self.assertIsNone(self.client.check_value(name=name, dev=dev, value=None))
# invalid value
self.assertRaises(OwenError, lambda: self.client.check_value(name=name, dev=dev, value=1))
name = "SP"
dev = TRM201["Owen"][name]
# correct value
self.assertTrue(10.0, self.client.check_value(name=name, dev=dev, value=10.0))
# invalid value (> max)
self.assertRaises(OwenError, lambda: self.client.check_value(name=name, dev=dev, value=10000))
# invalid value (< min)
self.assertRaises(OwenError, lambda: self.client.check_value(name=name, dev=dev, value=-10000))
# invalid value
self.assertRaises(OwenError, lambda: self.client.check_value(name=name, dev=dev, value=None))
class TestOwenSerialClient(unittest.TestCase):
"""The unittest for OwenSerialClient."""
@patch("serial.Serial", autospec=True)
def setUp(self, mock_serial: Serial) -> None:
self.client = FakeOwenSerialClient(transport=mock_serial, device=TRM201,
unit=1, addr_len_8=True)
def tearDown(self) -> None:
del self.client
def test_get_param(self) -> None:
# correct index
self.assertEqual(0, self.client.get_param(name="A.LEN", index=None))
self.assertEqual("ТРМ201", self.client.get_param(name="DEV", index=None))
self.assertEqual(1, self.client.get_param(name="DP", index=0))
self.assertEqual(1, self.client.get_param(name="ADDR", index=None))
self.assertEqual(81.578125, self.client.get_param(name="PV", index=0))
self.assertEqual(750.0, self.client.get_param(name="SL.H", index=0))
self.assertEqual((71, 46181), self.client.get_param(name="N.ERR", index=None))
# invalid index
self.assertRaises(OwenError, lambda: self.client.get_param(name="A.LEN", index=2))
def test_set_param(self) -> None:
# correct index and value
self.assertTrue(self.client.set_param(name="A.LEN", index=None, value=0))
self.assertTrue(self.client.set_param(name="CMP", index=0, value=1))
self.assertTrue(self.client.set_param(name="ADDR", index=None, value=1))
self.assertTrue(self.client.set_param(name="R.OUT", index=None, value=0.0))
self.assertTrue(self.client.set_param(name="SL.H", index=0, value=750.0))
self.assertRaises(OwenError, lambda: self.client.set_param(name="INIT", index=None, value=None))
# invalid index
self.assertRaises(OwenError, lambda: self.client.set_param(name="A.LEN", index=2, value=0))
# invalid value
self.assertRaises(OwenError, lambda: self.client.set_param(name="A.LEN", index=None, value=2))
self.assertRaises(OwenError, lambda: self.client.set_param(name="INIT", index=None, value=1))
class TestOwenModbusClient(unittest.TestCase):
"""The unittest for OwenModbusClient."""
@patch("pymodbus.client.sync.ModbusSerialClient", autospec=True)
def setUp(self, mock_modbus: ModbusSerialClient) -> None:
self.client = OwenModbusClient(transport=mock_modbus, device=TRM201, unit=1)
def tearDown(self) -> None:
del self.client
def test_check_error(self) -> None:
err = MagicMock(ModbusResponse)
err.isError.return_value = False
self.assertTrue(self.client.check_error(retcode=err))
err.isError.return_value = True
self.assertRaises(OwenError, lambda: self.client.check_error(retcode=err))
def test_set_param(self) -> None:
# correct index and value
value = 20.0
self.client.modify_value = MagicMock(return_value=value)
self.client.socket.write_registers = MagicMock(return_value=WriteMultipleRegistersResponse(1, 2))
self.assertTrue(self.client.set_param(name="SP", index=0, value=value))
# invalid index
self.assertRaises(OwenError, lambda: self.client.set_param(name="SP", index=2, value=value))
# invalid value
self.assertRaises(OwenError, lambda: self.client.set_param(name="SP", index=0, value=None))
def test_get_param(self) -> None:
# correct index
self.client._read = MagicMock(return_value=bytearray([0x1, 0x3, 0x2, 0x0, 0x1, 0x79, 0x84]))
self.client.modify_value = MagicMock(return_value=20.0)
self.assertEqual(20.0, self.client.get_param(name="SP", index=0))
# invalid index
self.assertRaises(OwenError, lambda: self.client.get_param(name="SP", index=2))
if __name__ == "__main__":
unittest.main()

158
test/test_protocol.py Normal file
View File

@@ -0,0 +1,158 @@
#! /usr/bin/env python3
import unittest
from owen.protocol import Owen, OwenError
class TestOwenProtocol(unittest.TestCase):
"""The unittest for Owen protocol."""
def setUp(self) -> None:
self.trm = Owen(unit=1, addr_len_8=True)
self.trm11 = Owen(unit=400, addr_len_8=False)
def tearDown(self) -> None:
del self.trm
del self.trm11
def test_fast_calc(self) -> None:
self.assertEqual(20158, self.trm.fast_calc(84, 159, 7))
self.assertEqual(5565, self.trm.fast_calc(18, 36695, 8))
self.assertEqual(53661, self.trm.fast_calc(71, 34988, 8))
self.assertEqual(60031, self.trm.fast_calc(72, 0, 7))
self.assertEqual(64238, self.trm.fast_calc(156, 23651, 7))
self.assertIsInstance(self.trm.fast_calc(156, 23651, 7), int)
def test_owen_crc16(self) -> None:
self.assertEqual(16434, self.trm.owen_crc16((1, 16, 30, 210)))
self.assertEqual(44267, self.trm.owen_crc16((1, 18, 200, 128, 0, 0)))
self.assertEqual(23007, self.trm.owen_crc16((1, 5, 225, 125, 195, 71, 230, 0, 0)))
self.assertEqual(40940, self.trm.owen_crc16((1, 5, 236, 32, 68, 59, 128, 0, 0)))
self.assertEqual(59803, self.trm.owen_crc16((1, 8, 45, 91, 52, 48, 48, 48, 46, 51, 48, 86)))
self.assertEqual(15584, self.trm.owen_crc16((1, 16, 232, 196)))
self.assertEqual(38212, self.trm.owen_crc16((1, 6, 214, 129, 49, 48, 50, 204, 208, 210)))
self.assertIsInstance(self.trm.owen_crc16((1, 6, 214, 129, 49, 48, 50, 204, 208, 210)), int)
def test_owen_hash(self) -> None:
self.assertEqual(7890, self.trm.owen_hash((21, 42, 28, 46)))
self.assertEqual(60448, self.trm.owen_hash((56, 43, 34, 78)))
self.assertEqual(47327, self.trm.owen_hash((50, 62, 78, 78)))
self.assertEqual(39238, self.trm.owen_hash((55, 48, 60, 58)))
self.assertEqual(13800, self.trm.owen_hash((48, 78, 78, 78)))
self.assertEqual(46941, self.trm.owen_hash((25, 56, 51, 48)))
self.assertEqual(64104, self.trm.owen_hash((24, 38, 73, 24)))
self.assertEqual(11410, self.trm.owen_hash((28, 62, 72, 2)))
self.assertEqual(233, self.trm.owen_hash((36, 46, 36, 58)))
self.assertIsInstance(self.trm.owen_hash((36, 46, 36, 58)), int)
def test_name2code(self) -> None:
self.assertEqual((21, 42, 28, 46), self.trm.name2code("A.LEN"))
self.assertEqual((56, 43, 34, 78), self.trm.name2code("SL.H"))
self.assertEqual((50, 62, 78, 78), self.trm.name2code("PV"))
self.assertEqual((55, 48, 60, 58), self.trm.name2code("R.OUT"))
self.assertEqual((48, 78, 78, 78), self.trm.name2code("O"))
self.assertEqual((25, 56, 51, 48), self.trm.name2code("C.SP.O"))
self.assertEqual((24, 38, 73, 24), self.trm.name2code("CJ-.C"))
self.assertEqual((28, 62, 72, 2), self.trm.name2code("EV-1"))
self.assertEqual((36, 46, 36, 58), self.trm.name2code("INIT"))
self.assertIsInstance(self.trm.name2code("INIT"), tuple)
def test_encode_frame(self) -> None:
self.assertEqual(b"#GHHGHUTIKGJI\r", self.trm.encode_frame((1, 16, 30, 210, 64, 50)))
self.assertEqual(b"#GHGHHUTIGGJKGK\r", self.trm.encode_frame((1, 1, 30, 210, 0, 52, 4)))
self.assertEqual(b"#GHHISOOGGGGGQSUR\r", self.trm.encode_frame((1, 18, 200, 128, 0, 0, 172, 235)))
self.assertEqual(b"#GHGJSOOGGGGGGGUQRK\r", self.trm.encode_frame((1, 3, 200, 128, 0, 0, 0, 234, 180)))
self.assertEqual(b"#GHHIPHGNGGGGKKPV\r", self.trm.encode_frame((1, 18, 145, 7, 0, 0, 68, 159)))
self.assertEqual(b"#GHGLPHGNKHSOGGGGGGJOMV\r", self.trm.encode_frame((1, 5, 145, 7, 65, 200, 0, 0, 0, 56, 111)))
self.assertEqual(b"#GHHGHIGJUIMK\r", self.trm.encode_frame((1, 16, 18, 3, 226, 100)))
self.assertEqual(b"#GHGHHIGJGGIHHO\r", self.trm.encode_frame((1, 1, 18, 3, 0, 33, 24)))
self.assertIsInstance(self.trm.encode_frame((1, 1, 18, 3, 0, 33, 24)), bytes)
def test_decode_frame(self) -> None:
self.assertEqual((1, 1, 30, 210, 0, 52, 4), self.trm.decode_frame(b"#GHGHHUTIGGJKGK\r"))
self.assertEqual((1, 3, 200, 128, 0, 0, 0, 234, 180), self.trm.decode_frame(b"#GHGJSOOGGGGGGGUQRK\r"))
self.assertEqual((1, 5, 57, 243, 0, 0, 0, 0, 0, 11, 51), self.trm.decode_frame(b"#GHGLJPVJGGGGGGGGGGGRJJ\r"))
self.assertEqual((1, 5, 225, 125, 195, 71, 230, 0, 0, 89, 223), self.trm.decode_frame(b"#GHGLUHNTSJKNUMGGGGLPTV\r"))
self.assertEqual((1, 8, 45, 91, 52, 48, 48, 48, 46, 51, 48, 86, 233, 155), self.trm.decode_frame(b"#GHGOITLRJKJGJGJGIUJJJGLMUPPR\r"))
self.assertEqual((1, 3, 180, 101, 0, 0, 0, 9, 1), self.trm.decode_frame(b"#GHGJRKMLGGGGGGGPGH\r"))
self.assertEqual((1, 3, 2, 51, 71, 180, 101, 87, 52), self.trm.decode_frame(b"#GHGJGIJJKNRKMLLNJK\r"))
self.assertEqual((1, 3, 2, 51, 71, 100, 234, 99, 78), self.trm.decode_frame(b"#GHGJGIJJKNMKUQMJKU\r"))
self.assertEqual((1, 1, 30, 37, 20, 126, 6), self.trm.decode_frame(b"#GHGHHUILHKNUGM\r"))
self.assertIsInstance(self.trm.decode_frame(b"#GHGHHUILHKNUGM\r"), tuple)
def test_pack_value(self) -> None:
self.assertEqual(bytes([194, 71, 255, 167, 15, 225]), self.trm.pack_value("F32+T", (-49.99966049194336, 4065)))
self.assertEqual(bytes([66, 246, 233, 223]), self.trm.pack_value("F32", 123.45678))
self.assertEqual(bytes([164, 14]), self.trm.pack_value("SDOT", -10.38))
self.assertEqual(bytes([29, 172]), self.trm.pack_value("SDOT", 350.0))
self.assertEqual(bytes([16, 16, 4]), self.trm.pack_value("SDOT", 410.0))
self.assertEqual(bytes([16]), self.trm.pack_value("SDOT", 0.0))
self.assertEqual(bytes([0]), self.trm.pack_value("DOT0", 0))
self.assertEqual(bytes([153]), self.trm.pack_value("DOT0", 99))
self.assertEqual(bytes([3, 4]), self.trm.pack_value("DOT0", 304))
self.assertEqual(bytes([9, 135, 101, 67, 33]), self.trm.pack_value("DOT0", 987654321))
self.assertEqual(bytes([66, 246, 233]), self.trm.pack_value("F24", 123.45678))
self.assertEqual(bytes([4, 210]), self.trm.pack_value("U16", 1234))
self.assertEqual(bytes([251, 46]), self.trm.pack_value("I16", -1234))
self.assertEqual(bytes([12]), self.trm.pack_value("U8", 12))
self.assertEqual(bytes([244]), self.trm.pack_value("I8", -12))
self.assertEqual(bytes([50, 48, 50, 204, 208, 210]), self.trm.pack_value("STR", "ТРМ202"))
self.assertEqual(b"", self.trm.pack_value("U8", None)) # if empty buffer
self.assertIsInstance(self.trm.pack_value("I8", -12), bytes)
def test_unpack_value(self) -> None:
self.assertEqual((-49.99966049194336, 4065), self.trm.unpack_value("F32+T", bytes([194, 71, 255, 167, 15, 225]), None))
self.assertEqual(123.45677947998047, self.trm.unpack_value("F32", bytes([66, 246, 233, 223]), None))
self.assertEqual(350.0, self.trm.unpack_value("SDOT", bytes([29, 172, 0, 0]), 0))
self.assertEqual(410.0, self.trm.unpack_value("SDOT", bytes([16, 16, 4, 0, 0]), 0))
self.assertEqual(350.0, self.trm.unpack_value("SDOT", bytes([29, 172]), None))
self.assertEqual(410.0, self.trm.unpack_value("SDOT", bytes([16, 16, 4]), None))
self.assertEqual(0.0, self.trm.unpack_value("SDOT", bytes([16, 0, 0]), 0))
self.assertEqual(0.0, self.trm.unpack_value("SDOT", bytes([16]), None))
self.assertEqual(0, self.trm.unpack_value("DOT0", bytes([0]), None))
self.assertEqual(99, self.trm.unpack_value("DOT0", bytes([153]), None))
self.assertEqual(304, self.trm.unpack_value("DOT0", bytes([3, 4]), None))
self.assertEqual(304, self.trm.unpack_value("DOT0", bytes([3, 4, 0, 0]), 0))
self.assertEqual(987654321, self.trm.unpack_value("DOT0", bytes([9, 135, 101, 67, 33]), None))
self.assertEqual(123.455078125, self.trm.unpack_value("F24", bytes([66, 246, 233]), None))
self.assertEqual((71, 46059), self.trm.unpack_value("U24", bytes([71, 179, 235]), None))
self.assertEqual(1234, self.trm.unpack_value("U16", bytes([4, 210]), None))
self.assertEqual(-1234, self.trm.unpack_value("I16", bytes([251, 46]), None))
self.assertEqual(12, self.trm.unpack_value("U8", bytes([12]), None))
self.assertEqual(-12, self.trm.unpack_value("I8", bytes([244]), None))
self.assertEqual("ТРМ202", self.trm.unpack_value("STR", bytes([50, 48, 50, 204, 208, 210]), None))
self.assertRaises(OwenError, lambda: self.trm.unpack_value("F32", bytes([253]), None)) # if error code
def test_make_packet(self) -> None:
self.assertEqual(b"#GHHGHUTIKGJI\r", self.trm.make_packet(1, "A.LEN", None, b""))
self.assertEqual(b"#GHHISOOGGGGGQSUR\r", self.trm.make_packet(1, "DON", 0, b""))
self.assertEqual(b"#GHGLJPVJGGGGGGGGGGGRJJ\r", self.trm.make_packet(0, "FB", 0, bytes([0, 0, 0])))
self.assertEqual(b"#GHGLUHNTSJKNUMGGGGLPTV\r", self.trm.make_packet(0, "SL.L", 0, bytes([195, 71, 230])))
self.assertEqual(b"#GHGHRNIUGGMJSQ\r", self.trm.make_packet(0, "SBIT", None, bytes([0])))
self.assertEqual(b"#GHGLPHGNKHSOGGGGGGJOMV\r", self.trm.make_packet(0, "SP", 0, bytes([65, 200, 0])))
self.assertEqual(b"#GHGHRNMGGORMUL\r", self.trm.make_packet(0, "BPS", None, bytes([8])))
self.assertEqual(b"#JIHIPHGNGGGGJHVJ\r", self.trm11.make_packet(1, "SP", 0, b""))
self.assertEqual(b"#JIGLPHGNKHRHPQGGGGUMHO\r", self.trm11.make_packet(0, "SP", 0, bytes([65, 177, 154])))
self.assertIsInstance(self.trm11.make_packet(0, "SP", 0, bytes([65, 177, 154])), bytes)
def test_parse_response(self) -> None:
self.assertEqual(bytes([0]), self.trm.parse_response(b"#GHHGHUTIKGJI\r", b"#GHGHHUTIGGJKGK\r"))
self.assertEqual(bytes([0, 0, 0]), self.trm.parse_response(b"#GHHISOOGGGGGQSUR\r", b"#GHGJSOOGGGGGGGUQRK\r"))
self.assertEqual(bytes([195, 71, 230, 0, 0]), self.trm.parse_response(b"#GHHIUHNTGGGGPULL\r", b"#GHGLUHNTSJKNUMGGGGLPTV\r"))
self.assertEqual(bytes([52, 48, 48, 48, 46, 51, 48, 86]), self.trm.parse_response(b"#GHHGITLRRKVN\r", b"#GHGOITLRJKJGJGJGIUJJJGLMUPPR\r"))
self.assertEqual(bytes([71, 180, 101]), self.trm.parse_response(b"#GHHGGIJJRIQN\r", b"#GHGJGIJJKNRKMLLNJK\r"))
self.assertEqual(bytes([100]), self.trm.parse_response(b"#GHHGJONIJKMN\r", b"#GHGHJONIMKKIMP\r"))
self.assertRaises(OwenError, lambda: self.trm.parse_response(b"#GHHGHUTIKGJI\r", b"")) # if empty message
self.assertRaises(OwenError, lambda: self.trm.parse_response(b"#GHHGHUTIKGJI\r", b"GHHGHUTIKGJI\r")) # if first byte not '#'
self.assertRaises(OwenError, lambda: self.trm.parse_response(b"#GHHGHUTIKGJI\r", b"#GHHGHUTIKGJI")) # if last byte not '\r'
self.assertRaises(OwenError, lambda: self.trm.parse_response(b"#GHHINNRQGGGGRUIR\r", b"#GHGJGIJJKNNNRQPUSV\r")) # if error code
self.assertRaises(OwenError, lambda: self.trm.parse_response(b"#GHHIUHNTGGGGPULL\r", b"#GHGLUHNTSJKNUMGGGGLPTD\r")) # if checksum error
self.assertRaises(OwenError, lambda: self.trm.parse_response(b"#GHHGROTVJNPQ\r", b"#IJKJGIJJJHKOKNIJTO\r")) # if addresses mismatch
self.assertIsInstance(self.trm.parse_response(b"#GHHGJONIJKMN\r", b"#GHGHJONIMKKIMP\r"), bytes)
self.assertIsInstance(self.trm.parse_response(b"#GHGLUHNTJVOGGGGGGGQGIG\r", b"#GHGLUHNTJVOGGGGGGGQGIG\r"), bytes)
if __name__ == "__main__":
unittest.main()