HBASE-26993 Make the new framework for region replication work for SKIP_WAL (#4392)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
chenglei 2022-06-03 16:39:17 +08:00 committed by GitHub
parent c0e8243c8b
commit d57159f31c
6 changed files with 502 additions and 60 deletions
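
For context, here is a minimal client-level sketch (not part of the patch) of the two SKIP_WAL cases this change makes replicate to secondary region replicas, using the same public client API as the new tests in this commit. Obtaining the Connection and creating the table with REGION_REPLICATION >= 2 and region replica replication enabled are assumed, and the class name is hypothetical.

import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class SkipWalReplicaSketch {
  // Writes a mixed batch with one normal Put and one SKIP_WAL Put (HBASE-26993 case 1),
  // then reads the SKIP_WAL row back from a secondary replica. Case 2 is the same flow
  // against a table created with Durability.SKIP_WAL, where every mutation skips the WAL.
  // Before this change, SKIP_WAL mutations never reached the replicas because
  // RegionReplicationSink#add was only attached inside HRegion#doWALAppend.
  static void writeAndReadFromReplica(Connection conn, TableName tableName) throws IOException {
    byte[] cf = Bytes.toBytes("cf");
    byte[] q = Bytes.toBytes("q");
    try (Table table = conn.getTable(tableName)) {
      Put normalPut = new Put(Bytes.toBytes("r1")).addColumn(cf, q, Bytes.toBytes("v1"));
      Put skipWalPut = new Put(Bytes.toBytes("r2")).addColumn(cf, q, Bytes.toBytes("v2"));
      skipWalPut.setDurability(Durability.SKIP_WAL);
      table.put(Arrays.asList(normalPut, skipWalPut)); // mixed batch: case 1
      // Read the SKIP_WAL row from replica 1; TIMELINE consistency allows replica reads.
      Get get = new Get(Bytes.toBytes("r2")).setConsistency(Consistency.TIMELINE).setReplicaId(1);
      Result fromReplica = table.get(get);
      System.out.println(Bytes.toString(fromReplica.getValue(cf, q)));
    }
  }
}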

HRegion.java

@ -2891,7 +2891,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* {@link FlushDescriptor} and attach the {@link RegionReplicationSink#add} to the
* flushOpSeqIdMVCCEntry, see HBASE-26960 for more details.
*/
this.attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
this.attachRegionReplicationToFlushOpSeqIdMVCCEntry(flushOpSeqIdMVCCEntry, desc, sink);
return false;
}
@ -2912,7 +2912,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Create {@link WALEdit} for {@link FlushDescriptor} and attach {@link RegionReplicationSink#add}
* to the flushOpSeqIdMVCCEntry.
*/
private void attachReplicateRegionReplicaToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
private void attachRegionReplicationToFlushOpSeqIdMVCCEntry(WriteEntry flushOpSeqIdMVCCEntry,
FlushDescriptor desc, RegionReplicationSink sink) {
assert !flushOpSeqIdMVCCEntry.getCompletionAction().isPresent();
WALEdit flushMarkerWALEdit = WALEdit.createFlushWALEdit(getRegionInfo(), desc);
@ -3372,8 +3372,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* Write mini-batch operations to MemStore
*/
public abstract WriteEntry writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
throws IOException;
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
long now) throws IOException;
protected void writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
@ -3592,6 +3592,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);
}
protected WALEdit createWALEdit(final MiniBatchOperationInProgress<Mutation> miniBatchOp) {
return new WALEdit(miniBatchOp.getCellCount(), isInReplay());
}
/**
* Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are
* present, they are merged into the resulting WALEdit.
@ -3609,6 +3613,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we use durability of the original mutation for the mutation passed by CP.
if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
region.recordMutationWithoutWal(m.getFamilyCellMap());
/**
* This is for HBASE-26993: to make the new region replication framework work for
* SKIP_WAL, we save the {@link Mutation}s whose {@link Mutation#getDurability} is
* {@link Durability#SKIP_WAL} in the miniBatchOp.
*/
cacheSkipWALMutationForRegionReplication(miniBatchOp, walEdits, familyCellMaps[index]);
return true;
}
@ -3622,27 +3632,38 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|| curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup
|| curWALEditForNonce.getFirst().getNonce() != nonce
) {
curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),
new WALEdit(miniBatchOp.getCellCount(), isInReplay()));
curWALEditForNonce =
new Pair<>(new NonceKey(nonceGroup, nonce), createWALEdit(miniBatchOp));
walEdits.add(curWALEditForNonce);
}
WALEdit walEdit = curWALEditForNonce.getSecond();
// Add WAL edits from CPs.
WALEdit fromCP = walEditsFromCoprocessors[index];
if (fromCP != null) {
for (Cell cell : fromCP.getCells()) {
walEdit.add(cell);
}
}
walEdit.add(familyCellMaps[index]);
List<Cell> cellsFromCP = fromCP == null ? Collections.emptyList() : fromCP.getCells();
addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMaps[index]);
return true;
}
});
return walEdits;
}
protected void addNonSkipWALMutationsToWALEdit(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {
doAddCellsToWALEdit(walEdit, cellsFromCP, familyCellMap);
}
protected static void doAddCellsToWALEdit(WALEdit walEdit, List<Cell> cellsFromCP,
Map<byte[], List<Cell>> familyCellMap) {
walEdit.add(cellsFromCP);
walEdit.add(familyCellMap);
}
protected abstract void cacheSkipWALMutationForRegionReplication(
final MiniBatchOperationInProgress<Mutation> miniBatchOp,
List<Pair<NonceKey, WALEdit>> walEdits, Map<byte[], List<Cell>> familyCellMap);
/**
* This method completes mini-batch operations by calling postBatchMutate() CP hook (if
* required) and completing mvcc.
@ -3717,6 +3738,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private long nonceGroup;
private long nonce;
protected boolean canProceed;
private boolean regionReplicateEnable;
public MutationBatchOperation(final HRegion region, Mutation[] operations, boolean atomic,
long nonceGroup, long nonce) {
@ -3724,6 +3746,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
this.atomic = atomic;
this.nonceGroup = nonceGroup;
this.nonce = nonce;
this.regionReplicateEnable = region.regionReplicationSink.isPresent();
}
@Override
@ -4140,17 +4163,115 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return walEdits;
}
/**
* This is for HBASE-26993: to make the new region replication framework work for SKIP_WAL,
* we save the {@link Mutation}s whose {@link Mutation#getDurability} is
* {@link Durability#SKIP_WAL} in the miniBatchOp.
*/
@Override
protected void cacheSkipWALMutationForRegionReplication(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits, Map<byte[], List<Cell>> familyCellMap) {
if (!this.regionReplicateEnable) {
return;
}
WALEdit walEditForReplicateIfExistsSkipWAL =
miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
/**
* When there is a SKIP_WAL {@link Mutation}, we create a new {@link WALEdit} for
* replicating to the region replica: first we copy the existing {@link WALEdit} into it,
* and then add the SKIP_WAL {@link Mutation}s to it.
*/
if (walEditForReplicateIfExistsSkipWAL == null) {
walEditForReplicateIfExistsSkipWAL =
this.createWALEditForReplicateSkipWAL(miniBatchOp, nonceKeyAndWALEdits);
miniBatchOp.setWalEditForReplicateIfExistsSkipWAL(walEditForReplicateIfExistsSkipWAL);
}
walEditForReplicateIfExistsSkipWAL.add(familyCellMap);
}
private WALEdit createWALEditForReplicateSkipWAL(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
List<Pair<NonceKey, WALEdit>> nonceKeyAndWALEdits) {
if (nonceKeyAndWALEdits.isEmpty()) {
return this.createWALEdit(miniBatchOp);
}
// for MutationBatchOperation, more than one nonce is not allowed
assert nonceKeyAndWALEdits.size() == 1;
WALEdit currentWALEdit = nonceKeyAndWALEdits.get(0).getSecond();
return new WALEdit(currentWALEdit);
}
@Override
protected void addNonSkipWALMutationsToWALEdit(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WALEdit walEdit,
List<Cell> cellsFromCP, Map<byte[], List<Cell>> familyCellMap) {
super.addNonSkipWALMutationsToWALEdit(miniBatchOp, walEdit, cellsFromCP, familyCellMap);
WALEdit walEditForReplicateIfExistsSkipWAL =
miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
if (walEditForReplicateIfExistsSkipWAL == null) {
return;
}
/**
* When walEditForReplicateIfExistsSkipWAL is not null, it means there are SKIP_WAL
* {@link Mutation}s and we have created a new {@link WALEdit} in
* {@link MutationBatchOperation#cacheSkipWALMutationForRegionReplication} for
* replicating to the region replica, so here we also add the non-SKIP_WAL
* {@link Mutation}s to walEditForReplicateIfExistsSkipWAL.
*/
doAddCellsToWALEdit(walEditForReplicateIfExistsSkipWAL, cellsFromCP, familyCellMap);
}
@Override
public WriteEntry writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry)
throws IOException {
final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry,
long now) throws IOException {
boolean newWriteEntry = false;
if (writeEntry == null) {
writeEntry = region.mvcc.begin();
newWriteEntry = true;
}
super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber());
if (newWriteEntry) {
/**
* This is for HBASE-26993 case 2, where all {@link Mutation}s are {@link Durability#SKIP_WAL}.
* In order to make the new region replication framework work for SKIP_WAL, and because no
* {@link RegionReplicationSink#add} is attached in {@link HRegion#doWALAppend}, here we get
* the {@link WALEdit} from
* {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL} and attach
* {@link RegionReplicationSink#add} to the new mvcc writeEntry.
*/
attachRegionReplicationToMVCCEntry(miniBatchOp, writeEntry, now);
}
return writeEntry;
}
private WALKeyImpl createWALKey(long now) {
// for MutationBatchOperation, isReplay is false.
return this.region.createWALKeyForWALAppend(false, this, now, this.nonceGroup, this.nonce);
}
/**
* Create a {@link WALKeyImpl}, get the {@link WALEdit} from the miniBatchOp, and attach
* {@link RegionReplicationSink#add} to the mvccWriteEntry.
*/
private void attachRegionReplicationToMVCCEntry(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, WriteEntry mvccWriteEntry, long now)
throws IOException {
if (!this.regionReplicateEnable) {
return;
}
assert !mvccWriteEntry.getCompletionAction().isPresent();
final WALKeyImpl walKey = this.createWALKey(now);
walKey.setWriteEntry(mvccWriteEntry);
region.doAttachReplicateRegionReplicaAction(walKey,
miniBatchOp.getWalEditForReplicateIfExistsSkipWAL(), mvccWriteEntry);
}
@Override
public void completeMiniBatchOperations(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
@ -4466,8 +4587,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public WriteEntry writeMiniBatchOperationsToMemStore(
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
throws IOException {
final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry,
long now) throws IOException {
super.writeMiniBatchOperationsToMemStore(miniBatchOp, getOrigLogSeqNum());
return writeEntry;
}
@ -4479,6 +4600,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
super.completeMiniBatchOperations(miniBatchOp, writeEntry);
region.mvcc.advanceTo(getOrigLogSeqNum());
}
@Override
protected void cacheSkipWALMutationForRegionReplication(
MiniBatchOperationInProgress<Mutation> miniBatchOp, List<Pair<NonceKey, WALEdit>> walEdits,
Map<byte[], List<Cell>> familyCellMap) {
// There is nothing to do if the current region is a secondary replica
}
}
public OperationStatus[] batchMutate(Mutation[] mutations, boolean atomic, long nonceGroup,
@ -4647,8 +4776,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
if (walEdit != null && !walEdit.isEmpty()) {
writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
writeEntry = doWALAppend(walEdit, batchOp, miniBatchOp, now, nonceKey);
}
// Complete mvcc for all but last writeEntry (for replay case)
@ -4660,7 +4788,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 5. Write back to memStore
// NOTE: writeEntry can be null here
writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry, now);
// STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
// complete mvcc for last writeEntry
@ -7903,42 +8031,46 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}, () -> createRegionSpan("Region.increment"));
}
private WALKeyImpl createWALKeyForWALAppend(boolean isReplay, BatchOperation<?> batchOp, long now,
long nonceGroup, long nonce) {
WALKeyImpl walKey = isReplay
? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
batchOp.getClusterIds(), nonceGroup, nonce, mvcc)
: new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now,
batchOp.getClusterIds(), nonceGroup, nonce, mvcc, this.getReplicationScope());
if (isReplay) {
walKey.setOrigLogSeqNum(batchOp.getOrigLogSeqNum());
}
return walKey;
}
/**
* @return writeEntry associated with this append
*/
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
private WriteEntry doWALAppend(WALEdit walEdit, BatchOperation<?> batchOp,
MiniBatchOperationInProgress<Mutation> miniBatchOp, long now, NonceKey nonceKey)
throws IOException {
Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), "WALEdit is null or empty!");
Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != SequenceId.NO_SEQUENCE_ID,
Preconditions.checkArgument(
!walEdit.isReplay() || batchOp.getOrigLogSeqNum() != SequenceId.NO_SEQUENCE_ID,
"Invalid replay sequence Id for replay WALEdit!");
// Using default cluster id, as this can only happen in the originating cluster.
// A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
// here instead of WALKeyImpl directly to support legacy coprocessors.
WALKeyImpl walKey = walEdit.isReplay()
? new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc)
: new WALKeyImpl(this.getRegionInfo().getEncodedNameAsBytes(),
this.htableDescriptor.getTableName(), SequenceId.NO_SEQUENCE_ID, now, clusterIds,
nonceGroup, nonce, mvcc, this.getReplicationScope());
if (walEdit.isReplay()) {
walKey.setOrigLogSeqNum(origLogSeqNum);
}
WALKeyImpl walKey = createWALKeyForWALAppend(walEdit.isReplay(), batchOp, now,
nonceKey.getNonceGroup(), nonceKey.getNonce());
// don't call the coproc hook for writes to the WAL caused by
// system lifecycle events like flushes or compactions
if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdit);
}
ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
try {
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
WriteEntry writeEntry = walKey.getWriteEntry();
regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
sink.add(walKey, walEdit, rpcCall);
}));
this.attachRegionReplicationInWALAppend(batchOp, miniBatchOp, walKey, walEdit, writeEntry);
// Call sync on our edit.
if (txid != 0) {
sync(txid, durability);
sync(txid, batchOp.durability);
}
return writeEntry;
} catch (IOException ioe) {
@ -7947,7 +8079,51 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
throw ioe;
}
}
/**
* Attach {@link RegionReplicationSink#add} to the mvcc writeEntry for replicating to the
* region replica.
*/
private void attachRegionReplicationInWALAppend(BatchOperation<?> batchOp,
MiniBatchOperationInProgress<Mutation> miniBatchOp, WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
if (!regionReplicationSink.isPresent()) {
return;
}
/**
* If {@link HRegion#regionReplicationSink} is present, only {@link MutationBatchOperation}
* is used and the {@link NonceKey} is the same for all {@link Mutation}s in the
* {@link MutationBatchOperation}. So for HBASE-26993 case 1, if
* {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL} is not null and
* we reach {@link HRegion#doWALAppend}, that means some (but not all) {@link Mutation}s are
* {@link Durability#SKIP_WAL}, and we use
* {@link MiniBatchOperationInProgress#getWalEditForReplicateIfExistsSkipWAL} to replicate to
* the region replica. If it is null, no {@link Mutation} is {@link Durability#SKIP_WAL}, so
* we just use the walEdit to replicate.
*/
assert batchOp instanceof MutationBatchOperation;
WALEdit walEditToUse = miniBatchOp.getWalEditForReplicateIfExistsSkipWAL();
if (walEditToUse == null) {
walEditToUse = walEdit;
}
doAttachReplicateRegionReplicaAction(walKey, walEditToUse, writeEntry);
}
/**
* Attach {@link RegionReplicationSink#add} to the mvcc writeEntry for replicating to the
* region replica.
*/
private void doAttachReplicateRegionReplicaAction(WALKeyImpl walKey, WALEdit walEdit,
WriteEntry writeEntry) throws IOException {
if (walEdit == null || walEdit.isEmpty()) {
return;
}
final ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
sink.add(walKey, walEdit, rpcCall);
}));
}
public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
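
To summarize the HRegion changes above, here is a plain-Java stand-in (illustration only, not HBase internals; the class and method names are hypothetical) for the decision of which WALEdit ends up handed to RegionReplicationSink#add for a mini-batch:

final class ReplicationEditChoiceSketch {
  /**
   * HBASE-26993 case 3 (no SKIP_WAL): the WALEdit appended in doWALAppend is replicated,
   * exactly as before this change.
   * Case 1 (some SKIP_WAL): doWALAppend still runs for the non-SKIP_WAL cells, but the
   * cached walEditForReplicateIfExistsSkipWAL (WAL cells plus SKIP_WAL cells) is replicated.
   * Case 2 (all SKIP_WAL): doWALAppend never runs, so the cached edit is attached to the
   * fresh MVCC write entry created in writeMiniBatchOperationsToMemStore.
   */
  static String editReplicatedToReplica(boolean anySkipWal, boolean allSkipWal) {
    if (!anySkipWal) {
      return "walEdit appended to the WAL (doWALAppend)";            // case 3
    }
    return allSkipWal
      ? "cached skip-WAL edit, attached to the new MVCC write entry" // case 2
      : "cached skip-WAL edit, attached inside doWALAppend";         // case 1
  }
}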

MiniBatchOperationInProgress.java

@ -48,6 +48,13 @@ public class MiniBatchOperationInProgress<T> {
private int numOfDeletes = 0;
private int numOfIncrements = 0;
private int numOfAppends = 0;
/**
* This is for HBASE-26993: if any {@link Mutation} is {@link Durability#SKIP_WAL},
* {@link HRegion.BatchOperation#buildWALEdits} saves all of the {@link Mutation}s here so
* that {@link HRegion#doMiniBatchMutate} can also replicate the {@link Durability#SKIP_WAL}
* {@link Mutation}s to the region replica.
*/
private WALEdit walEditForReplicateIfExistsSkipWAL = null;
public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
@ -182,4 +189,12 @@ public class MiniBatchOperationInProgress<T> {
public void incrementNumOfAppends() {
this.numOfAppends += 1;
}
public WALEdit getWalEditForReplicateIfExistsSkipWAL() {
return walEditForReplicateIfExistsSkipWAL;
}
public void setWalEditForReplicateIfExistsSkipWAL(WALEdit walEditForReplicateSkipWAL) {
this.walEditForReplicateIfExistsSkipWAL = walEditForReplicateSkipWAL;
}
}

WALEdit.java

@ -177,6 +177,19 @@ public class WALEdit implements HeapSize {
cells = new ArrayList<>(cellCount);
}
/**
* Create a new WALEdit from an existing {@link WALEdit}.
*/
public WALEdit(WALEdit walEdit) {
this.replay = walEdit.replay;
cells = new ArrayList<>(walEdit.cells);
if (walEdit.families != null) {
this.families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
this.families.addAll(walEdit.families);
}
}
private Set<byte[]> getOrCreateFamilies() {
if (this.families == null) {
this.families = new TreeSet<>(Bytes.BYTES_COMPARATOR);
@ -237,6 +250,17 @@ public class WALEdit implements HeapSize {
return add(cell, CellUtil.cloneFamily(cell));
}
@InterfaceAudience.Private
public WALEdit add(List<Cell> cells) {
if (cells == null || cells.isEmpty()) {
return this;
}
for (Cell cell : cells) {
add(cell);
}
return this;
}
public boolean isEmpty() {
return cells.isEmpty();
}
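
Below is a minimal sketch (not from the patch) exercising the two WALEdit additions above, the copy constructor and the add(List<Cell>) overload. It assumes the public no-arg WALEdit constructor and add(Cell) remain available to the caller, and the class name is hypothetical.

import java.util.Arrays;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;

public class WALEditCopySketch {
  public static void main(String[] args) {
    Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
      .setRow(Bytes.toBytes("row")).setFamily(Bytes.toBytes("cf"))
      .setQualifier(Bytes.toBytes("q")).setTimestamp(System.currentTimeMillis())
      .setType(Cell.Type.Put).setValue(Bytes.toBytes("v")).build();

    WALEdit original = new WALEdit();
    original.add(cell);

    // New copy constructor: cells and the family set are copied, so additions to the
    // copy leave the original edit untouched.
    WALEdit copy = new WALEdit(original);
    // New bulk-add overload used by doAddCellsToWALEdit above.
    copy.add(Arrays.asList(cell, cell));

    System.out.println(original.size() + " vs " + copy.size()); // 1 vs 3
  }
}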

TestRegionReplicaReplicationError.java

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@ -37,6 +39,7 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -69,7 +72,8 @@ public class TestRegionReplicaReplicationError {
public static final class ErrorReplayRSRpcServices extends RSRpcServices {
private final AtomicInteger count = new AtomicInteger(0);
private final ConcurrentHashMap<HRegion, AtomicInteger> regionToCounter =
new ConcurrentHashMap<HRegion, AtomicInteger>();
public ErrorReplayRSRpcServices(HRegionServer rs) throws IOException {
super(rs);
@ -89,8 +93,12 @@ public class TestRegionReplicaReplicationError {
} catch (NotServingRegionException e) {
throw new ServiceException(e);
}
AtomicInteger counter =
ConcurrentMapUtils.computeIfAbsent(regionToCounter, region, () -> new AtomicInteger(0));
// fail the first several requests
if (region.getRegionInfo().getReplicaId() == 1 && count.addAndGet(entries.size()) < 100) {
if (region.getRegionInfo().getReplicaId() == 1 && counter.addAndGet(entries.size()) < 100) {
throw new ServiceException("Inject error!");
}
return super.replicateToReplica(controller, request);
@ -112,7 +120,7 @@ public class TestRegionReplicaReplicationError {
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
private static TableName TN = TableName.valueOf("test");
private static String TN = "test";
private static byte[] CF = Bytes.toBytes("cf");
@ -124,9 +132,6 @@ public class TestRegionReplicaReplicationError {
true);
HTU.startMiniCluster(
StartTestingClusterOption.builder().rsClass(RSForTest.class).numRegionServers(3).build());
TableDescriptor td = TableDescriptorBuilder.newBuilder(TN).setRegionReplication(3)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build();
HTU.getAdmin().createTable(td);
}
@AfterClass
@ -145,8 +150,26 @@ public class TestRegionReplicaReplicationError {
}
@Test
public void test() throws IOException {
try (Table table = HTU.getConnection().getTable(TN)) {
public void testDefaultDurability() throws IOException {
doTest(false);
}
@Test
public void testSkipWAL() throws IOException {
doTest(true);
}
private void doTest(boolean skipWAL) throws IOException {
TableName tableName = TableName.valueOf(TN + (skipWAL ? "_skipWAL" : ""));
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
.setRegionReplication(3).setColumnFamily(ColumnFamilyDescriptorBuilder.of(CF));
if (skipWAL) {
builder.setDurability(Durability.SKIP_WAL);
}
TableDescriptor td = builder.build();
HTU.getAdmin().createTable(td);
try (Table table = HTU.getConnection().getTable(tableName)) {
for (int i = 0; i < 500; i++) {
table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
}

TestRegionReplicationForSkipWAL.java

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.regionreplication;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, LargeTests.class })
public class TestRegionReplicationForSkipWAL {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicationForSkipWAL.class);
private static final byte[] FAM1 = Bytes.toBytes("family_test1");
private static final byte[] QUAL1 = Bytes.toBytes("qualifier_test1");
private static final byte[] FAM2 = Bytes.toBytes("family_test2");
private static final byte[] QUAL2 = Bytes.toBytes("qualifier_test2");
private static final byte[] FAM3 = Bytes.toBytes("family_test3");
private static final byte[] QUAL3 = Bytes.toBytes("qualifier_test3");
private static final byte[] FAM4 = Bytes.toBytes("family_test4");
private static final byte[] QUAL4 = Bytes.toBytes("qualifier_test4");
private static final byte[] FAM5 = Bytes.toBytes("family_test5");
private static final byte[] QUAL5 = Bytes.toBytes("qualifier_test5");
private static final byte[] FAM6 = Bytes.toBytes("family_test6");
private static final byte[] QUAL6 = Bytes.toBytes("qualifier_test6");
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
private static final int NB_SERVERS = 2;
private static final String strTableName = "TestRegionReplicationForSkipWAL";
@BeforeClass
public static void setUp() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setBoolean(RegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
HTU.startMiniCluster(StartTestingClusterOption.builder().numRegionServers(NB_SERVERS).build());
}
@AfterClass
public static void tearDown() throws Exception {
HTU.shutdownMiniCluster();
}
/**
* This test is for HBASE-26993: make the new region replication framework introduced by
* HBASE-26233 work for a table whose DURABILITY is Durability.SKIP_WAL.
*/
@Test
public void testReplicateToReplicaWhenSkipWAL() throws Exception {
final HRegion[] skipWALRegions = this.createTable(true);
byte[] rowKey1 = Bytes.toBytes(1);
byte[] value1 = Bytes.toBytes(2);
byte[] rowKey2 = Bytes.toBytes(2);
byte[] value2 = Bytes.toBytes(4);
// Test the case where the table itself is SKIP_WAL
skipWALRegions[0].batchMutate(new Mutation[] { new Put(rowKey1).addColumn(FAM1, QUAL1, value1),
new Put(rowKey2).addColumn(FAM2, QUAL2, value2) });
try (Table skipWALTable = HTU.getConnection().getTable(getTableName(true))) {
HTU.waitFor(30000, () -> checkReplica(skipWALTable, FAM1, QUAL1, rowKey1, value1)
&& checkReplica(skipWALTable, FAM2, QUAL2, rowKey2, value2));
}
byte[] rowKey3 = Bytes.toBytes(3);
byte[] value3 = Bytes.toBytes(6);
byte[] rowKey4 = Bytes.toBytes(4);
byte[] value4 = Bytes.toBytes(8);
byte[] rowKey5 = Bytes.toBytes(5);
byte[] value5 = Bytes.toBytes(10);
byte[] rowKey6 = Bytes.toBytes(6);
byte[] value6 = Bytes.toBytes(12);
// Test the case where the table uses the default durability, but some Puts are SKIP_WAL
final HRegion[] normalRegions = this.createTable(false);
normalRegions[0].batchMutate(new Mutation[] { new Put(rowKey3).addColumn(FAM3, QUAL3, value3),
new Put(rowKey4).addColumn(FAM4, QUAL4, value4).setDurability(Durability.SKIP_WAL),
new Put(rowKey5).addColumn(FAM5, QUAL5, value5).setDurability(Durability.SKIP_WAL),
new Put(rowKey6).addColumn(FAM6, QUAL6, value6) });
try (Table normalTable = HTU.getConnection().getTable(getTableName(false))) {
HTU.waitFor(30000,
() -> checkReplica(normalTable, FAM3, QUAL3, rowKey3, value3)
&& checkReplica(normalTable, FAM4, QUAL4, rowKey4, value4)
&& checkReplica(normalTable, FAM5, QUAL5, rowKey5, value5)
&& checkReplica(normalTable, FAM6, QUAL6, rowKey6, value6));
}
}
private static boolean checkReplica(Table table, byte[] fam, byte[] qual, byte[] rowKey,
byte[] expectValue) throws IOException {
Get get = new Get(rowKey).setConsistency(Consistency.TIMELINE).setReplicaId(1);
Result result = table.get(get);
byte[] value = result.getValue(fam, qual);
return value != null && value.length > 0 && Arrays.equals(expectValue, value);
}
private TableName getTableName(boolean skipWAL) {
return TableName.valueOf(strTableName + (skipWAL ? "_skipWAL" : ""));
}
private HRegion[] createTable(boolean skipWAL) throws Exception {
TableName tableName = getTableName(skipWAL);
TableDescriptorBuilder builder =
TableDescriptorBuilder.newBuilder(tableName).setRegionReplication(NB_SERVERS)
.setColumnFamilies(Arrays.asList(ColumnFamilyDescriptorBuilder.of(FAM1),
ColumnFamilyDescriptorBuilder.of(FAM2), ColumnFamilyDescriptorBuilder.of(FAM3),
ColumnFamilyDescriptorBuilder.of(FAM4), ColumnFamilyDescriptorBuilder.of(FAM5),
ColumnFamilyDescriptorBuilder.of(FAM6)));
if (skipWAL) {
builder.setDurability(Durability.SKIP_WAL);
}
TableDescriptor tableDescriptor = builder.build();
HTU.getAdmin().createTable(tableDescriptor);
final HRegion[] regions = new HRegion[NB_SERVERS];
for (int i = 0; i < NB_SERVERS; i++) {
HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
List<HRegion> onlineRegions = rs.getRegions(tableName);
for (HRegion region : onlineRegions) {
int replicaId = region.getRegionInfo().getReplicaId();
assertTrue(regions[replicaId] == null);
regions[region.getRegionInfo().getReplicaId()] = region;
}
}
for (Region region : regions) {
assertNotNull(region);
}
return regions;
}
}

TestRegionReplicaReplication.java

@ -31,8 +31,10 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
@ -96,16 +98,22 @@ public class TestRegionReplicaReplication {
HTU.shutdownMiniCluster();
}
private void testRegionReplicaReplication(int regionReplication) throws Exception {
private void testRegionReplicaReplication(int regionReplication, boolean skipWAL)
throws Exception {
// test region replica replication. Create a table with single region, write some data
// ensure that data is replicated to the secondary region
TableName tableName =
TableName.valueOf("testRegionReplicaReplicationWithReplicas_" + regionReplication);
TableDescriptor htd = HTU
.createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
.setRegionReplication(regionReplication).build();
TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+ regionReplication + (skipWAL ? "_skipWAL" : ""));
TableDescriptorBuilder builder =
HTU
.createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
.setRegionReplication(regionReplication);
if (skipWAL) {
builder.setDurability(Durability.SKIP_WAL);
}
TableDescriptor htd = builder.build();
createOrEnableTableWithRetries(htd, true);
TableName tableNameNoReplicas =
TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
@ -171,17 +179,20 @@ public class TestRegionReplicaReplication {
@Test
public void testRegionReplicaReplicationWith2Replicas() throws Exception {
testRegionReplicaReplication(2);
testRegionReplicaReplication(2, false);
testRegionReplicaReplication(2, true);
}
@Test
public void testRegionReplicaReplicationWith3Replicas() throws Exception {
testRegionReplicaReplication(3);
testRegionReplicaReplication(3, false);
testRegionReplicaReplication(3, true);
}
@Test
public void testRegionReplicaReplicationWith10Replicas() throws Exception {
testRegionReplicaReplication(10);
testRegionReplicaReplication(10, false);
testRegionReplicaReplication(10, true);
}
@Test