HBASE-23293 [REPLICATION] make ship edits timeout configurable (#882)
Signed-off-by: Guangxu Cheng <gxcheng@apache.org>
This commit is contained in:
parent
70bbc38aae
commit
46c090ce45
|
@ -977,6 +977,10 @@ public final class HConstants {
|
|||
|
||||
public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
|
||||
|
||||
/** Configuration key for ReplicationSource shipeEdits timeout */
|
||||
public static final String REPLICATION_SOURCE_SHIPEDITS_TIMEOUT =
|
||||
"replication.source.shipedits.timeout";
|
||||
public static final int REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT = 60000;
|
||||
|
||||
/**
|
||||
* Directory where the source cluster file system client configuration are placed which is used by
|
||||
|
|
|
@ -171,4 +171,18 @@ public final class ReplicationUtils {
|
|||
return tableCFs != null && tableCFs.containsKey(tableName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the adaptive timeout value when performing a retry
|
||||
*/
|
||||
public static int getAdaptiveTimeout(final int initialValue, final int retries) {
|
||||
int ntries = retries;
|
||||
if (ntries >= HConstants.RETRY_BACKOFF.length) {
|
||||
ntries = HConstants.RETRY_BACKOFF.length - 1;
|
||||
}
|
||||
if (ntries < 0) {
|
||||
ntries = 0;
|
||||
}
|
||||
return initialValue * HConstants.RETRY_BACKOFF[ntries];
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,11 +57,12 @@ public class ReplicationProtbufUtil {
|
|||
*/
|
||||
public static void replicateWALEntry(final AdminService.BlockingInterface admin,
|
||||
final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
|
||||
Path sourceHFileArchiveDir) throws IOException {
|
||||
Path sourceHFileArchiveDir, int timeout) throws IOException {
|
||||
Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
|
||||
buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
|
||||
sourceHFileArchiveDir);
|
||||
HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
|
||||
controller.setCallTimeout(timeout);
|
||||
try {
|
||||
admin.replicateWALEntry(controller, p.getFirst());
|
||||
} catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
|
||||
|
|
|
@ -148,6 +148,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
|
|||
List<Entry> entries;
|
||||
int size;
|
||||
String walGroupId;
|
||||
int timeout;
|
||||
@InterfaceAudience.Private
|
||||
public ReplicateContext() {
|
||||
}
|
||||
|
@ -173,6 +174,12 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
|
|||
public String getWalGroupId(){
|
||||
return walGroupId;
|
||||
}
|
||||
public void setTimeout(int timeout) {
|
||||
this.timeout = timeout;
|
||||
}
|
||||
public int getTimeout() {
|
||||
return this.timeout;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -309,7 +309,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
replicateContext.getSize());
|
||||
}
|
||||
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
|
||||
pool.submit(createReplicator(entries, i));
|
||||
pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
|
||||
futures++;
|
||||
}
|
||||
}
|
||||
|
@ -467,7 +467,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
|
||||
protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
|
||||
throws IOException {
|
||||
SinkPeer sinkPeer = null;
|
||||
try {
|
||||
int entriesHashCode = System.identityHashCode(entries);
|
||||
|
@ -480,7 +481,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
BlockingInterface rrs = sinkPeer.getRegionServer();
|
||||
try {
|
||||
ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
|
||||
replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
|
||||
}
|
||||
|
@ -500,14 +501,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
return batchIndex;
|
||||
}
|
||||
|
||||
private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
|
||||
private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
|
||||
throws IOException {
|
||||
int batchSize = 0, index = 0;
|
||||
List<Entry> batch = new ArrayList<>();
|
||||
for (Entry entry : entries) {
|
||||
int entrySize = getEstimatedEntrySize(entry);
|
||||
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
|
||||
replicateEntries(batch, index++);
|
||||
replicateEntries(batch, index++, timeout);
|
||||
batch.clear();
|
||||
batchSize = 0;
|
||||
}
|
||||
|
@ -515,15 +516,15 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
|
|||
batchSize += entrySize;
|
||||
}
|
||||
if (batchSize > 0) {
|
||||
replicateEntries(batch, index);
|
||||
replicateEntries(batch, index, timeout);
|
||||
}
|
||||
return batchIndex;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
|
||||
return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
|
||||
: () -> replicateEntries(entries, batchIndex);
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
|
||||
return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
|
||||
: () -> replicateEntries(entries, batchIndex, timeout);
|
||||
}
|
||||
|
||||
private String logPeerId(){
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication.regionserver;
|
||||
|
||||
import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.PriorityBlockingQueue;
|
||||
|
@ -24,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
@ -71,6 +73,7 @@ public class ReplicationSourceShipper extends Thread {
|
|||
protected final int maxRetriesMultiplier;
|
||||
private final int DEFAULT_TIMEOUT = 20000;
|
||||
private final int getEntriesTimeout;
|
||||
private final int shipEditsTimeout;
|
||||
|
||||
public ReplicationSourceShipper(Configuration conf, String walGroupId,
|
||||
PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
|
||||
|
@ -84,6 +87,8 @@ public class ReplicationSourceShipper extends Thread {
|
|||
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
|
||||
this.getEntriesTimeout =
|
||||
this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
|
||||
this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
|
||||
HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -176,6 +181,7 @@ public class ReplicationSourceShipper extends Thread {
|
|||
new ReplicationEndpoint.ReplicateContext();
|
||||
replicateContext.setEntries(entries).setSize(currentSize);
|
||||
replicateContext.setWalGroupId(walGroupId);
|
||||
replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));
|
||||
|
||||
long startTimeNs = System.nanoTime();
|
||||
// send the edits to the endpoint. Will block until the edits are shipped and acknowledged
|
||||
|
|
|
@ -483,7 +483,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
||||
// Fail only once, we don't want to slow down the test.
|
||||
if (failedOnce) {
|
||||
return () -> ordinal;
|
||||
|
|
|
@ -229,9 +229,9 @@ public class TestReplicator extends TestReplicationBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
||||
return () -> {
|
||||
int batchIndex = replicateEntries(entries, ordinal);
|
||||
int batchIndex = replicateEntries(entries, ordinal, timeout);
|
||||
entriesCount += entries.size();
|
||||
int count = batchCount.incrementAndGet();
|
||||
LOG.info(
|
||||
|
@ -246,10 +246,10 @@ public class TestReplicator extends TestReplicationBase {
|
|||
private final AtomicBoolean failNext = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
||||
return () -> {
|
||||
if (failNext.compareAndSet(false, true)) {
|
||||
int batchIndex = replicateEntries(entries, ordinal);
|
||||
int batchIndex = replicateEntries(entries, ordinal, timeout);
|
||||
entriesCount += entries.size();
|
||||
int count = batchCount.incrementAndGet();
|
||||
LOG.info(
|
||||
|
|
|
@ -167,7 +167,7 @@ public class TestSerialReplicationEndpoint {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
|
||||
protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
|
||||
return () -> {
|
||||
entryQueue.addAll(entries);
|
||||
return ordinal;
|
||||
|
|
Loading…
Reference in New Issue