"""Scheduler object for calliope """ from __future__ import with_statement import sys import time import threading from Queue import Empty import subprocess import logging log = logging.getLogger("scheduler") from calliope.recording import Recording PRE_DELAY = 3600 # Time before a recording to do allocation/scheduling # magic class Scheduler(threading.Thread): def __init__(self, alert, tuner_lock): threading.Thread.__init__(self, name="Event Scheduler") self.alert = alert self.tuner_lock = tuner_lock self.setDaemon(True) # Don't let this thread stop the main # thread from exiting self.build_events() self.allocated = False self.zero_delay = 0 def run(self): time.sleep(5) # Sleep for a bit to let everything else settle self.running = True while self.running: try: # Wait for stuff to happen -- either a new queue message, or # the next event we were expecting if self.events: now = time.time() next_event = max(now, self.events[0][0]) delay = next_event - now log.info("Scheduler: Waiting for %f until %d", delay, next_event) if int(delay) <= 0: self.zero_delay += 1 else: self.zero_delay = 0 if self.zero_delay >= 10: log.fatal("Scheduler is spinning. Something is badly wrong -- shutting down the scheduler.") return else: delay = None log.info("Scheduler: Waiting forever") self.get_event(delay) self.process_waiting_events() except Exception, e: log.error("Exception %s", str(e)) def get_event(self, delay): """Get an event off the queue and process it (or time out after a delay) """ try: msg = self.alert.get(True, delay) now = time.time() # If no Empty exception was thrown, we've got an event # from over the fence, so we need to check what # happened, and do something. if msg[0] == "Deleted": pass elif msg[0] in ("Added", "Modified"): # Something was added or changed rec = msg[1] if rec.start - PRE_DELAY < now: # If it was a late addition, do # allocation/scheduling right now self.allocate() elif msg[0] == "Quit": self.running = False # Rebuild the events list self.build_events() except Empty: # In this case, we've hit the time-out, so we've got a # scheduled event come up. pass def process_waiting_events(self): """Process all the events in the event queue with times in the past """ now = time.time() self.allocated = False while self.events and self.events[0][0] < now: ev = self.events.pop(0) rc = ev[1]() if self.regen_events: self.build_events() def build_events(self): """Build a list of all the upcoming events we have in store """ self.regen_events = False self.events = [] now = time.time() for r in Recording.select(): if r.state == "Waiting": # Wake up an hour before any recording to check for # scheduling conflicts self.events.append((r.start - PRE_DELAY, self.allocate)) elif r.state == "Allocated": # Wake up for every recording self.events.append((r.start, r.record)) #self.events.append((now, self.update_channels, None)) #self.events.append((now, self.update_epg, None)) self.events.sort(lambda a, b: cmp(a[0], b[0])) #print "Events list generated:\n", "\n".join([str(ev) for ev in self.events]) if self.events: return self.events[0][0] else: return None def allocate(self): """Run ocarina, the allocator process. """ if self.allocated: # Prevent the allocator from being run more # than once in any given wake-up return with self.tuner_lock: log.info("Ocarina running...") ocarina = subprocess.Popen(["/usr/local/bin/ocarina",], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, stderr = ocarina.communicate() for line in stdout.split("\n"): log.debug(line) if ocarina.returncode != 0: log.error("Ocarina returned an error!") log.info("Ocarina complete") self.allocated = True self.regen_events = True return ocarina.returncode == 0 def update_channels(self): """Grab the channel listings from the remote tuners """ return True def update_epg(self): """Grab EPG data from the remote tuners """ return True