# Source code for switchyard.lib.packet.icmp

import struct
from enum import IntEnum
from ipaddress import IPv4Address

from .packet import PacketHeaderBase,Packet
from .common import checksum, ICMPType, ICMPTypeCodeMap
from ..exceptions import *

'''
References: https://www.ietf.org/rfc/rfc792.txt
            https://tools.ietf.org/html/rfc4884 (extension parameters)
TCP/IP Illustrated, Vol 1.
'''


class ICMP(PacketHeaderBase):
    '''
    A mother class for all ICMP message types.  It holds a reference to
    another object that contains the specific ICMP data (icmpdata), given
    a particular ICMP type.  Just setting the icmptype causes the
    data object to change (the change happens automatically when you set
    the icmptype).  The icmpcode field will also change, but it only
    changes to some valid code given the new icmptype.

    The fixed part of the header is packed with ``_PACKFMT`` ('!BBH'):
    one byte of type, one byte of code and a 16-bit checksum.  Everything
    after that is produced and consumed by the icmpdata object.
    '''
    __slots__ = ('_type', '_code', '_icmpdata',
                 '_valid_types', '_valid_codes_map',
                 '_classtype_from_icmptype', '_icmptype_from_classtype',
                 '_checksum')
    _PACKFMT = '!BBH'
    _MINLEN = struct.calcsize(_PACKFMT)

    def __init__(self, **kwargs):
        '''
        Create an ICMP header that defaults to an echo request.

        An ``icmptype`` keyword is applied first; remaining keywords that
        name an attribute of the resulting icmpdata object are set on that
        object, and whatever is left is handed to the base class.
        '''
        self._valid_types = ICMPType
        self._valid_codes_map = ICMPTypeCodeMap
        self._classtype_from_icmptype = ICMPClassFromType
        self._icmptype_from_classtype = ICMPTypeFromClass
        self._type = self._valid_types.EchoRequest
        self._code = self._valid_codes_map[self._type].EchoRequest
        self._icmpdata = ICMPEchoRequest()
        self._checksum = 0
        # make sure that icmptype is set first; this has the
        # side-effect of also creating the "right" icmpdata object.
        if 'icmptype' in kwargs:
            self.icmptype = kwargs.pop('icmptype')
        # as a convenience, allow kw syntax to set icmpdata values
        popattr = []
        for attr, val in kwargs.items():
            if hasattr(self.icmpdata, attr):
                setattr(self.icmpdata, attr, val)
                popattr.append(attr)
        # remove consumed keywords only after iteration has finished, so
        # the dict is not mutated while it is being walked
        for pattr in popattr:
            kwargs.pop(pattr)
        super().__init__(**kwargs)

    def size(self):
        '''Return the serialized length: fixed header plus icmpdata bytes.'''
        return struct.calcsize(ICMP._PACKFMT) + len(self._icmpdata.to_bytes())

    def checksum(self):
        '''
        Compute the checksum over the header (with the checksum field
        zeroed) followed by the icmpdata bytes, store it and return it.
        '''
        self._checksum = checksum(b''.join((
            struct.pack(ICMP._PACKFMT, self._type.value, self._code.value, 0),
            self._icmpdata.to_bytes())))
        return self._checksum

    def to_bytes(self, dochecksum=True):
        '''
        Return packed byte representation of the ICMP header followed by
        the icmpdata bytes.  When dochecksum is false the checksum field
        is written as zero.
        '''
        csum = 0
        if dochecksum:
            csum = self.checksum()
        return b''.join((
            struct.pack(ICMP._PACKFMT, self._type.value, self._code.value, csum),
            self._icmpdata.to_bytes()))

    def from_bytes(self, raw):
        '''
        Rebuild this header and its icmpdata object from raw bytes.

        Raises NotEnoughDataError if raw is shorter than the fixed header.
        Returns the part of raw that follows the first size() bytes.
        '''
        if len(raw) < ICMP._MINLEN:
            raise NotEnoughDataError("Not enough bytes ({}) to reconstruct an ICMP object".format(len(raw)))
        fields = struct.unpack(ICMP._PACKFMT, raw[:ICMP._MINLEN])
        self._type = self._valid_types(fields[0])
        self._code = self._valid_codes_map[self.icmptype](fields[1])
        self._checksum = fields[2]
        # assign the private attribute directly: going through the
        # icmpdata property would re-run the icmptype setter and reset
        # the code that was just parsed
        self._icmpdata = self._classtype_from_icmptype(self._type)()
        self._icmpdata.from_bytes(raw[ICMP._MINLEN:])
        return raw[self.size():]

    def __eq__(self, other):
        '''
        Two ICMP headers are equal when type, code and icmpdata match.
        Comparison with anything that is not an ICMP header is deferred to
        the other operand instead of failing with AttributeError.
        '''
        if not isinstance(other, ICMP):
            return NotImplemented
        return self.icmptype == other.icmptype and \
            self.icmpcode == other.icmpcode and \
            self.icmpdata == other.icmpdata

    @property
    def icmptype(self):
        return self._type

    @property
    def icmpcode(self):
        return self._code

    @icmptype.setter
    def icmptype(self, value):
        # plain values (e.g. ints) are coerced into the type enumeration
        # rather than rejected; an invalid value makes the enumeration
        # constructor raise
        if not isinstance(value, self._valid_types):
            value = self._valid_types(value)
        cls = self._classtype_from_icmptype(value)
        if not issubclass(self.icmpdata.__class__, cls):
            self.icmpdata = cls()
        self._type = value
        # pick the code whose value is 0 for the new type, if there is one
        codes = self._valid_codes_map[value]
        for code in codes:
            if code.value == 0:
                self._code = code
                break

    @icmpcode.setter
    def icmpcode(self, value):
        if issubclass(value.__class__, IntEnum):
            self._check_typecode_consistency(value)
            self._code = value
        elif isinstance(value, int):
            self._code = self._valid_codes_map[self.icmptype](value)
        # NOTE(review): any other kind of value is silently ignored, as
        # before; confirm whether callers rely on that

    def _check_typecode_consistency(self, xcode):
        '''Raise ValueError if xcode is not a valid code for the current type.'''
        validcodes = self._valid_codes_map[self._type]
        if xcode not in validcodes:
            raise ValueError("Invalid code {} for type {}".format(xcode, self._type.name))

    def __str__(self):
        typecode = self.icmptype.name
        # only show the code when its name adds information
        if self.icmptype.name != self.icmpcode.name:
            typecode = '{}:{}'.format(self.icmptype.name, self.icmpcode.name)
        return '{} {} {}'.format(self.__class__.__name__, typecode, str(self.icmpdata))

    def next_header_class(self):
        return None

    def pre_serialize(self, raw, pkt, i):
        return

    @property
    def icmpdata(self):
        return self._icmpdata

    @icmpdata.setter
    def icmpdata(self, dataobj):
        if not issubclass(dataobj.__class__, ICMPData):
            raise Exception("ICMP data must be subclass of ICMPData (you gave me {})".format(dataobj.__class__.__name__))
        self._icmpdata = dataobj
        # keep the type (and therefore the code) in step with the data object
        self.icmptype = self._icmptype_from_classtype(dataobj.__class__)
class ICMPData(PacketHeaderBase):
    '''
    Base class for the type-specific part of an ICMP message.  On its own
    it only carries an opaque byte payload; subclasses put their own
    fields in front of that payload.
    '''
    __slots__ = ('_rawpayload',)

    def __init__(self, **kwargs):
        self._rawpayload = b''
        super().__init__(**kwargs)

    def next_header_class(self):
        return None

    def pre_serialize(self, raw, pkt, i):
        return

    def size(self):
        '''Return the number of payload bytes held.'''
        return len(self._rawpayload)

    def to_bytes(self):
        '''Return the payload bytes unchanged.'''
        return self._rawpayload

    def from_bytes(self, raw):
        '''Store a bytes copy of raw as the payload.'''
        self._rawpayload = bytes(raw)

    @property
    def data(self):
        return self._rawpayload

    @data.setter
    def data(self, value):
        '''
        Set the payload.  bytes are stored as-is, str is encoded as UTF-8
        and bytearray/memoryview objects are copied into bytes.  Anything
        else raises TypeError.
        '''
        if isinstance(value, bytes):
            self._rawpayload = value
        elif isinstance(value, str):
            self._rawpayload = value.encode('utf8')
        elif isinstance(value, (bytearray, memoryview)):
            # bytes(value, 'utf8') rejects these, so copy them explicitly
            self._rawpayload = bytes(value)
        else:
            raise TypeError("ICMP data payload must be bytes-like or str, not {}".format(type(value).__name__))

    def __eq__(self, other):
        # defer to the other operand for unrelated objects instead of
        # failing with AttributeError on the missing data attribute
        if not isinstance(other, ICMPData):
            return NotImplemented
        return self.data == other.data

    def __str__(self):
        return '{} bytes of raw payload ({})'.format(len(self._rawpayload), self._rawpayload[:10])
class ICMPSourceQuench(ICMPData):
    '''
    ICMP source quench data: four zero bytes followed by the raw payload.
    '''
    _MINLEN = 4

    def __init__(self):
        super().__init__()

    def size(self):
        '''Return the payload length plus the four leading bytes.'''
        payload_len = super().size()
        return payload_len + 4

    def to_bytes(self):
        '''Return four zero bytes followed by the payload.'''
        payload = super().to_bytes()
        return b''.join((b'\x00\x00\x00\x00', payload))

    def from_bytes(self, raw):
        '''
        Skip the first four bytes of raw and keep the rest as payload.
        Raises NotEnoughDataError when fewer than four bytes are given.
        '''
        available = len(raw)
        if available < ICMPSourceQuench._MINLEN:
            raise NotEnoughDataError("Not enough bytes ({}) to reconstruct ICMPSourceQuench data object".format(available))
        remainder = raw[4:]
        super().from_bytes(remainder)
class ICMPRedirect(ICMPData):
    '''
    ICMP redirect data: a 4-byte IPv4 address to redirect to, followed by
    the raw payload.
    '''
    __slots__ = ['_redirectto']

    def __init__(self):
        super().__init__()
        self._redirectto = IPv4Address('0.0.0.0')

    def size(self):
        '''Return the serialized length: 4 address bytes plus the payload.'''
        # without this override the inherited size() counts only the
        # payload and disagrees with len(to_bytes())
        return 4 + super().size()

    def to_bytes(self):
        '''Return the packed redirect address followed by the payload.'''
        return b''.join((self._redirectto.packed, super().to_bytes()))

    def from_bytes(self, raw):
        '''
        Read the redirect address from the first four bytes of raw and
        keep the rest as payload.  Raises NotEnoughDataError when fewer
        than four bytes are given.
        '''
        if len(raw) < 4:
            raise NotEnoughDataError("Not enough bytes ({}) to reconstruct ICMPRedirect data object".format(len(raw)))
        fields = struct.unpack('!I', raw[:4])
        self._redirectto = IPv4Address(fields[0])
        super().from_bytes(raw[4:])

    def __eq__(self, other):
        # the inherited comparison looks only at the payload, so two
        # redirects to different addresses would compare equal
        if not isinstance(other, ICMPRedirect):
            return NotImplemented
        return self.redirectto == other.redirectto and \
            self.data == other.data

    def __str__(self):
        return '{} RedirectAddress: {}'.format(super().__str__(), self._redirectto)

    @property
    def redirectto(self):
        return self._redirectto

    @redirectto.setter
    def redirectto(self, value):
        self._redirectto = IPv4Address(value)
class ICMPDestinationUnreachable(ICMPData):
    '''
    ICMP destination unreachable data.  The four bytes in front of the
    payload are packed as '!xBH': one pad byte, a one-byte original
    datagram length and a 16-bit next-hop MTU.
    '''
    __slots__ = ('_origdgramlen', '_nexthopmtu')

    def __init__(self):
        super().__init__()
        self._nexthopmtu = 0
        self._origdgramlen = 0

    def size(self):
        '''Return the serialized length: 4 field bytes plus the payload.'''
        # without this override the inherited size() counts only the
        # payload and disagrees with len(to_bytes())
        return 4 + super().size()

    def to_bytes(self):
        '''Return the packed length/MTU fields followed by the payload.'''
        return b''.join((
            struct.pack('!xBH', self._origdgramlen, self._nexthopmtu),
            super().to_bytes()))

    def from_bytes(self, raw):
        '''
        Read the length and MTU fields from the first four bytes of raw
        and keep the rest as payload.  Raises NotEnoughDataError when
        fewer than four bytes are given.
        '''
        if len(raw) < 4:
            raise NotEnoughDataError("Not enough bytes ({}) to reconstruct ICMPDestinationUnreachable data object".format(len(raw)))
        fields = struct.unpack('!xBH', raw[:4])
        self._origdgramlen = fields[0]
        self._nexthopmtu = fields[1]
        super().from_bytes(raw[4:])

    def __eq__(self, other):
        # the inherited comparison looks only at the payload and would
        # ignore both header fields
        if not isinstance(other, ICMPDestinationUnreachable):
            return NotImplemented
        return self.origdgramlen == other.origdgramlen and \
            self.nexthopmtu == other.nexthopmtu and \
            self.data == other.data

    @property
    def origdgramlen(self):
        return self._origdgramlen

    @origdgramlen.setter
    def origdgramlen(self, value):
        self._origdgramlen = int(value)

    @property
    def nexthopmtu(self):
        return self._nexthopmtu

    @nexthopmtu.setter
    def nexthopmtu(self, value):
        self._nexthopmtu = int(value)

    def __str__(self):
        return '{} NextHopMTU: {}'.format(super().__str__(), self._nexthopmtu)
class ICMPEchoRequest(ICMPData):
    '''
    ICMP echo request data: a 16-bit identifier and a 16-bit sequence
    number (packed with '!HH') followed by the raw payload.
    '''
    __slots__ = ['_identifier', '_sequence']
    _PACKFMT = '!HH'
    _MINLEN = struct.calcsize(_PACKFMT)

    def __init__(self):
        super().__init__()
        self._identifier = 0
        self._sequence = 0

    def next_header_class(self):
        return None

    def pre_serialize(self, raw, pkt, i):
        return

    def size(self):
        '''Return the serialized length: 4 field bytes plus the payload.'''
        return self._MINLEN + super().size()

    def from_bytes(self, raw):
        '''
        Read identifier and sequence from the first four bytes of raw and
        keep the rest as payload.  Raises NotEnoughDataError when fewer
        than four bytes are given.  Returns an empty bytes object.
        '''
        if len(raw) < ICMPEchoRequest._MINLEN:
            # the message has two placeholders; supplying only the length
            # made format() raise IndexError instead of reporting the
            # short input
            raise NotEnoughDataError("Not enough bytes ({}) to reconstruct {} data object".format(len(raw), type(self).__name__))
        fields = struct.unpack(ICMPEchoRequest._PACKFMT, raw[:ICMPEchoRequest._MINLEN])
        self._identifier = fields[0]
        self._sequence = fields[1]
        super().from_bytes(raw[ICMPEchoRequest._MINLEN:])
        return b''

    def to_bytes(self):
        '''Return the packed identifier and sequence followed by the payload.'''
        return b''.join((
            struct.pack(ICMPEchoRequest._PACKFMT, self._identifier, self._sequence),
            super().to_bytes()))

    def __str__(self):
        return '{} {} ({} data bytes)'.format(self._identifier, self._sequence, len(self.data))

    def __eq__(self, other):
        # defer to the other operand for unrelated objects instead of
        # failing with AttributeError on a missing attribute
        if not isinstance(other, ICMPEchoRequest):
            return NotImplemented
        return self.identifier == other.identifier and \
            self.sequence == other.sequence and \
            self.data == other.data

    @property
    def identifier(self):
        return self._identifier

    @property
    def sequence(self):
        return self._sequence

    @identifier.setter
    def identifier(self, value):
        self._identifier = int(value)

    @sequence.setter
    def sequence(self, value):
        self._sequence = int(value)
class ICMPEchoReply(ICMPEchoRequest):
    '''
    ICMP echo reply data.  Adds nothing to ICMPEchoRequest; it exists as
    a separate class so the reply type has its own data class.
    '''
class ICMPTimeExceeded(ICMPData):
    '''
    ICMP time exceeded data.  The four bytes in front of the payload are
    packed as '!xBH': one pad byte, a one-byte original datagram length
    and a 16-bit field that is always written as zero.
    '''
    __slots__ = ('_nexthopmtu', '_origdgramlen',)

    def __init__(self):
        super().__init__()
        self._origdgramlen = 0
        # from_bytes() stores the trailing 16-bit field here; give the
        # slot a value up front so it is never left unset
        self._nexthopmtu = 0

    def size(self):
        '''Return the serialized length: 4 field bytes plus the payload.'''
        # without this override the inherited size() counts only the
        # payload and disagrees with len(to_bytes())
        return 4 + super().size()

    def to_bytes(self):
        '''Return the packed length field (and zero filler) plus the payload.'''
        return b''.join((
            struct.pack('!xBH', self._origdgramlen, 0),
            super().to_bytes()))

    # FIXME: origdgram len should be padded to 4 bytes for v4, and 8 bytes for v6

    def from_bytes(self, raw):
        '''
        Read the header fields from the first four bytes of raw and keep
        the rest as payload.  Raises NotEnoughDataError when fewer than
        four bytes are given.
        '''
        if len(raw) < 4:
            raise NotEnoughDataError("Not enough bytes ({}) to reconstruct ICMPTimeExceeded data object".format(len(raw)))
        fields = struct.unpack('!xBH', raw[:4])
        self._origdgramlen = fields[0]
        self._nexthopmtu = fields[1]
        super().from_bytes(raw[4:])

    def __eq__(self, other):
        # the inherited comparison looks only at the payload and would
        # ignore the original datagram length
        if not isinstance(other, ICMPTimeExceeded):
            return NotImplemented
        return self.origdgramlen == other.origdgramlen and \
            self.data == other.data

    @property
    def origdgramlen(self):
        return self._origdgramlen

    @origdgramlen.setter
    def origdgramlen(self, value):
        self._origdgramlen = int(value)

    def __str__(self):
        return '{} OrigDgramLen: {}'.format(super().__str__(), self._origdgramlen)
class ICMPAddressMaskRequest(ICMPData):
    '''
    ICMP address mask request data: a 16-bit identifier and a 16-bit
    sequence number (packed with '!HH') followed by a 4-byte IPv4
    address mask.  The raw payload is not serialized for this type.
    '''
    __slots__ = ['_identifier', '_sequence', '_addrmask']
    _PACKFMT = '!HH'
    _MINLEN = struct.calcsize(_PACKFMT)
    # full serialized length: identifier/sequence plus the packed mask
    _WIRELEN = _MINLEN + 4

    def __init__(self):
        super().__init__()
        self._identifier = 0
        self._sequence = 0
        self._addrmask = IPv4Address('0.0.0.0')

    def next_header_class(self):
        return None

    def pre_serialize(self, raw, pkt, i):
        return

    def size(self):
        '''Return the serialized length, matching len(to_bytes()).'''
        return ICMPAddressMaskRequest._WIRELEN

    def to_bytes(self):
        '''Return the packed identifier and sequence followed by the mask.'''
        return b''.join((
            struct.pack(ICMPAddressMaskRequest._PACKFMT, self._identifier, self._sequence),
            self._addrmask.packed))

    def from_bytes(self, raw):
        '''
        Read identifier, sequence and address mask from the first eight
        bytes of raw.  Raises NotEnoughDataError when fewer than eight
        bytes are given.  Returns an empty bytes object.
        '''
        # the mask occupies bytes 4..8, so checking only for the first
        # four bytes let short input fail later inside IPv4Address
        if len(raw) < ICMPAddressMaskRequest._WIRELEN:
            raise NotEnoughDataError("Not enough bytes to unpack ICMPAddressMaskRequest object")
        fields = struct.unpack(ICMPAddressMaskRequest._PACKFMT, raw[:4])
        self._identifier = fields[0]
        self._sequence = fields[1]
        # IPv4Address only treats real bytes objects as packed addresses
        self._addrmask = IPv4Address(bytes(raw[4:8]))
        return b''

    def __eq__(self, other):
        # the inherited comparison looks only at the payload, which made
        # every pair of mask requests compare equal
        if not isinstance(other, ICMPAddressMaskRequest):
            return NotImplemented
        return self.identifier == other.identifier and \
            self.sequence == other.sequence and \
            self.addrmask == other.addrmask and \
            self.data == other.data

    @property
    def addrmask(self):
        return self._addrmask

    @addrmask.setter
    def addrmask(self, value):
        self._addrmask = IPv4Address(value)

    @property
    def identifier(self):
        return self._identifier

    @identifier.setter
    def identifier(self, value):
        self._identifier = int(value)

    @property
    def sequence(self):
        return self._sequence

    @sequence.setter
    def sequence(self, value):
        self._sequence = int(value)

    def __str__(self):
        return '{} {} {}'.format(self._identifier, self._sequence, self._addrmask)


class ICMPAddressMaskReply(ICMPAddressMaskRequest):
    pass


# The following types have no fields of their own; they behave exactly
# like ICMPData and exist so that each is a distinct class.

class ICMPInformationRequest(ICMPData):
    pass


class ICMPInformationReply(ICMPData):
    pass


class ICMPRouterAdvertisement(ICMPData):
    pass


class ICMPRouterSolicitation(ICMPData):
    pass


class ICMPParameterProblem(ICMPData):
    pass


class ICMPTimestamp(ICMPData):
    pass


class ICMPTimestampReply(ICMPData):
    pass


def _icmp_data_class(xtype):
    '''Return the module-level class named "ICMP" + xtype.name.'''
    clsname = "ICMP{}".format(xtype.name)
    try:
        # a plain namespace lookup; no need to evaluate a string as code
        return globals()[clsname]
    except KeyError:
        # keep the exception type the former eval()-based lookup produced
        raise NameError("name '{}' is not defined".format(clsname)) from None


def construct_icmp_class_map():
    '''
    Build and return a function mapping an ICMP type (an ICMPType member
    or a value convertible to one) to its data class.
    '''
    clsmap = {}
    for xtype in ICMPType:
        clsmap[xtype] = _icmp_data_class(xtype)

    def inner(icmptype):
        icmptype = ICMPType(icmptype)
        return clsmap.get(icmptype, None)
    return inner


def construct_icmp_type_map():
    '''
    Build and return a function mapping an ICMP data class to its
    ICMPType member, or None for a class that is not in the map.
    '''
    typemap = {}
    for xtype in ICMPType:
        typemap[_icmp_data_class(xtype)] = xtype

    def inner(icmpcls):
        return typemap.get(icmpcls, None)
    return inner


ICMPClassFromType = construct_icmp_class_map()
ICMPTypeFromClass = construct_icmp_type_map()