HBASE-26449 The way we add or clear failedReplicas may have race (#3846)
Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
parent
0b29a7934a
commit
1b0d9ceaba
|
@ -30,6 +30,7 @@ import java.util.TreeSet;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import org.agrona.collections.IntHashSet;
|
||||||
import org.apache.commons.lang3.mutable.MutableObject;
|
import org.apache.commons.lang3.mutable.MutableObject;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
@ -123,7 +124,7 @@ public class RegionReplicationSink {
|
||||||
|
|
||||||
// used to track the replicas which we failed to replicate edits to them
|
// used to track the replicas which we failed to replicate edits to them
|
||||||
// will be cleared after we get a flush edit.
|
// will be cleared after we get a flush edit.
|
||||||
private final Set<Integer> failedReplicas = new HashSet<>();
|
private final IntHashSet failedReplicas = new IntHashSet();
|
||||||
|
|
||||||
private final Queue<SinkEntry> entries = new ArrayDeque<>();
|
private final Queue<SinkEntry> entries = new ArrayDeque<>();
|
||||||
|
|
||||||
|
@ -135,6 +136,8 @@ public class RegionReplicationSink {
|
||||||
|
|
||||||
private volatile long pendingSize;
|
private volatile long pendingSize;
|
||||||
|
|
||||||
|
private long lastFlushSequenceNumber;
|
||||||
|
|
||||||
private boolean sending;
|
private boolean sending;
|
||||||
|
|
||||||
private boolean stopping;
|
private boolean stopping;
|
||||||
|
@ -162,8 +165,10 @@ public class RegionReplicationSink {
|
||||||
|
|
||||||
private void onComplete(List<SinkEntry> sent,
|
private void onComplete(List<SinkEntry> sent,
|
||||||
Map<Integer, MutableObject<Throwable>> replica2Error) {
|
Map<Integer, MutableObject<Throwable>> replica2Error) {
|
||||||
|
long maxSequenceId = Long.MIN_VALUE;
|
||||||
long toReleaseSize = 0;
|
long toReleaseSize = 0;
|
||||||
for (SinkEntry entry : sent) {
|
for (SinkEntry entry : sent) {
|
||||||
|
maxSequenceId = Math.max(maxSequenceId, entry.key.getSequenceId());
|
||||||
entry.replicated();
|
entry.replicated();
|
||||||
toReleaseSize += entry.size;
|
toReleaseSize += entry.size;
|
||||||
}
|
}
|
||||||
|
@ -173,9 +178,20 @@ public class RegionReplicationSink {
|
||||||
Integer replicaId = entry.getKey();
|
Integer replicaId = entry.getKey();
|
||||||
Throwable error = entry.getValue().getValue();
|
Throwable error = entry.getValue().getValue();
|
||||||
if (error != null) {
|
if (error != null) {
|
||||||
LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
|
if (maxSequenceId > lastFlushSequenceNumber) {
|
||||||
" for a while and trigger a flush", replicaId, primary, error);
|
LOG.warn(
|
||||||
|
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
|
||||||
|
" id of sunk entris is {}, which is greater than the last flush SN {}," +
|
||||||
|
" we will stop replicating for a while and trigger a flush",
|
||||||
|
replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
|
||||||
failed.add(replicaId);
|
failed.add(replicaId);
|
||||||
|
} else {
|
||||||
|
LOG.warn(
|
||||||
|
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
|
||||||
|
" id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
|
||||||
|
" we will not stop replicating",
|
||||||
|
replicaId, primary, maxSequenceId, lastFlushSequenceNumber, error);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
synchronized (entries) {
|
synchronized (entries) {
|
||||||
|
@ -215,6 +231,9 @@ public class RegionReplicationSink {
|
||||||
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
|
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
|
||||||
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
|
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
|
||||||
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
|
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
|
||||||
|
if (failedReplicas.contains(replicaId)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
MutableObject<Throwable> error = new MutableObject<>();
|
MutableObject<Throwable> error = new MutableObject<>();
|
||||||
replica2Error.put(replicaId, error);
|
replica2Error.put(replicaId, error);
|
||||||
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
|
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
|
||||||
|
@ -292,6 +311,7 @@ public class RegionReplicationSink {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
lastFlushSequenceNumber = flushDesc.getFlushSequenceNumber();
|
||||||
failedReplicas.clear();
|
failedReplicas.clear();
|
||||||
LOG.debug(
|
LOG.debug(
|
||||||
"Got a flush all request with sequence id {}, clear failed replicas {}" +
|
"Got a flush all request with sequence id {}, clear failed replicas {}" +
|
||||||
|
|
|
@ -28,11 +28,19 @@ import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import org.apache.commons.lang3.mutable.MutableInt;
|
import org.apache.commons.lang3.mutable.MutableInt;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.TableNameTestRule;
|
import org.apache.hadoop.hbase.TableNameTestRule;
|
||||||
|
@ -45,14 +53,20 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.ipc.ServerCall;
|
import org.apache.hadoop.hbase.ipc.ServerCall;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
|
||||||
|
|
||||||
@Category({ RegionServerTests.class, MediumTests.class })
|
@Category({ RegionServerTests.class, MediumTests.class })
|
||||||
public class TestRegionReplicationSink {
|
public class TestRegionReplicationSink {
|
||||||
|
|
||||||
|
@ -72,6 +86,8 @@ public class TestRegionReplicationSink {
|
||||||
|
|
||||||
private RegionReplicationBufferManager manager;
|
private RegionReplicationBufferManager manager;
|
||||||
|
|
||||||
|
private RegionReplicationSink sink;
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public final TableNameTestRule name = new TableNameTestRule();
|
public final TableNameTestRule name = new TableNameTestRule();
|
||||||
|
|
||||||
|
@ -84,15 +100,17 @@ public class TestRegionReplicationSink {
|
||||||
flushRequester = mock(Runnable.class);
|
flushRequester = mock(Runnable.class);
|
||||||
conn = mock(AsyncClusterConnection.class);
|
conn = mock(AsyncClusterConnection.class);
|
||||||
manager = mock(RegionReplicationBufferManager.class);
|
manager = mock(RegionReplicationBufferManager.class);
|
||||||
|
sink = new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
private RegionReplicationSink create() {
|
@After
|
||||||
return new RegionReplicationSink(conf, primary, td, manager, flushRequester, conn);
|
public void tearDown() throws InterruptedException {
|
||||||
|
sink.stop();
|
||||||
|
sink.waitUntilStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNormal() {
|
public void testNormal() {
|
||||||
RegionReplicationSink sink = create();
|
|
||||||
MutableInt next = new MutableInt(0);
|
MutableInt next = new MutableInt(0);
|
||||||
List<CompletableFuture<Void>> futures =
|
List<CompletableFuture<Void>> futures =
|
||||||
Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
|
Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
|
||||||
|
@ -129,7 +147,6 @@ public class TestRegionReplicationSink {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDropEdits() {
|
public void testDropEdits() {
|
||||||
RegionReplicationSink sink = create();
|
|
||||||
MutableInt next = new MutableInt(0);
|
MutableInt next = new MutableInt(0);
|
||||||
List<CompletableFuture<Void>> futures =
|
List<CompletableFuture<Void>> futures =
|
||||||
Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
|
Arrays.asList(new CompletableFuture<>(), new CompletableFuture<>());
|
||||||
|
@ -191,4 +208,109 @@ public class TestRegionReplicationSink {
|
||||||
// replicas
|
// replicas
|
||||||
verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
verify(conn, times(2)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNotAddToFailedReplicas() {
|
||||||
|
MutableInt next = new MutableInt(0);
|
||||||
|
List<CompletableFuture<Void>> futures =
|
||||||
|
Stream.generate(() -> new CompletableFuture<Void>()).limit(4).collect(Collectors.toList());
|
||||||
|
when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
|
||||||
|
.then(i -> futures.get(next.getAndIncrement()));
|
||||||
|
|
||||||
|
ServerCall<?> rpcCall1 = mock(ServerCall.class);
|
||||||
|
WALKeyImpl key1 = mock(WALKeyImpl.class);
|
||||||
|
when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
|
||||||
|
when(key1.getSequenceId()).thenReturn(1L);
|
||||||
|
WALEdit edit1 = mock(WALEdit.class);
|
||||||
|
when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
|
||||||
|
when(manager.increase(anyLong())).thenReturn(true);
|
||||||
|
sink.add(key1, edit1, rpcCall1);
|
||||||
|
|
||||||
|
ServerCall<?> rpcCall2 = mock(ServerCall.class);
|
||||||
|
WALKeyImpl key2 = mock(WALKeyImpl.class);
|
||||||
|
when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
|
||||||
|
when(key2.getSequenceId()).thenReturn(3L);
|
||||||
|
|
||||||
|
Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
|
||||||
|
.collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
|
||||||
|
FlushDescriptor fd =
|
||||||
|
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
|
||||||
|
WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
|
||||||
|
sink.add(key2, edit2, rpcCall2);
|
||||||
|
|
||||||
|
// fail the call to replica 2
|
||||||
|
futures.get(0).complete(null);
|
||||||
|
futures.get(1).completeExceptionally(new IOException("inject error"));
|
||||||
|
|
||||||
|
// the failure should not cause replica 2 to be added to failedReplicas, as we have already
|
||||||
|
// triggered a flush after it.
|
||||||
|
verify(conn, times(4)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||||
|
|
||||||
|
futures.get(2).complete(null);
|
||||||
|
futures.get(3).complete(null);
|
||||||
|
|
||||||
|
// all entries should have been sent out, so there should be no pending entries.
|
||||||
|
assertEquals(0, sink.pendingSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddToFailedReplica() {
|
||||||
|
MutableInt next = new MutableInt(0);
|
||||||
|
List<CompletableFuture<Void>> futures =
|
||||||
|
Stream.generate(() -> new CompletableFuture<Void>()).limit(5).collect(Collectors.toList());
|
||||||
|
when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
|
||||||
|
.then(i -> futures.get(next.getAndIncrement()));
|
||||||
|
|
||||||
|
ServerCall<?> rpcCall1 = mock(ServerCall.class);
|
||||||
|
WALKeyImpl key1 = mock(WALKeyImpl.class);
|
||||||
|
when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
|
||||||
|
when(key1.getSequenceId()).thenReturn(1L);
|
||||||
|
WALEdit edit1 = mock(WALEdit.class);
|
||||||
|
when(edit1.estimatedSerializedSizeOf()).thenReturn(1000L);
|
||||||
|
when(manager.increase(anyLong())).thenReturn(true);
|
||||||
|
sink.add(key1, edit1, rpcCall1);
|
||||||
|
|
||||||
|
ServerCall<?> rpcCall2 = mock(ServerCall.class);
|
||||||
|
WALKeyImpl key2 = mock(WALKeyImpl.class);
|
||||||
|
when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
|
||||||
|
when(key2.getSequenceId()).thenReturn(1L);
|
||||||
|
WALEdit edit2 = mock(WALEdit.class);
|
||||||
|
when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
|
||||||
|
when(manager.increase(anyLong())).thenReturn(true);
|
||||||
|
sink.add(key2, edit2, rpcCall2);
|
||||||
|
|
||||||
|
// fail the call to replica 2
|
||||||
|
futures.get(0).complete(null);
|
||||||
|
futures.get(1).completeExceptionally(new IOException("inject error"));
|
||||||
|
|
||||||
|
// we should only call replicate once for edit2, since replica 2 is marked as failed
|
||||||
|
verify(conn, times(3)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||||
|
futures.get(2).complete(null);
|
||||||
|
// all entries should have been sent out, so there should be no pending entries.
|
||||||
|
assertEquals(0, sink.pendingSize());
|
||||||
|
|
||||||
|
ServerCall<?> rpcCall3 = mock(ServerCall.class);
|
||||||
|
WALKeyImpl key3 = mock(WALKeyImpl.class);
|
||||||
|
when(key3.estimatedSerializedSizeOf()).thenReturn(200L);
|
||||||
|
when(key3.getSequenceId()).thenReturn(3L);
|
||||||
|
Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
|
||||||
|
.collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
|
||||||
|
throw new IllegalStateException();
|
||||||
|
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
|
||||||
|
FlushDescriptor fd =
|
||||||
|
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
|
||||||
|
WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
|
||||||
|
sink.add(key3, edit3, rpcCall3);
|
||||||
|
|
||||||
|
// the flush marker should have cleared the failedReplicas, so we will send the edit to 2
|
||||||
|
// replicas again
|
||||||
|
verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
|
||||||
|
futures.get(3).complete(null);
|
||||||
|
futures.get(4).complete(null);
|
||||||
|
|
||||||
|
// all entries should have been sent out, so there should be no pending entries.
|
||||||
|
assertEquals(0, sink.pendingSize());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue