Files
python-owen/owen/protocol.py
2026-01-14 21:33:17 +03:00

152 lines
5.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#! /usr/bin/env python3
"""Реализация протокола взаимодействия ОВЕН."""
from __future__ import annotations
import logging
from functools import reduce
from struct import error, unpack
from owen.converter import OWEN_TYPE
_logger = logging.getLogger(__name__)
_logger.addHandler(logging.NullHandler())
HEADER = ord("#")
FOOTER = ord("\r")
OWEN_ASCII = {"0": 0, "1": 2, "2": 4, "3": 6, "4": 8,
"5": 10, "6": 12, "7": 14, "8": 16, "9": 18,
"A": 20, "B": 22, "C": 24, "D": 26, "E": 28,
"F": 30, "G": 32, "H": 34, "I": 36, "J": 38,
"K": 40, "L": 42, "M": 44, "N": 46, "O": 48,
"P": 50, "Q": 52, "R": 54, "S": 56, "T": 58,
"U": 60, "V": 62, "W": 64, "X": 66, "Y": 68,
"Z": 70, "-": 72, "_": 74, "/": 76, " ": 78}
class OwenError(Exception):
pass
class Owen:
"""Класс, описывающий протокол ОВЕН."""
def __init__(self, unit: int, addr_len_8: bool) -> None:
"""Инициализация класса клиента с указанными параметрами."""
self.unit = unit
self.addr_len_8 = addr_len_8
@staticmethod
def fast_calc(value: int, crc: int, bits: int) -> int:
"""Вычисление значения полинома."""
return reduce(lambda crc, i: crc << 1 & 0xFFFF ^ (0x8F57
if (value << i ^ crc >> 8) & 0x80 else 0), range(bits), crc)
def owen_crc16(self, packet: tuple[int, ...]) -> int:
"""Вычисление контрольной суммы."""
return reduce(lambda crc, val: self.fast_calc(val, crc, 8), packet, 0)
def owen_hash(self, packet: tuple[int, ...]) -> int:
"""Вычисление hash-функции."""
return reduce(lambda crc, val: self.fast_calc(val << 1, crc, 7), packet, 0)
@staticmethod
def name2code(name: str) -> tuple[int, ...]:
"""Преобразование локального идентификатора в числовой код."""
code: list[int] = reduce(lambda x, ch: [*x[:-1], x[-1] + 1] if ch == "."
else [*x, OWEN_ASCII[ch]], name.upper(), [])
return (*code, *[OWEN_ASCII[" "]] * (4 - len(code)))
@staticmethod
def encode_frame(frame: tuple[int, ...]) -> bytes:
"""Преобразование пакета из числового вида в строковый."""
chars = ([71 + (num >> 4), 71 + (num & 0xF)] for num in frame)
return bytes([HEADER, *sum(chars, []), FOOTER])
@staticmethod
def decode_frame(frame: bytes) -> tuple[int, ...]:
"""Преобразование пакета из строкового вида в числовой."""
pairs = zip(*[iter(frame[1:-1])] * 2)
return tuple((i - 71 << 4) + (j - 71 & 0xF) for i, j in pairs)
@staticmethod
def pack_value(frmt: str, value: float | str | None) -> bytes:
"""Упаковка данных заданного формата."""
return b"" if value is None else OWEN_TYPE[frmt]["pack"](value)
@staticmethod
def unpack_value(frmt: str, value: bytes, index: int | None) -> float | str:
"""Распаковка данных заданного формата."""
try:
return OWEN_TYPE[frmt]["unpack"](value, index)
except error:
errcode = OWEN_TYPE["U8"]["unpack"](value, index)
msg = f"Device error={errcode:02X}"
raise OwenError(msg) from None
def make_packet(self, flag: int, name: str, index: int | None,
data: bytes) -> bytes:
"""Формирование пакета для записи."""
addr0, addr1 = (self.unit & 0xFF, 0) if self.addr_len_8 else \
(self.unit >> 3 & 0xFF, (self.unit & 0x07) << 5)
if index is not None:
data = bytes([*data, *index.to_bytes(2, "big")])
cmd = self.owen_hash(self.name2code(name))
frame = (addr0, addr1 | flag << 4 | len(data), *cmd.to_bytes(2, "big"), *data)
crc = self.owen_crc16(frame)
packet = self.encode_frame((*frame, *crc.to_bytes(2, "big")))
_logger.debug("Send param: address=%d, flag=%d, size=%d, cmd=%04X, "
"index=%s, data=%s, crc=%04X", self.unit, flag, len(data),
cmd, index, tuple(data), crc)
_logger.debug("Send frame: %r, size=%d", packet, len(packet))
return packet
def parse_response(self, packet: bytes, answer: bytes) -> bytes:
"""Расшифровка прочитанного пакета."""
_logger.debug("Recv frame: %r, size=%d", answer, len(answer))
if not answer or answer[0] != HEADER or answer[-1] != FOOTER:
msg = "Invalid message format"
raise OwenError(msg)
frame = self.decode_frame(answer)
address = frame[0] if self.addr_len_8 else frame[0] << 3 | frame[1] >> 5
flag = frame[1] >> 4 & 1
size = frame[1] & 0xF
cmd, *data, crc = unpack(f">H{size}BH", bytes(frame[2:]))
_logger.debug("Recv param: address=%d, flag=%d, size=%d, cmd=%04X, data=%s, "
"crc=%04X", address, flag, size, cmd, tuple(data), crc)
if self.owen_crc16(frame[:-2]) != crc:
msg = "Checksum error"
raise OwenError(msg)
if address != self.unit:
msg = "Sender and receiver addresses mismatch"
raise OwenError(msg)
if packet[7:9] != answer[7:9]: # hash mismatch
msg = "Network error={:02X}, hash={:02X}{:02X}".format(*data)
raise OwenError(msg)
return bytes(data)
__all__ = ["Owen"]