| Viewing file:  synchronize.py (11.77 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
## Module implementing synchronization primitives
 #
 # multiprocessing/synchronize.py
 #
 # Copyright (c) 2006-2008, R Oudkerk
 # Licensed to PSF under a Contributor Agreement.
 #
 
 __all__ = [
 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
 ]
 
 import threading
 import sys
 import tempfile
 import _multiprocessing
 
 from time import time as _time
 
 from . import context
 from . import process
 from . import util
 
 # Import the C-level semaphore primitives from _multiprocessing.  If the
 # import fails, raise an ImportError explaining that this platform lacks a
 # working sem_open implementation.  See issue 3770.
 try:
     from _multiprocessing import SemLock, sem_unlink
 except ImportError:
     # Replace the low-level failure with a message naming the platform issue.
     raise ImportError(
         "This platform lacks a functioning sem_open"
         " implementation, therefore, the required"
         " synchronization primitives needed will not"
         " function, see issue 3770."
     )
 
 #
 # Constants
 #
 
 # Values passed as the `kind` argument of `_multiprocessing.SemLock`.
 RECURSIVE_MUTEX = 0
 SEMAPHORE = 1
 # Used as `maxvalue` for semaphores that have no caller-supplied bound.
 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
 
 #
 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
 #
 
 class SemLock(object):
     """Base class for semaphores and mutexes.

     Wraps a `_multiprocessing.SemLock` object stored in `self._semlock` and
     exposes its `acquire` / `release` methods directly on the instance.
     """

     # Shared source of random suffixes for semaphore names.
     _rand = tempfile._RandomNameSequence()

     def __init__(self, kind, value, maxvalue, *, ctx):
         """Create the underlying semlock, retrying when a name is taken.

         `kind`, `value` and `maxvalue` are forwarded unchanged to
         `_multiprocessing.SemLock`.  `ctx` provides the start method; when
         it is None the default context is looked up.

         Raises FileExistsError if 100 consecutive names were already in use.
         """
         if ctx is None:
             ctx = context._default_context.get_context()
         start_method = ctx.get_start_method()
         unlink_now = (sys.platform == 'win32') or (start_method == 'fork')

         for _ in range(100):
             try:
                 semlock = _multiprocessing.SemLock(
                     kind, value, maxvalue, self._make_name(), unlink_now)
             except FileExistsError:
                 # Name collision: try again with a freshly generated name.
                 continue
             self._semlock = semlock
             break
         else:
             raise FileExistsError('cannot find name for semaphore')

         util.debug('created semlock with handle %s' % semlock.handle)
         self._make_methods()

         if sys.platform != 'win32':
             def _after_fork(obj):
                 obj._semlock._after_fork()
             util.register_after_fork(self, _after_fork)

         sem_name = self._semlock.name
         if sem_name is not None:
             # We only get here if we are on Unix with forking
             # disabled.  When the object is garbage collected or the
             # process shuts down we unlink the semaphore name
             from .semaphore_tracker import register
             register(sem_name)
             util.Finalize(self, SemLock._cleanup, (sem_name,),
                           exitpriority=0)

     @staticmethod
     def _cleanup(name):
         """Unlink the named semaphore and unregister it from the tracker."""
         from .semaphore_tracker import unregister
         sem_unlink(name)
         unregister(name)

     def _make_methods(self):
         """Bind `acquire` and `release` straight to the wrapped semlock."""
         wrapped = self._semlock
         self.acquire = wrapped.acquire
         self.release = wrapped.release

     def __enter__(self):
         """Delegate context-manager entry to the wrapped semlock."""
         return self._semlock.__enter__()

     def __exit__(self, *args):
         """Delegate context-manager exit to the wrapped semlock."""
         return self._semlock.__exit__(*args)

     def __getstate__(self):
         """Return `(handle, kind, maxvalue, name)` for a child process.

         Only allowed while spawning (checked by `context.assert_spawning`).
         On Windows the handle is duplicated for the child first.
         """
         context.assert_spawning(self)
         semlock = self._semlock
         handle = semlock.handle
         if sys.platform == 'win32':
             popen = context.get_spawning_popen()
             handle = popen.duplicate_for_child(handle)
         return (handle, semlock.kind, semlock.maxvalue, semlock.name)

     def __setstate__(self, state):
         """Rebuild the wrapped semlock from the tuple made by __getstate__."""
         self._semlock = _multiprocessing.SemLock._rebuild(*state)
         util.debug('recreated blocker with handle %r' % state[0])
         self._make_methods()

     @staticmethod
     def _make_name():
         """Return '<semprefix>-<random suffix>' for the current process."""
         prefix = process.current_process()._config['semprefix']
         return '%s-%s' % (prefix, next(SemLock._rand))
 
 #
 # Semaphore
 #
 
 class Semaphore(SemLock):
     """Semaphore whose maximum value is SEM_VALUE_MAX."""

     def __init__(self, value=1, *, ctx):
         """Create a semaphore with initial value `value` using `ctx`."""
         SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)

     def get_value(self):
         """Return the value reported by the wrapped semlock."""
         return self._semlock._get_value()

     def __repr__(self):
         """Show the class name and value; 'unknown' if it cannot be read."""
         try:
             current = self._semlock._get_value()
         except Exception:
             current = 'unknown'
         cls_name = self.__class__.__name__
         return '<%s(value=%s)>' % (cls_name, current)
 
 #
 # Bounded semaphore
 #
 
 class BoundedSemaphore(Semaphore):
     """Semaphore whose maximum value equals its initial value."""

     def __init__(self, value=1, *, ctx):
         """Create a semaphore with both value and maxvalue set to `value`."""
         SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)

     def __repr__(self):
         """Show class name, value ('unknown' if unreadable) and maxvalue."""
         try:
             current = self._semlock._get_value()
         except Exception:
             current = 'unknown'
         cls_name = self.__class__.__name__
         limit = self._semlock.maxvalue
         return '<%s(value=%s, maxvalue=%s)>' % (cls_name, current, limit)
 
 #
 # Non-recursive lock
 #
 
 class Lock(SemLock):
     """Non-recursive lock: a SEMAPHORE-kind semlock with value and max of 1."""

     def __init__(self, *, ctx):
         """Create the lock using context `ctx`."""
         SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)

     def __repr__(self):
         """Describe the owner, or 'unknown' if the state cannot be queried."""
         try:
             semlock = self._semlock
             if semlock._is_mine():
                 # Held by this process; append the thread name unless it is
                 # the main thread.
                 owner = process.current_process().name
                 thread_name = threading.current_thread().name
                 if thread_name != 'MainThread':
                     owner = owner + '|' + thread_name
             elif semlock._get_value() == 1:
                 owner = 'None'
             elif semlock._count() > 0:
                 owner = 'SomeOtherThread'
             else:
                 owner = 'SomeOtherProcess'
         except Exception:
             owner = 'unknown'
         return '<%s(owner=%s)>' % (self.__class__.__name__, owner)
 
 #
 # Recursive lock
 #
 
 class RLock(SemLock):
     """Recursive lock: a RECURSIVE_MUTEX-kind semlock."""

     def __init__(self, *, ctx):
         """Create the recursive lock using context `ctx`."""
         SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)

     def __repr__(self):
         """Describe owner and count; both 'unknown' if the query fails."""
         try:
             semlock = self._semlock
             if semlock._is_mine():
                 # Held by this process; append the thread name unless it is
                 # the main thread, and report the semlock's own count.
                 owner = process.current_process().name
                 thread_name = threading.current_thread().name
                 if thread_name != 'MainThread':
                     owner = owner + '|' + thread_name
                 depth = semlock._count()
             elif semlock._get_value() == 1:
                 owner, depth = 'None', 0
             elif semlock._count() > 0:
                 owner, depth = 'SomeOtherThread', 'nonzero'
             else:
                 owner, depth = 'SomeOtherProcess', 'nonzero'
         except Exception:
             owner, depth = 'unknown', 'unknown'
         return '<%s(%s, %s)>' % (self.__class__.__name__, owner, depth)
 
 #
 # Condition variable
 #
 
 class Condition(object):
     """Condition variable built from a lock and three semaphores.

     `_sleeping_count` is released by each waiter before it blocks,
     `_woken_count` is released by each waiter after it stops blocking, and
     `_wait_semaphore` is what waiters actually block on.
     """

     def __init__(self, lock=None, *, ctx):
         """Create the condition around `lock`, or a new `ctx.RLock()`."""
         self._lock = lock or ctx.RLock()
         self._sleeping_count = ctx.Semaphore(0)
         self._woken_count = ctx.Semaphore(0)
         self._wait_semaphore = ctx.Semaphore(0)
         self._make_methods()

     def __getstate__(self):
         """Return the lock and semaphores; only allowed while spawning."""
         context.assert_spawning(self)
         return (self._lock, self._sleeping_count,
                 self._woken_count, self._wait_semaphore)

     def __setstate__(self, state):
         """Restore the lock and semaphores and rebind acquire/release."""
         (self._lock, self._sleeping_count,
          self._woken_count, self._wait_semaphore) = state
         self._make_methods()

     def __enter__(self):
         return self._lock.__enter__()

     def __exit__(self, *args):
         return self._lock.__exit__(*args)

     def _make_methods(self):
         """Expose the lock's `acquire` and `release` on the condition."""
         self.acquire = self._lock.acquire
         self.release = self._lock.release

     def __repr__(self):
         try:
             num_waiters = (self._sleeping_count._semlock._get_value() -
                            self._woken_count._semlock._get_value())
         except Exception:
             num_waiters = 'unknown'
         return '<%s(%s, %s)>' % (self.__class__.__name__, self._lock, num_waiters)

     def wait(self, timeout=None):
         """Release the lock, block until notified or timed out, reacquire.

         The lock must be held by the caller.  Returns the result of
         acquiring the wait semaphore with the given `timeout`.
         """
         assert self._lock._semlock._is_mine(), \
                'must acquire() condition before using wait()'

         # indicate that this thread is going to sleep
         self._sleeping_count.release()

         # release lock (once per level of recursive ownership)
         count = self._lock._semlock._count()
         for _ in range(count):
             self._lock.release()

         try:
             # wait for notification or timeout
             return self._wait_semaphore.acquire(True, timeout)
         finally:
             # indicate that this thread has woken
             self._woken_count.release()

             # reacquire lock to the same depth as before
             for _ in range(count):
                 self._lock.acquire()

     def notify(self, n=1):
         """Wake up to `n` waiters (default one).  The lock must be held."""
         assert self._lock._semlock._is_mine(), 'lock is not owned'
         assert not self._wait_semaphore.acquire(False), \
                'wait semaphore should be zero when notify() starts'

         # to take account of timeouts since last notify*() we subtract
         # woken_count from sleeping_count and rezero woken_count
         while self._woken_count.acquire(False):
             res = self._sleeping_count.acquire(False)
             assert res, 'woken_count exceeded sleeping_count'

         sleepers = 0
         while sleepers < n and self._sleeping_count.acquire(False):
             self._wait_semaphore.release()        # wake up one sleeper
             sleepers += 1

         if sleepers:
             for _ in range(sleepers):
                 self._woken_count.acquire()       # wait for a sleeper to wake

             # rezero wait_semaphore in case some timeouts just happened
             while self._wait_semaphore.acquire(False):
                 pass

     def notify_all(self):
         """Wake every current waiter.  The lock must be held."""
         self.notify(n=sys.maxsize)

     def wait_for(self, predicate, timeout=None):
         """Wait until `predicate()` is true or `timeout` seconds elapse.

         Returns the last value returned by `predicate`.
         """
         # A monotonic clock keeps the deadline immune to system clock changes.
         from time import monotonic

         result = predicate()
         if result:
             return result
         endtime = None if timeout is None else monotonic() + timeout
         waittime = None
         while not result:
             if endtime is not None:
                 waittime = endtime - monotonic()
                 if waittime <= 0:
                     break
             self.wait(waittime)
             result = predicate()
         return result
 
 #
 # Event
 #
 
 class Event(object):
     """Event flag built from a condition and a semaphore.

     The flag is "set" when `_flag` holds a token; every access to `_flag`
     happens while `_cond` is held.
     """

     def __init__(self, *, ctx):
         """Create the condition (around a new lock) and the flag semaphore."""
         self._cond = ctx.Condition(ctx.Lock())
         self._flag = ctx.Semaphore(0)

     def _peek_flag(self):
         """Report whether the flag holds a token, leaving it unchanged."""
         if self._flag.acquire(False):
             self._flag.release()
             return True
         return False

     def is_set(self):
         """Return True if the flag is currently set."""
         with self._cond:
             return self._peek_flag()

     def set(self):
         """Set the flag and wake every waiter."""
         with self._cond:
             # Take any existing token first so that only one remains after
             # the release below.
             self._flag.acquire(False)
             self._flag.release()
             self._cond.notify_all()

     def clear(self):
         """Reset the flag."""
         with self._cond:
             self._flag.acquire(False)

     def wait(self, timeout=None):
         """Wait for the flag if it is not set; return whether it is set."""
         with self._cond:
             if not self._peek_flag():
                 self._cond.wait(timeout)
             return self._peek_flag()
 
 #
 # Barrier
 #
 
 class Barrier(threading.Barrier):
     """`threading.Barrier` variant keeping `_state` and `_count` in a buffer.

     Both values are stored as C ints in a two-element memoryview obtained
     from a `BufferWrapper`.
     """

     def __init__(self, parties, action=None, timeout=None, *, ctx):
         """Allocate the two-int buffer and a condition, then zero the ints."""
         import struct
         from .heap import BufferWrapper
         shared = BufferWrapper(2 * struct.calcsize('i'))
         initial = (parties, action, timeout, ctx.Condition(), shared)
         self.__setstate__(initial)
         self._state = 0
         self._count = 0

     def __setstate__(self, state):
         """Install the five-item state and map the buffer as an int array."""
         parties, action, timeout, cond, wrapper = state
         self._parties = parties
         self._action = action
         self._timeout = timeout
         self._cond = cond
         self._wrapper = wrapper
         self._array = wrapper.create_memoryview().cast('i')

     def __getstate__(self):
         """Return `(parties, action, timeout, cond, wrapper)`."""
         return (self._parties, self._action, self._timeout,
                 self._cond, self._wrapper)

     # `_state` lives in slot 0 of the int array.
     @property
     def _state(self):
         return self._array[0]

     @_state.setter
     def _state(self, value):
         self._array[0] = value

     # `_count` lives in slot 1 of the int array.
     @property
     def _count(self):
         return self._array[1]

     @_count.setter
     def _count(self, value):
         self._array[1] = value
 
 |