From 7a666858019ed9af860763f615ce810e6b31a4d2 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Tue, 17 Oct 2017 21:27:05 +0800 Subject: [PATCH] HBASE-19001 Remove the hooks in RegionObserver which are designed to construct a StoreScanner which is marked as IA.Private --- .../hbase/coprocessor/RegionObserver.java | 77 ----- .../hadoop/hbase/regionserver/HMobStore.java | 24 +- .../hadoop/hbase/regionserver/HRegion.java | 4 +- .../hadoop/hbase/regionserver/HStore.java | 18 +- .../hadoop/hbase/regionserver/Region.java | 3 - .../regionserver/RegionCoprocessorHost.java | 64 +--- .../regionserver/ReversedStoreScanner.java | 6 +- .../hbase/regionserver/StoreFlusher.java | 12 +- .../regionserver/compactions/Compactor.java | 44 +-- ...tAvoidCellReferencesIntoShippedBlocks.java | 199 ++++++------ .../hbase/client/TestFromClientSide.java | 156 ---------- .../TestFromClientSideScanExcpetion.java | 238 ++++++++++++++ ...lientSideScanExcpetionWithCoprocessor.java | 43 +++ .../coprocessor/SimpleRegionObserver.java | 36 --- .../TestRegionObserverScannerOpenHook.java | 31 +- .../DelegatingInternalScanner.java | 45 +++ .../regionserver/NoOpScanPolicyObserver.java | 66 +--- .../hbase/util/TestCoprocessorScanPolicy.java | 292 ++++++++++-------- 18 files changed, 652 insertions(+), 706 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index a1e4f0eb50e..d03a9beea5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -123,27 +123,6 @@ public interface RegionObserver { */ default void postLogReplay(ObserverContext c) {} - /** - * Called before a memstore is flushed to disk and prior to creating the scanner to read from - * the memstore. To override or modify how a memstore is flushed, - * implementing classes can return a new scanner to provide the KeyValues to be - * stored into the new {@code StoreFile} or null to perform the default processing. - * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no - * effect in this hook. - * @param c the environment provided by the region server - * @param store the store being flushed - * @param scanners the scanners for the memstore that is flushed - * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @param readPoint the readpoint to create scanner - * @return the scanner to use during the flush. {@code null} if the default implementation - * is to be used. - */ - default InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - return s; - } - /** * Called before the memstore is flushed to disk. * @param c the environment provided by the region server @@ -235,33 +214,6 @@ public interface RegionObserver { return scanner; } - /** - * Called prior to writing the {@link StoreFile}s selected for compaction into a new - * {@code StoreFile} and prior to creating the scanner used to read the input files. To override - * or modify the compaction process, implementing classes can return a new scanner to provide the - * KeyValues to be stored into the new {@code StoreFile} or null to perform the default - * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no - * effect in this hook. - * @param c the environment provided by the region server - * @param store the store being compacted - * @param scanners the list of store file scanners to be read from - * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction - * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store - * files - * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @param tracker used to track the life cycle of a compaction - * @param request the requested compaction - * @param readPoint the readpoint to create scanner - * @return the scanner to use during compaction. {@code null} if the default implementation is to - * be used. - */ - default InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - return s; - } - /** * Called after compaction has completed and the new store file has been moved in to place. * @param c the environment provided by the region server @@ -802,35 +754,6 @@ public interface RegionObserver { return s; } - /** - * Called before a store opens a new scanner. - * This hook is called when a "user" scanner is opened. - *

- * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} - * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, - * InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} to override scanners - * created for flushes or compactions, resp. - *

- * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors. - * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no - * effect in this hook. - *

- * Note: Do not retain references to any Cells returned by scanner, beyond the life of this - * invocation. If need a Cell reference for later use, copy the cell and use that. - * @param c the environment provided by the region server - * @param store the store being scanned - * @param scan the Scan specification - * @param targetCols columns to be used in the scanner - * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @param readPt the read point - * @return a KeyValueScanner instance to use or {@code null} to use the default implementation - */ - default KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPt) - throws IOException { - return s; - } - /** * Called after the client opens a new scanner. *

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 95bbf743a99..206c3cdc1d4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -145,21 +145,19 @@ public class HMobStore extends HStore { */ @Override protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, - long readPt, KeyValueScanner scanner) throws IOException { - if (scanner == null) { - if (MobUtils.isRefOnlyScan(scan)) { - Filter refOnlyFilter = new MobReferenceOnlyFilter(); - Filter filter = scan.getFilter(); - if (filter != null) { - scan.setFilter(new FilterList(filter, refOnlyFilter)); - } else { - scan.setFilter(refOnlyFilter); - } + long readPt) throws IOException { + if (MobUtils.isRefOnlyScan(scan)) { + Filter refOnlyFilter = new MobReferenceOnlyFilter(); + Filter filter = scan.getFilter(); + if (filter != null) { + scan.setFilter(new FilterList(filter, refOnlyFilter)); + } else { + scan.setFilter(refOnlyFilter); } - scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan, - targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); } - return scanner; + return scan.isReversed() + ? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt) + : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1cbb689231f..da3a1e9f194 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1446,7 +1446,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return maxFlushedSeqId; } - @Override + /** + * @return readpoint considering given IsolationLevel. Pass {@code null} for default + */ public long getReadPoint(IsolationLevel isolationLevel) { if (isolationLevel != null && isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 186608f28d3..83b5561d57e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1920,25 +1920,17 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat final NavigableSet targetCols, long readPt) throws IOException { lock.readLock().lock(); try { - KeyValueScanner scanner = null; - if (this.getCoprocessorHost() != null) { - scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols, readPt); - } - scanner = createScanner(scan, targetCols, readPt, scanner); - return scanner; + return createScanner(scan, targetCols, readPt); } finally { lock.readLock().unlock(); } } protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, - long readPt, KeyValueScanner scanner) throws IOException { - if (scanner == null) { - scanner = scan.isReversed() ? new ReversedStoreScanner(this, - getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, - getScanInfo(), scan, targetCols, readPt); - } - return scanner; + long readPt) throws IOException { + return scan.isReversed() ? new ReversedStoreScanner(this, + getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, + getScanInfo(), scan, targetCols, readPt); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 630ae80cac0..79012ea92df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -131,9 +131,6 @@ public interface Region extends ConfigurationObserver { */ public Map getMaxStoreSeqId(); - /** @return readpoint considering given IsolationLevel; pass null for default*/ - long getReadPoint(IsolationLevel isolationLevel); - /** * @return The earliest time a store in the region was flushed. All * other stores in the region would have been flushed either at, or diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index b78d95b43b1..fbd93b8e02a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -19,18 +19,18 @@ package org.apache.hadoop.hbase.regionserver; +import com.google.protobuf.Message; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; -import com.google.protobuf.Message; -import com.google.protobuf.Service; import org.apache.commons.collections4.map.AbstractReferenceMap; import org.apache.commons.collections4.map.ReferenceMap; import org.apache.commons.logging.Log; @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; @@ -77,15 +76,15 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CoprocessorClassLoader; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; /** * Implements the coprocessor environment and runtime support for coprocessors @@ -541,27 +540,6 @@ public class RegionCoprocessorHost } } - /** - * See - * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, - * InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} - */ - public InternalScanner preCompactScannerOpen(final HStore store, - final List scanners, final ScanType scanType, final long earliestPutTs, - final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user, - final long readPoint) - throws IOException { - return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult( - regionObserverGetter, user) { - @Override - public InternalScanner call(RegionObserver observer) throws IOException { - return observer.preCompactScannerOpen(this, store, scanners, scanType, - earliestPutTs, getResult(), tracker, request, readPoint); - } - }); - } - /** * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently * available candidates. @@ -673,21 +651,6 @@ public class RegionCoprocessorHost }); } - /** - * See - * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} - */ - public InternalScanner preFlushScannerOpen(final HStore store, - final List scanners, final long readPoint) throws IOException { - return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { - @Override - public InternalScanner call(RegionObserver observer) throws IOException { - return observer.preFlushScannerOpen(this, store, scanners, getResult(), readPoint); - } - }); - } - /** * Invoked after a memstore flush * @throws IOException @@ -1158,21 +1121,6 @@ public class RegionCoprocessorHost }); } - /** - * See - * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)} - */ - public KeyValueScanner preStoreScannerOpen(final HStore store, final Scan scan, - final NavigableSet targetCols, final long readPt) throws IOException { - return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { - @Override - public KeyValueScanner call(RegionObserver observer) throws IOException { - return observer.preStoreScannerOpen(this, store, scan, targetCols, getResult(), readPt); - } - }); - } - /** * @param scan the Scan specification * @param s the scanner diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 0089d3f3a71..04e68657faa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -34,7 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience; * reversed scanning. */ @InterfaceAudience.Private -class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { +public class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { /** * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we @@ -46,14 +46,14 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { * @param columns which columns we are scanning * @throws IOException */ - ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, + public ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, long readPt) throws IOException { super(store, scanInfo, scan, columns, readPt); } /** Constructor for testing. */ - ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet columns, + public ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet columns, List scanners) throws IOException { super(scan, scanInfo, columns, scanners); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 124b7b58a07..8fde7d5e047 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -79,15 +79,9 @@ abstract class StoreFlusher { */ protected InternalScanner createScanner(List snapshotScanners, long smallestReadPoint) throws IOException { - InternalScanner scanner = null; - if (store.getCoprocessorHost() != null) { - scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners, - smallestReadPoint); - } - if (scanner == null) { - scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, - ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); - } + InternalScanner scanner = + new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, + ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); assert scanner != null; if (store.getCoprocessorHost() != null) { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 7ca3ab4a2bf..f9efd98261a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -86,8 +86,8 @@ public abstract class Compactor { protected static final String MINOR_COMPACTION_DROP_CACHE = "hbase.regionserver.minorcompaction.pagecache.drop"; - private boolean dropCacheMajor; - private boolean dropCacheMinor; + private final boolean dropCacheMajor; + private final boolean dropCacheMinor; //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(Configuration conf, HStore store) { @@ -138,7 +138,7 @@ public abstract class Compactor { * @param allFiles Whether all files are included for compaction * @return The result. */ - protected FileDetails getFileDetails( + private FileDetails getFileDetails( Collection filesToCompact, boolean allFiles) throws IOException { FileDetails fd = new FileDetails(); long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() - @@ -217,13 +217,13 @@ public abstract class Compactor { * @param filesToCompact Files. * @return Scanners. */ - protected List createFileScanners(Collection filesToCompact, + private List createFileScanners(Collection filesToCompact, long smallestReadPoint, boolean useDropBehind) throws IOException { return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind, smallestReadPoint); } - protected long getSmallestReadPoint() { + private long getSmallestReadPoint() { return store.getSmallestReadPoint(); } @@ -257,7 +257,7 @@ public abstract class Compactor { * @return Writer for a new StoreFile in the tmp dir. * @throws IOException if creation failed */ - protected StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind) + protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException { // When all MVCC readpoints are 0, don't write them. // See HBASE-8166, HBASE-12600, and HBASE-13389. @@ -267,7 +267,7 @@ public abstract class Compactor { /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind); } - protected List compact(final CompactionRequestImpl request, + protected final List compact(final CompactionRequestImpl request, InternalScannerFactory scannerFactory, CellSinkFactory sinkFactory, ThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); @@ -291,12 +291,8 @@ public abstract class Compactor { try { /* Include deletes, unless we are doing a major compaction */ ScanType scanType = scannerFactory.getScanType(request); - scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user, - smallestReadPoint); - if (scanner == null) { - scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint); - } - scanner = postCreateCoprocScanner(request, scanType, scanner, user); + scanner = postCreateCoprocScanner(request, scanType, + scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint), user); if (scanner == null) { // NULL scanner returned from coprocessor hooks means skip normal processing. return new ArrayList<>(); @@ -330,26 +326,6 @@ public abstract class Compactor { protected abstract void abortWriter(T writer) throws IOException; - /** - * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation. - * @param request Compaction request. - * @param scanType Scan type. - * @param earliestPutTs Earliest put ts. - * @param scanners File scanners for compaction files. - * @param user the User - * @param readPoint the read point to help create scanner by Coprocessor if required. - * @return Scanner override by coprocessor; null if not overriding. - */ - protected InternalScanner preCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType, - long earliestPutTs, List scanners, User user, long readPoint) - throws IOException { - if (store.getCoprocessorHost() == null) { - return null; - } - return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, - earliestPutTs, request.getTracker(), request, user, readPoint); - } - /** * Calls coprocessor, if any, to create scanners - after normal scanner creation. * @param request Compaction request. @@ -357,7 +333,7 @@ public abstract class Compactor { * @param scanner The default scanner created for compaction. * @return Scanner scanner to use (usually the default); null if compaction should not proceed. */ - protected InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType, + private InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType, InternalScanner scanner, User user) throws IOException { if (store.getCoprocessorHost() == null) { return scanner; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java index ac0a4e6f08e..baf014562ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java @@ -24,12 +24,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.OptionalInt; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparatorImpl; @@ -46,15 +43,13 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -67,10 +62,10 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; + @Category({ LargeTests.class, ClientTests.class }) -@SuppressWarnings("deprecation") public class TestAvoidCellReferencesIntoShippedBlocks { - private static final Log LOG = LogFactory.getLog(TestAvoidCellReferencesIntoShippedBlocks.class); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); static byte[][] ROWS = new byte[2][]; private static byte[] ROW = Bytes.toBytes("testRow"); @@ -134,7 +129,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks { try { // get the block cache and region RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); - String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); HStore store = region.getStores().iterator().next(); @@ -190,10 +185,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks { // Load cache Scan s = new Scan(); s.setMaxResultSize(1000); - ResultScanner scanner = table.getScanner(s); - int count = 0; - for (Result result : scanner) { - count++; + int count; + try (ResultScanner scanner = table.getScanner(s)) { + count = Iterables.size(scanner); } assertEquals("Count all the rows ", count, 6); // all the cache is loaded @@ -203,10 +197,8 @@ public class TestAvoidCellReferencesIntoShippedBlocks { region.compact(true); s = new Scan(); s.setMaxResultSize(1000); - scanner = table.getScanner(s); - count = 0; - for (Result result : scanner) { - count++; + try (ResultScanner scanner = table.getScanner(s)) { + count = Iterables.size(scanner); } assertEquals("Count all the rows ", count, 6); } finally { @@ -224,10 +216,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks { } public void run() { - Scan s = new Scan(); - s.setCaching(1); - s.setStartRow(ROW4); - s.setStopRow(ROW5); + Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1); try { while(!doScan.get()) { try { @@ -246,9 +235,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks { // evict what ever is available cache.evictBlock(cacheKey); } - ResultScanner scanner = table.getScanner(s); - for (Result res : scanner) { - + try (ResultScanner scanner = table.getScanner(s)) { + while (scanner.next() != null) { + } } compactReadLatch.countDown(); } catch (IOException e) { @@ -264,35 +253,24 @@ public class TestAvoidCellReferencesIntoShippedBlocks { } @Override - public InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, - long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, - CompactionRequest request, long readPoint) - throws IOException { - return createCompactorScanner((HStore) store, scanners, scanType, earliestPutTs); - } - - private InternalScanner createCompactorScanner(HStore store, - List scanners, ScanType scanType, long earliestPutTs) - throws IOException { - return new CompactorStoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, - scanType, store.getSmallestReadPoint(), earliestPutTs); + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return new CompactorInternalScanner(scanner); } } - private static class CompactorStoreScanner extends StoreScanner { + private static final class CompactorInternalScanner extends DelegatingInternalScanner { - public CompactorStoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, - List scanners, ScanType scanType, long smallestReadPoint, - long earliestPutTs) throws IOException { - super(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs); + public CompactorInternalScanner(InternalScanner scanner) { + super(scanner); } @Override - public boolean next(List outResult, ScannerContext scannerContext) throws IOException { - boolean next = super.next(outResult, scannerContext); - for (Cell cell : outResult) { - if(CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { + public boolean next(List result, ScannerContext scannerContext) throws IOException { + boolean next = scanner.next(result, scannerContext); + for (Cell cell : result) { + if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { try { // hold the compaction // set doscan to true @@ -314,7 +292,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks { try { // get the block cache and region RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); - String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName) .getRegion(regionName); HStore store = region.getStores().iterator().next(); @@ -364,10 +342,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks { // Load cache Scan s = new Scan(); s.setMaxResultSize(1000); - ResultScanner scanner = table.getScanner(s); - int count = 0; - for (Result result : scanner) { - count++; + int count; + try (ResultScanner scanner = table.getScanner(s)) { + count = Iterables.size(scanner); } assertEquals("Count all the rows ", count, 6); @@ -375,77 +352,75 @@ public class TestAvoidCellReferencesIntoShippedBlocks { s = new Scan(); // Start a scan from row3 s.setCaching(1); - s.setStartRow(ROW1); + s.withStartRow(ROW1); // set partial as true so that the scan can send partial columns also s.setAllowPartialResults(true); s.setMaxResultSize(1000); - scanner = table.getScanner(s); - Thread evictorThread = new Thread() { - @Override - public void run() { - List cacheList = new ArrayList<>(); - Iterator iterator = cache.iterator(); - // evict all the blocks - while (iterator.hasNext()) { - CachedBlock next = iterator.next(); - BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - cacheList.add(cacheKey); - cache.evictBlock(cacheKey); - } - try { - Thread.sleep(1); - } catch (InterruptedException e1) { - } - iterator = cache.iterator(); - int refBlockCount = 0; - while (iterator.hasNext()) { - iterator.next(); - refBlockCount++; - } - assertEquals("One block should be there ", refBlockCount, 1); - // Rescan to prepopulate the data - // cache this row. - Scan s1 = new Scan(); - // This scan will start from ROW1 and it will populate the cache with a - // row that is lower than ROW3. - s1.setStartRow(ROW3); - s1.setStopRow(ROW5); - s1.setCaching(1); - ResultScanner scanner; - try { - scanner = table.getScanner(s1); - int count = 0; - for (Result result : scanner) { - count++; - } - assertEquals("Count the rows", count, 2); - iterator = cache.iterator(); - List newCacheList = new ArrayList<>(); + try (ResultScanner scanner = table.getScanner(s)) { + Thread evictorThread = new Thread() { + @Override + public void run() { + List cacheList = new ArrayList<>(); + Iterator iterator = cache.iterator(); + // evict all the blocks while (iterator.hasNext()) { CachedBlock next = iterator.next(); BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - newCacheList.add(cacheKey); + cacheList.add(cacheKey); + cache.evictBlock(cacheKey); } - int newBlockRefCount = 0; - for (BlockCacheKey key : cacheList) { - if (newCacheList.contains(key)) { - newBlockRefCount++; + try { + Thread.sleep(1); + } catch (InterruptedException e1) { + } + iterator = cache.iterator(); + int refBlockCount = 0; + while (iterator.hasNext()) { + iterator.next(); + refBlockCount++; + } + assertEquals("One block should be there ", refBlockCount, 1); + // Rescan to prepopulate the data + // cache this row. + Scan s1 = new Scan(); + // This scan will start from ROW1 and it will populate the cache with a + // row that is lower than ROW3. + s1.withStartRow(ROW3); + s1.withStopRow(ROW5); + s1.setCaching(1); + ResultScanner scanner; + try { + scanner = table.getScanner(s1); + int count = Iterables.size(scanner); + assertEquals("Count the rows", count, 2); + iterator = cache.iterator(); + List newCacheList = new ArrayList<>(); + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + newCacheList.add(cacheKey); } + int newBlockRefCount = 0; + for (BlockCacheKey key : cacheList) { + if (newCacheList.contains(key)) { + newBlockRefCount++; + } + } + + assertEquals("old blocks should still be found ", newBlockRefCount, 6); + latch.countDown(); + + } catch (IOException e) { } - - assertEquals("old blocks should still be found ", newBlockRefCount, 6); - latch.countDown(); - - } catch (IOException e) { } - } - }; - count = 0; - for (Result result : scanner) { - count++; - if (count == 2) { - evictorThread.start(); - latch.await(); + }; + count = 0; + while (scanner.next() != null) { + count++; + if (count == 2) { + evictorThread.start(); + latch.await(); + } } } assertEquals("Count should give all rows ", count, 10); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index d887e7bd56c..85d84de7347 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -39,14 +39,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.ArrayUtils; @@ -57,7 +53,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ClusterStatus.Option; import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -74,11 +69,6 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -104,15 +94,10 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; -import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; -import org.apache.hadoop.hbase.regionserver.ScanInfo; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -540,147 +525,6 @@ public class TestFromClientSide { assertEquals(rowCount - endKeyCount, countGreater); } - /** - * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will - * fail with an IOException() on the first call. - */ - public static class ExceptionInReseekRegionObserver implements RegionCoprocessor, RegionObserver { - static AtomicLong reqCount = new AtomicLong(0); - static AtomicBoolean isDoNotRetry = new AtomicBoolean(false); // whether to throw DNRIOE - static AtomicBoolean throwOnce = new AtomicBoolean(true); // whether to only throw once - - static void reset() { - reqCount.set(0); - isDoNotRetry.set(false); - throwOnce.set(true); - } - - @Override - public Optional getRegionObserver() { - return Optional.of(this); - } - - class MyStoreScanner extends StoreScanner { - public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, - long readPt) throws IOException { - super(store, scanInfo, scan, columns, readPt); - } - - @Override - protected List selectScannersFrom(HStore store, - List allScanners) { - List scanners = super.selectScannersFrom(store, allScanners); - List newScanners = new ArrayList<>(scanners.size()); - for (KeyValueScanner scanner : scanners) { - newScanners.add(new DelegatingKeyValueScanner(scanner) { - @Override - public boolean reseek(Cell key) throws IOException { - reqCount.incrementAndGet(); - if (!throwOnce.get()|| reqCount.get() == 1) { - if (isDoNotRetry.get()) { - throw new DoNotRetryIOException("Injected exception"); - } else { - throw new IOException("Injected exception"); - } - } - return super.reseek(key); - } - }); - } - return newScanners; - } - } - - @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, - final long readPt) throws IOException { - HStore hs = (HStore) store; - return new MyStoreScanner(hs, hs.getScanInfo(), scan, targetCols, readPt); - } - } - - /** - * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek - * leaving the server side RegionScanner to be in dirty state. The client has to ensure that the - * ClientScanner does not get an exception and also sees all the data. - * @throws IOException - * @throws InterruptedException - */ - @Test - public void testClientScannerIsResetWhenScanThrowsIOException() - throws IOException, InterruptedException { - TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); - final TableName tableName = TableName.valueOf(name.getMethodName()); - - HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY); - htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName()); - TEST_UTIL.getAdmin().createTable(htd); - ExceptionInReseekRegionObserver.reset(); - ExceptionInReseekRegionObserver.throwOnce.set(true); // throw exceptions only once - try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { - int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); - TEST_UTIL.getAdmin().flush(tableName); - int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); - assertEquals(rowCount, actualRowCount); - } - assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0); - } - - /** - * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation - * is that the exception will bubble up to the client scanner instead of being retried. - */ - @Test (timeout = 180000) - public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() - throws IOException, InterruptedException { - TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); - final TableName tableName = TableName.valueOf(name.getMethodName()); - - HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY); - htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName()); - TEST_UTIL.getAdmin().createTable(htd); - ExceptionInReseekRegionObserver.reset(); - ExceptionInReseekRegionObserver.isDoNotRetry.set(true); - try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { - TEST_UTIL.loadTable(t, FAMILY, false); - TEST_UTIL.getAdmin().flush(tableName); - TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); - fail("Should have thrown an exception"); - } catch (DoNotRetryIOException expected) { - // expected - } - assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0); - } - - /** - * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation - * is that the we will keep on retrying, but fail after the retries are exhausted instead of - * retrying indefinitely. - */ - @Test (timeout = 180000) - public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() - throws IOException, InterruptedException { - TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); - final TableName tableName = TableName.valueOf(name.getMethodName()); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); - HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY); - htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName()); - TEST_UTIL.getAdmin().createTable(htd); - ExceptionInReseekRegionObserver.reset(); - ExceptionInReseekRegionObserver.throwOnce.set(false); // throw exceptions in every retry - try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { - TEST_UTIL.loadTable(t, FAMILY, false); - TEST_UTIL.getAdmin().flush(tableName); - TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); - fail("Should have thrown an exception"); - } catch (DoNotRetryIOException expected) { - assertTrue(expected instanceof ScannerResetException); - // expected - } - assertTrue(ExceptionInReseekRegionObserver.reqCount.get() >= 3); - } - /* * @param key * @return Scan with RowFilter that does LESS than passed key. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java new file mode 100644 index 00000000000..f18ccc0bec3 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; +import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestFromClientSideScanExcpetion { + + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + + private static int SLAVES = 3; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); + conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class); + conf.setBoolean("hbase.client.log.scanner.activity", true); + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private static AtomicBoolean ON = new AtomicBoolean(false); + private static AtomicLong REQ_COUNT = new AtomicLong(0); + private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw + // DNRIOE + private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once + + private static void reset() { + ON.set(false); + REQ_COUNT.set(0); + IS_DO_NOT_RETRY.set(false); + THROW_ONCE.set(true); + } + + private static void inject() { + ON.set(true); + } + + public static final class MyHRegion extends HRegion { + + @SuppressWarnings("deprecation") + public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + @Override + protected HStore instantiateHStore(ColumnFamilyDescriptor family) throws IOException { + return new MyHStore(this, family, conf); + } + } + + public static final class MyHStore extends HStore { + + public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam) + throws IOException { + super(region, family, confParam); + } + + @Override + protected KeyValueScanner createScanner(Scan scan, NavigableSet targetCols, long readPt) + throws IOException { + return scan.isReversed() + ? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt) + : new MyStoreScanner(this, getScanInfo(), scan, targetCols, readPt); + } + } + + public static final class MyStoreScanner extends StoreScanner { + public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, + long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + } + + @Override + protected List selectScannersFrom(HStore store, + List allScanners) { + List scanners = super.selectScannersFrom(store, allScanners); + List newScanners = new ArrayList<>(scanners.size()); + for (KeyValueScanner scanner : scanners) { + newScanners.add(new DelegatingKeyValueScanner(scanner) { + @Override + public boolean reseek(Cell key) throws IOException { + if (ON.get()) { + REQ_COUNT.incrementAndGet(); + if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { + if (IS_DO_NOT_RETRY.get()) { + throw new DoNotRetryIOException("Injected exception"); + } else { + throw new IOException("Injected exception"); + } + } + } + return super.reseek(key); + } + }); + } + return newScanners; + } + } + + /** + * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving + * the server side RegionScanner to be in dirty state. The client has to ensure that the + * ClientScanner does not get an exception and also sees all the data. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testClientScannerIsResetWhenScanThrowsIOException() + throws IOException, InterruptedException { + reset(); + THROW_ONCE.set(true); // throw exceptions only once + TableName tableName = TableName.valueOf(name.getMethodName()); + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); + TEST_UTIL.getAdmin().flush(tableName); + inject(); + int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); + assertEquals(rowCount, actualRowCount); + } + assertTrue(REQ_COUNT.get() > 0); + } + + /** + * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation + * is that the exception will bubble up to the client scanner instead of being retried. + */ + @Test + public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() + throws IOException, InterruptedException { + reset(); + IS_DO_NOT_RETRY.set(true); + TableName tableName = TableName.valueOf(name.getMethodName()); + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + TEST_UTIL.loadTable(t, FAMILY, false); + TEST_UTIL.getAdmin().flush(tableName); + inject(); + TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); + fail("Should have thrown an exception"); + } catch (DoNotRetryIOException expected) { + // expected + } + assertTrue(REQ_COUNT.get() > 0); + } + + /** + * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is + * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying + * indefinitely. + */ + @Test + public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() + throws IOException, InterruptedException { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + TableName tableName = TableName.valueOf(name.getMethodName()); + reset(); + THROW_ONCE.set(false); // throw exceptions in every retry + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + TEST_UTIL.loadTable(t, FAMILY, false); + TEST_UTIL.getAdmin().flush(tableName); + inject(); + TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); + fail("Should have thrown an exception"); + } catch (DoNotRetryIOException expected) { + assertThat(expected, instanceOf(ScannerResetException.class)); + // expected + } + assertTrue(REQ_COUNT.get() >= 3); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java new file mode 100644 index 00000000000..3d50ec7eccf --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Test all client operations with a coprocessor that just implements the default flush/compact/scan + * policy. + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestFromClientSideScanExcpetionWithCoprocessor + extends TestFromClientSideScanExcpetion { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName()); + TestFromClientSideScanExcpetion.setUpBeforeClass(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index ee946450453..91af2b74ca2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -82,11 +82,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { final AtomicInteger ctPreClose = new AtomicInteger(0); final AtomicInteger ctPostClose = new AtomicInteger(0); final AtomicInteger ctPreFlush = new AtomicInteger(0); - final AtomicInteger ctPreFlushScannerOpen = new AtomicInteger(0); final AtomicInteger ctPostFlush = new AtomicInteger(0); final AtomicInteger ctPreCompactSelect = new AtomicInteger(0); final AtomicInteger ctPostCompactSelect = new AtomicInteger(0); - final AtomicInteger ctPreCompactScanner = new AtomicInteger(0); final AtomicInteger ctPreCompact = new AtomicInteger(0); final AtomicInteger ctPostCompact = new AtomicInteger(0); final AtomicInteger ctPreGet = new AtomicInteger(0); @@ -114,7 +112,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { final AtomicInteger ctPreScannerClose = new AtomicInteger(0); final AtomicInteger ctPostScannerClose = new AtomicInteger(0); final AtomicInteger ctPreScannerOpen = new AtomicInteger(0); - final AtomicInteger ctPreStoreScannerOpen = new AtomicInteger(0); final AtomicInteger ctPostScannerOpen = new AtomicInteger(0); final AtomicInteger ctPreBulkLoadHFile = new AtomicInteger(0); final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0); @@ -180,14 +177,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return scanner; } - @Override - public InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - ctPreFlushScannerOpen.incrementAndGet(); - return s; - } - @Override public void postFlush(ObserverContext c, Store store, StoreFile resultFile) throws IOException { @@ -222,15 +211,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return scanner; } - @Override - public InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - ctPreCompactScanner.incrementAndGet(); - return s; - } - @Override public void postCompact(ObserverContext c, Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker, @@ -250,14 +230,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return null; } - @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPt) - throws IOException { - ctPreStoreScannerOpen.incrementAndGet(); - return s; - } - @Override public RegionScanner postScannerOpen(final ObserverContext c, final Scan scan, final RegionScanner s) @@ -830,10 +802,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return ctPreFlush.get(); } - public int getCtPreFlushScannerOpen() { - return ctPreFlushScannerOpen.get(); - } - public int getCtPostFlush() { return ctPostFlush.get(); } @@ -846,10 +814,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return ctPostCompactSelect.get(); } - public int getCtPreCompactScanner() { - return ctPreCompactScanner.get(); - } - public int getCtPreCompact() { return ctPreCompact.get(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 4448f9d7dad..4d6bfec1abc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -123,12 +124,15 @@ public class TestRegionObserverScannerOpenHook { } @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPt) - throws IOException { - scan.setFilter(new NoDataFilter()); - HStore hs = (HStore) store; - return new StoreScanner(hs, hs.getScanInfo(), scan, targetCols, readPt); + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + c.bypass(); + } + + @Override + public RegionScanner preScannerOpen(ObserverContext c, Scan scan, + RegionScanner s) throws IOException { + return c.getEnvironment().getRegion().getScanner(scan.setFilter(new NoDataFilter())); } } @@ -152,10 +156,8 @@ public class TestRegionObserverScannerOpenHook { } @Override - public InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - scanners.forEach(KeyValueScanner::close); + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner) throws IOException { return NO_DATA; } } @@ -171,12 +173,9 @@ public class TestRegionObserverScannerOpenHook { } @Override - public InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, - long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, - CompactionRequest request, long readPoint) - throws IOException { - scanners.forEach(KeyValueScanner::close); + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { return NO_DATA; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java new file mode 100644 index 00000000000..ad733d11c79 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class DelegatingInternalScanner implements InternalScanner { + + protected final InternalScanner scanner; + + public DelegatingInternalScanner(InternalScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return scanner.next(result, scannerContext); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index 2b98cf23630..cdad8501dec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -19,12 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.List; -import java.util.NavigableSet; import java.util.Optional; -import java.util.OptionalInt; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -35,11 +31,10 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; /** - * RegionObserver that just reimplements the default behavior, - * in order to validate that all the necessary APIs for this are public - * This observer is also used in {@link TestFromClientSideWithCoprocessor} and - * {@link TestCompactionWithCoprocessor} to make sure that a wide range - * of functionality still behaves as expected. + * RegionObserver that just reimplements the default behavior, in order to validate that all the + * necessary APIs for this are public This observer is also used in + * {@link TestFromClientSideWithCoprocessor} and {@link TestCompactionWithCoprocessor} to make sure + * that a wide range of functionality still behaves as expected. */ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver { @@ -48,49 +43,22 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver return Optional.of(this); } - /** - * Reimplement the default behavior - */ @Override - public InternalScanner preFlushScannerOpen(final ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - HStore hs = (HStore) store; - ScanInfo oldSI = hs.getScanInfo(); - ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), - oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - return new StoreScanner(hs, scanInfo, OptionalInt.empty(), scanners, - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); - } - - /** - * Reimplement the default behavior - */ - @Override - public InternalScanner preCompactScannerOpen( - final ObserverContext c, Store store, - List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - HStore hs = (HStore) store; - // this demonstrates how to override the scanners default behavior - ScanInfo oldSI = hs.getScanInfo(); - ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), - oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - return new StoreScanner(hs, scanInfo, OptionalInt.empty(), scanners, scanType, - store.getSmallestReadPoint(), earliestPutTs); + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner) throws IOException { + return new DelegatingInternalScanner(scanner); } @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPoint) - throws IOException { - HStore hs = (HStore) store; - Region r = c.getEnvironment().getRegion(); - return scan.isReversed() - ? new ReversedStoreScanner(hs, hs.getScanInfo(), scan, targetCols, - r.getReadPoint(scan.getIsolationLevel())) - : new StoreScanner(hs, hs.getScanInfo(), scan, targetCols, - r.getReadPoint(scan.getIsolationLevel())); + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return new DelegatingInternalScanner(scanner); + } + + @Override + public RegionScanner preScannerOpen(ObserverContext c, Scan scan, + RegionScanner s) throws IOException { + return c.getEnvironment().getRegion().getScanner(scan); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index ab9bfc59766..c67d7bf15c0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -17,31 +17,25 @@ */ package org.apache.hadoop.hbase.util; // this is deliberately not in the o.a.h.h.regionserver package + // in order to make sure all required classes/method are available import static org.junit.Assert.assertEquals; import java.io.IOException; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.NavigableSet; import java.util.Optional; -import java.util.OptionalInt; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; @@ -53,11 +47,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; @@ -73,7 +68,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -@Category({MiscTests.class, MediumTests.class}) +@Category({ MiscTests.class, MediumTests.class }) @RunWith(Parameterized.class) public class TestCoprocessorScanPolicy { protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @@ -84,8 +79,7 @@ public class TestCoprocessorScanPolicy { @BeforeClass public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - ScanObserver.class.getName()); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName()); TEST_UTIL.startMiniCluster(); } @@ -106,49 +100,58 @@ public class TestCoprocessorScanPolicy { @Test public void testBaseCases() throws Exception { - TableName tableName = - TableName.valueOf("baseCases"); + TableName tableName = TableName.valueOf("baseCases"); if (TEST_UTIL.getAdmin().tableExists(tableName)) { TEST_UTIL.deleteTable(tableName); } - Table t = TEST_UTIL.createTable(tableName, F, 1); - // set the version override to 2 - Put p = new Put(R); - p.setAttribute("versions", new byte[]{}); - p.addColumn(F, tableName.getName(), Bytes.toBytes(2)); - t.put(p); - + Table t = TEST_UTIL.createTable(tableName, F, 10); + // insert 3 versions long now = EnvironmentEdgeManager.currentTime(); - - // insert 2 versions - p = new Put(R); + Put p = new Put(R); p.addColumn(F, Q, now, Q); t.put(p); p = new Put(R); p.addColumn(F, Q, now + 1, Q); t.put(p); + p = new Put(R); + p.addColumn(F, Q, now + 2, Q); + t.put(p); + Get g = new Get(R); - g.setMaxVersions(10); + g.readVersions(10); Result r = t.get(g); + assertEquals(3, r.size()); + + TEST_UTIL.flush(tableName); + TEST_UTIL.compact(tableName, true); + + // still visible after a flush/compaction + r = t.get(g); + assertEquals(3, r.size()); + + // set the version override to 2 + p = new Put(R); + p.setAttribute("versions", new byte[] {}); + p.addColumn(F, tableName.getName(), Bytes.toBytes(2)); + t.put(p); + + // only 2 versions now + r = t.get(g); assertEquals(2, r.size()); TEST_UTIL.flush(tableName); TEST_UTIL.compact(tableName, true); - // both version are still visible even after a flush/compaction - g = new Get(R); - g.setMaxVersions(10); + // still 2 versions after a flush/compaction r = t.get(g); assertEquals(2, r.size()); - // insert a 3rd version - p = new Put(R); - p.addColumn(F, Q, now + 2, Q); + // insert a new version + p.addColumn(F, Q, now + 3, Q); t.put(p); - g = new Get(R); - g.setMaxVersions(10); + + // still 2 versions r = t.get(g); - // still only two version visible assertEquals(2, r.size()); t.close(); @@ -156,41 +159,33 @@ public class TestCoprocessorScanPolicy { @Test public void testTTL() throws Exception { - TableName tableName = - TableName.valueOf("testTTL"); + TableName tableName = TableName.valueOf("testTTL"); if (TEST_UTIL.getAdmin().tableExists(tableName)) { TEST_UTIL.deleteTable(tableName); } - HTableDescriptor desc = new HTableDescriptor(tableName); - HColumnDescriptor hcd = new HColumnDescriptor(F) - .setMaxVersions(10) - .setTimeToLive(1); - desc.addFamily(hcd); - TEST_UTIL.getAdmin().createTable(desc); - Table t = TEST_UTIL.getConnection().getTable(tableName); + Table t = TEST_UTIL.createTable(tableName, F, 10); long now = EnvironmentEdgeManager.currentTime(); ManualEnvironmentEdge me = new ManualEnvironmentEdge(); me.setValue(now); EnvironmentEdgeManagerTestHelper.injectEdge(me); // 2s in the past long ts = now - 2000; - // Set the TTL override to 3s - Put p = new Put(R); - p.setAttribute("ttl", new byte[]{}); - p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L)); - t.put(p); - p = new Put(R); + Put p = new Put(R); p.addColumn(F, Q, ts, Q); t.put(p); p = new Put(R); p.addColumn(F, Q, ts + 1, Q); t.put(p); - // these two should be expired but for the override - // (their ts was 2s in the past) + // Set the TTL override to 3s + p = new Put(R); + p.setAttribute("ttl", new byte[] {}); + p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L)); + t.put(p); + // these two should still be there Get g = new Get(R); - g.setMaxVersions(10); + g.readAllVersions(); Result r = t.get(g); // still there? assertEquals(2, r.size()); @@ -199,7 +194,7 @@ public class TestCoprocessorScanPolicy { TEST_UTIL.compact(tableName, true); g = new Get(R); - g.setMaxVersions(10); + g.readAllVersions(); r = t.get(g); // still there? assertEquals(2, r.size()); @@ -208,7 +203,7 @@ public class TestCoprocessorScanPolicy { me.setValue(now + 2000); // now verify that data eventually does expire g = new Get(R); - g.setMaxVersions(10); + g.readAllVersions(); r = t.get(g); // should be gone now assertEquals(0, r.size()); @@ -217,8 +212,8 @@ public class TestCoprocessorScanPolicy { } public static class ScanObserver implements RegionCoprocessor, RegionObserver { - private Map ttls = new HashMap<>(); - private Map versions = new HashMap<>(); + private final ConcurrentMap ttls = new ConcurrentHashMap<>(); + private final ConcurrentMap versions = new ConcurrentHashMap<>(); @Override public Optional getRegionObserver() { @@ -231,85 +226,130 @@ public class TestCoprocessorScanPolicy { public void prePut(final ObserverContext c, final Put put, final WALEdit edit, final Durability durability) throws IOException { if (put.getAttribute("ttl") != null) { - Cell cell = put.getFamilyCellMap().values().iterator().next().get(0); - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - ttls.put(TableName.valueOf( - Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())), - Bytes.toLong(CellUtil.cloneValue(kv))); + Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); + ttls.put( + TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())), + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); c.bypass(); } else if (put.getAttribute("versions") != null) { - Cell cell = put.getFamilyCellMap().values().iterator().next().get(0); - KeyValue kv = KeyValueUtil.ensureKeyValue(cell); - versions.put(TableName.valueOf( - Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())), - Bytes.toInt(CellUtil.cloneValue(kv))); + Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0); + versions.put( + TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength())), + Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); c.bypass(); } } - @Override - public InternalScanner preFlushScannerOpen( - final ObserverContext c, Store store, - List scanners, InternalScanner s, long readPoint) throws IOException { - HStore hs = (HStore) store; - Long newTtl = ttls.get(store.getTableName()); - if (newTtl != null) { - System.out.println("PreFlush:" + newTtl); - } - Integer newVersions = versions.get(store.getTableName()); - ScanInfo oldSI = hs.getScanInfo(); - ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), - family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, - newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), family.isNewVersionBehavior()); - return new StoreScanner(hs, scanInfo, - newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()), - scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), - HConstants.OLDEST_TIMESTAMP); + private InternalScanner wrap(Store store, InternalScanner scanner) { + Long ttl = this.ttls.get(store.getTableName()); + Integer version = this.versions.get(store.getTableName()); + return new DelegatingInternalScanner(scanner) { + + private byte[] row; + + private byte[] qualifier; + + private int count; + + private Predicate checkTtl(long now, long ttl) { + return c -> now - c.getTimestamp() > ttl; + } + + private Predicate checkVersion(Cell firstCell, int version) { + if (version == 0) { + return c -> true; + } else { + if (row == null || !CellUtil.matchingRow(firstCell, row)) { + row = CellUtil.cloneRow(firstCell); + // reset qualifier as there is a row change + qualifier = null; + } + return c -> { + if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) { + if (count >= version) { + return true; + } + count++; + return false; + } else { // qualifier switch + qualifier = CellUtil.cloneQualifier(c); + count = 1; + return false; + } + }; + } + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + boolean moreRows = scanner.next(result, scannerContext); + if (result.isEmpty()) { + return moreRows; + } + long now = EnvironmentEdgeManager.currentTime(); + Predicate predicate = null; + if (ttl != null) { + predicate = checkTtl(now, ttl); + } + if (version != null) { + Predicate vp = checkVersion(result.get(0), version); + if (predicate != null) { + predicate = predicate.and(vp); + } else { + predicate = vp; + } + } + if (predicate != null) { + result.removeIf(predicate); + } + return moreRows; + } + }; } @Override - public InternalScanner preCompactScannerOpen( - final ObserverContext c, Store store, - List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s,CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - HStore hs = (HStore) store; - Long newTtl = ttls.get(store.getTableName()); - Integer newVersions = versions.get(store.getTableName()); - ScanInfo oldSI = hs.getScanInfo(); - ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), - family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, - newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), - family.isNewVersionBehavior()); - return new StoreScanner(hs, scanInfo, - newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()), - scanners, scanType, store.getSmallestReadPoint(), earliestPutTs); + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner) throws IOException { + return wrap(store, scanner); } @Override - public KeyValueScanner preStoreScannerOpen( - final ObserverContext c, Store store, final Scan scan, - final NavigableSet targetCols, KeyValueScanner s, long readPt) throws IOException { - TableName tn = store.getTableName(); - if (!tn.isSystemTable()) { - HStore hs = (HStore) store; - Long newTtl = ttls.get(store.getTableName()); - Integer newVersions = versions.get(store.getTableName()); - ScanInfo oldSI = hs.getScanInfo(); - ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), - family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, - newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), - family.isNewVersionBehavior()); - return new StoreScanner(hs, scanInfo, scan, targetCols, readPt); - } else { - return s; + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return wrap(store, scanner); + } + + @Override + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName(); + Long ttl = this.ttls.get(tableName); + if (ttl != null) { + get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax()); } + Integer version = this.versions.get(tableName); + if (version != null) { + get.readVersions(version); + } + } + + @Override + public RegionScanner preScannerOpen(ObserverContext c, Scan scan, + RegionScanner s) throws IOException { + Region region = c.getEnvironment().getRegion(); + TableName tableName = region.getTableDescriptor().getTableName(); + Long ttl = this.ttls.get(tableName); + if (ttl != null) { + scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax()); + } + Integer version = this.versions.get(tableName); + if (version != null) { + scan.readVersions(version); + } + return region.getScanner(scan); } } }