Add experimental support for GPG signing via TREZOR

In order to use this feature, GPG "modern" (v2.1) is required [1].
Also, since TREZOR protocol does not support arbitrary long fields,
TREZOR firmware needs to be adapted  with the following patch [2],
to support signing fixed-size digests of GPG messages of arbitrary size.

[1] https://gist.github.com/vt0r/a2f8c0bcb1400131ff51
[2] https://gist.github.com/romanz/b66f5df1ca8ef15641df8ea5bb09fd47
nistp521
Roman Zeyde 8 years ago
parent 861401e89a
commit 4335740abe

@ -0,0 +1,290 @@
#!/usr/bin/env python
import argparse
import base64
import binascii
import contextlib
import hashlib
import io
import logging
import struct
import ecdsa
import util
log = logging.getLogger(__name__)
def bit(value, i):
return 1 if value & (1 << i) else 0
def low_bits(value, n):
return value & ((1 << n) - 1)
def readfmt(stream, fmt):
size = struct.calcsize(fmt)
blob = stream.read(size)
return struct.unpack(fmt, blob)
class Reader(object):
def __init__(self, stream):
self.s = stream
self._captured = None
def readfmt(self, fmt):
size = struct.calcsize(fmt)
blob = self.read(size)
obj, = struct.unpack(fmt, blob)
return obj
def read(self, size=None):
blob = self.s.read(size)
if size is not None and len(blob) < size:
raise EOFError
if self._captured:
self._captured.write(blob)
return blob
@contextlib.contextmanager
def capture(self, stream):
self._captured = stream
try:
yield
finally:
self._captured = None
length_types = {0: '>B', 1: '>H', 2: '>L'}
def parse_subpackets(s):
subpackets = []
total_size = s.readfmt('>H')
data = s.read(total_size)
s = Reader(io.BytesIO(data))
while True:
try:
subpacket_len = s.readfmt('B')
except EOFError:
break
subpackets.append(s.read(subpacket_len))
return subpackets
def parse_mpi(s):
bits = s.readfmt('>H')
blob = bytearray(s.read(int((bits + 7) // 8)))
return sum(v << (8 * i) for i, v in enumerate(reversed(blob)))
def split_bits(value, *bits):
result = []
for b in reversed(bits):
mask = (1 << b) - 1
result.append(value & mask)
value = value >> b
assert value == 0
return reversed(result)
class Parser(object):
def __init__(self, stream, to_hash=None):
self.stream = stream
self.packet_types = {
2: self.signature,
4: self.onepass,
6: self.pubkey,
11: self.literal,
13: self.user_id,
}
self.to_hash = io.BytesIO()
if to_hash:
self.to_hash.write(to_hash)
def __iter__(self):
return self
def onepass(self, stream):
# pylint: disable=no-self-use
p = {'type': 'onepass'}
p['version'] = stream.readfmt('B')
p['sig_type'] = stream.readfmt('B')
p['hash_alg'] = stream.readfmt('B')
p['pubkey_alg'] = stream.readfmt('B')
p['key_id'] = stream.readfmt('8s')
p['nested'] = stream.readfmt('B')
assert not stream.read()
return p
def literal(self, stream):
p = {'type': 'literal'}
p['format'] = stream.readfmt('c')
filename_len = stream.readfmt('B')
p['filename'] = stream.read(filename_len)
p['date'] = stream.readfmt('>L')
with stream.capture(self.to_hash):
p['content'] = stream.read()
return p
def signature(self, stream):
p = {'type': 'signature'}
to_hash = io.BytesIO()
with stream.capture(to_hash):
p['version'] = stream.readfmt('B')
p['sig_type'] = stream.readfmt('B')
p['pubkey_alg'] = stream.readfmt('B')
p['hash_alg'] = stream.readfmt('B')
p['hashed_subpackets'] = parse_subpackets(stream)
self.to_hash.write(to_hash.getvalue())
# https://tools.ietf.org/html/rfc4880#section-5.2.4
self.to_hash.write(b'\x04\xff' + struct.pack('>L', to_hash.tell()))
data_to_sign = self.to_hash.getvalue()
log.debug('hashing %d bytes for signature: %r',
len(data_to_sign), data_to_sign)
digest = hashlib.sha256(data_to_sign).digest()
p['unhashed_subpackets'] = parse_subpackets(stream)
p['hash_prefix'] = stream.readfmt('2s')
if p['hash_prefix'] != digest[:2]:
log.warning('Bad hash prefix: %r (expected %r)',
digest[:2], p['hash_prefix'])
else:
p['digest'] = digest
p['sig'] = (parse_mpi(stream), parse_mpi(stream))
assert not stream.read()
return p
def pubkey(self, stream):
p = {'type': 'pubkey'}
packet = io.BytesIO()
with stream.capture(packet):
p['version'] = stream.readfmt('B')
p['created'] = stream.readfmt('>L')
p['algo'] = stream.readfmt('B')
# https://tools.ietf.org/html/rfc6637#section-11
oid_size = stream.readfmt('B')
oid = stream.read(oid_size)
assert oid == b'\x2A\x86\x48\xCE\x3D\x03\x01\x07' # NIST P-256
mpi = parse_mpi(stream)
log.debug('mpi: %x', mpi)
prefix, x, y = split_bits(mpi, 4, 256, 256)
assert prefix == 4
p['point'] = (x, y)
assert not stream.read()
# https://tools.ietf.org/html/rfc4880#section-12.2
packet_data = packet.getvalue()
data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) +
packet_data)
p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:]
log.debug('key ID: %s', binascii.hexlify(p['key_id']).decode('ascii'))
self.to_hash.write(data_to_hash)
return p
def user_id(self, stream):
value = stream.read()
self.to_hash.write(b'\xb4' + struct.pack('>L', len(value)))
self.to_hash.write(value)
return {'type': 'user_id', 'value': value}
def __next__(self):
try:
# https://tools.ietf.org/html/rfc4880#section-4.2
value = self.stream.readfmt('B')
except EOFError:
raise StopIteration
log.debug('prefix byte: %02x', value)
assert bit(value, 7) == 1
assert bit(value, 6) == 0 # new format not supported yet
tag = low_bits(value, 6)
length_type = low_bits(tag, 2)
tag = tag >> 2
fmt = length_types[length_type]
log.debug('length_type: %s', fmt)
packet_size = self.stream.readfmt(fmt)
log.debug('packet length: %d', packet_size)
packet_data = self.stream.read(packet_size)
packet_type = self.packet_types.get(tag)
if packet_type:
p = packet_type(Reader(io.BytesIO(packet_data)))
else:
p = {'type': 'UNKNOWN'}
p['tag'] = tag
log.debug('packet "%s": %s', p['type'], p)
return p
next = __next__
def original_data(filename):
parts = filename.rsplit('.', 1)
if len(parts) == 2 and parts[1] in ('sig', 'asc'):
log.debug('loading file %s', parts[0])
return open(parts[0], 'rb').read()
def load_public_key(filename):
parser = Parser(Reader(open(filename, 'rb')))
pubkey, userid, signature = list(parser)
log.info('loaded %s public key', userid['value'])
verify_digest(pubkey=pubkey, digest=signature['digest'],
signature=signature['sig'], label=filename)
return pubkey
def check(pubkey, sig_file):
d = open(sig_file, 'rb')
if d.name.endswith('.asc'):
lines = d.readlines()[3:-1]
data = base64.b64decode(''.join(lines))
payload, checksum = data[:-3], data[-3:]
assert util.crc24(payload) == checksum
d = io.BytesIO(payload)
parser = Parser(Reader(d), original_data(sig_file))
signature, = list(parser)
verify_digest(pubkey=pubkey, digest=signature['digest'],
signature=signature['sig'], label=sig_file)
def verify_digest(pubkey, digest, signature, label):
coords = pubkey['point']
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
x=coords[0], y=coords[1])
v = ecdsa.VerifyingKey.from_public_point(point=point,
curve=ecdsa.curves.NIST256p,
hashfunc=hashlib.sha256)
try:
v.verify_digest(signature=signature,
digest=digest,
sigdecode=lambda rs, order: rs)
log.info('%s is OK', label)
except ecdsa.keys.BadSignatureError:
log.error('%s has bad signature!', label)
raise
def main():
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')
p = argparse.ArgumentParser()
p.add_argument('pubkey')
p.add_argument('signature')
args = p.parse_args()
check(pubkey=load_public_key(args.pubkey),
sig_file=args.signature)
if __name__ == '__main__':
main()

@ -0,0 +1,14 @@
#!/bin/bash
set -x
CREATED=1460731897 # needed for consistent public key creation
NAME="trezor_key" # will be used as GPG user id and public key name
echo "Hello GPG World!" > EXAMPLE
./signer.py $NAME --time $CREATED --public-key --file EXAMPLE --verbose
./check.py $NAME.pub EXAMPLE.sig # pure Python verification
# Install GPG v2.1 (modern) and verify the signature
gpg2 --import $NAME.pub
gpg2 --list-keys $NAME
# gpg2 --edit-key trezor_key trust # optional: mark it as trusted
gpg2 --verify EXAMPLE.sig

@ -0,0 +1,222 @@
#!/usr/bin/env python
import argparse
import base64
import binascii
import hashlib
import logging
import struct
import time
import ecdsa
import trezor_agent.client
import trezor_agent.formats
import trezor_agent.util
import util
log = logging.getLogger(__name__)
def prefix_len(fmt, blob):
return struct.pack(fmt, len(blob)) + blob
def packet(tag, blob):
assert len(blob) < 256
length_type = 0 # : 1 byte for length
leading_byte = 0x80 | (tag << 2) | (length_type)
return struct.pack('>B', leading_byte) + prefix_len('>B', blob)
def subpacket(subpacket_type, fmt, *values):
blob = struct.pack(fmt, *values) if values else fmt
return struct.pack('>B', subpacket_type) + blob
def subpacket_long(subpacket_type, value):
return subpacket(subpacket_type, '>L', value)
def subpacket_time(value):
return subpacket_long(2, value)
def subpacket_byte(subpacket_type, value):
return subpacket(subpacket_type, '>B', value)
def subpackets(*items):
prefixed = [prefix_len('>B', item) for item in items]
return prefix_len('>H', b''.join(prefixed))
def mpi(value):
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = [0] * data_size
for i in range(data_size):
data_bytes[i] = value & 0xFF
value = value >> 8
data_bytes.reverse()
return struct.pack('>H', bits) + bytearray(data_bytes)
def time_format(t):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
def hexlify(blob):
return binascii.hexlify(blob).decode('ascii').upper()
def split_lines(body, size):
lines = []
for i in range(0, len(body), size):
lines.append(body[i:i+size] + '\n')
return ''.join(lines)
def armor_sig(blob):
head = '-----BEGIN PGP SIGNATURE-----\nVersion: GnuPG v2\n\n'
body = base64.b64encode(blob)
checksum = base64.b64encode(util.crc24(blob))
tail = '-----END PGP SIGNATURE-----\n'
return head + split_lines(body + '=' + checksum, 64) + tail
class Signer(object):
curve = ecdsa.NIST256p
ecdsa_curve_name = trezor_agent.formats.CURVE_NIST256
def __init__(self, user_id, created):
self.user_id = user_id
self.client_wrapper = trezor_agent.factory.load()
# This requires the following patch to trezor-mcu to work:
# https://gist.github.com/romanz/b66f5df1ca8ef15641df8ea5bb09fd47
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
addr = trezor_agent.client.get_address(self.identity)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.ecdsa_curve_name)
verifying_key = trezor_agent.formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=self.ecdsa_curve_name)
self.created = int(created)
header = struct.pack('>BLB',
4, # version
self.created, # creation
19) # ECDSA
# https://tools.ietf.org/html/rfc6637#section-11 (NIST P-256 OID)
oid = prefix_len('>B', b'\x2A\x86\x48\xCE\x3D\x03\x01\x07')
point = verifying_key.pubkey.point
self.pubkey_data = header + oid + mpi((4 << 512) |
(point.x() << 256) |
(point.y()))
self.data_to_hash = b'\x99' + prefix_len('>H', self.pubkey_data)
fingerprint = hashlib.sha1(self.data_to_hash).digest()
self.key_id = fingerprint[-8:]
log.info('key %s created at %s',
hexlify(fingerprint[-4:]), time_format(self.created))
def close(self):
self.client_wrapper.connection.clear_session()
self.client_wrapper.connection.close()
def export(self):
pubkey_packet = packet(tag=6, blob=self.pubkey_data)
user_id_packet = packet(tag=13, blob=self.user_id)
user_id_to_hash = user_id_packet[:1] + prefix_len('>L', self.user_id)
data_to_sign = self.data_to_hash + user_id_to_hash
log.info('signing user_id: %r', self.user_id.decode('ascii'))
hashed_subpackets = [
subpacket_time(self.created), # signature creaion time
subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
subpacket_byte(0x15, 8), # preferred hash (SHA256)
subpacket_byte(0x16, 0), # preferred compression (none)
subpacket_byte(0x17, 0x80)] # key server prefs (no-modify)
signature = self._make_signature(visual='Sign GPG public key',
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets)
sign_packet = packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def sign(self, msg, sign_time=None):
if sign_time is None:
sign_time = int(time.time())
log.info('signing message %r at %s', msg,
time_format(sign_time))
hashed_subpackets = [subpacket_time(sign_time)]
blob = self._make_signature(
visual='Sign GPG message',
data_to_sign=msg, hashed_subpackets=hashed_subpackets)
return packet(tag=2, blob=blob)
def _make_signature(self, visual, data_to_sign,
hashed_subpackets, sig_type=0):
header = struct.pack('>BBBB',
4, # version
sig_type, # rfc4880 (section-5.2.1)
19, # pubkey_alg (ECDSA)
8) # hash_alg (SHA256)
hashed = subpackets(*hashed_subpackets)
unhashed = subpackets(
subpacket(16, self.key_id) # issuer key id
)
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
data_to_hash = data_to_sign + header + hashed + tail
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=hashlib.sha256(data_to_hash).digest(),
challenge_visual=visual,
ecdsa_curve_name=self.ecdsa_curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
sig = [trezor_agent.util.bytes2num(sig[:32]),
trezor_agent.util.bytes2num(sig[32:])]
hash_prefix = digest[:2] # used for decoder's sanity check
signature = mpi(sig[0]) + mpi(sig[1]) # actual ECDSA signature
return header + hashed + unhashed + hash_prefix + signature
def main():
p = argparse.ArgumentParser()
p.add_argument('user_id')
p.add_argument('-t', '--time', type=int, default=int(time.time()))
p.add_argument('-f', '--filename')
p.add_argument('-a', '--armor', action='store_true', default=False)
p.add_argument('-p', '--public-key', action='store_true', default=False)
p.add_argument('-v', '--verbose', action='store_true', default=False)
args = p.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
format='%(asctime)s %(levelname)-10s %(message)s')
s = Signer(user_id=args.user_id.encode('ascii'), created=args.time)
if args.public_key:
open(args.user_id + '.pub', 'wb').write(s.export())
if args.filename:
data = open(args.filename, 'rb').read()
sig, ext = s.sign(data), '.sig'
if args.armor:
sig, ext = armor_sig(sig), '.asc'
open(args.filename + ext, 'wb').write(sig)
s.close()
if __name__ == '__main__':
main()

@ -0,0 +1,18 @@
import struct
def crc24(blob):
CRC24_INIT = 0xB704CEL
CRC24_POLY = 0x1864CFBL
crc = CRC24_INIT
for octet in bytearray(blob):
crc ^= (octet << 16)
for _ in range(8):
crc <<= 1
if crc & 0x1000000:
crc ^= CRC24_POLY
assert 0 <= crc < 0x1000000
crc_bytes = struct.pack('>L', crc)
assert crc_bytes[0] == b'\x00'
return crc_bytes[1:]
Loading…
Cancel
Save