diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
index f0129b70584..6c83f154871 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java
@@ -77,6 +77,14 @@ public class RegionReplicationSink {
 
   public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
 
+  public static final String BATCH_SIZE_CAPACITY = "hbase.region.read-replica.sink.size.capacity";
+
+  public static final long BATCH_SIZE_CAPACITY_DEFAULT = 1024L * 1024;
+
+  public static final String BATCH_COUNT_CAPACITY = "hbase.region.read-replica.sink.nb.capacity";
+
+  public static final int BATCH_COUNT_CAPACITY_DEFAULT = 100;
+
   private static final class SinkEntry {
 
     final WALKeyImpl key;
@@ -139,6 +147,10 @@ public class RegionReplicationSink {
 
   private final long operationTimeoutNs;
 
+  private final long batchSizeCapacity;
+
+  private final long batchCountCapacity;
+
   private volatile long pendingSize;
 
   private long lastFlushedSequenceId;
@@ -166,6 +178,8 @@ public class RegionReplicationSink {
       TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
     this.operationTimeoutNs = TimeUnit.MILLISECONDS
       .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+    this.batchSizeCapacity = conf.getLong(BATCH_SIZE_CAPACITY, BATCH_SIZE_CAPACITY_DEFAULT);
+    this.batchCountCapacity = conf.getInt(BATCH_COUNT_CAPACITY, BATCH_COUNT_CAPACITY_DEFAULT);
     this.failedReplicas = new IntHashSet(regionReplication - 1);
   }
 
@@ -186,16 +200,16 @@ public class RegionReplicationSink {
       if (error != null) {
         if (maxSequenceId > lastFlushedSequenceId) {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is greater than the last flush SN {}," +
-              " we will stop replicating for a while and trigger a flush",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entries is {}, which is greater than the last flush SN {},"
+              + " we will stop replicating for a while and trigger a flush",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
           failed.add(replicaId);
         } else {
           LOG.warn(
-            "Failed to replicate to secondary replica {} for {}, since the max sequence" +
-              " id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
-              " we will not stop replicating",
+            "Failed to replicate to secondary replica {} for {}, since the max sequence"
+              + " id of sunk entries is {}, which is less than or equal to the last flush SN {},"
+              + " we will not stop replicating",
             replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
         }
       }
@@ -220,12 +234,17 @@ public class RegionReplicationSink {
 
   private void send() {
     List<SinkEntry> toSend = new ArrayList<>();
+    long totalSize = 0L;
    for (SinkEntry entry;;) {
       entry = entries.poll();
       if (entry == null) {
         break;
       }
       toSend.add(entry);
+      totalSize += entry.size;
+      if (toSend.size() >= batchCountCapacity || totalSize >= batchSizeCapacity) {
+        break;
+      }
     }
     int toSendReplicaCount = regionReplication - 1 - failedReplicas.size();
     if (toSendReplicaCount <= 0) {
@@ -327,8 +346,8 @@ public class RegionReplicationSink {
       long clearedSize = clearAllEntries();
       if (LOG.isDebugEnabled()) {
         LOG.debug(
-          "Got a flush all request with sequence id {}, clear {} pending" +
-            " entries with size {}, clear failed replicas {}",
+          "Got a flush all request with sequence id {}, clear {} pending"
+            + " entries with size {}, clear failed replicas {}",
           flushSequenceNumber, clearedCount,
           StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1), failedReplicas);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 918f6448dad..e065709c1b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -94,6 +94,8 @@ public class TestRegionReplicationSink {
   @Before
   public void setUp() {
     conf = HBaseConfiguration.create();
+    conf.setLong(RegionReplicationSink.BATCH_COUNT_CAPACITY, 5);
+    conf.setLong(RegionReplicationSink.BATCH_SIZE_CAPACITY, 1024 * 1024);
     td = TableDescriptorBuilder.newBuilder(name.getTableName())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setRegionReplication(3).build();
     primary = RegionInfoBuilder.newBuilder(name.getTableName()).build();
@@ -313,4 +315,89 @@ public class TestRegionReplicationSink {
     // should have send out all so no pending entries.
     assertEquals(0, sink.pendingSize());
   }
+
+  @Test
+  public void testSizeCapacity() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    for (int i = 0; i < 3; i++) {
+      ServerCall<?> rpcCall = mock(ServerCall.class);
+      WALKeyImpl key = mock(WALKeyImpl.class);
+      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
+      when(key.getSequenceId()).thenReturn(i + 1L);
+      WALEdit edit = mock(WALEdit.class);
+      when(edit.estimatedSerializedSizeOf()).thenReturn((i + 1) * 600L * 1024);
+      when(manager.increase(anyLong())).thenReturn(true);
+      sink.add(key, edit, rpcCall);
+    }
+    // the first entry will be sent out immediately
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the first send
+    futures.get(0).complete(null);
+    futures.get(1).complete(null);
+
+    // we should have another batch
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the second send
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // the size of the second entry is greater than 1024 * 1024, so we will have another batch
+    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the third send
+    futures.get(4).complete(null);
+    futures.get(5).complete(null);
+
+    // should have sent out all so no pending entries.
+    assertEquals(0, sink.pendingSize());
+  }
+
+  @Test
+  public void testCountCapacity() {
+    MutableInt next = new MutableInt(0);
+    List<CompletableFuture<Void>> futures =
+      Stream.generate(() -> new CompletableFuture<Void>()).limit(6).collect(Collectors.toList());
+    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
+      .then(i -> futures.get(next.getAndIncrement()));
+    for (int i = 0; i < 7; i++) {
+      ServerCall<?> rpcCall = mock(ServerCall.class);
+      WALKeyImpl key = mock(WALKeyImpl.class);
+      when(key.estimatedSerializedSizeOf()).thenReturn(100L);
+      when(key.getSequenceId()).thenReturn(i + 1L);
+      WALEdit edit = mock(WALEdit.class);
+      when(edit.estimatedSerializedSizeOf()).thenReturn(1000L);
+      when(manager.increase(anyLong())).thenReturn(true);
+      sink.add(key, edit, rpcCall);
+    }
+    // the first entry will be sent out immediately
+    verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the first send
+    futures.get(0).complete(null);
+    futures.get(1).complete(null);
+
+    // we should have another batch
+    verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the second send
+    futures.get(2).complete(null);
+    futures.get(3).complete(null);
+
+    // because the count limit is 5, the above send cannot send all the edits, so we will do
+    // another send
+    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
+
+    // complete the third send
+    futures.get(4).complete(null);
+    futures.get(5).complete(null);
+
+    // should have sent out all so no pending entries.
+    assertEquals(0, sink.pendingSize());
+  }
 }