| Viewing file:  synchronize.py (10.4 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
## Module implementing synchronization primitives
 #
 # multiprocessing/synchronize.py
 #
 # Copyright (c) 2006-2008, R Oudkerk
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
 # modification, are permitted provided that the following conditions
 # are met:
 #
 # 1. Redistributions of source code must retain the above copyright
 #    notice, this list of conditions and the following disclaimer.
 # 2. Redistributions in binary form must reproduce the above copyright
 #    notice, this list of conditions and the following disclaimer in the
 #    documentation and/or other materials provided with the distribution.
 # 3. Neither the name of author nor the names of any contributors may be
 #    used to endorse or promote products derived from this software
 #    without specific prior written permission.
 #
 # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
 # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 # ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 # SUCH DAMAGE.
 #
 
 __all__ = [
 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
 ]
 
 import threading
 import os
 import sys
 
 from time import time as _time, sleep as _sleep
 
 import _multiprocessing
 from multiprocessing.process import current_process
 from multiprocessing.util import Finalize, register_after_fork, debug
 from multiprocessing.forking import assert_spawning, Popen
 
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
 # See issue 3770
 try:
 from _multiprocessing import SemLock
 except (ImportError):
 raise ImportError("This platform lacks a functioning sem_open" +
 " implementation, therefore, the required" +
 " synchronization primitives needed will not" +
 " function, see issue 3770.")
 
 #
 # Constants
 #
 
 RECURSIVE_MUTEX, SEMAPHORE = range(2)
 SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
 
 #
 # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
 #
 
 class SemLock(object):
 
 def __init__(self, kind, value, maxvalue):
 sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
 debug('created semlock with handle %s' % sl.handle)
 self._make_methods()
 
 if sys.platform != 'win32':
 def _after_fork(obj):
 obj._semlock._after_fork()
 register_after_fork(self, _after_fork)
 
 def _make_methods(self):
 self.acquire = self._semlock.acquire
 self.release = self._semlock.release
 
 def __enter__(self):
 return self._semlock.__enter__()
 
 def __exit__(self, *args):
 return self._semlock.__exit__(*args)
 
 def __getstate__(self):
 assert_spawning(self)
 sl = self._semlock
 return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
 
 def __setstate__(self, state):
 self._semlock = _multiprocessing.SemLock._rebuild(*state)
 debug('recreated blocker with handle %r' % state[0])
 self._make_methods()
 
 #
 # Semaphore
 #
 
 class Semaphore(SemLock):
 
 def __init__(self, value=1):
 SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
 
 def get_value(self):
 return self._semlock._get_value()
 
 def __repr__(self):
 try:
 value = self._semlock._get_value()
 except Exception:
 value = 'unknown'
 return '<Semaphore(value=%s)>' % value
 
 #
 # Bounded semaphore
 #
 
 class BoundedSemaphore(Semaphore):
 
 def __init__(self, value=1):
 SemLock.__init__(self, SEMAPHORE, value, value)
 
 def __repr__(self):
 try:
 value = self._semlock._get_value()
 except Exception:
 value = 'unknown'
 return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
 (value, self._semlock.maxvalue)
 
 #
 # Non-recursive lock
 #
 
 class Lock(SemLock):
 
 def __init__(self):
 SemLock.__init__(self, SEMAPHORE, 1, 1)
 
 def __repr__(self):
 try:
 if self._semlock._is_mine():
 name = current_process().name
 if threading.current_thread().name != 'MainThread':
 name += '|' + threading.current_thread().name
 elif self._semlock._get_value() == 1:
 name = 'None'
 elif self._semlock._count() > 0:
 name = 'SomeOtherThread'
 else:
 name = 'SomeOtherProcess'
 except Exception:
 name = 'unknown'
 return '<Lock(owner=%s)>' % name
 
 #
 # Recursive lock
 #
 
 class RLock(SemLock):
 
 def __init__(self):
 SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
 
 def __repr__(self):
 try:
 if self._semlock._is_mine():
 name = current_process().name
 if threading.current_thread().name != 'MainThread':
 name += '|' + threading.current_thread().name
 count = self._semlock._count()
 elif self._semlock._get_value() == 1:
 name, count = 'None', 0
 elif self._semlock._count() > 0:
 name, count = 'SomeOtherThread', 'nonzero'
 else:
 name, count = 'SomeOtherProcess', 'nonzero'
 except Exception:
 name, count = 'unknown', 'unknown'
 return '<RLock(%s, %s)>' % (name, count)
 
 #
 # Condition variable
 #
 
 class Condition(object):
 
 def __init__(self, lock=None):
 self._lock = lock or RLock()
 self._sleeping_count = Semaphore(0)
 self._woken_count = Semaphore(0)
 self._wait_semaphore = Semaphore(0)
 self._make_methods()
 
 def __getstate__(self):
 assert_spawning(self)
 return (self._lock, self._sleeping_count,
 self._woken_count, self._wait_semaphore)
 
 def __setstate__(self, state):
 (self._lock, self._sleeping_count,
 self._woken_count, self._wait_semaphore) = state
 self._make_methods()
 
 def __enter__(self):
 return self._lock.__enter__()
 
 def __exit__(self, *args):
 return self._lock.__exit__(*args)
 
 def _make_methods(self):
 self.acquire = self._lock.acquire
 self.release = self._lock.release
 
 def __repr__(self):
 try:
 num_waiters = (self._sleeping_count._semlock._get_value() -
 self._woken_count._semlock._get_value())
 except Exception:
 num_waiters = 'unknown'
 return '<Condition(%s, %s)>' % (self._lock, num_waiters)
 
 def wait(self, timeout=None):
 assert self._lock._semlock._is_mine(), \
 'must acquire() condition before using wait()'
 
 # indicate that this thread is going to sleep
 self._sleeping_count.release()
 
 # release lock
 count = self._lock._semlock._count()
 for i in xrange(count):
 self._lock.release()
 
 try:
 # wait for notification or timeout
 self._wait_semaphore.acquire(True, timeout)
 finally:
 # indicate that this thread has woken
 self._woken_count.release()
 
 # reacquire lock
 for i in xrange(count):
 self._lock.acquire()
 
 def notify(self):
 assert self._lock._semlock._is_mine(), 'lock is not owned'
 assert not self._wait_semaphore.acquire(False)
 
 # to take account of timeouts since last notify() we subtract
 # woken_count from sleeping_count and rezero woken_count
 while self._woken_count.acquire(False):
 res = self._sleeping_count.acquire(False)
 assert res
 
 if self._sleeping_count.acquire(False): # try grabbing a sleeper
 self._wait_semaphore.release()      # wake up one sleeper
 self._woken_count.acquire()         # wait for the sleeper to wake
 
 # rezero _wait_semaphore in case a timeout just happened
 self._wait_semaphore.acquire(False)
 
 def notify_all(self):
 assert self._lock._semlock._is_mine(), 'lock is not owned'
 assert not self._wait_semaphore.acquire(False)
 
 # to take account of timeouts since last notify*() we subtract
 # woken_count from sleeping_count and rezero woken_count
 while self._woken_count.acquire(False):
 res = self._sleeping_count.acquire(False)
 assert res
 
 sleepers = 0
 while self._sleeping_count.acquire(False):
 self._wait_semaphore.release()        # wake up one sleeper
 sleepers += 1
 
 if sleepers:
 for i in xrange(sleepers):
 self._woken_count.acquire()       # wait for a sleeper to wake
 
 # rezero wait_semaphore in case some timeouts just happened
 while self._wait_semaphore.acquire(False):
 pass
 
 #
 # Event
 #
 
 class Event(object):
 
 def __init__(self):
 self._cond = Condition(Lock())
 self._flag = Semaphore(0)
 
 def is_set(self):
 self._cond.acquire()
 try:
 if self._flag.acquire(False):
 self._flag.release()
 return True
 return False
 finally:
 self._cond.release()
 
 def set(self):
 self._cond.acquire()
 try:
 self._flag.acquire(False)
 self._flag.release()
 self._cond.notify_all()
 finally:
 self._cond.release()
 
 def clear(self):
 self._cond.acquire()
 try:
 self._flag.acquire(False)
 finally:
 self._cond.release()
 
 def wait(self, timeout=None):
 self._cond.acquire()
 try:
 if self._flag.acquire(False):
 self._flag.release()
 else:
 self._cond.wait(timeout)
 
 if self._flag.acquire(False):
 self._flag.release()
 return True
 return False
 finally:
 self._cond.release()
 
 |