diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
new file mode 100644
index 00000000000..e9b590deda0
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.math.IntMath;
+
+/**
+ * An example of implementing a counter for a write-heavy workload, i.e., one where reads are much
+ * less frequent than writes.
+ *
+ * We convert each increment into a put, and aggregate at read time in the get hook. Of course, the
+ * return value of the increment is then meaningless.
+ *
+ * Notice that this is only an example, so we do not handle most corner cases; for example, you
+ * must provide a qualifier when doing a get.
+ */
+public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver {
+
+ @Override
+  public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
+ options.readAllVersions();
+ }
+
+ private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) {
+ return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
+ .setType(CellBuilder.DataType.Put).setFamily(family).setQualifier(qualifier)
+ .setTimestamp(ts).setValue(Bytes.toBytes(value)).build();
+ }
+
+ private InternalScanner wrap(byte[] family, InternalScanner scanner) {
+ return new InternalScanner() {
+
+      private List<Cell> srcResult = new ArrayList<>();
+
+ private byte[] row;
+
+ private byte[] qualifier;
+
+ private long timestamp;
+
+ private long sum;
+
+ @Override
+      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+ boolean moreRows = scanner.next(srcResult, scannerContext);
+ if (srcResult.isEmpty()) {
+ if (!moreRows && row != null) {
+ result.add(createCell(row, family, qualifier, timestamp, sum));
+ }
+ return moreRows;
+ }
+ Cell firstCell = srcResult.get(0);
+        // Check if there is a row change first. All the cells will come from the same row, so
+        // checking the first one is enough.
+ if (row == null) {
+ row = CellUtil.cloneRow(firstCell);
+ qualifier = CellUtil.cloneQualifier(firstCell);
+ } else if (!CellUtil.matchingRows(firstCell, row)) {
+ result.add(createCell(row, family, qualifier, timestamp, sum));
+ row = CellUtil.cloneRow(firstCell);
+ qualifier = CellUtil.cloneQualifier(firstCell);
+ sum = 0;
+ }
+ srcResult.forEach(c -> {
+ if (CellUtil.matchingQualifier(c, qualifier)) {
+ sum += Bytes.toLong(c.getValueArray(), c.getValueOffset());
+ } else {
+ result.add(createCell(row, family, qualifier, timestamp, sum));
+ qualifier = CellUtil.cloneQualifier(c);
+ sum = Bytes.toLong(c.getValueArray(), c.getValueOffset());
+ }
+ timestamp = c.getTimestamp();
+ });
+ if (!moreRows) {
+ result.add(createCell(row, family, qualifier, timestamp, sum));
+ }
+ srcResult.clear();
+ return moreRows;
+ }
+
+ @Override
+ public void close() throws IOException {
+ scanner.close();
+ }
+ };
+ }
+
+ @Override
+  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
+ return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
+ }
+
+ @Override
+  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+ CompactionRequest request) throws IOException {
+ options.readAllVersions();
+ }
+
+ @Override
+  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+ CompactionRequest request) throws IOException {
+ return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
+ }
+
+ @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
+ throws IOException {
+ Scan scan =
+ new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
+    NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ get.getFamilyMap().forEach((cf, cqs) -> {
+      NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ sums.put(cf, ss);
+ cqs.forEach(cq -> {
+ ss.put(cq, new MutableLong(0));
+ scan.addColumn(cf, cq);
+ });
+ });
+    List<Cell> cells = new ArrayList<>();
+ try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
+ boolean moreRows;
+ do {
+ moreRows = scanner.next(cells);
+ for (Cell cell : cells) {
+ byte[] family = CellUtil.cloneFamily(cell);
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
+ sums.get(family).get(qualifier).add(value);
+ }
+ cells.clear();
+ } while (moreRows);
+ }
+ sums.forEach((cf, m) -> m.forEach((cq, s) -> result
+ .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
+ c.bypass();
+ }
+
+ private final int mask;
+ private final MutableLong[] lastTimestamps;
+ {
+ int stripes =
+ 1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING);
+ lastTimestamps =
+ IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new);
+ mask = stripes - 1;
+ }
+
+  // We need to make sure that different puts use different timestamps, otherwise we may lose some
+  // increments. This is a known issue for HBase.
+ private long getUniqueTimestamp(byte[] row) {
+ int slot = Bytes.hashCode(row) & mask;
+ MutableLong lastTimestamp = lastTimestamps[slot];
+ long now = System.currentTimeMillis();
+ synchronized (lastTimestamp) {
+ long pt = lastTimestamp.longValue() >> 10;
+ if (now > pt) {
+ lastTimestamp.setValue(now << 10);
+ } else {
+ lastTimestamp.increment();
+ }
+ return lastTimestamp.longValue();
+ }
+ }
+
+ @Override
+  public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
+ throws IOException {
+ byte[] row = increment.getRow();
+ Put put = new Put(row);
+ long ts = getUniqueTimestamp(row);
+    for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
+ for (Cell cell : entry.getValue()) {
+ put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
+ .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
+ .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength())
+ .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
+ .setType(CellBuilder.DataType.Put).setTimestamp(ts).build());
+ }
+ }
+ c.getEnvironment().getRegion().put(put);
+ c.bypass();
+ return Result.EMPTY_RESULT;
+ }
+
+ @Override
+  public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
+ ScanOptions options) throws IOException {
+ options.readAllVersions();
+ }
+}
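For reviewers, here is a minimal client-side sketch of how the observer above behaves once it is attached to a table. The table, family and qualifier names and the client class are invented for illustration and are not part of this patch; the calls themselves are standard HBase client API.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteHeavyCounterClient {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Assumes a table "t" with family "cf" was created with WriteHeavyIncrementObserver attached.
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("t"))) {
      byte[] row = Bytes.toBytes("row");
      byte[] cf = Bytes.toBytes("cf");
      byte[] cq = Bytes.toBytes("cq");
      // preIncrement rewrites each increment into a Put with a unique timestamp, so the
      // Result returned by increment() is empty and must not be relied upon.
      table.increment(new Increment(row).addColumn(cf, cq, 1L));
      table.increment(new Increment(row).addColumn(cf, cq, 2L));
      // preGetOp scans all stored versions of the column and sums them up: prints 3.
      Result r = table.get(new Get(row).addColumn(cf, cq));
      System.out.println(Bytes.toLong(r.getValue(cf, cq)));
    }
  }
}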
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index d6d66bbd5a0..449726f74e4 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
-import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
@@ -28,20 +27,19 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.retry.RetryForever;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
 * This is an example showing how a RegionObserver could be configured via ZooKeeper in order to
@@ -170,33 +168,24 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs
return OptionalLong.of(Bytes.toLong(bytes));
}
- private InternalScanner wrap(InternalScanner scanner) {
- OptionalLong optExpireBefore = getExpireBefore();
- if (!optExpireBefore.isPresent()) {
- return scanner;
+ private void resetTTL(ScanOptions options) {
+ OptionalLong expireBefore = getExpireBefore();
+ if (!expireBefore.isPresent()) {
+ return;
}
- long expireBefore = optExpireBefore.getAsLong();
- return new DelegatingInternalScanner(scanner) {
-
- @Override
-      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
- boolean moreRows = scanner.next(result, scannerContext);
- result.removeIf(c -> c.getTimestamp() < expireBefore);
- return moreRows;
- }
- };
+ options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong());
}
@Override
-  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
- return wrap(scanner);
+  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
+ resetTTL(options);
}
@Override
-  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
- InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+ ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
CompactionRequest request) throws IOException {
- return wrap(scanner);
+ resetTTL(options);
}
}
\ No newline at end of file
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
new file mode 100644
index 00000000000..1881c8552f5
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestWriteHeavyIncrementObserver {
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static TableName NAME = TableName.valueOf("TestCP");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[] ROW = Bytes.toBytes("row");
+
+ private static byte[] CQ1 = Bytes.toBytes("cq1");
+
+ private static byte[] CQ2 = Bytes.toBytes("cq2");
+
+ private static Table TABLE;
+
+ private static long UPPER = 1000;
+
+ private static int THREADS = 10;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
+ UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
+ UTIL.startMiniCluster(3);
+ UTIL.getAdmin()
+ .createTable(TableDescriptorBuilder.newBuilder(NAME)
+ .addCoprocessor(WriteHeavyIncrementObserver.class.getName())
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+ TABLE = UTIL.getConnection().getTable(NAME);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (TABLE != null) {
+ TABLE.close();
+ }
+ UTIL.shutdownMiniCluster();
+ }
+
+ private static void increment() throws IOException {
+ for (long i = 1; i <= UPPER; i++) {
+ TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
+ try {
+ Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10));
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ private void assertSum() throws IOException {
+ Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
+ assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
+ assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
+ }
+
+ @Test
+ public void test() throws Exception {
+ Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
+ try {
+ increment();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }, "increment-" + i)).toArray(Thread[]::new);
+ for (Thread thread : threads) {
+ thread.start();
+ }
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ assertSum();
+    // we do not hook the scan operation, so with a scan we can still see the original values added
+    // to the table.
+ try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW)
+ .withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) {
+ Result r = scanner.next();
+ assertTrue(r.rawCells().length > 2);
+ }
+ UTIL.getAdmin().flush(NAME);
+ UTIL.getAdmin().majorCompact(NAME);
+ HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY);
+    Waiter.waitFor(UTIL.getConfiguration(), 30000, new Waiter.ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return store.getStorefilesCount() == 1;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "Major compaction hangs, there are still " + store.getStorefilesCount() +
+ " store files";
+ }
+ });
+ assertSum();
+ // Should only have two cells after flush and major compaction
+ try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW)
+ .withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) {
+ Result r = scanner.next();
+ assertEquals(2, r.rawCells().length);
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index cfc8e927c4f..1fdd2f31db5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Region.Operation;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -129,10 +130,20 @@ public interface RegionObserver {
  default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {}
+  /**
+   * Called before we open the store scanner for flush. You can use the {@code options} to change
+   * the max versions and TTL for the scanner being opened.
+   * @param c the environment provided by the region server
+   * @param store the store where flush is being requested
+   * @param options used to change max versions and TTL for the scanner being opened
+   * @param tracker tracker used to track the life cycle of a flush
+   */
+  default void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {}
+
/**
* Called before a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
- * @param store the store where compaction is being requested
+ * @param store the store where flush is being requested
* @param scanner the scanner over existing data used in the store file
* @param tracker tracker used to track the life cycle of a flush
* @return the scanner to use during compaction. Should not be {@code null}
@@ -188,6 +199,20 @@ public interface RegionObserver {
      List<? extends StoreFile> selected, CompactionLifeCycleTracker tracker,
CompactionRequest request) {}
+  /**
+   * Called before we open the store scanner for compaction. You can use the {@code options} to
+   * change the max versions and TTL for the scanner being opened.
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param scanType type of Scan
+   * @param options used to change max versions and TTL for the scanner being opened
+   * @param tracker tracker used to track the life cycle of a compaction
+   * @param request the requested compaction
+   */
+  default void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) throws IOException {}
+
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into a new
* {@code StoreFile}.
@@ -857,6 +882,27 @@ public interface RegionObserver {
  default void postScannerClose(ObserverContext<RegionCoprocessorEnvironment> ctx,
InternalScanner s) throws IOException {}
+  /**
+   * Called before a store opens a new scanner.
+   *
+   * This hook is called when a "user" scanner is opened; use {@code preFlushScannerOpen} and
+   * {@code preCompactScannerOpen} for the scanners opened for flush and compaction.
+   *
+   * Notice that this method is used to change the inherent max versions and TTL of a Store. For
+   * example, you can change the max versions option of a {@link Scan} object to 10 in
+   * {@code preScannerOpen}, but if the max versions config of the Store is 1, you can still only
+   * read 1 version. You also need to change the max versions to 10 here if you want to read more
+   * versions.
+   * @param ctx the environment provided by the region server
+   * @param store the store which we want to get scanner from
+   * @param options used to change max versions and TTL for the scanner being opened
+   * @see #preFlushScannerOpen(ObserverContext, Store, ScanOptions, FlushLifeCycleTracker)
+   * @see #preCompactScannerOpen(ObserverContext, Store, ScanType, ScanOptions,
+   *      CompactionLifeCycleTracker, CompactionRequest)
+   */
+  default void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
+      ScanOptions options) throws IOException {}
+
/**
* Called before replaying WALs for this region.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
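To make the new javadoc concrete, here is a minimal sketch of the preStoreScannerOpen hook in use. The observer class name is invented for this example; the hook signature is the one added by this patch, and the pattern matches what WriteHeavyIncrementObserver does above.

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.Store;

public class ReadAllVersionsObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  // Lift the store-level max versions limit for user scans so that a client issuing
  // Scan.readAllVersions() can actually see more versions than the column family allows.
  @Override
  public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
      ScanOptions options) throws IOException {
    options.readAllVersions();
  }
}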
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 32552da8502..447629b5508 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -22,7 +22,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
-import java.util.OptionalInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
@@ -72,10 +72,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
}
@Override
-    public InternalScanner createScanner(List<StoreFileScanner> scanners,
+    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
- return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
- smallestReadPoint, fd.earliestPutTs);
+ return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
+ fd.earliestPutTs);
}
};
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
new file mode 100644
index 00000000000..c3d5e57cbfb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Helper class for CP hooks to change max versions and TTL.
+ */
+@InterfaceAudience.Private
+public class CustomizedScanInfoBuilder implements ScanOptions {
+
+ private final ScanInfo scanInfo;
+
+ private Integer maxVersions;
+
+ private Long ttl;
+
+ public CustomizedScanInfoBuilder(ScanInfo scanInfo) {
+ this.scanInfo = scanInfo;
+ }
+
+ @Override
+ public int getMaxVersions() {
+ return maxVersions != null ? maxVersions.intValue() : scanInfo.getMaxVersions();
+ }
+
+ @Override
+ public void setMaxVersions(int maxVersions) {
+ this.maxVersions = maxVersions;
+ }
+
+ @Override
+ public long getTTL() {
+ return ttl != null ? ttl.longValue() : scanInfo.getTtl();
+ }
+
+ @Override
+ public void setTTL(long ttl) {
+ this.ttl = ttl;
+ }
+
+ public ScanInfo build() {
+ if (maxVersions == null && ttl == null) {
+ return scanInfo;
+ }
+ return scanInfo.customize(getMaxVersions(), getTTL());
+ }
+
+ @Override
+ public String toString() {
+ return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() + "]";
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 206c3cdc1d4..5cb1e455a52 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -144,8 +144,8 @@ public class HMobStore extends HStore {
* the mob files should be performed after the seek in HBase is done.
*/
@Override
-  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
-      long readPt) throws IOException {
+  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
+      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
if (MobUtils.isRefOnlyScan(scan)) {
Filter refOnlyFilter = new MobReferenceOnlyFilter();
Filter filter = scan.getFilter();
@@ -155,9 +155,8 @@ public class HMobStore extends HStore {
scan.setFilter(refOnlyFilter);
}
}
- return scan.isReversed()
- ? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
- : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+ return scan.isReversed() ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt)
+ : new MobStoreScanner(this, scanInfo, scan, targetCols, readPt);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6f71dc95cf3..648a415d441 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7853,6 +7853,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/** @param coprocessorHost the new coprocessor host */
+ @VisibleForTesting
public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
this.coprocessorHost = coprocessorHost;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c0cea4e1526..7b8ca79ab5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1909,7 +1909,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
this.forceMajor = true;
}
-
//////////////////////////////////////////////////////////////////////////////
// File administration
//////////////////////////////////////////////////////////////////////////////
@@ -1922,21 +1921,27 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @return a scanner over the current key values
* @throws IOException on failure
*/
-  public KeyValueScanner getScanner(Scan scan,
-      final NavigableSet<byte[]> targetCols, long readPt) throws IOException {
+  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
+      throws IOException {
lock.readLock().lock();
try {
- return createScanner(scan, targetCols, readPt);
+ ScanInfo scanInfo;
+ if (this.getCoprocessorHost() != null) {
+ scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this);
+ } else {
+ scanInfo = getScanInfo();
+ }
+ return createScanner(scan, scanInfo, targetCols, readPt);
} finally {
lock.readLock().unlock();
}
}
-  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
- long readPt) throws IOException {
- return scan.isReversed() ? new ReversedStoreScanner(this,
- getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
- getScanInfo(), scan, targetCols, readPt);
+ // HMobStore will override this method to return its own implementation.
+ protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
+      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
+ return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
+ : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
index b3ba998d116..7ab2fe325e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -23,7 +23,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-import java.util.OptionalInt;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
@@ -108,9 +107,11 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
*/
  private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
throws IOException {
- // Get all available versions
- return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners,
- ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+ // FIXME: This is the old comment 'Get all available versions'
+ // But actually if we really reset the ScanInfo to get all available versions then lots of UTs
+ // will fail
+ return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES,
+ store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
}
/* Refill kev-value set (should be invoked only when KVS is empty)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index c242fd18503..c5a3de37e5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -622,6 +621,21 @@ public class RegionCoprocessorHost
});
}
+ /**
+   * Called prior to opening a store scanner for compaction.
+ */
+ public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
+ CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
+ CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
+ }
+ });
+ return builder.build();
+ }
+
/**
* Called prior to rewriting the store files selected for compaction
* @param store the store being compacted
@@ -665,6 +679,22 @@ public class RegionCoprocessorHost
});
}
+ /**
+   * Invoked before creating a StoreScanner for flush.
+ * @throws IOException
+ */
+ public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
+ throws IOException {
+ CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.preFlushScannerOpen(this, store, builder, tracker);
+ }
+ });
+ return builder.build();
+ }
+
/**
* Invoked before a memstore flush
* @throws IOException
@@ -1263,6 +1293,20 @@ public class RegionCoprocessorHost
});
}
+ /**
+   * Called before opening a store scanner for a user scan.
+ */
+ public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
+ CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+ execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
+ @Override
+ public void call(RegionObserver observer) throws IOException {
+ observer.preStoreScannerOpen(this, store, builder);
+ }
+ });
+ return builder.build();
+ }
+
/**
* @param info the RegionInfo for this region
* @param edits the file of recovered edits
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 8e48c696a1d..4e5cb70cc57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -49,7 +49,6 @@ public class ScanInfo {
private long cellsPerTimeoutCheck;
private boolean parallelSeekEnabled;
private final long preadMaxBytes;
- private final Configuration conf;
private final boolean newVersionBehavior;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
@@ -64,10 +63,18 @@ public class ScanInfo {
* major compaction.
* @param comparator The store's comparator
*/
- public ScanInfo(final Configuration conf, final ColumnFamilyDescriptor family, final long ttl,
- final long timeToPurgeDeletes, final CellComparator comparator) {
+ public ScanInfo(Configuration conf, ColumnFamilyDescriptor family, long ttl,
+ long timeToPurgeDeletes, CellComparator comparator) {
this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl,
- family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator, family.isNewVersionBehavior());
+ family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator,
+ family.isNewVersionBehavior());
+ }
+
+ private static long getCellsPerTimeoutCheck(Configuration conf) {
+ long perHeartbeat = conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
+ StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
+ return perHeartbeat > 0 ? perHeartbeat
+ : StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
}
/**
@@ -82,10 +89,20 @@ public class ScanInfo {
* @param keepDeletedCells Store's keepDeletedCells setting
* @param comparator The store's comparator
*/
- public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
- final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
- final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator,
- final boolean newVersionBehavior) {
+ public ScanInfo(Configuration conf, byte[] family, int minVersions, int maxVersions, long ttl,
+ KeepDeletedCells keepDeletedCells, long blockSize, long timeToPurgeDeletes,
+ CellComparator comparator, boolean newVersionBehavior) {
+ this(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes, comparator,
+ conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT),
+ conf.getBoolean("hbase.storescanner.use.pread", false), getCellsPerTimeoutCheck(conf),
+ conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false),
+ conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize), newVersionBehavior);
+ }
+
+ private ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
+ KeepDeletedCells keepDeletedCells, long timeToPurgeDeletes, CellComparator comparator,
+ long tableMaxRowSize, boolean usePread, long cellsPerTimeoutCheck,
+ boolean parallelSeekEnabled, long preadMaxBytes, boolean newVersionBehavior) {
this.family = family;
this.minVersions = minVersions;
this.maxVersions = maxVersions;
@@ -93,25 +110,14 @@ public class ScanInfo {
this.keepDeletedCells = keepDeletedCells;
this.timeToPurgeDeletes = timeToPurgeDeletes;
this.comparator = comparator;
- this.tableMaxRowSize =
- conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
- this.usePread = conf.getBoolean("hbase.storescanner.use.pread", false);
- long perHeartbeat =
- conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
- StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
- this.cellsPerTimeoutCheck = perHeartbeat > 0?
- perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
- this.parallelSeekEnabled =
- conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
- this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize);
- this.conf = conf;
+ this.tableMaxRowSize = tableMaxRowSize;
+ this.usePread = usePread;
+ this.cellsPerTimeoutCheck = cellsPerTimeoutCheck;
+ this.parallelSeekEnabled = parallelSeekEnabled;
+ this.preadMaxBytes = preadMaxBytes;
this.newVersionBehavior = newVersionBehavior;
}
- public Configuration getConfiguration() {
- return this.conf;
- }
-
long getTableMaxRowSize() {
return this.tableMaxRowSize;
}
@@ -163,4 +169,12 @@ public class ScanInfo {
public boolean isNewVersionBehavior() {
return newVersionBehavior;
}
+
+ /**
+   * Used by CP users to customize the max versions and TTL.
+   */
+  ScanInfo customize(int maxVersions, long ttl) {
+    return new ScanInfo(family, minVersions, maxVersions, ttl, keepDeletedCells,
+      timeToPurgeDeletes, comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck,
+      parallelSeekEnabled, preadMaxBytes, newVersionBehavior);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
new file mode 100644
index 00000000000..5a35d51a27f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * This class gives you the ability to change the max versions and TTL options before opening a
+ * scanner for a Store. It also gives you some information about the scan.
+ *
+ * Changing the max versions and TTL is usually safe even for flush/compaction, so here we provide
+ * a way to do it for you. If you want to do more complicated things such as filtering, please wrap
+ * the {@link InternalScanner} in the {@code preCompact} and {@code preFlush} methods of
+ * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}.
+ *
+ * For user scans, we also provide this class as a parameter to the {@code preStoreScannerOpen}
+ * method in {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}. You can use it to change
+ * the inherent properties of a Store. For example, even if you use {@code Scan.readAllVersions},
+ * you still cannot read two versions if the max versions property of the Store is one; you need to
+ * set the max versions to at least two in {@code preStoreScannerOpen} as well.
+ * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preFlushScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
+ * Store, ScanOptions, FlushLifeCycleTracker)
+ * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preCompactScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
+ * Store, ScanType, ScanOptions,
+ * org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker,
+ * org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest)
+ * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preStoreScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
+ * Store, ScanOptions)
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ScanOptions {
+
+ int getMaxVersions();
+
+ void setMaxVersions(int maxVersions);
+
+ default void readAllVersions() {
+ setMaxVersions(Integer.MAX_VALUE);
+ }
+
+ long getTTL();
+
+ void setTTL(long ttl);
+}
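As a hedged illustration of the flush/compaction use case described above: an observer can impose a TTL when flush and compaction scanners are opened. The class name and the one-day TTL are invented for this sketch; the pattern mirrors the reworked ZooKeeperScanPolicyObserver earlier in this patch.

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

public class FixedTtlObserver implements RegionCoprocessor, RegionObserver {

  // One day in milliseconds; an arbitrary value for this sketch.
  private static final long TTL_MS = 24L * 60 * 60 * 1000;

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  // Cells whose timestamp is older than (now - TTL_MS) become eligible for removal when
  // the memstore is flushed.
  @Override
  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
    options.setTTL(TTL_MS);
  }

  // Apply the same policy when store files are compacted.
  @Override
  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    options.setTTL(TTL_MS);
  }
}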
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index b0bff106785..442d47d3a13 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -80,9 +80,14 @@ abstract class StoreFlusher {
*/
  protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException {
- InternalScanner scanner =
- new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
- ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+ ScanInfo scanInfo;
+ if (store.getCoprocessorHost() != null) {
+ scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
+ } else {
+ scanInfo = store.getScanInfo();
+ }
+ InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
+ ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
assert scanner != null;
if (store.getCoprocessorHost() != null) {
try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 43079a6e778..b2389eb5a61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableSet;
-import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
@@ -67,7 +66,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
// In unit tests, the store could be null
-  protected final Optional<HStore> store;
+ protected final HStore store;
private ScanQueryMatcher matcher;
protected KeyValueHeap heap;
private boolean cacheBlocks;
@@ -160,7 +159,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private boolean topChanged = false;
/** An internal constructor. */
-  private StoreScanner(Optional<HStore> store, Scan scan, ScanInfo scanInfo,
+ private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo,
int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
this.readPt = readPt;
this.store = store;
@@ -199,15 +198,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
    // Parallel seeking is on if the config allows and there is more than one store file.
- this.store.ifPresent(s -> {
- if (s.getStorefilesCount() > 1) {
- RegionServerServices rsService = ((HStore) s).getHRegion().getRegionServerServices();
- if (rsService != null && scanInfo.isParallelSeekEnabled()) {
- this.parallelSeekEnabled = true;
- this.executor = rsService.getExecutorService();
- }
+ if (store != null && store.getStorefilesCount() > 1) {
+ RegionServerServices rsService = store.getHRegion().getRegionServerServices();
+ if (rsService != null && scanInfo.isParallelSeekEnabled()) {
+ this.parallelSeekEnabled = true;
+ this.executor = rsService.getExecutorService();
}
- });
+ }
}
  private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
@@ -225,7 +222,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
- this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt,
+ this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
scan.getCacheBlocks(), ScanType.USER_SCAN);
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
@@ -275,11 +272,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking versions
*/
- public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
-      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
- long earliestPutTs) throws IOException {
- this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null,
- null);
+  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
+ ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+ this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
}
/**
@@ -292,21 +287,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
*/
- public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
-      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
- byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
- this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
+  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
+ long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+ byte[] dropDeletesToRow) throws IOException {
+ this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
- private StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
-      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
- long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
- this(Optional.of(store),
- maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
- : SCAN_FOR_COMPACTION,
- scanInfo, 0, store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED),
- false, scanType);
+  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
+ ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+ byte[] dropDeletesToRow) throws IOException {
+ this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
+ store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
assert scanType != ScanType.USER_SCAN;
matcher =
CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
@@ -333,7 +325,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// For mob compaction only as we do not have a Store instance when doing mob compaction.
public StoreScanner(ScanInfo scanInfo, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
- this(Optional.empty(), SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
+ this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
assert scanType != ScanType.USER_SCAN;
this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
oldestUnexpiredTS, now, null, null, null);
@@ -345,7 +337,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
  StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
      List<? extends KeyValueScanner> scanners) throws IOException {
// 0 is passed as readpoint because the test bypasses Store
- this(Optional.empty(), scan, scanInfo, columns != null ? columns.size() : 0, 0L,
+ this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L,
scan.getCacheBlocks(), ScanType.USER_SCAN);
this.matcher =
UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
@@ -357,7 +349,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType,
      List<? extends KeyValueScanner> scanners) throws IOException {
// 0 is passed as readpoint because the test bypasses Store
- this(Optional.empty(), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
+ this(null, maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
: SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
@@ -478,7 +470,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.closing = true;
}
// For mob compaction, we do not have a store.
- this.store.ifPresent(s -> s.deleteChangedReaderObserver(this));
+ if (this.store != null) {
+ this.store.deleteChangedReaderObserver(this);
+ }
if (withDelayedScannersClose) {
clearAndClose(scannersForDelayedClose);
clearAndClose(memStoreScannersAfterFlush);
@@ -550,8 +544,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Only do a sanity-check if store and comparator are available.
- CellComparator comparator =
- store.map(s -> s.getComparator()).orElse(null);
+ CellComparator comparator = store != null ? store.getComparator() : null;
int count = 0;
long totalBytesRead = 0;
@@ -864,8 +857,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* @return if top of heap has changed (and KeyValueHeap has to try the next KV)
*/
protected final boolean reopenAfterFlush() throws IOException {
- // here we can make sure that we have a Store instance.
- HStore store = this.store.get();
+ // here we can make sure that we have a Store instance so no null check on store.
Cell lastTop = heap.peek();
// When we have the scan object, should we not pass it to getScanners() to get a limited set of
// scanners? We did so in the constructor and we could have done it now by storing the scan
@@ -992,9 +984,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    List<KeyValueScanner> fileScanners = null;
    List<KeyValueScanner> newCurrentScanners;
KeyValueHeap newHeap;
- // We must have a store instance here
- HStore store = this.store.get();
try {
+ // We must have a store instance here so no null check
// recreate the scanners on the current file scanners
fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 6ed8fefadd2..817ddf87f79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
+import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
+import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -26,7 +28,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.OptionalInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,10 +41,12 @@ import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.CustomizedScanInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.ShipperListener;
@@ -230,7 +233,7 @@ public abstract class Compactor {
ScanType getScanType(CompactionRequestImpl request);
- InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+ InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
}
@@ -238,14 +241,13 @@ public abstract class Compactor {
@Override
public ScanType getScanType(CompactionRequestImpl request) {
- return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
- : ScanType.COMPACT_RETAIN_DELETES;
+ return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
}
@Override
- public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
- FileDetails fd, long smallestReadPoint) throws IOException {
- return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+ public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
+ ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
+ return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
fd.earliestPutTs);
}
};
@@ -266,6 +268,31 @@ public abstract class Compactor {
/* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
}
+ private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
+ User user) throws IOException {
+ if (store.getCoprocessorHost() == null) {
+ return store.getScanInfo();
+ }
+ return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
+ request, user);
+ }
+
+ /**
+ * Calls coprocessor, if any, to create scanners - after normal scanner creation.
+ * @param request Compaction request.
+ * @param scanType Scan type.
+ * @param scanner The default scanner created for compaction.
+ * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
+ */
+ private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
+ InternalScanner scanner, User user) throws IOException {
+ if (store.getCoprocessorHost() == null) {
+ return scanner;
+ }
+ return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
+ request, user);
+ }
+
protected final List<Path> compact(final CompactionRequestImpl request,
InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
ThroughputController throughputController, User user) throws IOException {
@@ -290,8 +317,9 @@ public abstract class Compactor {
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
- scanner = postCreateCoprocScanner(request, scanType,
- scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint), user);
+ ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
+ scanner = postCompactScannerOpen(request, scanType,
+ scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<>();
@@ -325,22 +353,6 @@ public abstract class Compactor {
protected abstract void abortWriter(T writer) throws IOException;
- /**
- * Calls coprocessor, if any, to create scanners - after normal scanner creation.
- * @param request Compaction request.
- * @param scanType Scan type.
- * @param scanner The default scanner created for compaction.
- * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
- */
- private InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
- InternalScanner scanner, User user) throws IOException {
- if (store.getCoprocessorHost() == null) {
- return scanner;
- }
- return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
- request, user);
- }
-
/**
* Performs the compaction.
* @param fd FileDetails of cell sink writer
@@ -475,10 +487,10 @@ public abstract class Compactor {
* @param earliestPutTs Earliest put across all files.
* @return A compaction scanner.
*/
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
- return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
- smallestReadPoint, earliestPutTs);
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+ long earliestPutTs) throws IOException {
+ return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
}
/**
@@ -490,10 +502,10 @@ public abstract class Compactor {
* @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
* @return A compaction scanner.
*/
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
- byte[] dropDeletesToRow) throws IOException {
- return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
- smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+ byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
+ return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
+ dropDeletesFromRow, dropDeletesToRow);
}
}
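
Note (illustrative only, not part of this patch): with the refactor above, the compaction scanner is built from whatever ScanInfo preCompactScannerOpen returns, so a coprocessor can tune compaction-time scan parameters up front instead of wrapping the scanner afterwards. A minimal observer sketch follows; it assumes the ScanOptions-based RegionObserver hook and its setMaxVersions() mutator, neither of which is shown in this diff.

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

/**
 * Sketch only: keeps all cell versions while compacting, by customizing the ScanInfo
 * that the Compactor above obtains through its preCompactScannerOpen(...) call.
 */
public class KeepAllVersionsCompactionObserver implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
      CompactionRequest request) throws IOException {
    // Assumed mutator: raise maxVersions so the compaction scanner retains every version.
    options.setMaxVersions(Integer.MAX_VALUE);
  }
}
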
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index f4836a8d709..c9e591ea432 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@@ -66,13 +67,13 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter>
- public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
- FileDetails fd, long smallestReadPoint) throws IOException {
+ public InternalScanner createScanner(ScanInfo scanInfo, List scanners,
+ ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
return (majorRangeFromRow == null)
- ? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
- fd.earliestPutTs)
- : StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
- majorRangeFromRow, majorRangeToRow);
+ ? StripeCompactor.this.createScanner(store, scanInfo, scanners, scanType,
+ smallestReadPoint, fd.earliestPutTs)
+ : StripeCompactor.this.createScanner(store, scanInfo, scanners, smallestReadPoint,
+ fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java
index f18ccc0bec3..6b16b08d5a8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java
@@ -124,11 +124,10 @@ public class TestFromClientSideScanExcpetion {
}
@Override
- protected KeyValueScanner createScanner(Scan scan, NavigableSet<byte[]> targetCols, long readPt)
- throws IOException {
- return scan.isReversed()
- ? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
- : new MyStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+ protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
+ NavigableSet<byte[]> targetCols, long readPt) throws IOException {
+ return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
+ : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 63168099256..7248f562cec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -211,11 +211,9 @@ public class TestCompaction {
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
final int ttl = 1000;
- for (HStore store: this.r.stores.values()) {
+ for (HStore store : this.r.stores.values()) {
ScanInfo old = store.getScanInfo();
- ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
- old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0,
- old.getComparator(), old.isNewVersionBehavior());
+ ScanInfo si = old.customize(old.getMaxVersions(), ttl);
store.setScanInfo(si);
}
Thread.sleep(ttl);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
index c6c0bdc440c..6038bb2686f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
@@ -158,10 +158,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
public void testCompactionEmptyHFile() throws IOException {
// Set TTL
ScanInfo oldScanInfo = store.getScanInfo();
- ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
- oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
- oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(),
- oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator(), oldScanInfo.isNewVersionBehavior());
+ ScanInfo newScanInfo = oldScanInfo.customize(oldScanInfo.getMaxVersions(), 600);
store.setScanInfo(newScanInfo);
// Do not compact empty store file
List<HStoreFile> candidates = sfCreate(0);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 4499cd569d8..268b352239a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -403,6 +403,8 @@ public class TestHRegion {
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),
Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
+ when(mockedCPHost.preFlushScannerOpen(Mockito.isA(HStore.class),
+ Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(store.getScanInfo());
region.setCoprocessorHost(mockedCPHost);
region.put(put);
region.flush(true);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index f443705e1ec..2a556d7a529 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -292,9 +292,7 @@ public class TestMajorCompaction {
final int ttl = 1000;
for (HStore store : r.getStores()) {
ScanInfo old = store.getScanInfo();
- ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
- old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0,
- old.getComparator(), old.isNewVersionBehavior());
+ ScanInfo si = old.customize(old.getMaxVersions(), ttl);
store.setScanInfo(si);
}
Thread.sleep(1000);
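
The test hunks above swap the verbose ScanInfo constructor for old.customize(maxVersions, ttl). Its body is not part of this diff; a plausible shape, inferred from the constructor arguments the tests used to pass, is a copy-with-overrides helper along these lines (the field names are assumptions):

// Inside ScanInfo -- hypothetical sketch, not taken from this patch.
ScanInfo customize(int maxVersions, long ttl) {
  // Copy every field as-is except the two being overridden.
  return new ScanInfo(conf, family, minVersions, maxVersions, ttl, keepDeletedCells,
      preadMaxBytes, timeToPurgeDeletes, comparator, newVersionBehavior);
}
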
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index a5a0e7830b7..95c2c56dafb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -110,15 +110,16 @@ public class TestDateTieredCompactor {
return new DateTieredCompactor(conf, store) {
@Override
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
- byte[] dropDeletesToRow) throws IOException {
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+ byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+ long earliestPutTs) throws IOException {
return scanner;
}
};
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index af30b7caf4c..48e560c748f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
@@ -787,15 +788,16 @@ public class TestStripeCompactionPolicy {
final Scanner scanner = new Scanner();
return new StripeCompactor(conf, store) {
@Override
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
- byte[] dropDeletesToRow) throws IOException {
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+ byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+ long earliestPutTs) throws IOException {
return scanner;
}
};
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index dbf95f3b089..772a674f34c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -207,15 +207,16 @@ public class TestStripeCompactor {
return new StripeCompactor(conf, store) {
@Override
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
- byte[] dropDeletesToRow) throws IOException {
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+ byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
- protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
- ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+ protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+ List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+ long earliestPutTs) throws IOException {
return scanner;
}
};