diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index d14792e78bb..f07c614dd20 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1742,11 +1742,8 @@ possible configurations would overwhelm and obscure the important.
false
Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
- If this is enabled, a replication peer named "region_replica_replication" will be created
- which will tail the logs and replicate the mutations to region replicas for tables that
- have region replication > 1. If this is enabled once, disabling this replication also
- requires disabling the replication peer using shell or Admin java class.
- Replication to secondary region replicas works over standard inter-cluster replication.
+ WAL replication to replicas uses a separate implementation instead of the general
+ inter-cluster replication framework, so no replication peer will be added.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index 6c83f154871..4d169742d3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -70,12 +70,25 @@ public class RegionReplicationSink {
public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
- public static final long RPC_TIMEOUT_MS_DEFAULT = 200;
+ public static final long RPC_TIMEOUT_MS_DEFAULT = 1000;
public static final String OPERATION_TIMEOUT_MS =
"hbase.region.read-replica.sink.operation.timeout.ms";
- public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
+ public static final long OPERATION_TIMEOUT_MS_DEFAULT = 5000;
+
+ // The two options below are for replicating meta edits. A meta edit usually triggers a
+ // refreshStoreFiles call on the remote side, so it is likely to take more time. A meta edit is
+ // also more important for fixing inconsistent state, so it is worth waiting longer for it.
+ public static final String META_EDIT_RPC_TIMEOUT_MS =
+ "hbase.region.read-replica.sink.meta-edit.rpc.timeout.ms";
+
+ public static final long META_EDIT_RPC_TIMEOUT_MS_DEFAULT = 15000;
+
+ public static final String META_EDIT_OPERATION_TIMEOUT_MS =
+ "hbase.region.read-replica.sink.meta-edit.operation.timeout.ms";
+
+ public static final long META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT = 60000;
public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";
@@ -147,6 +160,10 @@ public class RegionReplicationSink {
private final long operationTimeoutNs;
+ private final long metaEditRpcTimeoutNs;
+
+ private final long metaEditOperationTimeoutNs;
+
private final long batchSizeCapacity;
private final long batchCountCapacity;
@@ -178,6 +195,10 @@ public class RegionReplicationSink {
TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+ this.metaEditRpcTimeoutNs = TimeUnit.MILLISECONDS
+ .toNanos(conf.getLong(META_EDIT_RPC_TIMEOUT_MS, META_EDIT_RPC_TIMEOUT_MS_DEFAULT));
+ this.metaEditOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+ conf.getLong(META_EDIT_OPERATION_TIMEOUT_MS, META_EDIT_OPERATION_TIMEOUT_MS_DEFAULT));
this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
this.failedReplicas = new IntHashSet(regionReplication - 1);
@@ -200,16 +221,16 @@ public class RegionReplicationSink {
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
LOG.warn(
- "Failed to replicate to secondary replica {} for {}, since the max sequence"
- + " id of sunk entris is {}, which is greater than the last flush SN {},"
- + " we will stop replicating for a while and trigger a flush",
+ "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+ " id of sunk entris is {}, which is greater than the last flush SN {}," +
+ " we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
failed.add(replicaId);
} else {
LOG.warn(
- "Failed to replicate to secondary replica {} for {}, since the max sequence"
- + " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
- + " we will not stop replicating",
+ "Failed to replicate to secondary replica {} for {}, since the max sequence" +
+ " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
+ " we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
}
}
@@ -235,6 +256,7 @@ public class RegionReplicationSink {
private void send() {
List toSend = new ArrayList<>();
long totalSize = 0L;
+ boolean hasMetaEdit = false;
for (SinkEntry entry;;) {
entry = entries.poll();
if (entry == null) {
@@ -242,6 +264,7 @@ public class RegionReplicationSink {
}
toSend.add(entry);
totalSize += entry.size;
+ hasMetaEdit |= entry.edit.isMetaEdit();
if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
break;
}
@@ -250,6 +273,15 @@ public class RegionReplicationSink {
if (toSendReplicaCount <= 0) {
return;
}
+ long rpcTimeoutNsToUse;
+ long operationTimeoutNsToUse;
+ if (hasMetaEdit) { // a batch with any meta edit gets the longer meta-edit timeouts
+ rpcTimeoutNsToUse = metaEditRpcTimeoutNs;
+ operationTimeoutNsToUse = metaEditOperationTimeoutNs;
+ } else {
+ rpcTimeoutNsToUse = rpcTimeoutNs;
+ operationTimeoutNsToUse = operationTimeoutNs;
+ }
sending = true;
List walEntries =
toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
@@ -263,7 +295,8 @@ public class RegionReplicationSink {
replica2Error.put(replicaId, error);
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
FutureUtils.addListener(
- conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
+ conn.replicate(replica, walEntries, retries, rpcTimeoutNsToUse, operationTimeoutNsToUse),
+ (r, e) -> {
error.setValue(e);
if (remaining.decrementAndGet() == 0) {
onComplete(toSend, replica2Error);
@@ -346,8 +379,8 @@ public class RegionReplicationSink {
long clearedSize = clearAllEntries();
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Got a flush all request with sequence id {}, clear {} pending"
- + " entries with size {}, clear failed replicas {}",
+ "Got a flush all request with sequence id {}, clear {} pending" +
+ " entries with size {}, clear failed replicas {}",
flushSequenceNumber, clearedCount,
StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
failedReplicas);