|
|
|
@ -218,9 +218,10 @@ def parse_packets(stream):
|
|
|
|
|
|
|
|
|
|
See https://tools.ietf.org/html/rfc4880#section-4.2 for details.
|
|
|
|
|
"""
|
|
|
|
|
reader = util.Reader(stream)
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
value = stream.readfmt('B')
|
|
|
|
|
value = reader.readfmt('B')
|
|
|
|
|
except EOFError:
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
@ -232,20 +233,20 @@ def parse_packets(stream):
|
|
|
|
|
length_type = util.low_bits(tag, 2)
|
|
|
|
|
tag = tag >> 2
|
|
|
|
|
fmt = {0: '>B', 1: '>H', 2: '>L'}[length_type]
|
|
|
|
|
packet_size = stream.readfmt(fmt)
|
|
|
|
|
packet_size = reader.readfmt(fmt)
|
|
|
|
|
else:
|
|
|
|
|
first = stream.readfmt('B')
|
|
|
|
|
first = reader.readfmt('B')
|
|
|
|
|
if first < 192:
|
|
|
|
|
packet_size = first
|
|
|
|
|
elif first < 224:
|
|
|
|
|
packet_size = ((first - 192) << 8) + stream.readfmt('B') + 192
|
|
|
|
|
packet_size = ((first - 192) << 8) + reader.readfmt('B') + 192
|
|
|
|
|
elif first == 255:
|
|
|
|
|
packet_size = stream.readfmt('>L')
|
|
|
|
|
packet_size = reader.readfmt('>L')
|
|
|
|
|
else:
|
|
|
|
|
log.error('Partial Body Lengths unsupported')
|
|
|
|
|
|
|
|
|
|
log.debug('packet length: %d', packet_size)
|
|
|
|
|
packet_data = stream.read(packet_size)
|
|
|
|
|
packet_data = reader.read(packet_size)
|
|
|
|
|
packet_type = PACKET_TYPES.get(tag)
|
|
|
|
|
|
|
|
|
|
if packet_type is not None:
|
|
|
|
@ -281,7 +282,7 @@ HASH_ALGORITHMS = {
|
|
|
|
|
def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
|
|
|
|
|
"""Parse and validate GPG public key from an input stream."""
|
|
|
|
|
stream = io.BytesIO(pubkey_bytes)
|
|
|
|
|
packets = list(parse_packets(util.Reader(stream)))
|
|
|
|
|
packets = list(parse_packets(stream))
|
|
|
|
|
pubkey, userid, signature = packets[:3]
|
|
|
|
|
packets = packets[3:]
|
|
|
|
|
|
|
|
|
@ -318,7 +319,7 @@ def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
|
|
|
|
|
|
|
|
|
|
def load_signature(stream, original_data):
|
|
|
|
|
"""Load signature from stream, and compute GPG digest for verification."""
|
|
|
|
|
signature, = list(parse_packets(util.Reader(stream)))
|
|
|
|
|
signature, = list(parse_packets((stream)))
|
|
|
|
|
hash_alg = HASH_ALGORITHMS[signature['hash_alg']]
|
|
|
|
|
digest = digest_packets([{'_to_hash': original_data}, signature],
|
|
|
|
|
hasher=hashlib.new(hash_alg))
|
|
|
|
|