Source code for switchyard.lib.packet.icmp

from switchyard.lib.packet.packet import PacketHeaderBase,Packet
from switchyard.lib.packet.common import checksum, ICMPType, ICMPTypeCodeMap
import struct
from enum import Enum
from ipaddress import IPv4Address

   (extension parameters)
TCP/IP Illustrated, Vol 1.

[docs]class ICMP(PacketHeaderBase): ''' A mother class for all ICMP message types. It holds a reference to another object that contains the specific ICMP data (icmpdata), given a particular ICMP type. Just setting the icmptype causes the data object to change (the change happens automatically when you set the icmptype). The icmpcode field will also change, but it only changes to some valid code given the new icmptype. ''' __slots__ = ('_type', '_code', '_icmpdata', '_valid_types', '_valid_codes_map', '_classtype_from_icmptype', '_icmptype_from_classtype', '_checksum') _PACKFMT = '!BBH' _MINLEN = struct.calcsize(_PACKFMT) def __init__(self, **kwargs): self._valid_types = ICMPType self._valid_codes_map = ICMPTypeCodeMap self._classtype_from_icmptype = ICMPClassFromType self._icmptype_from_classtype = ICMPTypeFromClass self._type = self._valid_types.EchoRequest self._code = self._valid_codes_map[self._type].EchoRequest self._icmpdata = ICMPEchoRequest() self._checksum = 0 super().__init__(**kwargs) def size(self): return struct.calcsize(ICMP._PACKFMT) + len(self._icmpdata.to_bytes()) def checksum(self): self._checksum = checksum(b''.join( (struct.pack(ICMP._PACKFMT, self._type.value, self._code.value, 0), self._icmpdata.to_bytes()))) return self._checksum def to_bytes(self, dochecksum=True): ''' Return packed byte representation of the UDP header. ''' csum = 0 if dochecksum: csum = self.checksum() return b''.join((struct.pack(ICMP._PACKFMT, self._type.value, self._code.value, csum), self._icmpdata.to_bytes())) def from_bytes(self, raw): if len(raw) < ICMP._MINLEN: raise Exception("Not enough bytes ({}) to reconstruct an ICMP object".format(len(raw))) fields = struct.unpack(ICMP._PACKFMT, raw[:ICMP._MINLEN]) self._type = self._valid_types(fields[0]) self._code = self._valid_codes_map[self.icmptype](fields[1]) self._checksum = fields[2] self._icmpdata = self._classtype_from_icmptype(self._type)() self._icmpdata.from_bytes(raw[ICMP._MINLEN:]) return raw[self.size():] def __eq__(self, other): return self.icmptype == other.icmptype and \ self.icmpcode == other.icmpcode and \ self.icmpdata == other.icmpdata @property def icmptype(self): return self._type @property def icmpcode(self): return self._code @icmptype.setter def icmptype(self,value): if not isinstance(value, self._valid_types): raise ValueError("ICMP type must be an {} enumeration".format(type(self._valid_types))) cls = self._classtype_from_icmptype(value) if not issubclass(self.icmpdata.__class__, cls): self.icmpdata = cls() self._type = value codes = self._valid_codes_map[value] for code in codes: if code.value == 0: self._code = code break @icmpcode.setter def icmpcode(self,value): if issubclass(value.__class__, Enum): validcodes = self._valid_codes_map[self._type] if value not in validcodes: raise ValueError("Invalid code {} for type {}".format(value, self._type)) self._code = value elif isinstance(value, int): self._code = self._valid_codes_map[self.icmptype](value) def __str__(self): typecode = if != typecode = '{}:{}'.format(, return '{} {} {}'.format(self.__class__.__name__, typecode, str(self.icmpdata)) def next_header_class(self): return None def pre_serialize(self, raw, pkt, i): return @property def icmpdata(self): return self._icmpdata @icmpdata.setter def icmpdata(self, dataobj): if not issubclass(dataobj.__class__, ICMPData): raise Exception("ICMP data must be subclass of ICMPData (you gave me {})".format(dataobj.__class__.__name__)) self._icmpdata = dataobj self.icmptype = self._icmptype_from_classtype(dataobj.__class__)
class ICMPData(PacketHeaderBase): __slots__ = ('_rawpayload',) def __init__(self, **kwargs): self._rawpayload = b'' super().__init__(**kwargs) def next_header_class(self): return None def pre_serialize(self, raw, pkt, i): return def size(self): return len(self._rawpayload) def to_bytes(self): return self._rawpayload def from_bytes(self, raw): self._rawpayload = bytes(raw) @property def data(self): return self._rawpayload @data.setter def data(self, value): if not isinstance(value, bytes): self._rawpayload = bytes(value, 'utf8') else: self._rawpayload = value def __eq__(self, other): return == def __hash__(self): return sum(self._rawpayload) def __str__(self): return '{} bytes of raw payload ({})'.format(len(self._rawpayload), self._rawpayload[:10])
[docs]class ICMPSourceQuench(ICMPData): _MINLEN = 4 def __init__(self): super().__init__() def size(self): return 4 + super().size() def to_bytes(self): return b''.join((b'\x00' * 4, super().to_bytes())) def from_bytes(self, raw): if len(raw) < ICMPSourceQuench._MINLEN: raise Exception("Not enough bytes ({}) to reconstruct ICMPSourceQuench data object".format(len(raw))) super().from_bytes(raw[4:])
[docs]class ICMPRedirect(ICMPData): __slots__ = ['_redirectto'] def __init__(self): super().__init__() self._redirectto = IPv4Address('') def to_bytes(self): return b''.join( (self._redirectto.packed,super().to_bytes()) ) def from_bytes(self, raw): fields = struct.unpack('!I', raw[:4]) self._redirectto = IPv4Address(fields[0]) super().from_bytes(raw[4:]) def __str__(self): return '{} RedirectAddress: {}'.format(super().__str__(), self._redirectto) @property def redirectto(self): return self._redirectto @redirectto.setter def redirectto(self, value): self._redirectto = IPv4Address(value)
[docs]class ICMPDestinationUnreachable(ICMPData): __slots__ = ('_origdgramlen', '_nexthopmtu') def __init__(self): super().__init__() self._nexthopmtu = 0 self._origdgramlen = 0 def to_bytes(self): return b''.join( (struct.pack('!xBH', self._origdgramlen, self._nexthopmtu), super().to_bytes()) ) def from_bytes(self, raw): fields = struct.unpack('!xBH', raw[:4]) self._origdgramlen = fields[0] self._nexthopmtu = fields[1] super().from_bytes(raw[4:]) @property def origdgramlen(self): return self._origdgramlen @origdgramlen.setter def origdgramlen(self, value): self._origdgramlen = int(value) @property def nexthopmtu(self): return self._nexthopmtu @nexthopmtu.setter def nexthopmtu(self, value): self.nexthopmtu = int(value) def __str__(self): return '{} NextHopMTU: {}'.format(super().__str__(), self._nexthopmtu)
[docs]class ICMPEchoRequest(ICMPData): __slots__ = ['_identifier','_sequence'] _PACKFMT = '!HH' _MINLEN = struct.calcsize(_PACKFMT) def __init__(self): super().__init__() self._identifier = 0 self._sequence = 0 def next_header_class(self): return None def pre_serialize(self, raw, pkt, i): return def size(self): return self._MINLEN + super().size() def from_bytes(self, raw): fields = struct.unpack(ICMPEchoRequest._PACKFMT, raw[:ICMPEchoRequest._MINLEN]) self._identifier = fields[0] self._sequence = fields[1] super().from_bytes(raw[4:]) return b'' def to_bytes(self): return b''.join( (struct.pack(ICMPEchoRequest._PACKFMT, self._identifier, self._sequence), super().to_bytes() ) ) def __str__(self): return '{} {} ({} data bytes)'.format(self._identifier, self._sequence, len( def __eq__(self, other): return self.identifier == other.identifier and \ self.sequence == other.sequence and \ == @property def identifier(self): return self._identifier @property def sequence(self): return self._sequence @identifier.setter def identifier(self, value): self._identifier = value @sequence.setter def sequence(self, value): self._sequence = value
[docs]class ICMPEchoReply(ICMPEchoRequest): pass
[docs]class ICMPTimeExceeded(ICMPData): __slots__ = ('_nexthopmtu','_origdgramlen',) def __init__(self): super().__init__() self._origdgramlen = 0 def to_bytes(self): return b''.join( (struct.pack('!xBH', self._origdgramlen, 0), super().to_bytes()) ) # FIXME: origdgram len should be padded to 4 bytes for v4, and 8 bytes for v6 def from_bytes(self, raw): fields = struct.unpack('!xBH', raw[:4]) self._origdgramlen = fields[0] self._nexthopmtu = fields[1] super().from_bytes(raw[4:]) @property def origdgramlen(self): return self._origdgramlen @origdgramlen.setter def origdgramlen(self, value): self._origdgramlen = int(value) def __str__(self): return '{} OrigDgramLen: {}'.format(super().__str__(), self._origdgramlen)
class ICMPAddressMaskRequest(ICMPData): __slots__ = ['_identifier','_sequence','_addrmask'] _PACKFMT = '!HH' _MINLEN = struct.calcsize(_PACKFMT) def __init__(self): super().__init__() self._identifier = 0 self._sequence = 0 self._addrmask = IPv4Address('') def next_header_class(self): return None def pre_serialize(self, raw, pkt, i): return def size(self): return ICMPAddressMaskRequest._MINLEN def to_bytes(self): return b''.join( (struct.pack(ICMPAddressMaskRequest._PACKFMT, self._identifier, self._sequence), self._addrmask.packed)) def from_bytes(self, raw): if len(raw) < ICMPAddressMaskRequest._MINLEN: raise Exception("Not enough bytes to unpack ICMPAddressMaskRequest object") fields = struct.unpack(ICMPAddressMaskRequest._PACKFMT, raw[:4]) self._identifier = fields[0] self._sequence = fields[1] self._addrmask = IPv4Address(raw[4:8]) return b'' @property def addrmask(self): return self._addrmask @addrmask.setter def addrmask(self, value): self._addrmask = IPv4Address(value) @property def identifier(self): return self._identifier @identifier.setter def identifier(self, value): self._identifier = int(value) @property def sequence(self): return self._sequence @sequence.setter def sequence(self, value): self._sequence = int(value) def __str__(self): return '{} {} {}'.format(self._identifier, self._sequence, self._addrmask) class ICMPAddressMaskReply(ICMPAddressMaskRequest): pass class ICMPInformationRequest(ICMPData): pass class ICMPInformationReply(ICMPData): pass class ICMPRouterAdvertisement(ICMPData): pass class ICMPRouterSolicitation(ICMPData): pass class ICMPParameterProblem(ICMPData): pass class ICMPTimestamp(ICMPData): pass class ICMPTimestampReply(ICMPData): pass def construct_icmp_class_map(): clsmap = {} for xtype in ICMPType: clsname = "ICMP{}".format( cls = eval(clsname) clsmap[xtype] = cls def inner(icmptype): icmptype = ICMPType(icmptype) return clsmap.get(icmptype, None) return inner def construct_icmp_type_map(): typemap = {} for xtype in ICMPType: clsname = "ICMP{}".format( cls = eval(clsname) typemap[cls] = xtype def inner(icmpcls): return typemap.get(icmpcls, None) return inner ICMPClassFromType = construct_icmp_class_map() ICMPTypeFromClass = construct_icmp_type_map()