|
|
|
@ -2,7 +2,7 @@
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
|
|
|
|
|
from .. import factory, formats, util
|
|
|
|
|
from .. import device, formats, util
|
|
|
|
|
|
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
@ -12,55 +12,33 @@ class HardwareSigner(object):
|
|
|
|
|
|
|
|
|
|
def __init__(self, user_id, curve_name):
|
|
|
|
|
"""Connect to the device and retrieve required public key."""
|
|
|
|
|
self.client_wrapper = factory.load()
|
|
|
|
|
self.identity = self.client_wrapper.identity_type()
|
|
|
|
|
self.identity.proto = 'gpg'
|
|
|
|
|
self.identity.host = user_id
|
|
|
|
|
self.curve_name = curve_name
|
|
|
|
|
self.device = device.detect(identity_str='',
|
|
|
|
|
curve_name=curve_name)
|
|
|
|
|
self.device.identity_dict['proto'] = 'gpg'
|
|
|
|
|
self.device.identity_dict['host'] = user_id
|
|
|
|
|
self.user_id = user_id
|
|
|
|
|
|
|
|
|
|
def pubkey(self, ecdh=False):
|
|
|
|
|
"""Return public key as VerifyingKey object."""
|
|
|
|
|
addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh)
|
|
|
|
|
if ecdh:
|
|
|
|
|
curve_name = formats.get_ecdh_curve_name(self.curve_name)
|
|
|
|
|
else:
|
|
|
|
|
curve_name = self.curve_name
|
|
|
|
|
public_node = self.client_wrapper.connection.get_public_node(
|
|
|
|
|
n=addr, ecdsa_curve_name=curve_name)
|
|
|
|
|
|
|
|
|
|
with self.device:
|
|
|
|
|
pubkey = self.device.pubkey(ecdh=ecdh)
|
|
|
|
|
return formats.decompress_pubkey(
|
|
|
|
|
pubkey=public_node.node.public_key,
|
|
|
|
|
curve_name=curve_name)
|
|
|
|
|
pubkey=pubkey, curve_name=self.device.curve_name)
|
|
|
|
|
|
|
|
|
|
def sign(self, digest):
|
|
|
|
|
"""Sign the digest and return a serialized signature."""
|
|
|
|
|
log.info('please confirm GPG signature on %s for "%s"...',
|
|
|
|
|
self.client_wrapper.device_name, self.user_id)
|
|
|
|
|
if self.curve_name == formats.CURVE_NIST256:
|
|
|
|
|
self.device, self.user_id)
|
|
|
|
|
if self.device.curve_name == formats.CURVE_NIST256:
|
|
|
|
|
digest = digest[:32] # sign the first 256 bits
|
|
|
|
|
log.debug('signing digest: %s', util.hexlify(digest))
|
|
|
|
|
result = self.client_wrapper.connection.sign_identity(
|
|
|
|
|
identity=self.identity,
|
|
|
|
|
challenge_hidden=digest,
|
|
|
|
|
challenge_visual='',
|
|
|
|
|
ecdsa_curve_name=self.curve_name)
|
|
|
|
|
assert result.signature[:1] == b'\x00'
|
|
|
|
|
sig = result.signature[1:]
|
|
|
|
|
with self.device:
|
|
|
|
|
sig = self.device.sign(blob=digest)
|
|
|
|
|
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
|
|
|
|
|
|
|
|
|
|
def ecdh(self, pubkey):
|
|
|
|
|
"""Derive shared secret using ECDH from remote public key."""
|
|
|
|
|
log.info('please confirm GPG decryption on %s for "%s"...',
|
|
|
|
|
self.client_wrapper.device_name, self.user_id)
|
|
|
|
|
result = self.client_wrapper.connection.get_ecdh_session_key(
|
|
|
|
|
identity=self.identity,
|
|
|
|
|
peer_public_key=pubkey,
|
|
|
|
|
ecdsa_curve_name=formats.get_ecdh_curve_name(self.curve_name))
|
|
|
|
|
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
|
|
|
|
|
assert result.session_key[:1] == b'\x04'
|
|
|
|
|
return result.session_key
|
|
|
|
|
|
|
|
|
|
def close(self):
|
|
|
|
|
"""Close the connection to the device."""
|
|
|
|
|
self.client_wrapper.connection.close()
|
|
|
|
|
self.device, self.user_id)
|
|
|
|
|
with self.device:
|
|
|
|
|
return self.device.ecdh(pubkey=pubkey)
|
|
|
|
|