| Viewing file:  pool.py (6.47 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
"""Module implementing the Pool for :mod:``requests_toolbelt.threaded``."""import multiprocessing
 import requests
 
 from . import thread
 from .._compat import queue
 
 
 class Pool(object):
 """Pool that manages the threads containing sessions.
 
 :param queue:
 The queue you're expected to use to which you should add items.
 :type queue: queue.Queue
 :param initializer:
 Function used to initialize an instance of ``session``.
 :type initializer: collections.Callable
 :param auth_generator:
 Function used to generate new auth credentials for the session.
 :type auth_generator: collections.Callable
 :param int num_threads:
 Number of threads to create.
 :param session:
 :type session: requests.Session
 """
 
 def __init__(self, job_queue, initializer=None, auth_generator=None,
 num_processes=None, session=requests.Session):
 if num_processes is None:
 num_processes = multiprocessing.cpu_count() or 1
 
 if num_processes < 1:
 raise ValueError("Number of processes should at least be 1.")
 
 self._job_queue = job_queue
 self._response_queue = queue.Queue()
 self._exc_queue = queue.Queue()
 self._processes = num_processes
 self._initializer = initializer or _identity
 self._auth = auth_generator or _identity
 self._session = session
 self._pool = [
 thread.SessionThread(self._new_session(), self._job_queue,
 self._response_queue, self._exc_queue)
 for _ in range(self._processes)
 ]
 
 def _new_session(self):
 return self._auth(self._initializer(self._session()))
 
 @classmethod
 def from_exceptions(cls, exceptions, **kwargs):
 r"""Create a :class:`~Pool` from an :class:`~ThreadException`\ s.
 
 Provided an iterable that provides :class:`~ThreadException` objects,
 this classmethod will generate a new pool to retry the requests that
 caused the exceptions.
 
 :param exceptions:
 Iterable that returns :class:`~ThreadException`
 :type exceptions: iterable
 :param kwargs:
 Keyword arguments passed to the :class:`~Pool` initializer.
 :returns: An initialized :class:`~Pool` object.
 :rtype: :class:`~Pool`
 """
 job_queue = queue.Queue()
 for exc in exceptions:
 job_queue.put(exc.request_kwargs)
 
 return cls(job_queue=job_queue, **kwargs)
 
 @classmethod
 def from_urls(cls, urls, request_kwargs=None, **kwargs):
 """Create a :class:`~Pool` from an iterable of URLs.
 
 :param urls:
 Iterable that returns URLs with which we create a pool.
 :type urls: iterable
 :param dict request_kwargs:
 Dictionary of other keyword arguments to provide to the request
 method.
 :param kwargs:
 Keyword arguments passed to the :class:`~Pool` initializer.
 :returns: An initialized :class:`~Pool` object.
 :rtype: :class:`~Pool`
 """
 request_dict = {'method': 'GET'}
 request_dict.update(request_kwargs or {})
 job_queue = queue.Queue()
 for url in urls:
 job = request_dict.copy()
 job.update({'url': url})
 job_queue.put(job)
 
 return cls(job_queue=job_queue, **kwargs)
 
 def exceptions(self):
 """Iterate over all the exceptions in the pool.
 
 :returns: Generator of :class:`~ThreadException`
 """
 while True:
 exc = self.get_exception()
 if exc is None:
 break
 yield exc
 
 def get_exception(self):
 """Get an exception from the pool.
 
 :rtype: :class:`~ThreadException`
 """
 try:
 (request, exc) = self._exc_queue.get_nowait()
 except queue.Empty:
 return None
 else:
 return ThreadException(request, exc)
 
 def get_response(self):
 """Get a response from the pool.
 
 :rtype: :class:`~ThreadResponse`
 """
 try:
 (request, response) = self._response_queue.get_nowait()
 except queue.Empty:
 return None
 else:
 return ThreadResponse(request, response)
 
 def responses(self):
 """Iterate over all the responses in the pool.
 
 :returns: Generator of :class:`~ThreadResponse`
 """
 while True:
 resp = self.get_response()
 if resp is None:
 break
 yield resp
 
 def join_all(self):
 """Join all the threads to the master thread."""
 for session_thread in self._pool:
 session_thread.join()
 
 
 class ThreadProxy(object):
 proxied_attr = None
 
 def __getattr__(self, attr):
 """Proxy attribute accesses to the proxied object."""
 get = object.__getattribute__
 if attr not in self.attrs:
 response = get(self, self.proxied_attr)
 return getattr(response, attr)
 else:
 return get(self, attr)
 
 
 class ThreadResponse(ThreadProxy):
 """A wrapper around a requests Response object.
 
 This will proxy most attribute access actions to the Response object. For
 example, if you wanted the parsed JSON from the response, you might do:
 
 .. code-block:: python
 
 thread_response = pool.get_response()
 json = thread_response.json()
 
 """
 proxied_attr = 'response'
 attrs = frozenset(['request_kwargs', 'response'])
 
 def __init__(self, request_kwargs, response):
 #: The original keyword arguments provided to the queue
 self.request_kwargs = request_kwargs
 #: The wrapped response
 self.response = response
 
 
 class ThreadException(ThreadProxy):
 """A wrapper around an exception raised during a request.
 
 This will proxy most attribute access actions to the exception object. For
 example, if you wanted the message from the exception, you might do:
 
 .. code-block:: python
 
 thread_exc = pool.get_exception()
 msg = thread_exc.message
 
 """
 proxied_attr = 'exception'
 attrs = frozenset(['request_kwargs', 'exception'])
 
 def __init__(self, request_kwargs, exception):
 #: The original keyword arguments provided to the queue
 self.request_kwargs = request_kwargs
 #: The captured and wrapped exception
 self.exception = exception
 
 
 def _identity(session_obj):
 return session_obj
 
 
 __all__ = ['ThreadException', 'ThreadResponse', 'Pool']
 
 |