| Viewing file:  util.py (4.78 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
"""Utility functions."""
 import sys
 import os
 import base64
 import json
 import hashlib
 try:
 from collections import OrderedDict
 except ImportError:
 OrderedDict = dict
 
 # Names exported when this module is star-imported (``from ... import *``).
 __all__ = ['urlsafe_b64encode', 'urlsafe_b64decode', 'utf8',
 'to_json', 'from_json', 'matches_requirement']
 
 def urlsafe_b64encode(data):
     """Return the URL-safe base64 encoding of ``data`` with trailing ``=`` removed."""
     encoded = base64.urlsafe_b64encode(data)
     # ``binary`` yields the pad character as a byte string so it can be
     # stripped from the encoder's byte-string output.
     pad_char = binary('=')
     return encoded.rstrip(pad_char)
 
 
 def urlsafe_b64decode(data):
     """Decode URL-safe base64 ``data`` whose trailing ``=`` padding was stripped."""
     # Between one and four pad bytes are appended; input whose length is
     # already a multiple of four therefore receives four of them.
     pad_count = 4 - len(data) % 4
     padded = data + b'=' * pad_count
     return base64.urlsafe_b64decode(padded)
 
 
 def to_json(o):
     """Serialize ``o`` to a JSON string, emitting object keys in sorted order."""
     serialized = json.dumps(o, sort_keys=True)
     return serialized
 
 
 def from_json(j):
     """Parse the JSON document ``j`` and return the resulting Python object."""
     decoded = json.loads(j)
     return decoded
 
 def open_for_csv(name, mode):
     """Open ``name`` with ``mode``, adjusted for CSV use on the running Python.

     On Python 2 a ``'b'`` is appended to ``mode``; on Python 3 the mode is
     left as given and the file is opened with ``newline=''``.
     """
     on_py3 = sys.version_info[0] >= 3
     mode_suffix = '' if on_py3 else 'b'
     open_kwargs = {'newline': ''} if on_py3 else {}
     return open(name, mode + mode_suffix, **open_kwargs)
 
 try:
     # Probe for the Python 2 ``unicode`` builtin.
     unicode
 except NameError:
     def utf8(data):
         """Return ``data`` UTF-8 encoded if it is a ``str``, otherwise unchanged."""
         return data.encode('utf-8') if isinstance(data, str) else data
 else:
     def utf8(data):
         """Return ``data`` UTF-8 encoded if it is ``unicode``, otherwise unchanged."""
         return data.encode('utf-8') if isinstance(data, unicode) else data
 
 
 try:
     # For encoding ascii back and forth between bytestrings, as is repeatedly
     # necessary in JSON-based crypto under Python 3
     unicode

     def native(s):
         """Return ``s`` unchanged (Python 2 variant)."""
         return s

     def binary(s):
         """Return ``s`` ASCII-encoded if it is ``unicode``, otherwise unchanged."""
         if isinstance(s, unicode):
             return s.encode('ascii')
         return s
 except NameError:
     def native(s):
         """Return ``s`` ASCII-decoded if it is ``bytes``, otherwise unchanged."""
         if isinstance(s, bytes):
             return s.decode('ascii')
         return s

     def binary(s):
         """Return ``s`` ASCII-encoded if it is ``str``, otherwise unchanged."""
         if isinstance(s, str):
             return s.encode('ascii')
         # Pass anything that is not text (e.g. bytes) through untouched, as
         # the Python 2 variant and ``native`` do, instead of returning None.
         return s
 
 class HashingFile(object):
     """Write-through wrapper that hashes and counts everything written to ``fd``."""

     def __init__(self, fd, hashtype='sha256'):
         """Wrap ``fd`` and start a fresh ``hashlib`` hash named ``hashtype``."""
         self.length = 0
         self.hashtype = hashtype
         self.hash = hashlib.new(hashtype)
         self.fd = fd

     def write(self, data):
         """Feed ``data`` to the hash, add its length to ``length``, then write it to ``fd``."""
         self.hash.update(data)
         self.length = self.length + len(data)
         self.fd.write(data)

     def close(self):
         """Close the wrapped file object."""
         self.fd.close()

     def digest(self):
         """Return the digest of everything written so far.

         For ``'md5'`` this is the bare hex digest; for any other hash type it
         is ``'<hashtype>='`` followed by the unpadded URL-safe base64 of the
         raw digest.
         """
         if self.hashtype != 'md5':
             raw = self.hash.digest()
             prefix = self.hashtype + '='
             return prefix + native(urlsafe_b64encode(raw))
         return self.hash.hexdigest()
 
 class OrderedDefaultDict(OrderedDict):
     """``OrderedDict`` subclass that fills in missing keys from ``default_factory``."""

     def __init__(self, *args, **kwargs):
         """Take an optional leading ``default_factory``; pass the rest to the base class.

         Raises ``TypeError`` if the first positional argument is neither
         ``None`` nor callable.
         """
         factory = None
         if args:
             factory, args = args[0], args[1:]
             if factory is not None and not callable(factory):
                 raise TypeError('first argument must be callable or None')
         self.default_factory = factory
         super(OrderedDefaultDict, self).__init__(*args, **kwargs)

     def __missing__(self, key):
         """Create, store and return a default for ``key``; ``KeyError`` if no factory."""
         if self.default_factory is None:
             raise KeyError(key)
         value = self.default_factory()
         self[key] = value
         return value
 
 if sys.platform == 'win32':
     import ctypes.wintypes
     # CSIDL_APPDATA for reference - not used here for compatibility with
     # dirspec, which uses LOCAL_APPDATA and COMMON_APPDATA in that order
     csidl = {
         'CSIDL_APPDATA': 26,
         'CSIDL_LOCAL_APPDATA': 28,
         'CSIDL_COMMON_APPDATA': 35,
     }

     def get_path(name):
         """Return the shell folder path for the ``csidl`` entry called ``name``."""
         shgfp_type_current = 0
         path_buffer = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
         ctypes.windll.shell32.SHGetFolderPathW(
             0, csidl[name], 0, shgfp_type_current, path_buffer)
         return path_buffer.value

     def save_config_path(*resource):
         """Return ``resource`` joined under local app data, creating the directory if absent."""
         target = os.path.join(get_path("CSIDL_LOCAL_APPDATA"), *resource)
         if os.path.isdir(target):
             return target
         os.makedirs(target)
         return target

     def load_config_paths(*resource):
         """Yield each existing ``resource`` path under local, then common, app data."""
         for folder_name in ("CSIDL_LOCAL_APPDATA", "CSIDL_COMMON_APPDATA"):
             candidate = os.path.join(get_path(folder_name), *resource)
             if os.path.exists(candidate):
                 yield candidate
 else:
     def save_config_path(*resource):
         """Delegate to ``xdg.BaseDirectory.save_config_path``."""
         from xdg import BaseDirectory
         return BaseDirectory.save_config_path(*resource)

     def load_config_paths(*resource):
         """Delegate to ``xdg.BaseDirectory.load_config_paths``."""
         from xdg import BaseDirectory
         return BaseDirectory.load_config_paths(*resource)
 
 def matches_requirement(req, wheels):
     """Return the wheels from ``wheels`` that satisfy the requirement ``req``.

     :param req: The requirement to satisfy
     :param wheels: List of wheels to search.
     :raises RuntimeError: if ``pkg_resources`` cannot be imported.
     """
     try:
         from pkg_resources import Distribution, Requirement
     except ImportError:
         raise RuntimeError("Cannot use requirements without pkg_resources")

     requirement = Requirement.parse(req)

     def _satisfies(wheel):
         # Build a Distribution from the "name" and "ver" groups of the
         # wheel's parsed filename and test it against the requirement.
         parsed = wheel.parsed_filename
         candidate = Distribution(project_name=parsed.group("name"),
                                  version=parsed.group("ver"))
         return candidate in requirement

     return [wheel for wheel in wheels if _satisfies(wheel)]
 
 |