diff --git a/libi2pd/Transports.cpp b/libi2pd/Transports.cpp index faba89b9..de72d01f 100644 --- a/libi2pd/Transports.cpp +++ b/libi2pd/Transports.cpp @@ -457,6 +457,7 @@ namespace transport return; } if(RoutesRestricted() && !IsRestrictedPeer(ident)) return; + std::shared_ptr peer; auto it = m_Peers.find (ident); if (it == m_Peers.end ()) { @@ -470,10 +471,12 @@ namespace transport if (r && (r->IsUnreachable () || !r->IsReachableFrom (i2p::context.GetRouterInfo ()))) return; // router found but non-reachable { auto ts = i2p::util::GetSecondsSinceEpoch (); + peer = std::make_shared(r, ts); std::unique_lock l(m_PeersMutex); - it = m_Peers.insert (std::pair(ident, {r, ts})).first; + peer = m_Peers.emplace (ident, peer).first->second; } - connected = ConnectToPeer (ident, it->second); + if (peer) + connected = ConnectToPeer (ident, peer); } catch (std::exception& ex) { @@ -481,11 +484,15 @@ namespace transport } if (!connected) return; } - if (it->second.IsConnected ()) - it->second.sessions.front ()->SendI2NPMessages (msgs); + else + peer = it->second; + + if (!peer) return; + if (peer->IsConnected ()) + peer->sessions.front ()->SendI2NPMessages (msgs); else { - auto sz = it->second.delayedMessages.size (); + auto sz = peer->delayedMessages.size (); if (sz < MAX_NUM_DELAYED_MESSAGES) { if (sz < CHECK_PROFILE_NUM_DELAYED_MESSAGES && sz + msgs.size () >= CHECK_PROFILE_NUM_DELAYED_MESSAGES) @@ -494,7 +501,7 @@ namespace transport { LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped"); std::unique_lock l(m_PeersMutex); - m_Peers.erase (it); + m_Peers.erase (ident); return; } } @@ -502,30 +509,30 @@ namespace transport if (sz > MAX_NUM_DELAYED_MESSAGES/2 && it1->onDrop) it1->Drop (); // drop earlier because we can handle it else - it->second.delayedMessages.push_back (it1); + peer->delayedMessages.push_back (it1); } else { LogPrint (eLogWarning, "Transports: Delayed messages queue size to ", ident.ToBase64 (), " exceeds ", MAX_NUM_DELAYED_MESSAGES); std::unique_lock l(m_PeersMutex); - m_Peers.erase (it); + m_Peers.erase (ident); } } } - bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer) + bool Transports::ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer) { - if (!peer.router) // reconnect - peer.SetRouter (netdb.FindRouter (ident)); // try to get new one from netdb - if (peer.router) // we have RI already + if (!peer->router) // reconnect + peer->SetRouter (netdb.FindRouter (ident)); // try to get new one from netdb + if (peer->router) // we have RI already { - if (peer.priority.empty ()) + if (peer->priority.empty ()) SetPriority (peer); - while (peer.numAttempts < (int)peer.priority.size ()) + while (peer->numAttempts < (int)peer->priority.size ()) { - auto tr = peer.priority[peer.numAttempts]; - peer.numAttempts++; + auto tr = peer->priority[peer->numAttempts]; + peer->numAttempts++; switch (tr) { case i2p::data::RouterInfo::eNTCP2V4: @@ -533,12 +540,12 @@ namespace transport { if (!m_NTCP2Server) continue; std::shared_ptr address = (tr == i2p::data::RouterInfo::eNTCP2V6) ? - peer.router->GetPublishedNTCP2V6Address () : peer.router->GetPublishedNTCP2V4Address (); + peer->router->GetPublishedNTCP2V6Address () : peer->router->GetPublishedNTCP2V4Address (); if (address && IsInReservedRange(address->host)) address = nullptr; if (address) { - auto s = std::make_shared (*m_NTCP2Server, peer.router, address); + auto s = std::make_shared (*m_NTCP2Server, peer->router, address); if( m_NTCP2Server->UsingProxy()) m_NTCP2Server->ConnectWithProxy(s); else @@ -552,12 +559,12 @@ namespace transport { if (!m_SSU2Server) continue; std::shared_ptr address = (tr == i2p::data::RouterInfo::eSSU2V6) ? - peer.router->GetSSU2V6Address () : peer.router->GetSSU2V4Address (); + peer->router->GetSSU2V6Address () : peer->router->GetSSU2V4Address (); if (address && IsInReservedRange(address->host)) address = nullptr; if (address && address->IsReachableSSU ()) { - if (m_SSU2Server->CreateSession (peer.router, address)) + if (m_SSU2Server->CreateSession (peer->router, address)) return true; } break; @@ -565,10 +572,10 @@ namespace transport case i2p::data::RouterInfo::eNTCP2V6Mesh: { if (!m_NTCP2Server) continue; - auto address = peer.router->GetYggdrasilAddress (); + auto address = peer->router->GetYggdrasilAddress (); if (address) { - auto s = std::make_shared (*m_NTCP2Server, peer.router, address); + auto s = std::make_shared (*m_NTCP2Server, peer->router, address); m_NTCP2Server->Connect (s); return true; } @@ -580,9 +587,9 @@ namespace transport } LogPrint (eLogInfo, "Transports: No compatible addresses available"); - if (peer.router->IsReachableFrom (i2p::context.GetRouterInfo ())) + if (peer->router->IsReachableFrom (i2p::context.GetRouterInfo ())) i2p::data::netdb.SetUnreachable (ident, true); // we are here because all connection attempts failed but router claimed them - peer.Done (); + peer->Done (); std::unique_lock l(m_PeersMutex); m_Peers.erase (ident); return false; @@ -590,7 +597,7 @@ namespace transport else if (i2p::data::IsRouterBanned (ident)) { LogPrint (eLogWarning, "Transports: Router ", ident.ToBase64 (), " is banned. Peer dropped"); - peer.Done (); + peer->Done (); std::unique_lock l(m_PeersMutex); m_Peers.erase (ident); return false; @@ -604,7 +611,7 @@ namespace transport return true; } - void Transports::SetPriority (Peer& peer) const + void Transports::SetPriority (std::shared_ptr peer) const { static const std::vector ntcp2Priority = @@ -623,13 +630,13 @@ namespace transport i2p::data::RouterInfo::eNTCP2V4, i2p::data::RouterInfo::eNTCP2V6Mesh }; - if (!peer.router) return; + if (!peer || !peer->router) return; auto compatibleTransports = context.GetRouterInfo ().GetCompatibleTransports (false) & - peer.router->GetCompatibleTransports (true); - auto directTransports = compatibleTransports & peer.router->GetPublishedTransports (); - peer.numAttempts = 0; - peer.priority.clear (); - bool isReal = peer.router->GetProfile ()->IsReal (); + peer->router->GetCompatibleTransports (true); + auto directTransports = compatibleTransports & peer->router->GetPublishedTransports (); + peer->numAttempts = 0; + peer->priority.clear (); + bool isReal = peer->router->GetProfile ()->IsReal (); bool ssu2 = isReal ? (rand () & 1) : false; // try NTCP2 if router is not confirmed real const auto& priority = ssu2 ? ssu2Priority : ntcp2Priority; if (directTransports) @@ -643,7 +650,7 @@ namespace transport } for (auto transport: priority) if (transport & directTransports) - peer.priority.push_back (transport); + peer->priority.push_back (transport); compatibleTransports &= ~directTransports; } if (compatibleTransports) @@ -651,7 +658,7 @@ namespace transport // then remaining for (auto transport: priority) if (transport & compatibleTransports) - peer.priority.push_back (transport); + peer->priority.push_back (transport); } } @@ -668,8 +675,8 @@ namespace transport if (r) { LogPrint (eLogDebug, "Transports: RouterInfo for ", ident.ToBase64 (), " found, trying to connect"); - it->second.SetRouter (r); - if (!it->second.IsConnected ()) + it->second->SetRouter (r); + if (!it->second->IsConnected ()) ConnectToPeer (ident, it->second); } else @@ -796,31 +803,32 @@ namespace transport auto it = m_Peers.find (ident); if (it != m_Peers.end ()) { - if (it->second.numAttempts > 1) + auto peer = it->second; + if (peer->numAttempts > 1) { // exclude failed transports i2p::data::RouterInfo::CompatibleTransports transports = 0; - int numExcluded = it->second.numAttempts - 1; - if (numExcluded > (int)it->second.priority.size ()) numExcluded = it->second.priority.size (); + int numExcluded = peer->numAttempts - 1; + if (numExcluded > (int)peer->priority.size ()) numExcluded = peer->priority.size (); for (int i = 0; i < numExcluded; i++) - transports |= it->second.priority[i]; + transports |= peer->priority[i]; i2p::data::netdb.ExcludeReachableTransports (ident, transports); } - if (it->second.router && it->second.numAttempts) + if (peer->router && peer->numAttempts) { - auto transport = it->second.priority[it->second.numAttempts-1]; + auto transport = peer->priority[peer->numAttempts-1]; if (transport == i2p::data::RouterInfo::eNTCP2V4 || transport == i2p::data::RouterInfo::eNTCP2V6 || transport == i2p::data::RouterInfo::eNTCP2V6Mesh) - it->second.router->GetProfile ()->Connected (); // outgoing NTCP2 connection if always real + peer->router->GetProfile ()->Connected (); // outgoing NTCP2 connection if always real i2p::data::netdb.SetUnreachable (ident, false); // clear unreachable } - it->second.numAttempts = 0; - it->second.router = nullptr; // we don't need RouterInfo after successive connect + peer->numAttempts = 0; + peer->router = nullptr; // we don't need RouterInfo after successive connect bool sendDatabaseStore = true; - if (it->second.delayedMessages.size () > 0) + if (it->second->delayedMessages.size () > 0) { // check if first message is our DatabaseStore (publishing) - auto firstMsg = it->second.delayedMessages[0]; + auto firstMsg = peer->delayedMessages[0]; if (firstMsg && firstMsg->GetTypeID () == eI2NPDatabaseStore && i2p::data::IdentHash(firstMsg->GetPayload () + DATABASE_STORE_KEY_OFFSET) == i2p::context.GetIdentHash ()) sendDatabaseStore = false; // we have it in the list already @@ -829,9 +837,9 @@ namespace transport session->SendLocalRouterInfo (); else session->SetTerminationTimeout (10); // most likely it's publishing, no follow-up messages expected, set timeout to 10 seconds - it->second.sessions.push_back (session); - session->SendI2NPMessages (it->second.delayedMessages); - it->second.delayedMessages.clear (); + peer->sessions.push_back (session); + session->SendI2NPMessages (peer->delayedMessages); + peer->delayedMessages.clear (); } else // incoming connection or peer test { @@ -846,10 +854,11 @@ namespace transport auto r = i2p::data::netdb.FindRouter (ident); // router should be in netdb after SessionConfirmed if (r) r->GetProfile ()->Connected (); auto ts = i2p::util::GetSecondsSinceEpoch (); + auto peer = std::make_shared(r, ts); + peer->sessions.push_back (session); + peer->router = nullptr; std::unique_lock l(m_PeersMutex); - auto it = m_Peers.insert (std::make_pair (ident, Peer{ r, ts })).first; - it->second.sessions.push_back (session); - it->second.router = nullptr; + m_Peers.emplace (ident, peer); } }); } @@ -864,15 +873,16 @@ namespace transport auto it = m_Peers.find (ident); if (it != m_Peers.end ()) { - bool wasConnected = it->second.IsConnected (); - it->second.sessions.remove (session); - if (!it->second.IsConnected ()) + auto peer = it->second; + bool wasConnected = peer->IsConnected (); + peer->sessions.remove (session); + if (!peer->IsConnected ()) { - if (it->second.delayedMessages.size () > 0) + if (peer->delayedMessages.size () > 0) { if (wasConnected) // we had an active session before - it->second.numAttempts = 0; // start over - ConnectToPeer (ident, it->second); + peer->numAttempts = 0; // start over + ConnectToPeer (ident, peer); } else { @@ -898,12 +908,12 @@ namespace transport auto ts = i2p::util::GetSecondsSinceEpoch (); for (auto it = m_Peers.begin (); it != m_Peers.end (); ) { - it->second.sessions.remove_if ( + it->second->sessions.remove_if ( [](std::shared_ptr session)->bool { return !session || !session->IsEstablished (); }); - if (!it->second.IsConnected () && ts > it->second.creationTime + SESSION_CREATION_TIMEOUT) + if (!it->second->IsConnected () && ts > it->second->creationTime + SESSION_CREATION_TIMEOUT) { LogPrint (eLogWarning, "Transports: Session to peer ", it->first.ToBase64 (), " has not been created in ", SESSION_CREATION_TIMEOUT, " seconds"); /* if (!it->second.router) @@ -917,12 +927,12 @@ namespace transport } else { - if (ts > it->second.nextRouterInfoUpdateTime) + if (ts > it->second->nextRouterInfoUpdateTime) { - auto session = it->second.sessions.front (); + auto session = it->second->sessions.front (); if (session) session->SendLocalRouterInfo (true); - it->second.nextRouterInfoUpdateTime = ts + PEER_ROUTER_INFO_UPDATE_INTERVAL + + it->second->nextRouterInfoUpdateTime = ts + PEER_ROUTER_INFO_UPDATE_INTERVAL + rand () % PEER_ROUTER_INFO_UPDATE_INTERVAL_VARIANCE; } ++it; @@ -1042,13 +1052,13 @@ namespace transport std::shared_ptr Transports::GetRandomPeer (bool isHighBandwidth) const { return GetRandomPeer ( - [isHighBandwidth](const Peer& peer)->bool + [isHighBandwidth](std::shared_ptr peer)->bool { // connected, not overloaded and not slow - return !peer.router && peer.IsConnected () && peer.isReachable && - peer.sessions.front ()->GetSendQueueSize () <= PEER_ROUTER_INFO_OVERLOAD_QUEUE_SIZE && - !peer.sessions.front ()->IsSlow () && !peer.sessions.front ()->IsBandwidthExceeded (peer.isHighBandwidth) && - (!isHighBandwidth || peer.isHighBandwidth); + return !peer->router && peer->IsConnected () && peer->isReachable && + peer->sessions.front ()->GetSendQueueSize () <= PEER_ROUTER_INFO_OVERLOAD_QUEUE_SIZE && + !peer->sessions.front ()->IsSlow () && !peer->sessions.front ()->IsBandwidthExceeded (peer->isHighBandwidth) && + (!isHighBandwidth || peer->isHighBandwidth); }); } diff --git a/libi2pd/Transports.h b/libi2pd/Transports.h index 92d75efd..1164855d 100644 --- a/libi2pd/Transports.h +++ b/libi2pd/Transports.h @@ -191,8 +191,8 @@ namespace transport void RequestComplete (std::shared_ptr r, const i2p::data::IdentHash& ident); void HandleRequestComplete (std::shared_ptr r, i2p::data::IdentHash ident); void PostMessages (i2p::data::IdentHash ident, std::vector > msgs); - bool ConnectToPeer (const i2p::data::IdentHash& ident, Peer& peer); - void SetPriority (Peer& peer) const; + bool ConnectToPeer (const i2p::data::IdentHash& ident, std::shared_ptr peer); + void SetPriority (std::shared_ptr peer) const; void HandlePeerCleanupTimer (const boost::system::error_code& ecode); void HandlePeerTestTimer (const boost::system::error_code& ecode); void HandleUpdateBandwidthTimer (const boost::system::error_code& ecode); @@ -215,7 +215,7 @@ namespace transport SSU2Server * m_SSU2Server; NTCP2Server * m_NTCP2Server; mutable std::mutex m_PeersMutex; - std::unordered_map m_Peers; + std::unordered_map > m_Peers; X25519KeysPairSupplier m_X25519KeysPairSupplier;