本文整理汇总了Python中raiden.utils.privatekey_to_address函数的典型用法代码示例。如果您正苦于以下问题：Python privatekey_to_address函数的具体用法？Python privatekey_to_address怎么用？Python privatekey_to_address使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了privatekey_to_address函数的20个代码示例，这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞，您的评价将有助于我们的系统推荐出更棒的Python代码示例。
示例1: test_channelmanager
def test_channelmanager(private_keys, settle_timeout, tester_channelmanager):
    """ Channels opened through the manager must show up in all of its lookups. """
    opener_key = private_keys[0]
    opener, partner1, partner2 = [
        privatekey_to_address(private_keys[index])
        for index in range(3)
    ]

    # NOTE(review): the listing this test was taken from lost its indentation;
    # the participants check is assumed to run after every opened channel - confirm.
    for opened, partner in enumerate((partner1, partner2), start=1):
        channel_address_hex = tester_channelmanager.newChannel(
            partner,
            settle_timeout,
            sender=opener_key,
        )

        found = tester_channelmanager.getChannelWith(partner, sender=opener_key)
        assert found == channel_address_hex

        # the expected length grows by two for every channel opened so far
        participants = tester_channelmanager.getChannelsParticipants(sender=opener_key)
        assert len(participants) == 2 * opened

    # pylint: disable=no-member
    opener_channels = tester_channelmanager.nettingContractsByAddress(
        opener,
        sender=opener_key,
    )
    partner1_channels = tester_channelmanager.nettingContractsByAddress(
        partner1,
        sender=opener_key,
    )

    # an address derived from an arbitrary hash, it never opened a channel here
    unknown_address = sha3(b'this_does_not_exist')[:20]
    unknown_channels = tester_channelmanager.nettingContractsByAddress(
        unknown_address,
        sender=opener_key,
    )

    assert len(opener_channels) == 2
    assert len(partner1_channels) == 1
    assert not unknown_channels
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:33,代码来源:test_channel_manager.py
示例2: test_new_channel_state
def test_new_channel_state(private_keys, tester_chain, tester_channelmanager):
    """ Check the initial values reported by a freshly created netting channel. """
    pkey0, pkey1 = private_keys

    settle_timeout = 10
    collected_events = []

    channel = new_nettingcontract(
        pkey0,
        pkey1,
        tester_chain,
        collected_events.append,
        tester_channelmanager,
        settle_timeout,
    )

    # pylint: disable=no-member
    assert channel.settleTimeout(sender=pkey0) == settle_timeout
    assert channel.tokenAddress(sender=pkey0) == tester_channelmanager.tokenAddress(sender=pkey0)
    assert channel.opened(sender=pkey0) == tester_chain.block.number - 1
    assert channel.closed(sender=pkey0) == 0

    address_and_balances = channel.addressAndBalance(sender=pkey0)

    # expected layout: (address0, balance0, address1, balance1), both balances zero
    expected_state = (
        address_encoder(privatekey_to_address(pkey0)),
        0,
        address_encoder(privatekey_to_address(pkey1)),
        0,
    )
    for position, expected in enumerate(expected_state):
        assert address_and_balances[position] == expected
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:29,代码来源:test_channel_manager.py
示例3: test_channelnew_event
def test_channelnew_event(
        settle_timeout,
        tester_channelmanager,
        private_keys,
        tester_events):
    """ Opening a channel must emit a ChannelNew event describing that channel. """
    opener_key = private_keys[0]
    opener = privatekey_to_address(opener_key)
    partner = privatekey_to_address(private_keys[1])

    # pylint: disable=no-member
    netting_channel_address1_hex = tester_channelmanager.newChannel(
        partner,
        settle_timeout,
        sender=opener_key,
    )

    expected_event = {
        '_event_type': b'ChannelNew',
        'netting_channel': netting_channel_address1_hex,
        'participant1': address_encoder(opener),
        'participant2': address_encoder(partner),
        'settle_timeout': settle_timeout,
    }

    # the event of interest is the most recent one that was collected
    decoded_event = event_decoder(tester_events[-1], tester_channelmanager.translator)
    assert decoded_event == expected_event
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:26,代码来源:test_channel_manager.py
示例4: geth_wait_and_check
def geth_wait_and_check(privatekeys, rpc_ports):
    """ Wait until the geth cluster is ready.

    First polls the JSON-RPC interface on the first port until a request
    succeeds, then polls the balance of every distinct private key until it is
    reported as non-zero.

    Args:
        privatekeys: private keys of the accounts that must have a balance.
        rpc_ports: sequence of RPC ports, only the first one is used.

    Raises:
        ValueError: if the JSON-RPC interface does not answer after 5
            attempts, or an account still reports '0x0' after 10 attempts.
    """
    address = address_encoder(privatekey_to_address(privatekeys[0]))
    rpc_port = rpc_ports[0]
    jsonrpc_client = JSONRPCClient(
        host="0.0.0.0",
        port=rpc_port,
        privkey=privatekeys[0],
        print_communication=False,
    )

    jsonrpc_running = False
    for _ in range(5):
        try:
            jsonrpc_client.call("eth_getBalance", address, "latest")
        except ConnectionError:
            # the interface is not up yet, give geth some time before retrying
            gevent.sleep(0.5)
        else:
            jsonrpc_running = True
            break

    if not jsonrpc_running:
        raise ValueError("geth didnt start the jsonrpc interface")

    for key in set(privatekeys):
        address = address_encoder(privatekey_to_address(key))
        jsonrpc_client = JSONRPCClient(
            host="0.0.0.0",
            port=rpc_port,
            privkey=key,
            print_communication=False,
        )

        for _ in range(10):
            balance = jsonrpc_client.call("eth_getBalance", address, "latest")
            if balance != "0x0":
                break
            # only wait when another attempt is needed; sleeping after a
            # successful query would add a second per account for nothing
            gevent.sleep(1)
        else:
            raise ValueError("account is with a balance of 0")
开发者ID:raiden-network,项目名称:raiden,代码行数:32,代码来源:blockchain.py
示例5: test_two_direct_transfers
def test_two_direct_transfers(
        settle_timeout,
        deposit,
        tester_chain,
        tester_channels,
        tester_token):
    """ After settlement the balances must reflect both direct transfers. """
    pkey0, pkey1, nettingchannel, channel0, channel1 = tester_channels[0]

    address0 = privatekey_to_address(pkey0)
    address1 = privatekey_to_address(pkey1)

    initial0 = tester_token.balanceOf(address0, sender=pkey0)
    initial1 = tester_token.balanceOf(address1, sender=pkey0)

    first_amount0 = 90
    second_amount0 = 90

    # the first transfer object is not needed, only the latest one is
    # submitted to the contract further down
    make_direct_transfer_from_channel(
        tester_chain.block.number,
        channel0,
        channel1,
        first_amount0,
        pkey0,
    )
    latest_transfer = make_direct_transfer_from_channel(
        tester_chain.block.number,
        channel0,
        channel1,
        second_amount0,
        pkey0,
    )

    nettingchannel.close(sender=pkey0)

    # the hash is taken over the packed message without its last 65 bytes
    latest_transfer_hash = sha3(latest_transfer.packed().data[:-65])
    nettingchannel.updateTransfer(
        latest_transfer.nonce,
        latest_transfer.transferred_amount,
        latest_transfer.locksroot,
        latest_transfer_hash,
        latest_transfer.signature,
        sender=pkey1,
    )

    tester_chain.mine(number_of_blocks=settle_timeout + 1)
    nettingchannel.settle(sender=pkey0)

    expected0 = initial0 + deposit - first_amount0 - second_amount0
    expected1 = initial1 + deposit + first_amount0 + second_amount0

    assert tester_token.balanceOf(address0, sender=pkey0) == expected0
    assert tester_token.balanceOf(address1, sender=pkey0) == expected1
    assert tester_token.balanceOf(nettingchannel.address, sender=pkey0) == 0
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:57,代码来源:test_settle.py
示例6: test_settle_with_locked_mediated_transfer_for_counterparty
def test_settle_with_locked_mediated_transfer_for_counterparty(
        deposit,
        settle_timeout,
        reveal_timeout,
        tester_chain,
        tester_channels,
        tester_token):
    """ Settle a channel while a mediated transfer to the counter party is still locked. """
    pkey0, pkey1, nettingchannel, channel0, channel1 = tester_channels[0]

    address0 = privatekey_to_address(pkey0)
    address1 = privatekey_to_address(pkey1)

    initial0 = tester_token.balanceOf(address0, sender=pkey0)
    initial1 = tester_token.balanceOf(address1, sender=pkey0)

    transferred_amount0 = 30
    increase_transferred_amount(channel0, channel1, transferred_amount0)

    expiration0 = tester_chain.block.number + reveal_timeout + 5

    # inform both channel states about the current block
    current_block = Block(tester_chain.block.number)
    for channel_state in (channel0, channel1):
        channel_state.state_transition(current_block)

    pending_lock = Lock(amount=29, expiration=expiration0, hashlock=sha3(b'lock1'))
    locked_transfer = make_mediated_transfer(
        channel0,
        channel1,
        address0,
        address1,
        pending_lock,
        pkey0,
        tester_chain.block.number,
    )

    nettingchannel.close(sender=pkey0)

    # the hash is taken over the packed message without its last 65 bytes
    locked_transfer_hash = sha3(locked_transfer.packed().data[:-65])
    nettingchannel.updateTransfer(
        locked_transfer.nonce,
        locked_transfer.transferred_amount,
        locked_transfer.locksroot,
        locked_transfer_hash,
        locked_transfer.signature,
        sender=pkey1,
    )

    tester_chain.mine(number_of_blocks=settle_timeout + 1)
    nettingchannel.settle(sender=pkey1)

    # the lock was never unlocked, so only transferred_amount0 changes hands
    expected0 = initial0 + deposit - transferred_amount0
    expected1 = initial1 + transferred_amount0

    assert tester_token.balanceOf(nettingchannel.address, sender=pkey0) == 0
    assert tester_token.balanceOf(address0, sender=pkey0) == expected0
    assert tester_token.balanceOf(address1, sender=pkey0) == expected1
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:57,代码来源:test_settle.py
示例7: __init__
def __init__(self, private_key, tester_chain):
    """ Store the chain and key, and start with empty per-address lookup tables.

    Args:
        private_key: key from which `address` and `node_address` are derived.
        tester_chain: the chain object kept as `self.tester_chain`.
    """
    self.tester_chain = tester_chain

    # both attributes are derived from the same private key
    self.address = privatekey_to_address(private_key)
    self.private_key = private_key
    self.node_address = privatekey_to_address(private_key)

    # lookup tables keyed by address, all of them start empty
    self.address_to_token = {}
    self.address_to_discovery = {}
    self.address_to_nettingchannel = {}
    self.address_to_registry = {}

    self.client = ClientMock()
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:12,代码来源:tester_client.py
示例8: test_reopen_channel
def test_reopen_channel(
        private_keys,
        settle_timeout,
        tester_channelmanager,
        tester_chain):
    """ Once a channel is settled a new one can be opened with the same partner,
    and the channel manager must then report the address of the new channel
    instead of the old one.
    """
    log = []
    pkey = private_keys[0]
    partner_keys = private_keys[1:]

    def open_channel(partner_pkey):
        return new_nettingcontract(
            pkey, partner_pkey, tester_chain, log.append, tester_channelmanager, settle_timeout,
        )

    def lookup_channel(partner_pkey):
        return tester_channelmanager.getChannelWith(
            privatekey_to_address(partner_pkey),
            sender=pkey,
        )

    # first generation: open one channel per partner and remember its address
    channels = []
    old_channel_addresses = []
    for partner_pkey in partner_keys:
        channels.append(open_channel(partner_pkey))
        old_channel_addresses.append(lookup_channel(partner_pkey))

    for nettingchannel in channels:
        netting_channel_settled(
            tester_chain,
            nettingchannel,
            pkey,
            settle_timeout,
        )

    # second generation: reopen a channel with every partner
    channels = [open_channel(partner_pkey) for partner_pkey in partner_keys]

    # there must be a single entry for each participant
    for partner_pkey in partner_keys:
        channel_address = lookup_channel(partner_pkey)
        assert channel_address
        assert channel_address not in old_channel_addresses
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:52,代码来源:test_channel_manager.py
示例9: test_deposit
def test_deposit(private_keys, tester_channelmanager, tester_chain, tester_token):
    """ Depositing must raise the token amount held by the netting channel, and
    is only possible after the channel was approved for that amount.
    """
    pkey0 = private_keys[0]
    pkey1 = private_keys[1]

    address0 = address_encoder(privatekey_to_address(pkey0))
    address1 = address_encoder(privatekey_to_address(pkey1))

    settle_timeout = 10
    events = []

    # not using the tester_nettingcontracts fixture because it has a set balance
    channel = new_nettingcontract(
        pkey0,
        pkey1,
        tester_chain,
        events.append,
        tester_channelmanager,
        settle_timeout,
    )

    def channel_state():
        return tuple(channel.addressAndBalance(sender=pkey0))

    def state_with_balance0(amount):
        return (address0, amount, address1, 0)

    deposit = 100

    # cannot deposit without approving
    assert channel.deposit(deposit, sender=pkey0) is False

    assert tester_token.approve(channel.address, deposit, sender=pkey0) is True

    # cannot deposit negative values
    with pytest.raises(abi.ValueOutOfBounds):
        channel.deposit(-1, sender=pkey0)

    # nothing was deposited so far
    assert channel_state() == state_with_balance0(0)

    assert channel.deposit(deposit, sender=pkey0) is True

    assert channel_state() == state_with_balance0(deposit)
    assert tester_token.balanceOf(channel.address, sender=pkey0) == deposit

    # cannot over deposit (the allowance is depleted)
    assert channel.deposit(deposit, sender=pkey0) is False

    assert tester_token.approve(channel.address, deposit, sender=pkey0) is True
    assert channel.deposit(deposit, sender=pkey0) is True

    assert channel_state() == state_with_balance0(deposit * 2)
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:50,代码来源:test_deposit.py
示例10: test_for_issue_892
def test_for_issue_892(
        private_keys,
        settle_timeout,
        tester_channelmanager,
        tester_chain,
        tester_events):
    """
    Regression test for issue #892 (https://github.com/raiden-network/raiden/issues/892):
    `getChannelsParticipants()` used to return an empty list once one of the
    channels of the channel manager had been settled.
    """
    expected_participants = []

    for index, (pkey0, pkey1) in enumerate(itertools.combinations(private_keys, 2)):
        address0 = privatekey_to_address(pkey0)
        address1 = privatekey_to_address(pkey1)

        channel_address_hex = tester_channelmanager.newChannel(
            address1,
            settle_timeout,
            sender=pkey0,
        )
        tester_chain.mine()

        # the channel must be reachable from both participants
        assert tester_channelmanager.getChannelWith(address1, sender=pkey0) == channel_address_hex
        assert tester_channelmanager.getChannelWith(address0, sender=pkey1) == channel_address_hex

        if index == 0:
            # only the very first channel is closed and settled
            nettingchannel = create_nettingchannel_proxy(
                tester_chain,
                channel_address_hex,
                tester_events.append,
            )
            nettingchannel.close(sender=pkey0)
            tester_chain.mine(number_of_blocks=settle_timeout + 2)
            nettingchannel.settle(sender=pkey1)
        else:
            # this is brittle, relying on an implicit ordering of addresses
            expected_participants.extend((
                address_encoder(address0),
                address_encoder(address1),
            ))

        # NOTE(review): the listing this test was taken from lost its
        # indentation; the check is assumed to run on every iteration - confirm.
        assert expected_participants == tester_channelmanager.getChannelsParticipants(sender=pkey0)
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:49,代码来源:test_channel_manager.py
示例11: test_withdraw_twice
def test_withdraw_twice(reveal_timeout, tester_channels, tester_chain):
    """ Withdrawing the same lock a second time must fail. """
    pkey0, pkey1, nettingchannel, channel0, channel1 = tester_channels[0]

    lock_expiration = tester_chain.block.number + reveal_timeout + 5
    secret = b'secretsecretsecretsecretsecretse'

    # inform both channel states about the current block
    current_block = Block(tester_chain.block.number)
    for channel_state in (channel0, channel1):
        channel_state.state_transition(current_block)

    lock = Lock(17, lock_expiration, sha3(secret))

    # the transfer goes from the owner of pkey1 to the owner of pkey0
    mediated0 = make_mediated_transfer(
        channel1,
        channel0,
        privatekey_to_address(pkey1),
        privatekey_to_address(pkey0),
        lock,
        pkey1,
        tester_chain.block.number,
        secret,
    )

    # the hash is taken over the packed message without its last 65 bytes
    mediated0_hash = sha3(mediated0.packed().data[:-65])
    nettingchannel.close(
        mediated0.nonce,
        mediated0.transferred_amount,
        mediated0.locksroot,
        mediated0_hash,
        mediated0.signature,
        sender=pkey0,
    )

    proof = list(channel0.partner_state.get_known_unlocks())[0]

    def withdraw():
        nettingchannel.withdraw(
            proof.lock_encoded,
            b''.join(proof.merkle_proof),
            proof.secret,
            sender=pkey0,
        )

    # the first withdraw goes through, repeating it must be rejected
    withdraw()
    with pytest.raises(TransactionFailed):
        withdraw()
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:49,代码来源:test_withdraw.py
示例12: token_addresses
def token_addresses(
        token_amount,
        number_of_tokens,
        private_keys,
        deploy_service,
        token_network_registry_address,
        register_tokens,
        contract_manager,
) -> typing.List[typing.Address]:
    """ Fixture that returns the addresses produced by
    `deploy_tokens_and_fund_accounts`, using the addresses derived from
    `private_keys` as participants. The tokens are optionally added to the
    token network registry at `token_network_registry_address`.

    Args:
        token_amount (int): the overall number of units minted per token
        number_of_tokens (int): the number of token instances
        register_tokens (bool): controls if tokens will be registered with raiden Registry
    """
    deployed_tokens = deploy_tokens_and_fund_accounts(
        token_amount=token_amount,
        number_of_tokens=number_of_tokens,
        deploy_service=deploy_service,
        participants=[privatekey_to_address(key) for key in private_keys],
        contract_manager=contract_manager,
    )

    if not register_tokens:
        return deployed_tokens

    # the registry proxy is looked up again for every token
    for token in deployed_tokens:
        deploy_service.token_network_registry(token_network_registry_address).add_token(token)

    return deployed_tokens
开发者ID:hackaugusto,项目名称:raiden,代码行数:34,代码来源:smartcontracts.py
示例13: test_close_wrong_channel
def test_close_wrong_channel(tester_channels):
    """ Closing with a transfer that names another channel must be rejected. """
    pkey0, pkey1, nettingchannel, channel0, _ = tester_channels[0]

    opened_block = nettingchannel.opened(sender=pkey0)
    other_channel = make_address()

    # make a transfer where the recipient is totally wrong
    transfer_wrong_channel = DirectTransfer(
        identifier=1,
        nonce=opened_block * 2 ** 32 + 1,
        token=channel0.token_address,
        channel=other_channel,
        transferred_amount=10,
        recipient=channel0.our_address,
        locksroot=EMPTY_MERKLE_ROOT,
    )

    signing_key = PrivateKey(pkey1)
    transfer_wrong_channel.sign(signing_key, privatekey_to_address(pkey1))

    # the hash is taken over the packed message without its last 65 bytes
    transfer_wrong_channel_hash = sha3(transfer_wrong_channel.packed().data[:-65])

    close_arguments = (
        transfer_wrong_channel.nonce,
        transfer_wrong_channel.transferred_amount,
        transfer_wrong_channel.locksroot,
        transfer_wrong_channel_hash,
        transfer_wrong_channel.signature,
    )
    with pytest.raises(TransactionFailed):
        nettingchannel.close(*close_arguments, sender=pkey0)
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:29,代码来源:test_close.py
示例14: __init__
def __init__(
        self,
        jsonrpc_client,
        secret_registry_address,
):
    """ Build a proxy for the secret registry contract at the given address.

    Args:
        jsonrpc_client: client used to create the contract proxy, its
            `privkey` is used to derive `node_address`.
        secret_registry_address: address of the contract in binary format.

    Raises:
        InvalidAddress: if the address is not in binary format.
        ContractVersionMismatch: if the version reported by the contract does
            not compare as compatible with EXPECTED_CONTRACTS_VERSION.
    """
    if not is_binary_address(secret_registry_address):
        raise InvalidAddress('Expected binary address format for secret registry')

    check_address_has_code(jsonrpc_client, secret_registry_address, CONTRACT_SECRET_REGISTRY)

    contract_abi = CONTRACT_MANAGER.get_contract_abi(CONTRACT_SECRET_REGISTRY)
    proxy = jsonrpc_client.new_contract_proxy(
        contract_abi,
        to_normalized_address(secret_registry_address),
    )

    # the version is queried from the contract itself
    deployed_version = proxy.contract.functions.contract_version().call()
    is_good_version = compare_versions(deployed_version, EXPECTED_CONTRACTS_VERSION)
    if not is_good_version:
        raise ContractVersionMismatch('Incompatible ABI for SecretRegistry')

    self.address = secret_registry_address
    self.proxy = proxy
    self.client = jsonrpc_client
    self.node_address = privatekey_to_address(self.client.privkey)
    self.open_secret_transactions = {}
开发者ID:AlphaX-IBS,项目名称:raiden,代码行数:26,代码来源:secret_registry.py
示例15: test_update_must_fail_with_a_channel_address
def test_update_must_fail_with_a_channel_address(tester_channels, private_keys):
    """ updateTransfer must reject a transfer that was signed for another channel address. """
    pkey0, pkey1, nettingchannel, channel0, channel1 = tester_channels[0]

    opened_block = nettingchannel.opened(sender=pkey0)
    other_channel = make_address()

    # make a transfer where pkey1 is the target
    transfer_wrong_recipient = DirectTransfer(
        identifier=1,
        nonce=opened_block * 2 ** 32 + 1,
        token=channel0.token_address,
        channel=other_channel,
        transferred_amount=10,
        recipient=channel1.our_address,
        locksroot=EMPTY_MERKLE_ROOT,
    )

    # the transfer is signed with pkey0
    signer_address = privatekey_to_address(pkey0)
    signer_key = PrivateKey(pkey0)
    transfer_wrong_recipient.sign(signer_key, signer_address)

    nettingchannel.close(sender=pkey0)

    # the hash is taken over the packed message without its last 65 bytes
    transfer_wrong_recipient_hash = sha3(transfer_wrong_recipient.packed().data[:-65])

    update_arguments = (
        transfer_wrong_recipient.nonce,
        transfer_wrong_recipient.transferred_amount,
        transfer_wrong_recipient.locksroot,
        transfer_wrong_recipient_hash,
        transfer_wrong_recipient.signature,
    )
    with pytest.raises(TransactionFailed):
        nettingchannel.updateTransfer(*update_arguments, sender=pkey1)
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:34,代码来源:test_updatetransfer.py
示例16: test_endpointregistry
def test_endpointregistry(private_keys, blockchain_services, contract_manager):
    """ Register, overwrite and look up endpoints through a deployed endpoint registry. """
    chain = blockchain_services.blockchain_services[0]
    my_address = privatekey_to_address(private_keys[0])

    endpointregistry_address = deploy_contract_web3(
        contract_name=CONTRACT_ENDPOINT_REGISTRY,
        deploy_client=chain.client,
        contract_manager=contract_manager,
    )
    contract_discovery = ContractDiscovery(
        my_address,
        chain.discovery(endpointregistry_address),
    )

    unregistered_address = make_address()

    # get should raise for unregistered addresses
    for unknown in (my_address, unregistered_address):
        with pytest.raises(UnknownAddress):
            contract_discovery.get(unknown)

    # registering a second time replaces the previously stored endpoint
    # NOTE(review): 88888 is above 65535, the largest valid port number
    for port in (44444, 88888):
        contract_discovery.register(my_address, '127.0.0.1', port)
        assert contract_discovery.get(my_address) == ('127.0.0.1', port)

    # the other address is still unknown after the registrations above
    with pytest.raises(UnknownAddress):
        contract_discovery.get(unregistered_address)
开发者ID:hackaugusto,项目名称:raiden,代码行数:30,代码来源:test_endpointregistry.py
示例17: detail
def detail(self):
    """ Return the channel data from the point of view of our own address.

    FIXME: 'our_address' is only needed for the pure python mock implementation

    Returns:
        A dict with the keys 'our_address', 'our_balance', 'partner_address',
        'partner_balance' and 'settle_timeout'.

    Raises:
        ValueError: if neither address of the channel matches the address
            derived from `self.private_key`.
    """
    self._check_exists()

    data = self.proxy.addressAndBalance()
    settle_timeout = self.proxy.settleTimeout()
    our_address = privatekey_to_address(self.private_key)

    def describe(our_index, partner_index):
        # `data` holds an address followed by its balance for each participant
        return {
            'our_address': address_decoder(data[our_index]),
            'our_balance': data[our_index + 1],
            'partner_address': address_decoder(data[partner_index]),
            'partner_balance': data[partner_index + 1],
            'settle_timeout': settle_timeout,
        }

    # the first participant is checked before the second one
    for our_index, partner_index in ((0, 2), (2, 0)):
        if address_decoder(data[our_index]) == our_address:
            return describe(our_index, partner_index)

    raise ValueError('We [{}] are not a participant of the given channel ({}, {})'.format(
        pex(our_address),
        data[0],
        data[2],
    ))
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:31,代码来源:tester_client.py
示例18: new_nettingcontract
def new_nettingcontract(
        our_key,
        partner_key,
        tester_chain,
        log_listener,
        channelmanager,
        settle_timeout):
    """ Open a channel with the partner and return a proxy for the new contract.

    Args:
        our_key: private key used as the sender of the newChannel call.
        partner_key: private key from which the partner address is derived.
        tester_chain: chain that is mined for one block after the call.
        log_listener: passed on to `create_nettingchannel_proxy`.
        channelmanager: contract on which `newChannel` is called.
        settle_timeout: must lie between NETTINGCHANNEL_SETTLE_TIMEOUT_MIN and
            NETTINGCHANNEL_SETTLE_TIMEOUT_MAX, both inclusive.

    Raises:
        ValueError: if `settle_timeout` is outside of the allowed range.
    """
    if (settle_timeout < NETTINGCHANNEL_SETTLE_TIMEOUT_MIN or
            settle_timeout > NETTINGCHANNEL_SETTLE_TIMEOUT_MAX):
        message = 'settle_timeout must be in range [{}, {}]'.format(
            NETTINGCHANNEL_SETTLE_TIMEOUT_MIN, NETTINGCHANNEL_SETTLE_TIMEOUT_MAX
        )
        raise ValueError(message)

    partner_address = privatekey_to_address(partner_key)
    netting_channel_address0_hex = channelmanager.newChannel(
        partner_address,
        settle_timeout,
        sender=our_key,
    )
    tester_chain.mine(number_of_blocks=1)

    return create_nettingchannel_proxy(
        tester_chain,
        netting_channel_address0_hex,
        log_listener,
    )
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:31,代码来源:tester.py
示例19: geth_wait_and_check
def geth_wait_and_check(deploy_client, privatekeys, random_marker):
    """ Wait until the geth cluster is ready.

    Polls the JSON-RPC interface for block 0 until a request succeeds, checks
    that the `extraData` of that block carries `random_marker`, then polls
    the balance of every distinct private key until it is reported as non-zero.

    Args:
        deploy_client: client whose `call` method performs the requests.
        privatekeys: private keys of the accounts that must have a balance.
        random_marker: marker expected in the block's `extraData`, right
            after its first two characters.

    Raises:
        RuntimeError: if the marker found in block 0 is not `random_marker`.
        ValueError: if the JSON-RPC interface does not answer after 5
            attempts, or an account still reports '0x0' after 10 attempts.
    """
    jsonrpc_running = False
    for _ in range(5):
        try:
            block = deploy_client.call('eth_getBlockByNumber', '0x0', True)
        except ConnectionError:
            # the interface is not up yet, give geth some time before retrying
            gevent.sleep(0.5)
            continue

        jsonrpc_running = True

        # skip the first two characters of extraData, then compare the marker
        running_marker = block['extraData'][2:len(random_marker) + 2]
        if running_marker != random_marker:
            raise RuntimeError(
                'the test marker does not match, maybe two tests are running in '
                'parallel with the same port?'
            )
        break

    if not jsonrpc_running:
        raise ValueError('geth didnt start the jsonrpc interface')

    for key in sorted(set(privatekeys)):
        address = address_encoder(privatekey_to_address(key))

        for _ in range(10):
            balance = deploy_client.call('eth_getBalance', address, 'latest')
            if balance != '0x0':
                break
            # only wait when another attempt is needed; sleeping after a
            # successful query would add a second per account for nothing
            gevent.sleep(1)
        else:
            raise ValueError('account is with a balance of 0')
开发者ID:destenson,项目名称:raiden-network--raiden,代码行数:35,代码来源:blockchain.py
示例20: __init__
def __init__(
        self,
        jsonrpc_client,
        manager_address,
):
    """ Build a proxy for the token network contract at the given address.

    Args:
        jsonrpc_client: client used to create the contract proxy, its
            `privkey` is used to derive `node_address`.
        manager_address: address of the contract in binary format.

    Raises:
        InvalidAddress: if the address is not in binary format.
        ContractVersionMismatch: if the version reported by the contract does
            not compare as compatible with EXPECTED_CONTRACTS_VERSION.
    """
    if not is_binary_address(manager_address):
        raise InvalidAddress('Expected binary address format for token nework')

    check_address_has_code(jsonrpc_client, manager_address, CONTRACT_TOKEN_NETWORK)

    contract_abi = CONTRACT_MANAGER.get_contract_abi(CONTRACT_TOKEN_NETWORK)
    proxy = jsonrpc_client.new_contract_proxy(
        contract_abi,
        to_normalized_address(manager_address),
    )

    # the version is queried from the contract itself
    deployed_version = proxy.contract.functions.contract_version().call()
    if not compare_versions(deployed_version, EXPECTED_CONTRACTS_VERSION):
        raise ContractVersionMismatch('Incompatible ABI for TokenNetwork')

    self.address = manager_address
    self.proxy = proxy
    self.client = jsonrpc_client
    self.node_address = privatekey_to_address(self.client.privkey)
    self.open_channel_transactions = {}

    # Forbids concurrent operations on the same channel
    self.channel_operations_lock = defaultdict(RLock)

    # Serializes concurrent deposits on this token network. This must be an
    # exclusive lock, since we need to coordinate the approve and
    # setTotalDeposit calls.
    self.deposit_lock = Semaphore()
开发者ID:AlphaX-IBS,项目名称:raiden,代码行数:35,代码来源:token_network.py
注：本文中的raiden.utils.privatekey_to_address函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论