From 529c36e7ad46a16c4036565fd29f8a83b055045f Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 22 Nov 2021 21:50:02 +0800 Subject: [PATCH] HBASE-26416 Implement a new method for region replication instead of using replay (#3864) Signed-off-by: Xiaolin Ha --- .../org/apache/hadoop/hbase/HConstants.java | 6 + .../main/protobuf/server/region/Admin.proto | 5 + .../AsyncRegionReplicationRetryingCaller.java | 2 +- .../hbase/master/MasterRpcServices.java | 6 + .../hadoop/hbase/regionserver/HRegion.java | 328 +++++++++++++-- .../hbase/regionserver/RSRpcServices.java | 67 ++- .../RegionReplicationFlushRequester.java | 1 + .../RegionReplicationSink.java | 87 ++-- .../apache/hadoop/hbase/wal/WALSplitUtil.java | 2 + .../hadoop/hbase/master/MockRegionServer.java | 6 + .../regionserver/TestHRegionReplayEvents.java | 2 +- .../regionserver/TestReplicateToReplica.java | 388 ++++++++++++++++++ .../TestRegionReplicationSink.java | 91 +--- .../TestMetaRegionReplicaReplication.java | 1 - .../TestRegionReplicaReplication.java | 1 - 15 files changed, 795 insertions(+), 198 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 4aacff1aa18..d5deb092730 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1192,7 +1192,13 @@ public final class HConstants { public static final int PRIORITY_UNSET = -1; public static final int NORMAL_QOS = 0; public static final int REPLICATION_QOS = 5; + /** + * @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and + * region replication has its own 'replay' method. 
+ */ + @Deprecated public static final int REPLAY_QOS = 6; + public static final int REGION_REPLICATION_QOS = REPLAY_QOS; public static final int QOS_THRESHOLD = 10; public static final int ADMIN_QOS = 100; public static final int HIGH_QOS = 200; diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto index 0667292917a..89b9985e496 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto @@ -360,6 +360,11 @@ service AdminService { returns(ReplicateWALEntryResponse); rpc Replay(ReplicateWALEntryRequest) + returns(ReplicateWALEntryResponse) { + option deprecated = true; + }; + + rpc ReplicateToReplica(ReplicateWALEntryRequest) returns(ReplicateWALEntryResponse); rpc RollWALWriter(RollWALWriterRequest) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java index a0ce4183a95..c854ba38338 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java @@ -67,7 +67,7 @@ public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null); resetCallTimeout(); controller.setCellScanner(pair.getSecond()); - stub.replay(controller, pair.getFirst(), r -> { + stub.replicateToReplica(controller, pair.getFirst(), r -> { if (controller.failed()) { onError(controller.getFailed(), () -> "Call to " + loc.getServerName() + " for " + replica + " failed", diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 9be3685972f..78fb3909b9c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -3492,4 +3492,10 @@ public class MasterRpcServices extends HBaseRpcServicesBase .forEach(builder::addServer); return builder.build(); } + + @Override + public ReplicateWALEntryResponse replicateToReplica(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + throw new ServiceException(new DoNotRetryIOException("Unsupported method on master")); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 2b337ecc395..2d7f720eebc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -67,6 +67,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; @@ -200,6 +201,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import 
org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad; @@ -358,7 +360,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private Path regionDir; private FileSystem walFS; - // set to true if the region is restored from snapshot + // set to true if the region is restored from snapshot for reading by ClientSideRegionScanner private boolean isRestoredRegion = false; public void setRestoredRegion(boolean restoredRegion) { @@ -414,9 +416,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // The following map is populated when opening the region Map maxSeqIdInStores = new TreeMap<>(Bytes.BYTES_COMPARATOR); + // lock used to protect the replay operation for secondary replicas, so the below two fields does + // not need to be volatile. + private Lock replayLock; + /** Saved state from replaying prepare flush cache */ private PrepareFlushResult prepareFlushResult = null; + private long lastReplayedSequenceId = HConstants.NO_SEQNUM; + private volatile ConfigurationManager configurationManager; // Used for testing. @@ -1075,7 +1083,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi LOG.debug("Failed to clean up wrong region WAL directory {}", wrongRegionWALDir); } } + } else { + lastReplayedSequenceId = nextSeqId - 1; + replayLock = new ReentrantLock(); } + initializeRegionReplicationSink(reporter, status); } LOG.info("Opened {}; next sequenceid={}; {}, {}", this.getRegionInfo().getShortNameToLog(), @@ -1090,7 +1102,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Running coprocessor post-open hooks"); coprocessorHost.postOpen(); } - initializeRegionReplicationSink(reporter, status); status.markComplete("Region opened successfully"); return nextSeqId; } @@ -1244,6 +1255,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionEventDescriptor regionEventDesc = ProtobufUtil.toRegionEventDescriptor( RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(), getRegionServerServices().getServerName(), storeFiles); + // we do not care region close event at secondary replica side so just pass a null + // RegionReplicationSink WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc, mvcc, null); @@ -1686,7 +1699,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage()); } } - + if (regionReplicationSink.isPresent()) { + // stop replicating to secondary replicas + // the open event marker can make secondary replicas refresh store files and catch up + // everything, so here we just give up replicating later edits, to speed up the reopen process + RegionReplicationSink sink = regionReplicationSink.get(); + sink.stop(); + try { + regionReplicationSink.get().waitUntilStopped(); + } catch (InterruptedException e) { + throw throwOnInterrupt(e); + } + } // Set the closing flag // From this point new arrivals at the region lock will get NSRE. 
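The replay lock introduced above is why lastReplayedSequenceId and prepareFlushResult need no
volatile qualifier: every replay-path access happens under the lock, whose acquire and release
provide the required memory barriers. A minimal sketch of the guarded access pattern, using the
field names from this patch (illustrative only; the real logic lives in replayWALEntry below,
and sequenceId here stands for the sequence id of the incoming WAL entry):

    replayLock.lock();
    try {
      if (lastReplayedSequenceId >= sequenceId) {
        return; // this edit was already applied on the replica, skip it
      }
      // ... apply the cells of this edit to the memstore ...
      mvcc.advanceTo(sequenceId);
      lastReplayedSequenceId = sequenceId;
    } finally {
      replayLock.unlock();
    }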
@@ -1890,16 +1914,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi RegionReplicaUtil.isDefaultReplica(getRegionInfo())) { writeRegionCloseMarker(wal); } - if (regionReplicationSink.isPresent()) { - // stop replicating to secondary replicas - RegionReplicationSink sink = regionReplicationSink.get(); - sink.stop(); - try { - regionReplicationSink.get().waitUntilStopped(); - } catch (InterruptedException e) { - throw throwOnInterrupt(e); - } - } this.closed.set(true); if (!canFlush) { decrMemStoreSize(this.memStoreSizing.getMemStoreSize()); @@ -2860,7 +2874,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionInfo(), flushOpSeqId, committedFiles); // No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, - mvcc, null); + mvcc, regionReplicationSink.orElse(null)); } // Prepare flush (take a snapshot) @@ -2958,7 +2972,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Writes a marker to WAL indicating a flush is requested but cannot be complete due to various * reasons. Ignores exceptions from WAL. Returns whether the write succeeded. - * @param wal * @return whether WAL write was successful */ private boolean writeFlushRequestMarkerToWAL(WAL wal, boolean writeFlushWalMarker) { @@ -2967,11 +2980,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR)); try { WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc, - null); + regionReplicationSink.orElse(null)); return true; } catch (IOException e) { - LOG.warn(getRegionInfo().getEncodedName() + " : " - + "Received exception while trying to write the flush request to wal", e); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received exception while trying to write the flush request to wal", e); } } return false; @@ -4416,7 +4429,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most * of the logic is same. + * @deprecated Since 3.0.0, will be removed in 4.0.0. Now we will not use this operation to apply + * edits at secondary replica side. */ + @Deprecated private static final class ReplayBatchOperation extends BatchOperation { private long origLogSeqNum = 0; @@ -4554,8 +4570,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi () -> createRegionSpan("Region.batchMutate")); } - public OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) - throws IOException { + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. Now we use + * {@link #replayWALEntry(WALEntry, CellScanner)} for replaying edits at secondary + * replica side. + */ + @Deprecated + OperationStatus[] batchReplay(MutationReplay[] mutations, long replaySeqId) throws IOException { if (!RegionReplicaUtil.isDefaultReplica(getRegionInfo()) && replaySeqId < lastReplayedOpenRegionSeqId) { // if it is a secondary replica we should ignore these entries silently @@ -5711,9 +5732,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. Only for keep compatibility for old region + * replica implementation. 
+ */ + @Deprecated void replayWALFlushMarker(FlushDescriptor flush, long replaySeqId) throws IOException { - checkTargetRegion(flush.getEncodedRegionName().toByteArray(), - "Flush marker from WAL ", flush); + checkTargetRegion(flush.getEncodedRegionName().toByteArray(), "Flush marker from WAL ", flush); if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return; // if primary nothing to do @@ -5753,25 +5778,34 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - /** Replay the flush marker from primary region by creating a corresponding snapshot of - * the store memstores, only if the memstores do not have a higher seqId from an earlier wal - * edit (because the events may be coming out of order). - */ - PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException { - long flushSeqId = flush.getFlushSequenceNumber(); - - HashSet storesToFlush = new HashSet<>(); - for (StoreFlushDescriptor storeFlush : flush.getStoreFlushesList()) { + private Collection getStoresToFlush(FlushDescriptor flushDesc) { + List storesToFlush = new ArrayList<>(); + for (StoreFlushDescriptor storeFlush : flushDesc.getStoreFlushesList()) { byte[] family = storeFlush.getFamilyName().toByteArray(); HStore store = getStore(family); if (store == null) { - LOG.warn(getRegionInfo().getEncodedName() + " : " - + "Received a flush start marker from primary, but the family is not found. Ignoring" - + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush)); + LOG.warn(getRegionInfo().getEncodedName() + " : " + + "Received a flush start marker from primary, but the family is not found. Ignoring" + + " StoreFlushDescriptor:" + TextFormat.shortDebugString(storeFlush)); continue; } storesToFlush.add(store); } + return storesToFlush; + } + + /** + * Replay the flush marker from primary region by creating a corresponding snapshot of the store + * memstores, only if the memstores do not have a higher seqId from an earlier wal edit (because + * the events may be coming out of order). + * @deprecated Since 3.0.0, will be removed in 4.0.0. Only for keep compatibility for old region + * replica implementation. + */ + @Deprecated + PrepareFlushResult replayWALFlushStartMarker(FlushDescriptor flush) throws IOException { + long flushSeqId = flush.getFlushSequenceNumber(); + + Collection storesToFlush = getStoresToFlush(flush); MonitoredTask status = TaskMonitor.get().createStatus("Preparing flush " + this); @@ -5867,6 +5901,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return null; } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. Only for keep compatibility for old region + * replica implementation. + */ + @Deprecated @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", justification="Intentional; post memstore flush") void replayWALFlushCommitMarker(FlushDescriptor flush) throws IOException { @@ -5988,11 +6027,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Replays the given flush descriptor by opening the flush files in stores and dropping the * memstore snapshots if requested. - * @param flush - * @param prepareFlushResult - * @param dropMemstoreSnapshot - * @throws IOException + * @deprecated Since 3.0.0, will be removed in 4.0.0. Only for keep compatibility for old region + * replica implementation. 
*/ + @Deprecated private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepareFlushResult, boolean dropMemstoreSnapshot) throws IOException { @@ -6086,7 +6124,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi /** * Drops the memstore contents after replaying a flush descriptor or region open event replay * if the memstore edits have seqNums smaller than the given seq id - * @throws IOException */ private MemStoreSize dropMemStoreContentsForSeqId(long seqId, HStore store) throws IOException { MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing(); @@ -6158,8 +6195,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return prepareFlushResult; } - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", - justification="Intentional; cleared the memstore") + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. Only for keep compatibility for old region + * replica implementation. + */ + @Deprecated + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", + justification = "Intentional; cleared the memstore") void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOException { checkTargetRegion(regionEvent.getEncodedRegionName().toByteArray(), "RegionEvent marker from WAL ", regionEvent); @@ -6276,6 +6318,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * @deprecated Since 3.0.0, will be removed in 4.0.0. Only for keep compatibility for old region + * replica implementation. + */ + @Deprecated void replayWALBulkLoadEventMarker(WALProtos.BulkLoadDescriptor bulkLoadEvent) throws IOException { checkTargetRegion(bulkLoadEvent.getEncodedRegionName().toByteArray(), "BulkLoad marker from WAL ", bulkLoadEvent); @@ -6356,6 +6403,205 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * Replay the batch mutate for secondary replica. + *
+   * <p/>
+   * We will directly apply the cells to the memstore. This is because:
+   * <ol>
+   * <li>All the cells are gotten from {@link WALEdit}, so we only have {@link Put} and
+   * {@link Delete} here</li>
+   * <li>The replay is single threaded, we do not need to acquire row lock, as the region is read
+   * only so no one else can write it.</li>
+   * <li>We do not need to write WAL.</li>
+   * <li>We will advance MVCC in the caller directly.</li>
+   * </ol>
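+   * <p/>
+   * For illustration only (not part of this change), the grouping a caller performs before
+   * invoking this method looks roughly like the sketch below, assuming {@code dataCells} holds
+   * the plain data cells of one WAL entry:
+   * <pre>{@code
+   * Map<byte[], List<Cell>> family2Cells = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+   * for (Cell cell : dataCells) {
+   *   family2Cells.computeIfAbsent(CellUtil.cloneFamily(cell), k -> new ArrayList<>()).add(cell);
+   * }
+   * replayWALBatchMutate(family2Cells);
+   * }</pre>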
+ */ + private void replayWALBatchMutate(Map> family2Cells) throws IOException { + startRegionOperation(Operation.REPLAY_BATCH_MUTATE); + try { + for (Map.Entry> entry : family2Cells.entrySet()) { + applyToMemStore(getStore(entry.getKey()), entry.getValue(), false, memStoreSizing); + } + } finally { + closeRegionOperation(Operation.REPLAY_BATCH_MUTATE); + } + } + + /** + * Replay the meta edits, i.e, flush marker, compaction marker, bulk load marker, region event + * marker, etc. + *
+   * <p/>
+ * For all events other than start flush, we will just call {@link #refreshStoreFiles()} as the + * logic is straight-forward and robust. For start flush, we need to snapshot the memstore, so + * later {@link #refreshStoreFiles()} call could drop the snapshot, otherwise we may run out of + * memory. + */ + private void replayWALMetaEdit(Cell cell) throws IOException { + startRegionOperation(Operation.REPLAY_EVENT); + try { + FlushDescriptor flushDesc = WALEdit.getFlushDescriptor(cell); + if (flushDesc != null) { + switch (flushDesc.getAction()) { + case START_FLUSH: + // for start flush, we need to take a snapshot of the current memstore + synchronized (writestate) { + if (!writestate.flushing) { + this.writestate.flushing = true; + } else { + // usually this should not happen but let's make the code more robust, it is not a + // big deal to just ignore it, the refreshStoreFiles call should have the ability to + // clean up the inconsistent state. + LOG.debug("NOT flushing {} as already flushing", getRegionInfo()); + break; + } + } + MonitoredTask status = + TaskMonitor.get().createStatus("Preparing flush " + getRegionInfo()); + Collection storesToFlush = getStoresToFlush(flushDesc); + try { + PrepareFlushResult prepareResult = + internalPrepareFlushCache(null, flushDesc.getFlushSequenceNumber(), storesToFlush, + status, false, FlushLifeCycleTracker.DUMMY); + if (prepareResult.result == null) { + // save the PrepareFlushResult so that we can use it later from commit flush + this.prepareFlushResult = prepareResult; + status.markComplete("Flush prepare successful"); + if (LOG.isDebugEnabled()) { + LOG.debug("{} prepared flush with seqId: {}", getRegionInfo(), + flushDesc.getFlushSequenceNumber()); + } + } else { + // special case empty memstore. We will still save the flush result in this case, + // since our memstore is empty, but the primary is still flushing + if (prepareResult.getResult() + .getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { + this.prepareFlushResult = prepareResult; + if (LOG.isDebugEnabled()) { + LOG.debug("{} prepared empty flush with seqId: {}", getRegionInfo(), + flushDesc.getFlushSequenceNumber()); + } + } + status.abort("Flush prepare failed with " + prepareResult.result); + // nothing much to do. prepare flush failed because of some reason. + } + } finally { + status.cleanup(); + } + break; + case ABORT_FLUSH: + // do nothing, an abort flush means the source region server will crash itself, after + // the primary region online, it will send us an open region marker, then we can clean + // up the memstore. + synchronized (writestate) { + writestate.flushing = false; + } + break; + case COMMIT_FLUSH: + case CANNOT_FLUSH: + // just call refreshStoreFiles + refreshStoreFiles(); + logRegionFiles(); + synchronized (writestate) { + writestate.flushing = false; + } + break; + default: + LOG.warn("{} received a flush event with unknown action: {}", getRegionInfo(), + TextFormat.shortDebugString(flushDesc)); + } + } else { + // for all other region events, we will do a refreshStoreFiles + refreshStoreFiles(); + logRegionFiles(); + } + } finally { + closeRegionOperation(Operation.REPLAY_EVENT); + } + } + + /** + * Replay remote wal entry sent by primary replica. + *
+   * <p/>
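+   * For illustration only (not part of this change), a caller replays the shipped entries the
+   * same way the new test does, assuming {@code entries} is the array of {@code WAL.Entry}
+   * received from the primary and {@code region} is a secondary replica:
+   * <pre>{@code
+   * Pair<ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtobufUtil
+   *   .buildReplicateWALEntryRequest(entries, region.getRegionInfo().getEncodedNameAsBytes(),
+   *     null, null, null);
+   * for (WALEntry entry : pair.getFirst().getEntryList()) {
+   *   region.replayWALEntry(entry, pair.getSecond());
+   * }
+   * }</pre>
+   * <p/>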
+ * Should only call this method on secondary replicas. + */ + void replayWALEntry(WALEntry entry, CellScanner cells) throws IOException { + long timeout = -1L; + Optional call = RpcServer.getCurrentCall(); + if (call.isPresent()) { + long deadline = call.get().getDeadline(); + if (deadline < Long.MAX_VALUE) { + timeout = deadline - EnvironmentEdgeManager.currentTime(); + if (timeout <= 0) { + throw new TimeoutIOException("Timeout while replaying edits for " + getRegionInfo()); + } + } + } + if (timeout > 0) { + try { + if (!replayLock.tryLock(timeout, TimeUnit.MILLISECONDS)) { + throw new TimeoutIOException( + "Timeout while waiting for lock when replaying edits for " + getRegionInfo()); + } + } catch (InterruptedException e) { + throw throwOnInterrupt(e); + } + } else { + replayLock.lock(); + } + try { + int count = entry.getAssociatedCellCount(); + long sequenceId = entry.getKey().getLogSequenceNumber(); + if (lastReplayedSequenceId >= sequenceId) { + // we have already replayed this edit, skip + // remember to advance the CellScanner, as we may have multiple WALEntries, we may still + // need apply later WALEntries + for (int i = 0; i < count; i++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); + } + } + return; + } + Map> family2Cells = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (int i = 0; i < count; i++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); + } + Cell cell = cells.current(); + if (WALEdit.isMetaEditFamily(cell)) { + // If there is meta edit, i.e, we have done flush/compaction/open, then we need to apply + // the previous cells first, and then replay the special meta edit. The meta edit is like + // a barrier, We need to keep the order. For example, the flush marker will contain a + // flush sequence number, which makes us possible to drop memstore content, but if we + // apply some edits which have greater sequence id first, then we can not drop the + // memstore content when replaying the flush marker, which is not good as we could run out + // of memory. + // And usually, a meta edit will have a special WALEntry for it, so this is just a safe + // guard logic to make sure we do not break things in the worst case. + if (!family2Cells.isEmpty()) { + replayWALBatchMutate(family2Cells); + family2Cells.clear(); + } + replayWALMetaEdit(cell); + } else { + family2Cells + .computeIfAbsent(CellUtil.cloneFamily(cell), k -> new ArrayList<>()) + .add(cell); + } + } + // do not forget to apply the remaining cells + if (!family2Cells.isEmpty()) { + replayWALBatchMutate(family2Cells); + } + mvcc.advanceTo(sequenceId); + lastReplayedSequenceId = sequenceId; + } finally { + replayLock.unlock(); + } + } + /** * If all stores ended up dropping their snapshots, we can safely drop the prepareFlushResult */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 0fad24c738f..5b57ac18a7c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1089,15 +1089,15 @@ public class RSRpcServices extends HBaseRpcServicesBase /** * Execute a list of Put/Delete mutations. 
The function returns OperationStatus instead of * constructing MultiResponse to save a possible loop if caller doesn't need MultiResponse. - * @param region - * @param mutations - * @param replaySeqId * @return an array of OperationStatus which internally contains the OperationStatusCode and the * exceptionMessage if any - * @throws IOException + * @deprecated Since 3.0.0, will be removed in 4.0.0. We do not use this method for replaying + * edits for secondary replicas any more, see + * {@link #replicateToReplica(RpcController, ReplicateWALEntryRequest)}. */ - private OperationStatus [] doReplayBatchOp(final HRegion region, - final List mutations, long replaySeqId) throws IOException { + @Deprecated + private OperationStatus[] doReplayBatchOp(final HRegion region, + final List mutations, long replaySeqId) throws IOException { long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; try { @@ -2083,21 +2083,30 @@ public class RSRpcServices extends HBaseRpcServicesBase return response; } + private CellScanner getAndReset(RpcController controller) { + HBaseRpcController hrc = (HBaseRpcController) controller; + CellScanner cells = hrc.cellScanner(); + hrc.setCellScanner(null); + return cells; + } + /** * Replay the given changes when distributedLogReplay WAL edits from a failed RS. The guarantee is * that the given mutations will be durable on the receiving RS if this method returns without any * exception. * @param controller the RPC controller * @param request the request - * @throws ServiceException + * @deprecated Since 3.0.0, will be removed in 4.0.0. Not used any more, put here only for + * compatibility with old region replica implementation. Now we will use + * {@code replicateToReplica} method instead. */ + @Deprecated @Override @QosPriority(priority = HConstants.REPLAY_QOS) public ReplicateWALEntryResponse replay(final RpcController controller, - final ReplicateWALEntryRequest request) throws ServiceException { + final ReplicateWALEntryRequest request) throws ServiceException { long before = EnvironmentEdgeManager.currentTime(); - CellScanner cells = ((HBaseRpcController) controller).cellScanner(); - ((HBaseRpcController) controller).setCellScanner(null); + CellScanner cells = getAndReset(controller); try { checkOpen(); List entries = request.getEntryList(); @@ -2184,6 +2193,41 @@ public class RSRpcServices extends HBaseRpcServicesBase } } + /** + * Replay the given changes on a secondary replica + */ + @Override + public ReplicateWALEntryResponse replicateToReplica(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + CellScanner cells = getAndReset(controller); + try { + checkOpen(); + List entries = request.getEntryList(); + if (entries == null || entries.isEmpty()) { + // empty input + return ReplicateWALEntryResponse.newBuilder().build(); + } + ByteString regionName = entries.get(0).getKey().getEncodedRegionName(); + HRegion region = server.getRegionByEncodedName(regionName.toStringUtf8()); + if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { + throw new DoNotRetryIOException( + "Should not replicate to primary replica " + region.getRegionInfo() + ", CODE BUG?"); + } + for (WALEntry entry : entries) { + if (!regionName.equals(entry.getKey().getEncodedRegionName())) { + throw new NotServingRegionException( + "ReplicateToReplica request contains entries from multiple " + + "regions. 
First region:" + regionName.toStringUtf8() + " , other region:" + + entry.getKey().getEncodedRegionName()); + } + region.replayWALEntry(entry, cells); + } + return ReplicateWALEntryResponse.newBuilder().build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + private void checkShouldRejectReplicationRequest(List entries) throws IOException { ReplicationSourceService replicationSource = server.getReplicationSourceService(); if (replicationSource == null || entries.isEmpty()) { @@ -2217,8 +2261,7 @@ public class RSRpcServices extends HBaseRpcServicesBase requestCount.increment(); List entries = request.getEntryList(); checkShouldRejectReplicationRequest(entries); - CellScanner cellScanner = ((HBaseRpcController) controller).cellScanner(); - ((HBaseRpcController) controller).setCellScanner(null); + CellScanner cellScanner = getAndReset(controller); server.getRegionServerCoprocessorHost().preReplicateLogEntries(); server.getReplicationSinkService().replicateLogEntries(entries, cellScanner, request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java index 960f57eab41..34313241d1f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationFlushRequester.java @@ -120,6 +120,7 @@ class RegionReplicationFlushRequester { } // schedule a timer task HashedWheelTimer timer = getTimer(); + pendingFlushRequestSequenceId = sequenceId; pendingFlushRequest = timer.newTimeout(this::flush, minIntervalSecs - elapsedSecs, TimeUnit.SECONDS); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java index d5e2387a1ce..f0129b70584 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/regionreplication/RegionReplicationSink.java @@ -22,7 +22,6 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -32,6 +31,7 @@ import java.util.TreeSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import org.agrona.collections.IntHashSet; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; @@ -54,6 +54,7 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor.FlushAction; /** * The class for replicating WAL edits to secondary replicas, one instance per region. 
@@ -128,7 +129,7 @@ public class RegionReplicationSink { // when we get a flush all request, we will try to remove a replica from this map, the key point // here is the flush sequence number must be greater than the failed sequence id, otherwise we // should not remove the replica from this map - private final Map failedReplicas = new HashMap<>(); + private final IntHashSet failedReplicas; private final Queue entries = new ArrayDeque<>(); @@ -165,6 +166,7 @@ public class RegionReplicationSink { TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT)); this.operationTimeoutNs = TimeUnit.MILLISECONDS .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT)); + this.failedReplicas = new IntHashSet(regionReplication - 1); } private void onComplete(List sent, @@ -184,16 +186,16 @@ public class RegionReplicationSink { if (error != null) { if (maxSequenceId > lastFlushedSequenceId) { LOG.warn( - "Failed to replicate to secondary replica {} for {}, since the max sequence" - + " id of sunk entris is {}, which is greater than the last flush SN {}," - + " we will stop replicating for a while and trigger a flush", + "Failed to replicate to secondary replica {} for {}, since the max sequence" + + " id of sunk entris is {}, which is greater than the last flush SN {}," + + " we will stop replicating for a while and trigger a flush", replicaId, primary, maxSequenceId, lastFlushedSequenceId, error); failed.add(replicaId); } else { LOG.warn( - "Failed to replicate to secondary replica {} for {}, since the max sequence" - + " id of sunk entris is {}, which is less than or equal to the last flush SN {}," - + " we will not stop replicating", + "Failed to replicate to secondary replica {} for {}, since the max sequence" + + " id of sunk entris is {}, which is less than or equal to the last flush SN {}," + + " we will not stop replicating", replicaId, primary, maxSequenceId, lastFlushedSequenceId, error); } } @@ -201,9 +203,7 @@ public class RegionReplicationSink { synchronized (entries) { pendingSize -= toReleaseSize; if (!failed.isEmpty()) { - for (Integer replicaId : failed) { - failedReplicas.put(replicaId, maxSequenceId); - } + failedReplicas.addAll(failed); flushRequester.requestFlush(maxSequenceId); } sending = false; @@ -237,7 +237,7 @@ public class RegionReplicationSink { AtomicInteger remaining = new AtomicInteger(toSendReplicaCount); Map> replica2Error = new HashMap<>(); for (int replicaId = 1; replicaId < regionReplication; replicaId++) { - if (failedReplicas.containsKey(replicaId)) { + if (failedReplicas.contains(replicaId)) { continue; } MutableObject error = new MutableObject<>(); @@ -253,7 +253,15 @@ public class RegionReplicationSink { } } - private boolean isFlushAllStores(FlushDescriptor flushDesc) { + private boolean isStartFlushAllStores(FlushDescriptor flushDesc) { + if (flushDesc.getAction() == FlushAction.CANNOT_FLUSH) { + // this means the memstore is empty, which means all data before this sequence id are flushed + // out, so it equals to a flush all, return true + return true; + } + if (flushDesc.getAction() != FlushAction.START_FLUSH) { + return false; + } Set storesFlushed = flushDesc.getStoreFlushesList().stream().map(sfd -> sfd.getFamilyName().toByteArray()) .collect(Collectors.toCollection(() -> new TreeSet<>(Bytes.BYTES_COMPARATOR))); @@ -263,7 +271,7 @@ public class RegionReplicationSink { return storesFlushed.containsAll(tableDesc.getColumnFamilyNames()); } - private Optional getFlushAllDescriptor(Cell metaCell) { + private Optional 
getStartFlushAllDescriptor(Cell metaCell) { if (!CellUtil.matchingFamily(metaCell, WALEdit.METAFAMILY)) { return Optional.empty(); } @@ -274,14 +282,14 @@ public class RegionReplicationSink { LOG.warn("Failed to parse FlushDescriptor from {}", metaCell); return Optional.empty(); } - if (flushDesc != null && isFlushAllStores(flushDesc)) { + if (flushDesc != null && isStartFlushAllStores(flushDesc)) { return Optional.of(flushDesc); } else { return Optional.empty(); } } - private void clearAllEntries() { + private long clearAllEntries() { long toClearSize = 0; for (SinkEntry entry : entries) { toClearSize += entry.size; @@ -290,20 +298,7 @@ public class RegionReplicationSink { entries.clear(); pendingSize -= toClearSize; manager.decrease(toClearSize); - } - - private void clearFailedReplica(long flushSequenceNumber) { - for (Iterator> iter = failedReplicas.entrySet().iterator(); iter - .hasNext();) { - Map.Entry entry = iter.next(); - if (entry.getValue().longValue() < flushSequenceNumber) { - LOG.debug( - "Got a flush all request with sequence id {}, clear failed replica {}" + - " with last failed sequence id {}", - flushSequenceNumber, entry.getKey(), entry.getValue()); - iter.remove(); - } - } + return toClearSize; } /** @@ -325,32 +320,20 @@ public class RegionReplicationSink { // check whether we flushed all stores, which means we could drop all the previous edits, // and also, recover from the previous failure of some replicas for (Cell metaCell : edit.getCells()) { - getFlushAllDescriptor(metaCell).ifPresent(flushDesc -> { + getStartFlushAllDescriptor(metaCell).ifPresent(flushDesc -> { long flushSequenceNumber = flushDesc.getFlushSequenceNumber(); - int toClearCount = 0; - long toClearSize = 0; - for (;;) { - SinkEntry e = entries.peek(); - if (e == null) { - break; - } - if (e.key.getSequenceId() < flushSequenceNumber) { - entries.poll(); - toClearCount++; - toClearSize += e.size; - } else { - break; - } - } lastFlushedSequenceId = flushSequenceNumber; + long clearedCount = entries.size(); + long clearedSize = clearAllEntries(); if (LOG.isDebugEnabled()) { LOG.debug( - "Got a flush all request with sequence id {}, clear {} pending" - + " entries with size {}", - flushSequenceNumber, toClearCount, - StringUtils.TraditionalBinaryPrefix.long2String(toClearSize, "", 1)); + "Got a flush all request with sequence id {}, clear {} pending" + + " entries with size {}, clear failed replicas {}", + flushSequenceNumber, clearedCount, + StringUtils.TraditionalBinaryPrefix.long2String(clearedSize, "", 1), + failedReplicas); } - clearFailedReplica(flushSequenceNumber); + failedReplicas.clear(); flushRequester.recordFlush(flushSequenceNumber); }); } @@ -371,7 +354,7 @@ public class RegionReplicationSink { // failed clearAllEntries(); for (int replicaId = 1; replicaId < regionReplication; replicaId++) { - failedReplicas.put(replicaId, entry.key.getSequenceId()); + failedReplicas.add(replicaId); } flushRequester.requestFlush(entry.key.getSequenceId()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java index c88581db2dd..94747ae026d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java @@ -471,7 +471,9 @@ public final class WALSplitUtil { * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances * extracted from the passed in WALEntry. 
* @return list of Pair<MutationType, Mutation> to be replayed + * @deprecated Since 3.0.0, will be removed in 4.0.0. */ + @Deprecated public static List getMutationsFromWALEntry(AdminProtos.WALEntry entry, CellScanner cells, Pair logEntry, Durability durability) throws IOException { if (entry == null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 56813af6b60..4e5cd2840d8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -755,4 +755,10 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface, public RegionReplicationBufferManager getRegionReplicationBufferManager() { return null; } + + @Override + public ReplicateWALEntryResponse replicateToReplica(RpcController controller, + ReplicateWALEntryRequest request) throws ServiceException { + return null; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 1db4bb8109e..6214b312eb2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -62,7 +62,6 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorConfig; import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; @@ -113,6 +112,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescript * Tests of HRegion methods for replaying flush, compaction, region open, etc events for secondary * region replicas */ +@SuppressWarnings("deprecation") @Category(LargeTests.class) public class TestHRegionReplayEvents { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java new file mode 100644 index 00000000000..d9f846d789a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReplicateToReplica.java @@ -0,0 +1,388 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNameTestRule; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.executor.ExecutorType; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil; +import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; +import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestReplicateToReplica { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicateToReplica.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private static byte[] FAMILY = Bytes.toBytes("family"); + + private static byte[] QUAL = Bytes.toBytes("qualifier"); + + private static ExecutorService EXEC; + + @Rule + public final TableNameTestRule name = new TableNameTestRule(); + + private TableName tableName; + + private Path testDir; + + private 
TableDescriptor td; + + private RegionServerServices rss; + + private AsyncClusterConnection conn; + + private RegionReplicationBufferManager manager; + + private FlushRequester flushRequester; + + private HRegion primary; + + private HRegion secondary; + + private WALFactory walFactory; + + private boolean queueReqAndResps; + + private Queue, CompletableFuture>> reqAndResps; + + private static List TO_ADD_AFTER_PREPARE_FLUSH; + + public static final class HRegionForTest extends HRegion { + + public HRegionForTest(HRegionFileSystem fs, WAL wal, Configuration confParam, + TableDescriptor htd, RegionServerServices rsServices) { + super(fs, wal, confParam, htd, rsServices); + } + + @SuppressWarnings("deprecation") + public HRegionForTest(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + @Override + protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, + Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, + FlushLifeCycleTracker tracker) throws IOException { + PrepareFlushResult result = super.internalPrepareFlushCache(wal, myseqid, storesToFlush, + status, writeFlushWalMarker, tracker); + for (Put put : TO_ADD_AFTER_PREPARE_FLUSH) { + put(put); + } + TO_ADD_AFTER_PREPARE_FLUSH.clear(); + return result; + } + + } + + @BeforeClass + public static void setUpBeforeClass() { + Configuration conf = UTIL.getConfiguration(); + conf.setInt("hbase.region.read-replica.sink.flush.min-interval.secs", 1); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true); + conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true); + conf.setClass(HConstants.REGION_IMPL, HRegionForTest.class, HRegion.class); + EXEC = new ExecutorService("test"); + EXEC.startExecutorService(EXEC.new ExecutorConfig().setCorePoolSize(1) + .setExecutorType(ExecutorType.RS_COMPACTED_FILES_DISCHARGER)); + ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null, + MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); + } + + @AfterClass + public static void tearDownAfterClass() { + EXEC.shutdown(); + UTIL.cleanupTestDir(); + } + + @Before + public void setUp() throws IOException { + TO_ADD_AFTER_PREPARE_FLUSH = new ArrayList<>(); + tableName = name.getTableName(); + testDir = UTIL.getDataTestDir(tableName.getNameAsString()); + Configuration conf = UTIL.getConfiguration(); + conf.set(HConstants.HBASE_DIR, testDir.toString()); + + td = TableDescriptorBuilder.newBuilder(tableName) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(2) + .setRegionMemStoreReplication(true).build(); + + reqAndResps = new ArrayDeque<>(); + queueReqAndResps = true; + conn = mock(AsyncClusterConnection.class); + when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong())).thenAnswer(i -> { + if (queueReqAndResps) { + @SuppressWarnings("unchecked") + List entries = i.getArgument(1, List.class); + CompletableFuture future = new CompletableFuture<>(); + reqAndResps.add(Pair.newPair(entries, future)); + return future; + } else { + return CompletableFuture.completedFuture(null); + } + }); + + flushRequester = mock(FlushRequester.class); + + rss = mock(RegionServerServices.class); + when(rss.getServerName()).thenReturn(ServerName.valueOf("foo", 1, 1)); + when(rss.getConfiguration()).thenReturn(conf); + 
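+    // wire up the mocked services which the region and its replication sink will pull from
+    // RegionServerServices: accounting, executor, cluster connection and flush requester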
when(rss.getRegionServerAccounting()).thenReturn(new RegionServerAccounting(conf)); + when(rss.getExecutorService()).thenReturn(EXEC); + when(rss.getAsyncClusterConnection()).thenReturn(conn); + when(rss.getFlushRequester()).thenReturn(flushRequester); + + manager = new RegionReplicationBufferManager(rss); + when(rss.getRegionReplicationBufferManager()).thenReturn(manager); + + RegionInfo primaryHri = RegionInfoBuilder.newBuilder(td.getTableName()).build(); + RegionInfo secondaryHri = RegionReplicaUtil.getRegionInfoForReplica(primaryHri, 1); + + walFactory = new WALFactory(conf, UUID.randomUUID().toString()); + WAL wal = walFactory.getWAL(primaryHri); + primary = HRegion.createHRegion(primaryHri, testDir, conf, td, wal); + primary.close(); + + primary = HRegion.openHRegion(testDir, primaryHri, td, wal, conf, rss, null); + secondary = HRegion.openHRegion(secondaryHri, td, null, conf, rss, null); + + when(rss.getRegions()).then(i -> { + return Arrays.asList(primary, secondary); + }); + + // process the open events + replicateAll(); + } + + @After + public void tearDown() throws IOException { + // close region will issue a flush, which will enqueue an edit into the replication sink so we + // need to complete it otherwise the test will hang. + queueReqAndResps = false; + failAll(); + HBaseTestingUtil.closeRegionAndWAL(primary); + HBaseTestingUtil.closeRegionAndWAL(secondary); + if (walFactory != null) { + walFactory.close(); + } + } + + private FlushResult flushPrimary() throws IOException { + return primary.flushcache(true, true, FlushLifeCycleTracker.DUMMY); + } + + private void replicate(Pair, CompletableFuture> pair) throws IOException { + Pair params = ReplicationProtobufUtil + .buildReplicateWALEntryRequest(pair.getFirst().toArray(new WAL.Entry[0]), + secondary.getRegionInfo().getEncodedNameAsBytes(), null, null, null); + for (WALEntry entry : params.getFirst().getEntryList()) { + secondary.replayWALEntry(entry, params.getSecond()); + } + pair.getSecond().complete(null); + } + + private void replicateOne() throws IOException { + replicate(reqAndResps.remove()); + } + + private void replicateAll() throws IOException { + for (Pair, CompletableFuture> pair;;) { + pair = reqAndResps.poll(); + if (pair == null) { + break; + } + replicate(pair); + } + } + + private void failOne() { + reqAndResps.remove().getSecond().completeExceptionally(new IOException("Inject error")); + } + + private void failAll() { + for (Pair, CompletableFuture> pair;;) { + pair = reqAndResps.poll(); + if (pair == null) { + break; + } + pair.getSecond().completeExceptionally(new IOException("Inject error")); + } + } + + @Test + public void testNormalReplicate() throws IOException { + byte[] row = Bytes.toBytes(0); + primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + replicateOne(); + assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); + } + + @Test + public void testNormalFlush() throws IOException { + byte[] row = Bytes.toBytes(0); + primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); + flushPrimary(); + replicateAll(); + assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); + + // we should have the same memstore size, i.e, the secondary should have also dropped the + // snapshot + assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize()); + } + + @Test + public void testErrorBeforeFlushStart() throws IOException { + 
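+    // scenario: the first replication call fails before any flush, so the secondary is marked
+    // failed and a flush is requested; the START_FLUSH marker of the next flush then lets the
+    // secondary catch up again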
byte[] row = Bytes.toBytes(0); + primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + failOne(); + verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); + TO_ADD_AFTER_PREPARE_FLUSH.add(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); + flushPrimary(); + // this also tests start flush with empty memstore at secondary replica side + replicateAll(); + assertEquals(2, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); + assertEquals(primary.getMemStoreDataSize(), secondary.getMemStoreDataSize()); + } + + @Test + public void testErrorAfterFlushStartBeforeFlushCommit() throws IOException { + primary.put(new Put(Bytes.toBytes(0)).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + replicateAll(); + TO_ADD_AFTER_PREPARE_FLUSH + .add(new Put(Bytes.toBytes(1)).addColumn(FAMILY, QUAL, Bytes.toBytes(2))); + flushPrimary(); + // replicate the start flush edit + replicateOne(); + // fail the remaining edits, the put and the commit flush edit + failOne(); + verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); + primary.put(new Put(Bytes.toBytes(2)).addColumn(FAMILY, QUAL, Bytes.toBytes(3))); + flushPrimary(); + replicateAll(); + for (int i = 0; i < 3; i++) { + assertEquals(i + 1, + Bytes.toInt(secondary.get(new Get(Bytes.toBytes(i))).getValue(FAMILY, QUAL))); + } + // should have nothing in memstore + assertEquals(0, secondary.getMemStoreDataSize()); + } + + @Test + public void testCatchUpWithCannotFlush() throws IOException, InterruptedException { + byte[] row = Bytes.toBytes(0); + primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + failOne(); + verify(flushRequester, times(1)).requestFlush(any(), anyList(), any()); + flushPrimary(); + failAll(); + Thread.sleep(2000); + // we will request flush the second time + verify(flushRequester, times(2)).requestFlush(any(), anyList(), any()); + // we can not flush because no content in memstore + FlushResult result = flushPrimary(); + assertEquals(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, result.getResult()); + // the secondary replica does not have this row yet + assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue()); + // replicate the can not flush edit + replicateOne(); + // we should have the row now + assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); + } + + @Test + public void testCatchUpWithReopen() throws IOException { + byte[] row = Bytes.toBytes(0); + primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1))); + failOne(); + primary.close(); + // the secondary replica does not have this row yet, although the above close has flushed the + // data out + assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue()); + + // reopen + primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(), + UTIL.getConfiguration(), rss, null); + replicateAll(); + // we should have the row now + assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL))); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java index 76a224b8b25..918f6448dad 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java +++ 
+
+  @Test
+  public void testCatchUpWithReopen() throws IOException {
+    byte[] row = Bytes.toBytes(0);
+    primary.put(new Put(row).addColumn(FAMILY, QUAL, Bytes.toBytes(1)));
+    failOne();
+    primary.close();
+    // the secondary replica does not have this row yet, although the above close has flushed the
+    // data out
+    assertFalse(secondary.get(new Get(row).setCheckExistenceOnly(true)).getExists().booleanValue());
+
+    // reopen
+    primary = HRegion.openHRegion(testDir, primary.getRegionInfo(), td, primary.getWAL(),
+      UTIL.getConfiguration(), rss, null);
+    replicateAll();
+    // we should have the row now
+    assertEquals(1, Bytes.toInt(secondary.get(new Get(row)).getValue(FAMILY, QUAL)));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
index 76a224b8b25..918f6448dad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/regionreplication/TestRegionReplicationSink.java
@@ -236,7 +236,7 @@ public class TestRegionReplicationSink {
         throw new IllegalStateException();
       }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
     FlushDescriptor fd =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
     WALEdit edit2 = WALEdit.createFlushWALEdit(primary, fd);
     sink.add(key2, edit2, rpcCall2);
 
@@ -300,7 +300,7 @@ public class TestRegionReplicationSink {
         throw new IllegalStateException();
       }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
     FlushDescriptor fd =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 2L, committedFiles);
+      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 2L, committedFiles);
     WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd);
     sink.add(key3, edit3, rpcCall3);
 
@@ -313,91 +313,4 @@ public class TestRegionReplicationSink {
     // should have send out all so no pending entries.
     assertEquals(0, sink.pendingSize());
   }
-
-  @Test
-  public void testNotClearFailedReplica() {
-    // simulate this scenario:
-    // 1. prepare flush
-    // 2. add one edit broken
-    // 3. commit flush with flush sequence number less than the previous edit(this is the normal
-    // case)
-    // we should not clear the failed replica as we do not flush the broken edit out with this
-    // flush, we need an extra flush to flush it out
-    MutableInt next = new MutableInt(0);
-    List<CompletableFuture<Void>> futures =
-      Stream.generate(() -> new CompletableFuture<Void>()).limit(8).collect(Collectors.toList());
-    when(conn.replicate(any(), anyList(), anyInt(), anyLong(), anyLong()))
-      .then(i -> futures.get(next.getAndIncrement()));
-    when(manager.increase(anyLong())).thenReturn(true);
-
-    ServerCall<?> rpcCall1 = mock(ServerCall.class);
-    WALKeyImpl key1 = mock(WALKeyImpl.class);
-    when(key1.estimatedSerializedSizeOf()).thenReturn(100L);
-    when(key1.getSequenceId()).thenReturn(1L);
-    Map<byte[], List<Path>> committedFiles = td.getColumnFamilyNames().stream()
-      .collect(Collectors.toMap(Function.identity(), k -> Collections.emptyList(), (u, v) -> {
-        throw new IllegalStateException();
-      }, () -> new TreeMap<>(Bytes.BYTES_COMPARATOR)));
-    FlushDescriptor fd =
-      ProtobufUtil.toFlushDescriptor(FlushAction.START_FLUSH, primary, 1L, committedFiles);
-    WALEdit edit1 = WALEdit.createFlushWALEdit(primary, fd);
-    sink.add(key1, edit1, rpcCall1);
-
-    futures.get(0).complete(null);
-    futures.get(1).complete(null);
-
-    ServerCall<?> rpcCall2 = mock(ServerCall.class);
-    WALKeyImpl key2 = mock(WALKeyImpl.class);
-    when(key2.estimatedSerializedSizeOf()).thenReturn(200L);
-    when(key2.getSequenceId()).thenReturn(2L);
-    WALEdit edit2 = mock(WALEdit.class);
-    when(edit2.estimatedSerializedSizeOf()).thenReturn(2000L);
-    sink.add(key2, edit2, rpcCall2);
-
-    // fail the call to replica 1
-    futures.get(2).completeExceptionally(new IOException("inject error"));
-    futures.get(3).complete(null);
-
-    ServerCall<?> rpcCall3 = mock(ServerCall.class);
-    WALKeyImpl key3 = mock(WALKeyImpl.class);
-    when(key3.estimatedSerializedSizeOf()).thenReturn(300L);
-    when(key3.getSequenceId()).thenReturn(3L);
-    FlushDescriptor fd3 =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 1L, committedFiles);
-    WALEdit edit3 = WALEdit.createFlushWALEdit(primary, fd3);
-    sink.add(key3, edit3, rpcCall3);
-
-    // we should only call replicate once for edit3, since replica 1 is marked as failed, and the
-    // flush request can not clean the failed replica since the flush sequence number is not greater
-    // than sequence id of the last failed edit
-    verify(conn, times(5)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
-    futures.get(4).complete(null);
-
-    ServerCall<?> rpcCall4 = mock(ServerCall.class);
-    WALKeyImpl key4 = mock(WALKeyImpl.class);
-    when(key4.estimatedSerializedSizeOf()).thenReturn(400L);
-    when(key4.getSequenceId()).thenReturn(4L);
-    WALEdit edit4 = mock(WALEdit.class);
-    when(edit4.estimatedSerializedSizeOf()).thenReturn(4000L);
-    sink.add(key4, edit4, rpcCall4);
-
-    // still, only send to replica 2
-    verify(conn, times(6)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
-    futures.get(5).complete(null);
-
-    ServerCall<?> rpcCall5 = mock(ServerCall.class);
-    WALKeyImpl key5 = mock(WALKeyImpl.class);
-    when(key5.estimatedSerializedSizeOf()).thenReturn(300L);
-    when(key5.getSequenceId()).thenReturn(3L);
-    FlushDescriptor fd5 =
-      ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH, primary, 4L, committedFiles);
-    WALEdit edit5 = WALEdit.createFlushWALEdit(primary, fd5);
-    sink.add(key5, edit5, rpcCall5);
-
-    futures.get(6).complete(null);
-    futures.get(7).complete(null);
-    // should have cleared the failed replica because the flush sequence number is greater than than
-    // the sequence id of the last failed edit
-    verify(conn, times(8)).replicate(any(), anyList(), anyInt(), anyLong(), anyLong());
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index a4da6407b51..2fcfc29a163 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -89,7 +89,6 @@ public class TestMetaRegionReplicaReplication {
     conf.setInt("zookeeper.recovery.retry", 1);
     conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
     conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
     // Enable hbase:meta replication.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
index 231c9e154f3..ac279ed6e5e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -85,7 +85,6 @@ public class TestRegionReplicaReplication {
     conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
     conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
     conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
-    conf.setInt("replication.stats.thread.period.seconds", 5);
     conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
     conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);