#!/usr/bin/python # -*- Mode: Python ; tab-width: 4 -*- # By Dr. Zeus (http://drzeus.best.vwh.net/). Please give me credit if you do anything with this. # Version 1.1. import sys import os import string import random import getopt import threading import urllib import tempfile import traceback import copy import time import socket import shutil import pprint debug = 0 loud = 0 default_n = 5 default_range = "0-255" localtime_ranges = { 0 : "0-20", 1 : "0-20", 2 : "0-20", 3 : "0-20", 4 : "10-50", 5 : "20-90", 6 : "30-120", 7 : "40-140", 8 : "40-160", 9 : "60-180", 10 : "80-200", 11 : "100-220", 12 : "110-255", 13 : "120-255", 14 : "120-255", 15 : "110-255", 16 : "100-220", 17 : "80-200", 18 : "60-180", 19 : "30-120", 20 : "20-90", 21 : "10-50", 22 : "0-20", 23 : "0-20", } default_delay = 1.0 default_duration = 10.0 init_threads = 3 init_sleep = 2.0 max_init_time = 30.0 socket_timeout = 5.0 AuthenticationRequestedException = "Authentication Requested" NotAnImageException = "Not An Image" status_msgs = [ "Connecting to CCTVs...", "Initializing oscillation overthruster...", "Degaussing flux capacitor...", #"Charging dilithium crystals...", "Measuring tesla coil vortex...", "Bringing mutation engine online...", ] display_cmd = "xscreensaver-getimage -file %s" verify_cmd = "djpeg %s > /dev/null 2>&1" viable_cmd = "djpeg %s | pamcut -top 15 -bottom -15 | pamsumm -mean -brief" window_size = "640x480" # Typical Axis image size. def help(): sys.stderr.write( """cctv is an xscreensaver module that displays security camera images on your screen. The images are downloaded repeatedly from the Internet. The URLs of potential images are provided to the script via an input file. Images must be JPEGs. An example file is provided. Good URLs to use are those from Axis security cameras, which can be located via Google: http://www.google.com/search?q=inurl:%22jpg/image.jpg%3Fr%3D%22 This script requires Python 2.3, xscreensaver, and netpbm. When not using --root, Tkinter is also required. Perpetrated by Dr. Zeus (http://drzeus.best.vwh.net/). Inspired by the work of i.document (http://i.document.m05.de/?p=418). """ ) # TODO: # -Get URLs from Google if urlfile is not supplied. # -Support other image formats. def usage( msg=None ): if msg: sys.stderr.write( "ERROR: %s\n" % (msg) ) sys.stderr.write( "Usage: cctv.py [-d] [-q] [-n int] [--root] [--range min-max]\n" ) sys.stderr.write( " [--delay float] [--duration float] urlfile\n" ) sys.stderr.write( " -d: Turn on debugging output.\n" ) sys.stderr.write( " -q: Quiet. Suppress humorous status messages.\n" ) sys.stderr.write( " -n int: The number of CCTVs to rotate amongst. Defaults to 5.\n" ) sys.stderr.write( " --root: Display the images on the root window.\n" ) sys.stderr.write( " --range min-max: The minimum and maximum mean pixel values to\n" ) sys.stderr.write( " allow, as a string such as \"20-210\", or the special string\n" ) sys.stderr.write( " \"localtime\". If a range, the values must be in the range\n" ) sys.stderr.write( " 0 to 255. Images not in this range will be discarded.\n" ) sys.stderr.write( " If \"localtime\", images appropriate to your local time will\n" ) sys.stderr.write( " be used. If not provided, all images may be used.\n" ) sys.stderr.write( " --delay float: The minimum number of seconds to wait between image\n" ) sys.stderr.write( " downloads. Defaults to 1.0.\n" ) sys.stderr.write( " --duration float: The number of seconds to spend on each CCTV.\n" ) sys.stderr.write( " Defaults to 10.0.\n" ) sys.stderr.write( " urlfile: A file containing one URL per line.\n" ) sys.exit(1) def main(): (odict, args) = getOptions() if odict.has_key( "html" ): dumpHTML( odict, args[0] ) else: showCCTVs( odict, args[0] ) def showCCTVs( odict, urlfile ): if debug: print "Options:" pprint.pprint(odict) socket.setdefaulttimeout( socket_timeout ) if loud: print status_msgs[0] urls = loadURLs( urlfile ) window = initializeWindow( odict ) cctvs = [] for i in range( odict["n"] ): cctvs.append( CCTV( odict, i, window ) ) init_queue = copy.copy( cctvs ) for i in range( init_threads ): t = threading.Thread( target=initializeCCTVs, args=(odict, init_queue, urls) ) t.start() try: cctv = waitForCCTV( odict, cctvs ) mainloop( odict, cctv, cctvs ) finally: for cctv in cctvs: cctv.shutdown() cctv.cleanup() if window: window.quit() def mainloop( odict, cctv, cctvs ): while 1: cctv.mainloop() cctv = findNextCCTV( odict, cctv, cctvs ) def initializeCCTVs( odict, queue, urls ): while 1: try: cctv = queue.pop( 0 ) except IndexError: break try: cctv.initialize( urls ) except KeyboardInterrupt: sys.exit( 1 ) except: if debug: print "Unable to initialize %s:" % (cctv) traceback.print_exc() # This CCTV will never be usable. def waitForCCTV( odict, cctvs ): i = 1 # The first one is printed above. t0 = time.time() while 1: for cctv in cctvs: if cctv.isInitialized(): if debug: print "Found first CCTV: %s" % (cctv) if loud: sys.stdout.write( "\n" ) sys.stdout.flush() return cctv dt = time.time() - t0 if (dt > max_init_time): sys.stderr.write( "Unable to initialize any CCTVS after %d seconds. Exiting...\n" % (dt) ) sys.exit( 1 ) time.sleep( init_sleep ) if loud: if (i < len(status_msgs)): sys.stdout.write( status_msgs[i] ) if (i < (len(status_msgs) - 1)): sys.stdout.write( "\n" ) else: sys.stdout.write( "." ) sys.stdout.flush() i = i + 1 def findNextCCTV( odict, cctv, cctvs ): start_i = cctv.getIndex() + 1 if debug: init_count = 0 for cctv in cctvs: if cctv.isInitialized(): init_count = init_count + 1 print "%d/%d CCTVs are initialized." % (init_count, len(cctvs)) for j in range( len( cctvs ) ): i = (start_i + j) % len( cctvs ) cctv = cctvs[i] if cctv.isInitialized(): if debug: print "Got next CCTV: %s" % (cctv) return cctv class CCTV: def __init__( self, odict, index, window=None ): self.odict = odict self.index = index self.url = "" self.image = "" self.window = window self.first = 1 self.running = 1 def initialize( self, urls ): if debug: print "Setting up %s..." % (self) while self.running: try: candidate = urls.pop( 0 ) except IndexError: if debug: print "Unable to initialize %s: out of URLs." % (self) break try: self.download( candidate ) if self.isViable(): self.url = candidate break else: self.cleanup() except KeyboardInterrupt: raise except: # Try the next URL. continue def isInitialized( self ): return self.url def shutdown( self ): self.running = 0 def getIndex( self ): return self.index def mainloop( self ): if debug: print "Starting %s mainloop." % (self) try: duration_start = time.time() while 1: delay_start = time.time() if not self.first: #self.display() self.cleanup() self.download() self.display() self.first = 0 duration_delta = time.time() - duration_start if (duration_delta > self.odict["duration"]): break delay_delta = time.time() - delay_start if (delay_delta < self.odict["delay"]): # Sleep the remaining delay period. time.sleep( self.odict["delay"] - delay_delta ) except KeyboardInterrupt: raise except: if debug: print "Error in %s mainloop:" % (self) traceback.print_exc() # Continue on to the next CCTV. We may come back to this one evenually. def download( self, url=None ): try: if not url: url = self.url (fd, self.image) = tempfile.mkstemp() if debug: print "%s: downloading %s to %s..." % (self, url, self.image) o = NoAuthURLopener() u = o.open( url ) f = os.fdopen( fd, "w+b" ) while 1: data = u.read() if not data: break f.write( data ) u.close() f.close() self.verifyImage() except: if debug: print "%s: error downloading %s:" % (self, url) traceback.print_exc() self.cleanup() raise def isViable( self ): try: sample = self.getSample() if ((sample >= self.odict["min_sample"]) and (sample <= self.odict["max_sample"])): return 1 else: if debug: print "%s: %s is not viable: %.2f outside of range %s" % (self, self.image, sample, self.odict["range"]) except: if debug: print "%s: error checking viability of %s:" % (self, self.image) traceback.print_exc() self.cleanup() raise def verifyImage( self ): cmd = verify_cmd % (self.image) ret = os.system( cmd ) if (ret != 0): raise NotAnImageException def getSample( self ): if not self.running: return -1 cmd = viable_cmd % (self.image) p = os.popen( cmd ) return float( string.strip( p.read() ) ) def display( self ): cmd = display_cmd % (self.image) if self.window: cmd = cmd + " %d" % (self.window.winfo_id()) else: cmd = cmd + " -root" if debug: print "%s: executing %s..." % (self, cmd) os.system( cmd ) def cleanup( self ): if self.image and os.path.exists( self.image ): os.unlink( self.image ) def __str__( self ): return "CCTV@%d" % (self.index) def initializeWindow( odict ): if odict.has_key( "root" ): return None if debug: print "Creating window..." import Tkinter window = Tkinter.Tk() window.geometry( window_size ) t = threading.Thread( target=windowLoop, args=(window,) ) t.start() return window def windowLoop( window ): try: window.mainloop() except: sys.exit( 1 ) def loadURLs( urlfile ): urls = [] f = open( urlfile ) while 1: l = f.readline() if not l: break l = string.strip( l ) if not l or (l[0] == "#"): continue urls.append( l ) if debug: print "Loaded %d URLs." % (len(urls)) return randomize( urls ) def randomize( l ): new_l = [] while l: i = random.randint( 0, len(l)-1 ) new_l.append( l.pop(i) ) return new_l class NoAuthURLopener( urllib.FancyURLopener ): def prompt_user_passwd( self, host, realm ): raise AuthenticationRequestedException # This is for setting up the ranges. def dumpHTML( odict, urlfile ): dir = odict["html"] if not os.path.exists( dir ): os.mkdir( dir ) socket.setdefaulttimeout( socket_timeout ) urls = loadURLs( urlfile ) cctv = CCTV( odict, 0 ) images = [] for url in urls: try: cctv.download( url ) sample = cctv.getSample() fname = os.path.join( dir, os.path.basename(cctv.image) ) + ".jpg" if debug: print "Moving %s to %s..." % (cctv.image, fname) shutil.move( cctv.image, fname ) os.chmod( fname, 0664 ) images.append( (sample, os.path.basename(fname), url) ) except KeyboardInterrupt: raise except: continue f = open( os.path.join( dir, "index.html" ), "w" ) f.write( "\n" ) f.write( "Successfully downloaded %d images:

\n" % (len(images)) ) images.sort() for (sample, fname, url) in images: f.write( " %.2f %s

\n" % (fname, fname, sample, url, url) ) f.write( "\n" ) f.close() def getOptions(): try: (tt, args) = getopt.getopt( sys.argv[1:], "hdqn:", ["help", "root", "range=", "delay=", "duration=", "html="] ) except getopt.error: sys.stderr.write( "Error: %s\n" % (sys.exc_info()[1]) ) usage() odict = {} for t in tt: s = t[0] while (s[0] == "-"): # Strip leading '-'s s = s[1:] odict[s] = t[1] if odict.has_key("h") or odict.has_key("help"): help() usage() if (len(args) != 1): usage() if odict.has_key("d"): global debug debug = 1 if not odict.has_key("q"): global loud loud = 1 odict["n"] = int( odict.get( "n", default_n ) ) if (odict["n"] < 1): usage( "The number of CCTVs must be greater than zero." ) odict["delay"] = float( odict.get( "delay", default_delay ) ) if (odict["delay"] < 0.0): usage( "The delay must be greater than zero." ) odict["duration"] = float( odict.get( "duration", default_duration ) ) if (odict["duration"] < 0.0): usage( "The duration must be greater than zero." ) if odict.has_key( "range" ): if (string.lower(odict["range"][0]) == "l"): hour = time.localtime(time.time())[3] range = localtime_ranges[hour] if debug: print "Using range %s for hour %d." % (range, hour) odict["range"] = range else: odict["range"] = default_range try: [min, max] = map( int, string.split( odict["range"], "-" ) ) except: sys.stderr.write( "Error: %s\n" % (sys.exc_info()[1]) ) usage() if ((min < 0) or (max > 255) or (min > max)): usage( "Invalid range '%s'" % (odict["range"]) ) odict["min_sample"] = min odict["max_sample"] = max return (odict, args) #------------------------------------------------------------------------------ if __name__ == "__main__": #------------------------------------------------------------------------------ main ( )