author     Dennis Trautwein <[email protected]>   2023-02-11 18:33:11 +0100
committer  GitHub <[email protected]>              2023-02-11 18:33:11 +0100
commit     81e732545e2f8fb9ee337c88c1eb6e9c4d7bbcdb (patch)
tree       b33ac070ebada642b62d18946b1bfbd333fd32e8
parent     4364e75c813db5686271777589ae980505a7dcd1 (diff)
feat: send FIND_NODE request to peers on routing table refresh (#810)
We have seen in the past that there are peers in the IPFS DHT that let you connect to them but then refuse to speak any protocol, mainly because the resource manager killed the connection once its limits were exceeded. Such peers get pushed to the edge of the DHT: they are already pruned from the lower buckets, but they never get pruned from the higher ones, because during a routing table refresh we only connect to them and don't speak anything on that connection. With this change, the refresh also sends each peer a FIND_NODE request for its own ID and evicts it from the routing table if either the connection or the request fails.
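For illustration only, and not part of this patch, a standalone version of such a liveness probe could look roughly like the sketch below. It assumes the current go-libp2p core import paths and the pb.ProtocolMessenger type that the diff below wires in; the package name and the probePeer helper are made up for this sketch.

// Hedged sketch, not part of the commit: connect to a peer and force a full
// DHT protocol round trip by asking it for the peers closest to its own ID.
package sketch

import (
    "context"

    pb "github.com/libp2p/go-libp2p-kad-dht/pb"
    "github.com/libp2p/go-libp2p/core/host"
    "github.com/libp2p/go-libp2p/core/peer"
)

// probePeer (hypothetical helper) returns an error if the peer is unreachable
// or refuses to answer a FIND_NODE request.
func probePeer(ctx context.Context, h host.Host, m *pb.ProtocolMessenger, p peer.ID) error {
    // A successful dial is not enough: the misbehaving peers described above
    // accept connections but never speak the DHT protocol.
    if err := h.Connect(ctx, peer.AddrInfo{ID: p}); err != nil {
        return err
    }
    // FIND_NODE for the peer's own ID exercises the protocol end to end.
    _, err := m.GetClosestPeers(ctx, p, p)
    return err
}

This is the same shape as the pingFnc closure added to dht.go below: a successful dial followed by a FIND_NODE round trip.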
-rw-r--r--   dht.go                             6
-rw-r--r--   rtrefresh/rt_refresh_manager.go   14
2 files changed, 19 insertions, 1 deletion
diff --git a/dht.go b/dht.go
index 64b5d49..689de62 100644
--- a/dht.go
+++ b/dht.go
@@ -363,10 +363,16 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
         return err
     }
 
+    pingFnc := func(ctx context.Context, p peer.ID) error {
+        _, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated
+        return err
+    }
+
     r, err := rtrefresh.NewRtRefreshManager(
         dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
         keyGenFnc,
         queryFnc,
+        pingFnc,
         cfg.RoutingTable.RefreshQueryTimeout,
         cfg.RoutingTable.RefreshInterval,
         maxLastSuccessfulOutboundThreshold,
diff --git a/rtrefresh/rt_refresh_manager.go b/rtrefresh/rt_refresh_manager.go
index cb0eefc..ff8f27e 100644
--- a/rtrefresh/rt_refresh_manager.go
+++ b/rtrefresh/rt_refresh_manager.go
@@ -41,6 +41,7 @@ type RtRefreshManager struct {
     enableAutoRefresh   bool                                         // should run periodic refreshes ?
     refreshKeyGenFnc    func(cpl uint) (string, error)               // generate the key for the query to refresh this cpl
     refreshQueryFnc     func(ctx context.Context, key string) error  // query to run for a refresh.
+    refreshPingFnc      func(ctx context.Context, p peer.ID) error   // request to check liveness of remote peer
     refreshQueryTimeout time.Duration                                // timeout for one refresh query
 
     // interval between two periodic refreshes.
@@ -57,6 +58,7 @@ type RtRefreshManager struct {
 func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool,
     refreshKeyGenFnc func(cpl uint) (string, error),
     refreshQueryFnc func(ctx context.Context, key string) error,
+    refreshPingFnc func(ctx context.Context, p peer.ID) error,
     refreshQueryTimeout time.Duration,
     refreshInterval time.Duration,
     successfulOutboundQueryGracePeriod time.Duration,
@@ -73,6 +75,7 @@ func NewRtRefreshManager(h host.Host, rt *kbucket.RoutingTable, autoRefresh bool
         enableAutoRefresh:   autoRefresh,
         refreshKeyGenFnc:    refreshKeyGenFnc,
         refreshQueryFnc:     refreshQueryFnc,
+        refreshPingFnc:      refreshPingFnc,
         refreshQueryTimeout: refreshQueryTimeout,
 
         refreshInterval: refreshInterval,
@@ -178,12 +181,21 @@ func (r *RtRefreshManager) loop() {
                 wg.Add(1)
                 go func(ps kbucket.PeerInfo) {
                     defer wg.Done()
+
                     livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
+                    defer cancel()
+
                     if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
+                        logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err)
+                        r.rt.RemovePeer(ps.Id)
+                        return
+                    }
+
+                    if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil {
                         logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
                         r.rt.RemovePeer(ps.Id)
+                        return
                     }
-                    cancel()
                 }(ps)
             }
         }
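For readability, here is the resulting liveness check in the refresh loop, reconstructed from the hunk above with explanatory comments added; nothing in it goes beyond the diff.

wg.Add(1)
go func(ps kbucket.PeerInfo) {
    defer wg.Done()

    livelinessCtx, cancel := context.WithTimeout(r.ctx, peerPingTimeout)
    defer cancel()

    // First make sure the peer can still be dialed at all.
    if err := r.h.Connect(livelinessCtx, peer.AddrInfo{ID: ps.Id}); err != nil {
        logger.Debugw("evicting peer after failed connection", "peer", ps.Id, "error", err)
        r.rt.RemovePeer(ps.Id)
        return
    }

    // Then require an actual DHT protocol round trip (FIND_NODE), so peers that
    // accept connections but refuse to speak any protocol get evicted as well.
    if err := r.refreshPingFnc(livelinessCtx, ps.Id); err != nil {
        logger.Debugw("evicting peer after failed ping", "peer", ps.Id, "error", err)
        r.rt.RemovePeer(ps.Id)
        return
    }
}(ps)

Replacing the trailing cancel() with defer cancel() is what makes the two early returns safe: every exit path now releases the timeout context.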