Source code for switchyard.lib.packet.ethernet

import struct
from .packet import PacketHeaderBase,Packet
from ..address import EthAddr,SpecialEthAddr
from .arp import Arp
from .ipv4 import IPv4
from .ipv6 import IPv6
from .common import EtherType
from ..exceptions import *


class Vlan(PacketHeaderBase):
    '''
    Strictly speaking this header doesn't fully represent the 802.1Q header,
    but rather the 2nd half of that header and the "displaced" ethertype
    field from the Ethernet header.  The first two bytes of the 802.1Q header
    basically get treated as the ethertype field in the Ethernet header,
    and that ethertype "points to" this Vlan header for parsing/understanding
    the next 4 bytes (or more, depending on whether QinQ or QinQinQ
    encapsulation is done).

    first 16 bits is TCI: tag control information
       3 bits: priority code point (bits 15..13), exposed as ``pcp``
       1 bit: drop eligible indicator (bit 12), exposed as ``dei``
       12 bits: vlan id (bits 11..0), exposed as ``vlanid``

    The second 16 bits hold the EtherType of the header that follows,
    exposed as ``ethertype``.
    '''

    __slots__ = ['_vlanid', '_pcp', '_dei', '_ethertype']
    _PACKFMT = '!HH'
    _MINLEN = struct.calcsize(_PACKFMT)
    _next_header_map = {
        EtherType.IP: IPv4,
        EtherType.ARP: Arp,
        EtherType.IPv6: IPv6,
        EtherType.NoType: None,
    }
    _next_header_class_key = '_ethertype'

    # Bit layout of the 16-bit TCI word, as laid out in the class docstring.
    _VLANID_MASK = 0x0fff
    _DEI_SHIFT = 12
    _PCP_SHIFT = 13
    _PCP_MAX = 7  # largest value that fits in the 3-bit PCP field

    def __init__(self, **kwargs):
        '''
        VLAN constructor accepts an initial VLAN Id and the EtherType
        of the next header.

        All fields are given defaults first (vlanid 0, pcp 0, dei False,
        ethertype IP); the keyword arguments are then handed to the base
        class constructor.
        '''
        self._vlanid = 0
        self._pcp = 0
        self._dei = False
        self._ethertype = EtherType.IP
        super().__init__(**kwargs)

    @property
    def vlanid(self):
        '''The 12-bit VLAN identifier.'''
        return self._vlanid

    @vlanid.setter
    def vlanid(self, value):
        # mask out high-order 4 bits (PCP and DEI share the same 16-bit word)
        self._vlanid = int(value) & Vlan._VLANID_MASK

    @property
    def pcp(self):
        '''The 3-bit priority code point (0..7).'''
        return self._pcp

    @pcp.setter
    def pcp(self, value):
        # Clamp (rather than raise) out-of-range values into the 3-bit range.
        self._pcp = max(min(int(value), Vlan._PCP_MAX), 0)

    @property
    def dei(self):
        '''The 1-bit drop eligible indicator, as a bool.'''
        return self._dei

    @dei.setter
    def dei(self, value):
        self._dei = bool(value)

    @property
    def ethertype(self):
        '''EtherType of the header that follows this one.'''
        return self._ethertype

    @ethertype.setter
    def ethertype(self, value):
        self._ethertype = EtherType(value)

    def from_bytes(self, raw):
        '''
        Fill in this header from the first 4 bytes of ``raw`` and return
        the bytes that follow the header.

        Raises NotEnoughDataError if ``raw`` is shorter than the header.
        '''
        if len(raw) < Vlan._MINLEN:
            raise NotEnoughDataError("Not enough bytes to unpack Vlan header; need {}, "
                "only have {}".format(Vlan._MINLEN, len(raw)))
        tci, ethertype = struct.unpack(Vlan._PACKFMT, raw[:Vlan._MINLEN])
        self.vlanid = tci  # the setter keeps only the low 12 bits
        self.dei = (tci >> Vlan._DEI_SHIFT) & 0x1
        self.pcp = tci >> Vlan._PCP_SHIFT  # top 3 bits of a 16-bit value
        self.ethertype = ethertype
        return raw[Vlan._MINLEN:]

    def to_bytes(self):
        '''Return the packed 4-byte representation (TCI, then EtherType).'''
        tci = ((self._pcp << Vlan._PCP_SHIFT)
               | (int(self._dei) << Vlan._DEI_SHIFT)
               | self._vlanid)
        return struct.pack(Vlan._PACKFMT, tci, self._ethertype.value)

    def __eq__(self, other):
        # Only the VLAN id and EtherType are compared; pcp and dei are not
        # part of header equality.
        return isinstance(other, Vlan) and \
            self.vlanid == other.vlanid and self.ethertype == other.ethertype

    def size(self):
        '''Return the header length in bytes.'''
        return Vlan._MINLEN

    def __str__(self):
        return '{} {} {}'.format(self.__class__.__name__,
                                 self.vlanid, self.ethertype.name)


class Ethernet(PacketHeaderBase):
    '''
    Ethernet header: destination address, source address and a 16-bit
    EtherType that selects the class used for the next header.
    '''

    __slots__ = ['_src', '_dst', '_ethertype']
    _PACKFMT = '!6s6sH'
    _MINLEN = struct.calcsize(_PACKFMT)
    _next_header_map = {
        EtherType.IP: IPv4,
        EtherType.ARP: Arp,
        EtherType.IPv6: IPv6,
        EtherType.x8021Q: Vlan,
        EtherType.NoType: None,
    }
    _next_header_class_key = '_ethertype'

    def __init__(self, **kwargs):
        '''
        Start with default addresses and an IP EtherType, then pass the
        keyword arguments on to the base class constructor.
        '''
        # One default EthAddr object initially backs both fields; the
        # setters rebind each field separately.
        default_addr = EthAddr()
        self._src = default_addr
        self._dst = default_addr
        self._ethertype = EtherType.IP
        super().__init__(**kwargs)

    def size(self):
        '''Return the header length in bytes.'''
        packed_length = struct.calcsize(Ethernet._PACKFMT)
        return packed_length

    @property
    def src(self):
        '''Source Ethernet address.'''
        return self._src

    @src.setter
    def src(self, value):
        self._src = EthAddr(value)

    @property
    def dst(self):
        '''Destination Ethernet address.'''
        return self._dst

    @dst.setter
    def dst(self, value):
        self._dst = EthAddr(value)

    @property
    def ethertype(self):
        '''EtherType of the header that follows this one.'''
        return self._ethertype

    @ethertype.setter
    def ethertype(self, value):
        self._ethertype = EtherType(value)

    def to_bytes(self):
        '''
        Pack the header into bytes: destination address first, then
        source address, then the EtherType value.
        '''
        dst_bytes = self._dst.packed
        src_bytes = self._src.packed
        type_code = self._ethertype.value
        return struct.pack(Ethernet._PACKFMT, dst_bytes, src_bytes, type_code)

    def from_bytes(self, raw):
        '''
        Fill in this header from the leading bytes of ``raw`` and return
        the bytes that follow the header.

        Raises NotEnoughDataError if ``raw`` is shorter than the header.
        '''
        header_len = Ethernet._MINLEN
        if len(raw) < header_len:
            raise NotEnoughDataError("Not enough bytes ({}) to reconstruct an "
                                     "Ethernet object".format(len(raw)))
        header, remainder = raw[:header_len], raw[header_len:]
        dst, src, ethertype = struct.unpack(Ethernet._PACKFMT, header)
        self.src = src
        self.dst = dst
        # Field values up to 1500 are recorded as NoType rather than being
        # looked up as an EtherType (presumably because such values are an
        # 802.3 payload length -- confirm).
        self.ethertype = EtherType.NoType if ethertype <= 1500 else ethertype
        return remainder

    def __eq__(self, other):
        if not isinstance(other, Ethernet):
            return False
        return (self.src == other.src and self.dst == other.dst
                and self.ethertype == other.ethertype)

    def __str__(self):
        class_name = self.__class__.__name__
        type_name = self.ethertype.name
        return '{} {}->{} {}'.format(class_name, self.src, self.dst, type_name)