HBASE-27105 HBaseInterClusterReplicationEndpoint should honor replication adaptive timeout (#4569)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
b570d2e81d
commit
c74bf8b6e2
|
@ -51,11 +51,13 @@ import org.apache.hadoop.hbase.client.Admin;
|
|||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.ipc.CallTimeoutException;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer;
|
||||
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
|
||||
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
||||
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||
|
@ -504,6 +506,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
public boolean replicate(ReplicateContext replicateContext) {
|
||||
CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
|
||||
int sleepMultiplier = 1;
|
||||
int initialTimeout = replicateContext.getTimeout();
|
||||
|
||||
if (!peersSelected && this.isRunning()) {
|
||||
connectToPeers();
|
||||
|
@ -574,6 +577,9 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
} else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
|
||||
LOG.warn("{} Peer is unavailable, rechecking all sinks: ", logPeerId(), ioe);
|
||||
replicationSinkMgr.chooseSinks();
|
||||
} else if (ioe instanceof CallTimeoutException) {
|
||||
replicateContext
|
||||
.setTimeout(ReplicationUtils.getAdaptiveTimeout(initialTimeout, sleepMultiplier));
|
||||
} else {
|
||||
LOG.warn("{} Can't replicate because of a local or network error: ", logPeerId(), ioe);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue