From d9467809c3ad353ddbe8a6eeb4dfe66026b76dfb Mon Sep 17 00:00:00 2001 From: dosisod <39638017+dosisod@users.noreply.github.com> Date: Mon, 7 Sep 2020 15:40:17 -0700 Subject: [PATCH 1/4] got most of the easy errors resolved --- .mypy.ini | 11 +++++++++++ pcapng/blocks.py | 41 +++++++++++++++++++++++++++++------------ pcapng/flags.py | 15 +++++++++++++++ pcapng/scanner.py | 20 +++++++++++++++----- pcapng/strictness.py | 4 ++++ pcapng/structs.py | 9 +++++++++ pcapng/utils.py | 16 +++++++++++++++- pcapng/writer.py | 4 ++++ 8 files changed, 102 insertions(+), 18 deletions(-) create mode 100644 .mypy.ini diff --git a/.mypy.ini b/.mypy.ini new file mode 100644 index 0000000..bec89c1 --- /dev/null +++ b/.mypy.ini @@ -0,0 +1,11 @@ +[mypy] +warn_unused_configs=True +warn_unused_ignores=True +warn_return_any=True +warn_unreachable=True +warn_redundant_casts=True + +check_untyped_defs=True +disallow_untyped_calls=True +disallow_untyped_defs=True +disallow_incomplete_defs=True \ No newline at end of file diff --git a/pcapng/blocks.py b/pcapng/blocks.py index faa77b3..7f04746 100644 --- a/pcapng/blocks.py +++ b/pcapng/blocks.py @@ -12,6 +12,7 @@ import io import itertools +from typing import Tuple from pcapng import strictness as strictness from pcapng.constants import link_types @@ -73,6 +74,7 @@ def __eq__(self, other): return [getattr(self, k) for k in keys] == [getattr(other, k) for k in keys] def _write(self, outstream): + # type: (io.BytesIO) -> None """Writes this block into the given output stream""" encoded_block = io.BytesIO() self._encode(encoded_block) @@ -125,6 +127,7 @@ def __setattr__(self, name, value): self._decoded[name] = value def __repr__(self): + # type: () -> str args = [] for item in self.schema: name = item[0] @@ -226,18 +229,22 @@ def add_interface_stats(self, interface_stats): @property def version(self): + # type: () -> Tuple[IntField, IntField] return (self.version_major, self.version_minor) @property def length(self): + # type: () -> IntField return self.section_length # Block.decode() assumes all blocks have sections -- technically true... @property def section(self): + # type: () -> SectionHeader return self def __repr__(self): + # type: () -> str return ( "<{name} version={version} endianness={endianness} " "length={length} options={options}>" @@ -291,18 +298,19 @@ class InterfaceDescription(SectionMemberBlock): @property # todo: cache this property def timestamp_resolution(self): - # ------------------------------------------------------------ - # Resolution of timestamps. If the Most Significant Bit is - # equal to zero, the remaining bits indicates the resolution - # of the timestamp as as a negative power of 10 (e.g. 6 means - # microsecond resolution, timestamps are the number of - # microseconds since 1/1/1970). If the Most Significant Bit is - # equal to one, the remaining bits indicates the resolution as - # as negative power of 2 (e.g. 10 means 1/1024 of second). If - # this option is not present, a resolution of 10^-6 is assumed - # (i.e. timestamps have the same resolution of the standard - # 'libpcap' timestamps). - # ------------------------------------------------------------ + # type: () -> float + """ + Resolution of timestamps. If the Most Significant Bit is + equal to zero, the remaining bits indicates the resolution + of the timestamp as as a negative power of 10 (e.g. 6 means + microsecond resolution, timestamps are the number of + microseconds since 1/1/1970). If the Most Significant Bit is + equal to one, the remaining bits indicates the resolution as + as negative power of 2 (e.g. 10 means 1/1024 of second). If + this option is not present, a resolution of 10^-6 is assumed + (i.e. timestamps have the same resolution of the standard + 'libpcap' timestamps). + """ if "if_tsresol" in self.options: return unpack_timestamp_resolution(self.options["if_tsresol"]) @@ -316,6 +324,7 @@ def statistics(self): @property def link_type_description(self): + # type: () -> str try: return link_types.LINKTYPE_DESCRIPTIONS[self.link_type] except KeyError: @@ -332,6 +341,7 @@ class BlockWithTimestampMixin(object): @property def timestamp(self): + # type: () -> float # First, get the accuracy from the ts_resol option return ( (self.timestamp_high << 32) + self.timestamp_low @@ -339,6 +349,7 @@ def timestamp(self): @property def timestamp_resolution(self): + # type: () -> float return self.interface.timestamp_resolution # todo: add some property returning a datetime() with timezone.. @@ -389,6 +400,7 @@ class BasePacketBlock(SectionMemberBlock, BlockWithInterfaceMixin): @property def captured_len(self): + # type: () -> int return len(self.packet_data) # Helper function. If the user hasn't explicitly set an original packet @@ -397,6 +409,7 @@ def captured_len(self): # the captured data length. @property def packet_len(self): + # type: () -> int plen = self.__getattr__("packet_len") or 0 # this call prevents recursion return plen or len(self.packet_data) @@ -469,6 +482,7 @@ def __init__(self, section, **kwargs): @property def interface_id(self): + # type: () -> int """ "The Simple Packet Block does not contain the Interface ID field. Therefore, it MUST be assumed that all the Simple Packet Blocks have @@ -479,6 +493,7 @@ def interface_id(self): @property def captured_len(self): + # type: () -> int """ "...the SnapLen value MUST be used to determine the size of the Packet Data field length." @@ -661,8 +676,10 @@ class UnknownBlock(Block): __slots__ = ["block_type", "data"] def __init__(self, block_type, data): + # type: (int, bytes) -> None self.block_type = block_type self.data = data def __repr__(self): + # type: () -> str return "UnknownBlock(0x{0:08X}, {1!r})".format(self.block_type, self.data) diff --git a/pcapng/flags.py b/pcapng/flags.py index 309dd92..8382190 100644 --- a/pcapng/flags.py +++ b/pcapng/flags.py @@ -4,6 +4,10 @@ from collections import OrderedDict from collections.abc import Iterable +from typing import ( + Any, + List +) from pcapng._compat import namedtuple @@ -34,9 +38,11 @@ def __init__(self, owner, offset, size, extra=None): self.mask = ((1 << self.size) - 1) << self.offset def get_bits(self): + # type: () -> int return (self.owner._value & self.mask) >> self.offset def set_bits(self, val): + # type: (int) -> None val &= (1 << self.size) - 1 self.owner._value &= ~self.mask self.owner._value |= val << self.offset @@ -53,9 +59,11 @@ def __init__(self, owner, offset, size, extra=None): super(FlagBool, self).__init__(owner, offset, size) def get(self): + # type: () -> bool return bool(self.get_bits()) def set(self, val): + # type: (Any) -> None self.set_bits(int(bool(val))) @@ -66,9 +74,11 @@ class FlagUInt(FlagBase): """ def get(self): + # type: () -> int return self.get_bits() def set(self, val): + # type: (int) -> None self.set_bits(val) @@ -98,6 +108,7 @@ def __init__(self, owner, offset, size, extra=None): super(FlagEnum, self).__init__(owner, offset, size, extra) def get(self): + # type: () -> str val = self.get_bits() try: return self.extra[val] @@ -105,6 +116,7 @@ def get(self): return "[invalid value]" def set(self, val): + # type: (int) -> None if val in self.extra: self.set_bits(self.extra.index(val)) elif isinstance(val, int): @@ -119,6 +131,7 @@ def set(self, val): # Class representing a single flag schema for FlagWord. # 'nbits' defaults to 1, and 'extra' defaults to None. + FlagField = namedtuple( "FlagField", ("name", "ftype", "nbits", "extra"), defaults=(1, None) ) @@ -169,9 +182,11 @@ def __init__(self, schema, nbits=32, initial=0): bitn += item.nbits def __int__(self): + # type: () -> int return self._value def __repr__(self): + # type: () -> str rv = "<{0} (value={1})".format(self.__class__.__name__, self._value) for k, v in self._schema.items(): rv += " {0}={1}".format(k, v.get()) diff --git a/pcapng/scanner.py b/pcapng/scanner.py index 4284823..e0a3b29 100644 --- a/pcapng/scanner.py +++ b/pcapng/scanner.py @@ -1,3 +1,9 @@ +from io import BytesIO +from typing import ( + Iterator, + Optional +) + import pcapng.blocks as blocks from pcapng.constants.block_types import BLK_RESERVED, BLK_RESERVED_CORRUPTED from pcapng.exceptions import CorruptedFile, StreamEmpty @@ -36,11 +42,13 @@ class FileScanner(object): __slots__ = ["stream", "current_section", "endianness"] def __init__(self, stream): + # type: (BytesIO) -> None self.stream = stream - self.current_section = None + self.current_section = None # type: Optional[blocks.SectionHeader] self.endianness = "=" def __iter__(self): + # type: () -> Iterator[blocks.Block] while True: try: yield self._read_next_block() @@ -48,6 +56,7 @@ def __iter__(self): return def _read_next_block(self): + # type: () -> blocks.Block block_type = self._read_int(32, False) if block_type == SECTION_HEADER_MAGIC: @@ -59,11 +68,10 @@ def _read_next_block(self): if self.current_section is None: raise ValueError("File not starting with a proper section header") - block = self._read_block(block_type) - - return block + return self._read_block(block_type) def _read_section_header(self): + # type: () -> blocks.SectionHeader """ Section information headers are special blocks in that they modify the state of the FileScanner instance (to change current @@ -79,6 +87,7 @@ def _read_section_header(self): ) def _read_block(self, block_type): + # type: (int) -> blocks.Block """ Read the block payload and pass to the appropriate block constructor """ @@ -86,7 +95,7 @@ def _read_block(self, block_type): if block_type in blocks.KNOWN_BLOCKS: # This is a known block -- instantiate it - return self.current_section.new_member( + return self.current_section.new_member( # type: ignore blocks.KNOWN_BLOCKS[block_type], raw=data ) @@ -106,6 +115,7 @@ def _read_block(self, block_type): return blocks.UnknownBlock(block_type, data) def _read_int(self, size, signed=False): + # type: (int, bool) -> int """ Read an integer from the stream, using current endianness """ diff --git a/pcapng/strictness.py b/pcapng/strictness.py index 0b4a25b..28697ea 100644 --- a/pcapng/strictness.py +++ b/pcapng/strictness.py @@ -20,12 +20,14 @@ class Strictness(Enum): def set_strictness(level): + # type: (Strictness) -> None assert type(level) is Strictness global strict_level strict_level = level def problem(msg): + # type: (str) -> None "Warn or raise an exception with the given message." if strict_level == Strictness.FORBID: raise PcapngStrictnessError(msg) @@ -34,11 +36,13 @@ def problem(msg): def warn(msg): + # type: (str) -> None "Show a warning with the given message." if strict_level > Strictness.NONE: warnings.warn(PcapngStrictnessWarning(msg)) def should_fix(): + # type: () -> bool "Helper function for showing code used to fix questionable pcapng data." return strict_level == Strictness.FIX diff --git a/pcapng/structs.py b/pcapng/structs.py index 73e16a8..d75113f 100644 --- a/pcapng/structs.py +++ b/pcapng/structs.py @@ -80,6 +80,7 @@ def read_int(stream, size, signed=False, endianness="="): + # type: (BytesIO, int, bool, str) -> int """ Read (and decode) an integer number from a binary stream. @@ -296,9 +297,11 @@ def load(self, stream, endianness, seen=None): pass def __repr__(self): + # type: () -> str return "{0}()".format(self.__class__.__name__) def __unicode__(self): + # type: () -> str return self.__repr__().encode("UTF-8") def encode_finish(self, stream, endianness): @@ -324,6 +327,7 @@ def encode(self, value, stream, endianness=None): write_bytes_padded(stream, value) def __repr__(self): + # type: () -> str return "{0}(size={1!r})".format(self.__class__.__name__, self.size) @@ -353,6 +357,7 @@ def encode(self, number, stream, endianness): write_int(number, stream, self.size, signed=self.signed, endianness=endianness) def __repr__(self): + # type: () -> str return "{0}(size={1!r}, signed={2!r})".format( self.__class__.__name__, self.size, self.signed ) @@ -380,6 +385,7 @@ def encode(self, options, stream, endianness): write_options(stream, options) def __repr__(self): + # type: () -> str return "{0}({1!r})".format(self.__class__.__name__, self.options_schema) @@ -461,6 +467,7 @@ def encode(self, list_data, stream, endianness): self.subfield.encode_finish(stream, endianness) def __repr__(self): + # type: () -> str return "{0}({1!r})".format(self.__class__.__name__, self.subfield) @@ -744,6 +751,7 @@ def __getitem__(self, name): return self.data[code][0] def __len__(self): + # type: () -> int return len(self.data) def __iter__(self): @@ -800,6 +808,7 @@ def add(self, name, value): self._check_multiples(code) def __repr__(self): + # type: () -> str args = dict(self.iter_all_items()) name = self.__class__.__name__ return "{0}({1!r})".format(name, args) diff --git a/pcapng/utils.py b/pcapng/utils.py index 6c4af60..c216db7 100644 --- a/pcapng/utils.py +++ b/pcapng/utils.py @@ -1,12 +1,18 @@ import socket import struct +from typing import ( + Iterable, + Tuple +) def pack_ipv4(data): + # type: (str) -> bytes return socket.inet_aton(data) def unpack_ipv4(data): + # type: (bytes) -> str return socket.inet_ntoa(data) @@ -27,32 +33,39 @@ def _get_pairs(data): def pack_ipv6(data): + # type: (str) -> bytes return socket.inet_pton(socket.AF_INET6, data) def unpack_ipv6(data): + # type: (bytes) -> str return socket.inet_ntop(socket.AF_INET6, data) def pack_macaddr(data): + # type: (str) -> bytes a = [int(x, 16) for x in data.split(":")] return struct.pack("!6B", *a) def unpack_macaddr(data): + # type: (bytes) -> str return ":".join(format(x, "02x") for x in data) def pack_euiaddr(data): + # type: (str) -> bytes a = [int(x, 16) for x in data.split(":")] return struct.pack("!8B", *a) def unpack_euiaddr(data): + # type: (bytes) -> str return unpack_macaddr(data) def unpack_timestamp_resolution(data): + # type: (bytes) -> float """ Unpack a timestamp resolution. @@ -64,10 +77,11 @@ def unpack_timestamp_resolution(data): num = data[0] base = 2 if (num >> 7 & 1) else 10 exponent = num & 0b01111111 - return base ** (-exponent) + return float(base ** (-exponent)) def pack_timestamp_resolution(base, exponent): + # type: (int, int) -> bytes """ Pack a timestamp resolution. diff --git a/pcapng/writer.py b/pcapng/writer.py index d61ce48..91483f1 100644 --- a/pcapng/writer.py +++ b/pcapng/writer.py @@ -1,3 +1,5 @@ +from io import BytesIO + import pcapng.blocks as blocks from pcapng.exceptions import PcapngDumpError @@ -14,6 +16,7 @@ class FileWriter(object): ] def __init__(self, stream, shb): + # type: (BytesIO, blocks.SectionHeader) -> None """ Start writing a new pcap-ng section to the given stream. Writes the :py:class:`SectionHeader` immediately. Also writes any @@ -37,6 +40,7 @@ def __init__(self, stream, shb): shb.interfaces[iface]._write(stream) def write_block(self, blk): + # type: (blocks.Block) -> None """ Write the given block to this stream. From 454429202b6920fa93a701b7f41caeb01097467d Mon Sep 17 00:00:00 2001 From: dosisod <39638017+dosisod@users.noreply.github.com> Date: Wed, 9 Sep 2020 23:53:48 -0700 Subject: [PATCH 2/4] added more type annotations --- pcapng/blocks.py | 10 +++++++++- pcapng/structs.py | 19 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/pcapng/blocks.py b/pcapng/blocks.py index 7f04746..b001fde 100644 --- a/pcapng/blocks.py +++ b/pcapng/blocks.py @@ -12,7 +12,7 @@ import io import itertools -from typing import Tuple +from typing import Any, Tuple, Type from pcapng import strictness as strictness from pcapng.constants import link_types @@ -67,6 +67,7 @@ def __init__(self, **kwargs): self._decoded[key] = default def __eq__(self, other): + # type: (Any) -> bool if self.__class__ != other.__class__: return False keys = [x[0] for x in self.schema] @@ -199,6 +200,7 @@ def __init__(self, endianness="<", **kwargs): super(SectionHeader, self).__init__(endianness=endianness, **kwargs) def _encode(self, outstream): + # type: (io.BytesIO) -> None write_int(0x1A2B3C4D, outstream, 32, endianness=self.endianness) super(SectionHeader, self)._encode(outstream) @@ -216,6 +218,7 @@ def new_member(self, cls, **kwargs): return blk def register_interface(self, interface): + # type: (Block) -> None """Helper method to register an interface within this section""" assert isinstance(interface, InterfaceDescription) interface_id = next(self._interfaces_id) @@ -223,6 +226,7 @@ def register_interface(self, interface): self.interfaces[interface_id] = interface def add_interface_stats(self, interface_stats): + # type: (Block) -> None """Helper method to register interface stats within this section""" assert isinstance(interface_stats, InterfaceStatistics) self.interface_stats[interface_stats.interface_id] = interface_stats @@ -370,6 +374,7 @@ def interface(self): return self.section.interfaces[self.interface_id] def _encode(self, outstream): + # type: (io.BytesIO) -> None if len(self.section.interfaces) < 1: strictness.problem( "writing {cls} for section with no interfaces".format( @@ -505,6 +510,7 @@ def captured_len(self): return min(snap_len, self.packet_len) def _encode(self, outstream): + # type: (io.BytesIO) -> None fld_size = IntField(32, False) fld_data = RawBytes(0) if len(self.section.interfaces) > 1: @@ -570,6 +576,7 @@ class ObsoletePacket(BasePacketBlock, BlockWithTimestampMixin): ] def enhanced(self): + # type: () -> SectionMemberBlock """Return an EnhancedPacket with this block's attributes.""" opts_dict = dict(self.options) opts_dict["epb_dropcount"] = self.drops_count @@ -591,6 +598,7 @@ def enhanced(self): # Do this check in _write() instead of _encode() to ensure the block gets written # with the correct magic number. def _write(self, outstream): + # type: (io.BytesIO) -> None strictness.problem("Packet Block is obsolete and must not be used") if strictness.should_fix(): self.enhanced()._write(outstream) diff --git a/pcapng/structs.py b/pcapng/structs.py index d75113f..2161b44 100644 --- a/pcapng/structs.py +++ b/pcapng/structs.py @@ -3,10 +3,12 @@ """ import abc +from io import BytesIO import struct import warnings from collections import defaultdict from collections.abc import Iterable, Mapping +from typing import List from pcapng import strictness as strictness from pcapng._compat import namedtuple @@ -105,6 +107,7 @@ def read_int(stream, size, signed=False, endianness="="): def write_int(number, stream, size, signed=False, endianness="="): + # type: (int, BytesIO, int, bool, str) -> None """ Write (and encode) an integer number to a binary stream. @@ -183,6 +186,7 @@ def read_section_header(stream): def read_block_data(stream, endianness): + # type: (BytesIO, str) -> bytes """ Read block data from a stream. @@ -208,6 +212,7 @@ def read_block_data(stream, endianness): def read_bytes(stream, size): + # type: (BytesIO, int) -> bytes """ Read the given amount of raw bytes from a stream. @@ -233,6 +238,7 @@ def read_bytes(stream, size): def write_bytes(stream, data): + # type: (BytesIO, bytes) -> None """ Write the given amount of raw bytes to a stream. @@ -243,6 +249,7 @@ def write_bytes(stream, data): def read_bytes_padded(stream, size, pad_block_size=4): + # type: (BytesIO, int, int) -> bytes """ Read the given amount of bytes from a stream, plus read and discard any necessary extra byte to align up to the pad_block_size-sized @@ -267,6 +274,7 @@ def read_bytes_padded(stream, size, pad_block_size=4): def write_bytes_padded(stream, data, pad_block_size=4): + # type: (BytesIO, bytes, int) -> None """ Read the given amount of bytes from a stream, plus read and discard any necessary extra byte to align up to the pad_block_size-sized @@ -321,9 +329,11 @@ def __init__(self, size): self.size = size # in bytes! def load(self, stream, endianness=None, seen=None): + # type: (BytesIO, None, None) -> bytes return read_bytes_padded(stream, self.size) def encode(self, value, stream, endianness=None): + # type: (bytes, BytesIO, str) -> None write_bytes_padded(stream, value) def __repr__(self): @@ -344,14 +354,17 @@ class IntField(StructField): __slots__ = ["size", "signed"] def __init__(self, size, signed=False): + # type: (int, bool) -> None self.size = size # in bits! self.signed = signed def load(self, stream, endianness, seen=None): + # type: (BytesIO, str, None) -> int number = read_int(stream, self.size, signed=self.signed, endianness=endianness) return number def encode(self, number, stream, endianness): + # type: (int, BytesIO, str) -> None if not isinstance(number, int): raise TypeError("'{}' is not numeric".format(number)) write_int(number, stream, self.size, signed=self.signed, endianness=endianness) @@ -382,6 +395,7 @@ def load(self, stream, endianness, seen=None): return Options(schema=self.options_schema, data=options, endianness=endianness) def encode(self, options, stream, endianness): + # type: (Options, BytesIO, str) -> None write_options(stream, options) def __repr__(self): @@ -404,9 +418,11 @@ class PacketBytes(StructField): __slots__ = ["dependency"] def __init__(self, len_field): + # type: (int) -> None self.dependency = len_field def load(self, stream, endianness, seen=[]): + # type: (BytesIO, str, List[int]) -> bytes try: length = seen[self.dependency] except TypeError: @@ -424,6 +440,7 @@ def load(self, stream, endianness, seen=[]): return read_bytes_padded(stream, length) def encode(self, packet, stream, endianness=None): + # type: (bytes, BytesIO, str) -> None if not packet: raise ValueError("Packet invalid") write_bytes_padded(stream, packet) @@ -541,6 +558,7 @@ def encode(self, d, stream, endianness): write_bytes_padded(stream, d["raw"]) def encode_finish(self, stream, endianness): + # type: (BytesIO, str) -> None write_int(NRB_RECORD_END, stream, 16, endianness=endianness) write_int(0, stream, 16, endianness=endianness) @@ -618,6 +636,7 @@ class EPBFlags(FlagWord): __slots__ = [] def __init__(self, val=0): + # type: (int) -> None super(EPBFlags, self).__init__( [ FlagField("inout", FlagEnum, 2, ("NA", "inbound", "outbound")), From 36ea22c3510611ada1989ba7d2cb7e91185396b3 Mon Sep 17 00:00:00 2001 From: dosisod <39638017+dosisod@users.noreply.github.com> Date: Fri, 11 Sep 2020 22:59:29 -0700 Subject: [PATCH 3/4] added type annotations for __slots__ --- pcapng/blocks.py | 29 +++++++++++++++-------------- pcapng/flags.py | 4 ++-- pcapng/scanner.py | 3 ++- pcapng/structs.py | 18 +++++++++--------- pcapng/writer.py | 3 ++- 5 files changed, 30 insertions(+), 27 deletions(-) diff --git a/pcapng/blocks.py b/pcapng/blocks.py index b001fde..74452c2 100644 --- a/pcapng/blocks.py +++ b/pcapng/blocks.py @@ -12,7 +12,7 @@ import io import itertools -from typing import Any, Tuple, Type +from typing import Any, List, Tuple, Type from pcapng import strictness as strictness from pcapng.constants import link_types @@ -43,7 +43,7 @@ class Block(object): # These are in addition to the above two properties "magic_number", "_decoded", - ] + ] # type: List[str] def __init__(self, **kwargs): if "raw" in kwargs: @@ -144,7 +144,7 @@ def __repr__(self): class SectionMemberBlock(Block): """Block which must be a member of a section""" - __slots__ = ["section"] + __slots__ = ["section"] # type: List[str] def __init__(self, section, **kwargs): super(SectionMemberBlock, self).__init__(**kwargs) @@ -174,7 +174,8 @@ class SectionHeader(Block): "_interfaces_id", "interfaces", "interface_stats", - ] + ] # type: List[str] + schema = [ ("version_major", IntField(16, False), 1), ("version_minor", IntField(16, False), 0), @@ -271,7 +272,7 @@ class InterfaceDescription(SectionMemberBlock): """ magic_number = 0x00000001 - __slots__ = ["interface_id"] + __slots__ = ["interface_id"] # type: List[str] schema = [ ("link_type", IntField(16, False), 0), # todo: enc/decode ("reserved", IntField(16, False), 0), @@ -341,7 +342,7 @@ class BlockWithTimestampMixin(object): of blocks that provide one. """ - __slots__ = [] + __slots__ = [] # type: List[str] @property def timestamp(self): @@ -365,7 +366,7 @@ class BlockWithInterfaceMixin(object): This includes all packet blocks as well as InterfaceStatistics. """ - __slots__ = [] + __slots__ = [] # type: List[str] @property def interface(self): @@ -400,7 +401,7 @@ class BasePacketBlock(SectionMemberBlock, BlockWithInterfaceMixin): the current length of the packet data. """ - __slots__ = [] + __slots__ = [] # type: List[str] readonly_fields = set(("captured_len",)) @property @@ -429,7 +430,7 @@ class EnhancedPacket(BasePacketBlock, BlockWithTimestampMixin): """ magic_number = 0x00000006 - __slots__ = [] + __slots__ = [] # type: List[str] schema = [ ("interface_id", IntField(32, False), 0), ("timestamp_high", IntField(32, False), 0), @@ -461,7 +462,7 @@ class SimplePacket(BasePacketBlock): """ magic_number = 0x00000003 - __slots__ = [] + __slots__ = [] # type: List[str] schema = [ # packet_len is NOT the captured length ("packet_len", IntField(32, False), 0), @@ -553,7 +554,7 @@ class ObsoletePacket(BasePacketBlock, BlockWithTimestampMixin): """ magic_number = 0x00000002 - __slots__ = [] + __slots__ = [] # type: List[str] schema = [ ("interface_id", IntField(16, False), 0), ("drops_count", IntField(16, False), 0), @@ -620,7 +621,7 @@ class NameResolution(SectionMemberBlock): """ magic_number = 0x00000004 - __slots__ = [] + __slots__ = [] # type: List[str] schema = [ ("records", ListField(NameResolutionRecordField()), []), ( @@ -650,7 +651,7 @@ class InterfaceStatistics( """ magic_number = 0x00000005 - __slots__ = [] + __slots__ = [] # type: List[str] schema = [ ("interface_id", IntField(32, False), 0), ("timestamp_high", IntField(32, False), 0), @@ -681,7 +682,7 @@ class UnknownBlock(Block): processing. """ - __slots__ = ["block_type", "data"] + __slots__ = ["block_type", "data"] # type: List[str] def __init__(self, block_type, data): # type: (int, bytes) -> None diff --git a/pcapng/flags.py b/pcapng/flags.py index 8382190..f909e78 100644 --- a/pcapng/flags.py +++ b/pcapng/flags.py @@ -24,7 +24,7 @@ class FlagBase(object): "size", "extra", "mask", - ] + ] # type: List[str] def __init__(self, owner, offset, size, extra=None): if size < 1: @@ -146,7 +146,7 @@ class FlagWord(object): "_nbits", "_value", "_schema", - ] + ] # type: List[str] def __init__(self, schema, nbits=32, initial=0): """ diff --git a/pcapng/scanner.py b/pcapng/scanner.py index e0a3b29..42d0d7d 100644 --- a/pcapng/scanner.py +++ b/pcapng/scanner.py @@ -1,6 +1,7 @@ from io import BytesIO from typing import ( Iterator, + List, Optional ) @@ -39,7 +40,7 @@ class FileScanner(object): just wrap it in a :py:class:`io.BytesIO` object. """ - __slots__ = ["stream", "current_section", "endianness"] + __slots__ = ["stream", "current_section", "endianness"] # type: List[str] def __init__(self, stream): # type: (BytesIO) -> None diff --git a/pcapng/structs.py b/pcapng/structs.py index 2161b44..03450de 100644 --- a/pcapng/structs.py +++ b/pcapng/structs.py @@ -298,7 +298,7 @@ class StructField(object): """Abstract base class for struct fields""" __metaclass__ = abc.ABCMeta - __slots__ = [] + __slots__ = [] # type: List[str] @abc.abstractmethod def load(self, stream, endianness, seen=None): @@ -323,7 +323,7 @@ class RawBytes(StructField): :param size: field size, in bytes """ - __slots__ = ["size"] + __slots__ = ["size"] # type: List[str] def __init__(self, size): self.size = size # in bytes! @@ -351,7 +351,7 @@ class IntField(StructField): integer. Defaults to False (unsigned) """ - __slots__ = ["size", "signed"] + __slots__ = ["size", "signed"] # type: List[str] def __init__(self, size, signed=False): # type: (int, bool) -> None @@ -385,7 +385,7 @@ class OptionsField(StructField): constructor. """ - __slots__ = ["options_schema"] + __slots__ = ["options_schema"] # type: List[str] def __init__(self, options_schema): self.options_schema = options_schema @@ -415,7 +415,7 @@ class PacketBytes(StructField): - packet data (captured_len-sized binary data) """ - __slots__ = ["dependency"] + __slots__ = ["dependency"] # type: List[str] def __init__(self, len_field): # type: (int) -> None @@ -463,7 +463,7 @@ class ListField(StructField): used to read values from the stream. """ - __slots__ = ["subfield"] + __slots__ = ["subfield"] # type: List[str] def __init__(self, subfield): self.subfield = subfield @@ -509,7 +509,7 @@ class NameResolutionRecordField(StructField): selected IP version, followed by null-separated/terminated domain names. """ - __slots__ = [] + __slots__ = [] # type: List[str] def load(self, stream, endianness, seen=None): record_type = read_int(stream, 16, False, endianness) @@ -633,7 +633,7 @@ def write_options(stream, options): class EPBFlags(FlagWord): """Class representing the epb_flags option on an EPB""" - __slots__ = [] + __slots__ = [] # type: List[str] def __init__(self, val=0): # type: (int) -> None @@ -729,7 +729,7 @@ class Options(Mapping): "_field_names", "data", "endianness", - ] + ] # type: List[str] def __init__(self, schema, data, endianness): self.schema = {} # Schema of option fields: {: Option(...)} diff --git a/pcapng/writer.py b/pcapng/writer.py index 91483f1..32f93bf 100644 --- a/pcapng/writer.py +++ b/pcapng/writer.py @@ -1,4 +1,5 @@ from io import BytesIO +from typing import List import pcapng.blocks as blocks from pcapng.exceptions import PcapngDumpError @@ -13,7 +14,7 @@ class FileWriter(object): "stream", "interfaces", "current_section", - ] + ] # type: List[str] def __init__(self, stream, shb): # type: (BytesIO, blocks.SectionHeader) -> None From f70b837acbb41ceae45983106cce6c9be47ce172 Mon Sep 17 00:00:00 2001 From: dosisod <39638017+dosisod@users.noreply.github.com> Date: Sat, 12 Sep 2020 23:37:48 -0700 Subject: [PATCH 4/4] add more type annotations --- pcapng/blocks.py | 10 ++++++++-- pcapng/flags.py | 7 ++++--- pcapng/structs.py | 9 ++++++++- pcapng/utils.py | 4 +++- 4 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pcapng/blocks.py b/pcapng/blocks.py index 74452c2..589601a 100644 --- a/pcapng/blocks.py +++ b/pcapng/blocks.py @@ -16,6 +16,7 @@ from pcapng import strictness as strictness from pcapng.constants import link_types +from pcapng.flags import FlagField from pcapng.structs import ( IntField, ListField, @@ -92,6 +93,7 @@ def _write(self, outstream): write_int(block_length, outstream, 32) def _encode(self, outstream): + # type: (io.BytesIO) -> None """Encodes the fields of this block into raw data""" for name, field, default in self.schema: field.encode( @@ -147,14 +149,16 @@ class SectionMemberBlock(Block): __slots__ = ["section"] # type: List[str] def __init__(self, section, **kwargs): + # type: (SectionMemberBlock, str) -> None super(SectionMemberBlock, self).__init__(**kwargs) self.section = section def register_block(block): + # type: (Any) -> Block """Handy decorator to register a new known block type""" KNOWN_BLOCKS[block.magic_number] = block - return block + return block # type: ignore @register_block @@ -324,6 +328,7 @@ def timestamp_resolution(self): @property def statistics(self): + # type: () -> object # todo: ensure we always have an interface id -> how?? return self.section.interface_stats.get(self.interface_id) @@ -370,6 +375,7 @@ class BlockWithInterfaceMixin(object): @property def interface(self): + # type: () -> FlagField # We need to get the correct interface from the section # by looking up the interface_id return self.section.interfaces[self.interface_id] @@ -635,7 +641,7 @@ class NameResolution(SectionMemberBlock): ), None, ), - ] + ] # type: List[Tuple[str, FlagField, object]] @register_block diff --git a/pcapng/flags.py b/pcapng/flags.py index f909e78..cd55496 100644 --- a/pcapng/flags.py +++ b/pcapng/flags.py @@ -97,7 +97,7 @@ def __init__(self, owner, offset, size, extra=None): if len(extra) > 2 ** size: raise TypeError( "{cls} iterable has too many values (got {got}, " - + "{size} bits only address {max})".format( + "{size} bits only address {max})".format( cls=self.__class__.__name__, got=len(extra), size=size, @@ -134,7 +134,7 @@ def set(self, val): FlagField = namedtuple( "FlagField", ("name", "ftype", "nbits", "extra"), defaults=(1, None) -) +) # type: Type[namedtuple] class FlagWord(object): @@ -149,6 +149,7 @@ class FlagWord(object): ] # type: List[str] def __init__(self, schema, nbits=32, initial=0): + # type: (FlagField, int, int) -> None """ :param schema: A list of FlagField objects representing the values to be packed @@ -169,7 +170,7 @@ def __init__(self, schema, nbits=32, initial=0): if tot_bits > nbits: raise TypeError( "Too many fields for {nbits}-bit field " - + "(schema defines {tot} bits)".format(nbits=nbits, tot=tot_bits) + "(schema defines {tot} bits)".format(nbits=nbits, tot=tot_bits) ) bitn = 0 diff --git a/pcapng/structs.py b/pcapng/structs.py index 03450de..8ea3484 100644 --- a/pcapng/structs.py +++ b/pcapng/structs.py @@ -8,10 +8,11 @@ import warnings from collections import defaultdict from collections.abc import Iterable, Mapping -from typing import List +from typing import Dict, List, Tuple, Union from pcapng import strictness as strictness from pcapng._compat import namedtuple +from pcapng.blocks import Block from pcapng.exceptions import ( BadMagic, CorruptedFile, @@ -131,6 +132,7 @@ def write_int(number, stream, size, signed=False, endianness="="): def read_section_header(stream): + # type: (BytesIO) -> Dict[str, Union[str, bytes]] """ Read a section header block from a stream. @@ -326,6 +328,7 @@ class RawBytes(StructField): __slots__ = ["size"] # type: List[str] def __init__(self, size): + # type: (int) -> None self.size = size # in bytes! def load(self, stream, endianness=None, seen=None): @@ -388,6 +391,7 @@ class OptionsField(StructField): __slots__ = ["options_schema"] # type: List[str] def __init__(self, options_schema): + # type: (Mapping) -> None self.options_schema = options_schema def load(self, stream, endianness, seen=None): @@ -564,6 +568,7 @@ def encode_finish(self, stream, endianness): def read_options(stream, endianness): + # type: (BytesIO, str) -> List[Tuple[int, bytes]] """ Read "options" from an "options block" in a stream, until a ``StreamEmpty`` exception is caught, or an end marker is reached. @@ -1034,6 +1039,7 @@ def _encode_value(self, value, ftype): def struct_decode(schema, stream, endianness="="): + # type: (List[Tuple[str, StructField, object]], BytesIO, str) -> bytes """ Decode structured data from a stream, following a schema. @@ -1063,6 +1069,7 @@ def struct_decode(schema, stream, endianness="="): def block_decode(block, stream): + # type: (Block, BytesIO) -> bytes return struct_decode(block.schema, stream, block.section.endianness) diff --git a/pcapng/utils.py b/pcapng/utils.py index c216db7..22de9fd 100644 --- a/pcapng/utils.py +++ b/pcapng/utils.py @@ -2,6 +2,7 @@ import struct from typing import ( Iterable, + List, Tuple ) @@ -17,6 +18,7 @@ def unpack_ipv4(data): def _get_pairs(data): + # type: (List[object]) -> List[Tuple[object, object]] """Return data in pairs This uses a clever hack, based on the fact that zip will consume @@ -29,7 +31,7 @@ def _get_pairs(data): [(1, 2), (3, 4)] """ - return list(zip(*((iter(data),) * 2))) + return list(zip(*((iter(data),) * 2))) # type: ignore def pack_ipv6(data):