diff --git a/libagent/gpg/__init__.py b/libagent/gpg/__init__.py index 3d4bbde..0c7ab4b 100644 --- a/libagent/gpg/__init__.py +++ b/libagent/gpg/__init__.py @@ -10,6 +10,7 @@ See these links for more details: import argparse import contextlib +import functools import logging import os import sys @@ -28,10 +29,12 @@ def export_public_key(device_type, args): log.warning('NOTE: in order to re-generate the exact same GPG key later, ' 'run this command with "--time=%d" commandline flag (to set ' 'the timestamp of the GPG key manually).', args.time) - c = client.Client(user_id=args.user_id, curve_name=args.ecdsa_curve, - device_type=device_type) - verifying_key = c.pubkey(ecdh=False) - decryption_key = c.pubkey(ecdh=True) + c = client.Client(device=device_type()) + identity = client.create_identity(user_id=args.user_id, + curve_name=args.ecdsa_curve) + verifying_key = c.pubkey(identity=identity, ecdh=False) + decryption_key = c.pubkey(identity=identity, ecdh=True) + signer_func = functools.partial(c.sign, identity=identity) if args.subkey: # add as subkey log.info('adding %s GPG subkey for "%s" to existing key', @@ -47,10 +50,10 @@ def export_public_key(device_type, args): primary_bytes = keyring.export_public_key(args.user_id) result = encode.create_subkey(primary_bytes=primary_bytes, subkey=signing_key, - signer_func=c.sign) + signer_func=signer_func) result = encode.create_subkey(primary_bytes=result, subkey=encryption_key, - signer_func=c.sign) + signer_func=signer_func) else: # add as primary log.info('creating new %s GPG primary key for "%s"', args.ecdsa_curve, args.user_id) @@ -65,10 +68,10 @@ def export_public_key(device_type, args): result = encode.create_primary(user_id=args.user_id, pubkey=primary, - signer_func=c.sign) + signer_func=signer_func) result = encode.create_subkey(primary_bytes=result, subkey=subkey, - signer_func=c.sign) + signer_func=signer_func) sys.stdout.write(protocol.armor(result, 'PUBLIC KEY BLOCK')) @@ -112,12 +115,13 @@ def run_agent(device_type): util.setup_logging(verbosity=int(config['verbosity']), filename=config['log-file']) sock_path = keyring.get_agent_sock_path() + handler = agent.Handler(device=device_type()) with server.unix_domain_socket_server(sock_path) as sock: for conn in agent.yield_connections(sock): with contextlib.closing(conn): try: - agent.handle_connection(conn=conn, device_type=device_type) - except StopIteration: + handler.handle(conn) + except agent.AgentStop: log.info('stopping gpg-agent') return except Exception as e: # pylint: disable=broad-except diff --git a/libagent/gpg/agent.py b/libagent/gpg/agent.py index f4aa724..81fc9ff 100644 --- a/libagent/gpg/agent.py +++ b/libagent/gpg/agent.py @@ -36,41 +36,6 @@ def sig_encode(r, s): return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))' -def open_connection(keygrip_bytes, device_type): - """ - Connect to the device for the specified keygrip. - - Parse GPG public key to find the first user ID, which is used to - specify the correct signature/decryption key on the device. - """ - pubkey_dict, user_ids = decode.load_by_keygrip( - pubkey_bytes=keyring.export_public_keys(), - keygrip=keygrip_bytes) - # We assume the first user ID is used to generate TREZOR-based GPG keys. - user_id = user_ids[0]['value'].decode('ascii') - curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid']) - ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID) - - conn = client.Client(user_id, curve_name=curve_name, device_type=device_type) - pubkey = protocol.PublicKey( - curve_name=curve_name, created=pubkey_dict['created'], - verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh) - assert pubkey.key_id() == pubkey_dict['key_id'] - assert pubkey.keygrip() == keygrip_bytes - return conn - - -def pksign(keygrip, digest, algo, device_type): - """Sign a message digest using a private EC key.""" - log.debug('signing %r digest (algo #%s)', digest, algo) - keygrip_bytes = binascii.unhexlify(keygrip) - conn = open_connection(keygrip_bytes, device_type=device_type) - r, s = conn.sign(binascii.unhexlify(digest)) - result = sig_encode(r, s) - log.debug('result: %r', result) - return result - - def _serialize_point(data): prefix = '{}:'.format(len(data)).encode('ascii') # https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html @@ -92,80 +57,143 @@ def parse_ecdh(line): return dict(items)[b'e'] -def pkdecrypt(keygrip, conn, device_type): - """Handle decryption using ECDH.""" - for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']: - keyring.sendline(conn, msg) - - line = keyring.recvline(conn) - assert keyring.recvline(conn) == b'END' - remote_pubkey = parse_ecdh(line) - - keygrip_bytes = binascii.unhexlify(keygrip) - conn = open_connection(keygrip_bytes, device_type=device_type) - return _serialize_point(conn.ecdh(remote_pubkey)) - - -@util.memoize -def have_key(keygrip, device_type): - """Check if current keygrip correspond to a TREZOR-based key.""" - try: - open_connection(keygrip_bytes=binascii.unhexlify(keygrip), - device_type=device_type) - return True - except KeyError as e: - log.warning('HAVEKEY(%s) failed: %s', keygrip, e) - return False - - -# pylint: disable=too-many-branches -def handle_connection(conn, device_type): - """Handle connection from GPG binary using the ASSUAN protocol.""" - keygrip = None - digest = None - algo = None - version = keyring.gpg_version() # "Clone" existing GPG version - - keyring.sendline(conn, b'OK') - for line in keyring.iterlines(conn): - parts = line.split(b' ') - command = parts[0] - args = parts[1:] - if command in {b'RESET', b'OPTION', b'SETKEYDESC'}: - pass # reply with OK - elif command == b'GETINFO': - keyring.sendline(conn, b'D ' + version) - elif command == b'AGENT_ID': - keyring.sendline(conn, b'D TREZOR') # "Fake" agent ID - elif command in {b'SIGKEY', b'SETKEY'}: - keygrip, = args - elif command == b'SETHASH': - algo, digest = args - elif command == b'PKSIGN': - sig = pksign(keygrip, digest, algo, device_type=device_type) - keyring.sendline(conn, b'D ' + sig) - elif command == b'PKDECRYPT': - sec = pkdecrypt(keygrip, conn, device_type=device_type) - keyring.sendline(conn, b'D ' + sec) - elif command == b'HAVEKEY': - if not have_key(keygrip=args[0], device_type=device_type): - keyring.sendline(conn, - b'ERR 67108881 No secret key ') +class AgentError(Exception): + """GnuPG agent-related error.""" + + +class AgentStop(Exception): + """Raised to close the agent.""" + + +class Handler(object): + """GPG agent requests' handler.""" + + def __init__(self, device): + """C-tor.""" + self.client = client.Client(device=device) + # Cache ASSUAN commands' arguments between commands + self.keygrip = None + self.digest = None + self.algo = None + # Cache public keys from GnuPG + self.pubkey_bytes = keyring.export_public_keys() + # "Clone" existing GPG version + version = keyring.gpg_version() + + self.handlers = { + b'RESET': None, + b'OPTION': None, + b'SETKEYDESC': None, + b'GETINFO': lambda conn, _: keyring.sendline(conn, b'D ' + version), + b'AGENT_ID': lambda conn, _: keyring.sendline(conn, b'D TREZOR'), # "Fake" agent ID + b'SIGKEY': lambda _, args: self.set_key(*args), + b'SETKEY': lambda _, args: self.set_key(*args), + b'SETHASH': lambda _, args: self.set_hash(*args), + b'PKSIGN': lambda conn, _: self.pksign(conn), + b'PKDECRYPT': lambda conn, _: self.pkdecrypt(conn), + b'HAVEKEY': lambda _, args: self.have_key(*args), + b'KEYINFO': lambda conn, _: self.key_info(conn) + } + + @util.memoize + def get_identity(self, keygrip): + """ + Returns device.interface.Identity that matches specified keygrip. + + In case of missing keygrip, KeyError will be raised. + """ + keygrip_bytes = binascii.unhexlify(keygrip) + pubkey_dict, user_ids = decode.load_by_keygrip( + pubkey_bytes=self.pubkey_bytes, keygrip=keygrip_bytes) + # We assume the first user ID is used to generate TREZOR-based GPG keys. + user_id = user_ids[0]['value'].decode('ascii') + curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid']) + ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID) + + identity = client.create_identity(user_id=user_id, curve_name=curve_name) + verifying_key = self.client.pubkey(identity=identity, ecdh=ecdh) + pubkey = protocol.PublicKey( + curve_name=curve_name, created=pubkey_dict['created'], + verifying_key=verifying_key, ecdh=ecdh) + assert pubkey.key_id() == pubkey_dict['key_id'] + assert pubkey.keygrip() == keygrip_bytes + return identity + + def pksign(self, conn): + """Sign a message digest using a private EC key.""" + log.debug('signing %r digest (algo #%s)', self.digest, self.algo) + identity = self.get_identity(keygrip=self.keygrip) + r, s = self.client.sign(identity=identity, + digest=binascii.unhexlify(self.digest)) + result = sig_encode(r, s) + log.debug('result: %r', result) + keyring.sendline(conn, b'D ' + result) + + def pkdecrypt(self, conn): + """Handle decryption using ECDH.""" + for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']: + keyring.sendline(conn, msg) + + line = keyring.recvline(conn) + assert keyring.recvline(conn) == b'END' + remote_pubkey = parse_ecdh(line) + + identity = self.get_identity(keygrip=self.keygrip) + ec_point = self.client.ecdh(identity=identity, pubkey=remote_pubkey) + keyring.sendline(conn, b'D ' + _serialize_point(ec_point)) + + @util.memoize + def have_key(self, *keygrips): + """Check if current keygrip correspond to a TREZOR-based key.""" + try: + self.get_identity(keygrip=keygrips[0]) + except KeyError as e: + log.warning('HAVEKEY(%s) failed: %s', keygrips, e) + raise AgentError(b'ERR 67108881 No secret key ') + + def key_info(self, conn): + """ + Dummy reply (mainly for 'gpg --edit' to succeed). + + For details, see GnuPG agent KEYINFO command help. + https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082 + """ + fmt = 'S KEYINFO {0} X - - - - - - -' + keyring.sendline(conn, fmt.format(self.keygrip).encode('ascii')) + + def set_key(self, keygrip): + """Set hexadecimal keygrip for next operation.""" + self.keygrip = keygrip + + def set_hash(self, algo, digest): + """Set algorithm ID and hexadecimal digest for next operation.""" + self.algo = algo + self.digest = digest + + def handle(self, conn): + """Handle connection from GPG binary using the ASSUAN protocol.""" + keyring.sendline(conn, b'OK') + for line in keyring.iterlines(conn): + parts = line.split(b' ') + command = parts[0] + args = parts[1:] + + if command == b'BYE': return - elif command == b'KEYINFO': - keygrip, = args - # Dummy reply (mainly for 'gpg --edit' to succeed). - # For details, see GnuPG agent KEYINFO command help. - # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082 - fmt = 'S KEYINFO {0} X - - - - - - -' - keyring.sendline(conn, fmt.format(keygrip).encode('ascii')) - elif command == b'BYE': - return - elif command == b'KILLAGENT': + elif command == b'KILLAGENT': + keyring.sendline(conn, b'OK') + raise AgentStop() + + if command not in self.handlers: + log.error('unknown request: %r', line) + continue + + handler = self.handlers[command] + if handler: + try: + handler(conn, args) + except AgentError as e: + msg, = e.args + keyring.sendline(conn, msg) + continue keyring.sendline(conn, b'OK') - raise StopIteration - else: - log.error('unknown request: %r', line) - return - - keyring.sendline(conn, b'OK') diff --git a/libagent/gpg/client.py b/libagent/gpg/client.py index 80ad50d..1d20fb3 100644 --- a/libagent/gpg/client.py +++ b/libagent/gpg/client.py @@ -2,43 +2,47 @@ import logging -from .. import device, formats, util +from .. import formats, util +from ..device import interface log = logging.getLogger(__name__) +def create_identity(user_id, curve_name): + """Create GPG identity for hardware device.""" + result = interface.Identity(identity_str='gpg://', curve_name=curve_name) + result.identity_dict['host'] = user_id + return result + + class Client(object): """Sign messages and get public keys from a hardware device.""" - def __init__(self, user_id, curve_name, device_type): - """Connect to the device and retrieve required public key.""" - self.device = device_type() - self.user_id = user_id - self.identity = device.interface.Identity( - identity_str='gpg://', curve_name=curve_name) - self.identity.identity_dict['host'] = user_id + def __init__(self, device): + """C-tor.""" + self.device = device - def pubkey(self, ecdh=False): + def pubkey(self, identity, ecdh=False): """Return public key as VerifyingKey object.""" with self.device: - pubkey = self.device.pubkey(ecdh=ecdh, identity=self.identity) + pubkey = self.device.pubkey(ecdh=ecdh, identity=identity) return formats.decompress_pubkey( - pubkey=pubkey, curve_name=self.identity.curve_name) + pubkey=pubkey, curve_name=identity.curve_name) - def sign(self, digest): + def sign(self, identity, digest): """Sign the digest and return a serialized signature.""" log.info('please confirm GPG signature on %s for "%s"...', - self.device, self.user_id) - if self.identity.curve_name == formats.CURVE_NIST256: + self.device, identity) + if identity.curve_name == formats.CURVE_NIST256: digest = digest[:32] # sign the first 256 bits log.debug('signing digest: %s', util.hexlify(digest)) with self.device: - sig = self.device.sign(blob=digest, identity=self.identity) + sig = self.device.sign(blob=digest, identity=identity) return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:])) - def ecdh(self, pubkey): + def ecdh(self, identity, pubkey): """Derive shared secret using ECDH from remote public key.""" log.info('please confirm GPG decryption on %s for "%s"...', - self.device, self.user_id) + self.device, identity) with self.device: - return self.device.ecdh(pubkey=pubkey, identity=self.identity) + return self.device.ecdh(pubkey=pubkey, identity=identity)