"""WSGI handlers for manipulating tuner details
"""

import os
import os.path
import stat
import time
import signal
import subprocess
import threading
import logging
import urlparse
try:
    import json
except ImportError:
    import simplejson as json

import sqlobject

import calliope.config as config
from calliope.tuner import Tuner
from calliope.channeltuner import ChannelTuner
from calliope.channel import Channel
from calliope.ZeroconfService import ZeroconfPublisher
import calliope.muddleware as muddleware

# Both globals are filled in by setup().
reaper = None
tuner_url_base = None

log = logging.getLogger("recorder")

# Timestamp layout used for every xsd:dateTime literal in the metadata file.
_RDF_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"


class Reaper(threading.Thread):
    """Daemon thread that polls recording subprocesses once a minute.

    When a child process has exited, it is forgotten and the tuner that
    was registered with it is marked idle (pid None, state "").
    """

    def __init__(self):
        threading.Thread.__init__(self, name="Reaper")
        # Maps child pid -> (Popen object, tuner record).
        self.children = {}
        self.setDaemon(True)
        # Guards self.children, which request handlers also touch.
        self.lock = threading.Lock()

    def run(self):
        """Poll all registered children forever, once every 60 seconds."""
        log.info("Started daemon reaper")
        while True:
            time.sleep(60)
            self.lock.acquire()
            try:
                finished = []
                for child, tuner in self.children.values():
                    child.poll()
                    if child.returncode is None:
                        continue
                    finished.append(child.pid)
                    log.info("Process completed, return code %d",
                             child.returncode)
                    tuner.pid = None
                    tuner.state = ""
                # Remove after the scan so the dict is not modified while
                # it is being iterated.
                for pid in finished:
                    del self.children[pid]
            finally:
                self.lock.release()

    def add_child(self, child, tuner):
        """Register a running process together with the tuner it uses."""
        self.lock.acquire()
        try:
            self.children[child.pid] = (child, tuner)
        finally:
            self.lock.release()

    def get_child(self, pid):
        """Return the process registered under pid, or None if unknown."""
        self.lock.acquire()
        try:
            entry = self.children.get(pid)
        finally:
            self.lock.release()
        if entry is None:
            return None
        return entry[0]


def _read_tuners():
    """Match the tuner hardware present against the stored tuner records.

    Hardware whose ID, manufacturer and product agree with a stored record
    is taken as unchanged.  Otherwise an unprocessed record with the same
    manufacturer/product is looked for (a tuner that seems to have moved);
    if there is none, a new Tuner is set up for the hardware ID.
    """
    # Check the tuners present: match off tuners with our records
    hw_tids = Tuner.list_hw_ids()
    tids = [t.tid for t in Tuner.select()]

    # NOTE(review): the loop stops as soon as every stored record has been
    # matched, so hardware left in hw_tids (including all of it when the
    # database is empty) is never set up here.  Confirm whether that is
    # intended before changing the condition.
    while len(hw_tids) > 0 and len(tids) > 0:
        tid = hw_tids.pop(0)
        manufacturer, product = Tuner.hw_identity(tid)

        tuner = Tuner.select(Tuner.q.tid == tid).getOne(None)
        if (tuner is not None
                and tuner.manufacturer == manufacturer
                and tuner.product == product):
            # Then we have a match, and the tuner probably hasn't moved
            tids.remove(tid)
            continue

        # Otherwise, we should go looking for the tuner
        found = None
        for candidate in Tuner.select():
            if candidate.tid not in tids:
                # Skip tuners we've already processed
                continue
            if (candidate.manufacturer == manufacturer
                    and candidate.product == product):
                found = candidate
                break

        if found is None:
            # We've got a completely new tuner
            Tuner().setup(tuner_url_base, tid)
        else:
            # We've got a tuner that looks like it's moved
            tids.remove(found.tid)


def setup(mapper):
    """Register the tuner URLs on mapper and start the recorder services.

    Besides adding the routes under /api/tuners/local, this creates the
    Zeroconf publisher, starts the process reaper, computes the base URL
    template for tuners, reads the tuner list and publishes the service.
    """
    global reaper
    global tuner_url_base

    opfx = mapper.prefix

    # Local tuners
    mapper.prefix = "/api/tuners/local"
    try:
        mapper.add("[/]",
                   GET=get_tuner_ids)
        mapper.add("/{tunerid:digits}[/]",
                   GET=get_base_info)
        mapper.add("/{tunerid:digits}/channels[/]",
                   GET=get_channel_names,
                   POST=reload_channels)
        mapper.add("/{tunerid:digits}/channels/all",
                   GET=get_all_channels)
        mapper.add("/{tunerid:digits}/channels/all_tuning",
                   GET=get_all_channeltuners)
        mapper.add("/{tunerid:digits}/channels/by-id/{chanid:digits}[/]",
                   GET=get_channel_by_id)
        mapper.add("/{tunerid:digits}/channels/by-name/{chanid}[/]",
                   GET=get_channel_by_name)
        mapper.add("/{tunerid:digits}/recording",
                   GET=current_recording,
                   PUT=control_recording,
                   DELETE=cancel_recording)
        # FIXME: Also add URLs for EPG
    finally:
        # Put the caller's prefix back even if a route failed to register.
        mapper.prefix = opfx

    # Set up Zeroconf
    rpub = ZeroconfPublisher(
        "Calliope Recorder", config.port, "_calliope._tcp",
        text=["path=/api/tuners/local", "protocol=http"])
    hostname = rpub.hostname()

    # Start up the process reaper
    reaper = Reaper()
    reaper.start()

    # The path keeps a literal "%d" placeholder for the tuner ID.
    url_parts = ["http",
                 "%s:%d" % (hostname, config.port),
                 "/api/tuners/local/%d",
                 None, None, None]
    tuner_url_base = urlparse.urlunparse(url_parts)

    _read_tuners()

    rpub.publish()


def _lazy_get_tuner(tid):
    """Return the stored Tuner with hardware ID tid, or None."""
    return Tuner.select(Tuner.q.tid == tid).getOne(None)


def _routing_args(req):
    """Return the named URL arguments the router stored on the request."""
    return req['wsgiorg.routing_args'][1]


def _not_found(res, message="Not found"):
    """Fill in res as a 404 response carrying message."""
    res.result = "404 Not found"
    res.data = message


def _tuner_from_request(req, res):
    """Return the tuner named in the URL; on a miss set a 404 on res.

    Returns None when no such tuner is stored, in which case the caller
    should simply return.
    """
    tuner = _lazy_get_tuner(int(_routing_args(req)['tunerid']))
    if tuner is None:
        _not_found(res)
    return tuner


def get_tuner_ids(req, res):
    """Retrieve the list of tuner hardware IDs
    """
    res.data = Tuner.list_hw_ids()


def get_base_info(req, res):
    """Get the base information about a tuner
    """
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return

    res.headers["Last-Modified"] = time.strftime(
        muddleware.RFC_2822_DATE, time.gmtime(tuner.last_update))
    if muddleware.change_test(req, tuner.last_update):
        res.data = tuner.get_data()
    else:
        res.result = "304 Not changed"
        res.data = "Not changed"


def get_channel_names(req, res):
    """Retrieve the full list of channel IDs on a tuner
    """
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return
    res.data = [[ch.channel.id, ch.channel.label] for ch in tuner.channels]


def get_all_channels(req, res):
    """Retrieve the full list of channels available on a tuner
    """
    # FIXME: handle If-Modified-Since header
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return
    res.data = [ch.channel.get_data() for ch in tuner.channels]


def get_all_channeltuners(req, res):
    """Retrieve the full list of channels and tuning info available on
    a tuner
    """
    # FIXME: handle If-Modified-Since header
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return
    res.data = [ch.get_data() for ch in tuner.channels]


def reload_channels(req, res):
    """Force a reload/rescan of all channels on this tuner
    """
    res.data = None
    tid = int(_routing_args(req)['tunerid'])
    old = _lazy_get_tuner(tid)
    if old is not None:
        old.destroySelf()
    Tuner().setup(tuner_url_base, tid)


def get_channel_by_id(req, res):
    """Retrieve the data on a single channel on this tuner
    """
    res.data = None
    chid = int(_routing_args(req)['chanid'])
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return

    # Filter on the tuner object itself, as get_channel_by_name does.  The
    # earlier query combined Tuner.q.tid with ChannelTuner.q.channelID
    # without joining the two tables, so it did not limit the result to
    # the requested tuner.
    ch = ChannelTuner.select(
        sqlobject.AND(
            ChannelTuner.q.tuner == tuner,
            ChannelTuner.q.channelID == chid
        )).getOne(None)
    if ch is None:
        _not_found(res)
    else:
        res.data = ch.get_data()


def get_channel_by_name(req, res):
    """Retrieve the data on a single channel on this tuner
    """
    res.data = None
    label = _routing_args(req)['chanid']
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return

    # getOne(None) so an unknown label gives a 404 instead of an exception.
    channel = Channel.select(Channel.q.label == label).getOne(None)
    if channel is None:
        _not_found(res)
        return

    ch = ChannelTuner.select(
        sqlobject.AND(
            ChannelTuner.q.tuner == tuner,
            ChannelTuner.q.channel == channel
        )).getOne(None)
    if ch is None:
        _not_found(res)
    else:
        res.data = ch.get_data()


def current_recording(req, res):
    """Retrieve details of the current recording being made on this tuner
    """
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return

    state = tuner.state
    if state == "":
        res.data = None
    elif state in ("EPG", "Scan", "Recording"):
        res.data = state
    else:
        res.data = "Unknown recording state"
        res.result = "500 Infernal server error"


def make_title(rec):
    """Construct a filename from a recording control dict's metadata

    The result is "<title>[.[S<series>-]Ep<episode>][.<subtitle>]" passed
    through quote_filename().  The series number is only used when an
    episode number is present.
    """
    parts = [rec['title']]
    if 'episode' in rec:
        parts.append(".")
        if 'series' in rec:
            parts.append("S%d-" % rec['series'])
        parts.append("Ep%d" % rec['episode'])
    if 'subtitle' in rec:
        parts.append("." + rec['subtitle'])
    return quote_filename("".join(parts))


def quote_filename(name):
    """Remove nasty characters in filenames

    Quotes and punctuation are dropped; whitespace and "/" become "-".
    """
    for c in "\"',;:!":
        name = name.replace(c, "")
    for c in " \t\n\r/":
        name = name.replace(c, "-")
    return name


def control_recording(req, res):
    """Set the current recording state on this tuner

    The request body is a JSON list of recording control dictionaries.
    All requested channels must be on the same frequency; one dvbstream
    process is started that writes them to a single file in
    config.file_store, with a ".info" metadata file written beside it.
    """
    # Get the tuner
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return
    if tuner.state != "":
        res.data = "In use"
        res.result = "403 Forbidden"
        return

    # Same value as octal 0002: only "other write" is masked out of the
    # permissions of the files created below.
    os.umask(stat.S_IWOTH)

    # Get the recording(s) details
    content_len = int(req['CONTENT_LENGTH'])
    text = req['wsgi.input'].read(content_len)
    # These aren't recordings, just recording control dictionaries
    recs = json.loads(text)
    if not recs:
        # Nothing to record: there would be no frequency to tune to.
        res.data = "No recordings requested"
        res.result = "400 Bad request"
        return

    end = time.time() + 1
    channels = {}
    for r in recs:
        end = max(end, r['end'])
        channels[r['channel']] = 1

    dfile_name = "+".join([make_title(r) for r in recs]) + ".ts"
    dfile_path = os.path.join(config.file_store, dfile_name)

    freq = None
    pids = []
    # Iterate over a copy of the keys: the values are replaced in the loop.
    for channame in list(channels):
        ch = Channel.select(Channel.q.label == channame).getOne(None)
        ct = None
        if ch is not None:
            ct = ChannelTuner.select(
                sqlobject.AND(
                    ChannelTuner.q.tuner == tuner,
                    ChannelTuner.q.channel == ch
                )).getOne(None)
        if ct is None:
            log.error("Recording failed: No channel")
            res.data = "Channel not found: %s" % (channame,)
            res.result = "404 Not found"
            return

        if freq is None:
            freq = ct.freq
        elif freq != ct.freq:
            log.error("Recording failed: Different muxes")
            res.data = "Attempted to use different muxes on the same tuner"
            res.result = "403 Forbidden"
            return

        channels[channame] = ct
        pids.append(str(ct.vpid))
        pids.append(str(ct.apid))

    length = max(int(end - time.time()), 1)

    write_metadata(dfile_path + ".info", dfile_name, length, channels, recs)

    # Start up the recording process
    params = ["dvbstream",
              "-f", str(freq),       # Frequency
              "-c", str(tuner.tid),  # Card number
              "-o",                  # Output to stdout
              "-n", str(length),     # Timeout
              ]
    params.extend(pids)

    dfile = open(dfile_path, "wb")
    try:
        process = subprocess.Popen(
            params,
            stdout=dfile,
            cwd=config.file_store
        )
    finally:
        # The child holds its own copy of the descriptor; closing ours
        # stops one descriptor leaking per recording.
        dfile.close()

    tuner.pid = process.pid
    tuner.state = "Recording"
    reaper.add_child(process, tuner)

    res.data = None


def _rdf_escape(value):
    """Escape value for use inside a double-quoted RDF string literal."""
    return (value.replace("\\", "\\\\")
                 .replace('"', '\\"')
                 .replace("\n", "\\n")
                 .replace("\r", "\\r"))


def _rdf_time(seconds):
    """Format seconds since the epoch as a UTC timestamp for the metadata."""
    return time.strftime(_RDF_TIME_FORMAT, time.gmtime(seconds))


def write_metadata(mfile_name, ofile_name, length, channels, recordings):
    """Write the RDF description of a recording to mfile_name.

    ofile_name is the name of the stream file being described, length the
    recording duration in seconds, channels a dict mapping channel label
    to its ChannelTuner, and recordings the list of recording control
    dictionaries.  All timestamps are written in UTC.
    """
    now = time.time()
    baseid = str(now)

    mfile = open(mfile_name, "w")
    try:
        # NOTE(review): these @prefix declarations carry no namespace
        # IRIs; they are reproduced as found and should be confirmed.
        mfile.write("""
@prefix : .
@prefix calrdf: .
@prefix rdf: .
@prefix rdfs: .
@prefix xml: .
@prefix xsd: .
@prefix dc: .
""")

        # The recording itself
        mfile.write("""
<:rec-%(baseid)s> a calrdf:MuxRecording;
    calrdf:start "%(start)s"^^xsd:dateTime;
    calrdf:end "%(end)s"^^xsd:dateTime;
    calrdf:filename "%(filename)s"^^xsd:string.
""" % {
            "baseid": baseid,
            "start": _rdf_time(now),
            "end": _rdf_time(now + length),
            "filename": _rdf_escape(ofile_name)
        })

        # The channels we're using
        for chan in channels.values():
            mfile.write("""
<:rec-%(baseid)s> calrdf:channel_recording <:rec-%(baseid)s-%(seqc)d>.
<:rec-%(baseid)s-%(seqc)d> a calrdf:ChanRecording;
    calrdf:apid "%(apid)d"^^xsd:integer;
    calrdf:vpid "%(vpid)d"^^xsd:integer;
    calrdf:channel "%(channel)s"^^xsd:string.
""" % {
                "baseid": baseid,
                "seqc": chan.id,
                "apid": chan.apid,
                "vpid": chan.vpid,
                "channel": _rdf_escape(chan.channel.label)
            })

        # The programmes, each attached to the channel it is recorded from
        for seqp, prog in enumerate(recordings):
            seqc = channels[prog['channel']].id
            mfile.write("""
<:rec-%(baseid)s-%(seqc)d> calrdf:programme_recording <:rec-%(baseid)s-%(seqc)d-%(seqp)d>.
<:rec-%(baseid)s-%(seqc)d-%(seqp)d> a calrdf:ProgRecording;
    calrdf:start "%(start)s"^^xsd:dateTime;
    calrdf:end "%(end)s"^^xsd:dateTime;
    dc:title "%(title)s"^^xsd:string """ % {
                "baseid": baseid,
                "seqc": seqc,
                "seqp": seqp,
                "start": _rdf_time(prog['start']),
                "end": _rdf_time(prog['end']),
                "title": _rdf_escape(prog['title']),
            })
            # Optional properties continue the same statement, which is
            # terminated by the final "." below.
            if "subtitle" in prog:
                mfile.write(""";\n    calrdf:subtitle "%s"^^xsd:string"""
                            % _rdf_escape(prog['subtitle']))
            if "series" in prog:
                mfile.write(""";\n    calrdf:series "%d"^^xsd:integer"""
                            % prog['series'])
            if "episode" in prog:
                mfile.write(""";\n    calrdf:episode "%d"^^xsd:integer"""
                            % prog['episode'])
            mfile.write(".\n")
    finally:
        mfile.close()


def cancel_recording(req, res):
    """Cancel the current recording on this tuner
    """
    tuner = _tuner_from_request(req, res)
    if tuner is None:
        return

    log.info("Cancelling recording on %s", str(tuner))
    log.debug("State is '%s'", tuner.state)

    if tuner.state == "":
        log.debug("Empty state")
        res.result = "404 No recording"
        res.data = "No recording"
    elif tuner.state == "Scan":
        # FIXME: kill the current scan
        pass
    elif tuner.state == "EPG":
        # FIXME: kill the current EPG scan
        pass
    elif tuner.state == "Recording":
        process = reaper.get_child(tuner.pid)
        if process is not None:
            # Signal the child directly with os.kill rather than through
            # a Popen method, then wait so it does not linger as a zombie.
            os.kill(process.pid, signal.SIGTERM)
            process.wait()
        # Mark the tuner idle even when the reaper no longer knows the
        # process, so a stale "Recording" state cannot block the tuner.
        tuner.pid = None
        tuner.state = ""
    else:
        res.data = "Unknown recording state"
        res.result = "500 Infernal server error"