読者です 読者をやめる 読者になる 読者になる

サイバーセキュリティプログラミング ―Pythonで学ぶハッカーの思考 3章 勉強まとめ

サイバーセキュリティプログラミング

nnpo.hatenablog.com

続きです。


この章では、パケットの観測を行うスニッファーを書いてみるらしい。

UDPを用いたホスト発見ツールの作成

標的ネットワークで動作中のホストを発見するUDPベースのスニッファーの作成。

閉じているUDPポートにパケットが届いた時の処理方法によって、特定のIPアドレスがホストで稼働しているかを判断するらしい。

具体的に言うと、UDPデータグラム(データグラムというのは、宛先に届かない場合でもユーザに通知しない、信頼できないサービスでのパケットのことらしい)を閉じたポートに送った時に、ICMPメッセージ(Internet Control Message Protocolの略。インターネット・プロトコルのデータグラム処理における誤りの通知や通信に関する情報の通知などのために使用する)が返信された場合、そのホストは稼働しているといえるので判断することが可能。

# -*- coding: utf-8 -*-

import socket
import os

#リッスンするホストのIPアドレス
host = "192.168.11.4"

#rawソケットを作成しパブリックなインタフェースにバインド
if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

#キャプチャ結果にIPヘッダーを含めるように指定
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

#windowsの場合はioctlを使用してプロミスキャスモードを有効化
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

#単一パケットの読み込み
print(sniffer.recvfrom(65565))

#windowsのプロミスキャスモードを無効化
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

os.name

実行するOSの名前が分かる。Windowsの場合は"nt"で、Linuxの場合は"posix"と表記される。

公式ドキュメント:

16.1. os — 雑多なオペレーティングシステムインタフェース — Python 3.5.1 ドキュメント

socket_protocol = socket.IPPROTO_IP

socket_protocol = socket.IPPROTO_ICMP

ネットワークインターフェースでパケットを盗聴するために必要なパラメータの指定。

windowsプロトコルによらずすべての入力パケットが盗聴できる。

linuxはICMPを指定しなければならない。

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

ソケットの設定。

AF_INET は、標準的なIPv4 のアドレスやホスト名を使用するための設定。

socket.SOCK_RAWは、受信したデータにTCPヘッダやIPヘッダが含まれる。これによって、ポート番号やIPアドレスを取得できる。

socket_protocolは、先ほど設定したもの

sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

setsockoptは、ソケットのオプションの設定を行う。

sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

ioctlでプロミスキャスモードを有効にする。ioctlは、ユーザースペース(OSのユーザーに近い処理、機能を担う、個々のアプリケーションが稼働する層)のプログラムがカーネルモードのコンポーネント(他のプログラムから呼び出されたり連結されたりして使用されるプログラム部品)とやり取りするための仕組み。

プロミスキャスモードは、同一ネットワーク内を流れるすべてのパケットを受信して読み込むモードのこと。

公式ドキュメント:

18.1. socket — 低水準ネットワークインターフェース — Python 3.5.1 ドキュメント

IPレイヤーのデコード

受信したパケットのIPヘッダー部分をデコードする。

# -*- coding: utf-8 -*-

import socket

import os
import struct
from ctypes import *

#リッスンするホストのIPアドレス
host = "192.168.11.3"

#IPヘッダー
class IP(Structure):
    _fields_ = [
        ("ihl",          c_uint8, 4),
        ("version",      c_uint8, 4),
        ("tos",          c_uint8, 4),
        ("len",          c_uint16),
        ("id",           c_uint16),
        ("offset",       c_uint16),
        ("ttl",          c_uint8),
        ("protocol_num", c_uint8),
        ("sum",          c_uint16),
        ("src",          c_uint32),
        ("dst",          c_uint32)
    ]

    def __new__(self, socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        #プロトコルの定数値を名称にマッピング
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
        #可読なIPアドレスの値に変換
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
        #可読なプロトコル名称に変換
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

#キャプチャ結果にIPヘッダーを含めるように指定
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

#windowsの場合はioctlを使用してプロミスキャスモードを有効化
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

try:
    while True:
        #パケットの読み込み
        raw_buffer = sniffer.recvfrom(65565)[0]
        #バッファーの最初の20バイトからIPアドレス構造体を作成
        ip_header = IP(raw_buffer[0:20])
        #検出されたプロトコルとホストを出力
        print("Protocol:{0}:{1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address))
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

class IP(Structure):

構造体と共用体は ctypes モジュールに定義されている Structure および Union ベースクラスからの派生クラスでなければならないらしい。ここでは、Structureクラスから派生して、Cライクな構造体を作成する。

公式ドキュメント:

16.16. ctypes — Pythonのための外部関数ライブラリ — Python 3.5.1 ドキュメント

_fields_

構造体のフィールドを定義するシーケンス。要素は2要素タプルか3要素タプルでなければならない。

第一要素はフィールドの名前、第二要素はフィールドの型を指定。

第三要素は、フィールドのビット幅を定義する。

ここで使われている型は、下記表に対応してる。

ctypes の型 C type Python の型
c_bool _Bool bool (1)
c_char char 1文字のバイト列オブジェクト
c_wchar wchar_t 1文字の文字列
c_byte char int
c_ubyte unsigned char int
c_short short int
c_ushort unsigned short int
c_int int int
c_uint unsigned int int
c_long long int
c_ulong unsigned long int
c_longlong __int64 または long long int
c_ulonglong unsigned __int64 または unsigned long long int
c_size_t size_t int
c_ssize_t ssize_t または Py_ssize_t int
c_float float float
c_double double float
c_longdouble long double float
c_char_p char * (NUL 終端) バイト列オブジェクトまたは None
c_wchar_p wchar_t * (NUL 終端) 文字列または None
c_void_p void * 整数または None

引用元: http://docs.python.jp/3/library/ctypes.html#fundamental-data-types

これによって、C言語ライクな構造体が作成できる。

def _new_(self, socket_buffer=None):

self.from_buffer_copy(socket_buffer)

__new__について詳しく分かっていないので、間違っている可能性大です……。

def _new_(cls) は、インスタンス生成時に呼び出される。第一引数はクラスオブジェクト。戻り値がクラスのインスタンスなら、そのインスタンスの_init_メソッドが呼び出される。

ここでは、from_buffer_copyがsocket_bufferを引数にして呼び出されている。

from_buffer_copyは、引数のオブジェクトの読み出し可能バッファをコピーすることで、ctypes のインスタンスを生成する。

これによって、バイナリ型のバッファーが、_fields_に代入されたという解釈でいいのだろうか。

公式ドキュメント:

16.16. ctypes — Pythonのための外部関数ライブラリ — Python 3.5.1 ドキュメント

def _init_(self, socket_buffer=None):

def _init_は、インスタンスの初期化処理で呼ばれる。_new_で他クラスの_init_メソッドが呼ばれていなかったら呼ばれる。

self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))

self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))

socket.inet_ntoaは、引数をIPv4 のドット区切りの 10 進数表記の文字列に変換する。

公式ドキュメント: 18.1. socket — 低水準ネットワークインターフェース — Python 3.5.1 ドキュメント

structモジュールは、Cの構造体(ここでは、_fields_)を変換するもの。

struct.packは、バイナリ変換を行う。第一引数で、フォーマットを決められる。この場合、”< ”でビッグエンディアン(データの上位バイトから並べる)、”L”でunsigned long型(4バイトの符号なし整数(0~2の32乗-1)の値)を指定。self.srcとself.dstは、_fields_のsrcとdstを指している?

ICMPのデコード

上記のコードに、ICMPのデコードを機能を追加する。

# -*- coding: utf-8 -*-

import socket

import os
import struct
from ctypes import *

#リッスンするホストのIPアドレス
host = "192.168.11.3"

#IPヘッダー
class IP(Structure):
    _fields_ = [
        ("ihl",          c_uint8, 4),
        ("version",      c_uint8, 4),
        ("tos",          c_uint8, 4),
        ("len",          c_uint16),
        ("id",           c_uint16),
        ("offset",       c_uint16),
        ("ttl",          c_uint8),
        ("protocol_num", c_uint8),
        ("sum",          c_uint16),
        ("src",          c_uint32),
        ("dst",          c_uint32)
    ]

    def __new__(self, socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        #プロトコルの定数値を名称にマッピング
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
        #可読なIPアドレスの値に変換
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
        #可読なプロトコル名称に変換
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

class ICMP(Structure):
    _fields_ = [
        ("type", c_uint8),
        ("code", c_uint8),
        ("unused", c_uint16),
        ("next_hop_mtu", c_uint16)
        ]
    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    def __init__(self, socket_buffer):
        pass

if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

#キャプチャ結果にIPヘッダーを含めるように指定
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

#windowsの場合はioctlを使用してプロミスキャスモードを有効化
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

try:
    while True:
        #パケットの読み込み
        raw_buffer = sniffer.recvfrom(65565)[0]
        #バッファーの最初の20バイトからIPアドレス構造体を作成
        ip_header = IP(raw_buffer[0:20])
        #検出されたプロトコルとホストを出力
        print("Protocol:{0}:{1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address))

        #ICMPであればそれを処理
        if ip_header.protocol == "ICMP":
            #ICMPパケットの位置を計算
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]

            #ICMP構造体を作成
            icmp_header = ICMP(buf)

            print("ICMP -> Type: {0} Code: {1}".format(icmp_header.type, icmp_header.code))
except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

class ICMP(Structure):

中身はIPクラスとほぼ一緒。

if ip_header.protocol == "ICMP":

こちらから送信したメッセージのIPヘッダーとデータの部分の最初の8バイトが、ICMPメッセージを含んでいるので、IPクラスのip_header.protocolを調べればICMPレスポンスかどうかが判断できる。

offset = ip_header.ihl * 4

buf = raw_buffer[offset:offset + sizeof(ICMP)]

オフセットは、基準となる位置からの差を表している。この場合、得たいデータは、ICMPヘッダ。

| MACヘッダ | IPヘッダ | ICMPヘッダ | データ...|

上記のように通知書式が並んでいるので、IPヘッダの終わりからICMPの終わりまでが欲しい。

IPヘッダーのislフィールドは、IPヘッダーに含まれる4バイトの塊の数を示しているので、これを4倍することでIPヘッダーの大きさが分かる。

なので、このオフセットからオフセットにICMPの大きさを足した長さまでをraw_bufferリストからスライスすればICMPヘッダが得られる。


次に上記のプログラムをサブネット(1つのネットワークを複数に分割したもの)全体に対して実施できるようにする。

# -*- coding: utf-8 -*-

import socket

import os
import struct
from ctypes import *

import threading
import time
from netaddr import IPNetwork, IPAddress

#リッスンするホストのIPアドレス
host = "192.168.11.3"

#標的のサブネット
subnet = "192.168.11.0/24"

#ICMPレスポンスのチェック用マジック文字列
magic_message = "PYTHONRULES!"

#UDPデータグラムをサブネット全体に送信
def udp_sender(subnet, magic_message):
    time.sleep(5)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    for ip in IPNetwork(subnet):

        try:
            sender.sendto(magic_message.encode(), (str(ip), 65212))
        except:
            pass

#IPヘッダー
class IP(Structure):
    _fields_ = [
        ("ihl",          c_uint8, 4),
        ("version",      c_uint8, 4),
        ("tos",          c_uint8, 4),
        ("len",          c_uint16),
        ("id",           c_uint16),
        ("offset",       c_uint16),
        ("ttl",          c_uint8),
        ("protocol_num", c_uint8),
        ("sum",          c_uint16),
        ("src",          c_uint32),
        ("dst",          c_uint32)
    ]

    def __new__(self, socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        #プロトコルの定数値を名称にマッピング
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}
        #可読なIPアドレスの値に変換
        self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
        #可読なプロトコル名称に変換
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

class ICMP(Structure):
    _fields_ = [
        ("type", c_uint8),
        ("code", c_uint8),
        ("unused", c_uint16),
        ("next_hop_mtu", c_uint16)
        ]
    def __new__(self, socket_buffer):
        return self.from_buffer_copy(socket_buffer)
    def __init__(self, socket_buffer):
        pass

if os.name == "nt":
    socket_protocol = socket.IPPROTO_IP
else:
    socket_protocol = socket.IPPROTO_ICMP

sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)

sniffer.bind((host, 0))

#キャプチャ結果にIPヘッダーを含めるように指定
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

#windowsの場合はioctlを使用してプロミスキャスモードを有効化
if os.name == "nt":
    sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)

#パケットの送信開始
t = threading.Thread(target=udp_sender, args=(subnet, magic_message))
t.start()

try:
    while True:
        #パケットの読み込み
        raw_buffer = sniffer.recvfrom(65565)[0]
        #バッファーの最初の20バイトからIPアドレス構造体を作成
        ip_header = IP(raw_buffer[0:20])
        #検出されたプロトコルとホストを出力
        #print("Protocol:{0}:{1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address))

        #ICMPであればそれを処理
        if ip_header.protocol == "ICMP":
            #ICMPパケットの位置を計算
            offset = ip_header.ihl * 4
            buf = raw_buffer[offset:offset + sizeof(ICMP)]

            #ICMP構造体を作成
            icmp_header = ICMP(buf)

            #print("ICMP -> Type: {0} Code: {1}".format(icmp_header.type, icmp_header.code))

            #コードとタイプが3であるかどうか
            if icmp_header.code == 3 and icmp_header.type == 3:

                #標的サブネットのホストかを確認
                if IPAddress(ip_header.src_address) in IPNetwork(subnet):

                    #マジック文字列を含むか確認
                    if raw_buffer[len(raw_buffer) - len(magic_message):].decode() == magic_message:
                        print("Host Up:{}".format(ip_header.src_address))

except KeyboardInterrupt:
    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

for ip in IPNetwork(subnet):

sender.sendto(magic_message.encode(), (str(ip), 65212))

IPNetworkは、プレフィックス付きのIPアドレスを扱える。ここでは、IPアドレスの配列として使っていて、ipにIPアドレスの値が次々に代入されている。

Netaddrは、ここ

http://goo.gl/1xikmP

を見ました。分かりやすい……!!

magic_messageをサブネットに片っ端に送信して、返ってきたホストは動作していることが分かる……という流れみたい。

if icmp_header.code == 3 and icmp_header.type == 3:

タイプが3のICMPメッセージは、宛先到達不可能クラス。

コードが3のICMPメッセージは、ポート到達不可能エラーが発生したことが分かる。

これによって、冒頭に書いた(閉じているUDPポートにパケットが届いた時の処理方法によって判断する)ことを実装できる。

if IPAddress(ip_header.src_address) in IPNetwork(subnet):

IPAddressは、単純にIPアドレスを表す。

サブネットの中に、ip_header.src_addressが存在しているかを調べている。

if raw_buffer[len(raw_buffer) - len(magic_message):].decode() == magic_message:

udp_senderで送ったmagic_messageは、raw_bufferの最後にのっているので、それを調べている。


分かっていないところも多くあるので、理解できたら追記します。