HBASE-26457 Should not always clear all the failed replicas when getting a flush all request (#3850)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
Duo Zhang 2021-11-17 23:20:22 +08:00
parent a8d46db416
commit 67306e74b1
2 changed files with 164 additions and 46 deletions

RegionReplicationSink.java

@@ -22,15 +22,16 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -123,8 +124,11 @@ public class RegionReplicationSink {
private final AsyncClusterConnection conn;
// used to track the replicas which we failed to replicate edits to them
// will be cleared after we get a flush edit.
private final IntHashSet failedReplicas = new IntHashSet();
// the key is the replica id, the value is the sequence id of the last failed edit
// when we get a flush all request, we will try to remove a replica from this map, the key point
// here is the flush sequence number must be greater than the failed sequence id, otherwise we
// should not remove the replica from this map
private final Map<Integer, Long> failedReplicas = new HashMap<>();
private final Queue<SinkEntry> entries = new ArrayDeque<>();
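
A note on the new data structure: the comment above captures the core of this fix. A replica now stays in failedReplicas until a flush whose sequence number is strictly greater than the sequence id of that replica's last failed edit. The following minimal, self-contained sketch is illustrative only and not part of the patch (the class and method names are hypothetical); it replays the rule with the same sequence numbers the new test below uses:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the failed-replica bookkeeping described above.
public class FailedReplicaTrackingSketch {

  // replica id -> sequence id of the last edit we failed to replicate to it
  private final Map<Integer, Long> failedReplicas = new HashMap<>();

  void recordFailure(int replicaId, long failedSequenceId) {
    failedReplicas.put(replicaId, failedSequenceId);
  }

  void onFlushAll(long flushSequenceNumber) {
    // A flush only persists edits with sequence ids below flushSequenceNumber,
    // so a replica that failed at a higher (or equal) sequence id still misses
    // data and must remain marked as failed until a later flush covers it.
    failedReplicas.entrySet().removeIf(e -> e.getValue().longValue() < flushSequenceNumber);
  }

  public static void main(String[] args) {
    FailedReplicaTrackingSketch sink = new FailedReplicaTrackingSketch();
    sink.recordFailure(1, 2L); // replica 1 failed on the edit with sequence id 2
    sink.onFlushAll(1L);       // flush only covers edits below 1 -> replica 1 stays failed
    System.out.println(sink.failedReplicas); // {1=2}
    sink.onFlushAll(4L);       // flush covers the failed edit -> replica 1 recovers
    System.out.println(sink.failedReplicas); // {}
  }
}
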
@@ -180,16 +184,16 @@
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is greater than the last flush SN {}," +
" we will stop replicating for a while and trigger a flush",
"Failed to replicate to secondary replica {} for {}, since the max sequence"
+ " id of sunk entris is {}, which is greater than the last flush SN {},"
+ " we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
failed.add(replicaId);
} else {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
" we will not stop replicating",
"Failed to replicate to secondary replica {} for {}, since the max sequence"
+ " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+ " we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
}
}
@@ -197,7 +201,9 @@
synchronized (entries) {
pendingSize -= toReleaseSize;
if (!failed.isEmpty()) {
failedReplicas.addAll(failed);
for (Integer replicaId : failed) {
failedReplicas.put(replicaId, maxSequenceId);
}
flushRequester.requestFlush(maxSequenceId);
}
sending = false;
@@ -231,7 +237,7 @@
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
if (failedReplicas.contains(replicaId)) {
if (failedReplicas.containsKey(replicaId)) {
continue;
}
MutableObject<Throwable> error = new MutableObject<>();
@@ -247,7 +253,7 @@
}
}
private boolean flushAllStores(FlushDescriptor flushDesc) {
private boolean isFlushAllStores(FlushDescriptor flushDesc) {
Set<byte[]> storesFlushed =
flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
.collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@@ -257,6 +263,24 @@
return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
}
private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
return Optional.empty();
}
FlushDescriptor flushDesc;
try {
flushDesc = WALEdit.getFlushDescriptor(metaCell);
} catch (IOException e) {
LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
return Optional.empty();
}
if (flushDesc != null && isFlushAllStores(flushDesc)) {
return Optional.of(flushDesc);
} else {
return Optional.empty();
}
}
private void clearAllEntries() {
long toClearSize = 0;
for (SinkEntry entry : entries) {
@@ -268,6 +292,20 @@
manager.decrease(toClearSize);
}
private void clearFailedReplica(long flushSequenceNumber) {
for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator(); iter
.hasNext();) {
Map.Entry<Integer, Long> entry = iter.next();
if (entry.getValue().longValue() < flushSequenceNumber) {
LOG.debug(
"Got a flush all request with sequence id {}, clear failed replica {}" +
" with last failed sequence id {}",
flushSequenceNumber, entry.getKey(), entry.getValue());
iter.remove();
}
}
}
/**
* Add this edit to replication queue.
* <p/>
@@ -287,41 +325,34 @@
// check whether we flushed all stores, which means we could drop all the previous edits,
// and also, recover from the previous failure of some replicas
for (Cell metaCell : edit.getCells()) {
if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
FlushDescriptor flushDesc;
try {
flushDesc = WALEdit.getFlushDescriptor(metaCell);
} catch (IOException e) {
LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
continue;
}
if (flushDesc != null && flushAllStores(flushDesc)) {
long flushedSequenceId = flushDesc.getFlushSequenceNumber();
int toClearCount = 0;
long toClearSize = 0;
for (;;) {
SinkEntry e = entries.peek();
if (e == null) {
break;
}
if (e.key.getSequenceId() < flushedSequenceId) {
entries.poll();
toClearCount++;
toClearSize += e.size;
} else {
break;
}
getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
int toClearCount = 0;
long toClearSize = 0;
for (;;) {
SinkEntry e = entries.peek();
if (e == null) {
break;
}
if (e.key.getSequenceId() < flushSequenceNumber) {
entries.poll();
toClearCount++;
toClearSize += e.size;
} else {
break;
}
lastFlushedSequenceId = flushedSequenceId;
failedReplicas.clear();
LOG.debug(
"Got a flush all request with sequence id {}, clear failed replicas {}" +
" and {} pending entries with size {}",
flushedSequenceId, failedReplicas, toClearCount,
StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
flushRequester.recordFlush(flushedSequenceId);
}
}
lastFlushedSequenceId = flushSequenceNumber;
if (LOG.isDebugEnabled()) {
LOG.debug(
"Got a flush all request with sequence id {}, clear {} pending"
+ " entries with size {}",
flushSequenceNumber, toClearCount,
StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
}
clearFailedReplica(flushSequenceNumber);
flushRequester.recordFlush(flushSequenceNumber);
});
}
}
if (failedReplicas.size() == regionReplication - 1) {
@@ -340,7 +371,7 @@
// failed
clearAllEntries();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
failedReplicas.add(replicaId);
failedReplicas.put(replicaId, entry.key.getSequenceId());
}
flushRequester.requestFlush(entry.key.getSequenceId());
}
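
For context on the isFlushAllStores() check above: dropping pending entries and retrying failed replicas is only safe when the flush covered every column family of the table, otherwise an unflushed store may still hold edits the secondaries never received. Below is a standalone sketch of that check, assuming plain JDK types in place of the FlushDescriptor/TableDescriptor plumbing and using Arrays.compare where the real code uses Bytes.BYTES_COMPARATOR; the class and method names are illustrative, not from the patch:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

// Illustrative "did this flush cover all stores" check. byte[] keys need an
// explicit comparator: a HashSet<byte[]> would compare arrays by identity.
public class FlushAllStoresSketch {

  static boolean flushedAllStores(Set<byte[]> flushedFamilies, Set<byte[]> tableFamilies) {
    Set<byte[]> flushed = new TreeSet<>((a, b) -> Arrays.compare(a, b));
    flushed.addAll(flushedFamilies);
    return flushed.containsAll(tableFamilies);
  }

  public static void main(String[] args) {
    Set<byte[]> tableFamilies = new TreeSet<>((a, b) -> Arrays.compare(a, b));
    tableFamilies.add("cf1".getBytes(StandardCharsets.UTF_8));
    tableFamilies.add("cf2".getBytes(StandardCharsets.UTF_8));

    Set<byte[]> onlyCf1 = new TreeSet<>((a, b) -> Arrays.compare(a, b));
    onlyCf1.add("cf1".getBytes(StandardCharsets.UTF_8));

    System.out.println(flushedAllStores(onlyCf1, tableFamilies));       // false: cf2 not flushed
    System.out.println(flushedAllStores(tableFamilies, tableFamilies)); // true: full flush
  }
}
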

TestRegionReplicationSink.java

@@ -313,4 +313,91 @@ public class TestRegionReplicationSink {
// should have send out all so no pending entries.
assertEquals(0, sink.pendingSize());
}
@Test
public void testNotClearFailedReplica() {
// simulate this scenario:
// 1. prepare flush
// 2. add one edit broken
// 3. commit flush with flush sequence number less than the previous edit(this is the normal
// case)
// we should not clear the failed replica as we do not flush the broken edit out with this
// flush, we need an extra flush to flush it out
MutableInt next = new MutableInt(0);
List<CompletableFuture<Void>> futures =
Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
.then(i -> futures.get(next.getAndIncrement()));
when(manager.increase(anyLong())).thenReturn(true);
ServerCall<?> rpcCall1 = mock(ServerCall.class);
WALKeyImpl key1 = mock(WALKeyImpl.class);
when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
when(key1.getSequenceId()).thenReturn(1L);
Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
.collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
throw new IllegalStateException();
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
FlushDescriptor fd =
ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
sink.add(key1, edit1, rpcCall1);
futures.get(0).complete(null);
futures.get(1).complete(null);
ServerCall<?> rpcCall2 = mock(ServerCall.class);
WALKeyImpl key2 = mock(WALKeyImpl.class);
when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
when(key2.getSequenceId()).thenReturn(2L);
WALEdit edit2 = mock(WALEdit.class);
when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
sink.add(key2, edit2, rpcCall2);
// fail the call to replica 1
futures.get(2).completeExceptionally(new IOException("inject error"));
futures.get(3).complete(null);
ServerCall<?> rpcCall3 = mock(ServerCall.class);
WALKeyImpl key3 = mock(WALKeyImpl.class);
when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
when(key3.getSequenceId()).thenReturn(3L);
FlushDescriptor fd3 =
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
sink.add(key3, edit3, rpcCall3);
// we should only call replicate once for edit3, since replica 1 is marked as failed, and the
// flush request can not clean the failed replica since the flush sequence number is not greater
// than sequence id of the last failed edit
verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
futures.get(4).complete(null);
ServerCall<?> rpcCall4 = mock(ServerCall.class);
WALKeyImpl key4 = mock(WALKeyImpl.class);
when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
when(key4.getSequenceId()).thenReturn(4L);
WALEdit edit4 = mock(WALEdit.class);
when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
sink.add(key4, edit4, rpcCall4);
// still, only send to replica 2
verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
futures.get(5).complete(null);
ServerCall<?> rpcCall5 = mock(ServerCall.class);
WALKeyImpl key5 = mock(WALKeyImpl.class);
when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
when(key5.getSequenceId()).thenReturn(3L);
FlushDescriptor fd5 =
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
sink.add(key5, edit5, rpcCall5);
futures.get(6).complete(null);
futures.get(7).complete(null);
// should have cleared the failed replica because the flush sequence number is greater than
// the sequence id of the last failed edit
verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
}
}
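
To trace the verify counts in testNotClearFailedReplica above (assuming two secondary replicas, which is what the per-edit call counts imply): edit1 and edit2 each go to both replicas (4 replicate calls); the third call is completed exceptionally, marking replica 1 failed at sequence id 2. edit3 carries a COMMIT_FLUSH with flush sequence number 1, which is not greater than 2, so replica 1 stays failed and edit3 and edit4 go only to replica 2 (calls 5 and 6). edit5 carries a COMMIT_FLUSH with flush sequence number 4, which is greater than 2, so the failed replica is cleared and edit5 goes to both replicas again (calls 7 and 8).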