Viewing file: client_test.py (56.84 KB) -rw-r--r-- Select action/file-type: (+) | (+) | (+) | Code (+) | Session (+) | (+) | SDB (+) | (+) | (+) | (+) | (+) | (+) |
"""Tests for acme.client.""" # pylint: disable=too-many-lines import copy import datetime import json import unittest
from six.moves import http_client # pylint: disable=import-error
import josepy as jose import mock import OpenSSL import requests
from acme import challenges from acme import errors from acme import jws as acme_jws from acme import messages from acme import messages_test from acme import test_util from acme.magic_typing import Dict # pylint: disable=unused-import, no-name-in-module
CERT_DER = test_util.load_vector('cert.der') CERT_SAN_PEM = test_util.load_vector('cert-san.pem') CSR_SAN_PEM = test_util.load_vector('csr-san.pem') KEY = jose.JWKRSA.load(test_util.load_vector('rsa512_key.pem')) KEY2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
DIRECTORY_V1 = messages.Directory({ messages.NewRegistration: 'https://www.letsencrypt-demo.org/acme/new-reg', messages.Revocation: 'https://www.letsencrypt-demo.org/acme/revoke-cert', messages.NewAuthorization: 'https://www.letsencrypt-demo.org/acme/new-authz', messages.CertificateRequest: 'https://www.letsencrypt-demo.org/acme/new-cert', })
DIRECTORY_V2 = messages.Directory({ 'newAccount': 'https://www.letsencrypt-demo.org/acme/new-account', 'newNonce': 'https://www.letsencrypt-demo.org/acme/new-nonce', 'newOrder': 'https://www.letsencrypt-demo.org/acme/new-order', 'revokeCert': 'https://www.letsencrypt-demo.org/acme/revoke-cert', })
class ClientTestBase(unittest.TestCase): """Base for tests in acme.client."""
def setUp(self): self.response = mock.MagicMock( ok=True, status_code=http_client.OK, headers={}, links={}) self.net = mock.MagicMock() self.net.post.return_value = self.response self.net.get.return_value = self.response
self.identifier = messages.Identifier( typ=messages.IDENTIFIER_FQDN, value='example.com')
# Registration self.contact = ('mailto:cert-admin@example.com', 'tel:+12025551212') reg = messages.Registration( contact=self.contact, key=KEY.public_key()) the_arg = dict(reg) # type: Dict self.new_reg = messages.NewRegistration(**the_arg) # pylint: disable=star-args self.regr = messages.RegistrationResource( body=reg, uri='https://www.letsencrypt-demo.org/acme/reg/1')
# Authorization authzr_uri = 'https://www.letsencrypt-demo.org/acme/authz/1' challb = messages.ChallengeBody( uri=(authzr_uri + '/1'), status=messages.STATUS_VALID, chall=challenges.DNS(token=jose.b64decode( 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'))) self.challr = messages.ChallengeResource( body=challb, authzr_uri=authzr_uri) self.authz = messages.Authorization( identifier=messages.Identifier( typ=messages.IDENTIFIER_FQDN, value='example.com'), challenges=(challb,), combinations=None) self.authzr = messages.AuthorizationResource( body=self.authz, uri=authzr_uri)
# Reason code for revocation self.rsn = 1
class BackwardsCompatibleClientV2Test(ClientTestBase): """Tests for acme.client.BackwardsCompatibleClientV2."""
def setUp(self): super(BackwardsCompatibleClientV2Test, self).setUp() # contains a loaded cert self.certr = messages.CertificateResource( body=messages_test.CERT)
loaded = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, CERT_SAN_PEM) wrapped = jose.ComparableX509(loaded) self.chain = [wrapped, wrapped]
self.cert_pem = OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, messages_test.CERT.wrapped).decode()
single_chain = OpenSSL.crypto.dump_certificate( OpenSSL.crypto.FILETYPE_PEM, loaded).decode() self.chain_pem = single_chain + single_chain
self.fullchain_pem = self.cert_pem + self.chain_pem
self.orderr = messages.OrderResource( csr_pem=CSR_SAN_PEM)
def _init(self): uri = 'http://www.letsencrypt-demo.org/directory' from acme.client import BackwardsCompatibleClientV2 return BackwardsCompatibleClientV2(net=self.net, key=KEY, server=uri)
def test_init_downloads_directory(self): uri = 'http://www.letsencrypt-demo.org/directory' from acme.client import BackwardsCompatibleClientV2 BackwardsCompatibleClientV2(net=self.net, key=KEY, server=uri) self.net.get.assert_called_once_with(uri)
def test_init_acme_version(self): self.response.json.return_value = DIRECTORY_V1.to_json() client = self._init() self.assertEqual(client.acme_version, 1)
self.response.json.return_value = DIRECTORY_V2.to_json() client = self._init() self.assertEqual(client.acme_version, 2)
def test_query_registration_client_v2(self): self.response.json.return_value = DIRECTORY_V2.to_json() client = self._init() self.response.json.return_value = self.regr.body.to_json() self.assertEqual(self.regr, client.query_registration(self.regr))
def test_forwarding(self): self.response.json.return_value = DIRECTORY_V1.to_json() client = self._init() self.assertEqual(client.directory, client.client.directory) self.assertEqual(client.key, KEY) self.assertEqual(client.deactivate_registration, client.client.deactivate_registration) self.assertRaises(AttributeError, client.__getattr__, 'nonexistent') self.assertRaises(AttributeError, client.__getattr__, 'new_account_and_tos') self.assertRaises(AttributeError, client.__getattr__, 'new_account')
def test_new_account_and_tos(self): # v2 no tos self.response.json.return_value = DIRECTORY_V2.to_json() with mock.patch('acme.client.ClientV2') as mock_client: client = self._init() client.new_account_and_tos(self.new_reg) mock_client().new_account.assert_called_with(self.new_reg)
# v2 tos good with mock.patch('acme.client.ClientV2') as mock_client: mock_client().directory.meta.__contains__.return_value = True client = self._init() client.new_account_and_tos(self.new_reg, lambda x: True) mock_client().new_account.assert_called_with( self.new_reg.update(terms_of_service_agreed=True))
# v2 tos bad with mock.patch('acme.client.ClientV2') as mock_client: mock_client().directory.meta.__contains__.return_value = True client = self._init() def _tos_cb(tos): raise errors.Error self.assertRaises(errors.Error, client.new_account_and_tos, self.new_reg, _tos_cb) mock_client().new_account.assert_not_called()
# v1 yes tos self.response.json.return_value = DIRECTORY_V1.to_json() with mock.patch('acme.client.Client') as mock_client: regr = mock.MagicMock(terms_of_service="TOS") mock_client().register.return_value = regr client = self._init() client.new_account_and_tos(self.new_reg) mock_client().register.assert_called_once_with(self.new_reg) mock_client().agree_to_tos.assert_called_once_with(regr)
# v1 no tos with mock.patch('acme.client.Client') as mock_client: regr = mock.MagicMock(terms_of_service=None) mock_client().register.return_value = regr client = self._init() client.new_account_and_tos(self.new_reg) mock_client().register.assert_called_once_with(self.new_reg) mock_client().agree_to_tos.assert_not_called()
@mock.patch('OpenSSL.crypto.load_certificate_request') @mock.patch('acme.crypto_util._pyopenssl_cert_or_req_all_names') def test_new_order_v1(self, mock__pyopenssl_cert_or_req_all_names, unused_mock_load_certificate_request): self.response.json.return_value = DIRECTORY_V1.to_json() mock__pyopenssl_cert_or_req_all_names.return_value = ['example.com', 'www.example.com'] mock_csr_pem = mock.MagicMock() with mock.patch('acme.client.Client') as mock_client: mock_client().request_domain_challenges.return_value = mock.sentinel.auth client = self._init() orderr = client.new_order(mock_csr_pem) self.assertEqual(orderr.authorizations, [mock.sentinel.auth, mock.sentinel.auth])
def test_new_order_v2(self): self.response.json.return_value = DIRECTORY_V2.to_json() mock_csr_pem = mock.MagicMock() with mock.patch('acme.client.ClientV2') as mock_client: client = self._init() client.new_order(mock_csr_pem) mock_client().new_order.assert_called_once_with(mock_csr_pem)
@mock.patch('acme.client.Client') def test_finalize_order_v1_success(self, mock_client): self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr mock_client().fetch_chain.return_value = self.chain
deadline = datetime.datetime(9999, 9, 9) client = self._init() result = client.finalize_order(self.orderr, deadline) self.assertEqual(result.fullchain_pem, self.fullchain_pem) mock_client().fetch_chain.assert_called_once_with(self.certr)
@mock.patch('acme.client.Client') def test_finalize_order_v1_fetch_chain_error(self, mock_client): self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr mock_client().fetch_chain.return_value = self.chain mock_client().fetch_chain.side_effect = [errors.Error, self.chain]
deadline = datetime.datetime(9999, 9, 9) client = self._init() result = client.finalize_order(self.orderr, deadline) self.assertEqual(result.fullchain_pem, self.fullchain_pem) self.assertEqual(mock_client().fetch_chain.call_count, 2)
@mock.patch('acme.client.Client') def test_finalize_order_v1_timeout(self, mock_client): self.response.json.return_value = DIRECTORY_V1.to_json()
mock_client().request_issuance.return_value = self.certr
deadline = deadline = datetime.datetime.now() - datetime.timedelta(seconds=60) client = self._init() self.assertRaises(errors.TimeoutError, client.finalize_order, self.orderr, deadline)
def test_finalize_order_v2(self): self.response.json.return_value = DIRECTORY_V2.to_json() mock_orderr = mock.MagicMock() mock_deadline = mock.MagicMock() with mock.patch('acme.client.ClientV2') as mock_client: client = self._init() client.finalize_order(mock_orderr, mock_deadline) mock_client().finalize_order.assert_called_once_with(mock_orderr, mock_deadline)
def test_revoke(self): self.response.json.return_value = DIRECTORY_V1.to_json() with mock.patch('acme.client.Client') as mock_client: client = self._init() client.revoke(messages_test.CERT, self.rsn) mock_client().revoke.assert_called_once_with(messages_test.CERT, self.rsn)
self.response.json.return_value = DIRECTORY_V2.to_json() with mock.patch('acme.client.ClientV2') as mock_client: client = self._init() client.revoke(messages_test.CERT, self.rsn) mock_client().revoke.assert_called_once_with(messages_test.CERT, self.rsn)
def test_update_registration(self): self.response.json.return_value = DIRECTORY_V1.to_json() with mock.patch('acme.client.Client') as mock_client: client = self._init() client.update_registration(mock.sentinel.regr, None) mock_client().update_registration.assert_called_once_with(mock.sentinel.regr, None)
# newNonce present means it will pick acme_version 2 def test_external_account_required_true(self): self.response.json.return_value = messages.Directory({ 'newNonce': 'http://letsencrypt-test.com/acme/new-nonce', 'meta': messages.Directory.Meta(external_account_required=True), }).to_json()
client = self._init()
self.assertTrue(client.external_account_required())
# newNonce present means it will pick acme_version 2 def test_external_account_required_false(self): self.response.json.return_value = messages.Directory({ 'newNonce': 'http://letsencrypt-test.com/acme/new-nonce', 'meta': messages.Directory.Meta(external_account_required=False), }).to_json()
client = self._init()
self.assertFalse(client.external_account_required())
def test_external_account_required_false_v1(self): self.response.json.return_value = messages.Directory({ 'meta': messages.Directory.Meta(external_account_required=False), }).to_json()
client = self._init()
self.assertFalse(client.external_account_required())
class ClientTest(ClientTestBase): """Tests for acme.client.Client.""" # pylint: disable=too-many-instance-attributes,too-many-public-methods
def setUp(self): super(ClientTest, self).setUp()
self.directory = DIRECTORY_V1
# Registration self.regr = self.regr.update( terms_of_service='https://www.letsencrypt-demo.org/tos')
# Request issuance self.certr = messages.CertificateResource( body=messages_test.CERT, authzrs=(self.authzr,), uri='https://www.letsencrypt-demo.org/acme/cert/1', cert_chain_uri='https://www.letsencrypt-demo.org/ca')
from acme.client import Client self.client = Client( directory=self.directory, key=KEY, alg=jose.RS256, net=self.net)
def test_init_downloads_directory(self): uri = 'http://www.letsencrypt-demo.org/directory' from acme.client import Client self.client = Client( directory=uri, key=KEY, alg=jose.RS256, net=self.net) self.net.get.assert_called_once_with(uri)
@mock.patch('acme.client.ClientNetwork') def test_init_without_net(self, mock_net): mock_net.return_value = mock.sentinel.net alg = jose.RS256 from acme.client import Client self.client = Client( directory=self.directory, key=KEY, alg=alg) mock_net.called_once_with(KEY, alg=alg, verify_ssl=True) self.assertEqual(self.client.net, mock.sentinel.net)
def test_register(self): # "Instance of 'Field' has no to_json/update member" bug: # pylint: disable=no-member self.response.status_code = http_client.CREATED self.response.json.return_value = self.regr.body.to_json() self.response.headers['Location'] = self.regr.uri self.response.links.update({ 'terms-of-service': {'url': self.regr.terms_of_service}, })
self.assertEqual(self.regr, self.client.register(self.new_reg)) # TODO: test POST call arguments
def test_update_registration(self): # "Instance of 'Field' has no to_json/update member" bug: # pylint: disable=no-member self.response.headers['Location'] = self.regr.uri self.response.json.return_value = self.regr.body.to_json() self.assertEqual(self.regr, self.client.update_registration(self.regr)) # TODO: test POST call arguments
# TODO: split here and separate test self.response.json.return_value = self.regr.body.update( contact=()).to_json()
def test_deactivate_account(self): self.response.headers['Location'] = self.regr.uri self.response.json.return_value = self.regr.body.to_json() self.assertEqual(self.regr, self.client.deactivate_registration(self.regr))
def test_query_registration(self): self.response.json.return_value = self.regr.body.to_json() self.assertEqual(self.regr, self.client.query_registration(self.regr))
def test_agree_to_tos(self): self.client.update_registration = mock.Mock() self.client.agree_to_tos(self.regr) regr = self.client.update_registration.call_args[0][0] self.assertEqual(self.regr.terms_of_service, regr.body.agreement)
def _prepare_response_for_request_challenges(self): self.response.status_code = http_client.CREATED self.response.headers['Location'] = self.authzr.uri self.response.json.return_value = self.authz.to_json()
def test_request_challenges(self): self._prepare_response_for_request_challenges() self.client.request_challenges(self.identifier) self.net.post.assert_called_once_with( self.directory.new_authz, messages.NewAuthorization(identifier=self.identifier), acme_version=1)
def test_request_challenges_deprecated_arg(self): self._prepare_response_for_request_challenges() self.client.request_challenges(self.identifier, new_authzr_uri="hi") self.net.post.assert_called_once_with( self.directory.new_authz, messages.NewAuthorization(identifier=self.identifier), acme_version=1)
def test_request_challenges_custom_uri(self): self._prepare_response_for_request_challenges() self.client.request_challenges(self.identifier) self.net.post.assert_called_once_with( 'https://www.letsencrypt-demo.org/acme/new-authz', mock.ANY, acme_version=1)
def test_request_challenges_unexpected_update(self): self._prepare_response_for_request_challenges() self.response.json.return_value = self.authz.update( identifier=self.identifier.update(value='foo')).to_json() self.assertRaises( errors.UnexpectedUpdate, self.client.request_challenges, self.identifier)
def test_request_challenges_wildcard(self): wildcard_identifier = messages.Identifier( typ=messages.IDENTIFIER_FQDN, value='*.example.org') self.assertRaises( errors.WildcardUnsupportedError, self.client.request_challenges, wildcard_identifier)
def test_request_domain_challenges(self): self.client.request_challenges = mock.MagicMock() self.assertEqual( self.client.request_challenges(self.identifier), self.client.request_domain_challenges('example.com'))
def test_answer_challenge(self): self.response.links['up'] = {'url': self.challr.authzr_uri} self.response.json.return_value = self.challr.body.to_json()
chall_response = challenges.DNSResponse(validation=None)
self.client.answer_challenge(self.challr.body, chall_response)
# TODO: split here and separate test self.assertRaises(errors.UnexpectedUpdate, self.client.answer_challenge, self.challr.body.update(uri='foo'), chall_response)
def test_answer_challenge_missing_next(self): self.assertRaises( errors.ClientError, self.client.answer_challenge, self.challr.body, challenges.DNSResponse(validation=None))
def test_answer_challenge_key_authorization_fallback(self): self.response.links['up'] = {'url': self.challr.authzr_uri} self.response.json.return_value = self.challr.body.to_json()
def _wrapper_post(url, obj, *args, **kwargs): # pylint: disable=unused-argument """ Simulate an old ACME CA server, that would respond a 'malformed' error if keyAuthorization is missing. """ jobj = obj.to_partial_json() if 'keyAuthorization' not in jobj: raise messages.Error.with_code('malformed') return self.response self.net.post.side_effect = _wrapper_post
# This challenge response is of type KeyAuthorizationChallengeResponse, so the fallback # should be triggered, and avoid an exception. http_chall_response = challenges.HTTP01Response(key_authorization='test', resource=mock.MagicMock()) self.client.answer_challenge(self.challr.body, http_chall_response)
# This challenge response is not of type KeyAuthorizationChallengeResponse, so the fallback # should not be triggered, leading to an exception. dns_chall_response = challenges.DNSResponse(validation=None) self.assertRaises( errors.Error, self.client.answer_challenge, self.challr.body, dns_chall_response)
def test_retry_after_date(self): self.response.headers['Retry-After'] = 'Fri, 31 Dec 1999 23:59:59 GMT' self.assertEqual( datetime.datetime(1999, 12, 31, 23, 59, 59), self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime') def test_retry_after_invalid(self, dt_mock): dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = 'foooo' self.assertEqual( datetime.datetime(2015, 3, 27, 0, 0, 10), self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime') def test_retry_after_overflow(self, dt_mock): dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) dt_mock.timedelta = datetime.timedelta dt_mock.datetime.side_effect = datetime.datetime
self.response.headers['Retry-After'] = "Tue, 116 Feb 2016 11:50:00 MST" self.assertEqual( datetime.datetime(2015, 3, 27, 0, 0, 10), self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime') def test_retry_after_seconds(self, dt_mock): dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) dt_mock.timedelta = datetime.timedelta
self.response.headers['Retry-After'] = '50' self.assertEqual( datetime.datetime(2015, 3, 27, 0, 0, 50), self.client.retry_after(response=self.response, default=10))
@mock.patch('acme.client.datetime') def test_retry_after_missing(self, dt_mock): dt_mock.datetime.now.return_value = datetime.datetime(2015, 3, 27) dt_mock.timedelta = datetime.timedelta
self.assertEqual( datetime.datetime(2015, 3, 27, 0, 0, 10), self.client.retry_after(response=self.response, default=10))
def test_poll(self): self.response.json.return_value = self.authzr.body.to_json() self.assertEqual((self.authzr, self.response), self.client.poll(self.authzr))
# TODO: split here and separate test self.response.json.return_value = self.authz.update( identifier=self.identifier.update(value='foo')).to_json() self.assertRaises( errors.UnexpectedUpdate, self.client.poll, self.authzr)
def test_request_issuance(self): self.response.content = CERT_DER self.response.headers['Location'] = self.certr.uri self.response.links['up'] = {'url': self.certr.cert_chain_uri} self.assertEqual(self.certr, self.client.request_issuance( messages_test.CSR, (self.authzr,))) # TODO: check POST args
def test_request_issuance_missing_up(self): self.response.content = CERT_DER self.response.headers['Location'] = self.certr.uri self.assertEqual( self.certr.update(cert_chain_uri=None), self.client.request_issuance(messages_test.CSR, (self.authzr,)))
def test_request_issuance_missing_location(self): self.assertRaises( errors.ClientError, self.client.request_issuance, messages_test.CSR, (self.authzr,))
@mock.patch('acme.client.datetime') @mock.patch('acme.client.time') def test_poll_and_request_issuance(self, time_mock, dt_mock): # clock.dt | pylint: disable=no-member clock = mock.MagicMock(dt=datetime.datetime(2015, 3, 27))
def sleep(seconds): """increment clock""" clock.dt += datetime.timedelta(seconds=seconds) time_mock.sleep.side_effect = sleep
def now(): """return current clock value""" return clock.dt dt_mock.datetime.now.side_effect = now dt_mock.timedelta = datetime.timedelta
def poll(authzr): # pylint: disable=missing-docstring # record poll start time based on the current clock value authzr.times.append(clock.dt)
# suppose it takes 2 seconds for server to produce the # result, increment clock clock.dt += datetime.timedelta(seconds=2)
if len(authzr.retries) == 1: # no more retries done = mock.MagicMock(uri=authzr.uri, times=authzr.times) done.body.status = authzr.retries[0] return done, []
# response (2nd result tuple element) is reduced to only # Retry-After header contents represented as integer # seconds; authzr.retries is a list of Retry-After # headers, head(retries) is peeled of as a current # Retry-After header, and tail(retries) is persisted for # later poll() calls return (mock.MagicMock(retries=authzr.retries[1:], uri=authzr.uri + '.', times=authzr.times), authzr.retries[0]) self.client.poll = mock.MagicMock(side_effect=poll)
mintime = 7
def retry_after(response, default): # pylint: disable=missing-docstring # check that poll_and_request_issuance correctly passes mintime self.assertEqual(default, mintime) return clock.dt + datetime.timedelta(seconds=response) self.client.retry_after = mock.MagicMock(side_effect=retry_after)
def request_issuance(csr, authzrs): # pylint: disable=missing-docstring return csr, authzrs self.client.request_issuance = mock.MagicMock( side_effect=request_issuance)
csr = mock.MagicMock() authzrs = ( mock.MagicMock(uri='a', times=[], retries=( 8, 20, 30, messages.STATUS_VALID)), mock.MagicMock(uri='b', times=[], retries=( 5, messages.STATUS_VALID)), )
cert, updated_authzrs = self.client.poll_and_request_issuance( csr, authzrs, mintime=mintime, # make sure that max_attempts is per-authorization, rather # than global max_attempts=max(len(authzrs[0].retries), len(authzrs[1].retries))) self.assertTrue(cert[0] is csr) self.assertTrue(cert[1] is updated_authzrs) self.assertEqual(updated_authzrs[0].uri, 'a...') self.assertEqual(updated_authzrs[1].uri, 'b.') self.assertEqual(updated_authzrs[0].times, [ datetime.datetime(2015, 3, 27), # a is scheduled for 10, but b is polling [9..11), so it # will be picked up as soon as b is finished, without # additional sleeping datetime.datetime(2015, 3, 27, 0, 0, 11), datetime.datetime(2015, 3, 27, 0, 0, 33), datetime.datetime(2015, 3, 27, 0, 1, 5), ]) self.assertEqual(updated_authzrs[1].times, [ datetime.datetime(2015, 3, 27, 0, 0, 2), datetime.datetime(2015, 3, 27, 0, 0, 9), ]) self.assertEqual(clock.dt, datetime.datetime(2015, 3, 27, 0, 1, 7))
# CA sets invalid | TODO: move to a separate test invalid_authzr = mock.MagicMock( times=[], retries=[messages.STATUS_INVALID]) self.assertRaises( errors.PollError, self.client.poll_and_request_issuance, csr, authzrs=(invalid_authzr,), mintime=mintime)
# exceeded max_attempts | TODO: move to a separate test self.assertRaises( errors.PollError, self.client.poll_and_request_issuance, csr, authzrs, mintime=mintime, max_attempts=2)
def test_check_cert(self): self.response.headers['Location'] = self.certr.uri self.response.content = CERT_DER self.assertEqual(self.certr.update(body=messages_test.CERT), self.client.check_cert(self.certr))
# TODO: split here and separate test self.response.headers['Location'] = 'foo' self.assertRaises( errors.UnexpectedUpdate, self.client.check_cert, self.certr)
def test_check_cert_missing_location(self): self.response.content = CERT_DER self.assertRaises( errors.ClientError, self.client.check_cert, self.certr)
def test_refresh(self): self.client.check_cert = mock.MagicMock() self.assertEqual( self.client.check_cert(self.certr), self.client.refresh(self.certr))
def test_fetch_chain_no_up_link(self): self.assertEqual([], self.client.fetch_chain(self.certr.update( cert_chain_uri=None)))
def test_fetch_chain_single(self): # pylint: disable=protected-access self.client._get_cert = mock.MagicMock() self.client._get_cert.return_value = ( mock.MagicMock(links={}), "certificate") self.assertEqual([self.client._get_cert(self.certr.cert_chain_uri)[1]], self.client.fetch_chain(self.certr))
def test_fetch_chain_max(self): # pylint: disable=protected-access up_response = mock.MagicMock(links={'up': {'url': 'http://cert'}}) noup_response = mock.MagicMock(links={}) self.client._get_cert = mock.MagicMock() self.client._get_cert.side_effect = [ (up_response, "cert")] * 9 + [(noup_response, "last_cert")] chain = self.client.fetch_chain(self.certr, max_length=10) self.assertEqual(chain, ["cert"] * 9 + ["last_cert"])
def test_fetch_chain_too_many(self): # recursive # pylint: disable=protected-access response = mock.MagicMock(links={'up': {'url': 'http://cert'}}) self.client._get_cert = mock.MagicMock() self.client._get_cert.return_value = (response, "certificate") self.assertRaises(errors.Error, self.client.fetch_chain, self.certr)
def test_revoke(self): self.client.revoke(self.certr.body, self.rsn) self.net.post.assert_called_once_with( self.directory[messages.Revocation], mock.ANY, acme_version=1)
def test_revocation_payload(self): obj = messages.Revocation(certificate=self.certr.body, reason=self.rsn) self.assertTrue('reason' in obj.to_partial_json().keys()) self.assertEqual(self.rsn, obj.to_partial_json()['reason'])
def test_revoke_bad_status_raises_error(self): self.response.status_code = http_client.METHOD_NOT_ALLOWED self.assertRaises( errors.ClientError, self.client.revoke, self.certr, self.rsn)
class ClientV2Test(ClientTestBase): """Tests for acme.client.ClientV2."""
def setUp(self): super(ClientV2Test, self).setUp()
self.directory = DIRECTORY_V2
from acme.client import ClientV2 self.client = ClientV2(self.directory, self.net)
self.new_reg = self.new_reg.update(terms_of_service_agreed=True)
self.authzr_uri2 = 'https://www.letsencrypt-demo.org/acme/authz/2' self.authz2 = self.authz.update(identifier=messages.Identifier( typ=messages.IDENTIFIER_FQDN, value='www.example.com'), status=messages.STATUS_PENDING) self.authzr2 = messages.AuthorizationResource( body=self.authz2, uri=self.authzr_uri2)
self.order = messages.Order( identifiers=(self.authz.identifier, self.authz2.identifier), status=messages.STATUS_PENDING, authorizations=(self.authzr.uri, self.authzr_uri2), finalize='https://www.letsencrypt-demo.org/acme/acct/1/order/1/finalize') self.orderr = messages.OrderResource( body=self.order, uri='https://www.letsencrypt-demo.org/acme/acct/1/order/1', authorizations=[self.authzr, self.authzr2], csr_pem=CSR_SAN_PEM)
def test_new_account(self): self.response.status_code = http_client.CREATED self.response.json.return_value = self.regr.body.to_json() self.response.headers['Location'] = self.regr.uri
self.assertEqual(self.regr, self.client.new_account(self.new_reg))
def test_new_account_conflict(self): self.response.status_code = http_client.OK self.response.headers['Location'] = self.regr.uri self.assertRaises(errors.ConflictError, self.client.new_account, self.new_reg)
def test_new_order(self): order_response = copy.deepcopy(self.response) order_response.status_code = http_client.CREATED order_response.json.return_value = self.order.to_json() order_response.headers['Location'] = self.orderr.uri self.net.post.return_value = order_response
authz_response = copy.deepcopy(self.response) authz_response.json.return_value = self.authz.to_json() authz_response.headers['Location'] = self.authzr.uri authz_response2 = self.response authz_response2.json.return_value = self.authz2.to_json() authz_response2.headers['Location'] = self.authzr2.uri
with mock.patch('acme.client.ClientV2._post_as_get') as mock_post_as_get: mock_post_as_get.side_effect = (authz_response, authz_response2) self.assertEqual(self.client.new_order(CSR_SAN_PEM), self.orderr)
@mock.patch('acme.client.datetime') def test_poll_and_finalize(self, mock_datetime): mock_datetime.datetime.now.return_value = datetime.datetime(2018, 2, 15) mock_datetime.timedelta = datetime.timedelta expected_deadline = mock_datetime.datetime.now() + datetime.timedelta(seconds=90)
self.client.poll_authorizations = mock.Mock(return_value=self.orderr) self.client.finalize_order = mock.Mock(return_value=self.orderr)
self.assertEqual(self.client.poll_and_finalize(self.orderr), self.orderr) self.client.poll_authorizations.assert_called_once_with(self.orderr, expected_deadline) self.client.finalize_order.assert_called_once_with(self.orderr, expected_deadline)
@mock.patch('acme.client.datetime') def test_poll_authorizations_timeout(self, mock_datetime): now_side_effect = [datetime.datetime(2018, 2, 15), datetime.datetime(2018, 2, 16), datetime.datetime(2018, 2, 17)] mock_datetime.datetime.now.side_effect = now_side_effect self.response.json.side_effect = [ self.authz.to_json(), self.authz2.to_json(), self.authz2.to_json()]
self.assertRaises( errors.TimeoutError, self.client.poll_authorizations, self.orderr, now_side_effect[1])
def test_poll_authorizations_failure(self): deadline = datetime.datetime(9999, 9, 9) challb = self.challr.body.update(status=messages.STATUS_INVALID, error=messages.Error.with_code('unauthorized')) authz = self.authz.update(status=messages.STATUS_INVALID, challenges=(challb,)) self.response.json.return_value = authz.to_json()
self.assertRaises( errors.ValidationError, self.client.poll_authorizations, self.orderr, deadline)
def test_poll_authorizations_success(self): deadline = datetime.datetime(9999, 9, 9) updated_authz2 = self.authz2.update(status=messages.STATUS_VALID) updated_authzr2 = messages.AuthorizationResource( body=updated_authz2, uri=self.authzr_uri2) updated_orderr = self.orderr.update(authorizations=[self.authzr, updated_authzr2])
self.response.json.side_effect = ( self.authz.to_json(), self.authz2.to_json(), updated_authz2.to_json()) self.assertEqual(self.client.poll_authorizations(self.orderr, deadline), updated_orderr)
def test_finalize_order_success(self): updated_order = self.order.update( certificate='https://www.letsencrypt-demo.org/acme/cert/') updated_orderr = self.orderr.update(body=updated_order, fullchain_pem=CERT_SAN_PEM)
self.response.json.return_value = updated_order.to_json() self.response.text = CERT_SAN_PEM
deadline = datetime.datetime(9999, 9, 9) self.assertEqual(self.client.finalize_order(self.orderr, deadline), updated_orderr)
def test_finalize_order_error(self): updated_order = self.order.update(error=messages.Error.with_code('unauthorized')) self.response.json.return_value = updated_order.to_json()
deadline = datetime.datetime(9999, 9, 9) self.assertRaises(errors.IssuanceError, self.client.finalize_order, self.orderr, deadline)
def test_finalize_order_timeout(self): deadline = datetime.datetime.now() - datetime.timedelta(seconds=60) self.assertRaises(errors.TimeoutError, self.client.finalize_order, self.orderr, deadline)
def test_revoke(self): self.client.revoke(messages_test.CERT, self.rsn) self.net.post.assert_called_once_with( self.directory["revokeCert"], mock.ANY, acme_version=2, new_nonce_url=DIRECTORY_V2['newNonce'])
def test_update_registration(self): # "Instance of 'Field' has no to_json/update member" bug: # pylint: disable=no-member self.response.headers['Location'] = self.regr.uri self.response.json.return_value = self.regr.body.to_json() self.assertEqual(self.regr, self.client.update_registration(self.regr)) self.assertNotEqual(self.client.net.account, None) self.assertEqual(self.client.net.post.call_count, 2) self.assertTrue(DIRECTORY_V2.newAccount in self.net.post.call_args_list[0][0])
self.response.json.return_value = self.regr.body.update( contact=()).to_json()
def test_external_account_required_true(self): self.client.directory = messages.Directory({ 'meta': messages.Directory.Meta(external_account_required=True) })
self.assertTrue(self.client.external_account_required())
def test_external_account_required_false(self): self.client.directory = messages.Directory({ 'meta': messages.Directory.Meta(external_account_required=False) })
self.assertFalse(self.client.external_account_required())
def test_external_account_required_default(self): self.assertFalse(self.client.external_account_required())
def test_post_as_get(self): with mock.patch('acme.client.ClientV2._authzr_from_response') as mock_client: mock_client.return_value = self.authzr2
self.client.poll(self.authzr2) # pylint: disable=protected-access
self.client.net.post.assert_called_once_with( self.authzr2.uri, None, acme_version=2, new_nonce_url='https://www.letsencrypt-demo.org/acme/new-nonce') self.client.net.get.assert_not_called()
class FakeError(messages.Error): # pylint: disable=too-many-ancestors """Fake error to reproduce a malformed request ACME error""" def __init__(self): # pylint: disable=super-init-not-called pass @property def code(self): return 'malformed' self.client.net.post.side_effect = FakeError()
self.client.poll(self.authzr2) # pylint: disable=protected-access
self.client.net.get.assert_called_once_with(self.authzr2.uri)
class MockJSONDeSerializable(jose.JSONDeSerializable): # pylint: disable=missing-docstring def __init__(self, value): self.value = value
def to_partial_json(self): return {'foo': self.value}
@classmethod def from_json(cls, value): pass # pragma: no cover
class ClientNetworkTest(unittest.TestCase): """Tests for acme.client.ClientNetwork.""" # pylint: disable=too-many-public-methods
def setUp(self): self.verify_ssl = mock.MagicMock() self.wrap_in_jws = mock.MagicMock(return_value=mock.sentinel.wrapped)
from acme.client import ClientNetwork self.net = ClientNetwork( key=KEY, alg=jose.RS256, verify_ssl=self.verify_ssl, user_agent='acme-python-test')
self.response = mock.MagicMock(ok=True, status_code=http_client.OK) self.response.headers = {} self.response.links = {}
def test_init(self): self.assertTrue(self.net.verify_ssl is self.verify_ssl)
def test_wrap_in_jws(self): # pylint: disable=protected-access jws_dump = self.net._wrap_in_jws( MockJSONDeSerializable('foo'), nonce=b'Tg', url="url", acme_version=1) jws = acme_jws.JWS.json_loads(jws_dump) self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'}) self.assertEqual(jws.signature.combined.nonce, b'Tg')
def test_wrap_in_jws_v2(self): self.net.account = {'uri': 'acct-uri'} # pylint: disable=protected-access jws_dump = self.net._wrap_in_jws( MockJSONDeSerializable('foo'), nonce=b'Tg', url="url", acme_version=2) jws = acme_jws.JWS.json_loads(jws_dump) self.assertEqual(json.loads(jws.payload.decode()), {'foo': 'foo'}) self.assertEqual(jws.signature.combined.nonce, b'Tg') self.assertEqual(jws.signature.combined.kid, u'acct-uri') self.assertEqual(jws.signature.combined.url, u'url')
def test_check_response_not_ok_jobj_no_error(self): self.response.ok = False self.response.json.return_value = {} with mock.patch('acme.client.messages.Error.from_json') as from_json: from_json.side_effect = jose.DeserializationError # pylint: disable=protected-access self.assertRaises( errors.ClientError, self.net._check_response, self.response)
def test_check_response_not_ok_jobj_error(self): self.response.ok = False self.response.json.return_value = messages.Error( detail='foo', typ='serverInternal', title='some title').to_json() # pylint: disable=protected-access self.assertRaises( messages.Error, self.net._check_response, self.response)
def test_check_response_not_ok_no_jobj(self): self.response.ok = False self.response.json.side_effect = ValueError # pylint: disable=protected-access self.assertRaises( errors.ClientError, self.net._check_response, self.response)
def test_check_response_ok_no_jobj_ct_required(self): self.response.json.side_effect = ValueError for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']: self.response.headers['Content-Type'] = response_ct # pylint: disable=protected-access self.assertRaises( errors.ClientError, self.net._check_response, self.response, content_type=self.net.JSON_CONTENT_TYPE)
def test_check_response_ok_no_jobj_no_ct(self): self.response.json.side_effect = ValueError for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']: self.response.headers['Content-Type'] = response_ct # pylint: disable=protected-access,no-value-for-parameter self.assertEqual( self.response, self.net._check_response(self.response))
def test_check_response_conflict(self): self.response.ok = False self.response.status_code = 409 # pylint: disable=protected-access self.assertRaises(errors.ConflictError, self.net._check_response, self.response)
def test_check_response_jobj(self): self.response.json.return_value = {} for response_ct in [self.net.JSON_CONTENT_TYPE, 'foo']: self.response.headers['Content-Type'] = response_ct # pylint: disable=protected-access,no-value-for-parameter self.assertEqual( self.response, self.net._check_response(self.response))
def test_send_request(self): self.net.session = mock.MagicMock() self.net.session.request.return_value = self.response # pylint: disable=protected-access self.assertEqual(self.response, self.net._send_request( 'HEAD', 'http://example.com/', 'foo', bar='baz')) self.net.session.request.assert_called_once_with( 'HEAD', 'http://example.com/', 'foo', headers=mock.ANY, verify=mock.ANY, timeout=mock.ANY, bar='baz')
@mock.patch('acme.client.logger') def test_send_request_get_der(self, mock_logger): self.net.session = mock.MagicMock() self.net.session.request.return_value = mock.MagicMock( ok=True, status_code=http_client.OK, headers={"Content-Type": "application/pkix-cert"}, content=b"hi") # pylint: disable=protected-access self.net._send_request('HEAD', 'http://example.com/', 'foo', timeout=mock.ANY, bar='baz') mock_logger.debug.assert_called_with( 'Received response:\nHTTP %d\n%s\n\n%s', 200, 'Content-Type: application/pkix-cert', b'aGk=')
def test_send_request_post(self): self.net.session = mock.MagicMock() self.net.session.request.return_value = self.response # pylint: disable=protected-access self.assertEqual(self.response, self.net._send_request( 'POST', 'http://example.com/', 'foo', data='qux', bar='baz')) self.net.session.request.assert_called_once_with( 'POST', 'http://example.com/', 'foo', headers=mock.ANY, verify=mock.ANY, timeout=mock.ANY, data='qux', bar='baz')
def test_send_request_verify_ssl(self): # pylint: disable=protected-access for verify in True, False: self.net.session = mock.MagicMock() self.net.session.request.return_value = self.response self.net.verify_ssl = verify # pylint: disable=protected-access self.assertEqual( self.response, self.net._send_request('GET', 'http://example.com/')) self.net.session.request.assert_called_once_with( 'GET', 'http://example.com/', verify=verify, timeout=mock.ANY, headers=mock.ANY)
def test_send_request_user_agent(self): self.net.session = mock.MagicMock() # pylint: disable=protected-access self.net._send_request('GET', 'http://example.com/', headers={'bar': 'baz'}) self.net.session.request.assert_called_once_with( 'GET', 'http://example.com/', verify=mock.ANY, timeout=mock.ANY, headers={'User-Agent': 'acme-python-test', 'bar': 'baz'})
self.net._send_request('GET', 'http://example.com/', headers={'User-Agent': 'foo2'}) self.net.session.request.assert_called_with( 'GET', 'http://example.com/', verify=mock.ANY, timeout=mock.ANY, headers={'User-Agent': 'foo2'})
def test_send_request_timeout(self): self.net.session = mock.MagicMock() # pylint: disable=protected-access self.net._send_request('GET', 'http://example.com/', headers={'bar': 'baz'}) self.net.session.request.assert_called_once_with( mock.ANY, mock.ANY, verify=mock.ANY, headers=mock.ANY, timeout=45)
def test_del(self, close_exception=None): sess = mock.MagicMock()
if close_exception is not None: sess.close.side_effect = close_exception
self.net.session = sess del self.net sess.close.assert_called_once_with()
def test_del_error(self): self.test_del(ReferenceError)
@mock.patch('acme.client.requests') def test_requests_error_passthrough(self, mock_requests): mock_requests.exceptions = requests.exceptions mock_requests.request.side_effect = requests.exceptions.RequestException # pylint: disable=protected-access self.assertRaises(requests.exceptions.RequestException, self.net._send_request, 'GET', 'uri')
def test_urllib_error(self): # Using a connection error to test a properly formatted error message try: # pylint: disable=protected-access self.net._send_request('GET', "http://localhost:19123/nonexistent.txt")
# Value Error Generated Exceptions except ValueError as y: self.assertEqual("Requesting localhost/nonexistent: " "Connection refused", str(y))
# Requests Library Exceptions except requests.exceptions.ConnectionError as z: #pragma: no cover self.assertTrue("'Connection aborted.'" in str(z) or "[WinError 10061]" in str(z))
class ClientNetworkWithMockedResponseTest(unittest.TestCase): """Tests for acme.client.ClientNetwork which mock out response.""" # pylint: disable=too-many-instance-attributes
def setUp(self): from acme.client import ClientNetwork self.net = ClientNetwork(key=None, alg=None)
self.response = mock.MagicMock(ok=True, status_code=http_client.OK) self.response.headers = {} self.response.links = {} self.response.checked = False self.acmev1_nonce_response = mock.MagicMock(ok=False, status_code=http_client.METHOD_NOT_ALLOWED) self.acmev1_nonce_response.headers = {} self.obj = mock.MagicMock() self.wrapped_obj = mock.MagicMock() self.content_type = mock.sentinel.content_type
self.all_nonces = [ jose.b64encode(b'Nonce'), jose.b64encode(b'Nonce2'), jose.b64encode(b'Nonce3')] self.available_nonces = self.all_nonces[:]
def send_request(*args, **kwargs): # pylint: disable=unused-argument,missing-docstring self.assertFalse("new_nonce_url" in kwargs) method = args[0] uri = args[1] if method == 'HEAD' and uri != "new_nonce_uri": response = self.acmev1_nonce_response else: response = self.response
if self.available_nonces: response.headers = { self.net.REPLAY_NONCE_HEADER: self.available_nonces.pop().decode()} else: response.headers = {} return response
# pylint: disable=protected-access self.net._send_request = self.send_request = mock.MagicMock( side_effect=send_request) self.net._check_response = self.check_response self.net._wrap_in_jws = mock.MagicMock(return_value=self.wrapped_obj)
def check_response(self, response, content_type): # pylint: disable=missing-docstring self.assertEqual(self.response, response) self.assertEqual(self.content_type, content_type) self.assertTrue(self.response.ok) self.response.checked = True return self.response
def test_head(self): self.assertEqual(self.acmev1_nonce_response, self.net.head( 'http://example.com/', 'foo', bar='baz')) self.send_request.assert_called_once_with( 'HEAD', 'http://example.com/', 'foo', bar='baz')
def test_head_v2(self): self.assertEqual(self.response, self.net.head( 'new_nonce_uri', 'foo', bar='baz')) self.send_request.assert_called_once_with( 'HEAD', 'new_nonce_uri', 'foo', bar='baz')
def test_get(self): self.assertEqual(self.response, self.net.get( 'http://example.com/', content_type=self.content_type, bar='baz')) self.assertTrue(self.response.checked) self.send_request.assert_called_once_with( 'GET', 'http://example.com/', bar='baz')
def test_post_no_content_type(self): self.content_type = self.net.JOSE_CONTENT_TYPE self.assertEqual(self.response, self.net.post('uri', self.obj)) self.assertTrue(self.response.checked)
def test_post(self): # pylint: disable=protected-access self.assertEqual(self.response, self.net.post( 'uri', self.obj, content_type=self.content_type)) self.assertTrue(self.response.checked) self.net._wrap_in_jws.assert_called_once_with( self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1)
self.available_nonces = [] self.assertRaises(errors.MissingNonce, self.net.post, 'uri', self.obj, content_type=self.content_type) self.net._wrap_in_jws.assert_called_with( self.obj, jose.b64decode(self.all_nonces.pop()), "uri", 1)
def test_post_wrong_initial_nonce(self): # HEAD self.available_nonces = [b'f', jose.b64encode(b'good')] self.assertRaises(errors.BadNonce, self.net.post, 'uri', self.obj, content_type=self.content_type)
def test_post_wrong_post_response_nonce(self): self.available_nonces = [jose.b64encode(b'good'), b'f'] self.assertRaises(errors.BadNonce, self.net.post, 'uri', self.obj, content_type=self.content_type)
def test_post_failed_retry(self): check_response = mock.MagicMock() check_response.side_effect = messages.Error.with_code('badNonce')
# pylint: disable=protected-access self.net._check_response = check_response self.assertRaises(messages.Error, self.net.post, 'uri', self.obj, content_type=self.content_type)
def test_post_not_retried(self): check_response = mock.MagicMock() check_response.side_effect = [messages.Error.with_code('malformed'), self.response]
# pylint: disable=protected-access self.net._check_response = check_response self.assertRaises(messages.Error, self.net.post, 'uri', self.obj, content_type=self.content_type)
def test_post_successful_retry(self): post_once = mock.MagicMock() post_once.side_effect = [messages.Error.with_code('badNonce'), self.response]
# pylint: disable=protected-access self.assertEqual(self.response, self.net.post( 'uri', self.obj, content_type=self.content_type))
def test_head_get_post_error_passthrough(self): self.send_request.side_effect = requests.exceptions.RequestException for method in self.net.head, self.net.get: self.assertRaises( requests.exceptions.RequestException, method, 'GET', 'uri') self.assertRaises(requests.exceptions.RequestException, self.net.post, 'uri', obj=self.obj)
def test_post_bad_nonce_head(self): # pylint: disable=protected-access # regression test for https://github.com/certbot/certbot/issues/6092 bad_response = mock.MagicMock(ok=False, status_code=http_client.SERVICE_UNAVAILABLE) self.net._send_request = mock.MagicMock() self.net._send_request.return_value = bad_response self.content_type = None check_response = mock.MagicMock() self.net._check_response = check_response self.assertRaises(errors.ClientError, self.net.post, 'uri', self.obj, content_type=self.content_type, acme_version=2, new_nonce_url='new_nonce_uri') self.assertEqual(check_response.call_count, 1)
def test_new_nonce_uri_removed(self): self.content_type = None self.net.post('uri', self.obj, content_type=None, acme_version=2, new_nonce_url='new_nonce_uri')
class ClientNetworkSourceAddressBindingTest(unittest.TestCase): """Tests that if ClientNetwork has a source IP set manually, the underlying library has used the provided source address."""
def setUp(self): self.source_address = "8.8.8.8"
def test_source_address_set(self): from acme.client import ClientNetwork net = ClientNetwork(key=None, alg=None, source_address=self.source_address) for adapter in net.session.adapters.values(): self.assertTrue(self.source_address in adapter.source_address)
def test_behavior_assumption(self): """This is a test that guardrails the HTTPAdapter behavior so that if the default for a Session() changes, the assumptions here aren't violated silently.""" from acme.client import ClientNetwork # Source address not specified, so the default adapter type should be bound -- this # test should fail if the default adapter type is changed by requests net = ClientNetwork(key=None, alg=None) session = requests.Session() for scheme in session.adapters.keys(): client_network_adapter = net.session.adapters.get(scheme) default_adapter = session.adapters.get(scheme) self.assertEqual(client_network_adapter.__class__, default_adapter.__class__)
if __name__ == '__main__': unittest.main() # pragma: no cover
|