From 60571e65dd0fa5f1065873fd0aeec8d956e296b5 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Fri, 23 Oct 2015 12:45:32 +0300 Subject: [PATCH] trezor: add support for Ed25519 SSH keys --- .pylintrc | 2 +- setup.py | 4 +- tox.ini | 6 +- trezor_agent/__main__.py | 36 ++++---- trezor_agent/formats.py | 138 +++++++++++++++++++--------- trezor_agent/protocol.py | 19 +--- trezor_agent/tests/test_formats.py | 40 +++++++- trezor_agent/tests/test_protocol.py | 65 ++++++++----- trezor_agent/tests/test_trezor.py | 17 ++-- trezor_agent/trezor/client.py | 36 ++++---- 10 files changed, 225 insertions(+), 138 deletions(-) diff --git a/.pylintrc b/.pylintrc index aad9f69..f213de1 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,2 +1,2 @@ [MESSAGES CONTROL] -disable=invalid-name, missing-docstring, locally-disabled +disable=invalid-name, missing-docstring, locally-disabled,no-member diff --git a/setup.py b/setup.py index 972c64f..5309717 100644 --- a/setup.py +++ b/setup.py @@ -3,14 +3,14 @@ from setuptools import setup setup( name='trezor_agent', - version='0.4.2', + version='0.5.0', description='Using Trezor as hardware SSH agent', author='Roman Zeyde', author_email='roman.zeyde@gmail.com', license='MIT', url='http://github.com/romanz/trezor-agent', packages=['trezor_agent', 'trezor_agent.trezor'], - install_requires=['ecdsa>=0.13', 'trezor>=0.6.6'], + install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'trezor>=0.6.6'], platforms=['POSIX'], classifiers=[ 'Development Status :: 3 - Alpha', diff --git a/tox.ini b/tox.ini index e33cdf5..dec83b4 100644 --- a/tox.ini +++ b/tox.ini @@ -1,6 +1,5 @@ [tox] -envlist = py27,py34 -skipsdist = True +envlist = py27 [testenv] deps= pytest @@ -9,9 +8,8 @@ deps= coverage pylint six - ecdsa commands= pep8 trezor_agent pylint --report=no --rcfile .pylintrc trezor_agent - coverage run --omit='trezor_agent/__main__.py,trezor_agent/trezor/_library.py' --source trezor_agent/ -m py.test -v + coverage run --omit='trezor_agent/__main__.py' --source trezor_agent -m py.test -v trezor_agent coverage report diff --git a/trezor_agent/__main__.py b/trezor_agent/__main__.py index 213956d..1b658bd 100644 --- a/trezor_agent/__main__.py +++ b/trezor_agent/__main__.py @@ -3,9 +3,11 @@ import re import sys import argparse import subprocess +import functools from . import trezor from . import server +from . import formats import logging log = logging.getLogger(__name__) @@ -48,9 +50,13 @@ def create_agent_parser(): g = p.add_mutually_exclusive_group() g.add_argument('-s', '--shell', default=False, action='store_true', - help='run $SHELL as subprocess under SSH agent') + help='run ${SHELL} as subprocess under SSH agent') g.add_argument('-c', '--connect', default=False, action='store_true', help='connect to specified host via SSH') + curves = ', '.join(sorted(formats.SUPPORTED_CURVES)) + p.add_argument('-e', '--ecdsa-curve-name', metavar='CURVE', + default=formats.CURVE_NIST256, + help='specify ECDSA curve name: ' + curves) p.add_argument('command', type=str, nargs='*', metavar='ARGUMENT', help='command to run under the SSH agent') return p @@ -64,21 +70,15 @@ def setup_logging(verbosity): logging.basicConfig(format=fmt, level=level) -def ssh_command(identity): - command = ['ssh', identity.host] - if identity.user: - command += ['-l', identity.user] - if identity.port: - command += ['-p', identity.port] - return command +def ssh_sign(client, label, blob): + return client.sign_ssh_challenge(label=label, blob=blob) -def trezor_agent(): +def run_agent(client_factory): args = create_agent_parser().parse_args() setup_logging(verbosity=args.verbose) - with trezor.Client() as client: - + with client_factory(curve=args.ecdsa_curve_name) as client: label = args.identity command = args.command @@ -88,12 +88,11 @@ def trezor_agent(): if command: command = ['git'] + command - identity = client.get_identity(label=label) - public_key = client.get_public_key(identity=identity) + public_key = client.get_public_key(label=label) use_shell = False if args.connect: - command = ssh_command(identity) + args.command + command = ['ssh', label] + args.command log.debug('SSH connect: %r', command) if args.shell: @@ -104,13 +103,14 @@ def trezor_agent(): sys.stdout.write(public_key) return - def signer(label, blob): - identity = client.get_identity(label=label) - return client.sign_ssh_challenge(identity=identity, blob=blob) - try: + signer = functools.partial(ssh_sign, client=client) with server.serve(public_keys=[public_key], signer=signer) as env: return server.run_process(command=command, environ=env, use_shell=use_shell) except KeyboardInterrupt: log.info('server stopped') + + +def trezor_agent(): + run_agent(trezor.Client) diff --git a/trezor_agent/formats.py b/trezor_agent/formats.py index f6bc62f..d3f3153 100644 --- a/trezor_agent/formats.py +++ b/trezor_agent/formats.py @@ -2,15 +2,25 @@ import io import hashlib import base64 import ecdsa +import ed25519 from . import util import logging log = logging.getLogger(__name__) -DER_OCTET_STRING = b'\x04' -ECDSA_KEY_PREFIX = b'ecdsa-sha2-' -ECDSA_CURVE_NAME = b'nistp256' +# Supported ECDSA curves +CURVE_NIST256 = b'nist256p1' +CURVE_ED25519 = b'ed25519' +SUPPORTED_CURVES = {CURVE_NIST256, CURVE_ED25519} + +# SSH key types +SSH_NIST256_DER_OCTET = b'\x04' +SSH_NIST256_KEY_PREFIX = b'ecdsa-sha2-' +SSH_NIST256_CURVE_NAME = b'nistp256' +SSH_NIST256_KEY_TYPE = SSH_NIST256_KEY_PREFIX + SSH_NIST256_CURVE_NAME +SSH_ED25519_KEY_TYPE = b'ssh-ed25519' +SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE, SSH_ED25519_KEY_TYPE} hashfunc = hashlib.sha256 @@ -20,73 +30,111 @@ def fingerprint(blob): return ':'.join('{:02x}'.format(c) for c in bytearray(digest)) -def parse_pubkey(blob, curve=ecdsa.NIST256p): +def parse_pubkey(blob): + fp = fingerprint(blob) s = io.BytesIO(blob) key_type = util.read_frame(s) log.debug('key type: %s', key_type) - curve_name = util.read_frame(s) - log.debug('curve name: %s', curve_name) - point = util.read_frame(s) - assert s.read() == b'' - _type, point = point[:1], point[1:] - assert _type == DER_OCTET_STRING - size = len(point) // 2 - assert len(point) == 2 * size - coords = (util.bytes2num(point[:size]), util.bytes2num(point[size:])) - log.debug('coordinates: %s', coords) - fp = fingerprint(blob) + assert key_type in SUPPORTED_KEY_TYPES, key_type + + result = {'blob': blob, 'type': key_type, 'fingerprint': fp} + + if key_type == SSH_NIST256_KEY_TYPE: + curve_name = util.read_frame(s) + log.debug('curve name: %s', curve_name) + point = util.read_frame(s) + assert s.read() == b'' + _type, point = point[:1], point[1:] + assert _type == SSH_NIST256_DER_OCTET + size = len(point) // 2 + assert len(point) == 2 * size + coords = (util.bytes2num(point[:size]), util.bytes2num(point[size:])) + + curve = ecdsa.NIST256p + point = ecdsa.ellipticcurve.Point(curve.curve, *coords) + vk = ecdsa.VerifyingKey.from_public_point(point, curve, hashfunc) + + def ecdsa_verifier(sig, msg): + assert len(sig) == 2 * size + sig_decode = ecdsa.util.sigdecode_string + vk.verify(signature=sig, data=msg, sigdecode=sig_decode) + parts = [sig[:size], sig[size:]] + return b''.join([util.frame(b'\x00' + p) for p in parts]) + + result.update(point=coords, curve=CURVE_NIST256, + verifier=ecdsa_verifier) + + if key_type == SSH_ED25519_KEY_TYPE: + pubkey = util.read_frame(s) + assert s.read() == b'' + vk = ed25519.VerifyingKey(pubkey) + + def ed25519_verify(sig, msg): + assert len(sig) == 64 + vk.verify(sig, msg) + return sig + + result.update(curve=CURVE_ED25519, verifier=ed25519_verify) - point = ecdsa.ellipticcurve.Point(curve.curve, *coords) - vk = ecdsa.VerifyingKey.from_public_point(point, curve, hashfunc) - result = { - 'point': coords, - 'curve': curve_name, - 'fingerprint': fp, - 'type': key_type, - 'blob': blob, - 'size': size, - 'verifying_key': vk - } return result -def decompress_pubkey(pub, curve=ecdsa.NIST256p): - P = curve.curve.p() - A = curve.curve.a() - B = curve.curve.b() - x = util.bytes2num(pub[1:33]) - beta = pow(int(x*x*x+A*x+B), int((P+1)//4), int(P)) +def decompress_pubkey(pub): + if pub[:1] == b'\x00': + # set by Trezor fsm_msgSignIdentity() and fsm_msgGetPublicKey() + return ed25519.VerifyingKey(pub[1:]) + + if pub[:1] in {b'\x02', b'\x03'}: # set by ecdsa_get_public_key33() + curve = ecdsa.NIST256p + P = curve.curve.p() + A = curve.curve.a() + B = curve.curve.b() + x = util.bytes2num(pub[1:33]) + beta = pow(int(x * x * x + A * x + B), int((P + 1) // 4), int(P)) - p0 = util.bytes2num(pub[:1]) - y = (P-beta) if ((beta + p0) % 2) else beta + p0 = util.bytes2num(pub[:1]) + y = (P - beta) if ((beta + p0) % 2) else beta - point = ecdsa.ellipticcurve.Point(curve.curve, x, y) - return ecdsa.VerifyingKey.from_public_point(point, curve=curve, - hashfunc=hashfunc) + point = ecdsa.ellipticcurve.Point(curve.curve, x, y) + return ecdsa.VerifyingKey.from_public_point(point, curve=curve, + hashfunc=hashfunc) + raise ValueError('invalid {!r}', pub) def serialize_verifying_key(vk): - key_type = ECDSA_KEY_PREFIX + ECDSA_CURVE_NAME - curve_name = ECDSA_CURVE_NAME - key_blob = DER_OCTET_STRING + vk.to_string() - parts = [key_type, curve_name, key_blob] - return b''.join([util.frame(p) for p in parts]) + if isinstance(vk, ed25519.keys.VerifyingKey): + pubkey = vk.to_bytes() + key_type = SSH_ED25519_KEY_TYPE + blob = util.frame(SSH_ED25519_KEY_TYPE) + util.frame(pubkey) + return key_type, blob + + if isinstance(vk, ecdsa.keys.VerifyingKey): + curve_name = SSH_NIST256_CURVE_NAME + key_blob = SSH_NIST256_DER_OCTET + vk.to_string() + parts = [SSH_NIST256_KEY_TYPE, curve_name, key_blob] + key_type = SSH_NIST256_KEY_TYPE + blob = b''.join([util.frame(p) for p in parts]) + return key_type, blob + + raise TypeError('unsupported {!r}'.format(vk)) def export_public_key(pubkey, label): - blob = serialize_verifying_key(decompress_pubkey(pubkey)) + assert len(pubkey) == 33 + key_type, blob = serialize_verifying_key(decompress_pubkey(pubkey)) + log.debug('fingerprint: %s', fingerprint(blob)) b64 = base64.b64encode(blob).decode('ascii') - key_type = ECDSA_KEY_PREFIX + ECDSA_CURVE_NAME return '{} {} {}\n'.format(key_type.decode('ascii'), b64, label) def import_public_key(line): ''' Parse public key textual format, as saved at .pub file ''' + log.debug('loading SSH public key: %r', line) file_type, base64blob, name = line.split() blob = base64.b64decode(base64blob) result = parse_pubkey(blob) result['name'] = name.encode('ascii') assert result['type'] == file_type.encode('ascii') - log.debug('loaded %s %s', file_type, result['fingerprint']) + log.debug('loaded %s public key: %s', file_type, result['fingerprint']) return result diff --git a/trezor_agent/protocol.py b/trezor_agent/protocol.py index f72200c..3dbbc53 100644 --- a/trezor_agent/protocol.py +++ b/trezor_agent/protocol.py @@ -1,4 +1,5 @@ import io +import binascii from . import util from . import formats @@ -9,15 +10,10 @@ log = logging.getLogger(__name__) SSH_AGENTC_REQUEST_RSA_IDENTITIES = 1 SSH_AGENT_RSA_IDENTITIES_ANSWER = 2 -SSH_AGENTC_REMOVE_ALL_RSA_IDENTITIES = 9 - SSH2_AGENTC_REQUEST_IDENTITIES = 11 SSH2_AGENT_IDENTITIES_ANSWER = 12 SSH2_AGENTC_SIGN_REQUEST = 13 SSH2_AGENT_SIGN_RESPONSE = 14 -SSH2_AGENTC_ADD_IDENTITY = 17 -SSH2_AGENTC_REMOVE_IDENTITY = 18 -SSH2_AGENTC_REMOVE_ALL_IDENTITIES = 19 class Error(Exception): @@ -91,23 +87,16 @@ class Handler(object): raise MissingKey('key not found') log.debug('signing %d-byte blob', len(blob)) - r, s = self.signer(label=key['name'], blob=blob) - signature = (r, s) - log.debug('signature: %s', signature) + signature = self.signer(label=key['name'], blob=blob) + log.debug('signature: %s', binascii.hexlify(signature)) try: - key['verifying_key'].verify(signature=signature, data=blob, - sigdecode=lambda sig, _: sig) + sig_bytes = key['verifier'](sig=signature, msg=blob) log.info('signature status: OK') except formats.ecdsa.BadSignatureError: log.exception('signature status: ERROR') raise BadSignature('invalid ECDSA signature') - sig_bytes = io.BytesIO() - for x in signature: - x_frame = util.frame(b'\x00' + util.num2bytes(x, key['size'])) - sig_bytes.write(x_frame) - sig_bytes = sig_bytes.getvalue() log.debug('signature size: %d bytes', len(sig_bytes)) data = util.frame(util.frame(key['type']), util.frame(sig_bytes)) diff --git a/trezor_agent/tests/test_formats.py b/trezor_agent/tests/test_formats.py index 8de32ea..232410a 100644 --- a/trezor_agent/tests/test_formats.py +++ b/trezor_agent/tests/test_formats.py @@ -1,4 +1,5 @@ import binascii +import pytest from .. import formats @@ -27,13 +28,48 @@ def test_parse_public_key(): assert key['name'] == b'home' assert key['point'] == _point - assert key['curve'] == b'nistp256' + assert key['curve'] == b'nist256p1' assert key['fingerprint'] == '4b:19:bc:0f:c8:7e:dc:fa:1a:e3:c2:ff:6f:e0:80:a2' # nopep8 assert key['type'] == b'ecdsa-sha2-nistp256' - assert key['size'] == 32 def test_decompress(): blob = '036236ceabde25207e81e404586e3a3af1acda1dfed2abbbb4876c1fc5b296b575' result = formats.export_public_key(binascii.unhexlify(blob), label='home') assert result == _public_key + + +def test_parse_ed25519(): + pubkey = ('ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFBdF2tj' + 'fSO8nLIi736is+f0erq28RTc7CkM11NZtTKR hello\n') + p = formats.import_public_key(pubkey) + assert p['name'] == b'hello' + assert p['curve'] == b'ed25519' + + BLOB = (b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#' + b'\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14' + b'\xdc\xec)\x0c\xd7SY\xb52\x91') + assert p['blob'] == BLOB + assert p['fingerprint'] == '6b:b0:77:af:e5:3a:21:6d:17:82:9b:06:19:03:a1:97' # nopep8 + assert p['type'] == b'ssh-ed25519' + + +def test_export_ed25519(): + pub = (b'\x00P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4' + b'z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91') + vk = formats.decompress_pubkey(pub) + result = formats.serialize_verifying_key(vk) + assert result == (b'ssh-ed25519', + b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc' + b'\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc' + b'\xec)\x0c\xd7SY\xb52\x91') + + +def test_decompress_error(): + with pytest.raises(ValueError): + formats.decompress_pubkey('') + + +def test_serialize_error(): + with pytest.raises(TypeError): + formats.serialize_verifying_key(None) diff --git a/trezor_agent/tests/test_protocol.py b/trezor_agent/tests/test_protocol.py index 8ca4adc..9de526a 100644 --- a/trezor_agent/tests/test_protocol.py +++ b/trezor_agent/tests/test_protocol.py @@ -5,52 +5,73 @@ import pytest # pylint: disable=line-too-long -KEY = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEUksojS/qRlTKBKLQO7CBX7a7oqFkysuFn1nJ6gzlR3wNuQXEgd7qb2bjmiiBHsjNxyWvH5SxVi3+fghrqODWo= ssh://localhost' # nopep8 -BLOB = b'\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj' # nopep8 -SIG = (61640221631134565789126560951398335114074531708367858563384221818711312348703, 51535548700089687831159696283235534298026173963719263249292887877395159425513) # nopep8 +NIST256_KEY = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEUksojS/qRlTKBKLQO7CBX7a7oqFkysuFn1nJ6gzlR3wNuQXEgd7qb2bjmiiBHsjNxyWvH5SxVi3+fghrqODWo= ssh://localhost' # nopep8 +NIST256_BLOB = b'\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj' # nopep8 +NIST256_SIG = b'\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1fq\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8 LIST_MSG = b'\x0b' -LIST_REPLY = b'\x00\x00\x00\x84\x0c\x00\x00\x00\x01\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x0fssh://localhost' # nopep8 +LIST_NIST256_REPLY = b'\x00\x00\x00\x84\x0c\x00\x00\x00\x01\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x0fssh://localhost' # nopep8 -SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\xd1\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x00' # nopep8 -SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8 +NIST256_SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\xd1\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x00' # nopep8 +NIST256_SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8 def test_list(): - key = formats.import_public_key(KEY) + key = formats.import_public_key(NIST256_KEY) h = protocol.Handler(keys=[key], signer=None) reply = h.handle(LIST_MSG) - assert reply == LIST_REPLY + assert reply == LIST_NIST256_REPLY -def signer(label, blob): +def ecdsa_signer(label, blob): assert label == b'ssh://localhost' - assert blob == BLOB - return SIG + assert blob == NIST256_BLOB + return NIST256_SIG -def test_sign(): - key = formats.import_public_key(KEY) - h = protocol.Handler(keys=[key], signer=signer) - reply = h.handle(SIGN_MSG) - assert reply == SIGN_REPLY +def test_ecdsa_sign(): + key = formats.import_public_key(NIST256_KEY) + h = protocol.Handler(keys=[key], signer=ecdsa_signer) + reply = h.handle(NIST256_SIGN_MSG) + assert reply == NIST256_SIGN_REPLY def test_sign_missing(): - h = protocol.Handler(keys=[], signer=signer) + h = protocol.Handler(keys=[], signer=ecdsa_signer) with pytest.raises(protocol.MissingKey): - h.handle(SIGN_MSG) + h.handle(NIST256_SIGN_MSG) def test_sign_wrong(): def wrong_signature(label, blob): assert label == b'ssh://localhost' - assert blob == BLOB - return (0, 0) + assert blob == NIST256_BLOB + return b'\x00' * 64 - key = formats.import_public_key(KEY) + key = formats.import_public_key(NIST256_KEY) h = protocol.Handler(keys=[key], signer=wrong_signature) with pytest.raises(protocol.BadSignature): - h.handle(SIGN_MSG) + h.handle(NIST256_SIGN_MSG) + + +ED25519_KEY = 'ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFBdF2tjfSO8nLIi736is+f0erq28RTc7CkM11NZtTKR ssh://localhost' # nopep8 +ED25519_SIGN_MSG = b'''\r\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91\x00\x00\x00\x94\x00\x00\x00 i3\xae}yk\\\xa1L\xb9\xe1\xbf\xbc\x8e\x87\r\x0e\xc0\x9f\x97\x0fTC!\x80\x07\x91\xdb^8\xc1\xd62\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x0bssh-ed25519\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91\x00\x00\x00\x00''' # nopep8 +ED25519_SIGN_REPLY = b'''\x00\x00\x00X\x0e\x00\x00\x00S\x00\x00\x00\x0bssh-ed25519\x00\x00\x00@\x8eb)\xa6\xe9P\x83VE\xfbq\xc6\xbf\x1dV3\xe3