|
|
|
@ -36,41 +36,6 @@ def sig_encode(r, s):
|
|
|
|
|
return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def open_connection(keygrip_bytes, device_type):
|
|
|
|
|
"""
|
|
|
|
|
Connect to the device for the specified keygrip.
|
|
|
|
|
|
|
|
|
|
Parse GPG public key to find the first user ID, which is used to
|
|
|
|
|
specify the correct signature/decryption key on the device.
|
|
|
|
|
"""
|
|
|
|
|
pubkey_dict, user_ids = decode.load_by_keygrip(
|
|
|
|
|
pubkey_bytes=keyring.export_public_keys(),
|
|
|
|
|
keygrip=keygrip_bytes)
|
|
|
|
|
# We assume the first user ID is used to generate TREZOR-based GPG keys.
|
|
|
|
|
user_id = user_ids[0]['value'].decode('ascii')
|
|
|
|
|
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
|
|
|
|
|
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
|
|
|
|
|
|
|
|
|
conn = client.Client(user_id, curve_name=curve_name, device_type=device_type)
|
|
|
|
|
pubkey = protocol.PublicKey(
|
|
|
|
|
curve_name=curve_name, created=pubkey_dict['created'],
|
|
|
|
|
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
|
|
|
|
|
assert pubkey.key_id() == pubkey_dict['key_id']
|
|
|
|
|
assert pubkey.keygrip() == keygrip_bytes
|
|
|
|
|
return conn
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def pksign(keygrip, digest, algo, device_type):
|
|
|
|
|
"""Sign a message digest using a private EC key."""
|
|
|
|
|
log.debug('signing %r digest (algo #%s)', digest, algo)
|
|
|
|
|
keygrip_bytes = binascii.unhexlify(keygrip)
|
|
|
|
|
conn = open_connection(keygrip_bytes, device_type=device_type)
|
|
|
|
|
r, s = conn.sign(binascii.unhexlify(digest))
|
|
|
|
|
result = sig_encode(r, s)
|
|
|
|
|
log.debug('result: %r', result)
|
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _serialize_point(data):
|
|
|
|
|
prefix = '{}:'.format(len(data)).encode('ascii')
|
|
|
|
|
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
|
|
|
|
@ -92,80 +57,143 @@ def parse_ecdh(line):
|
|
|
|
|
return dict(items)[b'e']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def pkdecrypt(keygrip, conn, device_type):
|
|
|
|
|
"""Handle decryption using ECDH."""
|
|
|
|
|
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
|
|
|
|
keyring.sendline(conn, msg)
|
|
|
|
|
|
|
|
|
|
line = keyring.recvline(conn)
|
|
|
|
|
assert keyring.recvline(conn) == b'END'
|
|
|
|
|
remote_pubkey = parse_ecdh(line)
|
|
|
|
|
|
|
|
|
|
keygrip_bytes = binascii.unhexlify(keygrip)
|
|
|
|
|
conn = open_connection(keygrip_bytes, device_type=device_type)
|
|
|
|
|
return _serialize_point(conn.ecdh(remote_pubkey))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@util.memoize
|
|
|
|
|
def have_key(keygrip, device_type):
|
|
|
|
|
"""Check if current keygrip correspond to a TREZOR-based key."""
|
|
|
|
|
try:
|
|
|
|
|
open_connection(keygrip_bytes=binascii.unhexlify(keygrip),
|
|
|
|
|
device_type=device_type)
|
|
|
|
|
return True
|
|
|
|
|
except KeyError as e:
|
|
|
|
|
log.warning('HAVEKEY(%s) failed: %s', keygrip, e)
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# pylint: disable=too-many-branches
|
|
|
|
|
def handle_connection(conn, device_type):
|
|
|
|
|
"""Handle connection from GPG binary using the ASSUAN protocol."""
|
|
|
|
|
keygrip = None
|
|
|
|
|
digest = None
|
|
|
|
|
algo = None
|
|
|
|
|
version = keyring.gpg_version() # "Clone" existing GPG version
|
|
|
|
|
|
|
|
|
|
keyring.sendline(conn, b'OK')
|
|
|
|
|
for line in keyring.iterlines(conn):
|
|
|
|
|
parts = line.split(b' ')
|
|
|
|
|
command = parts[0]
|
|
|
|
|
args = parts[1:]
|
|
|
|
|
if command in {b'RESET', b'OPTION', b'SETKEYDESC'}:
|
|
|
|
|
pass # reply with OK
|
|
|
|
|
elif command == b'GETINFO':
|
|
|
|
|
keyring.sendline(conn, b'D ' + version)
|
|
|
|
|
elif command == b'AGENT_ID':
|
|
|
|
|
keyring.sendline(conn, b'D TREZOR') # "Fake" agent ID
|
|
|
|
|
elif command in {b'SIGKEY', b'SETKEY'}:
|
|
|
|
|
keygrip, = args
|
|
|
|
|
elif command == b'SETHASH':
|
|
|
|
|
algo, digest = args
|
|
|
|
|
elif command == b'PKSIGN':
|
|
|
|
|
sig = pksign(keygrip, digest, algo, device_type=device_type)
|
|
|
|
|
keyring.sendline(conn, b'D ' + sig)
|
|
|
|
|
elif command == b'PKDECRYPT':
|
|
|
|
|
sec = pkdecrypt(keygrip, conn, device_type=device_type)
|
|
|
|
|
keyring.sendline(conn, b'D ' + sec)
|
|
|
|
|
elif command == b'HAVEKEY':
|
|
|
|
|
if not have_key(keygrip=args[0], device_type=device_type):
|
|
|
|
|
keyring.sendline(conn,
|
|
|
|
|
b'ERR 67108881 No secret key <GPG Agent>')
|
|
|
|
|
class AgentError(Exception):
|
|
|
|
|
"""GnuPG agent-related error."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AgentStop(Exception):
|
|
|
|
|
"""Raised to close the agent."""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Handler(object):
|
|
|
|
|
"""GPG agent requests' handler."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, device):
|
|
|
|
|
"""C-tor."""
|
|
|
|
|
self.client = client.Client(device=device)
|
|
|
|
|
# Cache ASSUAN commands' arguments between commands
|
|
|
|
|
self.keygrip = None
|
|
|
|
|
self.digest = None
|
|
|
|
|
self.algo = None
|
|
|
|
|
# Cache public keys from GnuPG
|
|
|
|
|
self.pubkey_bytes = keyring.export_public_keys()
|
|
|
|
|
# "Clone" existing GPG version
|
|
|
|
|
version = keyring.gpg_version()
|
|
|
|
|
|
|
|
|
|
self.handlers = {
|
|
|
|
|
b'RESET': None,
|
|
|
|
|
b'OPTION': None,
|
|
|
|
|
b'SETKEYDESC': None,
|
|
|
|
|
b'GETINFO': lambda conn, _: keyring.sendline(conn, b'D ' + version),
|
|
|
|
|
b'AGENT_ID': lambda conn, _: keyring.sendline(conn, b'D TREZOR'), # "Fake" agent ID
|
|
|
|
|
b'SIGKEY': lambda _, args: self.set_key(*args),
|
|
|
|
|
b'SETKEY': lambda _, args: self.set_key(*args),
|
|
|
|
|
b'SETHASH': lambda _, args: self.set_hash(*args),
|
|
|
|
|
b'PKSIGN': lambda conn, _: self.pksign(conn),
|
|
|
|
|
b'PKDECRYPT': lambda conn, _: self.pkdecrypt(conn),
|
|
|
|
|
b'HAVEKEY': lambda _, args: self.have_key(*args),
|
|
|
|
|
b'KEYINFO': lambda conn, _: self.key_info(conn)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@util.memoize
|
|
|
|
|
def get_identity(self, keygrip):
|
|
|
|
|
"""
|
|
|
|
|
Returns device.interface.Identity that matches specified keygrip.
|
|
|
|
|
|
|
|
|
|
In case of missing keygrip, KeyError will be raised.
|
|
|
|
|
"""
|
|
|
|
|
keygrip_bytes = binascii.unhexlify(keygrip)
|
|
|
|
|
pubkey_dict, user_ids = decode.load_by_keygrip(
|
|
|
|
|
pubkey_bytes=self.pubkey_bytes, keygrip=keygrip_bytes)
|
|
|
|
|
# We assume the first user ID is used to generate TREZOR-based GPG keys.
|
|
|
|
|
user_id = user_ids[0]['value'].decode('ascii')
|
|
|
|
|
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
|
|
|
|
|
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
|
|
|
|
|
|
|
|
|
|
identity = client.create_identity(user_id=user_id, curve_name=curve_name)
|
|
|
|
|
verifying_key = self.client.pubkey(identity=identity, ecdh=ecdh)
|
|
|
|
|
pubkey = protocol.PublicKey(
|
|
|
|
|
curve_name=curve_name, created=pubkey_dict['created'],
|
|
|
|
|
verifying_key=verifying_key, ecdh=ecdh)
|
|
|
|
|
assert pubkey.key_id() == pubkey_dict['key_id']
|
|
|
|
|
assert pubkey.keygrip() == keygrip_bytes
|
|
|
|
|
return identity
|
|
|
|
|
|
|
|
|
|
def pksign(self, conn):
|
|
|
|
|
"""Sign a message digest using a private EC key."""
|
|
|
|
|
log.debug('signing %r digest (algo #%s)', self.digest, self.algo)
|
|
|
|
|
identity = self.get_identity(keygrip=self.keygrip)
|
|
|
|
|
r, s = self.client.sign(identity=identity,
|
|
|
|
|
digest=binascii.unhexlify(self.digest))
|
|
|
|
|
result = sig_encode(r, s)
|
|
|
|
|
log.debug('result: %r', result)
|
|
|
|
|
keyring.sendline(conn, b'D ' + result)
|
|
|
|
|
|
|
|
|
|
def pkdecrypt(self, conn):
|
|
|
|
|
"""Handle decryption using ECDH."""
|
|
|
|
|
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
|
|
|
|
|
keyring.sendline(conn, msg)
|
|
|
|
|
|
|
|
|
|
line = keyring.recvline(conn)
|
|
|
|
|
assert keyring.recvline(conn) == b'END'
|
|
|
|
|
remote_pubkey = parse_ecdh(line)
|
|
|
|
|
|
|
|
|
|
identity = self.get_identity(keygrip=self.keygrip)
|
|
|
|
|
ec_point = self.client.ecdh(identity=identity, pubkey=remote_pubkey)
|
|
|
|
|
keyring.sendline(conn, b'D ' + _serialize_point(ec_point))
|
|
|
|
|
|
|
|
|
|
@util.memoize
|
|
|
|
|
def have_key(self, *keygrips):
|
|
|
|
|
"""Check if current keygrip correspond to a TREZOR-based key."""
|
|
|
|
|
try:
|
|
|
|
|
self.get_identity(keygrip=keygrips[0])
|
|
|
|
|
except KeyError as e:
|
|
|
|
|
log.warning('HAVEKEY(%s) failed: %s', keygrips, e)
|
|
|
|
|
raise AgentError(b'ERR 67108881 No secret key <GPG Agent>')
|
|
|
|
|
|
|
|
|
|
def key_info(self, conn):
|
|
|
|
|
"""
|
|
|
|
|
Dummy reply (mainly for 'gpg --edit' to succeed).
|
|
|
|
|
|
|
|
|
|
For details, see GnuPG agent KEYINFO command help.
|
|
|
|
|
https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
|
|
|
|
|
"""
|
|
|
|
|
fmt = 'S KEYINFO {0} X - - - - - - -'
|
|
|
|
|
keyring.sendline(conn, fmt.format(self.keygrip).encode('ascii'))
|
|
|
|
|
|
|
|
|
|
def set_key(self, keygrip):
|
|
|
|
|
"""Set hexadecimal keygrip for next operation."""
|
|
|
|
|
self.keygrip = keygrip
|
|
|
|
|
|
|
|
|
|
def set_hash(self, algo, digest):
|
|
|
|
|
"""Set algorithm ID and hexadecimal digest for next operation."""
|
|
|
|
|
self.algo = algo
|
|
|
|
|
self.digest = digest
|
|
|
|
|
|
|
|
|
|
def handle(self, conn):
|
|
|
|
|
"""Handle connection from GPG binary using the ASSUAN protocol."""
|
|
|
|
|
keyring.sendline(conn, b'OK')
|
|
|
|
|
for line in keyring.iterlines(conn):
|
|
|
|
|
parts = line.split(b' ')
|
|
|
|
|
command = parts[0]
|
|
|
|
|
args = parts[1:]
|
|
|
|
|
|
|
|
|
|
if command == b'BYE':
|
|
|
|
|
return
|
|
|
|
|
elif command == b'KEYINFO':
|
|
|
|
|
keygrip, = args
|
|
|
|
|
# Dummy reply (mainly for 'gpg --edit' to succeed).
|
|
|
|
|
# For details, see GnuPG agent KEYINFO command help.
|
|
|
|
|
# https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=agent/command.c;h=c8b34e9882076b1b724346787781f657cac75499;hb=refs/heads/master#l1082
|
|
|
|
|
fmt = 'S KEYINFO {0} X - - - - - - -'
|
|
|
|
|
keyring.sendline(conn, fmt.format(keygrip).encode('ascii'))
|
|
|
|
|
elif command == b'BYE':
|
|
|
|
|
return
|
|
|
|
|
elif command == b'KILLAGENT':
|
|
|
|
|
elif command == b'KILLAGENT':
|
|
|
|
|
keyring.sendline(conn, b'OK')
|
|
|
|
|
raise AgentStop()
|
|
|
|
|
|
|
|
|
|
if command not in self.handlers:
|
|
|
|
|
log.error('unknown request: %r', line)
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
handler = self.handlers[command]
|
|
|
|
|
if handler:
|
|
|
|
|
try:
|
|
|
|
|
handler(conn, args)
|
|
|
|
|
except AgentError as e:
|
|
|
|
|
msg, = e.args
|
|
|
|
|
keyring.sendline(conn, msg)
|
|
|
|
|
continue
|
|
|
|
|
keyring.sendline(conn, b'OK')
|
|
|
|
|
raise StopIteration
|
|
|
|
|
else:
|
|
|
|
|
log.error('unknown request: %r', line)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
keyring.sendline(conn, b'OK')
|
|
|
|
|