HBASE-26416 Implement a new method for region replication instead of using replay (#3864)

Signed-off-by: Xiaolin Ha <haxiaolin@apache.org>
Duo Zhang 2021-11-22 21:50:02 +08:00
parent 67306e74b1
commit 529c36e7ad
15 changed files with 795 additions and 198 deletions

View File

@ -1192,7 +1192,13 @@ public final class HConstants {
public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
/**
* @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and
* region replication has its own 'replay' method.
*/
@Deprecated
public static final int REPLAY_QOS = 6;
public static final int REGION_REPLICATION_QOS = REPLAY_QOS;
public static final int QOS_THRESHOLD = 10;
public static final int ADMIN_QOS = 100;
public static final int HIGH_QOS = 200;
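For context on how these priority constants are consumed: RPC handler methods are tagged with the QosPriority annotation, which the server's annotation-based priority function reads when scheduling the call (the deprecated replay handler later in this diff is tagged with REPLAY_QOS in exactly this way). Below is a minimal sketch of that pattern using the new alias; whether this commit applies the annotation to the new replicateToReplica handler is not visible in the hunks shown here.

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.QosPriority;

public class ExampleRpcHandlers {
  // Scheduled with a priority above NORMAL_QOS but below QOS_THRESHOLD, the same way the
  // old replay handler used REPLAY_QOS; REGION_REPLICATION_QOS is simply its new alias.
  @QosPriority(priority = HConstants.REGION_REPLICATION_QOS)
  public void replicateToReplicaExample() {
    // handler body elided; illustration only
  }
}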

View File

@ -360,6 +360,11 @@ service AdminService {
returns(ReplicateWALEntryResponse);
rpc Replay(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse) {
option deprecated = true;
};
rpc ReplicateToReplica(ReplicateWALEntryRequest)
returns(ReplicateWALEntryResponse);
rpc RollWALWriter(RollWALWriterRequest)

View File

@ -67,7 +67,7 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller
.buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
resetCallTimeout();
controller.setCellScanner(pair.getSecond());
stub.replay(controller, pair.getFirst(), r -> {
stub.replicateToReplica(controller, pair.getFirst(), r -> {
if (controller.failed()) {
onError(controller.getFailed(),
() -> "Call to " + loc.getServerName() + " for " + replica + " failed",

View File

@ -3492,4 +3492,10 @@ public class MasterRpcServices extends HBaseRpcServicesBase<HMaster>
.forEach(builder::addServer);
return builder.build();
}
@Override
public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
ReplicateWALEntryRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
}
}

View File

@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -200,6 +201,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
@ -358,7 +360,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private Path regionDir;
private FileSystem walFS;
// set to true if the region is restored from snapshot
// set to true if the region is restored from snapshot for reading by ClientSideRegionScanner
private boolean isRestoredRegion = false;
public void setRestoredRegion(boolean restoredRegion) {
@ -414,9 +416,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// The following map is populated when opening the region
Map<byte[], Long> maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR);
// lock used to protect the replay operation for secondary replicas, so the below two fields do
// not need to be volatile.
private Lock replayLock;
/** Saved state from replaying prepare flush cache */
private PrepareFlushResult prepareFlushResult = null;
private long lastReplayedSequenceId = HConstants.NO_SEQNUM;
private volatile ConfigurationManager configurationManager;
// Used for testing.
@ -1075,7 +1083,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
LOG.debug("Failed to clean up wrong region WAL directory {}", wrongRegionWALDir);
}
}
} else {
lastReplayedSequenceId = nextSeqId - 1;
replayLock = new ReentrantLock();
}
initializeRegionReplicationSink(reporter, status);
}
LOG.info("Opened {}; next sequenceid={}; {}, {}", this.getRegionInfo().getShortNameToLog(),
@ -1090,7 +1102,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Running coprocessor post-open hooks");
coprocessorHost.postOpen();
}
initializeRegionReplicationSink(reporter, status);
status.markComplete("Region opened successfully");
return nextSeqId;
}
@ -1244,6 +1255,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor(
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
// we do not care about the region close event at the secondary replica side, so just pass a
// null RegionReplicationSink
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
mvcc, null);
@ -1686,7 +1699,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
}
}
if (regionReplicationSink.isPresent()) {
// stop replicating to secondary replicas
// the open event marker can make secondary replicas refresh store files and catch up with
// everything, so here we just give up replicating the later edits, to speed up the reopen process
RegionReplicationSink sink = regionReplicationSink.get();
sink.stop();
try {
regionReplicationSink.get().waitUntilStopped();
} catch (InterruptedException e) {
throw throwOnInterrupt(e);
}
}
// Set the closing flag
// From this point new arrivals at the region lock will get NSRE.
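The sink.stop() / waitUntilStopped() pair used just above is a latch-style shutdown handshake: the closing thread asks the sink to stop and then blocks until the sink acknowledges that it has finished. RegionReplicationSink's internals are not part of this diff, so the following is only a generic sketch of that pattern, under that assumption.

public class StoppableSender {
  private boolean stopRequested;
  private boolean stopped;

  public synchronized void stop() {
    // ask the sender to quit; it may still finish the batch it is currently sending
    stopRequested = true;
  }

  synchronized boolean isStopRequested() {
    return stopRequested;
  }

  // called by the sending side once it has observed the stop request and drained its work
  synchronized void markStopped() {
    stopped = true;
    notifyAll();
  }

  public synchronized void waitUntilStopped() throws InterruptedException {
    while (!stopped) {
      wait(); // the closer blocks here until the sender acknowledges the stop
    }
  }
}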
@ -1890,16 +1914,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
writeRegionCloseMarker(wal);
}
if (regionReplicationSink.isPresent()) {
// stop replicating to secondary replicas
RegionReplicationSink sink = regionReplicationSink.get();
sink.stop();
try {
regionReplicationSink.get().waitUntilStopped();
} catch (InterruptedException e) {
throw throwOnInterrupt(e);
}
}
this.closed.set(true);
if (!canFlush) {
decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@ -2860,7 +2874,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), flushOpSeqId, committedFiles);
// No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
mvcc, null);
mvcc, regionReplicationSink.orElse(null));
}
// Prepare flush (take a snapshot)
@ -2958,7 +2972,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Writes a marker to WAL indicating a flush is requested but cannot be completed due to various
* reasons. Ignores exceptions from WAL. Returns whether the write succeeded.
* @param wal
* @return whether WAL write was successful
*/
private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) {
@ -2967,11 +2980,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
try {
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
null);
regionReplicationSink.orElse(null));
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received exception while trying to write the flush request to wal", e);
LOG.warn(getRegionInfo().getEncodedName() + " : " +
"Received exception while trying to write the flush request to wal", e);
}
}
return false;
@ -4416,7 +4429,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
* of the logic is the same.
* @deprecated Since 3.0.0, will be removed in 4.0.0. Now we will not use this operation to apply
* edits at the secondary replica side.
*/
@Deprecated
private static final class ReplayBatchOperation extends BatchOperation<MutationReplay> {
private long origLogSeqNum = 0;
@ -4554,8 +4570,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
() -> createRegionSpan("Region.batchMutate"));
}
public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId)
throws IOException {
/**
* @deprecated Since 3.0.0, will be removed in 4.0.0. Now we use
* {@link #replayWALEntry(WALEntry, CellScanner)} for replaying edits at the secondary
* replica side.
*/
@Deprecated
OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException {
if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo())
&& replaySeqId < lastReplayedOpenRegionSeqId) {
// if it is a secondary replica we should ignore these entries silently
@ -5711,9 +5732,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
/**
* @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
* region replica implementation.
*/
@Deprecated
void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException {
checkTargetRegion(flush.getEncodedRegionName().toByteArray(),
"Flush marker from WAL ", flush);
checkTargetRegion(flush.getEncodedRegionName().toByteArray(), "Flush marker from WAL ", flush);
if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
return; // if primary nothing to do
@ -5753,25 +5778,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
/** Replay the flush marker from primary region by creating a corresponding snapshot of
* the store memstores, only if the memstores do not have a higher seqId from an earlier wal
* edit (because the events may be coming out of order).
*/
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
long flushSeqId = flush.getFlushSequenceNumber();
HashSet<HStore> storesToFlush = new HashSet<>();
for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) {
private Collection<HStore> getStoresToFlush(FlushDescriptor flushDesc) {
List<HStore> storesToFlush = new ArrayList<>();
for (StoreFlushDescriptor storeFlush : flushDesc.getStoreFlushesList()) {
byte[] family = storeFlush.getFamilyName().toByteArray();
HStore store = getStore(family);
if (store == null) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "Received a flush start marker from primary, but the family is not found. Ignoring"
+ " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
LOG.warn(getRegionInfo().getEncodedName() + " : " +
"Received a flush start marker from primary, but the family is not found. Ignoring" +
" StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush));
continue;
}
storesToFlush.add(store);
}
return storesToFlush;
}
/**
* Replay the flush marker from primary region by creating a corresponding snapshot of the store
* memstores, only if the memstores do not have a higher seqId from an earlier wal edit (because
* the events may be coming out of order).
* @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
* region replica implementation.
*/
@Deprecated
PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException {
long flushSeqId = flush.getFlushSequenceNumber();
Collection<HStore> storesToFlush = getStoresToFlush(flush);
MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this);
@ -5867,6 +5901,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return null;
}
/**
* @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
* region replica implementation.
*/
@Deprecated
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="Intentional; post memstore flush")
void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException {
@ -5988,11 +6027,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Replays the given flush descriptor by opening the flush files in stores and dropping the
* memstore snapshots if requested.
* @param flush
* @param prepareFlushResult
* @param dropMemstoreSnapshot
* @throws IOException
* @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
* region replica implementation.
*/
@Deprecated
private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult,
boolean dropMemstoreSnapshot)
throws IOException {
@ -6086,7 +6124,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/**
* Drops the memstore contents after replaying a flush descriptor or region open event replay
* if the memstore edits have seqNums smaller than the given seq id
* @throws IOException
*/
private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException {
MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
@ -6158,8 +6195,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return prepareFlushResult;
}
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY",
justification="Intentional; cleared the memstore")
/**
* @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
* region replica implementation.
*/
@Deprecated
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY",
justification = "Intentional; cleared the memstore")
void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException {
checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(),
"RegionEvent marker from WAL ", regionEvent);
@ -6276,6 +6318,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
/**
* @deprecated Since 3.0.0, will be removed in 4.0.0. Kept only for compatibility with the old
* region replica implementation.
*/
@Deprecated
void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException {
checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(),
"BulkLoad marker from WAL ", bulkLoadEvent);
@ -6356,6 +6403,205 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
/**
* Replay the batch mutate for secondary replica.
* <p/>
* We will directly apply the cells to the memstore. This is because:
* <ol>
* <li>All the cells come from {@link WALEdit}, so we only have {@link Put} and
* {@link Delete} here</li>
* <li>The replay is single threaded, so we do not need to acquire row locks, as the region is
* read only and no one else can write to it.</li>
* <li>We do not need to write WAL.</li>
* <li>We will advance MVCC in the caller directly.</li>
* </ol>
*/
private void replayWALBatchMutate(Map<byte[], List<Cell>> family2Cells) throws IOException {
startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
try {
for (Map.Entry<byte[], List<Cell>> entry : family2Cells.entrySet()) {
applyToMemStore(getStore(entry.getKey()), entry.getValue(), false, memStoreSizing);
}
} finally {
closeRegionOperation(Operation.REPLAY_BATCH_MUTATE);
}
}
/**
* Replay the meta edits, i.e., flush marker, compaction marker, bulk load marker, region event
* marker, etc.
* <p/>
* For all events other than start flush, we will just call {@link #refreshStoreFiles()} as the
* logic is straightforward and robust. For start flush, we need to snapshot the memstore, so
* that a later {@link #refreshStoreFiles()} call can drop the snapshot; otherwise we may run out
* of memory.
*/
private void replayWALMetaEdit(Cell cell) throws IOException {
startRegionOperation(Operation.REPLAY_EVENT);
try {
FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell);
if (flushDesc != null) {
switch (flushDesc.getAction()) {
case START_FLUSH:
// for start flush, we need to take a snapshot of the current memstore
synchronized (writestate) {
if (!writestate.flushing) {
this.writestate.flushing = true;
} else {
// usually this should not happen but let's make the code more robust, it is not a
// big deal to just ignore it, the refreshStoreFiles call should have the ability to
// clean up the inconsistent state.
LOG.debug("NOT flushing {} as already flushing", getRegionInfo());
break;
}
}
MonitoredTask status =
TaskMonitor.get().createStatus("Preparing flush " + getRegionInfo());
Collection<HStore> storesToFlush = getStoresToFlush(flushDesc);
try {
PrepareFlushResult prepareResult =
internalPrepareFlushCache(null, flushDesc.getFlushSequenceNumber(), storesToFlush,
status, false, FlushLifeCycleTracker.DUMMY);
if (prepareResult.result == null) {
// save the PrepareFlushResult so that we can use it later from commit flush
this.prepareFlushResult = prepareResult;
status.markComplete("Flush prepare successful");
if (LOG.isDebugEnabled()) {
LOG.debug("{} prepared flush with seqId: {}", getRegionInfo(),
flushDesc.getFlushSequenceNumber());
}
} else {
// special case empty memstore. We will still save the flush result in this case,
// since our memstore is empty, but the primary is still flushing
if (prepareResult.getResult()
.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
this.prepareFlushResult = prepareResult;
if (LOG.isDebugEnabled()) {
LOG.debug("{} prepared empty flush with seqId: {}", getRegionInfo(),
flushDesc.getFlushSequenceNumber());
}
}
status.abort("Flush prepare failed with " + prepareResult.result);
// nothing much to do. prepare flush failed because of some reason.
}
} finally {
status.cleanup();
}
break;
case ABORT_FLUSH:
// do nothing, an abort flush means the source region server will crash itself; after
// the primary region comes back online, it will send us an open region marker, and then
// we can clean up the memstore.
synchronized (writestate) {
writestate.flushing = false;
}
break;
case COMMIT_FLUSH:
case CANNOT_FLUSH:
// just call refreshStoreFiles
refreshStoreFiles();
logRegionFiles();
synchronized (writestate) {
writestate.flushing = false;
}
break;
default:
LOG.warn("{} received a flush event with unknown action: {}", getRegionInfo(),
TextFormat.shortDebugString(flushDesc));
}
} else {
// for all other region events, we will do a refreshStoreFiles
refreshStoreFiles();
logRegionFiles();
}
} finally {
closeRegionOperation(Operation.REPLAY_EVENT);
}
}
/**
* Replay a remote WAL entry sent by the primary replica.
* <p/>
* Should only be called on secondary replicas.
*/
void replayWALEntry(WALEntry entry, CellScanner cells) throws IOException {
long timeout = -1L;
Optional<RpcCall> call = RpcServer.getCurrentCall();
if (call.isPresent()) {
long deadline = call.get().getDeadline();
if (deadline < Long.MAX_VALUE) {
timeout = deadline - EnvironmentEdgeManager.currentTime();
if (timeout <= 0) {
throw new TimeoutIOException("Timeout while replaying edits for " + getRegionInfo());
}
}
}
if (timeout > 0) {
try {
if (!replayLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
throw new TimeoutIOException(
"Timeout while waiting for lock when replaying edits for " + getRegionInfo());
}
} catch (InterruptedException e) {
throw throwOnInterrupt(e);
}
} else {
replayLock.lock();
}
try {
int count = entry.getAssociatedCellCount();
long sequenceId = entry.getKey().getLogSequenceNumber();
if (lastReplayedSequenceId >= sequenceId) {
// we have already replayed this edit, skip
// remember to advance the CellScanner, as we may have multiple WALEntries, and we may still
// need to apply later WALEntries
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
}
return;
}
Map<byte[], List<Cell>> family2Cells = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
if (WALEdit.isMetaEditFamily(cell)) {
// If there is a meta edit, i.e., we have done a flush/compaction/open, then we need to
// apply the previous cells first, and then replay the special meta edit. The meta edit is
// like a barrier, so we need to keep the order. For example, the flush marker will contain
// a flush sequence number, which makes it possible for us to drop memstore content, but if
// we apply some edits which have a greater sequence id first, then we can not drop the
// memstore content when replaying the flush marker, which is not good as we could run out
// of memory.
// And usually, a meta edit will have a special WALEntry of its own, so this is just
// safeguard logic to make sure we do not break things in the worst case.
if (!family2Cells.isEmpty()) {
replayWALBatchMutate(family2Cells);
family2Cells.clear();
}
replayWALMetaEdit(cell);
} else {
family2Cells
.computeIfAbsent(CellUtil.cloneFamily(cell), k -> new ArrayList<>())
.add(cell);
}
}
// do not forget to apply the remaining cells
if (!family2Cells.isEmpty()) {
replayWALBatchMutate(family2Cells);
}
mvcc.advanceTo(sequenceId);
lastReplayedSequenceId = sequenceId;
} finally {
replayLock.unlock();
}
}
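The duplicate-skipping rule in replayWALEntry is easy to get wrong: even when an entry's sequence id has already been replayed, its cells still have to be consumed from the shared CellScanner, otherwise the cells belonging to the following entries would be misaligned. Below is a standalone sketch of just that rule in plain Java, using hypothetical entry and cell types rather than the HBase API.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class SkipReplayedEntriesSketch {

  private static final class Entry {
    final long sequenceId;
    final int cellCount;

    Entry(long sequenceId, int cellCount) {
      this.sequenceId = sequenceId;
      this.cellCount = cellCount;
    }
  }

  public static void main(String[] args) {
    // two entries share one flat scanner of cells: 2 cells for seq 5, then 3 cells for seq 7
    List<Entry> entries = Arrays.asList(new Entry(5, 2), new Entry(7, 3));
    Iterator<String> cells = Arrays.asList("a", "b", "c", "d", "e").iterator();
    long lastReplayedSequenceId = 5; // seq 5 was already applied in an earlier call

    for (Entry entry : entries) {
      if (entry.sequenceId <= lastReplayedSequenceId) {
        // already replayed: drain this entry's cells without applying them
        for (int i = 0; i < entry.cellCount; i++) {
          cells.next();
        }
        continue;
      }
      for (int i = 0; i < entry.cellCount; i++) {
        System.out.println("apply seq=" + entry.sequenceId + " cell=" + cells.next());
      }
      lastReplayedSequenceId = entry.sequenceId;
    }
    // prints only the three cells of seq 7: c, d and e
  }
}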
/**
* If all stores ended up dropping their snapshots, we can safely drop the prepareFlushResult
*/

View File

@ -1089,15 +1089,15 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
/**
* Execute a list of Put/Delete mutations. The function returns OperationStatus instead of
* constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse.
* @param region
* @param mutations
* @param replaySeqId
* @return an array of OperationStatus which internally contains the OperationStatusCode and the
* exceptionMessage if any
* @throws IOException
* @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying
* edits for secondary replicas any more, see
* {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}.
*/
private OperationStatus [] doReplayBatchOp(final HRegion region,
final List<MutationReplay> mutations, long replaySeqId) throws IOException {
@Deprecated
private OperationStatus[] doReplayBatchOp(final HRegion region,
final List<MutationReplay> mutations, long replaySeqId) throws IOException {
long before = EnvironmentEdgeManager.currentTime();
boolean batchContainsPuts = false, batchContainsDelete = false;
try {
@ -2083,21 +2083,30 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
return response;
}
private CellScanner getAndReset(RpcController controller) {
HBaseRpcController hrc = (HBaseRpcController) controller;
CellScanner cells = hrc.cellScanner();
hrc.setCellScanner(null);
return cells;
}
/**
* Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is
* that the given mutations will be durable on the receiving RS if this method returns without any
* exception.
* @param controller the RPC controller
* @param request the request
* @throws ServiceException
* @deprecated Since 3.0.0, will be removed in 4.0.0. Not used anymore, kept here only for
* compatibility with the old region replica implementation. Now we will use the
* {@code replicateToReplica} method instead.
*/
@Deprecated
@Override
@QosPriority(priority = HConstants.REPLAY_QOS)
public ReplicateWALEntryResponse replay(final RpcController controller,
final ReplicateWALEntryRequest request) throws ServiceException {
final ReplicateWALEntryRequest request) throws ServiceException {
long before = EnvironmentEdgeManager.currentTime();
CellScanner cells = ((HBaseRpcController) controller).cellScanner();
((HBaseRpcController) controller).setCellScanner(null);
CellScanner cells = getAndReset(controller);
try {
checkOpen();
List<WALEntry> entries = request.getEntryList();
@ -2184,6 +2193,41 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
}
}
/**
* Replay the given changes on a secondary replica
*/
@Override
public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
ReplicateWALEntryRequest request) throws ServiceException {
CellScanner cells = getAndReset(controller);
try {
checkOpen();
List<WALEntry> entries = request.getEntryList();
if (entries == null || entries.isEmpty()) {
// empty input
return ReplicateWALEntryResponse.newBuilder().build();
}
ByteString regionName = entries.get(0).getKey().getEncodedRegionName();
HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8());
if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) {
throw new DoNotRetryIOException(
"Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?");
}
for (WALEntry entry : entries) {
if (!regionName.equals(entry.getKey().getEncodedRegionName())) {
throw new NotServingRegionException(
"ReplicateToReplica request contains entries from multiple " +
"regions. First region:" + regionName.toStringUtf8() + " , other region:" +
entry.getKey().getEncodedRegionName());
}
region.replayWALEntry(entry, cells);
}
return ReplicateWALEntryResponse.newBuilder().build();
} catch (IOException ie) {
throw new ServiceException(ie);
}
}
private void checkShouldRejectReplicationRequest(List<WALEntry> entries) throws IOException {
ReplicationSourceService replicationSource = server.getReplicationSourceService();
if (replicationSource == null || entries.isEmpty()) {
@ -2217,8 +2261,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
requestCount.increment();
List<WALEntry> entries = request.getEntryList();
checkShouldRejectReplicationRequest(entries);
CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner();
((HBaseRpcController) controller).setCellScanner(null);
CellScanner cellScanner = getAndReset(controller);
server.getRegionServerCoprocessorHost().preReplicateLogEntries();
server.getReplicationSinkService().replicateLogEntries(entries, cellScanner,
request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),

View File

@ -120,6 +120,7 @@ class RegionReplicationFlushRequester {
}
// schedule a timer task
HashedWheelTimer timer = getTimer();
pendingFlushRequestSequenceId = sequenceId;
pendingFlushRequest =
timer.newTimeout(this::flush, minIntervalSecs - elapsedSecs, TimeUnit.SECONDS);
}

View File

@ -22,7 +22,6 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -32,6 +31,7 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.agrona.collections.IntHashSet;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction;
/**
* The class for replicating WAL edits to secondary replicas, one instance per region.
@ -128,7 +129,7 @@ public class RegionReplicationSink {
// the replicas we should skip when sending out edits; when we get a flush all request, we will
// clear this set, as a flush of all stores lets the failed replicas catch up through the
// flushed store files and recover from the previous failures
private final Map<Integer, Long> failedReplicas = new HashMap<>();
private final IntHashSet failedReplicas;
private final Queue<SinkEntry> entries = new ArrayDeque<>();
@ -165,6 +166,7 @@ public class RegionReplicationSink {
TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS
.toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
this.failedReplicas = new IntHashSet(regionReplication - 1);
}
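Because a start-flush-all now recovers every failed replica at once (see the failedReplicas.clear() call further down in this class), the sink no longer needs to remember a per-replica failed sequence id, so the old Map<Integer, Long> becomes a primitive int set. For readers who have not used agrona before, here is a tiny sketch of the IntHashSet operations the sink relies on, assuming org.agrona is on the classpath as the new import above indicates.

import java.util.Arrays;
import org.agrona.collections.IntHashSet;

public class FailedReplicasSketch {
  public static void main(String[] args) {
    IntHashSet failedReplicas = new IntHashSet(2);  // capacity hint, e.g. regionReplication - 1
    failedReplicas.add(1);                          // mark replica 1 as failed
    failedReplicas.addAll(Arrays.asList(2));        // boxed adds also work via the Set<Integer> view
    System.out.println(failedReplicas.contains(1)); // true, so replica 1 is skipped when sending
    failedReplicas.clear();                         // a flush-all request recovers all replicas
  }
}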
private void onComplete(List<SinkEntry> sent,
@ -184,16 +186,16 @@ public class RegionReplicationSink {
if (error != null) {
if (maxSequenceId > lastFlushedSequenceId) {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence"
+ " id of sunk entris is {}, which is greater than the last flush SN {},"
+ " we will stop replicating for a while and trigger a flush",
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is greater than the last flush SN {}," +
" we will stop replicating for a while and trigger a flush",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
failed.add(replicaId);
} else {
LOG.warn(
"Failed to replicate to secondary replica {} for {}, since the max sequence"
+ " id of sunk entris is {}, which is less than or equal to the last flush SN {},"
+ " we will not stop replicating",
"Failed to replicate to secondary replica {} for {}, since the max sequence" +
" id of sunk entris is {}, which is less than or equal to the last flush SN {}," +
" we will not stop replicating",
replicaId, primary, maxSequenceId, lastFlushedSequenceId, error);
}
}
@ -201,9 +203,7 @@ public class RegionReplicationSink {
synchronized (entries) {
pendingSize -= toReleaseSize;
if (!failed.isEmpty()) {
for (Integer replicaId : failed) {
failedReplicas.put(replicaId, maxSequenceId);
}
failedReplicas.addAll(failed);
flushRequester.requestFlush(maxSequenceId);
}
sending = false;
@ -237,7 +237,7 @@ public class RegionReplicationSink {
AtomicInteger remaining = new AtomicInteger(toSendReplicaCount);
Map<Integer, MutableObject<Throwable>> replica2Error = new HashMap<>();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
if (failedReplicas.containsKey(replicaId)) {
if (failedReplicas.contains(replicaId)) {
continue;
}
MutableObject<Throwable> error = new MutableObject<>();
@ -253,7 +253,15 @@ public class RegionReplicationSink {
}
}
private boolean isFlushAllStores(FlushDescriptor flushDesc) {
private boolean isStartFlushAllStores(FlushDescriptor flushDesc) {
if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) {
// this means the memstore is empty, which means all data before this sequence id are flushed
// out, so it is equivalent to a flush all; return true
return true;
}
if (flushDesc.getAction() != FlushAction.START_FLUSH) {
return false;
}
Set<byte[]> storesFlushed =
flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray())
.collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR)));
@ -263,7 +271,7 @@ public class RegionReplicationSink {
return storesFlushed.containsAll(tableDesc.getColumnFamilyNames());
}
private Optional<FlushDescriptor> getFlushAllDescriptor(Cell metaCell) {
private Optional<FlushDescriptor> getStartFlushAllDescriptor(Cell metaCell) {
if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) {
return Optional.empty();
}
@ -274,14 +282,14 @@ public class RegionReplicationSink {
LOG.warn("Failed to parse FlushDescriptor from {}", metaCell);
return Optional.empty();
}
if (flushDesc != null && isFlushAllStores(flushDesc)) {
if (flushDesc != null && isStartFlushAllStores(flushDesc)) {
return Optional.of(flushDesc);
} else {
return Optional.empty();
}
}
private void clearAllEntries() {
private long clearAllEntries() {
long toClearSize = 0;
for (SinkEntry entry : entries) {
toClearSize += entry.size;
@ -290,20 +298,7 @@ public class RegionReplicationSink {
entries.clear();
pendingSize -= toClearSize;
manager.decrease(toClearSize);
}
private void clearFailedReplica(long flushSequenceNumber) {
for (Iterator<Map.Entry<Integer, Long>> iter = failedReplicas.entrySet().iterator(); iter
.hasNext();) {
Map.Entry<Integer, Long> entry = iter.next();
if (entry.getValue().longValue() < flushSequenceNumber) {
LOG.debug(
"Got a flush all request with sequence id {}, clear failed replica {}" +
" with last failed sequence id {}",
flushSequenceNumber, entry.getKey(), entry.getValue());
iter.remove();
}
}
return toClearSize;
}
/**
@ -325,32 +320,20 @@ public class RegionReplicationSink {
// check whether we flushed all stores, which means we could drop all the previous edits,
// and also, recover from the previous failure of some replicas
for (Cell metaCell : edit.getCells()) {
getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> {
long flushSequenceNumber = flushDesc.getFlushSequenceNumber();
int toClearCount = 0;
long toClearSize = 0;
for (;;) {
SinkEntry e = entries.peek();
if (e == null) {
break;
}
if (e.key.getSequenceId() < flushSequenceNumber) {
entries.poll();
toClearCount++;
toClearSize += e.size;
} else {
break;
}
}
lastFlushedSequenceId = flushSequenceNumber;
long clearedCount = entries.size();
long clearedSize = clearAllEntries();
if (LOG.isDebugEnabled()) {
LOG.debug(
"Got a flush all request with sequence id {}, clear {} pending"
+ " entries with size {}",
flushSequenceNumber, toClearCount,
StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1));
"Got a flush all request with sequence id {}, clear {} pending" +
" entries with size {}, clear failed replicas {}",
flushSequenceNumber, clearedCount,
StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1),
failedReplicas);
}
clearFailedReplica(flushSequenceNumber);
failedReplicas.clear();
flushRequester.recordFlush(flushSequenceNumber);
});
}
@ -371,7 +354,7 @@ public class RegionReplicationSink {
// failed
clearAllEntries();
for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
failedReplicas.put(replicaId, entry.key.getSequenceId());
failedReplicas.add(replicaId);
}
flushRequester.requestFlush(entry.key.getSequenceId());
}

View File

@ -471,7 +471,9 @@ public final class WALSplitUtil {
* @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances
* extracted from the passed in WALEntry.
* @return list of Pair&lt;MutationType, Mutation&gt; to be replayed
* @deprecated Since 3.0.0, will be removed in 4.0.0.
*/
@Deprecated
public static List<MutationReplay> getMutationsFromWALEntry(AdminProtos.WALEntry entry,
CellScanner cells, Pair<WALKey, WALEdit> logEntry, Durability durability) throws IOException {
if (entry == null) {

View File

@ -755,4 +755,10 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
public RegionReplicationBufferManager getRegionReplicationBufferManager() {
return null;
}
@Override
public ReplicateWALEntryResponse replicateToReplica(RpcController controller,
ReplicateWALEntryRequest request) throws ServiceException {
return null;
}
}

View File

@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
@ -113,6 +112,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript
* Tests of HRegion methods for replaying flush, compaction, region open, etc. events for secondary
* region replicas
*/
@SuppressWarnings("deprecation")
@Category(LargeTests.class)
public class TestHRegionReplayEvents {

View File

@ -0,0 +1,388 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNameTestRule;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestReplicateToReplica {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicateToReplica.class);
private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
private static byte[] FAMILY = Bytes.toBytes("family");
private static byte[] QUAL = Bytes.toBytes("qualifier");
private static ExecutorService EXEC;
@Rule
public final TableNameTestRule name = new TableNameTestRule();
private TableName tableName;
private Path testDir;
private TableDescriptor td;
private RegionServerServices rss;
private AsyncClusterConnection conn;
private RegionReplicationBufferManager manager;
private FlushRequester flushRequester;
private HRegion primary;
private HRegion secondary;
private WALFactory walFactory;
private boolean queueReqAndResps;
private Queue<Pair<List<WAL.Entry>, CompletableFuture<Void>>> reqAndResps;
private static List<Put> TO_ADD_AFTER_PREPARE_FLUSH;
public static final class HRegionForTest extends HRegion {
public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam,
TableDescriptor htd, RegionServerServices rsServices) {
super(fs, wal, confParam, htd, rsServices);
}
@SuppressWarnings("deprecation")
public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
}
@Override
protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush,
status, writeFlushWalMarker, tracker);
for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) {
put(put);
}
TO_ADD_AFTER_PREPARE_FLUSH.clear();
return result;
}
}
@BeforeClass
public static void setUpBeforeClass() {
Configuration conf = UTIL.getConfiguration();
conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class);
EXEC = new ExecutorService("test");
EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1)
.setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER));
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
}
@AfterClass
public static void tearDownAfterClass() {
EXEC.shutdown();
UTIL.cleanupTestDir();
}
@Before
public void setUp() throws IOException {
TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>();
tableName = name.getTableName();
testDir = UTIL.getDataTestDir(tableName.getNameAsString());
Configuration conf = UTIL.getConfiguration();
conf.set(HConstants.HBASE_DIR, testDir.toString());
td = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2)
.setRegionMemStoreReplication(true).build();
reqAndResps = new ArrayDeque<>();
queueReqAndResps = true;
conn = mock(AsyncClusterConnection.class);
when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> {
if (queueReqAndResps) {
@SuppressWarnings("unchecked")
List<WAL.Entry> entries = i.getArgument(1, List.class);
CompletableFuture<Void> future = new CompletableFuture<>();
reqAndResps.add(Pair.newPair(entries, future));
return future;
} else {
return CompletableFuture.completedFuture(null);
}
});
flushRequester = mock(FlushRequester.class);
rss = mock(RegionServerServices.class);
when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1));
when(rss.getConfiguration()).thenReturn(conf);
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf));
when(rss.getExecutorService()).thenReturn(EXEC);
when(rss.getAsyncClusterConnection()).thenReturn(conn);
when(rss.getFlushRequester()).thenReturn(flushRequester);
manager = new RegionReplicationBufferManager(rss);
when(rss.getRegionReplicationBufferManager()).thenReturn(manager);
RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build();
RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1);
walFactory = new WALFactory(conf, UUID.randomUUID().toString());
WAL wal = walFactory.getWAL(primaryHri);
primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal);
primary.close();
primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null);
secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null);
when(rss.getRegions()).then(i -> {
return Arrays.asList(primary, secondary);
});
// process the open events
replicateAll();
}
@After
public void tearDown() throws IOException {
// closing the region will issue a flush, which will enqueue an edit into the replication sink,
// so we need to complete it, otherwise the test will hang.
queueReqAndResps = false;
failAll();
HBaseTestingUtil.closeRegionAndWAL(primary);
HBaseTestingUtil.closeRegionAndWAL(secondary);
if (walFactory != null) {
walFactory.close();
}
}
private FlushResult flushPrimary() throws IOException {
return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
}
private void replicate(Pair<List<WAL.Entry>, CompletableFuture<Void>> pair) throws IOException {
Pair<ReplicateWALEntryRequest, CellScanner> params = ReplicationProtobufUtil
.buildReplicateWALEntryRequest(pair.getFirst().toArray(new WAL.Entry[0]),
secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null);
for (WALEntry entry : params.getFirst().getEntryList()) {
secondary.replayWALEntry(entry, params.getSecond());
}
pair.getSecond().complete(null);
}
private void replicateOne() throws IOException {
replicate(reqAndResps.remove());
}
private void replicateAll() throws IOException {
for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
pair = reqAndResps.poll();
if (pair == null) {
break;
}
replicate(pair);
}
}
private void failOne() {
reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error"));
}
private void failAll() {
for (Pair<List<WAL.Entry>, CompletableFuture<Void>> pair;;) {
pair = reqAndResps.poll();
if (pair == null) {
break;
}
pair.getSecond().completeExceptionally(new IOException("Inject error"));
}
}
@Test
public void testNormalReplicate() throws IOException {
byte[] row = Bytes.toBytes(0);
primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
replicateOne();
assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
}
@Test
public void testNormalFlush() throws IOException {
byte[] row = Bytes.toBytes(0);
primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
flushPrimary();
replicateAll();
assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
// we should have the same memstore size, i.e, the secondary should have also dropped the
// snapshot
assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
}
@Test
public void testErrorBeforeFlushStart() throws IOException {
byte[] row = Bytes.toBytes(0);
primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
failOne();
verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
flushPrimary();
// this also tests start flush with empty memstore at secondary replica side
replicateAll();
assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize());
}
@Test
public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException {
primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
replicateAll();
TO_ADD_AFTER_PREPARE_FLUSH
.add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2)));
flushPrimary();
// replicate the start flush edit
replicateOne();
// fail the remaining edits, the put and the commit flush edit
failOne();
verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3)));
flushPrimary();
replicateAll();
for (int i = 0; i < 3; i++) {
assertEquals(i + 1,
Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL)));
}
// should have nothing in memstore
assertEquals(0, secondary.getMemStoreDataSize());
}
@Test
public void testCatchUpWithCannotFlush() throws IOException, InterruptedException {
byte[] row = Bytes.toBytes(0);
primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
failOne();
verify(flushRequester, times(1)).requestFlush(any(), anyList(), any());
flushPrimary();
failAll();
Thread.sleep(2000);
// we will request flush the second time
verify(flushRequester, times(2)).requestFlush(any(), anyList(), any());
// we can not flush because no content in memstore
FlushResult result = flushPrimary();
assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult());
// the secondary replica does not have this row yet
assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
// replicate the can not flush edit
replicateOne();
// we should have the row now
assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
}
@Test
public void testCatchUpWithReopen() throws IOException {
byte[] row = Bytes.toBytes(0);
primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
failOne();
primary.close();
// the secondary replica does not have this row yet, although the above close has flushed the
// data out
assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
// reopen
primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(),
UTIL.getConfiguration(), rss, null);
replicateAll();
// we should have the row now
assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
}
}

View File

@ -236,7 +236,7 @@ public class TestRegionReplicationSink {
throw new IllegalStateException();
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
FlushDescriptor fd =
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
sink.add(key2, edit2, rpcCall2);
@ -300,7 +300,7 @@ public class TestRegionReplicationSink {
throw new IllegalStateException();
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
FlushDescriptor fd =
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
sink.add(key3, edit3, rpcCall3);
@ -313,91 +313,4 @@ public class TestRegionReplicationSink {
// should have send out all so no pending entries.
assertEquals(0, sink.pendingSize());
}
@Test
public void testNotClearFailedReplica() {
// simulate this scenario:
// 1. prepare flush
// 2. add one edit broken
// 3. commit flush with flush sequence number less than the previous edit(this is the normal
// case)
// we should not clear the failed replica as we do not flush the broken edit out with this
// flush, we need an extra flush to flush it out
MutableInt next = new MutableInt(0);
List<CompletableFuture<Void>> futures =
Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
.then(i -> futures.get(next.getAndIncrement()));
when(manager.increase(anyLong())).thenReturn(true);
ServerCall<?> rpcCall1 = mock(ServerCall.class);
WALKeyImpl key1 = mock(WALKeyImpl.class);
when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
when(key1.getSequenceId()).thenReturn(1L);
Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
.collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
throw new IllegalStateException();
}, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
FlushDescriptor fd =
ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
sink.add(key1, edit1, rpcCall1);
futures.get(0).complete(null);
futures.get(1).complete(null);
ServerCall<?> rpcCall2 = mock(ServerCall.class);
WALKeyImpl key2 = mock(WALKeyImpl.class);
when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
when(key2.getSequenceId()).thenReturn(2L);
WALEdit edit2 = mock(WALEdit.class);
when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
sink.add(key2, edit2, rpcCall2);
// fail the call to replica 1
futures.get(2).completeExceptionally(new IOException("inject error"));
futures.get(3).complete(null);
ServerCall<?> rpcCall3 = mock(ServerCall.class);
WALKeyImpl key3 = mock(WALKeyImpl.class);
when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
when(key3.getSequenceId()).thenReturn(3L);
FlushDescriptor fd3 =
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
sink.add(key3, edit3, rpcCall3);
// we should only call replicate once for edit3, since replica 1 is marked as failed, and the
// flush request can not clean the failed replica since the flush sequence number is not greater
// than sequence id of the last failed edit
verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
futures.get(4).complete(null);
ServerCall<?> rpcCall4 = mock(ServerCall.class);
WALKeyImpl key4 = mock(WALKeyImpl.class);
when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
when(key4.getSequenceId()).thenReturn(4L);
WALEdit edit4 = mock(WALEdit.class);
when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
sink.add(key4, edit4, rpcCall4);
// still, only send to replica 2
verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
futures.get(5).complete(null);
ServerCall<?> rpcCall5 = mock(ServerCall.class);
WALKeyImpl key5 = mock(WALKeyImpl.class);
when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
when(key5.getSequenceId()).thenReturn(3L);
FlushDescriptor fd5 =
ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
sink.add(key5, edit5, rpcCall5);
futures.get(6).complete(null);
futures.get(7).complete(null);
// should have cleared the failed replica because the flush sequence number is greater than than
// the sequence id of the last failed edit
verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
}
}

View File

@ -89,7 +89,6 @@ public class TestMetaRegionReplicaReplication {
conf.setInt("zookeeper.recovery.retry", 1);
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
// Enable hbase:meta replication.

View File

@ -85,7 +85,6 @@ public class TestRegionReplicaReplication {
conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf.setInt("replication.stats.thread.period.seconds", 5);
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);