"""Basic tools for parsing a bitstream at the bit level."""


class EndOfStream(EOFError, TypeError):
    """Raised when bits are requested beyond the end of the stream.

    It derives from TypeError as well as EOFError because running off the
    end of the stream historically surfaced as the TypeError raised by
    ``ord('')``; callers that caught that error keep working.
    """


class Bitstream(object):
    """Bit-level reader on top of a file-like object.

    The reader keeps one octet of lookahead in ``current_octet`` and the
    position of the next unread bit inside it in ``bit_offset`` (0 is the
    most significant bit).  ``current_octet`` is ``None`` once the
    underlying stream is exhausted.
    """

    # _byte_read_masks[n] keeps the n least significant bits of an octet.
    _byte_read_masks = [0x0, 0x1, 0x3, 0x7, 0xf, 0x1f, 0x3f, 0x7f, 0xff]

    def __init__(self, stream):
        """Wrap *stream*, an object whose ``read(n)`` returns up to n octets."""
        self.bit_offset = 0
        self.stream = stream
        self.current_octet = None
        self._load_octet()

    def _load_octet(self):
        """Fetch the next octet into ``current_octet`` (None at end of stream)."""
        data = self.stream.read(1)
        self.current_octet = ord(data) if data else None

    def _require_octet(self):
        """Raise EndOfStream if there is no octet left to read bits from."""
        if self.current_octet is None:
            raise EndOfStream("Attempt to read past the end of the stream")

    def read_bits(self, length):
        """Read length bits from the stream and return as an integer.

        Bits are consumed most-significant first.  Raises EndOfStream if
        the stream holds fewer than *length* unread bits.
        """
        if length == 0:
            return 0

        # Read the first fragment: whatever is left of the current octet,
        # or fewer bits if the request is smaller than that.
        fraglen = min(8 - self.bit_offset, length)
        rv = self._byte_read(self.current_octet, fraglen)
        self.advance(fraglen)
        length -= fraglen

        # Now read chunks of 8 bits; we are octet-aligned whenever this
        # loop runs, so the whole current octet is taken each time.
        while length >= 8:
            self._require_octet()
            rv = (rv << 8) | self.current_octet
            self.advance(8)
            length -= 8

        # Finally, polish off the remaining bits.
        if length:
            rv = (rv << length) | self._byte_read(self.current_octet, length)
            self.advance(length)
        return rv

    def read_string(self, length):
        """Read a string of length octets from the stream, and return as a
        string.

        The stream must be octet-aligned, otherwise an Exception is raised.
        Octets coming from a binary stream are mapped one-to-one to
        characters (latin-1), consistent with ``chr(octet)``.  Raises
        EndOfStream if fewer than *length* octets remain.
        """
        if length == 0:
            return ""
        if self.bit_offset % 8 != 0:
            raise Exception("Reading string, but not octet-aligned!")
        self._require_octet()

        # The first octet has already been pulled into the lookahead.
        rv = chr(self.current_octet)
        if length > 1:
            tail = self.stream.read(length - 1)
            if len(tail) < length - 1:
                self.current_octet = None
                raise EndOfStream(
                    "Attempt to read past the end of the stream")
            # On Python 3 a binary stream yields bytes, which cannot be
            # concatenated to str.  (On Python 2 bytes is str: no-op.)
            if isinstance(tail, bytes) and not isinstance(tail, str):
                tail = tail.decode("latin-1")
            rv += tail
        self._load_octet()
        return rv

    def _byte_read(self, b, length):
        """Read up to 8 bits of data from the octet b, starting at the
        current bit offset.

        Throw an exception if asked to go over an octet boundary, and
        EndOfStream if b is None (no octet available).
        """
        if length == 0:
            return 0
        if b is None:
            raise EndOfStream("Attempt to read past the end of the stream")
        if self.bit_offset + length > 8:
            raise Exception("Can't read %d bits starting at position %s"
                            % (length, self.bit_offset))

        # Shift right so the wanted bits end up least significant...
        b >>= 8 - length - self.bit_offset
        # ...then mask off everything above them.
        b &= self._byte_read_masks[length]
        return b

    def advance(self, i):
        """Move forward by i bits in the stream.

        Landing exactly on the end of the stream is allowed; moving past
        it raises EndOfStream.
        """
        if i == 0:
            return
        self._require_octet()

        position = self.bit_offset + i
        if position < 8:
            self.bit_offset = position
            return

        # Whole octets between the current one and the one we land in.
        skipped = position // 8 - 1
        if skipped and len(self.stream.read(skipped)) < skipped:
            self.current_octet = None
            raise EndOfStream("Attempt to advance past the end of the stream")
        self.bit_offset = position % 8
        self._load_octet()
        if self.current_octet is None and self.bit_offset:
            raise EndOfStream("Attempt to advance past the end of the stream")


class Parser(object):
    """Base class to parse a chunk of a bitstream.

    A parser has a name (the attribute its value is stored under by the
    structured parsers; None means "do not store") and a length.
    ``parse`` returns a ``(value, bits_consumed)`` tuple.
    """

    def __init__(self, name, length):
        self._name = name
        self.length = length

    def parse(self, stream, state):
        """Read ``self.length`` bits as an unsigned integer."""
        data = stream.read_bits(self.length)
        return (data, self.length)

    def name(self, state):
        """Return the field name; *state* is unused by the base class."""
        return self._name


class Typed(Parser):
    """Parse a chunk of a bitstream as a typed field.

    The integer read from the stream is passed through the callable *typ*.
    """

    def __init__(self, name, length, typ):
        Parser.__init__(self, name, length)
        self.typ = typ

    def parse(self, stream, state):
        value, bits = Parser.parse(self, stream, state)
        return (self.typ(value), bits)


class Reserved(Parser):
    """Parse a chunk of a bitstream and discard it."""

    def __init__(self, length):
        # A None name keeps the structured parsers from storing anything.
        Parser.__init__(self, None, length)

    def parse(self, stream, state):
        stream.advance(self.length)
        return (None, self.length)


class BCD(Parser):
    """Parse a BCD number.

    Digits are read as 4-bit groups, most significant digit first.  If
    the length is not a multiple of 4 the last group still consumes 4
    bits, and the returned bit count reflects what was actually read.
    """

    def parse(self, stream, state):
        rv = 0
        total = 0
        while total < self.length:
            rv = rv * 10 + stream.read_bits(4)
            total += 4
        return (rv, total)


class String(Parser):
    """Parse a chunk of a bitstream as a string.

    Unlike the other parsers, the length is expressed in octets.
    """

    def parse(self, stream, state):
        return (stream.read_string(self.length), self.length * 8)


class String_Deferred(String):
    """Parse a chunk of a bitstream as a string, taking the length from the
    current state.

    The length in octets is ``getattr(state, field) - offset``.
    """

    def __init__(self, name, field, offset=0):
        Parser.__init__(self, name, 0)
        self.field = field
        self.offset = offset

    def parse(self, stream, state):
        length = getattr(state, self.field) - self.offset
        return (stream.read_string(length), length * 8)


class BSData(object):
    """Plain attribute container holding parsed fields."""

    def __repr__(self):
        return repr(self.__dict__)


class StructuredParser(Parser):
    """Parse a chunk of bitstream and create a structure from it.

    *syntax* is a sequence of parsers run in order; each named result is
    stored as an attribute of a fresh *cls* instance, which is also the
    state handed to the sub-parsers.
    """

    def __init__(self, name, syntax, cls=BSData):
        Parser.__init__(self, name, 0)
        self.syntax = syntax
        self.cls = cls

    def parse(self, stream, state=None):
        rv = self.cls()
        total = 0
        for ins in self.syntax:
            data, bits = ins.parse(stream, rv)
            field = ins.name(rv)
            if field is not None:
                setattr(rv, field, data)
            total += bits
        return (rv, total)


class AccumulatingParser(Parser):
    """Parse a chunk of bitstream and create a structure from it, adding the
    resulting data to the current state.

    A new BSData is created when no state is supplied.
    """

    def __init__(self, name, syntax):
        Parser.__init__(self, name, 0)
        self.syntax = syntax

    def parse(self, stream, state=None):
        total = 0
        if state is None:
            state = BSData()
        for ins in self.syntax:
            data, bits = ins.parse(stream, state)
            field = ins.name(state)
            if field is not None:
                setattr(state, field, data)
            total += bits
        return (state, total)


class LengthArray(StructuredParser):
    """Parse a chunk of a bitstream repeatedly until we consume enough data.

    Elements are parsed until at least *bits* bits have been consumed.
    *syntax* is either a sequence of parsers (each element becomes a
    structure) or a single parser (each element is its value).
    """

    def __init__(self, name, bits, syntax):
        StructuredParser.__init__(self, name, syntax)
        self.bits = bits

    def parse(self, stream, state):
        # Only the iterability probe is guarded: a TypeError raised while
        # parsing an element is a real error and must propagate.
        try:
            iter(self.syntax)
            structured = True
        except TypeError:
            structured = False

        total = 0
        rv = []
        while total < self.bits:
            if structured:
                data, bits = StructuredParser.parse(self, stream, state)
            else:
                data, bits = self.syntax.parse(stream, state)
            rv.append(data)
            total += bits
        return (rv, total)


class LengthArray_Deferred(LengthArray):
    """LengthArray whose size is taken from the current state.

    The size in bits is ``(getattr(state, field) + offset) * 8``.  Note
    that the offset is added here, whereas String_Deferred subtracts it.
    """

    def __init__(self, name, field, offset, syntax):
        LengthArray.__init__(self, name, 0, syntax)
        self.field = field
        self.offset = offset

    def parse(self, stream, state):
        self.bits = (getattr(state, self.field) + self.offset) * 8
        return LengthArray.parse(self, stream, state)


class CountedArray(StructuredParser):
    """Parse a structure from the bitstream a given number of times.

    The element count is read from attribute *field* of the current state.
    """

    def __init__(self, name, field, syntax):
        StructuredParser.__init__(self, name, syntax)
        self.field = field

    def parse(self, stream, state):
        n = getattr(state, self.field)
        total = 0
        rv = []
        while n > 0:
            data, bits = StructuredParser.parse(self, stream, state)
            rv.append(data)
            total += bits
            n -= 1
        return (rv, total)