Package robot_script :: Package event_simulators :: Module event_simulator
[hide private]
[frames | no frames]

Source Code for Module robot_script.event_simulators.event_simulator

  1  #!/usr/bin/env python 
  2   
  3  import threading; 
  4  from Queue import Queue; 
  5  from Queue import Empty; 
  6  import time; 
  7  from collections import OrderedDict; 
  8  import signal 
  9   
 10  import rospy; 
 11   
class EventSimulator(threading.Thread):
    '''
    Superclass for all robot_script event simulation applications.
    Takes a schedule and a callback, and invokes the callback at
    times determined by the schedule. Each callback invocation is
    passed an argument that is specified by the schedule. Example
    schedule::

        schedule = OrderedDict();
        schedule[2.0] = 'This';
        schedule[5.0] = 'is';
        schedule[6.0] = 'a';
        schedule[7.2] = 'test';

    This schedule invokes the callback 2.0 seconds after the simulator
    thread begins working through it, and again at 5, 6, and 7.2 seconds.
    The callback argument is the dict value corresponding to the time.
    See example in __main__.

    The simulator runs in a separate thread, which is started by calling
    start(). Make sure that your subclasses invoke this superclass' start()
    method.

    The callback function may return a value. This value is pushed into an
    eventQueue that may be accessed by other applications. Applications may
    obtain the eventQueue via getEventQueue(). See Python's built-in Queue
    class for how applications may feed from that queue.
    '''

    def start(self, schedule, callback, repeat=False):
        '''
        Starts the simulator.

        Installs this instance's sigintHandler() as the process-wide
        SIGINT handler, replacing whichever handler was installed before.
        Python only allows signal handlers to be set from the main thread,
        so call this method from there.

        @param schedule: plan with times and callback arguments.
        @type schedule: collections.OrderedDict
        @param callback: callable to invoke at the times indicated in the schedule.
        @type callback: callable
        @param repeat: if True, schedule is executed over and over.
        @type repeat: boolean
        @raise ValueError: if schedule is None or has no entries.
        '''
        # Initialize the underlying thread state before touching any attributes.
        threading.Thread.__init__(self)
        if not schedule:
            raise ValueError("Schedule for EventSimulator must have at least one entry.")
        self.schedule = schedule
        self.callback = callback
        self.repeat = repeat
        self.eventQueue = Queue()
        self.keepRunning = True
        signal.signal(signal.SIGINT, self.sigintHandler)
        super(EventSimulator, self).start()

    def getEventQueue(self):
        '''
        Return this event simulator's event queue. This queue is fed with
        the non-None return values from the callbacks.
        '''
        return self.eventQueue

    def run(self):
        '''
        Thread loop: works through the schedule, sleeping as needed.
        Ends after one pass through the schedule unless repeat is True,
        or earlier once stop() has been called.
        '''
        while self.keepRunning and not rospy.is_shutdown():
            # Each pass through the schedule starts counting from zero:
            prevKeyframeTime = 0
            for actionTime in self.schedule.keys():
                # Schedule times are absolute seconds since the start of
                # the pass, so sleep only for the distance between this
                # entry and the previous one:
                time.sleep(actionTime - prevKeyframeTime)
                # stop() may have been called while we were asleep:
                if not self.keepRunning:
                    return
                prevKeyframeTime = actionTime
                result = self.callback(self.schedule[actionTime])
                # Only callbacks that return something feed the queue:
                if result is not None:
                    self.eventQueue.put(result)

            if not self.repeat:
                return

    def stop(self):
        '''
        Ask the simulator thread to terminate. The request takes effect
        the next time the thread wakes from its current sleep.
        '''
        self.keepRunning = False

    def sigintHandler(self, signum, frame):
        '''
        SIGINT handler installed by start(): stops the simulator.

        @param signum: number of the signal that was delivered.
        @type signum: int
        @param frame: stack frame that was interrupted by the signal.
        @type frame: frame object or None
        '''
        # Registered as a bound method, so it must accept self in
        # addition to the two arguments the signal module passes.
        self.stop()
if __name__ == '__main__':

    import sys

    # Demo schedule: one word each at 2.0, 5.0, 6.0 and 7.2 seconds.
    schedule = OrderedDict([
        (2.0, 'This'),
        (5.0, 'is'),
        (6.0, 'a'),
        (7.2, 'test'),
    ])

    class MyEvents(EventSimulator):
        '''
        Example 1: the callback prints directly and returns None,
        so nothing is placed on the event queue.
        '''

        def start(self, schedule, repeat=False):
            # Remember when we began so printTime() can report elapsed time.
            self.startTime = time.time()
            super(MyEvents, self).start(schedule, self.printTime, repeat=repeat)

        def printTime(self, word):
            elapsed = time.time() - self.startTime
            print(str(elapsed) + ": " + word)
            return None

    # start() returns as soon as the simulator thread has been launched,
    # so the second example below runs alongside this one.
    MyEvents().start(schedule)

    class MyComplexEvents(EventSimulator):
        '''
        Example 2: the callback returns a string, which the simulator
        pushes onto its event queue for the main program to consume.
        '''

        def start(self, schedule, repeat=False):
            self.startTime = time.time()
            super(MyComplexEvents, self).start(schedule, self.queuePrintLine, repeat=repeat)

        def queuePrintLine(self, word):
            elapsed = time.time() - self.startTime
            return str(elapsed) + ": " + word + " (via queue)"

    simulator = MyComplexEvents()
    simulator.start(schedule)
    results = simulator.getEventQueue()

    # Drain the queue until no event has arrived for four seconds.
    while True:
        try:
            line = results.get(block=True, timeout=4.0)
        except Empty:
            print("Queue empty for more than 4 seconds. Quitting.")
            sys.exit()
        else:
            print(line)