"""Custom bitstream parsers for bits of the SI specifications """ import datetime from dvb import bs class Time(bs.Parser): """Parse 40 bits of data and turn it into a date/time. """ def __init__(self, name): bs.Parser.__init__(self, name, 40) def parse(self, stream, state): # See ETSI EN 300 468, Annex C for detailed calculations. MJD = stream.read_bits(16) base = datetime.datetime(2000, 1, 1) base += datetime.timedelta(MJD-51544) # 2000-01-01 00:00 was MJD=51544 # Get the time offset in the next 24 bits, and add that on as well offset = Duration("").parse(stream, state)[0] base += offset return (base, self.length) class Duration(bs.Parser): """Parse 24 bits of data and turn it into a duration. """ def __init__(self, name): bs.Parser.__init__(self, name, 24) def parse(self, stream, state): hours = stream.read_bits(4)*10 hours += stream.read_bits(4) minutes = stream.read_bits(4)*10 minutes += stream.read_bits(4) seconds = stream.read_bits(4)*10 seconds += stream.read_bits(4) rv = datetime.timedelta(hours=hours, minutes=minutes, seconds=seconds) return (rv, self.length) class Descriptor(bs.StructuredParser): _descriptor_header = (bs.Parser("descriptor_tag", 8), bs.Parser("descriptor_length", 8)) _descriptor_table = { # network_name_descriptor 0x40: (bs.String_Deferred("network_name", "descriptor_length"), ), # service_list_descriptor 0x41: (bs.LengthArray_Deferred( "service_info", "descriptor_length", 0, (bs.Parser("service_id", 16), bs.Parser("service_type", 8)), ), ), # stuffing_descriptor #0x42: (), # satellite_delivery_system_descriptor 0x43: (bs.BCD("frequency", 32), bs.BCD("orbital_position", 16), bs.Typed("west_east_flag", 1, bool), bs.Parser("polarization", 2), bs.Parser("roll_off", 2), bs.Parser("modulation_system", 1), bs.Parser("modulation_type", 2), bs.BCD("symbol_rate", 28), bs.Parser("FEC_inner", 4) ), # cable_delivery_system_descriptor 0x44: (bs.BCD("frequency", 32), bs.Reserved(12), bs.Parser("FEC_outer", 4), bs.Parser("modulation", 8), bs.BCD("symbol_rate", 28), bs.Parser("FEC_inner", 4) ), # VBI_data_descriptor #0x45: (), # VBI_teletext_descriptor #0x46: (), # bouquet_name_descriptor #0x47: (), # service_descriptor 0x48: (bs.Parser("service_type", 8), bs.Parser("service_provider_name_length", 8), bs.String_Deferred("service_provider_name", "service_provider_name_length"), bs.Parser("service_name_length", 8), bs.String_Deferred("service_name", "service_name_length") ), # country_availability_descriptor #0x49: (), # linkage_descriptor #0x4a: (), # NVOD_reference_descriptor #0x4b: (), # time_shifted_service_descriptor 0x4c: (bs.Parser("service_id", 16), ), # short_event_descriptor #0x4d: (), # extended_event_descriptor #0x4e: (), # time_shifted_event_descriptor #0x4f: (), # component_descriptor 0x50: (bs.Reserved(4), bs.Parser("stream_content", 4), bs.Parser("component_type", 8), bs.Parser("component_tag", 8), bs.Parser("ISO_639_language_code", 24), bs.String_Deferred("description", "descriptor_length", -6) ), # mosaic_descriptor #0x51: (), # stream_identifier_descriptor 0x52: (bs.Parser("component_tag", 8),), # CA_identifier_descriptor #0x53: (), # content_descriptor 0x54: (bs.LengthArray_Deferred( "content_identifier", "descriptor_length", 0, (bs.Parser("content_nibble_level_1", 4), bs.Parser("content_nibble_level_2", 4), bs.Parser("user_nibble_1", 4), bs.Parser("user_nibble_2", 4)) ), ), # parental_rating_descriptor #0x55: (), # teletext_descriptor #0x56: (), # telephone_descriptor #0x57: (), # local_time_offset_descriptor #0x58: (), # subtitling_descriptor #0x59: (), # terrestrial_delivery_system_descriptor 0x5a: (bs.Parser("frequency", 32), bs.Parser("bandwidth", 3), bs.Parser("priority", 1), bs.Parser("Time_Slicing_indicator", 1), bs.Parser("MPE-FEC_indicator", 1), bs.Reserved(2), bs.Parser("constellation", 2), bs.Parser("hierarchy_information", 3), bs.Parser("code_rate_HP_stream", 3), bs.Parser("code_rate_LP_stream", 3), bs.Parser("guard_interval", 2), bs.Parser("transmission_mode", 2), bs.Typed("other_frequency_flag", 1, bool), bs.Reserved(32) ), # multilingual_network_name_descriptor #0x5b: (), # multilingual_bouquet_name_descriptor #0x5c: (), # multilingual_service_name_descriptor #0x5d: (), # multilingual_component_name_descriptor #0x5e: (), # private_data_specifier_descriptor #0x5f: (), # service_move_descriptor #0x60: (), # short_smoothing_buffer_descriptor #0x61: (), # frequency_list_descriptor 0x62: (bs.Reserved(6), bs.Parser("coding_type", 2), bs.LengthArray_Deferred( "frequency_list", "descriptor_length", -1, bs.Parser("centre_frequency", 32) ) ), # partial_transport_stream_descriptor #0x63: (), # data_broadcast_descriptor #0x64: (), # scrambling_descriptor #0x65: (), # data_broadcast_id_descriptor #0x66: (), # transport_stream_descriptor #0x67: (), # DSNG_descriptor #0x68: (), # PDC_descriptor 0x69: (bs.Reserved(4), bs.Parser("programme_identification_label", 20) ), # AC-3_descriptor #0x6a: (), # ancillary_data_descriptor #0x6b: (), # cell_list_descriptor #0x6c: (), # cell_frequency_link_descriptor #0x6d: (), # announcement_support_descriptor #0x6e: (), # application_signalling_descriptor #0x6f: (), # adaptation_field_data_descriptor #0x70: (), # service_identifier_descriptor 0x71: (bs.Parser("service_type", 8), bs.Parser("service_provider_name_length", 8), bs.String_Deferred( "service_provider_name", "service_provider_name_length"), bs.Parser("service_name_length", 8), bs.String_Deferred( "service_name", "service_name_length") ), # service_availability_descriptor 0x72: (bs.Typed("availability_flag", 1, bool), bs.Reserved(7), bs.LengthArray_Deferred( "cell_id_loop", "descriptor_length", -1, bs.Parser("cell_id", 16) ) ), # default_authority_descriptor 0x73: (bs.String_Deferred("default_authority", "descriptor_length"),), # related_content_descriptor #0x74: (), # TVA_id_descriptor #0x75: (), # content_identifier_descriptor #0x76: (), # time_slice_fec_identifier_descriptor #0x77: (), # ECM_repetition_rate_descriptor #0x78: (), # S2_satellite_delivery_system_descriptor #0x79: (), # enhanced_AC-3_descriptor #0x7a: (), # DTS_descriptor #0x7b: (), # AAC_descriptor #0x7c: (), # XAIT_location_descriptor #0x7d: (), # FTA_content_management_descriptor #0x7e: (), # extension_descriptor #0x7f: (), } def __init__(self): bs.StructuredParser.__init__(self, None, self._descriptor_header) def parse(self, stream, state): header, bits = bs.StructuredParser.parse(self, stream, state) tag = header.descriptor_tag if tag in self._descriptor_table: parser = bs.AccumulatingParser(None, self._descriptor_table[tag]) header, read = parser.parse(stream, header) bits += read else: header.data = stream.read_string(header.descriptor_length) bits += header.descriptor_length*8 return (header, bits) class PAT_ProgramPID(bs.Parser): def __init__(self, length): bs.Parser.__init__(self, None, length) def name(self, state): if state.program_number == 0: return "network_PID" else: return "program_map_PID" class PMT_StreamInfoArray(bs.LengthArray): _pmt_si_syntax = (bs.Parser("stream_type", 8), bs.Reserved(3), bs.Parser("elementary_PID", 13), bs.Reserved(4), bs.Parser("ES_info_length", 12), bs.LengthArray_Deferred( "ES_info", "ES_info_length", 0, Descriptor()) ) def __init__(self, name): bs.LengthArray.__init__(self, name, 0, self._pmt_si_syntax) def parse(self, stream, state): octets = state.section_length - state.program_info_length - 13 self.bits = octets*8 return bs.LengthArray.parse(self, stream, state)