HBASE-23293 [REPLICATION] make ship edits timeout configurable (#825)

Signed-off-by: Guangxu Cheng <gxcheng@apache.org>
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
chenxu14 2019-11-26 19:05:53 +08:00 committed by Guangxu Cheng
parent ea6cea846a
commit 17468efb88
11 changed files with 59 additions and 23 deletions

@@ -955,6 +955,10 @@ public final class HConstants {
   public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
+  /** Configuration key for ReplicationSource shipeEdits timeout */
+  public static final String REPLICATION_SOURCE_SHIPEDITS_TIMEOUT =
+      "replication.source.shipedits.timeout";
+  public static final int REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT = 60000;
   /**
    * Directory where the source cluster file system client configuration are placed which is used by

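Note (not part of the diff): the new key is read like any other HBase Configuration setting, falling back to the 60000 ms default above. A minimal sketch of overriding it; the class name and the 120000 ms value are purely illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class ShipEditsTimeoutConfigDemo {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Hypothetical override; without it, readers fall back to the 60000 ms default.
    conf.setInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, 120000);
    int shipEditsTimeout = conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
    System.out.println("ship edits timeout = " + shipEditsTimeout + " ms");
  }
}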
@@ -236,4 +236,18 @@ public final class ReplicationUtils {
     }
     return sleepMultiplier < maxRetriesMultiplier;
   }
+
+  /**
+   * Get the adaptive timeout value when performing a retry
+   */
+  public static int getAdaptiveTimeout(final int initialValue, final int retries) {
+    int ntries = retries;
+    if (ntries >= HConstants.RETRY_BACKOFF.length) {
+      ntries = HConstants.RETRY_BACKOFF.length - 1;
+    }
+    if (ntries < 0) {
+      ntries = 0;
+    }
+    return initialValue * HConstants.RETRY_BACKOFF[ntries];
+  }
 }

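Note (not part of the diff): getAdaptiveTimeout multiplies the initial value by the HConstants.RETRY_BACKOFF entry for the given retry count, clamping out-of-range counts to the first or last entry. A small sketch of calling it; AdaptiveTimeoutDemo is an illustrative name, not something in the commit:

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationUtils;

public class AdaptiveTimeoutDemo {
  public static void main(String[] args) {
    int base = HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT; // 60000 ms
    // The result is base * HConstants.RETRY_BACKOFF[retries], with retry counts
    // outside the array clamped to its first or last entry.
    for (int retries : new int[] { 0, 1, 5, HConstants.RETRY_BACKOFF.length + 10 }) {
      System.out.println(retries + " retries -> "
          + ReplicationUtils.getAdaptiveTimeout(base, retries) + " ms");
    }
  }
}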
@@ -159,9 +159,11 @@ public class AsyncRegionServerAdmin {
   }

   public CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
-      ReplicateWALEntryRequest request, CellScanner cellScanner) {
-    return call((stub, controller, done) -> stub.replicateWALEntry(controller, request, done),
-      cellScanner);
+      ReplicateWALEntryRequest request, CellScanner cellScanner, int timeout) {
+    return call((stub, controller, done) -> {
+      controller.setCallTimeout(timeout);
+      stub.replicateWALEntry(controller, request, done);
+    }, cellScanner);
   }

   public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request,

@@ -54,11 +54,11 @@ public class ReplicationProtbufUtil {
    * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
    */
   public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
-      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
-      throws IOException {
+      String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
+      int timeout) throws IOException {
     Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
       replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
+    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
   }

   /**

@@ -161,6 +161,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
     List<Entry> entries;
     int size;
     String walGroupId;
+    int timeout;
     @InterfaceAudience.Private
     public ReplicateContext() {
     }
@@ -186,6 +187,12 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
     public String getWalGroupId(){
       return walGroupId;
     }
+    public void setTimeout(int timeout) {
+      this.timeout = timeout;
+    }
+    public int getTimeout() {
+      return this.timeout;
+    }
   }

   /**

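Note (not part of the diff): ReplicateContext simply carries the timeout from the shipper to the endpoint; there is no behaviour attached to the field itself. A tiny round-trip sketch; ReplicateContextTimeoutDemo is an illustrative name:

import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;

public class ReplicateContextTimeoutDemo {
  public static void main(String[] args) {
    ReplicateContext ctx = new ReplicateContext();
    ctx.setTimeout(60000); // milliseconds; an endpoint reads it back when shipping
    System.out.println("shipment timeout = " + ctx.getTimeout() + " ms");
  }
}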
@@ -309,7 +309,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           replicateContext.getSize());
       }
       // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-      pool.submit(createReplicator(entries, i));
+      pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
       futures++;
     }
   }
@@ -467,7 +467,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   }

   @VisibleForTesting
-  protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
+  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
+      throws IOException {
     SinkPeer sinkPeer = null;
     try {
       int entriesHashCode = System.identityHashCode(entries);
@@ -481,7 +482,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       try {
         ReplicationProtbufUtil.replicateWALEntry(rsAdmin,
           entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir);
+          hfileArchiveDir, timeout);
         if (LOG.isTraceEnabled()) {
           LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
         }
@@ -501,14 +502,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return batchIndex;
   }

-  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
+  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
       throws IOException {
     int batchSize = 0, index = 0;
     List<Entry> batch = new ArrayList<>();
     for (Entry entry : entries) {
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++);
+        replicateEntries(batch, index++, timeout);
         batch.clear();
         batchSize = 0;
       }
@@ -516,15 +517,15 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       batchSize += entrySize;
     }
     if (batchSize > 0) {
-      replicateEntries(batch, index);
+      replicateEntries(batch, index, timeout);
     }
     return batchIndex;
   }

   @VisibleForTesting
-  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
-    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
-      : () -> replicateEntries(entries, batchIndex);
+  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
+    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
+      : () -> replicateEntries(entries, batchIndex, timeout);
   }

   private String logPeerId(){

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;

+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

 import java.io.IOException;
@@ -26,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Threads;
@@ -73,6 +75,7 @@ public class ReplicationSourceShipper extends Thread {
   protected final int maxRetriesMultiplier;
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
+  private final int shipEditsTimeout;

   public ReplicationSourceShipper(Configuration conf, String walGroupId,
       PriorityBlockingQueue<Path> queue, ReplicationSource source) {
@@ -86,6 +89,8 @@
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.getEntriesTimeout =
         this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
+    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
   }

   @Override
@@ -186,6 +191,7 @@
         new ReplicationEndpoint.ReplicateContext();
     replicateContext.setEntries(entries).setSize(currentSize);
     replicateContext.setWalGroupId(walGroupId);
+    replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));
     long startTimeNs = System.nanoTime();
     // send the edits to the endpoint. Will block until the edits are shipped and acknowledged

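Note (not part of the diff): putting the pieces together, each shipment attempt now gets an RPC call timeout derived from the configured base value and the current retry count. A hedged sketch; ShipEditsTimeoutSketch and effectiveRpcTimeout are illustrative names:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationUtils;

public final class ShipEditsTimeoutSketch {
  // Mirrors what the shipper now does per attempt: start from the configured
  // value and widen it as the retry count (sleepMultiplier) grows.
  static int effectiveRpcTimeout(Configuration conf, int sleepMultiplier) {
    int shipEditsTimeout = conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
    return ReplicationUtils.getAdaptiveTimeout(shipEditsTimeout, sleepMultiplier);
  }
}

The resulting value travels on the ReplicateContext, is handed to createReplicator in HBaseInterClusterReplicationEndpoint, and finally becomes controller.setCallTimeout(...) on the replicateWALEntry RPC in AsyncRegionServerAdmin.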
@@ -267,11 +267,13 @@ public class SyncReplicationTestBase {
     }
     if (!expectedRejection) {
       ReplicationProtbufUtil.replicateWALEntry(
-        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
+        connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
     } else {
       try {
         ReplicationProtbufUtil.replicateWALEntry(
-          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null);
+          connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
+          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());

@@ -482,7 +482,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
         return () -> ordinal;

@@ -227,9 +227,9 @@ public class TestReplicator extends TestReplicationBase {
     }

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       return () -> {
-        int batchIndex = replicateEntries(entries, ordinal);
+        int batchIndex = replicateEntries(entries, ordinal, timeout);
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
@@ -244,10 +244,10 @@ public class TestReplicator extends TestReplicationBase {
     private final AtomicBoolean failNext = new AtomicBoolean(false);

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       return () -> {
         if (failNext.compareAndSet(false, true)) {
-          int batchIndex = replicateEntries(entries, ordinal);
+          int batchIndex = replicateEntries(entries, ordinal, timeout);
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(

@@ -167,7 +167,7 @@ public class TestSerialReplicationEndpoint {
     }

     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       return () -> {
         entryQueue.addAll(entries);
         return ordinal;