암시적 메시징의 아키텍처: Python에서 Raw CIP I/O 구현
Source: Dev.to
Class 1 I/O의 도전 과제
Ethernet/IP (EIP)는 Common Industrial Protocol (CIP)을 기반으로 하며, 두 가지 주요 메시징 유형을 정의합니다:
- Explicit Messaging (TCP 44818) – 요청/응답 방식으로, 구성 및 진단에 사용됩니다.
- Implicit Messaging (UDP 2222) – 순환 I/O 방식으로, 고속·반복 데이터 교환 (Class 1 Connections)에 사용됩니다.
구조적인 도전 과제는 상용 드라이버 대신 원시 소켓을 사용해 이러한 복잡한 연결을 설정할 때 자원 경쟁과 시간 결정성을 어떻게 관리하느냐에 있습니다.
🏗️ 4단계 연결 순서
견고한 구현을 위해서는 TCP 설정과 이후 UDP I/O 수명 주기를 신중히 관리해야 합니다. 우리의 아키텍처는 순환 프로세스 내에서 다음 네 단계를 따릅니다:
Register Session (TCP)
먼저 대상 장치의 Encapsulation Layer (TCP 44818) 로 Explicit Message 를 전송합니다. 이 단계에서 Session Handle 이 생성되며, 이후 요청들을 식별하는 데 사용됩니다.
Forward Open (TCP)
가장 중요한 단계입니다. 클라이언트는 Class 1 I/O 연결에 필요한 모든 매개변수를 포함한 Forward Open 명령을 보냅니다. 여기에는 다음이 포함됩니다:
- Requested Packet Interval (RPI)
- Connection Path (I/O 데이터용 Assembly Instance ID)
- Connection Timeout 파라미터
장치는 O2T (Originator to Target)와 T2O (Target to Origin) 연결 ID 를 반환합니다.
Cyclic I/O Exchange (UDP)
TCP 로 연결이 확립되면 고속 UDP 2222 로 전환합니다. 클라이언트는 설정된 O2T 연결 ID 로 주기적인 데이터를 전송하고, 장치는 T2O 데이터를 응답합니다. 이를 통해 순환 데이터 업데이트 시 최소 지연을 보장합니다.
Forward Close & Teardown (TCP)
대상 장치가 시간이 지나면서 타임아웃되어 오류를 보고하는 것을 방지하려면, 클라이언트가 Forward Close 명령 (TCP)을 명시적으로 전송해야 합니다. 이렇게 하면 소켓을 닫기 전에 장치가 할당한 자원을 정상적으로 해제합니다.
💻 결정론적 테스트를 위한 아키텍처
이 복잡한 순서를 신뢰성 있게 테스트하기 위해, 우리 연구 키트는 이중 애플리케이션 구조를 사용합니다:
- Raw Socket Client – 전체 TCP·UDP 상태 머신을 구현하고, 순환 Open/Close 과정을 관리합니다.
- Mock PLC Server – 로컬호스트에서 실행되는 별도 Python 애플리케이션으로, TCP 44818 및 UDP 2222 를 청취합니다. 모의 서버는 핸드셰이크 동안 즉각적이고 올바른 응답을 보장함으로써 결정론적인 테스트를 가능하게 하며, 개발자가 물리 계층 잡음이 아닌 논리 오류에 집중하도록 돕습니다.
Python I/O Loop 구조
원시 소켓 구현은 정확한 패킷 조립이 필요합니다. 아래는 순환 UDP 교환을 관리하기 위해 사용되는 개념적 구조입니다:
# Conceptual Structure for I/O Loop
while running:
# 1. Open Session (TCP) & Forward Open (TCP) is performed here...
# 2. UDP Exchange Phase:
try:
# Construct and send O2T packet using raw bytes
udp_socket.sendto(o2t_packet, (target_ip, 2222))
# Receive T2O packet (Target to Origin)
received_data, addr = udp_socket.recvfrom(1024)
# Log and parse the raw data...
except socket.timeout:
log("Warning: UDP I/O Timeout occurred.")
# 3. Forward Close (TCP) & TCP Disconnect is performed here...
time.sleep(RPI_WAIT_TIME)
🔒 결론
CIP의 원시 소켓 구현을 이해하는 것은 산업 사이버 보안, 맞춤형 SCADA 통합, 혹은 외부 라이브러리가 너무 크거나 사용할 수 없는 임베디드 시스템을 다루는 개발자에게 필수적입니다.
우리는 이 프로젝트의 전체 아키텍처와 기술적 인사이트를 GitHub에 문서화했습니다. 완전한, 바로 배포 가능한 Python 소스 코드를 확보하고, 자체 맞춤형 산업 솔루션을 위한 원시 패킷 구조를 탐구하고 싶다면 아래 링크를 확인하십시오: