HBASE-26960 Another case for unnecessary replication suspending in Re… (#4355)

chenglei 2022-04-30 11:37:02 +08:00 committed by GitHub
parent b0c2832b6e
commit f5a566ea1b
4 changed files with 357 additions and 9 deletions

HRegion.java

@@ -2794,7 +2794,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       long flushOpSeqId = writeEntry.getWriteNumber();
       FlushResultImpl flushResult =
         new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId,
-          "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
+          "Nothing to flush",
+          writeCanNotFlushMarkerToWAL(writeEntry, wal, writeFlushWalMarker));
       mvcc.completeAndWait(writeEntry);
       // Set to null so we don't complete it again down in finally block.
       writeEntry = null;
@@ -2975,17 +2976,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }

   /**
-   * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various
-   * reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
+   * This method is only used when we flush but the memstore is empty: if writeFlushWalMarker is
+   * true, we write the {@link FlushAction#CANNOT_FLUSH} flush marker to the WAL. Ignores
+   * exceptions from the WAL. Returns whether the write succeeded.
    * @return whether WAL write was successful
    */
-  private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
+  private boolean writeCanNotFlushMarkerToWAL(WriteEntry flushOpSeqIdMVCCEntry, WAL wal,
+    boolean writeFlushWalMarker) {
+    FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH, getRegionInfo(),
+      -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
+    RegionReplicationSink sink = regionReplicationSink.orElse(null);
+    if (sink != null && !writeFlushWalMarker) {
+      /**
+       * Even when writeFlushWalMarker is false, replication to the secondary region replica can
+       * still use {@link FlushAction#CANNOT_FLUSH} to recover, so we create a {@link WALEdit} for
+       * the {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} call to the
+       * flushOpSeqIdMVCCEntry. See HBASE-26960 for more details.
+       */
+      this.attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry,
+        desc, sink);
+      return false;
+    }
     if (writeFlushWalMarker && wal != null && !writestate.readOnly) {
-      FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
-        getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
       try {
         WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
-          regionReplicationSink.orElse(null));
+          sink);
         return true;
       } catch (IOException e) {
         LOG.warn(getRegionInfo().getEncodedName() + " : " +
@@ -2995,6 +3012,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return false;
   }

+  /**
+   * Create a {@link WALEdit} for the {@link FlushDescriptor} and attach the
+   * {@link RegionReplicationSink#add} call to the flushOpSeqIdMVCCEntry.
+   */
+  private void attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
+    FlushDescriptor desc, RegionReplicationSink sink) {
+    assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent();
+    WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc);
+    WALKeyImpl walKey =
+      WALUtil.createWALKey(getRegionInfo(), mvcc, this.getReplicationScope(), null);
+    walKey.setWriteEntry(flushOpSeqIdMVCCEntry);
+    /**
+     * The {@link ServerCall} passed to {@link RegionReplicationSink#add} is null because the
+     * flushMarkerWALEdit is created by ourselves, not from an rpc.
+     */
+    flushOpSeqIdMVCCEntry.attachCompletionAction(() -> sink.add(walKey, flushMarkerWALEdit, null));
+  }
+
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
     justification="Intentional; notify is about completed flush")
   FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status,

MultiVersionConcurrencyControl.java

@@ -303,6 +303,10 @@ public class MultiVersionConcurrencyControl {
       completionAction.ifPresent(Runnable::run);
     }

+    public Optional<Runnable> getCompletionAction() {
+      return completionAction;
+    }
+
     public long getWriteNumber() {
       return this.writeNumber;
     }

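Taken together, the HRegion and MultiVersionConcurrencyControl changes above defer the replication of the CANNOT_FLUSH marker instead of dropping it: when the memstore is empty and no marker is written to the WAL, the RegionReplicationSink#add call is attached to the MVCC write entry of the flush and only runs once that entry completes, so the marker reaches the secondary replicas with the right flush sequence id. The following minimal, self-contained Java sketch models that pattern with plain stand-in classes (not the HBase API):

import java.util.Optional;

public class CompletionActionSketch {

  // Stand-in for MultiVersionConcurrencyControl.WriteEntry: holds at most one deferred action.
  static final class WriteEntry {
    private Optional<Runnable> completionAction = Optional.empty();

    void attachCompletionAction(Runnable action) {
      assert !completionAction.isPresent() : "only one completion action may be attached";
      completionAction = Optional.of(action);
    }

    // Models what the MVCC does when completeAndWait() finishes this entry.
    void complete() {
      completionAction.ifPresent(Runnable::run);
    }
  }

  // Stand-in for RegionReplicationSink: receives markers to ship to secondary replicas.
  static final class Sink {
    void add(String marker) {
      System.out.println("replicating " + marker + " to secondary replicas");
    }
  }

  public static void main(String[] args) {
    WriteEntry flushOpSeqIdEntry = new WriteEntry();
    Sink sink = new Sink();

    // writeCanNotFlushMarkerToWAL case: memstore is empty and no WAL marker is written,
    // so the CANNOT_FLUSH replication is deferred instead of dropped.
    flushOpSeqIdEntry.attachCompletionAction(() -> sink.add("CANNOT_FLUSH"));

    // Later, completing the MVCC entry (completeAndWait in HRegion) runs the deferred
    // replication exactly once, after the flush sequence id is in place.
    flushOpSeqIdEntry.complete();
  }
}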
WALUtil.java

@@ -158,8 +158,7 @@ public class WALUtil {
     final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
     final boolean sync, final RegionReplicationSink sink) throws IOException {
     // TODO: Pass in current time to use?
-    WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
-      EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
+    WALKeyImpl walKey = createWALKey(hri, mvcc, replicationScope, extendedAttributes);
     long trx = MultiVersionConcurrencyControl.NONE;
     try {
       trx = wal.appendMarker(hri, walKey, edit);
@@ -182,6 +181,13 @@ public class WALUtil {
     return walKey;
   }

+  public static WALKeyImpl createWALKey(final RegionInfo hri, MultiVersionConcurrencyControl mvcc,
+    final NavigableMap<byte[], Integer> replicationScope,
+    final Map<String, byte[]> extendedAttributes) {
+    return new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
+      EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
+  }
+
   /**
    * Blocksize returned here is 2x the default HDFS blocksize unless explicitly set in
    * Configuration. Works in tandem with hbase.regionserver.logroll.multiplier. See comment in

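For reference, a minimal sketch of calling the extracted createWALKey helper directly is shown below. This is a hypothetical standalone usage, not part of the patch: WALUtil is HBase-internal API, and the table name, region info, and replication scope here are illustrative values.

import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALKeyImpl;

public class CreateWalKeyExample {
  public static void main(String[] args) {
    // Illustrative region and replication scope; real callers pass the region's own values.
    RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf("example_table")).build();
    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
    NavigableMap<byte[], Integer> replicationScope = new TreeMap<>(Bytes.BYTES_COMPARATOR);

    // Same construction writeFlushMarker() now delegates to; extendedAttributes may be null,
    // as in the HRegion call site above.
    WALKeyImpl walKey = WALUtil.createWALKey(hri, mvcc, replicationScope, null);
    System.out.println(walKey);
  }
}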
TestRegionReplicationForFlushMarker.java (new file)

@@ -0,0 +1,303 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.regionreplication;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.MemStoreFlusher;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
@Category({ RegionServerTests.class, LargeTests.class })
public class TestRegionReplicationForFlushMarker {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicationForFlushMarker.class);
private static final byte[] FAMILY = Bytes.toBytes("family_test");
private static final byte[] QUAL = Bytes.toBytes("qualifier_test");
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
private static final int NB_SERVERS = 2;
private static TableName tableName = TableName.valueOf("TestRegionReplicationForFlushMarker");
private static volatile boolean startTest = false;
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
conf.setInt(RegionReplicationSink.RETRIES_NUMBER, 1);
conf.setLong(RegionReplicationSink.RPC_TIMEOUT_MS, 10 * 60 * 1000);
conf.setLong(RegionReplicationSink.OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
conf.setLong(RegionReplicationSink.META_EDIT_RPC_TIMEOUT_MS, 10 * 60 * 1000);
conf.setLong(RegionReplicationSink.META_EDIT_OPERATION_TIMEOUT_MS, 20 * 60 * 1000);
conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
conf.setInt(RegionReplicationFlushRequester.MIN_INTERVAL_SECS, 3);
HTU.startMiniCluster(StartTestingClusterOption.builder().rsClass(RSForTest.class)
.numRegionServers(NB_SERVERS).build());
}
@AfterClass
public static void tearDown() throws Exception {
HTU.shutdownMiniCluster();
}
/**
* This test is for HBASE-26960. Before HBASE-26960, {@link MemStoreFlusher} did not write the
* {@link FlushAction#CANNOT_FLUSH} marker to the WAL when the memstore was empty, so if the
* {@link RegionReplicationSink} requested a flush while the memstore was empty, it never
* received the {@link FlushAction#CANNOT_FLUSH} marker and replication could hang. After
* HBASE-26768, when the {@link RegionReplicationSink} requests a flush while the memstore is
* empty, even though no {@link FlushAction#CANNOT_FLUSH} marker is written to the WAL, we still
* replicate the {@link FlushAction#CANNOT_FLUSH} marker to the secondary region replica.
*/
@Test
public void testCannotFlushMarker() throws Exception {
final HRegionForTest[] regions = this.createTable();
RegionReplicationSink regionReplicationSink = regions[0].getRegionReplicationSink().get();
assertTrue(regionReplicationSink != null);
String oldThreadName = Thread.currentThread().getName();
Thread.currentThread().setName(HRegionForTest.USER_THREAD_NAME);
try {
byte[] rowKey1 = Bytes.toBytes(1);
startTest = true;
/**
* Write the first cell. Replicating it to the secondary replica fails, so the
* {@link RegionReplicationSink} requests a flush. Once the {@link RegionReplicationSink}
* receives the {@link FlushAction#START_FLUSH}, {@link RegionReplicationSink#failedReplicas}
* is cleared, but replicating the {@link FlushAction#START_FLUSH} fails again, so the
* {@link RegionReplicationSink} requests another flush. By now the memstore is empty, so the
* {@link MemStoreFlusher} just writes a {@link FlushAction#CANNOT_FLUSH} marker to the WAL.
*/
regions[0].put(new Put(rowKey1).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
/**
* Wait until the {@link FlushAction#CANNOT_FLUSH} marker is written and replication is
* initiated.
*/
regions[0].cyclicBarrier.await();
assertTrue(regions[0].prepareFlushCounter.get() == 2);
/**
* The {@link RegionReplicationSink#failedReplicas} is cleared by the
* {@link FlushAction#CANNOT_FLUSH} marker.
*/
assertTrue(regionReplicationSink.getFailedReplicas().isEmpty());
} finally {
startTest = false;
Thread.currentThread().setName(oldThreadName);
}
}
private HRegionForTest[] createTable() throws Exception {
TableDescriptor tableDescriptor =
TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
.build();
HTU.getAdmin().createTable(tableDescriptor);
final HRegionForTest[] regions = new HRegionForTest[NB_SERVERS];
for (int i = 0; i < NB_SERVERS; i++) {
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
List<HRegion> onlineRegions = rs.getRegions(tableName);
for (HRegion region : onlineRegions) {
int replicaId = region.getRegionInfo().getReplicaId();
assertTrue(regions[replicaId] == null);
regions[region.getRegionInfo().getReplicaId()] = (HRegionForTest) region;
}
}
for (Region region : regions) {
assertNotNull(region);
}
return regions;
}
public static final class HRegionForTest extends HRegion {
static final String USER_THREAD_NAME = "TestRegionReplicationForFlushMarker";
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
final AtomicInteger prepareFlushCounter = new AtomicInteger(0);
public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
TableDescriptor htd, RegionServerServices rsServices) {
super(fs, wal, confParam, htd, rsServices);
}
@SuppressWarnings("deprecation")
public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
}
public void setRegionReplicationSink(RegionReplicationSink regionReplicationSink) {
this.regionReplicationSink = Optional.of(regionReplicationSink);
}
@Override
protected void writeRegionOpenMarker(WAL wal, long openSeqId) throws IOException {
// Do not write the region open marker, so it does not interfere with the test.
}
@Override
protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
if (!startTest) {
return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
writeFlushWalMarker, tracker);
}
if (this.getRegionInfo().getReplicaId() != 0) {
return super.internalPrepareFlushCache(wal, myseqid, storesToFlush, status,
writeFlushWalMarker, tracker);
}
try {
PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
status, writeFlushWalMarker, tracker);
this.prepareFlushCounter.incrementAndGet();
/**
* The first flush writes the {@link FlushAction#START_FLUSH} marker; the second flush writes
* the {@link FlushAction#CANNOT_FLUSH} marker because the memstore is empty.
*/
if (this.prepareFlushCounter.get() == 2
&& result.getResult() != null
&& result.getResult().getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
cyclicBarrier.await();
}
return result;
} catch (BrokenBarrierException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static final class ErrorReplayRSRpcServices extends RSRpcServices {
private static final AtomicInteger callCounter = new AtomicInteger(0);
public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
super(rs);
}
@Override
public ReplicateWALEntryResponse replicateToReplica(RpcController rpcController,
ReplicateWALEntryRequest replicateWALEntryRequest) throws ServiceException {
if (!startTest) {
return super.replicateToReplica(rpcController, replicateWALEntryRequest);
}
List<WALEntry> entries = replicateWALEntryRequest.getEntryList();
if (CollectionUtils.isEmpty(entries)) {
return ReplicateWALEntryResponse.getDefaultInstance();
}
ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
HRegion region;
try {
region = server.getRegionByEncodedName(regionName.toStringUtf8());
} catch (NotServingRegionException e) {
throw new ServiceException(e);
}
if (!region.getRegionInfo().getTable().equals(tableName)
|| region.getRegionInfo().getReplicaId() != 1) {
return super.replicateToReplica(rpcController, replicateWALEntryRequest);
}
/**
* Simulate replication failures for the first cell write and the
* {@link FlushAction#START_FLUSH} marker.
*/
int count = callCounter.incrementAndGet();
if (count > 2) {
return super.replicateToReplica(rpcController, replicateWALEntryRequest);
}
throw new ServiceException(new DoNotRetryIOException("Inject error!"));
}
}
public static final class RSForTest
extends SingleProcessHBaseCluster.MiniHBaseClusterRegionServer {
public RSForTest(Configuration conf) throws IOException, InterruptedException {
super(conf);
}
@Override
protected RSRpcServices createRpcServices() throws IOException {
return new ErrorReplayRSRpcServices(this);
}
}
}
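
Note: the new test is in the LargeTests category. Assuming it lives alongside the other region replication tests in the hbase-server module, it can typically be run on its own via Maven Surefire with `mvn test -Dtest=TestRegionReplicationForFlushMarker` from that module.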