@ -44,7 +44,9 @@ pub mod transport {
}
pub mod behaviour {
use super ::* ;
use libp2p ::swarm ::behaviour ::toggle ::Toggle ;
use super ::{ rendezvous ::RendezvousNode , * } ;
#[ allow(clippy::large_enum_variant) ]
#[ derive(Debug) ]
@ -108,7 +110,7 @@ pub mod behaviour {
where
LR : LatestRate + Send + ' static ,
{
pub rendezvous : libp2p::swarm ::behaviour ::toggle :: Toggle< rendez ous::Behaviour > ,
pub rendezvous : Toggle< rendez v ous::Behaviour > ,
pub quote : quote ::Behaviour ,
pub swap_setup : alice ::Behaviour < LR > ,
pub transfer_proof : transfer_proof ::Behaviour ,
@ -132,25 +134,22 @@ pub mod behaviour {
resume_only : bool ,
env_config : env ::Config ,
identify_params : ( identity ::Keypair , XmrBtcNamespace ) ,
rendezvous_ params: Option < ( identity ::Keypair , PeerId , Multiaddr , XmrBtcNamespace ) > ,
rendezvous_ nodes: Vec < RendezvousNode > ,
) -> Self {
let agentVersion = format! ( "asb/{} ({})" , env! ( "CARGO_PKG_VERSION" ) , identify_params . 1 ) ;
let protocolVersion = "/comit/xmr/btc/1.0.0" . to_string ( ) ;
let identifyConfig = IdentifyConfig ::new ( protocolVersion , identify_params . 0. public ( ) )
. with_agent_version ( agentVersion ) ;
let ( identity , namespace ) = identify_params ;
let agent_version = format! ( "asb/{} ({})" , env! ( "CARGO_PKG_VERSION" ) , namespace ) ;
let protocol_version = "/comit/xmr/btc/1.0.0" . to_string ( ) ;
let identifyConfig = IdentifyConfig ::new ( protocol_version , identity . public ( ) )
. with_agent_version ( agent_version ) ;
let behaviour = if rendezvous_nodes . is_empty ( ) {
None
} else {
Some ( rendezvous ::Behaviour ::new ( identity , rendezvous_nodes ) )
} ;
Self {
rendezvous : libp2p ::swarm ::behaviour ::toggle ::Toggle ::from ( rendezvous_params . map (
| ( identity , rendezvous_peer_id , rendezvous_address , namespace ) | {
rendezous ::Behaviour ::new (
identity ,
rendezvous_peer_id ,
rendezvous_address ,
namespace ,
None , // use default ttl on rendezvous point
)
} ,
) ) ,
rendezvous : Toggle ::from ( behaviour ) ,
quote : quote ::asb ( ) ,
swap_setup : alice ::Behaviour ::new (
min_buy ,
@ -186,13 +185,14 @@ pub mod behaviour {
}
}
pub mod rendez ous {
pub mod rendez v ous {
use super ::* ;
use libp2p ::swarm ::dial_opts ::DialOpts ;
use libp2p ::swarm ::DialError ;
use std ::collections ::VecDeque ;
use std ::pin ::Pin ;
#[ derive( PartialEq)]
#[ derive( Clone, PartialEq)]
enum ConnectionStatus {
Disconnected ,
Dialling ,
@ -209,39 +209,59 @@ pub mod rendezous {
pub struct Behaviour {
inner : libp2p ::rendezvous ::client ::Behaviour ,
rendezvous_point : Multiaddr ,
rendezvous_peer_id : PeerId ,
namespace : XmrBtcNamespace ,
registration_status : RegistrationStatus ,
rendezvous_nodes : Vec < RendezvousNode > ,
to_dial : VecDeque < PeerId > ,
}
pub struct RendezvousNode {
pub address : Multiaddr ,
connection_status : ConnectionStatus ,
registration_ttl : Option < u64 > ,
pub peer_id : PeerId ,
registration_status : RegistrationStatus ,
pub registration_ttl : Option < u64 > ,
pub namespace : XmrBtcNamespace ,
}
impl Behaviour {
impl RendezvousNode {
pub fn new (
identity : identity ::Keypair ,
rendezvous_peer_id : PeerId ,
rendezvous_address : Multiaddr ,
address : & Multiaddr ,
peer_id : PeerId ,
namespace : XmrBtcNamespace ,
registration_ttl : Option < u64 > ,
) -> Self {
Self {
inner : libp2p ::rendezvous ::client ::Behaviour ::new ( identity ) ,
rendezvous_point : rendezvous_address ,
rendezvous_peer_id ,
address : address . to_owned ( ) ,
connection_status : ConnectionStatus ::Disconnected ,
namespace ,
peer_id ,
registration_status : RegistrationStatus ::RegisterOnNextConnection ,
connection_status : ConnectionStatus ::Disconnected ,
registration_ttl ,
}
}
fn register ( & mut self ) {
self . inner . register (
self . namespace . into ( ) ,
self . rendezvous_peer_id ,
self . registration_ttl ,
) ;
fn set_connection ( & mut self , status : ConnectionStatus ) {
self . connection_status = status ;
}
fn set_registration ( & mut self , status : RegistrationStatus ) {
self . registration_status = status ;
}
}
impl Behaviour {
pub fn new ( identity : identity ::Keypair , rendezvous_nodes : Vec < RendezvousNode > ) -> Self {
Self {
inner : libp2p ::rendezvous ::client ::Behaviour ::new ( identity ) ,
rendezvous_nodes ,
to_dial : VecDeque ::new ( ) ,
}
}
/// Calls the rendezvous register method of the node at node_index in the Vec of rendezvous nodes
fn register ( & mut self , node_index : usize ) {
let node = & self . rendezvous_nodes [ node_index ] ;
self . inner
. register ( node . namespace . into ( ) , node . peer_id , node . registration_ttl ) ;
}
}
@ -255,31 +275,37 @@ pub mod rendezous {
}
fn addresses_of_peer ( & mut self , peer_id : & PeerId ) -> Vec < Multiaddr > {
if peer_id = = & self . rendezvous_peer_id {
return vec! [ self . rendezvous_point . clone ( ) ] ;
for node in self . rendezvous_nodes . iter ( ) {
if peer_id = = & node . peer_id {
return vec! [ node . address . clone ( ) ] ;
}
}
vec! [ ]
}
fn inject_connected ( & mut self , peer_id : & PeerId ) {
if peer_id = = & self . rendezvous_peer_id {
self . connection_status = ConnectionStatus ::Connected ;
match & self . registration_status {
RegistrationStatus ::RegisterOnNextConnection = > {
self . register ( ) ;
self . registration_status = RegistrationStatus ::Pending ;
for i in 0 .. self . rendezvous_nodes . len ( ) {
if peer_id = = & self . rendezvous_nodes [ i ] . peer_id {
self . rendezvous_nodes [ i ] . set_connection ( ConnectionStatus ::Connected ) ;
match & self . rendezvous_nodes [ i ] . registration_status {
RegistrationStatus ::RegisterOnNextConnection = > {
self . register ( i ) ;
self . rendezvous_nodes [ i ] . set_registration ( RegistrationStatus ::Pending ) ;
}
RegistrationStatus ::Registered { .. } = > { }
RegistrationStatus ::Pending = > { }
}
RegistrationStatus ::Registered { .. } = > { }
RegistrationStatus ::Pending = > { }
}
}
}
fn inject_disconnected ( & mut self , peer_id : & PeerId ) {
if peer_id = = & self . rendezvous_peer_id {
self . connection_status = ConnectionStatus ::Disconnected ;
for i in 0 .. self . rendezvous_nodes . len ( ) {
let mut node = & mut self . rendezvous_nodes [ i ] ;
if peer_id = = & node . peer_id {
node . connection_status = ConnectionStatus ::Disconnected ;
}
}
}
@ -298,9 +324,12 @@ pub mod rendezous {
_handler : Self ::ProtocolsHandler ,
_error : & DialError ,
) {
if let Some ( id ) = peer_id {
if id = = self . rendezvous_peer_id {
self . connection_status = ConnectionStatus ::Disconnected ;
for i in 0 .. self . rendezvous_nodes . len ( ) {
let mut node = & mut self . rendezvous_nodes [ i ] ;
if let Some ( id ) = peer_id {
if id = = node . peer_id {
node . connection_status = ConnectionStatus ::Disconnected ;
}
}
}
}
@ -311,62 +340,73 @@ pub mod rendezous {
cx : & mut std ::task ::Context < ' _ > ,
params : & mut impl PollParameters ,
) -> Poll < NetworkBehaviourAction < Self ::OutEvent , Self ::ProtocolsHandler > > {
match & mut self . registration_status {
RegistrationStatus ::RegisterOnNextConnection = > match self . connection_status {
ConnectionStatus ::Disconnected = > {
self . connection_status = ConnectionStatus ::Dialling ;
return Poll ::Ready ( NetworkBehaviourAction ::Dial {
opts : DialOpts ::peer_id ( self . rendezvous_peer_id )
. condition ( PeerCondition ::Disconnected )
. build ( ) ,
handler : Self ::ProtocolsHandler ::new ( Duration ::from_secs ( 30 ) ) ,
} ) ;
}
ConnectionStatus ::Dialling = > { }
ConnectionStatus ::Connected = > {
self . registration_status = RegistrationStatus ::Pending ;
self . register ( ) ;
}
} ,
RegistrationStatus ::Registered { re_register_in } = > {
if let Poll ::Ready ( ( ) ) = re_register_in . poll_unpin ( cx ) {
match self . connection_status {
ConnectionStatus ::Connected = > {
self . registration_status = RegistrationStatus ::Pending ;
self . register ( ) ;
}
ConnectionStatus ::Disconnected = > {
self . registration_status =
RegistrationStatus ::RegisterOnNextConnection ;
return Poll ::Ready ( NetworkBehaviourAction ::Dial {
opts : DialOpts ::peer_id ( self . rendezvous_peer_id )
. condition ( PeerCondition ::Disconnected )
. build ( ) ,
handler : Self ::ProtocolsHandler ::new ( Duration ::from_secs ( 30 ) ) ,
} ) ;
if let Some ( peer_id ) = self . to_dial . pop_front ( ) {
return Poll ::Ready ( NetworkBehaviourAction ::Dial {
opts : DialOpts ::peer_id ( peer_id )
. condition ( PeerCondition ::Disconnected )
. build ( ) ,
handler : Self ::ProtocolsHandler ::new ( Duration ::from_secs ( 30 ) ) ,
} ) ;
}
// check the status of each rendezvous node
for i in 0 .. self . rendezvous_nodes . len ( ) {
let connection_status = self . rendezvous_nodes [ i ] . connection_status . clone ( ) ;
match & mut self . rendezvous_nodes [ i ] . registration_status {
RegistrationStatus ::RegisterOnNextConnection = > match connection_status {
ConnectionStatus ::Disconnected = > {
self . rendezvous_nodes [ i ] . set_connection ( ConnectionStatus ::Dialling ) ;
self . to_dial . push_back ( self . rendezvous_nodes [ i ] . peer_id ) ;
}
ConnectionStatus ::Dialling = > { }
ConnectionStatus ::Connected = > {
self . rendezvous_nodes [ i ] . set_registration ( RegistrationStatus ::Pending ) ;
self . register ( i ) ;
}
} ,
RegistrationStatus ::Registered { re_register_in } = > {
if let Poll ::Ready ( ( ) ) = re_register_in . poll_unpin ( cx ) {
match connection_status {
ConnectionStatus ::Connected = > {
self . rendezvous_nodes [ i ]
. set_registration ( RegistrationStatus ::Pending ) ;
self . register ( i ) ;
}
ConnectionStatus ::Disconnected = > {
self . rendezvous_nodes [ i ] . set_registration (
RegistrationStatus ::RegisterOnNextConnection ,
) ;
self . to_dial . push_back ( self . rendezvous_nodes [ i ] . peer_id ) ;
}
ConnectionStatus ::Dialling = > { }
}
ConnectionStatus ::Dialling = > { }
}
}
RegistrationStatus ::Pending = > { }
}
RegistrationStatus ::Pending = > { }
}
let inner_poll = self . inner . poll ( cx , params ) ;
// reset the timer if we successfully registered
// reset the timer for the specific rendezvous node if we successfully registered
if let Poll ::Ready ( NetworkBehaviourAction ::GenerateEvent (
libp2p ::rendezvous ::client ::Event ::Registered { ttl , .. } ,
libp2p ::rendezvous ::client ::Event ::Registered {
ttl ,
rendezvous_node ,
..
} ,
) ) = & inner_poll
{
let half_of_ttl = Duration ::from_secs ( * ttl ) / 2 ;
self . registration_status = RegistrationStatus ::Registered {
re_register_in : Box ::pin ( tokio ::time ::sleep ( half_of_ttl ) ) ,
} ;
if let Some ( i ) = self
. rendezvous_nodes
. iter ( )
. position ( | n | & n . peer_id = = rendezvous_node )
{
let half_of_ttl = Duration ::from_secs ( * ttl ) / 2 ;
let re_register_in = Box ::pin ( tokio ::time ::sleep ( half_of_ttl ) ) ;
let status = RegistrationStatus ::Registered { re_register_in } ;
self . rendezvous_nodes [ i ] . set_registration ( status ) ;
}
}
inner_poll
@ -380,6 +420,7 @@ pub mod rendezous {
use futures ::StreamExt ;
use libp2p ::rendezvous ;
use libp2p ::swarm ::SwarmEvent ;
use std ::collections ::HashMap ;
#[ tokio::test ]
async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node (
@ -387,16 +428,16 @@ pub mod rendezous {
let mut rendezvous_node = new_swarm ( | _ , _ | {
rendezvous ::server ::Behaviour ::new ( rendezvous ::server ::Config ::default ( ) )
} ) ;
let rendezvous_address = rendezvous_node . listen_on_random_memory_address ( ) . await ;
let address = rendezvous_node . listen_on_random_memory_address ( ) . await ;
let rendezvous_point = RendezvousNode ::new (
& address ,
rendezvous_node . local_peer_id ( ) . to_owned ( ) ,
XmrBtcNamespace ::Testnet ,
None ,
) ;
let mut asb = new_swarm ( | _ , identity | {
rendezous ::Behaviour ::new (
identity ,
* rendezvous_node . local_peer_id ( ) ,
rendezvous_address ,
XmrBtcNamespace ::Testnet ,
None ,
)
super ::rendezvous ::Behaviour ::new ( identity , vec! [ rendezvous_point ] )
} ) ;
asb . listen_on_random_memory_address ( ) . await ; // this adds an external address
@ -428,16 +469,16 @@ pub mod rendezous {
rendezvous ::server ::Config ::default ( ) . with_min_ttl ( 2 ) ,
)
} ) ;
let rendezvous_address = rendezvous_node . listen_on_random_memory_address ( ) . await ;
let address = rendezvous_node . listen_on_random_memory_address ( ) . await ;
let rendezvous_point = RendezvousNode ::new (
& address ,
rendezvous_node . local_peer_id ( ) . to_owned ( ) ,
XmrBtcNamespace ::Testnet ,
Some ( 5 ) ,
) ;
let mut asb = new_swarm ( | _ , identity | {
rendezous ::Behaviour ::new (
identity ,
* rendezvous_node . local_peer_id ( ) ,
rendezvous_address ,
XmrBtcNamespace ::Testnet ,
Some ( 5 ) ,
)
super ::rendezvous ::Behaviour ::new ( identity , vec! [ rendezvous_point ] )
} ) ;
asb . listen_on_random_memory_address ( ) . await ; // this adds an external address
@ -467,5 +508,62 @@ pub mod rendezous {
. unwrap ( )
. unwrap ( ) ;
}
#[ tokio::test ]
async fn asb_registers_multiple ( ) {
let registration_ttl = Some ( 10 ) ;
let mut rendezvous_nodes = Vec ::new ( ) ;
let mut registrations = HashMap ::new ( ) ;
// register with 5 rendezvous nodes
for _ in 0 .. 5 {
let mut rendezvous = new_swarm ( | _ , _ | {
rendezvous ::server ::Behaviour ::new (
rendezvous ::server ::Config ::default ( ) . with_min_ttl ( 2 ) ,
)
} ) ;
let address = rendezvous . listen_on_random_memory_address ( ) . await ;
let id = * rendezvous . local_peer_id ( ) ;
registrations . insert ( id , 0 ) ;
rendezvous_nodes . push ( RendezvousNode ::new (
& address ,
* rendezvous . local_peer_id ( ) ,
XmrBtcNamespace ::Testnet ,
registration_ttl ,
) ) ;
tokio ::spawn ( async move {
loop {
rendezvous . next ( ) . await ;
}
} ) ;
}
let mut asb = new_swarm ( | _ , identity | {
super ::rendezvous ::Behaviour ::new ( identity , rendezvous_nodes )
} ) ;
asb . listen_on_random_memory_address ( ) . await ; // this adds an external address
let handle = tokio ::spawn ( async move {
loop {
if let SwarmEvent ::Behaviour ( rendezvous ::client ::Event ::Registered {
rendezvous_node ,
..
} ) = asb . select_next_some ( ) . await
{
registrations
. entry ( rendezvous_node )
. and_modify ( | counter | * counter + = 1 ) ;
}
if registrations . iter ( ) . all ( | ( _ , & count ) | count > = 4 ) {
break ;
}
}
} ) ;
tokio ::time ::timeout ( Duration ::from_secs ( 30 ) , handle )
. await
. unwrap ( )
. unwrap ( ) ;
}
}
}