HBASE-26412 Handle sink failure in RegionReplicationSink (#3815)

Signed-off-by: GeorryHuang <huangzhuoyue@apache.org>
Duo Zhang 2021-11-09 21:41:25 +08:00
parent 7286cc0035
commit 2ee18988e6
4 changed files with 275 additions and 41 deletions
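In outline, the change teaches the sink to survive replication failures: a replica that fails a replicate call is added to a failed set and skipped from then on, a flush of all column families is requested on the primary, and a later flush-all edit clears both the pending queue and the failed set so replication can resume from the flushed state. A minimal, self-contained sketch of that recovery cycle (hypothetical names, not the committed code):

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of the recovery cycle in this commit.
final class RecoveryCycleSketch {

  private final Set<Integer> failedReplicas = new HashSet<>();
  private final Queue<Object> pending = new ArrayDeque<>();
  private final Runnable flushRequester;

  RecoveryCycleSketch(Runnable flushRequester) {
    this.flushRequester = flushRequester;
  }

  // Called when a replicate call to a secondary replica fails.
  void onReplicateError(int replicaId) {
    synchronized (pending) {
      failedReplicas.add(replicaId); // skip this replica from now on
      flushRequester.run();          // ask the primary to flush all stores
    }
  }

  // Called when a flush-all marker edit reaches the sink.
  void onFlushAllEdit() {
    synchronized (pending) {
      pending.clear();        // the flush supersedes every queued edit
      failedReplicas.clear(); // failed replicas catch up from the flush
    }
  }
}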

HRegion.java

@@ -1107,8 +1107,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return;
}
status.setStatus("Initializing region replication sink");
regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo, td, () -> {
rss.getFlushRequester().requestFlush(this, new ArrayList<>(td.getColumnFamilyNames()),
FlushLifeCycleTracker.DUMMY);
}, rss.getAsyncClusterConnection()));
}
/**
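Above, HRegion now hands the sink the table descriptor plus a Runnable that requests a flush of every column family, instead of raw replication flags. A minimal sketch of that callback wiring, with hypothetical stand-in types:

import java.util.List;

// Hypothetical stand-ins; the real code passes the table's column family
// names and FlushLifeCycleTracker.DUMMY through rss.getFlushRequester().
interface FlushRequesterSketch {
  void requestFlush(List<String> families);
}

final class SinkWiringSketch {
  // Capture the flush call in a Runnable so the sink can request a
  // full-region flush without depending on regionserver internals.
  static Runnable flushAllCallback(FlushRequesterSketch requester, List<String> families) {
    return () -> requester.requestFlush(families);
  }
}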

RegionReplicationSink.java

@@ -17,18 +17,29 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -39,6 +50,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
/**
* The class for replicating WAL edits to secondary replicas, one instance per region.
*/
@@ -97,35 +110,39 @@ public class RegionReplicationSink {
private final RegionInfo primary;
private final int regionReplication;
private final TableDescriptor tableDesc;
private final boolean hasRegionMemStoreReplication;
private final Queue<SinkEntry> entries = new ArrayDeque<>();
private final Runnable flushRequester;
private final AsyncClusterConnection conn;
// used to track the replicas to which we failed to replicate edits;
// will be cleared after we get a flush edit.
private final Set<Integer> failedReplicas = new HashSet<>();
private final Queue<SinkEntry> entries = new ArrayDeque<>();
private final int retries;
private final long rpcTimeoutNs;
private final long operationTimeoutNs;
private CompletableFuture<Void> future;
private boolean sending;
private boolean stopping;
private boolean stopped;
RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
RegionReplicationSink(Configuration conf, RegionInfo primary, TableDescriptor td,
Runnable flushRequester, AsyncClusterConnection conn) {
Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
primary);
Preconditions.checkArgument(regionReplication > 1,
"region replication should be greater than 1 but got %s", regionReplication);
Preconditions.checkArgument(td.getRegionReplication() > 1,
"region replication should be greater than 1 but got %s", td.getRegionReplication());
this.primary = primary;
this.regionReplication = regionReplication;
this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
this.tableDesc = td;
this.flushRequester = flushRequester;
this.conn = conn;
this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
this.rpcTimeoutNs =
@@ -134,6 +151,36 @@ public class RegionReplicationSink {
.toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
}
private void onComplete(List<SinkEntry> sent,
Map<Integer, MutableObject<Throwable>> replica2Error) {
sent.forEach(SinkEntry::replicated);
Set<Integer> failed = new HashSet<>();
for (Map.Entry<Integer, MutableObject<Throwable>> entry : replica2Error.entrySet()) {
Integer replicaId = entry.getKey();
Throwable error = entry.getValue().getValue();
if (error != null) {
LOG.warn("Failed to replicate to secondary replica {} for {}, stop replicating" +
" for a while and trigger a flush", replicaId, primary, error);
failed.add(replicaId);
}
}
synchronized (entries) {
if (!failed.isEmpty()) {
failedReplicas.addAll(failed);
flushRequester.run();
}
sending = false;
if (stopping) {
stopped = true;
entries.notifyAll();
return;
}
if (!entries.isEmpty()) {
send();
}
}
}
private void send() {
List<SinkEntry> toSend = new ArrayList<>();
for (SinkEntry entry;;) {
@@ -143,32 +190,37 @@
}
toSend.add(entry);
}
int toSendReplicaCount = tableDesc.getRegionReplication() - 1 - failedReplicas.size();
if (toSendReplicaCount <= 0) {
return;
}
sending = true;
List<WAL.Entry> walEntries =
toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
for (int replicaId = 1; replicaId < tableDesc.getRegionReplication(); replicaId++) {
MutableObject<Throwable> error = new MutableObject<>();
replica2Error.put(replicaId, error);
RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
FutureUtils.addListener(
conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs), (r, e) -> {
error.setValue(e);
if (remaining.decrementAndGet() == 0) {
onComplete(toSend, replica2Error);
}
});
}
future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
FutureUtils.addListener(future, (r, e) -> {
if (e != null) {
// TODO: drop pending edits and issue a flush
LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
}
toSend.forEach(SinkEntry::replicated);
synchronized (entries) {
future = null;
if (stopping) {
stopped = true;
entries.notifyAll();
return;
}
if (!entries.isEmpty()) {
send();
}
}
});
}
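The new send path fans one replicate call out per live replica and funnels every result into a single onComplete: each replica gets a MutableObject slot to record its outcome (a writable holder, since locals captured by a lambda must be effectively final), and an AtomicInteger countdown fires the completion handler exactly once, from the last callback. A self-contained sketch of the pattern with hypothetical names:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.mutable.MutableObject;

final class FanOutSketch {

  // Hypothetical async call standing in for AsyncClusterConnection.replicate().
  static CompletableFuture<Void> replicateTo(int replicaId) {
    return CompletableFuture.completedFuture(null);
  }

  static void sendToReplicas(int replicaCount) {
    AtomicInteger remaining = new AtomicInteger(replicaCount);
    Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
    for (int replicaId = 1; replicaId <= replicaCount; replicaId++) {
      MutableObject<Throwable> error = new MutableObject<>();
      replica2Error.put(replicaId, error);
      replicateTo(replicaId).whenComplete((r, e) -> {
        error.setValue(e); // record this replica's outcome (null on success)
        if (remaining.decrementAndGet() == 0) {
          onComplete(replica2Error); // only the last callback gets here
        }
      });
    }
  }

  static void onComplete(Map<Integer, MutableObject<Throwable>> replica2Error) {
    replica2Error.forEach((replicaId, error) -> {
      if (error.getValue() != null) {
        System.out.println("replica " + replicaId + " failed: " + error.getValue());
      }
    });
  }
}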
private boolean flushAllStores(FlushDescriptor flushDesc) {
Set<byte[]> storesFlushed =
flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
.collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
if (storesFlushed.size() != tableDesc.getColumnFamilyCount()) {
return false;
}
return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
}
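flushAllStores sidesteps a classic Java pitfall: byte[] does not override equals/hashCode by content, so family names collected into a plain HashSet would never match. Backing the set with a content-comparing comparator, as Bytes.BYTES_COMPARATOR does above, fixes the lookup. A small illustration using java.util.Arrays.compare in place of the HBase comparator:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class ByteArraySetSketch {
  public static void main(String[] args) {
    byte[] cf1 = "cf".getBytes(StandardCharsets.UTF_8);
    byte[] cf2 = "cf".getBytes(StandardCharsets.UTF_8); // same content, different object

    Set<byte[]> plain = new HashSet<>();
    plain.add(cf1);
    System.out.println(plain.contains(cf2)); // false: identity-based lookup

    // Content-based ordering, equivalent in spirit to Bytes.BYTES_COMPARATOR.
    Set<byte[]> ordered = new TreeSet<byte[]>(Arrays::compare);
    ordered.add(cf1);
    System.out.println(ordered.contains(cf2)); // true: compares array contents
  }
}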
/**
@@ -178,7 +230,7 @@
* rpc call has cell scanner, which is off heap.
*/
public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
if (!tableDesc.hasRegionMemStoreReplication() && !edit.isMetaEdit()) {
// only replicate meta edit if region memstore replication is not enabled
return;
}
@@ -186,10 +238,31 @@
if (stopping) {
return;
}
if (edit.isMetaEdit()) {
// check whether we have flushed all stores, which means we can drop all the previous edits
// and also recover from the previous failure of some replicas
for (Cell metaCell : edit.getCells()) {
if (CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
FlushDescriptor flushDesc;
try {
flushDesc = WALEdit.getFlushDescriptor(metaCell);
} catch (IOException e) {
LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
continue;
}
if (flushDesc != null && flushAllStores(flushDesc)) {
LOG.debug("Got a flush all request, clear failed replicas {} and {} pending" +
" replication entries", failedReplicas, entries.size());
entries.clear();
failedReplicas.clear();
}
}
}
}
// TODO: limit the total cached entries here, and we should have a global limitation, not for
// only this region.
entries.add(new SinkEntry(key, edit, rpcCall));
if (future == null) {
if (!sending) {
send();
}
}
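The boolean sending flag replaces the stored future as the in-flight marker: add() kicks off a send only when none is running, and the completion path either chains the next batch or lowers the flag. A minimal single-flight sketch under hypothetical names (the real send is asynchronous; here the batch completes inline for brevity):

import java.util.ArrayDeque;
import java.util.Queue;

final class SingleFlightSketch {

  private final Queue<Runnable> entries = new ArrayDeque<>();
  private boolean sending;

  // Producer side: enqueue, and start a send only if none is in flight.
  void add(Runnable entry) {
    synchronized (entries) {
      entries.add(entry);
      if (!sending) {
        send();
      }
    }
  }

  // Called with the lock held. In the real sink the batch is sent
  // asynchronously and this logic lives in the completion callback;
  // here the batch completes inline for brevity.
  private void send() {
    sending = true;
    Runnable batch = entries.poll();
    batch.run();
    synchronized (entries) {
      sending = false;
      if (!entries.isEmpty()) {
        send(); // chain the next batch, mirroring onComplete() above
      }
    }
  }
}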
@@ -203,7 +276,7 @@
void stop() {
synchronized (entries) {
stopping = true;
if (future == null) {
if (!sending) {
stopped = true;
entries.notifyAll();
}
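stop() pairs with a wait on the same monitor: stopping rejects new work, stopped is only raised once no send is in flight, and notifyAll() wakes any thread blocked on shutdown. A compact sketch of that handshake (hypothetical names; the real sink waits with a timeout):

final class StopHandshakeSketch {

  private final Object lock = new Object();
  private boolean sending;
  private boolean stopping;
  private boolean stopped;

  void stop() {
    synchronized (lock) {
      stopping = true;  // reject new entries from now on
      if (!sending) {
        stopped = true; // nothing in flight: stop immediately
        lock.notifyAll();
      }                 // else the send callback sets stopped later
    }
  }

  void waitUntilStopped() throws InterruptedException {
    synchronized (lock) {
      while (!stopped) {
        lock.wait();    // woken by stop() or the last send callback
      }
    }
  }
}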

TestRegionReplicaReplicationError.java (new file)

@@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
/**
* Test region replication when errors occur.
* <p/>
* We can not simply move the secondary replicas, because bringing a secondary replica online
* triggers a flush for the primary replica, which always brings the data of the two regions in
* sync. So here we need to simulate request errors instead.
*/
@Category({ FlakeyTests.class, LargeTests.class })
public class TestRegionReplicaReplicationError {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicaReplicationError.class);
public static final class ErrorReplayRSRpcServices extends RSRpcServices {
private final AtomicInteger count = new AtomicInteger(0);
public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ReplicateWALEntryResponse replay(RpcController controller,
ReplicateWALEntryRequest request) throws ServiceException {
List<WALEntry> entries = request.getEntryList();
if (CollectionUtils.isEmpty(entries)) {
return ReplicateWALEntryResponse.getDefaultInstance();
}
ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
HRegion region;
try {
region = server.getRegionByEncodedName(regionName.toStringUtf8());
} catch (NotServingRegionException e) {
throw new ServiceException(e);
}
// fail the first several requests
if (region.getRegionInfo().getReplicaId() == 1 && count.addAndGet(entries.size()) < 100) {
throw new ServiceException("Inject error!");
}
return super.replay(controller, request);
}
}
public static final class RSForTest
extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
public RSForTest(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new ErrorReplayRSRpcServices(this);
}
}
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
private static final TableName TN = TableName.valueOf("test");
private static final byte[] CF = Bytes.toBytes("cf");
private static final byte[] CQ = Bytes.toBytes("cq");
@BeforeClass
public static void setUp() throws Exception {
HTU.getConfiguration().setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY,
true);
HTU.startMiniCluster(
StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(3).build());
TableDescriptor td = TableDescriptorBuilder.newBuilder(TN).setRegionReplication(3)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
HTU.getAdmin().createTable(td);
}
@AfterClass
public static void tearDown() throws Exception {
HTU.shutdownMiniCluster();
}
private boolean checkReplica(Table table, int replicaId) throws IOException {
boolean ret = true;
for (int i = 0; i < 500; i++) {
Result result = table.get(new Get(Bytes.toBytes(i)).setReplicaId(replicaId));
byte[] value = result.getValue(CF, CQ);
ret &= value != null && value.length > 0 && Bytes.toInt(value) == i;
}
return ret;
}
@Test
public void test() throws IOException {
try (Table table = HTU.getConnection().getTable(TN)) {
for (int i = 0; i < 500; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}
HTU.waitFor(30000, () -> checkReplica(table, 2));
HTU.waitFor(30000, () -> checkReplica(table, 1));
}
}
}
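The test's convergence checks lean on HTU.waitFor, which polls a predicate until it holds or a deadline passes. A minimal self-contained sketch of that utility shape (an assumption about its behavior, not the HBaseTestingUtil implementation):

import java.util.function.BooleanSupplier;

final class WaitUtilSketch {
  // Poll `condition` every 100 ms until it holds or `timeoutMs` elapses.
  static boolean waitFor(long timeoutMs, BooleanSupplier condition) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(100);
    }
    return condition.getAsBoolean();
  }
}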

TestRegionReplicaReplication.java

@@ -53,8 +53,8 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
/**
* Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
* async wal replication replays the edits to the secondary region in various scenarios.
* Tests region replication by setting up region replicas and verifying async wal replication
* replays the edits to the secondary region in various scenarios.
*/
@Category({FlakeyTests.class, LargeTests.class})
public class TestRegionReplicaReplication {