#!/usr/bin/env python # (C) 2009 Kevin Mehall # http://blog.kevinmehall.net/2009/cell-signal-monitoring # BSD License import sys, time class DataReader(object): def __init__(self): self.thresh=1 self.histogram=[0]*7 self.last_good=None self.total=0 self.count=0 self.total_good=0 self.crosses=0 self.last_val=0 def fmt(self, t): time=t/2.0 return "%02i:%02i%s"%(time/60, time%60, t%2==0 and ' ' or '.') def read_pt(self, s, line): self.total+=s self.count+=1 self.histogram[s+1]+=1 good=s>self.thresh if good: self.total_good+=1 if self.last_good != good and self.last_good is not None: self.crosses+=1 if self.update_spans: self.update_spans(good, line) self.last_good=good self.last_val=s return s update_spans=None @property def average_bars(self): return 1.0*self.total/self.count @property def percent_acceptable(self): return 100.0*self.total_good/self.count @property def volatility(self): return 100.0*self.crosses/self.count class RollingDataReader(DataReader): def __init__(self, window=20): super(RollingDataReader, self).__init__() self.datas=[] self.window=window self.reverse_good=None def read_pt(self, s, line): self.datas.append(s) if len(self.datas)>self.window: r=self.datas[0] self.datas=self.datas[1:] self.total-=r self.count-=1 self.histogram[r+1]-=1 good=r>self.thresh if good: self.total_good-=1 if self.reverse_good != good and self.reverse_good is not None: self.crosses-=1 self.reverse_good=good super(RollingDataReader, self).read_pt(s, line) return s class MatchDataReader(DataReader): def __init__(self, match): super(MatchDataReader, self).__init__() self.match=match def read_pt(self, s, line): if self.match in line: super(MatchDataReader, self).read_pt(s, line) class SpanDataReader(DataReader): def __init__(self): super(SpanDataReader, self).__init__() self.spansize=0 self.spanstarts=[] def update_spans(self, good, line): if self.last_good != good and self.last_good is not None: self.spansize=1 else: try: g, line, s=self.spanstarts.pop() except IndexError: pass self.spansize+=1 self.spanstarts.append((good, line, self.spansize)) @property def good_spans(self): return [i[2] for i in self.spanstarts if i[0]==True] @property def bad_spans(self): return [i[2] for i in self.spanstarts if i[0]==False] class FileDataReader(SpanDataReader): def __init__(self, others=None, updatefunc=None): super(FileDataReader, self).__init__() if others: self.others=others else: self.others=[] self.updatefunc=updatefunc def read_file(self, dataf): for line in open(dataf): self.read_line(line) def read_stdin(self): while True: self.read_line(raw_input()) if self.updatefunc: self.updatefunc() def read_line(self, line): if not '|' in line: return s=int(line.split('|')[1].strip()) for i in self.others: i.read_pt(s, line) return self.read_pt(s, line) class Display(object): def __init__(self, reader): self.reader=reader class BasicStatsDisplay(Display): def ascii(self): reader=self.reader if not reader.count: print "No Data" return print "Total time: %s"%reader.fmt(reader.count) print "Average # bars: %.3f"%(reader.average_bars) print "Acceptable signal for %s"%reader.fmt(reader.total_good) print "%% Acceptable %3.3f"%(reader.percent_acceptable) print "Volatility: %3.3f"%(reader.volatility) class HistogramDisplay(Display): def ascii(self): reader=self.reader if not reader.count: print "No Data" return hist_max=max(reader.histogram) print "Histogram" print "Bars | Time | %" for strength, c in enumerate(reader.histogram): print "%3s : %s %3i %s"%(strength-1, reader.fmt(c), round(100.0*c/reader.count), int(60.0*c/hist_max)*'#') class SpanDisplay(Display): def ascii(self): reader=self.reader if not reader.count: print "No Data" return if len(reader.good_spans): print "Average length of good signal: %s" % reader.fmt(round(float(sum(reader.good_spans))/len(reader.good_spans))) print "Max length of good signal: %s" % reader.fmt(max(reader.good_spans)) print "Min length of good signal: %s" % reader.fmt(min(reader.good_spans)) print "" if len(reader.bad_spans): print "Average length of poor signal: %s" % reader.fmt(round(float(sum(reader.bad_spans))/len(reader.bad_spans))) print "Max length of poor signal: %s" % reader.fmt(max(reader.bad_spans)) print "Min length of poor signal: %s" % reader.fmt(min(reader.bad_spans)) class EventsDisplay(Display): def ascii(self): reader=self.reader for good, line, duration in reader.spanstarts: time=line.split('\t')[0] ind=1 if duration>2: ind=2 if duration>20: ind=3 if duration>120: ind=4 if good: ind=ind*'+'+(4-ind)*' ' else: ind=ind*'-'+(4-ind)*' ' print "%s %s for %s"%(ind, time, reader.fmt(duration)) class CurrentDisplay(Display): def ascii(self): reader=self.reader print "Currently (%s): %s"%(time.asctime(), reader.last_val) print "Current %s signal has lasted for %s" % ( "acceptable" if reader.last_good else "poor", reader.fmt(reader.spansize)) class PageDisplay(Display): def __init__(self, *displays): self.display=GroupDisplay(*displays) def ascii(self): print "", self.display.ascii() class GroupDisplay(Display): def __init__(self, *displays): self.displays=displays def ascii(self): for i in self.displays: i.ascii() class SectionDisplay(Display): def __init__(self, title, *displays): self.title=(' '+title+' ').center(80, '-') self.display=GroupDisplay(*displays) def ascii(self): print "" print self.title self.display.ascii() dataf=sys.argv[2] if sys.argv[1] == "live": r=RollingDataReader() d=FileDataReader([r]) displays=PageDisplay( SectionDisplay("Cumulative", BasicStatsDisplay(d), HistogramDisplay(d), SpanDisplay(d), ), SectionDisplay("Last 10 Minutes", BasicStatsDisplay(r), HistogramDisplay(r), ), SectionDisplay("Currently", CurrentDisplay(d), ), ) d.updatefunc=displays.ascii d.read_file(dataf) displays.ascii() d.read_stdin() elif sys.argv[1] == "day": hours=range(10, 24) l=[MatchDataReader(" %02i:"%i) for i in hours] d=FileDataReader(l) d.read_file(dataf) #print zip(hours, [h.percent_acceptable for h in l]) displays=PageDisplay( SectionDisplay("Cumulative", BasicStatsDisplay(d), HistogramDisplay(d), SpanDisplay(d), EventsDisplay(d), ), *[SectionDisplay("%02i:00 to %02i:00"%(i, i+1), BasicStatsDisplay(h), HistogramDisplay(h), ) for i, h in zip(hours, l)] ) displays.ascii() else: print "live | day"