Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions Lib/sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

__all__ = ["scheduler"]

Event = namedtuple('Event', 'time, priority, sequence, action, argument, kwargs')
class Event(namedtuple('Event', ('time', 'priority', 'sequence', 'action', 'argument', 'kwargs'))):
_cancelled = False
Event.time.__doc__ = ('''Numeric type compatible with the return value of the
timefunc function passed to the constructor.''')
Event.priority.__doc__ = ('''Events scheduled for the same time will be executed
Expand All @@ -55,10 +56,16 @@ def __init__(self, timefunc=_time, delayfunc=time.sleep):
functions"""
self._queue = []
self._lock = threading.RLock()
self._cancellations = 0
self.timefunc = timefunc
self.delayfunc = delayfunc
self._sequence_generator = count()

def _reheapify(self):
self._cancellations = 0
self._queue = [x for x in self._queue if not x._cancelled]
heapq.heapify(self._queue)

def enterabs(self, time, priority, action, argument=(), kwargs=_sentinel):
"""Enter a new event in the queue at an absolute time.

Expand Down Expand Up @@ -92,13 +99,16 @@ def cancel(self, event):

"""
with self._lock:
self._queue.remove(event)
heapq.heapify(self._queue)
assert isinstance(event, Event), event
if event._cancelled:
raise ValueError("this event was already cancelled")
event._cancelled = True
self._cancellations += 1

def empty(self):
"""Check whether the queue is empty."""
with self._lock:
return not self._queue
return len(self._queue) <= self._cancellations

def run(self, blocking=True):
"""Execute events until the queue is empty.
Expand Down Expand Up @@ -135,14 +145,22 @@ def run(self, blocking=True):
with lock:
if not q:
break
(time, priority, sequence, action,
argument, kwargs) = q[0]
time, _, _, action, argument, kwargs = event = q[0]
if event._cancelled:
self._cancellations -= 1
pop(q)
continue
now = timefunc()
if time > now:
if (self._cancellations > 50 and
self._cancellations > (len(self._queue) >> 1)):
self._reheapify()
continue
delay = True
else:
delay = False
pop(q)

if delay:
if not blocking:
return time - now
Expand All @@ -163,5 +181,7 @@ def queue(self):
# With heapq, two events scheduled at the same time will show in
# the actual order they would be retrieved.
with self._lock:
if self._cancellations:
self._reheapify()
events = self._queue[:]
return list(map(heapq.heappop, [events]*len(events)))
23 changes: 22 additions & 1 deletion Lib/test/test_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def advance(self, t):
self._cond.notify_all()


class TestCase(unittest.TestCase):
class TestScheduler(unittest.TestCase):

def test_enter(self):
l = []
Expand Down Expand Up @@ -222,5 +222,26 @@ def test_run_non_blocking(self):
self.assertEqual(l, [])


class TestCancellations(unittest.TestCase):

def test_cancel_twice(self):
scheduler = sched.scheduler()
ev = scheduler.enter(0.01, 1, lambda: 0)
scheduler.cancel(ev)
self.assertRaises(ValueError, scheduler.cancel, ev)

def test_queue(self):
scheduler = sched.scheduler()
ev = scheduler.enter(0.01, 1, lambda: 0)
scheduler.cancel(ev)
self.assertEqual(list(scheduler.queue), [])

def test_empty(self):
scheduler = sched.scheduler()
ev = scheduler.enter(0.01, 1, lambda: 0)
scheduler.cancel(ev)
self.assertTrue(scheduler.empty())


if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Optimized :meth:`sched.scheduler.cancel`.