diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index af4796057ce..a1fc898643f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -955,6 +955,10 @@ public final class HConstants { public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024; + /** Configuration key for ReplicationSource shipeEdits timeout */ + public static final String REPLICATION_SOURCE_SHIPEDITS_TIMEOUT = + "replication.source.shipedits.timeout"; + public static final int REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT = 60000; /** * Directory where the source cluster file system client configuration are placed which is used by diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java index dc4217c3d34..0d40e99edb1 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java @@ -236,4 +236,18 @@ public final class ReplicationUtils { } return sleepMultiplier < maxRetriesMultiplier; } + + /** + * Get the adaptive timeout value when performing a retry + */ + public static int getAdaptiveTimeout(final int initialValue, final int retries) { + int ntries = retries; + if (ntries >= HConstants.RETRY_BACKOFF.length) { + ntries = HConstants.RETRY_BACKOFF.length - 1; + } + if (ntries < 0) { + ntries = 0; + } + return initialValue * HConstants.RETRY_BACKOFF[ntries]; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java index d491890b942..cb061374638 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java @@ -159,9 +159,11 @@ public class AsyncRegionServerAdmin { } public CompletableFuture replicateWALEntry( - ReplicateWALEntryRequest request, CellScanner cellScanner) { - return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done), - cellScanner); + ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) { + return call((stub, controller, done) -> { + controller.setCallTimeout(timeout); + stub.replicateWALEntry(controller, request, done); + }, cellScanner); } public CompletableFuture replay(ReplicateWALEntryRequest request, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java index c39c86cfedd..4e2e5779303 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java @@ -54,11 +54,11 @@ public class ReplicationProtbufUtil { * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory */ public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries, - String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) - throws IOException { + String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, + int timeout) throws IOException { Pair p = buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir); - FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond())); + FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout)); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java index ca736636e1d..3fec8131d09 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java @@ -161,6 +161,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { List entries; int size; String walGroupId; + int timeout; @InterfaceAudience.Private public ReplicateContext() { } @@ -186,6 +187,12 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener { public String getWalGroupId(){ return walGroupId; } + public void setTimeout(int timeout) { + this.timeout = timeout; + } + public int getTimeout() { + return this.timeout; + } } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java index 58f47639c3f..ccdcee13f3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java @@ -309,7 +309,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi replicateContext.getSize()); } // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource - pool.submit(createReplicator(entries, i)); + pool.submit(createReplicator(entries, i, replicateContext.getTimeout())); futures++; } } @@ -467,7 +467,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi } @VisibleForTesting - protected int replicateEntries(List entries, int batchIndex) throws IOException { + protected int replicateEntries(List entries, int batchIndex, int timeout) + throws IOException { SinkPeer sinkPeer = null; try { int entriesHashCode = System.identityHashCode(entries); @@ -481,7 +482,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi try { ReplicationProtbufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir, - hfileArchiveDir); + hfileArchiveDir, timeout); if (LOG.isTraceEnabled()) { LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode); } @@ -501,14 +502,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi return batchIndex; } - private int serialReplicateRegionEntries(List entries, int batchIndex) + private int serialReplicateRegionEntries(List entries, int batchIndex, int timeout) throws IOException { int batchSize = 0, index = 0; List batch = new ArrayList<>(); for (Entry entry : entries) { int entrySize = getEstimatedEntrySize(entry); if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) { - replicateEntries(batch, index++); + replicateEntries(batch, index++, timeout); batch.clear(); batchSize = 0; } @@ -516,15 +517,15 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi batchSize += entrySize; } if (batchSize > 0) { - replicateEntries(batch, index); + replicateEntries(batch, index, timeout); } return batchIndex; } @VisibleForTesting - protected Callable createReplicator(List entries, int batchIndex) { - return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex) - : () -> replicateEntries(entries, batchIndex); + protected Callable createReplicator(List entries, int batchIndex, int timeout) { + return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout) + : () -> replicateEntries(entries, batchIndex, timeout); } private String logPeerId(){ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java index 17e1167af13..cf42d4ed026 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout; import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries; import java.io.IOException; @@ -26,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.replication.ReplicationEndpoint; import org.apache.hadoop.hbase.util.Threads; @@ -73,6 +75,7 @@ public class ReplicationSourceShipper extends Thread { protected final int maxRetriesMultiplier; private final int DEFAULT_TIMEOUT = 20000; private final int getEntriesTimeout; + private final int shipEditsTimeout; public ReplicationSourceShipper(Configuration conf, String walGroupId, PriorityBlockingQueue queue, ReplicationSource source) { @@ -86,6 +89,8 @@ public class ReplicationSourceShipper extends Thread { this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per this.getEntriesTimeout = this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds + this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, + HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); } @Override @@ -186,6 +191,7 @@ public class ReplicationSourceShipper extends Thread { new ReplicationEndpoint.ReplicateContext(); replicateContext.setEntries(entries).setSize(currentSize); replicateContext.setWalGroupId(walGroupId); + replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier)); long startTimeNs = System.nanoTime(); // send the edits to the endpoint. Will block until the edits are shipped and acknowledged diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java index fd8df32fd0b..f11bd498bb1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java @@ -267,11 +267,13 @@ public class SyncReplicationTestBase { } if (!expectedRejection) { ReplicationProtbufUtil.replicateWALEntry( - connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null); + connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, + HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); } else { try { ReplicationProtbufUtil.replicateWALEntry( - connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null); + connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null, + HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT); fail("Should throw IOException when sync-replication state is in A or DA"); } catch (RemoteException e) { assertRejection(e.unwrapRemoteException()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java index 8039db3f1d0..4588ace5900 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java @@ -482,7 +482,7 @@ public class TestReplicationEndpoint extends TestReplicationBase { } @Override - protected Callable createReplicator(List entries, int ordinal) { + protected Callable createReplicator(List entries, int ordinal, int timeout) { // Fail only once, we don't want to slow down the test. if (failedOnce) { return () -> ordinal; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java index a8cbab60548..bfdbb886472 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java @@ -227,9 +227,9 @@ public class TestReplicator extends TestReplicationBase { } @Override - protected Callable createReplicator(List entries, int ordinal) { + protected Callable createReplicator(List entries, int ordinal, int timeout) { return () -> { - int batchIndex = replicateEntries(entries, ordinal); + int batchIndex = replicateEntries(entries, ordinal, timeout); entriesCount += entries.size(); int count = batchCount.incrementAndGet(); LOG.info( @@ -244,10 +244,10 @@ public class TestReplicator extends TestReplicationBase { private final AtomicBoolean failNext = new AtomicBoolean(false); @Override - protected Callable createReplicator(List entries, int ordinal) { + protected Callable createReplicator(List entries, int ordinal, int timeout) { return () -> { if (failNext.compareAndSet(false, true)) { - int batchIndex = replicateEntries(entries, ordinal); + int batchIndex = replicateEntries(entries, ordinal, timeout); entriesCount += entries.size(); int count = batchCount.incrementAndGet(); LOG.info( diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java index 7d59d383548..3c88ab31591 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java @@ -167,7 +167,7 @@ public class TestSerialReplicationEndpoint { } @Override - protected Callable createReplicator(List entries, int ordinal) { + protected Callable createReplicator(List entries, int ordinal, int timeout) { return () -> { entryQueue.addAll(entries); return ordinal;