Source code for guv.semaphore

import greenlet

from .patcher import original

time = original('time')
import logging

from . import hubs

from .timeout import Timeout

log = logging.getLogger('guv')


class Semaphore:
    """An unbounded semaphore

    Optionally initialize with a resource *count*, then :meth:`acquire` and
    :meth:`release` resources as needed. Attempting to :meth:`acquire` when
    *count* is zero suspends the calling greenthread until *count* becomes
    nonzero again.

    This is API-compatible with :class:`threading.Semaphore`.

    It is a context manager, and thus can be used in a with block::

        sem = Semaphore(2)
        with sem:
            do_some_stuff()

    If not specified, *value* defaults to 1.

    It is possible to limit acquire time::

        sem = Semaphore()
        ok = sem.acquire(timeout=0.1)
        # True if acquired, False if timed out.
    """

    def __init__(self, value=1):
        """Create the semaphore with *value* initially available resources.

        :param value: initial counter; zero is accepted, negatives are not
        :raises ValueError: if *value* is negative
        """
        # Validate first so a rejected value never touches instance state.
        if value < 0:
            raise ValueError("Semaphore must be initialized with a positive "
                             "number, got %s" % value)
        self.counter = value
        # Greenlets currently suspended inside acquire().
        self._waiters = set()

    def __repr__(self):
        params = (self.__class__.__name__, hex(id(self)),
                  self.counter, len(self._waiters))
        return '<%s at %s c=%s _w[%s]>' % params

    def __str__(self):
        params = (self.__class__.__name__, self.counter, len(self._waiters))
        return '<%s c=%s _w[%s]>' % params

    def locked(self):
        """Returns true if a call to acquire would block."""
        return self.counter <= 0

    def bounded(self):
        """Returns False; for consistency with
        :class:`~guv.semaphore.CappedSemaphore`.
        """
        return False

    def acquire(self, blocking=True, timeout=None):
        """Acquire a semaphore

        If the internal counter is larger than zero on entry, decrement it by
        one and return True immediately. If it is zero (or less) on entry and
        *blocking* is true, register the current greenlet as a waiter and
        switch to the hub until the counter is positive again, then decrement
        it and return True. When several acquirers are waiting, each
        :meth:`release` wakes at most one of them; waiters are kept in a set,
        so the wake-up order should not be relied on.

        :param blocking: when false, never wait: return False immediately if
            the call would block
        :param timeout: optional limit on the wait; ``None`` waits without
            limit, and a negative int/float is treated like ``None``
        :return: True if the semaphore was acquired, False if a non-blocking
            call would have blocked or the wait ended before the counter
            became positive
        :raises ValueError: if *timeout* is given together with
            ``blocking=False``
        """
        if not blocking and timeout is not None:
            raise ValueError('must not specify timeout for non-blocking acquire')
        if not blocking and self.locked():
            return False
        if isinstance(timeout, (float, int)) and timeout < 0:
            # Negative timeouts mean "no timeout".
            timeout = None

        if self.counter <= 0:
            current = greenlet.getcurrent()
            self._waiters.add(current)
            try:
                if timeout is not None:
                    # `ok` only becomes True when the wait loop runs to
                    # completion; leaving the with-block early (presumably
                    # because the Timeout fired and was suppressed -- see
                    # .timeout.Timeout) keeps it False.
                    ok = False
                    with Timeout(timeout, False):
                        while self.counter <= 0:
                            hubs.get_hub().switch()
                        ok = True
                    if not ok:
                        return False
                else:
                    while self.counter <= 0:
                        hubs.get_hub().switch()
            finally:
                # Always deregister, whether we acquired, gave up, or an
                # exception propagated out of the wait.
                self._waiters.discard(current)

        self.counter -= 1
        return True

    def __enter__(self):
        # Hand back acquire()'s result, as threading.Semaphore does.
        return self.acquire()

    def release(self, blocking=True):
        """Release a semaphore, incrementing the internal counter by one.

        If any greenlets are waiting in :meth:`acquire`, a wake-up of one of
        them is scheduled on the hub.

        The *blocking* argument is for consistency with CappedSemaphore and is
        ignored.

        :return: always True
        """
        self.counter += 1
        if self._waiters:
            hubs.get_hub().schedule_call_now(self._do_acquire)
        return True

    def _do_acquire(self):
        """Wake one waiter, provided one exists and the counter is positive."""
        if self._waiters and self.counter > 0:
            # set.pop() removes an arbitrary waiter, so wake order is not fixed.
            waiter = self._waiters.pop()
            waiter.switch()

    def __exit__(self, typ, val, tb):
        self.release()

    @property
    def balance(self):
        """An integer value that represents how many new calls to
        :meth:`acquire` or :meth:`release` would be needed to get the counter
        to 0. If it is positive, then its value is the number of acquires that
        can happen before the next acquire would block. If it is negative, it
        is the negative of the number of releases that would be required in
        order to make the counter 0 again (one more release would push the
        counter to 1 and unblock acquirers). It takes into account how many
        greenthreads are currently blocking in :meth:`acquire`.
        """
        # positive means there are free items
        # zero means there are no free items but nobody has requested one
        # negative means there are requests for items, but no items
        return self.counter - len(self._waiters)
class BoundedSemaphore(Semaphore):
    """A semaphore that refuses to be released past its initial value.

    Semaphores usually guard a resource with limited capacity, so releasing
    one more often than it was acquired is a sign of a bug. This subclass
    turns such an over-release into a :exc:`ValueError`.

    If not given, *value* defaults to 1.
    """

    def __init__(self, value=1):
        super().__init__(value)
        # Remember the starting value; release() uses it as the ceiling.
        self.original_counter = value

    def release(self, blocking=True):
        """Release a semaphore, incrementing the internal counter by one.

        The counter is checked against the initial value first; if it has
        already reached that value, :exc:`ValueError` is raised and the
        counter is left untouched. Otherwise the release is delegated to
        :meth:`Semaphore.release`.

        The *blocking* argument is for consistency with
        :class:`CappedSemaphore` and is ignored.
        """
        ceiling = self.original_counter
        if self.counter >= ceiling:
            raise ValueError("Semaphore released too many times")
        return super().release(blocking)
class CappedSemaphore:
    """A blockingly bounded semaphore.

    Optionally initialize with a resource *count*, then :meth:`acquire` and
    :meth:`release` resources as needed. Attempting to :meth:`acquire` when
    *count* is zero suspends the calling greenthread until count becomes
    nonzero again. Attempting to :meth:`release` after *count* has reached
    *limit* suspends the calling greenthread until *count* becomes less than
    *limit* again.

    This has the same API as :class:`threading.Semaphore`, though its
    semantics and behavior differ subtly due to the upper limit on calls to
    :meth:`release`. It is **not** compatible with
    :class:`threading.BoundedSemaphore` because it blocks when reaching
    *limit* instead of raising a ValueError.

    It is a context manager, and thus can be used in a with block::

        sem = CappedSemaphore(2)
        with sem:
            do_some_stuff()
    """

    def __init__(self, count, limit):
        """Create the semaphore with *count* free resources out of *limit*.

        :param count: initial number of available resources (zero allowed)
        :param limit: maximum value the count may reach
        :raises ValueError: if *count* is negative or greater than *limit*
        """
        if count < 0:
            raise ValueError("CappedSemaphore must be initialized with a "
                             "positive number, got %s" % count)
        # NOTE: a *limit* of None is not handled specially; on Python 3 the
        # comparison below itself raises TypeError for an int *count*.
        if count > limit:
            raise ValueError("'count' cannot be more than 'limit'")
        # lower_bound counts resources that can still be acquired;
        # upper_bound counts releases that can still happen before the limit.
        self.lower_bound = Semaphore(count)
        self.upper_bound = Semaphore(limit - count)

    def __repr__(self):
        params = (self.__class__.__name__, hex(id(self)),
                  self.balance, self.lower_bound, self.upper_bound)
        return '<%s at %s b=%s l=%s u=%s>' % params

    def __str__(self):
        params = (self.__class__.__name__, self.balance,
                  self.lower_bound, self.upper_bound)
        return '<%s b=%s l=%s u=%s>' % params

    def locked(self):
        """Returns true if a call to acquire would block."""
        return self.lower_bound.locked()

    def bounded(self):
        """Returns true if a call to release would block."""
        return self.upper_bound.locked()

    def acquire(self, blocking=True):
        """Acquire a semaphore.

        Makes room for one more :meth:`release` by releasing the upper-bound
        semaphore, then acquires the lower-bound semaphore, which waits while
        no resource is available.

        :param blocking: when false, return False immediately if the call
            would block instead of waiting
        :return: False if a non-blocking call would have blocked; otherwise
            the result of acquiring the lower-bound semaphore
        """
        if not blocking and self.locked():
            return False
        self.upper_bound.release()
        try:
            return self.lower_bound.acquire()
        except BaseException:
            # Catch everything (the exception is re-raised) so the release
            # above is undone on any failure. Using counter directly means it
            # can drop below zero; that inconsistency is tolerated because
            # there is no need to wait here.
            self.upper_bound.counter -= 1
            raise

    def __enter__(self):
        # Hand back acquire()'s result, as threading.Semaphore does.
        return self.acquire()

    def release(self, blocking=True):
        """Release a semaphore.

        In this class, this behaves very much like an :meth:`acquire` but in
        the opposite direction: the lower-bound semaphore is released, then
        the upper-bound semaphore is acquired, which waits while the internal
        counter is at *limit*.

        :param blocking: when false, return False immediately if the call
            would block instead of waiting
        :return: False if a non-blocking call would have blocked; otherwise
            the result of acquiring the upper-bound semaphore
        """
        if not blocking and self.bounded():
            return False
        self.lower_bound.release()
        try:
            return self.upper_bound.acquire()
        except BaseException:
            # Undo the release above before propagating the failure.
            self.lower_bound.counter -= 1
            raise

    def __exit__(self, typ, val, tb):
        self.release()

    @property
    def balance(self):
        """An integer value that represents how many new calls to
        :meth:`acquire` or :meth:`release` would be needed to get the counter
        to 0. If it is positive, then its value is the number of acquires that
        can happen before the next acquire would block. If it is negative, it
        is the negative of the number of releases that would be required in
        order to make the counter 0 again (one more release would push the
        counter to 1 and unblock acquirers). It takes into account how many
        greenthreads are currently blocking in :meth:`acquire` and
        :meth:`release`.
        """
        return self.lower_bound.balance - self.upper_bound.balance