You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
python-trezor/trezorlib/client.py

420 lines
14 KiB
Python

import os
import time
import binascii
11 years ago
import hashlib
import ckd_public
import tools
import messages_pb2 as proto
import types_pb2 as types
from api_blockchain import BlockchainApi
# === start monkeypatching: text formatting of protobuf messages
import google.protobuf.text_format
import google.protobuf.descriptor
_oldPrintFieldValue = google.protobuf.text_format.PrintFieldValue
def _customPrintFieldValue(field, value, out, indent=0, as_utf8=False, as_one_line=False):
if field.cpp_type == google.protobuf.descriptor.FieldDescriptor.CPPTYPE_STRING:
if str(field.GetOptions()).strip() == '[binary]:': # binary option set
_oldPrintFieldValue(field, 'hex(%s)' % binascii.hexlify(value), out, indent, as_utf8, as_one_line)
else:
_oldPrintFieldValue(field, value, out, indent, as_utf8, as_one_line)
google.protobuf.text_format.PrintFieldValue = _customPrintFieldValue
# === end of monkeypatching
def show_message(message):
print "MESSAGE FROM DEVICE:", message
def show_input(input_text, message=None):
if message:
print "QUESTION FROM DEVICE:", message
return raw_input(input_text)
def pin_func(input_text, message=None):
return show_input(input_text, message)
def passphrase_func(input_text):
return show_input(input_text)
class CallException(Exception):
pass
class PinException(CallException):
pass
PRIME_DERIVATION_FLAG = 0x80000000
class TrezorClient(object):
def __init__(self, transport, debuglink=None,
message_func=show_message, input_func=show_input,
pin_func=pin_func, passphrase_func=passphrase_func,
blockchain_api=None, debug=False):
self.transport = transport
self.debuglink = debuglink
self.message_func = message_func
self.input_func = input_func
self.pin_func = pin_func
self.passphrase_func = passphrase_func
self.debug = debug
if blockchain_api:
self.blockchain = blockchain_api
else:
self.blockchain = BlockchainApi()
self.setup_debuglink()
self.init_device()
def _get_local_entropy(self):
return os.urandom(32)
def _convert_prime(self, n):
# Convert minus signs to uint32 with flag
return [ int(abs(x) | PRIME_DERIVATION_FLAG) if x < 0 else x for x in n ]
def expand_path(self, n):
# Convert string of bip32 path to list of uint32 integers with prime flags
# 0/-1/1' -> [0, 0x80000001, 0x80000001]
if not n:
return []
n = n.split('/')
path = []
for x in n:
prime = False
if x.endswith("'"):
x = x.replace('\'', '')
prime = True
if x.startswith('-'):
prime = True
x = abs(int(x))
if prime:
x |= PRIME_DERIVATION_FLAG
path.append(x)
return path
def init_device(self):
self.features = self.call(proto.Initialize(), proto.Features)
def close(self):
self.transport.close()
if self.debuglink:
self.debuglink.transport.close()
def get_public_node(self, n):
return self.call(proto.GetPublicKey(address_n=n), proto.PublicKey).node
def get_address(self, coin_name, n):
n = self._convert_prime(n)
return self.call(proto.GetAddress(address_n=n, coin_name=coin_name), proto.Address).address
def get_entropy(self, size):
return self.call(proto.GetEntropy(size=size), proto.Entropy).entropy
def ping(self, msg):
return self.call(proto.Ping(message=msg), proto.Success).message
def get_device_id(self):
return self.features.device_id
def apply_settings(self, label=None, language=None):
settings = proto.ApplySettings()
if label != None:
settings.label = label
if language:
settings.language = language
out = self.call(settings, proto.Success).message
self.init_device() # Reload Features
return out
def change_pin(self, remove=False):
ret = self.call(proto.ChangePin(remove=remove))
self.init_device() # Re-read features
return ret
def _pprint(self, msg):
return "<%s>:\n%s" % (msg.__class__.__name__, msg)
def setup_debuglink(self, button=None, pin_correct=False):
self.debug_button = button
self.debug_pin = pin_correct
def call(self, msg, expected = None):
if self.debug:
print '----------------------'
print "Sending", self._pprint(msg)
try:
self.transport.session_begin()
self.transport.write(msg)
resp = self.transport.read_blocking()
if isinstance(resp, proto.ButtonRequest):
if self.debuglink and self.debug_button:
print "Pressing button", self.debug_button
self.debuglink.press_button(self.debug_button)
return self.call(proto.ButtonAck())
if isinstance(resp, proto.PinMatrixRequest):
if self.debuglink:
if self.debug_pin == 1:
pin = self.debuglink.read_pin_encoded()
msg2 = proto.PinMatrixAck(pin=pin)
elif self.debug_pin == -1:
msg2 = proto.Cancel()
else:
msg2 = proto.PinMatrixAck(pin='444444222222')
else:
pin = self.pin_func("PIN required: ", resp.message)
msg2 = proto.PinMatrixAck(pin=pin)
return self.call(msg2)
if isinstance(resp, proto.PassphraseRequest):
passphrase = self.passphrase_func("Passphrase required: ")
msg2 = proto.PassphraseAck(passphrase=passphrase)
return self.call(msg2)
finally:
self.transport.session_end()
if isinstance(resp, proto.Failure):
self.message_func(resp.message)
if resp.code == types.Failure_ActionCancelled:
raise CallException("Action cancelled by user")
elif resp.code in (types.Failure_PinInvalid,
types.Failure_PinCancelled, types.Failure_PinExpected):
raise PinException("PIN is invalid")
raise CallException(resp.code, resp.message)
if self.debug:
print "Received", self._pprint(resp)
if expected and not isinstance(resp, expected):
raise CallException("Expected %s message, got %s message" % (expected.DESCRIPTOR.name, resp.DESCRIPTOR.name))
return resp
def sign_message(self, n, message):
n = self._convert_prime(n)
return self.call(proto.SignMessage(address_n=n, message=message))
def verify_message(self, address, signature, message):
try:
resp = self.call(proto.VerifyMessage(address=address, signature=signature, message=message))
if isinstance(resp, proto.Success):
return True
except CallException:
pass
return False
11 years ago
def estimate_tx_size(self, coin_name, inputs, outputs):
msg = proto.EstimateTxSize()
msg.coin_name = coin_name
msg.inputs_count = len(inputs)
msg.outputs_count = len(outputs)
res = self.call(msg)
return res.tx_size
def simple_sign_tx(self, coin_name, inputs, outputs):
msg = proto.SimpleSignTx()
msg.coin_name = coin_name
msg.inputs.extend(inputs)
msg.outputs.extend(outputs)
known_hashes = []
for inp in inputs:
if inp.prev_hash in known_hashes:
continue
tx = msg.transactions.add()
tx.CopyFrom(self.blockchain.get_tx(binascii.hexlify(inp.prev_hash)))
known_hashes.append(inp.prev_hash)
return self.call(msg)
11 years ago
def sign_tx(self, coin_name, inputs, outputs):
10 years ago
# Temporary solution, until streaming is implemented in the firmware
return self.simple_sign_tx(coin_name, inputs, outputs)
def _sign_tx(self, coin_name, inputs, outputs):
'''
inputs: list of TxInput
outputs: list of TxOutput
proto.TxInput(index=0,
address_n=0,
amount=0,
prev_hash='',
prev_index=0,
#script_sig=
)
proto.TxOutput(index=0,
address='1Bitkey',
#address_n=[],
amount=100000000,
script_type=proto.PAYTOADDRESS,
#script_args=
)
'''
start = time.time()
try:
self.transport.session_begin()
# Prepare and send initial message
tx = proto.SignTx()
tx.inputs_count = len(inputs)
11 years ago
tx.outputs_count = len(outputs)
res = self.call(tx)
# Prepare structure for signatures
signatures = [None]*len(inputs)
serialized_tx = ''
counter = 0
while True:
counter += 1
if isinstance(res, proto.Failure):
raise CallException("Signing failed")
if not isinstance(res, proto.TxRequest):
raise CallException("Unexpected message")
# If there's some part of signed transaction, let's add it
if res.serialized_tx:
print "!!! RECEIVED PART OF SERIALIED TX (%d BYTES)" % len(res.serialized_tx)
serialized_tx += res.serialized_tx
if res.signed_index >= 0 and res.signature:
print "!!! SIGNED INPUT", res.signed_index
signatures[res.signed_index] = res.signature
if res.request_index < 0:
# Device didn't ask for more information, finish workflow
break
# Device asked for one more information, let's process it.
if res.request_type == types.TXOUTPUT:
res = self.call(outputs[res.request_index])
continue
elif res.request_type == types.TXINPUT:
print "REQUESTING", res.request_index
res = self.call(inputs[res.request_index])
continue
finally:
self.transport.session_end()
print "SIGNED IN %.03f SECONDS, CALLED %d MESSAGES, %d BYTES" % \
(time.time() - start, counter, len(serialized_tx))
return (signatures, serialized_tx)
def reset_device(self, display_random, strength, passphrase_protection, pin_protection, label, language):
# Begin with device reset workflow
msg = proto.ResetDevice(display_random=display_random,
strength=strength,
language=language,
passphrase_protection=bool(passphrase_protection),
pin_protection=bool(pin_protection),
label=label
)
resp = self.call(msg)
if not isinstance(resp, proto.EntropyRequest):
raise Exception("Invalid response, expected EntropyRequest")
external_entropy = self._get_local_entropy()
print "Computer generated entropy:", binascii.hexlify(external_entropy)
resp = self.call(proto.EntropyAck(entropy=external_entropy))
return isinstance(resp, proto.Success)
def load_device_by_mnemonic(self, mnemonic, pin, passphrase_protection, label, language):
resp = self.call(proto.LoadDevice(mnemonic=mnemonic, pin=pin,
passphrase_protection=passphrase_protection,
language=language,
label=label))
self.init_device()
return isinstance(resp, proto.Success)
def load_device_by_xprv(self, xprv, pin, passphrase_protection, label):
if xprv[0:4] not in ('xprv', 'tprv'):
raise Exception("Unknown type of xprv")
if len(xprv) < 100 and len(xprv) > 112:
raise Exception("Invalid length of xprv")
node = types.HDNodeType()
data = tools.b58decode(xprv, None).encode('hex')
if data[90:92] != '00':
raise Exception("Contain invalid private key")
11 years ago
checksum = hashlib.sha256(hashlib.sha256(binascii.unhexlify(data[:156])).digest()).hexdigest()[:8]
if checksum != data[156:]:
raise Exception("Checksum doesn't match")
# version 0488ade4
# depth 00
# fingerprint 00000000
# child_num 00000000
# chaincode 873dff81c02f525623fd1fe5167eac3a55a049de3d314bb42ee227ffed37d508
# privkey 00e8f32e723decf4051aefac8e2c93c9c5b214313817cdb01a1494b917c8436b35
11 years ago
# checksum e77e9d71
node.version = int(data[0:8], 16)
node.depth = int(data[8:10], 16)
node.fingerprint = int(data[10:18], 16)
node.child_num = int(data[18:26], 16)
node.chain_code = data[26:90].decode('hex')
node.private_key = data[92:156].decode('hex') # skip 0x00 indicating privkey
11 years ago
resp = self.call(proto.LoadDevice(node=node,
pin=pin,
passphrase_protection=passphrase_protection,
language='english',
label=label))
self.init_device()
return isinstance(resp, proto.Success)
def firmware_update(self, fp):
if self.features.bootloader_mode == False:
raise Exception("Device must be in bootloader mode")
resp = self.call(proto.FirmwareErase())
if isinstance(resp, proto.Failure) and resp.code == types.Failure_FirmwareError:
return False
resp = self.call(proto.FirmwareUpload(payload=fp.read()))
if isinstance(resp, proto.Success):
return True
elif isinstance(resp, proto.Failure) and resp.code == types.Failure_FirmwareError:
return False
raise Exception("Unexpected result " % resp)