HBASE-19033 Allow CP users to change versions and TTL before opening StoreScanner

This commit is contained in:
zhangduo 2017-10-29 21:18:55 +08:00
parent 0b7d8ffc21
commit e0a530e714
25 changed files with 828 additions and 186 deletions

View File

@ -0,0 +1,252 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;
import java.util.stream.IntStream;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.shaded.com.google.common.math.IntMath;
* An example for implementing a counter that reads is much less than writes, i.e, write heavy.
* <p>
* We will convert increment to put, and do aggregating when get. And of course the return value of
* increment is useless then.
* <p>
* Notice that this is only an example so we do not handle most corner cases, for example, you must
* provide a qualifier when doing a get.
public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver {
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) {
return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
private InternalScanner wrap(byte[] family, InternalScanner scanner) {
return new InternalScanner() {
private List<Cell> srcResult = new ArrayList<>();
private byte[] row;
private byte[] qualifier;
private long timestamp;
private long sum;
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
boolean moreRows = scanner.next(srcResult, scannerContext);
if (srcResult.isEmpty()) {
if (!moreRows && row != null) {
result.add(createCell(row, family, qualifier, timestamp, sum));
return moreRows;
Cell firstCell = srcResult.get(0);
// Check if there is a row change first. All the cells will come from the same row so just
// check the first one once is enough.
if (row == null) {
row = CellUtil.cloneRow(firstCell);
qualifier = CellUtil.cloneQualifier(firstCell);
} else if (!CellUtil.matchingRows(firstCell, row)) {
result.add(createCell(row, family, qualifier, timestamp, sum));
row = CellUtil.cloneRow(firstCell);
qualifier = CellUtil.cloneQualifier(firstCell);
sum = 0;
srcResult.forEach(c -> {
if (CellUtil.matchingQualifier(c, qualifier)) {
sum += Bytes.toLong(c.getValueArray(), c.getValueOffset());
} else {
result.add(createCell(row, family, qualifier, timestamp, sum));
qualifier = CellUtil.cloneQualifier(c);
sum = Bytes.toLong(c.getValueArray(), c.getValueOffset());
timestamp = c.getTimestamp();
if (!moreRows) {
result.add(createCell(row, family, qualifier, timestamp, sum));
return moreRows;
public void close() throws IOException {
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
throws IOException {
Scan scan =
new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
new TreeMap<>(Bytes.BYTES_COMPARATOR);
get.getFamilyMap().forEach((cf, cqs) -> {
NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
sums.put(cf, ss);
cqs.forEach(cq -> {
ss.put(cq, new MutableLong(0));
scan.addColumn(cf, cq);
List<Cell> cells = new ArrayList<>();
try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
boolean moreRows;
do {
moreRows = scanner.next(cells);
for (Cell cell : cells) {
byte[] family = CellUtil.cloneFamily(cell);
byte[] qualifier = CellUtil.cloneQualifier(cell);
long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
} while (moreRows);
sums.forEach((cf, m) -> m.forEach((cq, s) -> result
.add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
private final int mask;
private final MutableLong[] lastTimestamps;
int stripes =
1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING);
lastTimestamps =
IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new);
mask = stripes - 1;
// We need make sure the different put uses different timestamp otherwise we may lost some
// increments. This is a known issue for HBase.
private long getUniqueTimestamp(byte[] row) {
int slot = Bytes.hashCode(row) & mask;
MutableLong lastTimestamp = lastTimestamps[slot];
long now = System.currentTimeMillis();
synchronized (lastTimestamp) {
long pt = lastTimestamp.longValue() >> 10;
if (now > pt) {
lastTimestamp.setValue(now << 10);
} else {
return lastTimestamp.longValue();
public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
throws IOException {
byte[] row = increment.getRow();
Put put = new Put(row);
long ts = getUniqueTimestamp(row);
for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
for (Cell cell : entry.getValue()) {
.setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
.setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
.setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
return Result.EMPTY_RESULT;
public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
ScanOptions options) throws IOException {

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
@ -28,20 +27,19 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryForever;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* This is an example showing how a RegionObserver could configured via ZooKeeper in order to
@ -170,33 +168,24 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs
return OptionalLong.of(Bytes.toLong(bytes));
private InternalScanner wrap(InternalScanner scanner) {
OptionalLong optExpireBefore = getExpireBefore();
if (!optExpireBefore.isPresent()) {
return scanner;
private void resetTTL(ScanOptions options) {
OptionalLong expireBefore = getExpireBefore();
if (!expireBefore.isPresent()) {
long expireBefore = optExpireBefore.getAsLong();
return new DelegatingInternalScanner(scanner) {
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
boolean moreRows = scanner.next(result, scannerContext);
result.removeIf(c -> c.getTimestamp() < expireBefore);
return moreRows;
options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong());
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return wrap(scanner);
public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
return wrap(scanner);

View File

@ -0,0 +1,153 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.coprocessor.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestWriteHeavyIncrementObserver {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName NAME = TableName.valueOf("TestCP");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] CQ1 = Bytes.toBytes("cq1");
private static byte[] CQ2 = Bytes.toBytes("cq2");
private static Table TABLE;
private static long UPPER = 1000;
private static int THREADS = 10;
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
TABLE = UTIL.getConnection().getTable(NAME);
public static void tearDown() throws Exception {
if (TABLE != null) {
private static void increment() throws IOException {
for (long i = 1; i <= UPPER; i++) {
TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10));
} catch (InterruptedException e) {
private void assertSum() throws IOException {
Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
public void test() throws Exception {
Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
try {
} catch (IOException e) {
throw new UncheckedIOException(e);
}, "increment-" + i)).toArray(Thread[]::new);
for (Thread thread : threads) {
for (Thread thread : threads) {
// we do not hack scan operation so using scan we could get the original values added into the
// table.
try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW)
.withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) {
Result r = scanner.next();
assertTrue(r.rawCells().length > 2);
HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY);
Waiter.waitFor(UTIL.getConfiguration(), 30000, new Waiter.ExplainingPredicate<Exception>() {
public boolean evaluate() throws Exception {
return store.getStorefilesCount() == 1;
public String explainFailure() throws Exception {
return "Major compaction hangs, there are still " + store.getStorefilesCount() +
" store files";
// Should only have two cells after flush and major compaction
try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW)
.withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) {
Result r = scanner.next();
assertEquals(2, r.rawCells().length);

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
@ -129,10 +130,20 @@ public interface RegionObserver {
default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {}
* Called before we open store scanner for flush. You can use the {@code options} to change max
* versions and TTL for the scanner being opened.
* @param c the environment provided by the region server
* @param store the store where flush is being requested
* @param options used to change max versions and TTL for the scanner being opened
default void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanOptions options,FlushLifeCycleTracker tracker) throws IOException {}
* Called before a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
* @param store the store where compaction is being requested
* @param store the store where flush is being requested
* @param scanner the scanner over existing data used in the store file
* @param tracker tracker used to track the life cycle of a flush
* @return the scanner to use during compaction. Should not be {@code null}
@ -188,6 +199,20 @@ public interface RegionObserver {
List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
CompactionRequest request) {}
* Called before we open store scanner for compaction. You can use the {@code options} to change max
* versions and TTL for the scanner being opened.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanType type of Scan
* @param options used to change max versions and TTL for the scanner being opened
* @param tracker tracker used to track the life cycle of a compaction
* @param request the requested compaction
default void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {}
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile}.
@ -857,6 +882,27 @@ public interface RegionObserver {
default void postScannerClose(ObserverContext<RegionCoprocessorEnvironment> ctx,
InternalScanner s) throws IOException {}
* Called before a store opens a new scanner.
* <p>
* This hook is called when a "user" scanner is opened. Use {@code preFlushScannerOpen} and
* {@code preCompactScannerOpen} to inject flush/compaction.
* <p>
* Notice that, this method is used to change the inherent max versions and TTL for a Store. For
* example, you can change the max versions option for a {@link Scan} object to 10 in
* {@code preScannerOpen}, but if the max versions config on the Store is 1, then you still can
* only read 1 version. You need also to inject here to change the max versions to 10 if you want
* to get more versions.
* @param ctx the environment provided by the region server
* @param store the store which we want to get scanner from
* @param options used to change max versions and TTL for the scanner being opened
* @see #preFlushScannerOpen(ObserverContext, Store, ScanOptions, FlushLifeCycleTracker)
* @see #preCompactScannerOpen(ObserverContext, Store, ScanType, ScanOptions,
* CompactionLifeCycleTracker, CompactionRequest)
default void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
ScanOptions options) throws IOException {}
* Called before replaying WALs for this region.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no

View File

@ -22,7 +22,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.OptionalInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
@ -72,10 +72,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
public InternalScanner createScanner(List<StoreFileScanner> scanners,
public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
smallestReadPoint, fd.earliestPutTs);
return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,

View File

@ -0,0 +1,69 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.regionserver;
import org.apache.yetus.audience.InterfaceAudience;
* Helper class for CP hooks to change max versions and TTL.
public class CustomizedScanInfoBuilder implements ScanOptions {
private final ScanInfo scanInfo;
private Integer maxVersions;
private Long ttl;
public CustomizedScanInfoBuilder(ScanInfo scanInfo) {
this.scanInfo = scanInfo;
public int getMaxVersions() {
return maxVersions != null ? maxVersions.intValue() : scanInfo.getMaxVersions();
public void setMaxVersions(int maxVersions) {
this.maxVersions = maxVersions;
public long getTTL() {
return ttl != null ? ttl.longValue() : scanInfo.getTtl();
public void setTTL(long ttl) {
this.ttl = ttl;
public ScanInfo build() {
if (maxVersions == null && ttl == null) {
return scanInfo;
return scanInfo.customize(getMaxVersions(), getTTL());
public String toString() {
return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() + "]";

View File

@ -144,8 +144,8 @@ public class HMobStore extends HStore {
* the mob files should be performed after the seek in HBase is done.
protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
long readPt) throws IOException {
protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> targetCols, long readPt) throws IOException {
if (MobUtils.isRefOnlyScan(scan)) {
Filter refOnlyFilter = new MobReferenceOnlyFilter();
Filter filter = scan.getFilter();
@ -155,9 +155,8 @@ public class HMobStore extends HStore {
return scan.isReversed()
? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
: new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
return scan.isReversed() ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt)
: new MobStoreScanner(this, scanInfo, scan, targetCols, readPt);

View File

@ -7857,6 +7857,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
/** @param coprocessorHost the new coprocessor host */
public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
this.coprocessorHost = coprocessorHost;

View File

@ -1909,7 +1909,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
this.forceMajor = true;
// File administration
@ -1922,21 +1921,27 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @return a scanner over the current key values
* @throws IOException on failure
public KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols, long readPt) throws IOException {
public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
throws IOException {
try {
return createScanner(scan, targetCols, readPt);
ScanInfo scanInfo;
if (this.getCoprocessorHost() != null) {
scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this);
} else {
scanInfo = getScanInfo();
return createScanner(scan, scanInfo, targetCols, readPt);
} finally {
protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
long readPt) throws IOException {
return scan.isReversed() ? new ReversedStoreScanner(this,
getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
getScanInfo(), scan, targetCols, readPt);
// HMobStore will override this method to return its own implementation.
protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> targetCols, long readPt) throws IOException {
return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
: new StoreScanner(this, scanInfo, scan, targetCols, readPt);

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalInt;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@ -108,9 +107,11 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
throws IOException {
// Get all available versions
return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
// FIXME: This is the old comment 'Get all available versions'
// But actually if we really reset the ScanInfo to get all available versions then lots of UTs
// will fail
return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES,
store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
/* Refill kev-value set (should be invoked only when KVS is empty)

View File

@ -1,5 +1,4 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -622,6 +621,21 @@ public class RegionCoprocessorHost
* Called prior to opening store scanner for compaction.
public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
public void call(RegionObserver observer) throws IOException {
observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
return builder.build();
* Called prior to rewriting the store files selected for compaction
* @param store the store being compacted
@ -665,6 +679,22 @@ public class RegionCoprocessorHost
* Invoked before create StoreScanner for flush.
* @throws IOException
public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
throws IOException {
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
public void call(RegionObserver observer) throws IOException {
observer.preFlushScannerOpen(this, store, builder, tracker);
return builder.build();
* Invoked before a memstore flush
* @throws IOException
@ -1263,6 +1293,20 @@ public class RegionCoprocessorHost
* Called before open store scanner for user scan.
public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
public void call(RegionObserver observer) throws IOException {
observer.preStoreScannerOpen(this, store, builder);
return builder.build();
* @param info the RegionInfo for this region
* @param edits the file of recovered edits

View File

@ -49,7 +49,6 @@ public class ScanInfo {
private long cellsPerTimeoutCheck;
private boolean parallelSeekEnabled;
private final long preadMaxBytes;
private final Configuration conf;
private final boolean newVersionBehavior;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
@ -64,10 +63,18 @@ public class ScanInfo {
* major compaction.
* @param comparator The store's comparator
public ScanInfo(final Configuration conf, final ColumnFamilyDescriptor family, final long ttl,
final long timeToPurgeDeletes, final CellComparator comparator) {
public ScanInfo(Configuration conf, ColumnFamilyDescriptor family, long ttl,
long timeToPurgeDeletes, CellComparator comparator) {
this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl,
family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator, family.isNewVersionBehavior());
family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator,
private static long getCellsPerTimeoutCheck(Configuration conf) {
long perHeartbeat = conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
return perHeartbeat > 0 ? perHeartbeat
@ -82,10 +89,20 @@ public class ScanInfo {
* @param keepDeletedCells Store's keepDeletedCells setting
* @param comparator The store's comparator
public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator,
final boolean newVersionBehavior) {
public ScanInfo(Configuration conf, byte[] family, int minVersions, int maxVersions, long ttl,
KeepDeletedCells keepDeletedCells, long blockSize, long timeToPurgeDeletes,
CellComparator comparator, boolean newVersionBehavior) {
this(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes, comparator,
conf.getBoolean("hbase.storescanner.use.pread", false), getCellsPerTimeoutCheck(conf),
conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false),
conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize), newVersionBehavior);
private ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
KeepDeletedCells keepDeletedCells, long timeToPurgeDeletes, CellComparator comparator,
long tableMaxRowSize, boolean usePread, long cellsPerTimeoutCheck,
boolean parallelSeekEnabled, long preadMaxBytes, boolean newVersionBehavior) {
this.family = family;
this.minVersions = minVersions;
this.maxVersions = maxVersions;
@ -93,25 +110,14 @@ public class ScanInfo {
this.keepDeletedCells = keepDeletedCells;
this.timeToPurgeDeletes = timeToPurgeDeletes;
this.comparator = comparator;
this.tableMaxRowSize =
this.usePread = conf.getBoolean("hbase.storescanner.use.pread", false);
long perHeartbeat =
this.cellsPerTimeoutCheck = perHeartbeat > 0?
this.parallelSeekEnabled =
conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize);
this.conf = conf;
this.tableMaxRowSize = tableMaxRowSize;
this.usePread = usePread;
this.cellsPerTimeoutCheck = cellsPerTimeoutCheck;
this.parallelSeekEnabled = parallelSeekEnabled;
this.preadMaxBytes = preadMaxBytes;
this.newVersionBehavior = newVersionBehavior;
public Configuration getConfiguration() {
return this.conf;
long getTableMaxRowSize() {
return this.tableMaxRowSize;
@ -163,4 +169,12 @@ public class ScanInfo {
public boolean isNewVersionBehavior() {
return newVersionBehavior;
* Used for CP users for customizing max versions and ttl.
ScanInfo customize(int maxVersions, long ttl) {
return new ScanInfo(family, minVersions, maxVersions, ttl, keepDeletedCells, ttl, comparator,
ttl, usePread, maxVersions, parallelSeekEnabled, ttl, newVersionBehavior);

View File

@ -0,0 +1,62 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
* This class gives you the ability to change the max versions and TTL options before opening a
* scanner for a Store. And also gives you some information for the scan.
* <p>
* Changing max versions and TTL are usually safe even for flush/compaction, so here we provide a
* way to do it for you. If you want to do other complicated stuffs such as filtering, please wrap
* the {@link InternalScanner} in the {@code preCompact} and {@code preFlush} methods in
* {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}.
* <p>
* For user scans, we also provide this class as a parameter in the {@code preStoreScannerOpen}
* method in {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}. You can use it to change
* the inherent properties for a Store. For example, even if you use {@code Scan.readAllVersions},
* you still can not read two versions if the max versions property of the Store is one. You need to
* set the max versions to a value greater than two in {@code preStoreScannerOpen}.
* @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preFlushScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
* Store, ScanOptions, FlushLifeCycleTracker)
* @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preCompactScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
* Store, ScanType, ScanOptions,
* org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker,
* org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest)
* @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preStoreScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
* Store, ScanOptions)
public interface ScanOptions {
int getMaxVersions();
void setMaxVersions(int maxVersions);
default void readAllVersions() {
long getTTL();
void setTTL(long ttl);

View File

@ -80,9 +80,14 @@ abstract class StoreFlusher {
protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException {
InternalScanner scanner =
new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
ScanInfo scanInfo;
if (store.getCoprocessorHost() != null) {
scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
} else {
scanInfo = store.getScanInfo();
InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
assert scanner != null;
if (store.getCoprocessorHost() != null) {
try {

View File

@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
@ -67,7 +66,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
// In unit tests, the store could be null
protected final Optional<HStore> store;
protected final HStore store;
private ScanQueryMatcher matcher;
protected KeyValueHeap heap;
private boolean cacheBlocks;
@ -160,7 +159,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private boolean topChanged = false;
/** An internal constructor. */
private StoreScanner(Optional<HStore> store, Scan scan, ScanInfo scanInfo,
private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo,
int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
this.readPt = readPt;
this.store = store;
@ -199,15 +198,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
// Parallel seeking is on if the config allows and more there is more than one store file.
this.store.ifPresent(s -> {
if (s.getStorefilesCount() > 1) {
RegionServerServices rsService = ((HStore) s).getHRegion().getRegionServerServices();
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
this.parallelSeekEnabled = true;
this.executor = rsService.getExecutorService();
if (store != null && store.getStorefilesCount() > 1) {
RegionServerServices rsService = store.getHRegion().getRegionServerServices();
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
this.parallelSeekEnabled = true;
this.executor = rsService.getExecutorService();
private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
@ -225,7 +222,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt,
this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
scan.getCacheBlocks(), ScanType.USER_SCAN);
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
@ -275,11 +272,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking versions
public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null,
public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
@ -292,21 +287,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
private StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
scanInfo, 0, store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED),
false, scanType);
private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
assert scanType != ScanType.USER_SCAN;
matcher =
CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
@ -333,7 +325,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// For mob compaction only as we do not have a Store instance when doing mob compaction.
public StoreScanner(ScanInfo scanInfo, ScanType scanType,
List<? extends KeyValueScanner> scanners) throws IOException {
this(Optional.empty(), SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
assert scanType != ScanType.USER_SCAN;
this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
oldestUnexpiredTS, now, null, null, null);
@ -345,7 +337,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
List<? extends KeyValueScanner> scanners) throws IOException {
// 0 is passed as readpoint because the test bypasses Store
this(Optional.empty(), scan, scanInfo, columns != null ? columns.size() : 0, 0L,
this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L,
scan.getCacheBlocks(), ScanType.USER_SCAN);
this.matcher =
UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
@ -357,7 +349,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType,
List<? extends KeyValueScanner> scanners) throws IOException {
// 0 is passed as readpoint because the test bypasses Store
this(Optional.empty(), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
this(null, maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
: SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
@ -478,7 +470,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.closing = true;
// For mob compaction, we do not have a store.
this.store.ifPresent(s -> s.deleteChangedReaderObserver(this));
if (this.store != null) {
if (withDelayedScannersClose) {
@ -550,8 +544,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Only do a sanity-check if store and comparator are available.
CellComparator comparator =
store.map(s -> s.getComparator()).orElse(null);
CellComparator comparator = store != null ? store.getComparator() : null;
int count = 0;
long totalBytesRead = 0;
@ -864,8 +857,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @return if top of heap has changed (and KeyValueHeap has to try the next KV)
protected final boolean reopenAfterFlush() throws IOException {
// here we can make sure that we have a Store instance.
HStore store = this.store.get();
// here we can make sure that we have a Store instance so no null check on store.
Cell lastTop = heap.peek();
// When we have the scan object, should we not pass it to getScanners() to get a limited set of
// scanners? We did so in the constructor and we could have done it now by storing the scan
@ -992,9 +984,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
List<KeyValueScanner> fileScanners = null;
List<KeyValueScanner> newCurrentScanners;
KeyValueHeap newHeap;
// We must have a store instance here
HStore store = this.store.get();
try {
// We must have a store instance here so no null check
// recreate the scanners on the current file scanners
fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -26,7 +28,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,10 +41,12 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
import org.apache.hadoop.hbase.regionserver.CustomizedScanInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
@ -230,7 +233,7 @@ public abstract class Compactor<T extends CellSink> {
ScanType getScanType(CompactionRequestImpl request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
@ -238,14 +241,13 @@ public abstract class Compactor<T extends CellSink> {
public ScanType getScanType(CompactionRequestImpl request) {
return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
@ -266,6 +268,31 @@ public abstract class Compactor<T extends CellSink> {
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
User user) throws IOException {
if (store.getCoprocessorHost() == null) {
return store.getScanInfo();
return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
request, user);
* Calls coprocessor, if any, to create scanners - after normal scanner creation.
* @param request Compaction request.
* @param scanType Scan type.
* @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) {
return scanner;
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
request, user);
protected final List<Path> compact(final CompactionRequestImpl request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
@ -290,8 +317,9 @@ public abstract class Compactor<T extends CellSink> {
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
scanner = postCreateCoprocScanner(request, scanType,
scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint), user);
ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
scanner = postCompactScannerOpen(request, scanType,
scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<>();
@ -325,22 +353,6 @@ public abstract class Compactor<T extends CellSink> {
protected abstract void abortWriter(T writer) throws IOException;
* Calls coprocessor, if any, to create scanners - after normal scanner creation.
* @param request Compaction request.
* @param scanType Scan type.
* @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
private InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
InternalScanner scanner, User user) throws IOException {
if (store.getCoprocessorHost() == null) {
return scanner;
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
request, user);
* Performs the compaction.
* @param fd FileDetails of cell sink writer
@ -475,10 +487,10 @@ public abstract class Compactor<T extends CellSink> {
* @param earliestPutTs Earliest put across all files.
* @return A compaction scanner.
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
smallestReadPoint, earliestPutTs);
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
@ -490,10 +502,10 @@ public abstract class Compactor<T extends CellSink> {
* @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
* @return A compaction scanner.
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
dropDeletesFromRow, dropDeletesToRow);

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@ -66,13 +67,13 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
return (majorRangeFromRow == null)
? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
: StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
majorRangeFromRow, majorRangeToRow);
? StripeCompactor.this.createScanner(store, scanInfo, scanners, scanType,
smallestReadPoint, fd.earliestPutTs)
: StripeCompactor.this.createScanner(store, scanInfo, scanners, smallestReadPoint,
fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);

View File

@ -124,11 +124,10 @@ public class TestFromClientSideScanExcpetion {
protected KeyValueScanner createScanner(Scan scan, NavigableSet<byte[]> targetCols, long readPt)
throws IOException {
return scan.isReversed()
? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
: new MyStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
NavigableSet<byte[]> targetCols, long readPt) throws IOException {
return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
: new MyStoreScanner(this, scanInfo, scan, targetCols, readPt);

View File

@ -211,11 +211,9 @@ public class TestCompaction {
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttl = 1000;
for (HStore store: this.r.stores.values()) {
for (HStore store : this.r.stores.values()) {
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0,
old.getComparator(), old.isNewVersionBehavior());
ScanInfo si = old.customize(old.getMaxVersions(), ttl);

View File

@ -158,10 +158,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
public void testCompactionEmptyHFile() throws IOException {
// Set TTL
ScanInfo oldScanInfo = store.getScanInfo();
ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(),
oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator(), oldScanInfo.isNewVersionBehavior());
ScanInfo newScanInfo = oldScanInfo.customize(oldScanInfo.getMaxVersions(), 600);
// Do not compact empty store file
List<HStoreFile> candidates = sfCreate(0);

View File

@ -403,6 +403,8 @@ public class TestHRegion {
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),

View File

@ -292,9 +292,7 @@ public class TestMajorCompaction {
final int ttl = 1000;
for (HStore store : r.getStores()) {
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0,
old.getComparator(), old.isNewVersionBehavior());
ScanInfo si = old.customize(old.getMaxVersions(), ttl);

View File

@ -110,15 +110,16 @@ public class TestDateTieredCompactor {
return new DateTieredCompactor(conf, store) {
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return scanner;
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
return scanner;

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
@ -787,15 +788,16 @@ public class TestStripeCompactionPolicy {
final Scanner scanner = new Scanner();
return new StripeCompactor(conf, store) {
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return scanner;
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
return scanner;

View File

@ -207,15 +207,16 @@ public class TestStripeCompactor {
return new StripeCompactor(conf, store) {
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return scanner;
protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
long earliestPutTs) throws IOException {
return scanner;