HBASE-26456 Limit the size for one replicating (#3873)
Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
This commit is contained in:
parent
529c36e7ad
commit
70259a2a4c
|
@ -77,6 +77,14 @@ public class RegionReplicationSink {
|
|||
|
||||
public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
|
||||
|
||||
public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";
|
||||
|
||||
public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1024L * 1024;
|
||||
|
||||
public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";
|
||||
|
||||
public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;
|
||||
|
||||
private static final class SinkEntry {
|
||||
|
||||
final WALKeyImpl key;
|
||||
|
@ -139,6 +147,10 @@ public class RegionReplicationSink {
|
|||
|
||||
private final long operationTimeoutNs;
|
||||
|
||||
private final long batchSizeCapacity;
|
||||
|
||||
private final long batchCountCapacity;
|
||||
|
||||
private volatile long pendingSize;
|
||||
|
||||
private long lastFlushedSequenceId;
|
||||
|
@ -166,6 +178,8 @@ public class RegionReplicationSink {
|
|||
TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
|
||||
this.operationTimeoutNs = TimeUnit.MILLISECONDS
|
||||
.toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
|
||||
this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
|
||||
this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
|
||||
this.failedReplicas = new IntHashSet(regionReplication - 1);
|
||||
}
|
||||
|
||||
|
@ -186,16 +200,16 @@ public class RegionReplicationSink {
|
|||
if (error != null) {
|
||||
if (maxSequenceId > lastFlushedSequenceId) {
|
||||
LOG.warn(
|
||||
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
|
||||
" id of sunk entris is {}, which is greater than the last flush SN {}," +
|
||||
" we will stop replicating for a while and trigger a flush",
|
||||
"Failed to replicate to secondary replica {} for {}, since the max sequence"
|
||||
+ " id of sunk entris is {}, which is greater than the last flush SN {},"
|
||||
+ " we will stop replicating for a while and trigger a flush",
|
||||
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
|
||||
failed.add(replicaId);
|
||||
} else {
|
||||
LOG.warn(
|
||||
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
|
||||
" id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
|
||||
" we will not stop replicating",
|
||||
"Failed to replicate to secondary replica {} for {}, since the max sequence"
|
||||
+ " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
|
||||
+ " we will not stop replicating",
|
||||
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
|
||||
}
|
||||
}
|
||||
|
@ -220,12 +234,17 @@ public class RegionReplicationSink {
|
|||
|
||||
private void send() {
|
||||
List<SinkEntry> toSend = new ArrayList<>();
|
||||
long totalSize = 0L;
|
||||
for (SinkEntry entry;;) {
|
||||
entry = entries.poll();
|
||||
if (entry == null) {
|
||||
break;
|
||||
}
|
||||
toSend.add(entry);
|
||||
totalSize += entry.size;
|
||||
if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
|
||||
if (toSendReplicaCount <= 0) {
|
||||
|
@ -327,8 +346,8 @@ public class RegionReplicationSink {
|
|||
long clearedSize = clearAllEntries();
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"Got a flush all request with sequence id {}, clear {} pending" +
|
||||
" entries with size {}, clear failed replicas {}",
|
||||
"Got a flush all request with sequence id {}, clear {} pending"
|
||||
+ " entries with size {}, clear failed replicas {}",
|
||||
flushSequenceNumber, clearedCount,
|
||||
StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
|
||||
failedReplicas);
|
||||
|
|
|
@ -94,6 +94,8 @@ public class TestRegionReplicationSink {
|
|||
@Before
|
||||
public void setUp() {
|
||||
conf = HBaseConfiguration.create();
|
||||
conf.setLong(RegionReplicationSink.BATCH_COUNT_CAPACITY, 5);
|
||||
conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024 * 1024);
|
||||
td = TableDescriptorBuilder.newBuilder(name.getTableName())
|
||||
.setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
|
||||
primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
|
||||
|
@ -313,4 +315,89 @@ public class TestRegionReplicationSink {
|
|||
// should have send out all so no pending entries.
|
||||
assertEquals(0, sink.pendingSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSizeCapacity() {
|
||||
MutableInt next = new MutableInt(0);
|
||||
List<CompletableFuture<Void>> futures =
|
||||
Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
|
||||
when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
|
||||
.then(i -> futures.get(next.getAndIncrement()));
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ServerCall<?> rpcCall = mock(ServerCall.class);
|
||||
WALKeyImpl key = mock(WALKeyImpl.class);
|
||||
when(key.estimatedSerializedSizeOf()).thenReturn(100L);
|
||||
when(key.getSequenceId()).thenReturn(i + 1L);
|
||||
WALEdit edit = mock(WALEdit.class);
|
||||
when(edit.estimatedSerializedSizeOf()).thenReturn((i + 1) * 600L * 1024);
|
||||
when(manager.increase(anyLong())).thenReturn(true);
|
||||
sink.add(key, edit, rpcCall);
|
||||
}
|
||||
// the first entry will be send out immediately
|
||||
verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||
|
||||
// complete the first send
|
||||
futures.get(0).complete(null);
|
||||
futures.get(1).complete(null);
|
||||
|
||||
// we should have another batch
|
||||
verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||
|
||||
// complete the second send
|
||||
futures.get(2).complete(null);
|
||||
futures.get(3).complete(null);
|
||||
|
||||
// the size of the second entry is greater than 1024 * 1024, so we will have another batch
|
||||
verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||
|
||||
// complete the third send
|
||||
futures.get(4).complete(null);
|
||||
futures.get(5).complete(null);
|
||||
|
||||
// should have send out all so no pending entries.
|
||||
assertEquals(0, sink.pendingSize());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCountCapacity() {
|
||||
MutableInt next = new MutableInt(0);
|
||||
List<CompletableFuture<Void>> futures =
|
||||
Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
|
||||
when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
|
||||
.then(i -> futures.get(next.getAndIncrement()));
|
||||
for (int i = 0; i < 7; i++) {
|
||||
ServerCall<?> rpcCall = mock(ServerCall.class);
|
||||
WALKeyImpl key = mock(WALKeyImpl.class);
|
||||
when(key.estimatedSerializedSizeOf()).thenReturn(100L);
|
||||
when(key.getSequenceId()).thenReturn(i + 1L);
|
||||
WALEdit edit = mock(WALEdit.class);
|
||||
when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
|
||||
when(manager.increase(anyLong())).thenReturn(true);
|
||||
sink.add(key, edit, rpcCall);
|
||||
}
|
||||
// the first entry will be send out immediately
|
||||
verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||
|
||||
// complete the first send
|
||||
futures.get(0).complete(null);
|
||||
futures.get(1).complete(null);
|
||||
|
||||
// we should have another batch
|
||||
verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||
|
||||
// complete the second send
|
||||
futures.get(2).complete(null);
|
||||
futures.get(3).complete(null);
|
||||
|
||||
// because of the count limit is 5, the above send can not send all the edits, so we will do
|
||||
// another send
|
||||
verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||
|
||||
// complete the third send
|
||||
futures.get(4).complete(null);
|
||||
futures.get(5).complete(null);
|
||||
|
||||
// should have send out all so no pending entries.
|
||||
assertEquals(0, sink.pendingSize());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue