HBASE-26539 The default rpc timeout 200ms is too small for replicating meta edits (#3919)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
commit 5ac76c1c00 (parent 1fdd0a4cfd)
Author: Duo Zhang
Date:   2021-12-06 23:42:42 +08:00
2 changed files with 46 additions and 16 deletions

@@ -1742,11 +1742,8 @@ possible configurations would overwhelm and obscure the important.
     <value>false</value>
     <description>
       Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
-      If this is enabled, a replication peer named "region_replica_replication" will be created
-      which will tail the logs and replicate the mutations to region replicas for tables that
-      have region replication > 1. If this is enabled once, disabling this replication also
-      requires disabling the replication peer using shell or Admin java class.
-      Replication to secondary region replicas works over standard inter-cluster replication.
+      We have a separate implementation for replicating the WAL without using the general
+      inter-cluster replication framework, so now we will not add any replication peers.
     </description>
   </property>
   <property>
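
For context, a minimal sketch (not part of this patch) of enabling asynchronous WAL replication to secondary region replicas programmatically. It assumes the property described above is hbase.region.replica.replication.enabled; with this change, enabling it no longer creates a "region_replica_replication" peer.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class EnableRegionReplicaReplicationSketch {
  public static void main(String[] args) {
    // Assumption: the <property> shown above is hbase.region.replica.replication.enabled.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("hbase.region.replica.replication.enabled", true);
    System.out.println("region replica replication enabled: "
      + conf.getBoolean("hbase.region.replica.replication.enabled", false));
  }
}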

@@ -70,12 +70,25 @@ public class RegionReplicationSink {
   public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";

-  public static final long RPC_TIMEOUT_MS_DEFAULT = 200;
+  public static final long RPC_TIMEOUT_MS_DEFAULT = 1000;

   public static final String OPERATION_TIMEOUT_MS =
     "hbase.region.read-replica.sink.operation.timeout.ms";

-  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
+  public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000;

+  // The two options below are for replicating meta edits: a meta edit usually triggers a
+  // refreshStoreFiles call at the remote side, so it is likely to take more time. A meta edit is
+  // also more important for fixing inconsistent state, so it is worth waiting longer for it.
+  public static final String META_EDIT_RPC_TIMEOUT_MS =
+    "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";
+
+  public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000;
+
+  public static final String META_EDIT_OPERATION_TIMEOUT_MS =
+    "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";
+
+  public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000;
+
   public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";
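
A minimal sketch (not part of the patch) of overriding the new meta-edit timeouts. The keys are the constants added above; the values shown are only illustrative, not recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MetaEditTimeoutOverrideSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Same keys as META_EDIT_RPC_TIMEOUT_MS / META_EDIT_OPERATION_TIMEOUT_MS above.
    conf.setLong("hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms", 30000L);
    conf.setLong("hbase.region.read-replica.sink.meta-edit.operation.timeout.ms", 120000L);
    System.out.println(conf.getLong(
      "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms", 15000L));
  }
}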
@@ -147,6 +160,10 @@ public class RegionReplicationSink {
   private final long operationTimeoutNs;

+  private final long metaEditRpcTimeoutNs;
+
+  private final long metaEditOperationTimeoutNs;
+
   private final long batchSizeCapacity;

   private final long batchCountCapacity;
@@ -178,6 +195,10 @@ public class RegionReplicationSink {
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS
       .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+    this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS
+      .toNanos(conf.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
+    this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+      conf.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
     this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
     this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
     this.failedReplicas = new IntHashSet(regionReplication - 1);
@@ -200,16 +221,16 @@ public class RegionReplicationSink {
       if (error != null) {
         if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence"
-              + " id of sunk entris is {}, which is greater than the last flush SN {},"
-              + " we will stop replicating for a while and trigger a flush",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entris is {}, which is greater than the last flush SN {}," +
+              " we will stop replicating for a while and trigger a flush",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence"
-              + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
-              + " we will not stop replicating",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
+              " we will not stop replicating",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
@@ -235,6 +256,7 @@ public class RegionReplicationSink {
   private void send() {
     List<SinkEntry> toSend = new ArrayList<>();
     long totalSize = 0L;
+    boolean hasMetaEdit = false;
     for (SinkEntry entry;;) {
       entry = entries.poll();
       if (entry == null) {
@@ -242,6 +264,7 @@ public class RegionReplicationSink {
       }
       toSend.add(entry);
       totalSize += entry.size;
+      hasMetaEdit |= entry.edit.isMetaEdit();
       if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
         break;
       }
@@ -250,6 +273,15 @@ public class RegionReplicationSink {
     if (toSendReplicaCount <= 0) {
       return;
     }
+    long rpcTimeoutNsToUse;
+    long operationTimeoutNsToUse;
+    if (!hasMetaEdit) {
+      rpcTimeoutNsToUse = rpcTimeoutNs;
+      operationTimeoutNsToUse = operationTimeoutNs;
+    } else {
+      rpcTimeoutNsToUse = metaEditRpcTimeoutNs;
+      operationTimeoutNsToUse = metaEditOperationTimeoutNs;
+    }
     sending = true;
     List<WAL.Entry> walEntries =
       toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
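
A standalone sketch (not HBase code) of the per-batch timeout choice added in send() above, using the new defaults converted to nanoseconds as in the constructor: a batch that contains at least one meta edit gets the longer meta-edit timeouts, every other batch keeps the ordinary ones.

import java.util.concurrent.TimeUnit;

public class TimeoutChoiceSketch {
  public static void main(String[] args) {
    // Defaults from RegionReplicationSink after this change, stored as nanoseconds.
    long rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(1000);
    long operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(5000);
    long metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(15000);
    long metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(60000);

    boolean hasMetaEdit = true; // e.g. the batch carries a flush or region-event marker edit
    long rpcTimeoutNsToUse = hasMetaEdit ? metaEditRpcTimeoutNs : rpcTimeoutNs;
    long operationTimeoutNsToUse = hasMetaEdit ? metaEditOperationTimeoutNs : operationTimeoutNs;
    System.out.println("rpc timeout ns: " + rpcTimeoutNsToUse
      + ", operation timeout ns: " + operationTimeoutNsToUse);
  }
}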
@@ -263,7 +295,8 @@ public class RegionReplicationSink {
       replica2Error.put(replicaId, error);
       RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
       FutureUtils.addListener(
-        conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
+        conn.replicate(replica, walEntries, retries, rpcTimeoutNsToUse, operationTimeoutNsToUse),
+        (r, e) -> {
           error.setValue(e);
           if (remaining.decrementAndGet() == 0) {
             onComplete(toSend, replica2Error);
@@ -346,8 +379,8 @@ public class RegionReplicationSink {
       long clearedSize = clearAllEntries();
       if (LOG.isDebugEnabled()) {
         LOG.debug(
-          "Got a flush all request with sequence id {}, clear {} pending"
-            + " entries with size {}, clear failed replicas {}",
+          "Got a flush all request with sequence id {}, clear {} pending" +
+            " entries with size {}, clear failed replicas {}",
           flushSequenceNumber, clearedCount,
           StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
           failedReplicas);