Source code for switchyard.lib.packet.udp

from switchyard.lib.packet.packet import PacketHeaderBase
from switchyard.lib.packet.common import checksum
import struct

# Reference: IETF RFC 768 (User Datagram Protocol)

# FIXME: a checksum is computed on serialization (IPv4 only), but a received
# checksum is never verified in from_bytes, and IPv6 is not supported.

class UDP(PacketHeaderBase):
    '''
    UDP header: source port, destination port, length and checksum,
    packed as four unsigned 16-bit fields in network byte order.

    The length and checksum fields are recomputed in pre_serialize();
    only the two ports are settable through public properties.
    '''

    __slots__ = ['_srcport', '_dstport', '_len', '_checksum']
    _PACKFMT = '!HHHH'
    _MINLEN = struct.calcsize(_PACKFMT)

    def __init__(self, **kwargs):
        '''
        Create a header with both ports set to 0, the length set to the
        bare header size and the checksum set to 0.  Keyword arguments
        are handed to the base class initializer (presumably applied as
        attribute assignments -- confirm against PacketHeaderBase).
        '''
        self.srcport = self.dstport = 0
        self._len = self.size()
        # Initialise the checksum slot so that to_bytes() and the
        # checksum property work before pre_serialize() has been called.
        self._checksum = 0
        super().__init__(**kwargs)

    def size(self):
        '''Return the size of the UDP header in bytes.'''
        return struct.calcsize(UDP._PACKFMT)

    def to_bytes(self):
        '''
        Return packed byte representation of the UDP header.
        '''
        return struct.pack(UDP._PACKFMT, self._srcport, self._dstport,
                           self._len, self._checksum)

    def from_bytes(self, raw):
        '''
        Fill in this UDP header from raw bytes and return the bytes
        that follow the header.

        Raises an Exception if raw is shorter than a UDP header.
        The checksum is stored as received; it is not verified.
        '''
        if len(raw) < UDP._MINLEN:
            raise Exception("Not enough bytes ({}) to reconstruct an UDP object".format(len(raw)))
        fields = struct.unpack(UDP._PACKFMT, raw[:UDP._MINLEN])
        self._srcport, self._dstport, self._len, self._checksum = fields
        return raw[UDP._MINLEN:]

    def __eq__(self, other):
        '''
        Two UDP headers are equal when their source and destination
        ports match (length and checksum are not compared).
        '''
        if not isinstance(other, UDP):
            # Let Python try the reflected comparison instead of failing
            # with AttributeError on objects that have no port attributes.
            return NotImplemented
        return self.srcport == other.srcport and \
            self.dstport == other.dstport

    @property
    def srcport(self):
        '''Source port.'''
        return self._srcport

    @property
    def dstport(self):
        '''Destination port.'''
        return self._dstport

    @srcport.setter
    def srcport(self, value):
        self._srcport = value

    @dstport.setter
    def dstport(self, value):
        self._dstport = value

    @property
    def checksum(self):
        '''Checksum field as last parsed or computed (read-only).'''
        return self._checksum

    def __str__(self):
        return '{} {}->{}'.format(self.__class__.__name__,
                                  self.srcport, self.dstport)

    def next_header_class(self):
        '''Return None: no header class is designated to follow UDP.'''
        return None

    def _compute_checksum_ipv4(self, ip4, xdata):
        '''
        Compute the UDP checksum over the IPv4 pseudo-header, the UDP
        header (with a zero checksum field) and the payload xdata.

        Returns 0 ("no checksum") when ip4 is None.
        '''
        if ip4 is None:
            return 0
        # Pseudo-header (source, destination, zero pad byte, protocol,
        # UDP length) followed by the UDP header with checksum zeroed.
        xhdr = struct.pack('!IIxBHHHHH', int(ip4.srcip), int(ip4.dstip),
                           ip4.protocol.value, self._len,
                           self.srcport, self.dstport, self._len, 0)
        csum = checksum(xhdr + xdata)
        # RFC 768: a computed checksum of zero is transmitted as all
        # ones, because a zero field means no checksum was generated.
        if csum == 0:
            csum = 0xffff
        return csum

    def pre_serialize(self, raw, pkt, i):
        '''
        Update the length and checksum fields prior to serialization.

        raw is the data that follows this header; pkt is the enclosing
        packet, queried for its IPv4 header; i is not used here.
        '''
        self._len = self.size() + len(raw)
        # checksum calc currently assumes we're only dealing with ipv4.
        # will need to be modified for ipv6 support...
        self._checksum = self._compute_checksum_ipv4(
            pkt.get_header_by_name('IPv4'), raw)