In IoT, networks are given – although not necessarily constant connected. This means network code and network testing. In my IoT book I have described how I have had great benefit in using “Scapy”. This is a Python module that allows you to generate all sorts of legal and illegal packets. It is very beneficial to run a for-loop with e.g., all lengths that a packet may have – including too short/long. It is also recommended to test illegal packets – e.g., packets where the “length” field states a different length than the actual length. This can be utilized by bad guys for buffer overflows etc.
(For more Python programs – see Downloads Third Edition)
The Scapy concept
The scapy concept is very easy to understand: You design a packet by creating a variable for each layer that you need. In these variables you only set what you want to differ from the default. For e.g. a DHCP message you may set something in the DHCP options, something in the BOOTP part, something in UDP, IP and maybe even in the Ethernet layer. Combining the layers with”/”, the packet is easily generated, and you may ask Scapy to send and wait for an answer. The whole thing is administrated in Python. Here’s a central part of a DHCP-DISCOVER:
dhcp_discover = ( Ether(dst="ff:ff:ff:ff:ff:ff") / IP(src="0.0.0.0", dst="255.255.255.255") / UDP(sport=68, dport=67) / BOOTP(chaddr=mac, xid=5678) / DHCP(options=[("message-type","discover"),"end"])) # dump what we plan to send # ls(dhcp_discover) disc_ans = srp1(dhcp_discover,iface=iface)
Installing and running on Windows and Linux
If you already know Python, you can focus on learning Scapy. You may know that you can toy around with Python in an interactive interpreter. In the same way you can fire up a Scapy-aware interpreter and work directly with the layers described above – and many others. Please note that any Scapy script that sends packages – interactive or not – requires administrator-privileges. In windows you run a cmd-window “as administrator”. On Linux you use “sudo”. In order to do the low-level stuff, Scapy needs a library like tcpdump or pcap. If you already have Wireshark installed you are good. BTW – do use Wireshark to debug Python/Scapy.
It’s been some time ago since I last used Scapy, so I was curious as to how it would be to come back to. Unfortunately I must say that it was something of an installation nightmare – on Windows (please read the PS at the end of this sub-chapter for a great solution to this) as well as on a small Linux-target.
Only on a PC-based Linux did it perform straight out of the box – sort of. My advise is that if you can run your tests from a Linux PC – then do it. Apart from the various platforms there is also the choice of which branch of Python/Scapy to use. The current version is 3.6.x, but due to a major change between version 2 and 3, a lot of sites and samples run version 2.7. I went for the newest and it worked. Even on a Linux PC I got caught up in installation hell when I tried to load some exotic support-libraries. My recommendation is therefore to install only standard Python3 with its standard libraries and Python3-Scapy – but no more (e.g. “sudo apt-get install python3 python3-scapy”).
There is an alternative. A good colleague pointed me in the direction of anaconda.com. This site does a great job of creating a downloadable consistent Python-package that actually works. So far I have tested this only on Windows 10 – and not yet with Scapy but with NumPy. It works really great! Not only does the Python work, but there is also a very nice IDE called “Spyder”. Here you can set breakpoints, single-step and inspect variables.
Testing DHCP caused a couple of extra challenges. My first tests ran on a virtual Linux machine on a Windows host. Network-wise the guest OS is connected to the host OS via a NAT. This does not allow broadcasts to go through – and that is exactly what DHCP is. There are workarounds, but I decided to move to a native Linux. Another problem was the Scapy “srp1” command. This means “send and receive until there is 1 answer”. Even when I could see in Wireshark that I indeed did receive an answer, this was not accepted by Scapy as such. Digging into the documentation, I learned that one of the requirements to qualify a received packet as an answer, is that IP source and destination are swapped from the packet sent to the answer received. This is NOT the case in a broadcast. I needed to tell Scapy this:
scapy.all.conf.checkIPaddr = False
A simple line when you know it – as is often the case.
The full script
The rest is more or less administration. The script must be called once with “init <interface>” – where interface is e.g. “eth0” (on Linux). This generates a config file that stores values in between calls. Besides the “init” there is also “discover” and “request”. For now the “Transaction ID” is hard-coded. This could be refined – along with other stuff – but the basics are there. Note that the “srXX” (send-receive) commands come with and without a “p”. The “p” is used when you have edited the “Ether” part – as I did here. Without the “p”, the standard Ethernet layer is used.
Se also https://scapy.readthedocs.io/en/latest/
The full Python/Scapy-script:
#!/usr/bin/env python3 #call ./dhcp.py <function> # where function is one of "init", "discover", "request" # Use iface as parameter for init # Install Python3 and scapy to run this import json #Kill IPv6 warning: import logging logging.getLogger("scapy.runtime").setLevel(logging.ERROR) import scapy from scapy.all import * #discover may be the first def do_discover(): print("In do_discover") # VERY important: Tell scapy that we do not require IP's to be swapped to identify an answer scapy.all.conf.checkIPaddr = False settings = eval(open("dhcp.conf").read()) iface = settings["iface"] mac = get_if_raw_hwaddr(iface) dhcp_discover = ( Ether(dst="ff:ff:ff:ff:ff:ff") / IP(src="0.0.0.0", dst="255.255.255.255") / UDP(sport=68, dport=67) / BOOTP(chaddr=mac, xid=5678) / DHCP(options=[("message-type","discover"),"end"])) # dump what we plan to send # ls(dhcp_discover) disc_ans = srp1(dhcp_discover,iface=iface,filter="udp and (port 67 or 68)") # The answer is a DHCP_OFFER - check it out print("Options:", disc_ans[DHCP].options) # print("xID:", disc_ans[BOOTP].xid) # Save the offer to be used in a request settings["serverIP"] = disc_ans[BOOTP].siaddr settings["clientIP"] = disc_ans[BOOTP].yiaddr settings["XID"] = disc_ans[BOOTP].xid with open('dhcp.conf','w') as file: file.write(json.dumps(settings)) return #this does a request without a discover first def do_request(): print("In do_request") # VERY important: Tell scapy that we do not require IP's to be swapped to identify an answer scapy.all.conf.checkIPaddr = False settings = eval(open("dhcp.conf").read()) iface = settings["iface"] mac = get_if_raw_hwaddr(iface) dhcp_request = ( Ether(dst="ff:ff:ff:ff:ff:ff") / IP(src="0.0.0.0", dst="255.255.255.255") / UDP(sport=68, dport=67) / BOOTP(chaddr=mac) / DHCP(options=[("message-type","request"),("server_id",settings["serverIP"]),("requested_addr",settings["clientIP"] ),"end"])) # dump what we plan to send # ls(dhcp_request) ans_req = srp1(dhcp_request,iface=iface,filter="udp and (port 67 or 68)") print("Options:", ans_req[DHCP].options) # print("xID:", ans_req[BOOTP].xid) #this does a discover - then a request def do_discover_request(): print("In do_discover_request - not implemented yet") return def do_none(): print("Try ./dhcp.py help") return def do_init(): try: iface = sys.argv[2] except: print("init must have as second parameter the relevant interface") return settings = {'serverIP': "255.255.255.255" , "clientIP": "0.0.0.0", "XID": "0","iface": iface} with open('dhcp.conf','w') as file: file.write(json.dumps(settings)) return def do_help(): print("Examples of usage:") print(" ./dhcp.py init eth0 Second Param: interface. This is stored in dhcp.conf") print(" sudo ./dhcp.py discover Send a DHCP DISCOVER to the server and get a DHCP_OFFER") print(" sudo ./dhcp.py request Send a DHCP REQUEST to the server and get a DHCP_ACK") action = {'discover': do_discover, 'request': do_request, 'discover_request': do_discover_request, 'init': do_init, 'help': do_help} #main code try: command = sys.argv[1] except: command = "none" action.get(command, do_none)()