HBASE-19001 Remove the hooks in RegionObserver which are designed to construct a StoreScanner which is marked as IA.Private

zhangduo 2017-10-17 21:27:05 +08:00
parent 5368fd5bf0
commit e804dd0b60
18 changed files with 652 additions and 706 deletions

View File

@ -123,27 +123,6 @@ public interface RegionObserver {
*/
default void postLogReplay(ObserverContext<RegionCoprocessorEnvironment> c) {}
/**
* Called before a memstore is flushed to disk and prior to creating the scanner to read from
* the memstore. To override or modify how a memstore is flushed,
* implementing classes can return a new scanner to provide the KeyValues to be
* stored into the new {@code StoreFile} or null to perform the default processing.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param scanners the scanners for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param readPoint the readpoint to create scanner
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
*/
default InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<KeyValueScanner> scanners, InternalScanner s, long readPoint)
throws IOException {
return s;
}
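Coprocessors that relied on this removed hook can get the same effect by wrapping the scanner handed to preFlush, the pattern this commit itself adopts in NoOpScanPolicyObserver and the new DelegatingInternalScanner (both further down in this diff). A minimal sketch, assuming a coprocessor of this shape; the class name is illustrative:

import java.io.IOException;
import java.util.List;
import java.util.Optional;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;

public class FlushWrappingObserver implements RegionCoprocessor, RegionObserver {
  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner) throws IOException {
    // Wrap rather than replace: the region server now always builds the StoreScanner itself.
    return new DelegatingInternalScanner(scanner) {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        // Inspect or filter the cells headed for the new store file here.
        return scanner.next(result, scannerContext);
      }
    };
  }
}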
/**
* Called before the memstore is flushed to disk.
* @param c the environment provided by the region server
@ -235,33 +214,6 @@ public interface RegionObserver {
return scanner;
}
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile} and prior to creating the scanner used to read the input files. To override
* or modify the compaction process, implementing classes can return a new scanner to provide the
* KeyValues to be stored into the new {@code StoreFile} or null to perform the default
* processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanners the list of store file scanners to be read from
* @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
* files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param tracker used to track the life cycle of a compaction
* @param request the requested compaction
* @param readPoint the readpoint to create scanner
* @return the scanner to use during compaction. {@code null} if the default implementation is to
* be used.
*/
default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
return s;
}
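The compaction-side hook migrates the same way: rather than opening the input scanner yourself, wrap the one passed to preCompact, as the rewritten CompactorInternalScanner in the tests below does. A minimal sketch under the same assumptions; the class name is illustrative:

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class CompactionWrappingObserver implements RegionCoprocessor, RegionObserver {
  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    // Delegate by default; subclass DelegatingInternalScanner to rewrite the compacted output.
    return new DelegatingInternalScanner(scanner);
  }
}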
/**
* Called after compaction has completed and the new store file has been moved in to place.
* @param c the environment provided by the region server
@ -802,35 +754,6 @@ public interface RegionObserver {
return s;
}
/**
* Called before a store opens a new scanner.
* This hook is called when a "user" scanner is opened.
* <p>
* See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
* and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} to override scanners
* created for flushes or compactions, respectively.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* <p>
* Note: Do not retain references to any Cells returned by scanner, beyond the life of this
* invocation. If you need a Cell reference for later use, copy the cell and use that.
* @param c the environment provided by the region server
* @param store the store being scanned
* @param scan the Scan specification
* @param targetCols columns to be used in the scanner
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @param readPt the read point
* @return a KeyValueScanner instance to use or {@code null} to use the default implementation
*/
default KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPt)
throws IOException {
return s;
}
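For user scans, this commit leaves two replacement routes, both exercised in its own tests: a region-level preScannerOpen (see the TestRegionObserverScannerOpenHook and NoOpScanPolicyObserver hunks below), or an HStore subclass overriding the now-simplified createScanner. A sketch of the latter, modeled on the MyHStore test class added below; the subclass name is illustrative:

import java.io.IOException;
import java.util.NavigableSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;

public class CustomScanHStore extends HStore {
  // Wired in via an HRegion subclass overriding instantiateHStore, as MyHRegion below shows.
  public CustomScanHStore(HRegion region, ColumnFamilyDescriptor family, Configuration conf)
      throws IOException {
    super(region, family, conf);
  }

  @Override
  protected KeyValueScanner createScanner(Scan scan, NavigableSet<byte[]> targetCols, long readPt)
      throws IOException {
    // Swap in a custom StoreScanner subclass here; this sketch keeps the defaults.
    return scan.isReversed()
        ? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
        : new StoreScanner(this, getScanInfo(), scan, targetCols, readPt);
  }
}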
/**
* Called after the client opens a new scanner.
* <p>

View File

@ -145,8 +145,7 @@ public class HMobStore extends HStore {
*/
@Override
protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
long readPt, KeyValueScanner scanner) throws IOException {
if (scanner == null) {
long readPt) throws IOException {
if (MobUtils.isRefOnlyScan(scan)) {
Filter refOnlyFilter = new MobReferenceOnlyFilter();
Filter filter = scan.getFilter();
@ -156,10 +155,9 @@ public class HMobStore extends HStore {
scan.setFilter(refOnlyFilter);
}
}
scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan,
targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
}
return scanner;
return scan.isReversed()
? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
: new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
}
/**

View File

@ -1446,7 +1446,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return maxFlushedSeqId;
}
@Override
/**
* @return readpoint considering given IsolationLevel. Pass {@code null} for default
*/
public long getReadPoint(IsolationLevel isolationLevel) {
if (isolationLevel != null && isolationLevel == IsolationLevel.READ_UNCOMMITTED) {
// This scan can read even uncommitted transactions

View File

@ -1920,26 +1920,18 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
final NavigableSet<byte []> targetCols, long readPt) throws IOException {
lock.readLock().lock();
try {
KeyValueScanner scanner = null;
if (this.getCoprocessorHost() != null) {
scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols, readPt);
}
scanner = createScanner(scan, targetCols, readPt, scanner);
return scanner;
return createScanner(scan, targetCols, readPt);
} finally {
lock.readLock().unlock();
}
}
protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
long readPt, KeyValueScanner scanner) throws IOException {
if (scanner == null) {
scanner = scan.isReversed() ? new ReversedStoreScanner(this,
long readPt) throws IOException {
return scan.isReversed() ? new ReversedStoreScanner(this,
getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
getScanInfo(), scan, targetCols, readPt);
}
return scanner;
}
/**
* Recreates the scanners on the current list of active store file scanners

View File

@ -131,9 +131,6 @@ public interface Region extends ConfigurationObserver {
*/
public Map<byte[], Long> getMaxStoreSeqId();
/** @return readpoint considering given IsolationLevel; pass null for default*/
long getReadPoint(IsolationLevel isolationLevel);
/**
* @return The earliest time a store in the region was flushed. All
* other stores in the region would have been flushed either at, or

View File

@ -19,18 +19,18 @@
package org.apache.hadoop.hbase.regionserver;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Matcher;
import com.google.protobuf.Message;
import com.google.protobuf.Service;
import org.apache.commons.collections4.map.AbstractReferenceMap;
import org.apache.commons.collections4.map.ReferenceMap;
import org.apache.commons.logging.Log;
@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@ -77,15 +76,15 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CoprocessorClassLoader;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
* Implements the coprocessor environment and runtime support for coprocessors
@ -541,27 +540,6 @@ public class RegionCoprocessorHost
}
}
/**
* See
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
* InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)}
*/
public InternalScanner preCompactScannerOpen(final HStore store,
final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs,
final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user,
final long readPoint)
throws IOException {
return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, user) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preCompactScannerOpen(this, store, scanners, scanType,
earliestPutTs, getResult(), tracker, request, readPoint);
}
});
}
/**
* Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently
* available candidates.
@ -673,21 +651,6 @@ public class RegionCoprocessorHost
});
}
/**
* See
* {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
*/
public InternalScanner preFlushScannerOpen(final HStore store,
final List<KeyValueScanner> scanners, final long readPoint) throws IOException {
return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preFlushScannerOpen(this, store, scanners, getResult(), readPoint);
}
});
}
/**
* Invoked after a memstore flush
* @throws IOException
@ -1158,21 +1121,6 @@ public class RegionCoprocessorHost
});
}
/**
* See
* {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)}
*/
public KeyValueScanner preStoreScannerOpen(final HStore store, final Scan scan,
final NavigableSet<byte[]> targetCols, final long readPt) throws IOException {
return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, KeyValueScanner>(regionObserverGetter) {
@Override
public KeyValueScanner call(RegionObserver observer) throws IOException {
return observer.preStoreScannerOpen(this, store, scan, targetCols, getResult(), readPt);
}
});
}
/**
* @param scan the Scan specification
* @param s the scanner

View File

@ -34,7 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* reversed scanning.
*/
@InterfaceAudience.Private
class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
public class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we
@ -46,14 +46,14 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
* @param columns which columns we are scanning
* @throws IOException
*/
ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
public ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
NavigableSet<byte[]> columns, long readPt)
throws IOException {
super(store, scanInfo, scan, columns, readPt);
}
/** Constructor for testing. */
ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
public ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
List<? extends KeyValueScanner> scanners) throws IOException {
super(scan, scanInfo, columns, scanners);
}

View File

@ -79,15 +79,9 @@ abstract class StoreFlusher {
*/
protected InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint) throws IOException {
InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) {
scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners,
smallestReadPoint);
}
if (scanner == null) {
scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
InternalScanner scanner =
new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
assert scanner != null;
if (store.getCoprocessorHost() != null) {
try {

View File

@ -86,8 +86,8 @@ public abstract class Compactor<T extends CellSink> {
protected static final String MINOR_COMPACTION_DROP_CACHE =
"hbase.regionserver.minorcompaction.pagecache.drop";
private boolean dropCacheMajor;
private boolean dropCacheMinor;
private final boolean dropCacheMajor;
private final boolean dropCacheMinor;
//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(Configuration conf, HStore store) {
@ -138,7 +138,7 @@ public abstract class Compactor<T extends CellSink> {
* @param allFiles Whether all files are included for compaction
* @return The result.
*/
protected FileDetails getFileDetails(
private FileDetails getFileDetails(
Collection<HStoreFile> filesToCompact, boolean allFiles) throws IOException {
FileDetails fd = new FileDetails();
long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() -
@ -217,13 +217,13 @@ public abstract class Compactor<T extends CellSink> {
* @param filesToCompact Files.
* @return Scanners.
*/
protected List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
private List<StoreFileScanner> createFileScanners(Collection<HStoreFile> filesToCompact,
long smallestReadPoint, boolean useDropBehind) throws IOException {
return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind,
smallestReadPoint);
}
protected long getSmallestReadPoint() {
private long getSmallestReadPoint() {
return store.getSmallestReadPoint();
}
@ -257,7 +257,7 @@ public abstract class Compactor<T extends CellSink> {
* @return Writer for a new StoreFile in the tmp dir.
* @throws IOException if creation failed
*/
protected StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind)
throws IOException {
// When all MVCC readpoints are 0, don't write them.
// See HBASE-8166, HBASE-12600, and HBASE-13389.
@ -267,7 +267,7 @@ public abstract class Compactor<T extends CellSink> {
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
}
protected List<Path> compact(final CompactionRequestImpl request,
protected final List<Path> compact(final CompactionRequestImpl request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
@ -291,12 +291,8 @@ public abstract class Compactor<T extends CellSink> {
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user,
smallestReadPoint);
if (scanner == null) {
scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
scanner = postCreateCoprocScanner(request, scanType,
scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint), user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<>();
@ -330,26 +326,6 @@ public abstract class Compactor<T extends CellSink> {
protected abstract void abortWriter(T writer) throws IOException;
/**
* Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
* @param request Compaction request.
* @param scanType Scan type.
* @param earliestPutTs Earliest put ts.
* @param scanners File scanners for compaction files.
* @param user the User
* @param readPoint the read point to help create scanner by Coprocessor if required.
* @return Scanner override by coprocessor; null if not overriding.
*/
protected InternalScanner preCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
long earliestPutTs, List<StoreFileScanner> scanners, User user, long readPoint)
throws IOException {
if (store.getCoprocessorHost() == null) {
return null;
}
return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType,
earliestPutTs, request.getTracker(), request, user, readPoint);
}
/**
* Calls coprocessor, if any, to create scanners - after normal scanner creation.
* @param request Compaction request.
@ -357,7 +333,7 @@ public abstract class Compactor<T extends CellSink> {
* @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/
protected InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
private InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) {
return scanner;

View File

@ -24,12 +24,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
@ -46,15 +43,13 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheKey;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CachedBlock;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -67,10 +62,10 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
@Category({ LargeTests.class, ClientTests.class })
@SuppressWarnings("deprecation")
public class TestAvoidCellReferencesIntoShippedBlocks {
private static final Log LOG = LogFactory.getLog(TestAvoidCellReferencesIntoShippedBlocks.class);
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
static byte[][] ROWS = new byte[2][];
private static byte[] ROW = Bytes.toBytes("testRow");
@ -134,7 +129,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
try {
// get the block cache and region
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
HRegion region =
(HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName);
HStore store = region.getStores().iterator().next();
@ -190,10 +185,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
// Load cache
Scan s = new Scan();
s.setMaxResultSize(1000);
ResultScanner scanner = table.getScanner(s);
int count = 0;
for (Result result : scanner) {
count++;
int count;
try (ResultScanner scanner = table.getScanner(s)) {
count = Iterables.size(scanner);
}
assertEquals("Count all the rows ", count, 6);
// all the cache is loaded
@ -203,10 +197,8 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
region.compact(true);
s = new Scan();
s.setMaxResultSize(1000);
scanner = table.getScanner(s);
count = 0;
for (Result result : scanner) {
count++;
try (ResultScanner scanner = table.getScanner(s)) {
count = Iterables.size(scanner);
}
assertEquals("Count all the rows ", count, 6);
} finally {
@ -224,10 +216,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
}
public void run() {
Scan s = new Scan();
s.setCaching(1);
s.setStartRow(ROW4);
s.setStopRow(ROW5);
Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1);
try {
while(!doScan.get()) {
try {
@ -246,9 +235,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
// evict what ever is available
cache.evictBlock(cacheKey);
}
ResultScanner scanner = table.getScanner(s);
for (Result res : scanner) {
try (ResultScanner scanner = table.getScanner(s)) {
while (scanner.next() != null) {
}
}
compactReadLatch.countDown();
} catch (IOException e) {
@ -264,34 +253,23 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker,
CompactionRequest request, long readPoint)
throws IOException {
return createCompactorScanner((HStore) store, scanners, scanType, earliestPutTs);
}
private InternalScanner createCompactorScanner(HStore store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs)
throws IOException {
return new CompactorStoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
scanType, store.getSmallestReadPoint(), earliestPutTs);
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
return new CompactorInternalScanner(scanner);
}
}
private static class CompactorStoreScanner extends StoreScanner {
private static final class CompactorInternalScanner extends DelegatingInternalScanner {
public CompactorStoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
super(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs);
public CompactorInternalScanner(InternalScanner scanner) {
super(scanner);
}
@Override
public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws IOException {
boolean next = super.next(outResult, scannerContext);
for (Cell cell : outResult) {
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
boolean next = scanner.next(result, scannerContext);
for (Cell cell : result) {
if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) {
try {
// hold the compaction
@ -314,7 +292,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
try {
// get the block cache and region
RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName);
String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName();
String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName)
.getRegion(regionName);
HStore store = region.getStores().iterator().next();
@ -364,10 +342,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
// Load cache
Scan s = new Scan();
s.setMaxResultSize(1000);
ResultScanner scanner = table.getScanner(s);
int count = 0;
for (Result result : scanner) {
count++;
int count;
try (ResultScanner scanner = table.getScanner(s)) {
count = Iterables.size(scanner);
}
assertEquals("Count all the rows ", count, 6);
@ -375,11 +352,11 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
s = new Scan();
// Start a scan from row3
s.setCaching(1);
s.setStartRow(ROW1);
s.withStartRow(ROW1);
// set partial as true so that the scan can send partial columns also
s.setAllowPartialResults(true);
s.setMaxResultSize(1000);
scanner = table.getScanner(s);
try (ResultScanner scanner = table.getScanner(s)) {
Thread evictorThread = new Thread() {
@Override
public void run() {
@ -408,16 +385,13 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
Scan s1 = new Scan();
// This scan will start from ROW1 and it will populate the cache with a
// row that is lower than ROW3.
s1.setStartRow(ROW3);
s1.setStopRow(ROW5);
s1.withStartRow(ROW3);
s1.withStopRow(ROW5);
s1.setCaching(1);
ResultScanner scanner;
try {
scanner = table.getScanner(s1);
int count = 0;
for (Result result : scanner) {
count++;
}
int count = Iterables.size(scanner);
assertEquals("Count the rows", count, 2);
iterator = cache.iterator();
List<BlockCacheKey> newCacheList = new ArrayList<>();
@ -441,13 +415,14 @@ public class TestAvoidCellReferencesIntoShippedBlocks {
}
};
count = 0;
for (Result result : scanner) {
while (scanner.next() != null) {
count++;
if (count == 2) {
evictorThread.start();
latch.await();
}
}
}
assertEquals("Count should give all rows ", count, 10);
} finally {
table.close();

View File

@ -39,14 +39,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.ArrayUtils;
@ -57,7 +53,6 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ClusterStatus.Option;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -74,11 +69,6 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
@ -104,15 +94,10 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
@ -540,147 +525,6 @@ public class TestFromClientSide {
assertEquals(rowCount - endKeyCount, countGreater);
}
/**
* This is a coprocessor to inject a test failure so that a store scanner's reseek() call will
* fail with an IOException on the first call.
*/
public static class ExceptionInReseekRegionObserver implements RegionCoprocessor, RegionObserver {
static AtomicLong reqCount = new AtomicLong(0);
static AtomicBoolean isDoNotRetry = new AtomicBoolean(false); // whether to throw DNRIOE
static AtomicBoolean throwOnce = new AtomicBoolean(true); // whether to only throw once
static void reset() {
reqCount.set(0);
isDoNotRetry.set(false);
throwOnce.set(true);
}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
class MyStoreScanner extends StoreScanner {
public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
}
@Override
protected List<KeyValueScanner> selectScannersFrom(HStore store,
List<? extends KeyValueScanner> allScanners) {
List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
for (KeyValueScanner scanner : scanners) {
newScanners.add(new DelegatingKeyValueScanner(scanner) {
@Override
public boolean reseek(Cell key) throws IOException {
reqCount.incrementAndGet();
if (!throwOnce.get() || reqCount.get() == 1) {
if (isDoNotRetry.get()) {
throw new DoNotRetryIOException("Injected exception");
} else {
throw new IOException("Injected exception");
}
}
return super.reseek(key);
}
});
}
return newScanners;
}
}
@Override
public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s,
final long readPt) throws IOException {
HStore hs = (HStore) store;
return new MyStoreScanner(hs, hs.getScanInfo(), scan, targetCols, readPt);
}
}
/**
* Tests the case where a Scan can throw an IOException in the middle of the seek / reseek
* leaving the server-side RegionScanner in a dirty state. The client has to ensure that the
* ClientScanner does not get an exception and also sees all the data.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testClientScannerIsResetWhenScanThrowsIOException()
throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY);
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
TEST_UTIL.getAdmin().createTable(htd);
ExceptionInReseekRegionObserver.reset();
ExceptionInReseekRegionObserver.throwOnce.set(true); // throw exceptions only once
try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getAdmin().flush(tableName);
int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
assertEquals(rowCount, actualRowCount);
}
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
}
/**
* Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
* is that the exception will bubble up to the client scanner instead of being retried.
*/
@Test (timeout = 180000)
public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY);
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
TEST_UTIL.getAdmin().createTable(htd);
ExceptionInReseekRegionObserver.reset();
ExceptionInReseekRegionObserver.isDoNotRetry.set(true);
try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getAdmin().flush(tableName);
TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
fail("Should have thrown an exception");
} catch (DoNotRetryIOException expected) {
// expected
}
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
}
/**
* Tests the case where a coprocessor throws a regular IOException in the scan. The expectation
* is that we will keep on retrying, but fail after the retries are exhausted instead of
* retrying indefinitely.
*/
@Test (timeout = 180000)
public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
final TableName tableName = TableName.valueOf(name.getMethodName());
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY);
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
TEST_UTIL.getAdmin().createTable(htd);
ExceptionInReseekRegionObserver.reset();
ExceptionInReseekRegionObserver.throwOnce.set(false); // throw exceptions in every retry
try (Table t = TEST_UTIL.getConnection().getTable(tableName)) {
TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getAdmin().flush(tableName);
TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
fail("Should have thrown an exception");
} catch (DoNotRetryIOException expected) {
assertTrue(expected instanceof ScannerResetException);
// expected
}
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() >= 3);
}
/*
* @param key
* @return Scan with RowFilter that does LESS than passed key.

View File

@ -0,0 +1,238 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
@Category({ MediumTests.class, ClientTests.class })
public class TestFromClientSideScanExcpetion {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static int SLAVES = 3;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000);
conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class);
conf.setBoolean("hbase.client.log.scanner.activity", true);
// We need more than one region server in this test
TEST_UTIL.startMiniCluster(SLAVES);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
private static AtomicBoolean ON = new AtomicBoolean(false);
private static AtomicLong REQ_COUNT = new AtomicLong(0);
private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw
// DNRIOE
private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once
private static void reset() {
ON.set(false);
REQ_COUNT.set(0);
IS_DO_NOT_RETRY.set(false);
THROW_ONCE.set(true);
}
private static void inject() {
ON.set(true);
}
public static final class MyHRegion extends HRegion {
@SuppressWarnings("deprecation")
public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam,
RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) {
super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices);
}
@Override
protected HStore instantiateHStore(ColumnFamilyDescriptor family) throws IOException {
return new MyHStore(this, family, conf);
}
}
public static final class MyHStore extends HStore {
public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam)
throws IOException {
super(region, family, confParam);
}
@Override
protected KeyValueScanner createScanner(Scan scan, NavigableSet<byte[]> targetCols, long readPt)
throws IOException {
return scan.isReversed()
? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
: new MyStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
}
}
public static final class MyStoreScanner extends StoreScanner {
public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
}
@Override
protected List<KeyValueScanner> selectScannersFrom(HStore store,
List<? extends KeyValueScanner> allScanners) {
List<KeyValueScanner> scanners = super.selectScannersFrom(store, allScanners);
List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
for (KeyValueScanner scanner : scanners) {
newScanners.add(new DelegatingKeyValueScanner(scanner) {
@Override
public boolean reseek(Cell key) throws IOException {
if (ON.get()) {
REQ_COUNT.incrementAndGet();
if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) {
if (IS_DO_NOT_RETRY.get()) {
throw new DoNotRetryIOException("Injected exception");
} else {
throw new IOException("Injected exception");
}
}
}
return super.reseek(key);
}
});
}
return newScanners;
}
}
/**
* Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving
* the server-side RegionScanner in a dirty state. The client has to ensure that the
* ClientScanner does not get an exception and also sees all the data.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testClientScannerIsResetWhenScanThrowsIOException()
throws IOException, InterruptedException {
reset();
THROW_ONCE.set(true); // throw exceptions only once
TableName tableName = TableName.valueOf(name.getMethodName());
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getAdmin().flush(tableName);
inject();
int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
assertEquals(rowCount, actualRowCount);
}
assertTrue(REQ_COUNT.get() > 0);
}
/**
* Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
* is that the exception will bubble up to the client scanner instead of being retried.
*/
@Test
public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
throws IOException, InterruptedException {
reset();
IS_DO_NOT_RETRY.set(true);
TableName tableName = TableName.valueOf(name.getMethodName());
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getAdmin().flush(tableName);
inject();
TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
fail("Should have thrown an exception");
} catch (DoNotRetryIOException expected) {
// expected
}
assertTrue(REQ_COUNT.get() > 0);
}
/**
* Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is
* that we will keep on retrying, but fail after the retries are exhausted instead of retrying
* indefinitely.
*/
@Test
public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
TableName tableName = TableName.valueOf(name.getMethodName());
reset();
THROW_ONCE.set(false); // throw exceptions in every retry
try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) {
TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getAdmin().flush(tableName);
inject();
TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
fail("Should have thrown an exception");
} catch (DoNotRetryIOException expected) {
assertThat(expected, instanceOf(ScannerResetException.class));
// expected
}
assertTrue(REQ_COUNT.get() >= 3);
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
/**
* Test all client operations with a coprocessor that just implements the default flush/compact/scan
* policy.
*/
@Category({ MediumTests.class, ClientTests.class })
public class TestFromClientSideScanExcpetionWithCoprocessor
extends TestFromClientSideScanExcpetion {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName());
TestFromClientSideScanExcpetion.setUpBeforeClass();
}
}

View File

@ -82,11 +82,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPreClose = new AtomicInteger(0);
final AtomicInteger ctPostClose = new AtomicInteger(0);
final AtomicInteger ctPreFlush = new AtomicInteger(0);
final AtomicInteger ctPreFlushScannerOpen = new AtomicInteger(0);
final AtomicInteger ctPostFlush = new AtomicInteger(0);
final AtomicInteger ctPreCompactSelect = new AtomicInteger(0);
final AtomicInteger ctPostCompactSelect = new AtomicInteger(0);
final AtomicInteger ctPreCompactScanner = new AtomicInteger(0);
final AtomicInteger ctPreCompact = new AtomicInteger(0);
final AtomicInteger ctPostCompact = new AtomicInteger(0);
final AtomicInteger ctPreGet = new AtomicInteger(0);
@ -114,7 +112,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
final AtomicInteger ctPreScannerClose = new AtomicInteger(0);
final AtomicInteger ctPostScannerClose = new AtomicInteger(0);
final AtomicInteger ctPreScannerOpen = new AtomicInteger(0);
final AtomicInteger ctPreStoreScannerOpen = new AtomicInteger(0);
final AtomicInteger ctPostScannerOpen = new AtomicInteger(0);
final AtomicInteger ctPreBulkLoadHFile = new AtomicInteger(0);
final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0);
@ -180,14 +177,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return scanner;
}
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<KeyValueScanner> scanners, InternalScanner s, long readPoint)
throws IOException {
ctPreFlushScannerOpen.incrementAndGet();
return s;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) throws IOException {
@ -222,15 +211,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return scanner;
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
ctPreCompactScanner.incrementAndGet();
return s;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, CompactionLifeCycleTracker tracker,
@ -250,14 +230,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return null;
}
@Override
public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPt)
throws IOException {
ctPreStoreScannerOpen.incrementAndGet();
return s;
}
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s)
@ -830,10 +802,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPreFlush.get();
}
public int getCtPreFlushScannerOpen() {
return ctPreFlushScannerOpen.get();
}
public int getCtPostFlush() {
return ctPostFlush.get();
}
@ -846,10 +814,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
return ctPostCompactSelect.get();
}
public int getCtPreCompactScanner() {
return ctPreCompactScanner.get();
}
public int getCtPreCompact() {
return ctPreCompact.get();
}

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@ -123,12 +124,15 @@ public class TestRegionObserverScannerOpenHook {
}
@Override
public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPt)
throws IOException {
scan.setFilter(new NoDataFilter());
HStore hs = (HStore) store;
return new StoreScanner(hs, hs.getScanInfo(), scan, targetCols, readPt);
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
List<Cell> result) throws IOException {
c.bypass();
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
RegionScanner s) throws IOException {
return c.getEnvironment().getRegion().getScanner(scan.setFilter(new NoDataFilter()));
}
}
@ -152,10 +156,8 @@ public class TestRegionObserverScannerOpenHook {
}
@Override
public InternalScanner preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<KeyValueScanner> scanners, InternalScanner s, long readPoint)
throws IOException {
scanners.forEach(KeyValueScanner::close);
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
return NO_DATA;
}
}
@ -171,12 +173,9 @@ public class TestRegionObserverScannerOpenHook {
}
@Override
public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker,
CompactionRequest request, long readPoint)
throws IOException {
scanners.forEach(KeyValueScanner::close);
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
return NO_DATA;
}
}

View File

@ -0,0 +1,45 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class DelegatingInternalScanner implements InternalScanner {
protected final InternalScanner scanner;
public DelegatingInternalScanner(InternalScanner scanner) {
this.scanner = scanner;
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return scanner.next(result, scannerContext);
}
@Override
public void close() throws IOException {
scanner.close();
}
}
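The helper above forwards next(List, ScannerContext) and close(), which is all a delegating filter typically needs. A hedged usage sketch, an illustrative subclass that counts the cells flowing through a flush or compaction (the class name and accessor are hypothetical, not part of this commit):

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;

public class CountingInternalScanner extends DelegatingInternalScanner {
  private long cellCount; // running total; assumes callers pass in an empty result list

  public CountingInternalScanner(InternalScanner scanner) {
    super(scanner);
  }

  @Override
  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
    boolean more = scanner.next(result, scannerContext);
    cellCount += result.size();
    return more;
  }

  public long getCellCount() {
    return cellCount;
  }
}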

View File

@ -19,12 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalInt;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@ -35,11 +31,10 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
/**
* RegionObserver that just reimplements the default behavior,
* in order to validate that all the necessary APIs for this are public.
* This observer is also used in {@link TestFromClientSideWithCoprocessor} and
* {@link TestCompactionWithCoprocessor} to make sure that a wide range
* of functionality still behaves as expected.
* RegionObserver that just reimplements the default behavior, in order to validate that all the
* necessary APIs for this are public. This observer is also used in
* {@link TestFromClientSideWithCoprocessor} and {@link TestCompactionWithCoprocessor} to make sure
* that a wide range of functionality still behaves as expected.
*/
public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver {
@ -48,49 +43,22 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver
return Optional.of(this);
}
/**
* Reimplement the default behavior
*/
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<KeyValueScanner> scanners, InternalScanner s, long readPoint)
throws IOException {
HStore hs = (HStore) store;
ScanInfo oldSI = hs.getScanInfo();
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),
oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(hs, scanInfo, OptionalInt.empty(), scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
/**
* Reimplement the default behavior
*/
@Override
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
HStore hs = (HStore) store;
// this demonstrates how to override the scanner's default behavior
ScanInfo oldSI = hs.getScanInfo();
ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(),
oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(hs, scanInfo, OptionalInt.empty(), scanners, scanType,
store.getSmallestReadPoint(), earliestPutTs);
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
return new DelegatingInternalScanner(scanner);
}
@Override
public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPoint)
throws IOException {
HStore hs = (HStore) store;
Region r = c.getEnvironment().getRegion();
return scan.isReversed()
? new ReversedStoreScanner(hs, hs.getScanInfo(), scan, targetCols,
r.getReadPoint(scan.getIsolationLevel()))
: new StoreScanner(hs, hs.getScanInfo(), scan, targetCols,
r.getReadPoint(scan.getIsolationLevel()));
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
return new DelegatingInternalScanner(scanner);
}
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
RegionScanner s) throws IOException {
return c.getEnvironment().getRegion().getScanner(scan);
}
}
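
The file above doubles as the migration recipe for third-party coprocessors: where preFlushScannerOpen and preCompactScannerOpen used to hand back a custom (IA.Private) StoreScanner, an observer now wraps the InternalScanner it is given. A minimal sketch under stated assumptions — the observer name PurgingObserver and its empty-value filter are invented for illustration:

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class PurgingObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    return new DelegatingInternalScanner(scanner) {
      @Override
      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean moreRows = scanner.next(result, scannerContext);
        // illustrative policy: drop empty-valued cells from the compacted file
        result.removeIf(cell -> cell.getValueLength() == 0);
        return moreRows;
      }
    };
  }
}

Wrapping keeps the default scanner stack intact, so the observer no longer depends on ScanInfo, StoreScanner, or any other regionserver internals.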

View File

@@ -17,31 +17,25 @@
*/
package org.apache.hadoop.hbase.util;
// this is deliberately not in the o.a.h.h.regionserver package
// in order to make sure all required classes/methods are available
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -53,11 +47,12 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
@@ -84,8 +79,7 @@ public class TestCoprocessorScanPolicy {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
ScanObserver.class.getName());
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName());
TEST_UTIL.startMiniCluster();
}
@@ -106,49 +100,58 @@ public class TestCoprocessorScanPolicy {
@Test
public void testBaseCases() throws Exception {
TableName tableName =
TableName.valueOf("baseCases");
TableName tableName = TableName.valueOf("baseCases");
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
TEST_UTIL.deleteTable(tableName);
}
Table t = TEST_UTIL.createTable(tableName, F, 1);
// set the version override to 2
Put p = new Put(R);
p.setAttribute("versions", new byte[]{});
p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
t.put(p);
Table t = TEST_UTIL.createTable(tableName, F, 10);
// insert 3 versions
long now = EnvironmentEdgeManager.currentTime();
// insert 2 versions
p = new Put(R);
Put p = new Put(R);
p.addColumn(F, Q, now, Q);
t.put(p);
p = new Put(R);
p.addColumn(F, Q, now + 1, Q);
t.put(p);
p = new Put(R);
p.addColumn(F, Q, now + 2, Q);
t.put(p);
Get g = new Get(R);
g.setMaxVersions(10);
g.readVersions(10);
Result r = t.get(g);
assertEquals(3, r.size());
TEST_UTIL.flush(tableName);
TEST_UTIL.compact(tableName, true);
// still visible after a flush/compaction
r = t.get(g);
assertEquals(3, r.size());
// set the version override to 2
p = new Put(R);
p.setAttribute("versions", new byte[] {});
p.addColumn(F, tableName.getName(), Bytes.toBytes(2));
t.put(p);
// only 2 versions now
r = t.get(g);
assertEquals(2, r.size());
TEST_UTIL.flush(tableName);
TEST_UTIL.compact(tableName, true);
// both versions are still visible even after a flush/compaction
g = new Get(R);
g.setMaxVersions(10);
// still 2 versions after a flush/compaction
r = t.get(g);
assertEquals(2, r.size());
// insert a 3rd version
p = new Put(R);
p.addColumn(F, Q, now + 2, Q);
// insert a new version
p.addColumn(F, Q, now + 3, Q);
t.put(p);
g = new Get(R);
g.setMaxVersions(10);
// still 2 versions
r = t.get(g);
// still only two versions visible
assertEquals(2, r.size());
t.close();
@@ -156,41 +159,33 @@ public class TestCoprocessorScanPolicy {
@Test
public void testTTL() throws Exception {
TableName tableName =
TableName.valueOf("testTTL");
TableName tableName = TableName.valueOf("testTTL");
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
TEST_UTIL.deleteTable(tableName);
}
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(F)
.setMaxVersions(10)
.setTimeToLive(1);
desc.addFamily(hcd);
TEST_UTIL.getAdmin().createTable(desc);
Table t = TEST_UTIL.getConnection().getTable(tableName);
Table t = TEST_UTIL.createTable(tableName, F, 10);
long now = EnvironmentEdgeManager.currentTime();
ManualEnvironmentEdge me = new ManualEnvironmentEdge();
me.setValue(now);
EnvironmentEdgeManagerTestHelper.injectEdge(me);
// 2s in the past
long ts = now - 2000;
// Set the TTL override to 3s
Put p = new Put(R);
p.setAttribute("ttl", new byte[]{});
p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
t.put(p);
p = new Put(R);
Put p = new Put(R);
p.addColumn(F, Q, ts, Q);
t.put(p);
p = new Put(R);
p.addColumn(F, Q, ts + 1, Q);
t.put(p);
// these two should be expired but for the override
// (their ts was 2s in the past)
// Set the TTL override to 3s
p = new Put(R);
p.setAttribute("ttl", new byte[] {});
p.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
t.put(p);
// these two should still be there
Get g = new Get(R);
g.setMaxVersions(10);
g.readAllVersions();
Result r = t.get(g);
// still there?
assertEquals(2, r.size());
@@ -199,7 +194,7 @@ public class TestCoprocessorScanPolicy {
TEST_UTIL.compact(tableName, true);
g = new Get(R);
g.setMaxVersions(10);
g.readAllVersions();
r = t.get(g);
// still there?
assertEquals(2, r.size());
@@ -208,7 +203,7 @@ public class TestCoprocessorScanPolicy {
me.setValue(now + 2000);
// now verify that data eventually does expire
g = new Get(R);
g.setMaxVersions(10);
g.readAllVersions();
r = t.get(g);
// should be gone now
assertEquals(0, r.size());
@@ -217,8 +212,8 @@ public class TestCoprocessorScanPolicy {
}
public static class ScanObserver implements RegionCoprocessor, RegionObserver {
private Map<TableName, Long> ttls = new HashMap<>();
private Map<TableName, Integer> versions = new HashMap<>();
private final ConcurrentMap<TableName, Long> ttls = new ConcurrentHashMap<>();
private final ConcurrentMap<TableName, Integer> versions = new ConcurrentHashMap<>();
@Override
public Optional<RegionObserver> getRegionObserver() {
@@ -231,85 +226,130 @@ public class TestCoprocessorScanPolicy {
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
if (put.getAttribute("ttl") != null) {
Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
ttls.put(TableName.valueOf(
Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())),
Bytes.toLong(CellUtil.cloneValue(kv)));
Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
ttls.put(
TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength())),
Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
c.bypass();
} else if (put.getAttribute("versions") != null) {
Cell cell = put.getFamilyCellMap().values().iterator().next().get(0);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
versions.put(TableName.valueOf(
Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength())),
Bytes.toInt(CellUtil.cloneValue(kv)));
Cell cell = put.getFamilyCellMap().values().stream().findFirst().get().get(0);
versions.put(
TableName.valueOf(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength())),
Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
c.bypass();
}
}
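// Client-side view of the attribute protocol handled above (editor's
// illustration, mirroring the override puts in testBaseCases/testTTL): such
// a Put is bypassed before it reaches the memstore and only records a
// per-table override, keyed by the qualifier:
//
//   Put override = new Put(R);
//   override.setAttribute("ttl", new byte[] {});
//   override.addColumn(F, tableName.getName(), Bytes.toBytes(3000L));
//   t.put(override); // intercepted by prePut above, never stored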
@Override
public InternalScanner preFlushScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<KeyValueScanner> scanners, InternalScanner s, long readPoint) throws IOException {
HStore hs = (HStore) store;
Long newTtl = ttls.get(store.getTableName());
if (newTtl != null) {
System.out.println("PreFlush:" + newTtl);
}
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = hs.getScanInfo();
ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(), family.isNewVersionBehavior());
return new StoreScanner(hs, scanInfo,
newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()),
scanners, ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
private InternalScanner wrap(Store store, InternalScanner scanner) {
Long ttl = this.ttls.get(store.getTableName());
Integer version = this.versions.get(store.getTableName());
return new DelegatingInternalScanner(scanner) {
private byte[] row;
private byte[] qualifier;
private int count;
private Predicate<Cell> checkTtl(long now, long ttl) {
return c -> now - c.getTimestamp() > ttl;
}
@Override
public InternalScanner preCompactScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store,
List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request,
long readPoint) throws IOException {
HStore hs = (HStore) store;
Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = hs.getScanInfo();
ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(),
family.isNewVersionBehavior());
return new StoreScanner(hs, scanInfo,
newVersions == null ? OptionalInt.empty() : OptionalInt.of(newVersions.intValue()),
scanners, scanType, store.getSmallestReadPoint(), earliestPutTs);
}
@Override
public KeyValueScanner preStoreScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
final NavigableSet<byte[]> targetCols, KeyValueScanner s, long readPt) throws IOException {
TableName tn = store.getTableName();
if (!tn.isSystemTable()) {
HStore hs = (HStore) store;
Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = hs.getScanInfo();
ColumnFamilyDescriptor family = store.getColumnFamilyDescriptor();
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator(),
family.isNewVersionBehavior());
return new StoreScanner(hs, scanInfo, scan, targetCols, readPt);
} else {
return s;
}
}
private Predicate<Cell> checkVersion(Cell firstCell, int version) {
if (version == 0) {
return c -> true;
} else {
if (row == null || !CellUtil.matchingRow(firstCell, row)) {
row = CellUtil.cloneRow(firstCell);
// reset qualifier as there is a row change
qualifier = null;
}
return c -> {
if (qualifier != null && CellUtil.matchingQualifier(c, qualifier)) {
if (count >= version) {
return true;
}
count++;
return false;
} else { // qualifier switch
qualifier = CellUtil.cloneQualifier(c);
count = 1;
return false;
}
};
}
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
boolean moreRows = scanner.next(result, scannerContext);
if (result.isEmpty()) {
return moreRows;
}
long now = EnvironmentEdgeManager.currentTime();
Predicate<Cell> predicate = null;
if (ttl != null) {
predicate = checkTtl(now, ttl);
}
if (version != null) {
Predicate<Cell> vp = checkVersion(result.get(0), version);
if (predicate != null) {
predicate = predicate.and(vp);
} else {
predicate = vp;
}
}
if (predicate != null) {
result.removeIf(predicate);
}
return moreRows;
}
};
}
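// Worked example (editor's illustration, not part of the patch) of what the
// wrapper above does during a flush or compaction, assuming a 3000 ms TTL
// override and the test's R/F/Q constants:
//
//   long now = EnvironmentEdgeManager.currentTime();
//   List<Cell> cells = new ArrayList<>();
//   cells.add(new KeyValue(R, F, Q, now - 5000, Q)); // older than the override
//   cells.add(new KeyValue(R, F, Q, now - 1000, Q)); // still within 3 s
//   cells.removeIf(c -> now - c.getTimestamp() > 3000L); // same test as checkTtl
//
// Only the second cell survives, which is why expired cells disappear from
// the store files after the next flush/compaction in testTTL.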
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
return wrap(store, scanner);
}
@Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
return wrap(store, scanner);
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
List<Cell> result) throws IOException {
TableName tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName();
Long ttl = this.ttls.get(tableName);
if (ttl != null) {
get.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, get.getTimeRange().getMax());
}
Integer version = this.versions.get(tableName);
if (version != null) {
get.readVersions(version);
}
}
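// Read-path counterpart (illustration, assuming the same 3000 ms override):
// preGetOp narrows the Get as if the client had written
//
//   Get g = new Get(R);
//   g.setTimeRange(EnvironmentEdgeManager.currentTime() - 3000L,
//       g.getTimeRange().getMax());
//
// so cells older than the override fall outside the time range and are not
// returned even before a compaction physically removes them.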
@Override
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan,
RegionScanner s) throws IOException {
Region region = c.getEnvironment().getRegion();
TableName tableName = region.getTableDescriptor().getTableName();
Long ttl = this.ttls.get(tableName);
if (ttl != null) {
scan.setTimeRange(EnvironmentEdgeManager.currentTime() - ttl, scan.getTimeRange().getMax());
}
Integer version = this.versions.get(tableName);
if (version != null) {
scan.readVersions(version);
}
return region.getScanner(scan);
}
}
}