サイバーセキュリティプログラミング ―Pythonで学ぶハッカーの思考 3章 勉強まとめ
続きです。
この章では、パケットの観測を行うスニッファーを書いてみるらしい。
UDPを用いたホスト発見ツールの作成
標的ネットワークで動作中のホストを発見するUDPベースのスニッファーの作成。
閉じているUDPポートにパケットが届いた時の処理方法によって、特定のIPアドレスがホストで稼働しているかを判断するらしい。
具体的に言うと、UDPデータグラム(データグラムというのは、宛先に届かない場合でもユーザに通知しない、信頼できないサービスでのパケットのことらしい)を閉じたポートに送った時に、ICMPメッセージ(Internet Control Message Protocolの略。インターネット・プロトコルのデータグラム処理における誤りの通知や通信に関する情報の通知などのために使用する)が返信された場合、そのホストは稼働しているといえるので判断することが可能。
# -*- coding: utf-8 -*- import socket import os #リッスンするホストのIPアドレス host = "192.168.11.4" #rawソケットを作成しパブリックなインタフェースにバインド if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) #キャプチャ結果にIPヘッダーを含めるように指定 sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) #windowsの場合はioctlを使用してプロミスキャスモードを有効化 if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) #単一パケットの読み込み print(sniffer.recvfrom(65565)) #windowsのプロミスキャスモードを無効化 if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
os.name
実行するOSの名前が分かる。Windowsの場合は"nt"で、Linuxの場合は"posix"と表記される。
公式ドキュメント:
16.1. os — 雑多なオペレーティングシステムインタフェース — Python 3.5.1 ドキュメント
socket_protocol = socket.IPPROTO_IP
socket_protocol = socket.IPPROTO_ICMP
ネットワークインターフェースでパケットを盗聴するために必要なパラメータの指定。
windowsはプロトコルによらずすべての入力パケットが盗聴できる。
linuxはICMPを指定しなければならない。
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
ソケットの設定。
AF_INET は、標準的なIPv4 のアドレスやホスト名を使用するための設定。
socket.SOCK_RAWは、受信したデータにTCPヘッダやIPヘッダが含まれる。これによって、ポート番号やIPアドレスを取得できる。
socket_protocolは、先ほど設定したもの
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
setsockoptは、ソケットのオプションの設定を行う。
sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
ioctlでプロミスキャスモードを有効にする。ioctlは、ユーザースペース(OSのユーザーに近い処理、機能を担う、個々のアプリケーションが稼働する層)のプログラムがカーネルモードのコンポーネント(他のプログラムから呼び出されたり連結されたりして使用されるプログラム部品)とやり取りするための仕組み。
プロミスキャスモードは、同一ネットワーク内を流れるすべてのパケットを受信して読み込むモードのこと。
公式ドキュメント:
18.1. socket — 低水準ネットワークインターフェース — Python 3.5.1 ドキュメント
IPレイヤーのデコード
受信したパケットのIPヘッダー部分をデコードする。
# -*- coding: utf-8 -*- import socket import os import struct from ctypes import * #リッスンするホストのIPアドレス host = "192.168.11.3" #IPヘッダー class IP(Structure): _fields_ = [ ("ihl", c_uint8, 4), ("version", c_uint8, 4), ("tos", c_uint8, 4), ("len", c_uint16), ("id", c_uint16), ("offset", c_uint16), ("ttl", c_uint8), ("protocol_num", c_uint8), ("sum", c_uint16), ("src", c_uint32), ("dst", c_uint32) ] def __new__(self, socket_buffer=None): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer=None): #プロトコルの定数値を名称にマッピング self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"} #可読なIPアドレスの値に変換 self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) #可読なプロトコル名称に変換 try: self.protocol = self.protocol_map[self.protocol_num] except: self.protocol = str(self.protocol_num) if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) #キャプチャ結果にIPヘッダーを含めるように指定 sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) #windowsの場合はioctlを使用してプロミスキャスモードを有効化 if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: #パケットの読み込み raw_buffer = sniffer.recvfrom(65565)[0] #バッファーの最初の20バイトからIPアドレス構造体を作成 ip_header = IP(raw_buffer[0:20]) #検出されたプロトコルとホストを出力 print("Protocol:{0}:{1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address)) except KeyboardInterrupt: if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
class IP(Structure):
構造体と共用体は ctypes モジュールに定義されている Structure および Union ベースクラスからの派生クラスでなければならないらしい。ここでは、Structureクラスから派生して、Cライクな構造体を作成する。
公式ドキュメント:
16.16. ctypes — Pythonのための外部関数ライブラリ — Python 3.5.1 ドキュメント
_fields_
構造体のフィールドを定義するシーケンス。要素は2要素タプルか3要素タプルでなければならない。
第一要素はフィールドの名前、第二要素はフィールドの型を指定。
第三要素は、フィールドのビット幅を定義する。
ここで使われている型は、下記表に対応してる。
ctypes の型 | C type | Python の型 |
---|---|---|
c_bool | _Bool | bool (1) |
c_char | char | 1文字のバイト列オブジェクト |
c_wchar | wchar_t | 1文字の文字列 |
c_byte | char | int |
c_ubyte | unsigned char | int |
c_short | short | int |
c_ushort | unsigned short | int |
c_int | int | int |
c_uint | unsigned int | int |
c_long | long | int |
c_ulong | unsigned long | int |
c_longlong | __int64 または long long | int |
c_ulonglong | unsigned __int64 または unsigned long long | int |
c_size_t | size_t | int |
c_ssize_t | ssize_t または Py_ssize_t | int |
c_float | float | float |
c_double | double | float |
c_longdouble | long double | float |
c_char_p | char * (NUL 終端) | バイト列オブジェクトまたは None |
c_wchar_p | wchar_t * (NUL 終端) | 文字列または None |
c_void_p | void * | 整数または None |
引用元: http://docs.python.jp/3/library/ctypes.html#fundamental-data-types
これによって、C言語ライクな構造体が作成できる。
def _new_(self, socket_buffer=None):
self.from_buffer_copy(socket_buffer)
__new__について詳しく分かっていないので、間違っている可能性大です……。
def _new_(cls) は、インスタンス生成時に呼び出される。第一引数はクラスオブジェクト。戻り値がクラスのインスタンスなら、そのインスタンスの_init_メソッドが呼び出される。
ここでは、from_buffer_copyがsocket_bufferを引数にして呼び出されている。
from_buffer_copyは、引数のオブジェクトの読み出し可能バッファをコピーすることで、ctypes のインスタンスを生成する。
これによって、バイナリ型のバッファーが、_fields_に代入されたという解釈でいいのだろうか。
公式ドキュメント:
16.16. ctypes — Pythonのための外部関数ライブラリ — Python 3.5.1 ドキュメント
def _init_(self, socket_buffer=None):
def _init_は、インスタンスの初期化処理で呼ばれる。_new_で他クラスの_init_メソッドが呼ばれていなかったら呼ばれる。
self.src_address = socket.inet_ntoa(struct.pack("<L", self.src))
self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst))
socket.inet_ntoaは、引数をIPv4 のドット区切りの 10 進数表記の文字列に変換する。
公式ドキュメント: 18.1. socket — 低水準ネットワークインターフェース — Python 3.5.1 ドキュメント
structモジュールは、Cの構造体(ここでは、_fields_)を変換するもの。
struct.packは、バイナリ変換を行う。第一引数で、フォーマットを決められる。この場合、”< ”でビッグエンディアン(データの上位バイトから並べる)、”L”でunsigned long型(4バイトの符号なし整数(0~2の32乗-1)の値)を指定。self.srcとself.dstは、_fields_のsrcとdstを指している?
ICMPのデコード
上記のコードに、ICMPのデコードを機能を追加する。
# -*- coding: utf-8 -*- import socket import os import struct from ctypes import * #リッスンするホストのIPアドレス host = "192.168.11.3" #IPヘッダー class IP(Structure): _fields_ = [ ("ihl", c_uint8, 4), ("version", c_uint8, 4), ("tos", c_uint8, 4), ("len", c_uint16), ("id", c_uint16), ("offset", c_uint16), ("ttl", c_uint8), ("protocol_num", c_uint8), ("sum", c_uint16), ("src", c_uint32), ("dst", c_uint32) ] def __new__(self, socket_buffer=None): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer=None): #プロトコルの定数値を名称にマッピング self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"} #可読なIPアドレスの値に変換 self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) #可読なプロトコル名称に変換 try: self.protocol = self.protocol_map[self.protocol_num] except: self.protocol = str(self.protocol_num) class ICMP(Structure): _fields_ = [ ("type", c_uint8), ("code", c_uint8), ("unused", c_uint16), ("next_hop_mtu", c_uint16) ] def __new__(self, socket_buffer): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer): pass if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) #キャプチャ結果にIPヘッダーを含めるように指定 sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) #windowsの場合はioctlを使用してプロミスキャスモードを有効化 if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) try: while True: #パケットの読み込み raw_buffer = sniffer.recvfrom(65565)[0] #バッファーの最初の20バイトからIPアドレス構造体を作成 ip_header = IP(raw_buffer[0:20]) #検出されたプロトコルとホストを出力 print("Protocol:{0}:{1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address)) #ICMPであればそれを処理 if ip_header.protocol == "ICMP": #ICMPパケットの位置を計算 offset = ip_header.ihl * 4 buf = raw_buffer[offset:offset + sizeof(ICMP)] #ICMP構造体を作成 icmp_header = ICMP(buf) print("ICMP -> Type: {0} Code: {1}".format(icmp_header.type, icmp_header.code)) except KeyboardInterrupt: if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
class ICMP(Structure):
中身はIPクラスとほぼ一緒。
if ip_header.protocol == "ICMP":
こちらから送信したメッセージのIPヘッダーとデータの部分の最初の8バイトが、ICMPメッセージを含んでいるので、IPクラスのip_header.protocolを調べればICMPレスポンスかどうかが判断できる。
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + sizeof(ICMP)]
オフセットは、基準となる位置からの差を表している。この場合、得たいデータは、ICMPヘッダ。
| MACヘッダ | IPヘッダ | ICMPヘッダ | データ...|
上記のように通知書式が並んでいるので、IPヘッダの終わりからICMPの終わりまでが欲しい。
IPヘッダーのislフィールドは、IPヘッダーに含まれる4バイトの塊の数を示しているので、これを4倍することでIPヘッダーの大きさが分かる。
なので、このオフセットからオフセットにICMPの大きさを足した長さまでをraw_bufferリストからスライスすればICMPヘッダが得られる。
次に上記のプログラムをサブネット(1つのネットワークを複数に分割したもの)全体に対して実施できるようにする。
# -*- coding: utf-8 -*- import socket import os import struct from ctypes import * import threading import time from netaddr import IPNetwork, IPAddress #リッスンするホストのIPアドレス host = "192.168.11.3" #標的のサブネット subnet = "192.168.11.0/24" #ICMPレスポンスのチェック用マジック文字列 magic_message = "PYTHONRULES!" #UDPデータグラムをサブネット全体に送信 def udp_sender(subnet, magic_message): time.sleep(5) sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for ip in IPNetwork(subnet): try: sender.sendto(magic_message.encode(), (str(ip), 65212)) except: pass #IPヘッダー class IP(Structure): _fields_ = [ ("ihl", c_uint8, 4), ("version", c_uint8, 4), ("tos", c_uint8, 4), ("len", c_uint16), ("id", c_uint16), ("offset", c_uint16), ("ttl", c_uint8), ("protocol_num", c_uint8), ("sum", c_uint16), ("src", c_uint32), ("dst", c_uint32) ] def __new__(self, socket_buffer=None): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer=None): #プロトコルの定数値を名称にマッピング self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"} #可読なIPアドレスの値に変換 self.src_address = socket.inet_ntoa(struct.pack("<L", self.src)) self.dst_address = socket.inet_ntoa(struct.pack("<L", self.dst)) #可読なプロトコル名称に変換 try: self.protocol = self.protocol_map[self.protocol_num] except: self.protocol = str(self.protocol_num) class ICMP(Structure): _fields_ = [ ("type", c_uint8), ("code", c_uint8), ("unused", c_uint16), ("next_hop_mtu", c_uint16) ] def __new__(self, socket_buffer): return self.from_buffer_copy(socket_buffer) def __init__(self, socket_buffer): pass if os.name == "nt": socket_protocol = socket.IPPROTO_IP else: socket_protocol = socket.IPPROTO_ICMP sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol) sniffer.bind((host, 0)) #キャプチャ結果にIPヘッダーを含めるように指定 sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1) #windowsの場合はioctlを使用してプロミスキャスモードを有効化 if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON) #パケットの送信開始 t = threading.Thread(target=udp_sender, args=(subnet, magic_message)) t.start() try: while True: #パケットの読み込み raw_buffer = sniffer.recvfrom(65565)[0] #バッファーの最初の20バイトからIPアドレス構造体を作成 ip_header = IP(raw_buffer[0:20]) #検出されたプロトコルとホストを出力 #print("Protocol:{0}:{1} -> {2}".format(ip_header.protocol, ip_header.src_address, ip_header.dst_address)) #ICMPであればそれを処理 if ip_header.protocol == "ICMP": #ICMPパケットの位置を計算 offset = ip_header.ihl * 4 buf = raw_buffer[offset:offset + sizeof(ICMP)] #ICMP構造体を作成 icmp_header = ICMP(buf) #print("ICMP -> Type: {0} Code: {1}".format(icmp_header.type, icmp_header.code)) #コードとタイプが3であるかどうか if icmp_header.code == 3 and icmp_header.type == 3: #標的サブネットのホストかを確認 if IPAddress(ip_header.src_address) in IPNetwork(subnet): #マジック文字列を含むか確認 if raw_buffer[len(raw_buffer) - len(magic_message):].decode() == magic_message: print("Host Up:{}".format(ip_header.src_address)) except KeyboardInterrupt: if os.name == "nt": sniffer.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
for ip in IPNetwork(subnet):
sender.sendto(magic_message.encode(), (str(ip), 65212))
IPNetworkは、プレフィックス付きのIPアドレスを扱える。ここでは、IPアドレスの配列として使っていて、ipにIPアドレスの値が次々に代入されている。
Netaddrは、ここ
を見ました。分かりやすい……!!
magic_messageをサブネットに片っ端に送信して、返ってきたホストは動作していることが分かる……という流れみたい。
if icmp_header.code == 3 and icmp_header.type == 3:
タイプが3のICMPメッセージは、宛先到達不可能クラス。
コードが3のICMPメッセージは、ポート到達不可能エラーが発生したことが分かる。
これによって、冒頭に書いた(閉じているUDPポートにパケットが届いた時の処理方法によって判断する)ことを実装できる。
if IPAddress(ip_header.src_address) in IPNetwork(subnet):
IPAddressは、単純にIPアドレスを表す。
サブネットの中に、ip_header.src_addressが存在しているかを調べている。
if raw_buffer[len(raw_buffer) - len(magic_message):].decode() == magic_message:
udp_senderで送ったmagic_messageは、raw_bufferの最後にのっているので、それを調べている。
分かっていないところも多くあるので、理解できたら追記します。