From c74bf8b6e2bbcde6baa7f6f0166236ebb7ef3503 Mon Sep 17 00:00:00 2001 From: Pankaj Date: Thu, 23 Jun 2022 22:22:57 +0530 Subject: [PATCH] HBASE-27105 HBaseInterClusterReplicationEndpoint should honor replication adaptive timeout (#4569) Signed-off-by: Wellington Chevreuil --- .../regionserver/HBaseInterClusterReplicationEndpoint.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index a899d37c594..9cd5392cbfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -51,11 +51,13 @@ import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ClusterConnection; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint; +import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; @@ -504,6 +506,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi public boolean replicate(ReplicateContext replicateContext) { CompletionService pool = new ExecutorCompletionService<>(this.exec); int sleepMultiplier = 1; + int initialTimeout = replicateContext.getTimeout(); if (!peersSelected && this.isRunning()) { connectToPeers(); @@ -574,6 +577,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) { LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe); replicationSinkMgr.chooseSinks(); + } else if (ioe instanceof CallTimeoutException) { + replicateContext + .setTimeout(ReplicationUtils.getAdaptiveTimeout(initialTimeout, sleepMultiplier)); } else { LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe); }