diff --git a/setup.py b/setup.py index eb3f91b..03a4ec3 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,8 @@ setup( author_email='roman.zeyde@gmail.com', url='http://github.com/romanz/trezor-agent', packages=['trezor_agent', 'trezor_agent.gpg'], - install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'trezor>=0.7.4', 'semver>=2.2'], + install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=3.0.0', 'semver>=2.2', + 'trezor>=0.7.6', 'keepkey>=0.7.3', 'ledgerblue>=0.1.8'], platforms=['POSIX'], classifiers=[ 'Environment :: Console', @@ -27,10 +28,6 @@ setup( 'Topic :: Security', 'Topic :: Utilities', ], - extras_require={ - 'trezorlib': ['python-trezor>=0.7.6'], - 'keepkeylib': ['keepkey>=0.7.3'], - }, entry_points={'console_scripts': [ 'trezor-agent = trezor_agent.__main__:run_agent', 'trezor-git = trezor_agent.__main__:run_git', diff --git a/trezor_agent/_ledger.py b/trezor_agent/_ledger.py deleted file mode 100644 index 1d159de..0000000 --- a/trezor_agent/_ledger.py +++ /dev/null @@ -1,148 +0,0 @@ -"""TREZOR-like interface for Ledger hardware wallet.""" -import binascii -import struct - -from trezorlib.types_pb2 import IdentityType # pylint: disable=import-error,unused-import -from . import util - - -class LedgerClientConnection(object): - """Mock for TREZOR-like connection object.""" - - def __init__(self, dongle): - """Create connection.""" - self.dongle = dongle - - @staticmethod - def expand_path(path): - """Convert BIP32 path into bytes.""" - return b''.join((struct.pack('>I', e) for e in path)) - - @staticmethod - def convert_public_key(ecdsa_curve_name, result): - """Convert Ledger reply into PublicKey object.""" - from trezorlib.messages_pb2 import PublicKey # pylint: disable=import-error - if ecdsa_curve_name == 'nist256p1': - if (result[64] & 1) != 0: - result = bytearray([0x03]) + result[1:33] - else: - result = bytearray([0x02]) + result[1:33] - else: - result = result[1:] - keyX = bytearray(result[0:32]) - keyY = bytearray(result[32:][::-1]) - if (keyX[31] & 1) != 0: - keyY[31] |= 0x80 - result = b'\x00' + bytes(keyY) - publicKey = PublicKey() - publicKey.node.public_key = bytes(result) - return publicKey - - # pylint: disable=unused-argument - def get_public_node(self, n, ecdsa_curve_name='secp256k1', show_display=False): - """Get PublicKey object for specified BIP32 address and elliptic curve.""" - donglePath = LedgerClientConnection.expand_path(n) - if ecdsa_curve_name == 'nist256p1': - p2 = '01' - else: - p2 = '02' - apdu = '800200' + p2 - apdu = binascii.unhexlify(apdu) - apdu += bytearray([len(donglePath) + 1, len(donglePath) // 4]) - apdu += donglePath - result = bytearray(self.dongle.exchange(bytes(apdu)))[1:] - return LedgerClientConnection.convert_public_key(ecdsa_curve_name, result) - - # pylint: disable=too-many-locals - def sign_identity(self, identity, challenge_hidden, challenge_visual, - ecdsa_curve_name='secp256k1'): - """Sign specified challenges using secret key derived from given identity.""" - from trezorlib.messages_pb2 import SignedIdentity # pylint: disable=import-error - n = util.get_bip32_address(identity) - donglePath = LedgerClientConnection.expand_path(n) - if identity.proto == 'ssh': - ins = '04' - p1 = '00' - else: - ins = '08' - p1 = '00' - if ecdsa_curve_name == 'nist256p1': - p2 = '81' if identity.proto == 'ssh' else '01' - else: - p2 = '82' if identity.proto == 'ssh' else '02' - apdu = '80' + ins + p1 + p2 - apdu = binascii.unhexlify(apdu) - apdu += bytearray([len(challenge_hidden) + len(donglePath) + 1]) - apdu += bytearray([len(donglePath) // 4]) + donglePath - apdu += challenge_hidden - result = bytearray(self.dongle.exchange(bytes(apdu))) - if ecdsa_curve_name == 'nist256p1': - offset = 3 - length = result[offset] - r = result[offset+1:offset+1+length] - if r[0] == 0: - r = r[1:] - offset = offset + 1 + length + 1 - length = result[offset] - s = result[offset+1:offset+1+length] - if s[0] == 0: - s = s[1:] - offset = offset + 1 + length - signature = SignedIdentity() - signature.signature = b'\x00' + bytes(r) + bytes(s) - if identity.proto == 'ssh': - keyData = result[offset:] - pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) - signature.public_key = pk.node.public_key - return signature - else: - signature = SignedIdentity() - signature.signature = b'\x00' + bytes(result[0:64]) - if identity.proto == 'ssh': - keyData = result[64:] - pk = LedgerClientConnection.convert_public_key(ecdsa_curve_name, keyData) - signature.public_key = pk.node.public_key - return signature - - def get_ecdh_session_key(self, identity, peer_public_key, ecdsa_curve_name='secp256k1'): - """Create shared secret key for GPG decryption.""" - from trezorlib.messages_pb2 import ECDHSessionKey # pylint: disable=import-error - n = util.get_bip32_address(identity, True) - donglePath = LedgerClientConnection.expand_path(n) - if ecdsa_curve_name == 'nist256p1': - p2 = '01' - else: - p2 = '02' - apdu = '800a00' + p2 - apdu = binascii.unhexlify(apdu) - apdu += bytearray([len(peer_public_key) + len(donglePath) + 1]) - apdu += bytearray([len(donglePath) // 4]) + donglePath - apdu += peer_public_key - result = bytearray(self.dongle.exchange(bytes(apdu))) - sessionKey = ECDHSessionKey() - sessionKey.session_key = bytes(result) - return sessionKey - - def clear_session(self): - """Mock for TREZOR interface compatibility.""" - pass - - def close(self): - """Close connection.""" - self.dongle.close() - - # pylint: disable=unused-argument - # pylint: disable=no-self-use - def ping(self, msg, button_protection=False, pin_protection=False, - passphrase_protection=False): - """Mock for TREZOR interface compatibility.""" - return msg - - -class CallException(Exception): - """Ledger-related error (mainly for TREZOR compatibility).""" - - def __init__(self, code, message): - """Create an error.""" - super(CallException, self).__init__() - self.args = [code, message] diff --git a/trezor_agent/device/__init__.py b/trezor_agent/device/__init__.py new file mode 100644 index 0000000..65e915c --- /dev/null +++ b/trezor_agent/device/__init__.py @@ -0,0 +1,28 @@ +"""Cryptographic hardware device management.""" + +import logging + +from . import trezor +from . import keepkey +from . import ledger +from . import interface + +log = logging.getLogger(__name__) + +DEVICE_TYPES = [ + trezor.Trezor, + keepkey.KeepKey, + ledger.LedgerNanoS, +] + + +def detect(identity_str, curve_name): + """Detect the first available device and return it to the user.""" + for device_type in DEVICE_TYPES: + try: + with device_type(identity_str, curve_name) as d: + return d + except interface.NotFoundError as e: + log.debug('device not found: %s', e) + raise IOError('No device found: "{}" ({})'.format(identity_str, + curve_name)) diff --git a/trezor_agent/device/interface.py b/trezor_agent/device/interface.py new file mode 100644 index 0000000..b175efe --- /dev/null +++ b/trezor_agent/device/interface.py @@ -0,0 +1,125 @@ +"""Device abstraction layer.""" + +import hashlib +import io +import logging +import re +import struct + +from .. import formats, util + +log = logging.getLogger(__name__) + +_identity_regexp = re.compile(''.join([ + '^' + r'(?:(?P.*)://)?', + r'(?:(?P.*)@)?', + r'(?P.*?)', + r'(?::(?P\w*))?', + r'(?P/.*)?', + '$' +])) + + +def string_to_identity(identity_str): + """Parse string into Identity dictionary.""" + m = _identity_regexp.match(identity_str) + result = m.groupdict() + log.debug('parsed identity: %s', result) + return {k: v for k, v in result.items() if v} + + +def identity_to_string(identity_dict): + """Dump Identity dictionary into its string representation.""" + result = [] + if identity_dict.get('proto'): + result.append(identity_dict['proto'] + '://') + if identity_dict.get('user'): + result.append(identity_dict['user'] + '@') + result.append(identity_dict['host']) + if identity_dict.get('port'): + result.append(':' + identity_dict['port']) + if identity_dict.get('path'): + result.append(identity_dict['path']) + return ''.join(result) + + +def get_bip32_address(identity_dict, ecdh=False): + """Compute BIP32 derivation address according to SLIP-0013/0017.""" + index = struct.pack('I', e) for e in path)) + + +def _convert_public_key(ecdsa_curve_name, result): + """Convert Ledger reply into PublicKey object.""" + if ecdsa_curve_name == 'nist256p1': + if (result[64] & 1) != 0: + result = bytearray([0x03]) + result[1:33] + else: + result = bytearray([0x02]) + result[1:33] + else: + result = result[1:] + keyX = bytearray(result[0:32]) + keyY = bytearray(result[32:][::-1]) + if (keyX[31] & 1) != 0: + keyY[31] |= 0x80 + result = b'\x00' + bytes(keyY) + return bytes(result) + + +class LedgerNanoS(interface.Device): + """Connection to Ledger Nano S device.""" + + def connect(self): + """Enumerate and connect to the first USB HID interface.""" + try: + return comm.getDongle() + except comm.CommException as e: + raise interface.NotFoundError( + '{} not connected: "{}"'.format(self, e)) + + def pubkey(self, ecdh=False): + """Get PublicKey object for specified BIP32 address and elliptic curve.""" + curve_name = self.get_curve_name(ecdh) + path = _expand_path(interface.get_bip32_address(self.identity_dict, + ecdh=ecdh)) + if curve_name == 'nist256p1': + p2 = '01' + else: + p2 = '02' + apdu = '800200' + p2 + apdu = binascii.unhexlify(apdu) + apdu += bytearray([len(path) + 1, len(path) // 4]) + apdu += path + result = bytearray(self.conn.exchange(bytes(apdu)))[1:] + return _convert_public_key(curve_name, result) + + def sign(self, blob): + """Sign given blob and return the signature (as bytes).""" + path = _expand_path(interface.get_bip32_address(self.identity_dict, + ecdh=False)) + if self.identity_dict['proto'] == 'ssh': + ins = '04' + p1 = '00' + else: + ins = '08' + p1 = '00' + if self.curve_name == 'nist256p1': + p2 = '81' if self.identity_dict['proto'] == 'ssh' else '01' + else: + p2 = '82' if self.identity_dict['proto'] == 'ssh' else '02' + apdu = '80' + ins + p1 + p2 + apdu = binascii.unhexlify(apdu) + apdu += bytearray([len(blob) + len(path) + 1]) + apdu += bytearray([len(path) // 4]) + path + apdu += blob + result = bytearray(self.conn.exchange(bytes(apdu))) + if self.curve_name == 'nist256p1': + offset = 3 + length = result[offset] + r = result[offset+1:offset+1+length] + if r[0] == 0: + r = r[1:] + offset = offset + 1 + length + 1 + length = result[offset] + s = result[offset+1:offset+1+length] + if s[0] == 0: + s = s[1:] + offset = offset + 1 + length + return bytes(r) + bytes(s) + else: + return bytes(result[:64]) + + def ecdh(self, pubkey): + """Get shared session key using Elliptic Curve Diffie-Hellman.""" + path = _expand_path(interface.get_bip32_address(self.identity_dict, + ecdh=True)) + if self.curve_name == 'nist256p1': + p2 = '01' + else: + p2 = '02' + apdu = '800a00' + p2 + apdu = binascii.unhexlify(apdu) + apdu += bytearray([len(pubkey) + len(path) + 1]) + apdu += bytearray([len(path) // 4]) + path + apdu += pubkey + result = bytearray(self.conn.exchange(bytes(apdu))) + assert result[0] == 0x04 + return bytes(result) diff --git a/trezor_agent/device/trezor.py b/trezor_agent/device/trezor.py new file mode 100644 index 0000000..1f538a3 --- /dev/null +++ b/trezor_agent/device/trezor.py @@ -0,0 +1,108 @@ +"""TREZOR-related code (see http://bitcointrezor.com/).""" + +import binascii +import logging +import semver + +from . import interface + +log = logging.getLogger(__name__) + + +class Trezor(interface.Device): + """Connection to TREZOR device.""" + + from . import trezor_defs as defs + + required_version = '>=1.4.0' + + def connect(self): + """Enumerate and connect to the first USB HID interface.""" + def empty_passphrase_handler(_): + return self.defs.PassphraseAck(passphrase='') + + for d in self.defs.HidTransport.enumerate(): + log.debug('endpoint: %s', d) + transport = self.defs.HidTransport(d) + connection = self.defs.Client(transport) + connection.callback_PassphraseRequest = empty_passphrase_handler + f = connection.features + log.debug('connected to %s %s', self, f.device_id) + log.debug('label : %s', f.label) + log.debug('vendor : %s', f.vendor) + current_version = '{}.{}.{}'.format(f.major_version, + f.minor_version, + f.patch_version) + log.debug('version : %s', current_version) + log.debug('revision : %s', binascii.hexlify(f.revision)) + if not semver.match(current_version, self.required_version): + fmt = ('Please upgrade your {} firmware to {} version' + ' (current: {})') + raise ValueError(fmt.format(self, self.required_version, + current_version)) + connection.ping(msg='', pin_protection=True) # unlock PIN + return connection + raise interface.NotFoundError('{} not connected'.format(self)) + + def close(self): + """Close connection.""" + self.conn.close() + + def pubkey(self, ecdh=False): + """Return public key.""" + curve_name = self.get_curve_name(ecdh=ecdh) + log.debug('"%s" getting public key (%s) from %s', + interface.identity_to_string(self.identity_dict), + curve_name, self) + addr = interface.get_bip32_address(self.identity_dict, ecdh=ecdh) + result = self.conn.get_public_node(n=addr, + ecdsa_curve_name=curve_name) + log.debug('result: %s', result) + return result.node.public_key + + def _identity_proto(self): + result = self.defs.IdentityType() + for name, value in self.identity_dict.items(): + setattr(result, name, value) + return result + + def sign(self, blob): + """Sign given blob and return the signature (as bytes).""" + curve_name = self.get_curve_name(ecdh=False) + log.debug('"%s" signing %r (%s) on %s', + interface.identity_to_string(self.identity_dict), blob, + curve_name, self) + try: + result = self.conn.sign_identity( + identity=self._identity_proto(), + challenge_hidden=blob, + challenge_visual='', + ecdsa_curve_name=curve_name) + log.debug('result: %s', result) + assert len(result.signature) == 65 + assert result.signature[:1] == b'\x00' + return result.signature[1:] + except self.defs.CallException as e: + msg = '{} error: {}'.format(self, e) + log.debug(msg, exc_info=True) + raise interface.DeviceError(msg) + + def ecdh(self, pubkey): + """Get shared session key using Elliptic Curve Diffie-Hellman.""" + curve_name = self.get_curve_name(ecdh=True) + log.debug('"%s" shared session key (%s) for %r from %s', + interface.identity_to_string(self.identity_dict), + curve_name, pubkey, self) + try: + result = self.conn.get_ecdh_session_key( + identity=self._identity_proto(), + peer_public_key=pubkey, + ecdsa_curve_name=curve_name) + log.debug('result: %s', result) + assert len(result.session_key) in {65, 33} # NIST256 or Curve25519 + assert result.session_key[:1] == b'\x04' + return result.session_key + except self.defs.CallException as e: + msg = '{} error: {}'.format(self, e) + log.debug(msg, exc_info=True) + raise interface.DeviceError(msg) diff --git a/trezor_agent/device/trezor_defs.py b/trezor_agent/device/trezor_defs.py new file mode 100644 index 0000000..2dff8ee --- /dev/null +++ b/trezor_agent/device/trezor_defs.py @@ -0,0 +1,8 @@ +"""TREZOR-related definitions.""" + +# pylint: disable=unused-import +from trezorlib.client import TrezorClient as Client +from trezorlib.client import CallException +from trezorlib.transport_hid import HidTransport +from trezorlib.messages_pb2 import PassphraseAck +from trezorlib.types_pb2 import IdentityType