Source code for switchyard.lib.packet.ethernet

from switchyard.lib.packet.packet import PacketHeaderBase,Packet
from switchyard.lib.address import EthAddr,SpecialEthAddr
import struct
from switchyard.lib.packet.arp import Arp
from switchyard.lib.packet.ipv4 import IPv4
from switchyard.lib.packet.ipv6 import IPv6
from switchyard.lib.packet.common import EtherType


class Vlan(PacketHeaderBase):
    '''
    Strictly speaking this header doesn't fully represent the 802.1Q header, but
    rather the 2nd half of that header and the "displaced" ethertype
    field from the Ethernet header.  The first two bytes of the 802.1Q header
    basically get treated as the ethertype field in the Ethernet header,
    and that ethertype "points to" this Vlan header for parsing/understanding
    the next 4 bytes (or more, depending on whether QinQ or QinQinQ 
    encapsulation is done).

    first 16 bits is TCI: tag control information
       3 bits: priority code point
       1 bit: drop eligible indicator
       12 bits: vlan id
    '''

    __slots__ = ['_vlanid', '_pcp', '_ethertype']
    _PACKFMT = '!HH'
    _MINLEN = struct.calcsize(_PACKFMT)

    def __init__(self, **kwargs):
        '''
        VLAN constructor accepts an initial VLAN Id and the EtherType
        of the next header.
        '''
        self._vlanid = 0
        self._pcp = 0
        self._ethertype = EtherType.IP
        super().__init__(**kwargs)

    @property
    def vlan(self):
        return self._vlanid

    @vlan.setter
    def vlan(self, value):
        self._vlanid = int(value) & 0x0fff # mask out high-order 4 bits

    @property
    def pcp(self):
        return self._pcp

    @pcp.setter
    def pcp(self, value):
        self._pcp = max(min(int(value),3),0)

    @property
    def ethertype(self):
        return self._ethertype

    @ethertype.setter
    def ethertype(self, value):
        self._ethertype = EtherType(value)

    def from_bytes(self, raw):
        if len(raw) < Vlan._MINLEN:
            raise Exception("Not enough bytes to unpack Vlan header; need {}, only have {}".format(Vlan._MINLEN, len(raw)))
        fields = struct.unpack(Vlan._PACKFMT, raw[:Vlan._MINLEN])
        self.vlan = fields[0]
        self.pcp = ((fields[0] & 0xf000) >> 12)
        self.ethertype = fields[1]
        return raw[Vlan._MINLEN:]

    def to_bytes(self):
        return struct.pack(Vlan._PACKFMT, ((self._pcp << 12) | self._vlanid), self._ethertype.value)

    def __eq__(self, other):
        return self.vlan == other.vlan and self.ethertype == other.ethertype

    def size(self):
        return Vlan._MINLEN

    def pre_serialize(self, raw, pkt, i):
        pass

    def next_header_class(self):
        return EtherTypeClasses[self.ethertype]

    def __str__(self):
        return '{} {} {}'.format(self.__class__.__name__, self.vlan, self.ethertype.name)

EtherTypeClasses = {
    EtherType.IP: IPv4,
    EtherType.ARP: Arp,
    EtherType.IPv6: IPv6,
    EtherType.x8021Q: Vlan,
    EtherType.NoType: None,
}


[docs]class Ethernet(PacketHeaderBase): __slots__ = ['_src','_dst','_ethertype'] _PACKFMT = '!6s6sH' _MINLEN = struct.calcsize(_PACKFMT) def __init__(self, **kwargs): self._src = self._dst = EthAddr() self._ethertype = EtherType.IP super().__init__(**kwargs) def size(self): return struct.calcsize(Ethernet._PACKFMT) @property def src(self): return self._src @src.setter def src(self, value): self._src = EthAddr(value) @property def dst(self): return self._dst @dst.setter def dst(self, value): self._dst = EthAddr(value) @property def ethertype(self): return self._ethertype @ethertype.setter def ethertype(self, value): self._ethertype = EtherType(value) def to_bytes(self): ''' Return packed byte representation of the Ethernet header. ''' return struct.pack(Ethernet._PACKFMT, self._dst.packed, self._src.packed, self._ethertype.value) def from_bytes(self, raw): '''Return an Ethernet object reconstructed from raw bytes, or an Exception if we can't resurrect the packet.''' if len(raw) < Ethernet._MINLEN: raise Exception("Not enough bytes ({}) to reconstruct an Ethernet object".format(len(raw))) dst,src,ethertype = struct.unpack(Ethernet._PACKFMT, raw[:Ethernet._MINLEN]) self.src = src self.dst = dst if ethertype <= 1500: self.ethertype = EtherType.NoType else: self.ethertype = ethertype return raw[Ethernet._MINLEN:] def next_header_class(self): if self.ethertype not in EtherTypeClasses: raise Exception("No mapping for ethertype {} to a packet header class".format(self.ethertype)) cls = EtherTypeClasses.get(self.ethertype, None) if cls is None: print ("Warning: no class exists to parse next protocol type: {}".format(self.ethertype)) return cls def pre_serialize(self, raw, pkt, i): pass def __eq__(self, other): return self.src == other.src and self.dst == other.dst and self.ethertype == other.ethertype def __str__(self): return '{} {}->{} {}'.format(self.__class__.__name__, self.src, self.dst, self.ethertype.name)