gpg: minor renames and code refactoring

nistp521
Roman Zeyde 8 years ago
parent d7913a84d5
commit 9a435ae23e

@ -20,15 +20,15 @@ def original_data(filename):
def verify(pubkey, sig_file):
d = open(sig_file, 'rb')
if d.name.endswith('.asc'):
lines = d.readlines()[3:-1]
"""Verify correctness of public key and signature."""
stream = open(sig_file, 'rb')
if stream.name.endswith('.asc'):
lines = stream.readlines()[3:-1]
data = base64.b64decode(''.join(lines))
payload, checksum = data[:-3], data[-3:]
assert util.crc24(payload) == checksum
d = io.BytesIO(payload)
parser = decode.Parser(decode.Reader(d), original_data(sig_file))
stream = io.BytesIO(payload)
parser = decode.Parser(util.Reader(stream), original_data(sig_file))
signature, = list(parser)
decode.verify_digest(pubkey=pubkey, digest=signature['digest'],
signature=signature['sig'], label='GPG signature')

@ -1,78 +1,24 @@
"""Decoders for GPG v2 data structures."""
import binascii
import contextlib
import hashlib
import io
import logging
import struct
import subprocess
import ecdsa
import ed25519
from trezor_agent.util import num2bytes
from .. import util
log = logging.getLogger(__name__)
def bit(value, i):
"""Extract the i-th bit out of value."""
return 1 if value & (1 << i) else 0
def low_bits(value, n):
"""Extract the lowest n bits out of value."""
return value & ((1 << n) - 1)
def readfmt(stream, fmt):
"""Read and unpack an object from stream, using a struct format string."""
size = struct.calcsize(fmt)
blob = stream.read(size)
return struct.unpack(fmt, blob)
class Reader(object):
"""Read basic type objects out of given stream."""
def __init__(self, stream):
"""Create a non-capturing reader."""
self.s = stream
self._captured = None
def readfmt(self, fmt):
"""Read a specified object, using a struct format string."""
size = struct.calcsize(fmt)
blob = self.read(size)
obj, = struct.unpack(fmt, blob)
return obj
def read(self, size=None):
"""Read `size` bytes from stream."""
blob = self.s.read(size)
if size is not None and len(blob) < size:
raise EOFError
if self._captured:
self._captured.write(blob)
return blob
@contextlib.contextmanager
def capture(self, stream):
"""Capture all data read during this context."""
self._captured = stream
try:
yield
finally:
self._captured = None
length_types = {0: '>B', 1: '>H', 2: '>L'}
def parse_subpackets(s):
"""See https://tools.ietf.org/html/rfc4880#section-5.2.3.1 for details."""
subpackets = []
total_size = s.readfmt('>H')
data = s.read(total_size)
s = Reader(io.BytesIO(data))
s = util.Reader(io.BytesIO(data))
while True:
try:
@ -92,23 +38,8 @@ def parse_mpi(s):
return sum(v << (8 * i) for i, v in enumerate(reversed(blob)))
def split_bits(value, *bits):
"""
Split integer value into list of ints, according to `bits` list.
For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
"""
result = []
for b in reversed(bits):
mask = (1 << b) - 1
result.append(value & mask)
value = value >> b
assert value == 0
return reversed(result)
def _parse_nist256p1_verifier(mpi):
prefix, x, y = split_bits(mpi, 4, 256, 256)
prefix, x, y = util.split_bits(mpi, 4, 256, 256)
assert prefix == 4
point = ecdsa.ellipticcurve.Point(curve=ecdsa.NIST256p.curve,
x=x, y=y)
@ -124,12 +55,12 @@ def _parse_nist256p1_verifier(mpi):
def _parse_ed25519_verifier(mpi):
prefix, value = split_bits(mpi, 8, 256)
prefix, value = util.split_bits(mpi, 8, 256)
assert prefix == 0x40
vk = ed25519.VerifyingKey(num2bytes(value, size=32))
vk = ed25519.VerifyingKey(util.num2bytes(value, size=32))
def _ed25519_verify(signature, digest):
sig = b''.join(num2bytes(val, size=32)
sig = b''.join(util.num2bytes(val, size=32)
for val in signature)
vk.verify(sig, digest)
return _ed25519_verify
@ -229,7 +160,7 @@ class Parser(object):
data_to_hash = (b'\x99' + struct.pack('>H', len(packet_data)) +
packet_data)
p['key_id'] = hashlib.sha1(data_to_hash).digest()[-8:]
log.debug('key ID: %s', binascii.hexlify(p['key_id']).decode('ascii'))
log.debug('key ID: %s', util.hexlify(p['key_id']))
self.to_hash.write(data_to_hash)
return p
@ -249,20 +180,20 @@ class Parser(object):
raise StopIteration
log.debug('prefix byte: %02x', value)
assert bit(value, 7) == 1
assert bit(value, 6) == 0 # new format not supported yet
assert util.bit(value, 7) == 1
assert util.bit(value, 6) == 0 # new format not supported yet
tag = low_bits(value, 6)
length_type = low_bits(tag, 2)
tag = util.low_bits(value, 6)
length_type = util.low_bits(tag, 2)
tag = tag >> 2
fmt = length_types[length_type]
fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type]
log.debug('length_type: %s', fmt)
packet_size = self.stream.readfmt(fmt)
log.debug('packet length: %d', packet_size)
packet_data = self.stream.read(packet_size)
packet_type = self.packet_types.get(tag)
if packet_type:
p = packet_type(Reader(io.BytesIO(packet_data)))
p = packet_type(util.Reader(io.BytesIO(packet_data)))
else:
raise ValueError('Unknown packet type: {}'.format(packet_type))
p['tag'] = tag
@ -274,7 +205,7 @@ class Parser(object):
def load_public_key(stream):
"""Parse and validate GPG public key from an input stream."""
parser = Parser(Reader(stream))
parser = Parser(util.Reader(stream))
pubkey, userid, signature = list(parser)
log.debug('loaded public key "%s"', userid['value'])
verify_digest(pubkey=pubkey, digest=signature['digest'],
@ -282,6 +213,16 @@ def load_public_key(stream):
return pubkey
def load_from_gpg(user_id):
"""Load existing GPG public key for `user_id` from local keyring."""
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
if pubkey_bytes:
return load_public_key(io.BytesIO(pubkey_bytes))
else:
log.error('could not find public key %r in local GPG keyring', user_id)
raise KeyError(user_id)
def verify_digest(pubkey, digest, signature, label):
"""Verify a digest signature from a specified public key."""
verifier = pubkey['verifier']

@ -0,0 +1,243 @@
"""Create GPG ECDSA signatures and public keys using TREZOR device."""
import base64
import hashlib
import logging
import struct
import time
from .. import client, factory, formats, util
log = logging.getLogger(__name__)
def packet(tag, blob):
"""Create small GPG packet."""
assert len(blob) < 256
length_type = 0 # : 1 byte for length
leading_byte = 0x80 | (tag << 2) | (length_type)
return struct.pack('>B', leading_byte) + util.prefix_len('>B', blob)
def subpacket(subpacket_type, fmt, *values):
"""Create GPG subpacket."""
blob = struct.pack(fmt, *values) if values else fmt
return struct.pack('>B', subpacket_type) + blob
def subpacket_long(subpacket_type, value):
"""Create GPG subpacket with 32-bit unsigned integer."""
return subpacket(subpacket_type, '>L', value)
def subpacket_time(value):
"""Create GPG subpacket with time in seconds (since Epoch)."""
return subpacket_long(2, value)
def subpacket_byte(subpacket_type, value):
"""Create GPG subpacket with 8-bit unsigned integer."""
return subpacket(subpacket_type, '>B', value)
def subpackets(*items):
"""Serialize several GPG subpackets."""
prefixed = [util.prefix_len('>B', item) for item in items]
return util.prefix_len('>H', b''.join(prefixed))
def mpi(value):
"""Serialize multipresicion integer using GPG format."""
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = [0] * data_size
for i in range(data_size):
data_bytes[i] = value & 0xFF
value = value >> 8
data_bytes.reverse()
return struct.pack('>H', bits) + bytearray(data_bytes)
def _dump_nist256(vk):
return mpi((4 << 512) |
(vk.pubkey.point.x() << 256) |
(vk.pubkey.point.y()))
def _dump_ed25519(vk):
return mpi((0x40 << 256) |
util.bytes2num(vk.to_bytes()))
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'dump': _dump_nist256
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'dump': _dump_ed25519
}
}
def _find_curve_by_algo_id(algo_id):
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name
class Signer(object):
"""Performs GPG operations with the TREZOR."""
def __init__(self, user_id, created, curve_name):
"""Construct and loads a public key from the device."""
self.user_id = user_id
assert curve_name in formats.SUPPORTED_CURVES
self.curve_name = curve_name
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
addr = client.get_address(self.identity)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.curve_name)
self.verifying_key = formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=self.curve_name)
self.created = int(created)
log.info('%s GPG public key %s created at %s', self.curve_name,
self.hex_short_key_id(), util.time_format(self.created))
@classmethod
def from_public_key(cls, pubkey, user_id):
"""
Create from an existing GPG public key.
`pubkey` should be loaded via `load_from_gpg(user_id)`
from the local GPG keyring.
"""
s = Signer(user_id=user_id,
created=pubkey['created'],
curve_name=_find_curve_by_algo_id(pubkey['algo']))
assert s.key_id() == pubkey['key_id']
return s
def _pubkey_data(self):
curve_info = SUPPORTED_CURVES[self.curve_name]
header = struct.pack('>BLB',
4, # version
self.created, # creation
curve_info['algo_id'])
oid = util.prefix_len('>B', curve_info['oid'])
blob = curve_info['dump'](self.verifying_key)
return header + oid + blob
def _pubkey_data_to_hash(self):
return b'\x99' + util.prefix_len('>H', self._pubkey_data())
def _fingerprint(self):
return hashlib.sha1(self._pubkey_data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
def hex_short_key_id(self):
"""Short (8 hexadecimal digits) GPG key ID."""
return util.hexlify(self.key_id()[-4:])
def close(self):
"""Close connection and turn off the screen of the device."""
self.client_wrapper.connection.clear_session()
self.client_wrapper.connection.close()
def export(self):
"""Export GPG public key, ready for "gpg2 --import"."""
pubkey_packet = packet(tag=6, blob=self._pubkey_data())
user_id_packet = packet(tag=13, blob=self.user_id)
data_to_sign = (self._pubkey_data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', self.user_id))
log.info('signing public key "%s"', self.user_id)
hashed_subpackets = [
subpacket_time(self.created), # signature creaion time
subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
subpacket_byte(0x15, 8), # preferred hash (SHA256)
subpacket_byte(0x16, 0), # preferred compression (none)
subpacket_byte(0x17, 0x80)] # key server prefs (no-modify)
signature = self._make_signature(visual=self.hex_short_key_id(),
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets)
sign_packet = packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def sign(self, msg, sign_time=None):
"""Sign GPG message at specified time."""
if sign_time is None:
sign_time = int(time.time())
log.info('signing %d byte message at %s',
len(msg), util.time_format(sign_time))
hashed_subpackets = [subpacket_time(sign_time)]
blob = self._make_signature(
visual=self.hex_short_key_id(),
data_to_sign=msg, hashed_subpackets=hashed_subpackets)
return packet(tag=2, blob=blob)
def _make_signature(self, visual, data_to_sign,
hashed_subpackets, sig_type=0):
curve_info = SUPPORTED_CURVES[self.curve_name]
header = struct.pack('>BBBB',
4, # version
sig_type, # rfc4880 (section-5.2.1)
curve_info['algo_id'],
8) # hash_alg (SHA256)
hashed = subpackets(*hashed_subpackets)
unhashed = subpackets(
subpacket(16, self.key_id()) # issuer key id
)
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
data_to_hash = data_to_sign + header + hashed + tail
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual=visual,
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
sig = mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:]))
return (header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check
sig) # actual ECDSA signature
def _split_lines(body, size):
lines = []
for i in range(0, len(body), size):
lines.append(body[i:i+size] + '\n')
return ''.join(lines)
def armor(blob, type_str):
"""See https://tools.ietf.org/html/rfc4880#section-6 for details."""
head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str)
body = base64.b64encode(blob)
checksum = base64.b64encode(util.crc24(blob))
tail = '-----END PGP {}-----\n'.format(type_str)
return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail

@ -4,7 +4,7 @@ import logging
import subprocess as sp
import sys
from . import signer
from . import decode, encode
log = logging.getLogger(__name__)
@ -22,12 +22,12 @@ def main():
command = args[0]
user_id = ' '.join(args[1:])
assert command == '-bsau' # --detach-sign --sign --armor --local-user
pubkey = signer.load_from_gpg(user_id)
s = signer.Signer.from_public_key(user_id=user_id, pubkey=pubkey)
pubkey = decode.load_from_gpg(user_id)
s = encode.Signer.from_public_key(user_id=user_id, pubkey=pubkey)
data = sys.stdin.read()
sig = s.sign(data)
sig = signer.armor(sig, 'SIGNATURE')
sig = encode.armor(sig, 'SIGNATURE')
sys.stdout.write(sig)
s.close()

@ -1,278 +1,14 @@
#!/usr/bin/env python
"""Create GPG ECDSA signatures and public keys using TREZOR device."""
"""Create signatures and export public keys for GPG using TREZOR."""
import argparse
import base64
import binascii
import hashlib
import io
import logging
import struct
import subprocess
import time
from . import decode, check
from .. import client, factory, formats, util
from . import check, decode, encode
log = logging.getLogger(__name__)
def prefix_len(fmt, blob):
"""Prefix `blob` with its size, serialized using `fmt` format."""
return struct.pack(fmt, len(blob)) + blob
def packet(tag, blob):
"""Create small GPG packet."""
assert len(blob) < 256
length_type = 0 # : 1 byte for length
leading_byte = 0x80 | (tag << 2) | (length_type)
return struct.pack('>B', leading_byte) + prefix_len('>B', blob)
def subpacket(subpacket_type, fmt, *values):
"""Create GPG subpacket."""
blob = struct.pack(fmt, *values) if values else fmt
return struct.pack('>B', subpacket_type) + blob
def subpacket_long(subpacket_type, value):
"""Create GPG subpacket with 32-bit unsigned integer."""
return subpacket(subpacket_type, '>L', value)
def subpacket_time(value):
"""Create GPG subpacket with time in seconds (since Epoch)."""
return subpacket_long(2, value)
def subpacket_byte(subpacket_type, value):
"""Create GPG subpacket with 8-bit unsigned integer."""
return subpacket(subpacket_type, '>B', value)
def subpackets(*items):
"""Serialize several GPG subpackets."""
prefixed = [prefix_len('>B', item) for item in items]
return prefix_len('>H', b''.join(prefixed))
def mpi(value):
"""Serialize multipresicion integer using GPG format."""
bits = value.bit_length()
data_size = (bits + 7) // 8
data_bytes = [0] * data_size
for i in range(data_size):
data_bytes[i] = value & 0xFF
value = value >> 8
data_bytes.reverse()
return struct.pack('>H', bits) + bytearray(data_bytes)
def time_format(t):
"""Utility for consistent time formatting."""
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
def hexlify(blob):
"""Utility for consistent hexadecimal formatting."""
return binascii.hexlify(blob).decode('ascii').upper()
def _dump_nist256(vk):
return mpi((4 << 512) |
(vk.pubkey.point.x() << 256) |
(vk.pubkey.point.y()))
def _dump_ed25519(vk):
return mpi((0x40 << 256) |
util.bytes2num(vk.to_bytes()))
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'dump': _dump_nist256
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'dump': _dump_ed25519
}
}
def _find_curve_by_algo_id(algo_id):
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name
class Signer(object):
"""Performs GPG operations with the TREZOR."""
def __init__(self, user_id, created, curve_name):
"""Construct and loads a public key from the device."""
self.user_id = user_id
assert curve_name in formats.SUPPORTED_CURVES
self.curve_name = curve_name
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
addr = client.get_address(self.identity)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.curve_name)
self.verifying_key = formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=self.curve_name)
self.created = int(created)
log.info('%s GPG public key %s created at %s', self.curve_name,
self.hex_short_key_id(), time_format(self.created))
@classmethod
def from_public_key(cls, pubkey, user_id):
"""
Create from an existing GPG public key.
`pubkey` should be loaded via `load_from_gpg(user_id)`
from the local GPG keyring.
"""
s = Signer(user_id=user_id,
created=pubkey['created'],
curve_name=_find_curve_by_algo_id(pubkey['algo']))
assert s.key_id() == pubkey['key_id']
return s
def _pubkey_data(self):
curve_info = SUPPORTED_CURVES[self.curve_name]
header = struct.pack('>BLB',
4, # version
self.created, # creation
curve_info['algo_id'])
oid = prefix_len('>B', curve_info['oid'])
blob = curve_info['dump'](self.verifying_key)
return header + oid + blob
def _pubkey_data_to_hash(self):
return b'\x99' + prefix_len('>H', self._pubkey_data())
def _fingerprint(self):
return hashlib.sha1(self._pubkey_data_to_hash()).digest()
def key_id(self):
"""Short (8 byte) GPG key ID."""
return self._fingerprint()[-8:]
def hex_short_key_id(self):
"""Short (8 hexadecimal digits) GPG key ID."""
return hexlify(self.key_id()[-4:])
def close(self):
"""Close connection and turn off the screen of the device."""
self.client_wrapper.connection.clear_session()
self.client_wrapper.connection.close()
def export(self):
"""Export GPG public key, ready for "gpg2 --import"."""
pubkey_packet = packet(tag=6, blob=self._pubkey_data())
user_id_packet = packet(tag=13, blob=self.user_id)
user_id_to_hash = user_id_packet[:1] + prefix_len('>L', self.user_id)
data_to_sign = self._pubkey_data_to_hash() + user_id_to_hash
log.info('signing public key "%s"', self.user_id)
hashed_subpackets = [
subpacket_time(self.created), # signature creaion time
subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
subpacket_byte(0x15, 8), # preferred hash (SHA256)
subpacket_byte(0x16, 0), # preferred compression (none)
subpacket_byte(0x17, 0x80)] # key server prefs (no-modify)
signature = self._make_signature(visual=self.hex_short_key_id(),
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets)
sign_packet = packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def sign(self, msg, sign_time=None):
"""Sign GPG message at specified time."""
if sign_time is None:
sign_time = int(time.time())
log.info('signing %d byte message at %s',
len(msg), time_format(sign_time))
hashed_subpackets = [subpacket_time(sign_time)]
blob = self._make_signature(
visual=self.hex_short_key_id(),
data_to_sign=msg, hashed_subpackets=hashed_subpackets)
return packet(tag=2, blob=blob)
def _make_signature(self, visual, data_to_sign,
hashed_subpackets, sig_type=0):
curve_info = SUPPORTED_CURVES[self.curve_name]
header = struct.pack('>BBBB',
4, # version
sig_type, # rfc4880 (section-5.2.1)
curve_info['algo_id'],
8) # hash_alg (SHA256)
hashed = subpackets(*hashed_subpackets)
unhashed = subpackets(
subpacket(16, self.key_id()) # issuer key id
)
tail = b'\x04\xff' + struct.pack('>L', len(header) + len(hashed))
data_to_hash = data_to_sign + header + hashed + tail
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual=visual,
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
sig = mpi(util.bytes2num(sig[:32])) + mpi(util.bytes2num(sig[32:]))
return (header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check
sig) # actual ECDSA signature
def _split_lines(body, size):
lines = []
for i in range(0, len(body), size):
lines.append(body[i:i+size] + '\n')
return ''.join(lines)
def armor(blob, type_str):
"""See https://tools.ietf.org/html/rfc4880#section-6 for details."""
head = '-----BEGIN PGP {}-----\nVersion: GnuPG v2\n\n'.format(type_str)
body = base64.b64encode(blob)
checksum = base64.b64encode(util.crc24(blob))
tail = '-----END PGP {}-----\n'.format(type_str)
return head + _split_lines(body, 64) + '=' + checksum + '\n' + tail
def load_from_gpg(user_id):
"""Load existing GPG public key for `user_id` from local keyring."""
pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id])
if pubkey_bytes:
return decode.load_public_key(io.BytesIO(pubkey_bytes))
else:
log.error('could not find public key %r in local GPG keyring', user_id)
raise KeyError(user_id)
def main():
"""Main function."""
p = argparse.ArgumentParser()
@ -288,23 +24,23 @@ def main():
format='%(asctime)s %(levelname)-10s %(message)s')
user_id = args.user_id.encode('ascii')
if not args.filename:
s = Signer(user_id=user_id, created=args.time,
curve_name=args.ecdsa_curve)
s = encode.Signer(user_id=user_id, created=args.time,
curve_name=args.ecdsa_curve)
pubkey = s.export()
ext = '.pub'
if args.armor:
pubkey = armor(pubkey, 'PUBLIC KEY BLOCK')
pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK')
ext = '.asc'
filename = s.hex_short_key_id() + ext
open(filename, 'wb').write(pubkey)
log.info('import to local keyring using "gpg2 --import %s"', filename)
else:
pubkey = load_from_gpg(user_id)
s = Signer.from_public_key(pubkey=pubkey, user_id=user_id)
pubkey = decode.load_from_gpg(user_id)
s = encode.Signer.from_public_key(pubkey=pubkey, user_id=user_id)
data = open(args.filename, 'rb').read()
sig, ext = s.sign(data), '.sig'
if args.armor:
sig = armor(sig, 'SIGNATURE')
sig = encode.armor(sig, 'SIGNATURE')
ext = '.asc'
filename = args.filename + ext
open(filename, 'wb').write(sig)

@ -1,6 +1,9 @@
"""Various I/O and serialization utilities."""
import binascii
import contextlib
import io
import struct
import time
def send(conn, data):
@ -93,3 +96,84 @@ def crc24(blob):
crc_bytes = struct.pack('>L', crc)
assert crc_bytes[0] == b'\x00'
return crc_bytes[1:]
def bit(value, i):
"""Extract the i-th bit out of value."""
return 1 if value & (1 << i) else 0
def low_bits(value, n):
"""Extract the lowest n bits out of value."""
return value & ((1 << n) - 1)
def split_bits(value, *bits):
"""
Split integer value into list of ints, according to `bits` list.
For example, split_bits(0x1234, 4, 8, 4) == [0x1, 0x23, 0x4]
"""
result = []
for b in reversed(bits):
mask = (1 << b) - 1
result.append(value & mask)
value = value >> b
assert value == 0
return reversed(result)
def readfmt(stream, fmt):
"""Read and unpack an object from stream, using a struct format string."""
size = struct.calcsize(fmt)
blob = stream.read(size)
return struct.unpack(fmt, blob)
def prefix_len(fmt, blob):
"""Prefix `blob` with its size, serialized using `fmt` format."""
return struct.pack(fmt, len(blob)) + blob
def time_format(t):
"""Utility for consistent time formatting."""
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
def hexlify(blob):
"""Utility for consistent hexadecimal formatting."""
return binascii.hexlify(blob).decode('ascii').upper()
class Reader(object):
"""Read basic type objects out of given stream."""
def __init__(self, stream):
"""Create a non-capturing reader."""
self.s = stream
self._captured = None
def readfmt(self, fmt):
"""Read a specified object, using a struct format string."""
size = struct.calcsize(fmt)
blob = self.read(size)
obj, = struct.unpack(fmt, blob)
return obj
def read(self, size=None):
"""Read `size` bytes from stream."""
blob = self.s.read(size)
if size is not None and len(blob) < size:
raise EOFError
if self._captured:
self._captured.write(blob)
return blob
@contextlib.contextmanager
def capture(self, stream):
"""Capture all data read during this context."""
self._captured = stream
try:
yield
finally:
self._captured = None

Loading…
Cancel
Save