#! /usr/bin/env python3 """Пример работы с программируемым логическим реле Овен ПР100""" import logging import time import sys from datetime import datetime from pymodbus.client.sync import ModbusSerialClient from pymodbus.exceptions import ModbusException from owen.client import OwenModbusClient from owen.device import PR100 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def read_digital_inputs(pr100: OwenModbusClient, num_inputs: int = 4): """Чтение состояния дискретных входов""" logger.info("Чтение дискретных входов") try: # Читаем все входы одним регистром di_value = pr100.get_param(name="DI") logger.info(f"Регистр дискретных входов: {di_value} (0x{di_value:04X})") # Декодируем отдельные входы for i in range(num_inputs): state = (di_value >> i) & 1 logger.info(f" DI{i+1}: {'ВКЛ' if state else 'ВЫКЛ'}") return True except Exception as e: logger.error(f"Ошибка при чтении дискретных входов: {e}") return False def read_analog_inputs(pr100: OwenModbusClient): """Чтение аналоговых входов""" logger.info("Чтение аналоговых входов") success_count = 0 for i in range(1, 5): try: # Читаем как float value_float = pr100.get_param(name=f"AI{i}") logger.info(f"AI{i} (float): {value_float:.3f}") # Читаем как int с учетом dp value_int = pr100.get_param(name=f"AI{i}.INT") dp = pr100.get_param(name=f"AI{i}.DP") logger.info(f"AI{i} (int): {value_int}, точность: {dp} знаков") success_count += 1 except Exception as e: logger.error(f"Ошибка при чтении AI{i}: {e}") return success_count > 0 def control_outputs(pr100: OwenModbusClient, outputs: dict): """ Управление дискретными выходами Args: pr100: Экземпляр клиента ПР100 outputs: Словарь {номер_выхода: состояние}, например {1: True, 2: False} """ logger.info("Управление дискретными выходами") try: # Читаем текущее состояние current_value = pr100.get_param(name="DO") logger.info(f"Текущее состояние выходов: 0x{current_value:04X}") # Формируем новое значение new_value = current_value for output_num, state in outputs.items(): bit_pos = output_num - 1 if state: new_value |= (1 << bit_pos) # Установить бит else: new_value &= ~(1 << bit_pos) # Сбросить бит # Записываем новое состояние logger.info(f"Запись нового состояния: 0x{new_value:04X}") pr100.set_param(name="DO", value=new_value) # Показываем, что изменилось for output_num, state in outputs.items(): logger.info(f" Q{output_num}: {'ВКЛ' if state else 'ВЫКЛ'}") return True except Exception as e: logger.error(f"Ошибка при управлении выходами: {e}") return False def read_system_time(pr100: OwenModbusClient): """Чтение системного времени ПР100""" logger.info("Чтение системного времени") try: sec = pr100.get_param(name="TIME.SEC") min_val = pr100.get_param(name="TIME.MIN") hour = pr100.get_param(name="TIME.HOUR") day = pr100.get_param(name="TIME.DAY") month = pr100.get_param(name="TIME.MONTH") year = pr100.get_param(name="TIME.YEAR") dow = pr100.get_param(name="TIME.DOW") days_of_week = ["Пн", "Вт", "Ср", "Чт", "Пт", "Сб", "Вс"] logger.info(f"Дата: {day:02d}.{month:02d}.20{year:02d}") logger.info(f"Время: {hour:02d}:{min_val:02d}:{sec:02d}") logger.info(f"День недели: {days_of_week[dow]}") return True except Exception as e: logger.error(f"Ошибка при чтении системного времени: {e}") return False def set_system_time(pr100: OwenModbusClient): """Установка системного времени ПР100 на текущее""" logger.info("Установка системного времени") try: now = datetime.now() pr100.set_param(name="TIME.SEC", value=now.second) pr100.set_param(name="TIME.MIN", value=now.minute) pr100.set_param(name="TIME.HOUR", value=now.hour) pr100.set_param(name="TIME.DAY", value=now.day) pr100.set_param(name="TIME.MONTH", value=now.month) pr100.set_param(name="TIME.YEAR", value=now.year % 100) logger.info(f"Время установлено: {now.strftime('%d.%m.%Y %H:%M:%S')}") return True except Exception as e: logger.error(f"Ошибка при установке времени: {e}") return False def monitoring_loop(pr100: OwenModbusClient, duration: int = 10): """ Цикл мониторинга входов и выходов Args: pr100: Экземпляр клиента ПР100 duration: Длительность мониторинга в секундах """ logger.info(f"Запуск мониторинга на {duration} секунд") start_time = time.time() error_count = 0 max_errors = 5 try: while time.time() - start_time < duration: try: # Читаем входы di = pr100.get_param(name="DI") ai1 = pr100.get_param(name="AI1") do = pr100.get_param(name="DO") logger.info(f"DI: 0x{di:04X}, AI1: {ai1:.2f}, DO: 0x{do:04X}") error_count = 0 # Сброс счетчика при успешном чтении except Exception as e: error_count += 1 logger.error(f"Ошибка чтения (#{error_count}): {e}") if error_count >= max_errors: logger.error(f"Достигнуто максимальное количество ошибок ({max_errors}). Остановка мониторинга.") return False time.sleep(1) return True except KeyboardInterrupt: logger.info("Мониторинг прерван пользователем") return True def test_connection(transport: ModbusSerialClient, unit_id: int) -> bool: """ Проверка соединения с устройством Args: transport: Клиент Modbus unit_id: Адрес устройства Returns: True если соединение успешно, False в противном случае """ try: if not transport.connect(): logger.error("Не удалось установить соединение с портом") return False # Пробуем прочитать регистр для проверки связи result = transport.read_holding_registers(address=0, count=1, unit=unit_id) if result.isError(): logger.error(f"Устройство не отвечает на адресе {unit_id}") return False logger.info("Соединение установлено успешно") return True except Exception as e: logger.error(f"Ошибка при проверке соединения: {e}") return False def main(): logger.info("=" * 60) logger.info("Начало работы с Овен ПР100") logger.info("=" * 60) # Параметры подключения PORT = "COM6" BAUDRATE = 115200 UNIT_ID = 1 # Адрес устройства Modbus transport = None try: logger.info(f"Настройка подключения: {PORT}, {BAUDRATE} бод, адрес {UNIT_ID}") # Создание клиента Modbus transport = ModbusSerialClient( method="rtu", port=PORT, baudrate=BAUDRATE, stopbits=1, parity="N", bytesize=8, timeout=0.5, retry_on_empty=True ) # Проверка соединения if not test_connection(transport, UNIT_ID): logger.error("=" * 60) logger.error("КРИТИЧЕСКАЯ ОШИБКА: Не удалось подключиться к устройству") logger.error(f"Проверьте:") logger.error(f" 1. Правильность имени порта ({PORT})") logger.error(f" 2. Подключение кабеля") logger.error(f" 3. Питание устройства") logger.error(f" 4. Адрес устройства ({UNIT_ID})") logger.error(f" 5. Скорость обмена ({BAUDRATE})") logger.error("=" * 60) return 1 # Создание клиента ПР100 pr100 = OwenModbusClient(transport=transport, device=PR100, unit=UNIT_ID) logger.info("-" * 60) # Чтение дискретных входов read_digital_inputs(pr100, num_inputs=4) logger.info("-" * 60) # Чтение аналоговых входов read_analog_inputs(pr100) logger.info("-" * 60) # Чтение системного времени read_system_time(pr100) logger.info("-" * 60) # Установка системного времени (закомментировано по умолчанию) # if input("Установить текущее время? (y/N): ").lower() == 'y': # set_system_time(pr100) # logger.info("-" * 60) # Управление выходами # ВНИМАНИЕ: Записывается только когда переключатель в положении СТОП! logger.warning("ВНИМАНИЕ: Управление выходами работает только в режиме СТОП!") control_outputs(pr100, {1: True, 2: False, 3: True}) logger.info("-" * 60) # Запуск мониторинга (закомментировано по умолчанию) # monitoring_loop(pr100, duration=30) logger.info("=" * 60) logger.info("Работа завершена успешно") logger.info("=" * 60) return 0 except FileNotFoundError as e: logger.error("=" * 60) logger.error(f"ОШИБКА: Порт {PORT} не найден") logger.error("Доступные порты можно посмотреть с помощью команды:") logger.error(" python -m serial.tools.list_ports") logger.error("=" * 60) return 1 except ModbusException as e: logger.error("=" * 60) logger.error(f"ОШИБКА Modbus: {e}") logger.error("=" * 60) return 1 except KeyboardInterrupt: logger.info("=" * 60) logger.info("Работа прервана пользователем") logger.info("=" * 60) return 0 except Exception as e: logger.error("=" * 60) logger.error(f"НЕОЖИДАННАЯ ОШИБКА: {e}", exc_info=True) logger.error("=" * 60) return 1 finally: if transport is not None: try: transport.close() logger.info("Соединение закрыто") except Exception as e: logger.error(f"Ошибка при закрытии соединения: {e}") if __name__ == "__main__": sys.exit(main())