#!/usr/bin/python import time import select import os import sys from optparse import OptionParser import pickle import pprint from dvb import device from dvb.structures import Properties, Property from dvb import structures from dvb import tables from dvb import bs class FullTable(list): """Accumulate DVB SI table data until we have a full table's worth of data. """ def __init__(self, series): list.__init__(self) self.series = series self.version = -1 self.full_loop = False def add(self, table): if not table.current_next_indicator: # Drop tables for next return # Check version of this table vc = (table.version_number - self.version) % 32 if self.version == -1 or (vc != 0 and vc < 16): # We've got a version increment: ditch everything and start again max_sections = table.last_section_number + 1 self[:] = [None]*max_sections self.full_loop = False # If we've seen this section before, we've come full circle if self[table.section_number] is not None: self.full_loop = True # Add this table to the list self[table.section_number] = table self.table_id = table.table_id self.version = table.version_number def complete(self): return len(self) > 0 and all(self) and self.full_loop def read_si_tables(dmx, parsers, required, optional=[]): P = select.poll() P.register(dmx.fileno(), select.POLLIN) strm = bs.Bitstream(dmx) results = {} stuff = tables.PacketStuffing() complete = False while not complete: evs = P.poll(50) for fd, ev in evs: if ev & select.POLLIN and fd == dmx.fileno(): offset, count = stuff.parse(strm) # Read the pointer octet strm.read_string(offset) # Skip to the data table_id = strm.read_bits(8) # Read the table type table, bits = parsers[table_id].parse(strm) # Read and parse the table table.table_id = table_id T = results.setdefault(table.series(), FullTable(table.series())) T.add(table) # We are complete if all of our required table IDs are present # in the results, and we've seen all of the sections for those # IDs, and, if any of the optional ones are present, that we have # seen all of the sections for each of those. complete = True tmp_req = required[:] for tid, tab in results.iteritems(): if tab.table_id in tmp_req: tmp_req.remove(tab.table_id) if not tab.complete(): complete = False if len(tmp_req) != 0: complete = False return results parser = OptionParser(usage="Usage: %prog [options]") parser.add_option("-o", "--output", dest="ofile", default="scan", help="output to FILE", metavar="FILE") parser.add_option("-d", "--device", dest="device", default=2, type="int", help="Select device number (0)") (options, args) = parser.parse_args() if len(args) > 0: parser.print_usage() sys.exit(1) D = device.Device("/dev/dvb/adapter%d" % options.device) fe = D.get_fe(readonly=False) fe.tune(505833330, bandwidth=0, code_rate_hp=3, code_rate_lp=3, modulation=1, transmission_mode=0, guard_interval=0, hierarchy=0) dmx = D.get_demux() dmx.set_filter(pid=0, start=True, output=dmx) parsers = tables.known_parsers() PAT = read_si_tables(dmx, parsers, [0x00]) # 0x00 == PAT PAT0 = PAT[0][0] for progs in PAT0.programs: if progs.program_number == 0: nit_pid = progs.network_PID break dmx.close() dmx = D.get_demux() dmx.set_filter(pid=nit_pid, start=True, output=dmx) # 0x40, 0x41 = NIT, this network and other network respectively NIT = read_si_tables(dmx, parsers, [0x40], [0x41]) dmx.close() streams = {} for t in NIT[0x40]: for ts in t.transport_stream_loop: # Store the TS details so that we can access them by TS streams[ts.transport_stream_id] = ts # Pull out the tuning information consistently, in a way that # we can pass to fe.tune() for d in ts.transport_descriptors: if d.descriptor_tag == 0x5a: # Terrestrial delivery ts.delivery_system = "DVB-T" ts.tuning = { "frequency": d.frequency, "bandwidth": d.bandwidth, "code_rate_hp": d.code_rate_HP_stream, "code_rate_lp": d.code_rate_LP_stream, "modulation": d.constellation, "transmission_mode": d.transmission_mode, "guard_interval": d.guard_interval, "hierarchy": d.hierarchy_information } elif d.descriptor_tag == 0x43: # Satellite delivery ts.delivery_system = "DVB-S" #pprint.pprint(NIT) # We now have enough information, in "streams", to tune to all of the # transport streams/muxes on this network. We now need to get the # SDTs, and pull out the channel information. # Work on the principle that all the info we need is in the current # stream, and only go looking for missing stuff if we don't find it # here. dmx = D.get_demux() dmx.set_filter(pid=0x11, start=True, output=dmx) # PID 0x11 == SDT SDT = read_si_tables(dmx, parsers, [0x42], [0x46]) # 0x42 = SDT-this, 0x46 = SDT-other dmx.close() services = {} for full_tab in SDT.itervalues(): for tab in full_tab: tsid = tab.transport_stream_id ts = streams[tsid] for svc in tab.service_info: # Pull in the descriptors that we're most interested in for d in svc.descriptors_loop: if d.descriptor_tag == 0x48: # service_descriptor svc.provider = d.service_provider_name svc.name = d.service_name svc.type = d.service_type elif d.descriptor_tag == 0x73: # default_authority pass # Add this service to the services table services[(tsid, svc.service_id)] = svc # and put it in the TS as well try: ts.services.append(svc) except AttributeError: ts.services = [ svc, ] # Now dump the data we want results = [] for tsid, ts in streams.iteritems(): try: for svc in ts.services: tinfo = [svc.provider, svc.name, ts.tuning] results.append(tinfo) print svc.provider, "--", svc.name except AttributeError, ex: print "No services found for TS", ts.transport_stream_id, ex of = open(options.ofile, "w") pickle.dump(results, of, -1) of.close()