diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java new file mode 100644 index 00000000000..e9b590deda0 --- /dev/null +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import java.io.IOException; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.TreeMap; +import java.util.stream.IntStream; + +import org.apache.commons.lang3.mutable.MutableLong; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellBuilder; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanOptions; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.util.Bytes; + +import org.apache.hadoop.hbase.shaded.com.google.common.math.IntMath; + +/** + * An example for implementing a counter that reads is much less than writes, i.e, write heavy. + *

+ * We will convert increment to put, and do aggregating when get. And of course the return value of + * increment is useless then. + *

+ * Notice that this is only an example so we do not handle most corner cases, for example, you must + * provide a qualifier when doing a get. + */ +public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preFlushScannerOpen(ObserverContext c, Store store, + ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { + options.readAllVersions(); + } + + private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) { + return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) + .setType(CellBuilder.DataType.Put).setFamily(family).setQualifier(qualifier) + .setTimestamp(ts).setValue(Bytes.toBytes(value)).build(); + } + + private InternalScanner wrap(byte[] family, InternalScanner scanner) { + return new InternalScanner() { + + private List srcResult = new ArrayList<>(); + + private byte[] row; + + private byte[] qualifier; + + private long timestamp; + + private long sum; + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + boolean moreRows = scanner.next(srcResult, scannerContext); + if (srcResult.isEmpty()) { + if (!moreRows && row != null) { + result.add(createCell(row, family, qualifier, timestamp, sum)); + } + return moreRows; + } + Cell firstCell = srcResult.get(0); + // Check if there is a row change first. All the cells will come from the same row so just + // check the first one once is enough. + if (row == null) { + row = CellUtil.cloneRow(firstCell); + qualifier = CellUtil.cloneQualifier(firstCell); + } else if (!CellUtil.matchingRows(firstCell, row)) { + result.add(createCell(row, family, qualifier, timestamp, sum)); + row = CellUtil.cloneRow(firstCell); + qualifier = CellUtil.cloneQualifier(firstCell); + sum = 0; + } + srcResult.forEach(c -> { + if (CellUtil.matchingQualifier(c, qualifier)) { + sum += Bytes.toLong(c.getValueArray(), c.getValueOffset()); + } else { + result.add(createCell(row, family, qualifier, timestamp, sum)); + qualifier = CellUtil.cloneQualifier(c); + sum = Bytes.toLong(c.getValueArray(), c.getValueOffset()); + } + timestamp = c.getTimestamp(); + }); + if (!moreRows) { + result.add(createCell(row, family, qualifier, timestamp, sum)); + } + srcResult.clear(); + return moreRows; + } + + @Override + public void close() throws IOException { + scanner.close(); + } + }; + } + + @Override + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { + return wrap(store.getColumnFamilyDescriptor().getName(), scanner); + } + + @Override + public void preCompactScannerOpen(ObserverContext c, Store store, + ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + options.readAllVersions(); + } + + @Override + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return wrap(store.getColumnFamilyDescriptor().getName(), scanner); + } + + @Override + public void preGetOp(ObserverContext c, Get get, List result) + throws IOException { + Scan scan = + new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions(); + NavigableMap> sums = + new TreeMap<>(Bytes.BYTES_COMPARATOR); + get.getFamilyMap().forEach((cf, cqs) -> { + NavigableMap ss = new TreeMap<>(Bytes.BYTES_COMPARATOR); + sums.put(cf, ss); + cqs.forEach(cq -> { + ss.put(cq, new MutableLong(0)); + scan.addColumn(cf, cq); + }); + }); + List cells = new ArrayList<>(); + try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) { + boolean moreRows; + do { + moreRows = scanner.next(cells); + for (Cell cell : cells) { + byte[] family = CellUtil.cloneFamily(cell); + byte[] qualifier = CellUtil.cloneQualifier(cell); + long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset()); + sums.get(family).get(qualifier).add(value); + } + cells.clear(); + } while (moreRows); + } + sums.forEach((cf, m) -> m.forEach((cq, s) -> result + .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue())))); + c.bypass(); + } + + private final int mask; + private final MutableLong[] lastTimestamps; + { + int stripes = + 1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING); + lastTimestamps = + IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new); + mask = stripes - 1; + } + + // We need make sure the different put uses different timestamp otherwise we may lost some + // increments. This is a known issue for HBase. + private long getUniqueTimestamp(byte[] row) { + int slot = Bytes.hashCode(row) & mask; + MutableLong lastTimestamp = lastTimestamps[slot]; + long now = System.currentTimeMillis(); + synchronized (lastTimestamp) { + long pt = lastTimestamp.longValue() >> 10; + if (now > pt) { + lastTimestamp.setValue(now << 10); + } else { + lastTimestamp.increment(); + } + return lastTimestamp.longValue(); + } + } + + @Override + public Result preIncrement(ObserverContext c, Increment increment) + throws IOException { + byte[] row = increment.getRow(); + Put put = new Put(row); + long ts = getUniqueTimestamp(row); + for (Map.Entry> entry : increment.getFamilyCellMap().entrySet()) { + for (Cell cell : entry.getValue()) { + put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row) + .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(), + cell.getQualifierLength()) + .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()) + .setType(CellBuilder.DataType.Put).setTimestamp(ts).build()); + } + } + c.getEnvironment().getRegion().put(put); + c.bypass(); + return Result.EMPTY_RESULT; + } + + @Override + public void preStoreScannerOpen(ObserverContext ctx, Store store, + ScanOptions options) throws IOException { + options.readAllVersions(); + } +} diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index d6d66bbd5a0..449726f74e4 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.coprocessor.example; import java.io.IOException; -import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -28,20 +27,19 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.retry.RetryForever; -import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; -import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanOptions; import org.apache.hadoop.hbase.regionserver.ScanType; -import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * This is an example showing how a RegionObserver could configured via ZooKeeper in order to @@ -170,33 +168,24 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs return OptionalLong.of(Bytes.toLong(bytes)); } - private InternalScanner wrap(InternalScanner scanner) { - OptionalLong optExpireBefore = getExpireBefore(); - if (!optExpireBefore.isPresent()) { - return scanner; + private void resetTTL(ScanOptions options) { + OptionalLong expireBefore = getExpireBefore(); + if (!expireBefore.isPresent()) { + return; } - long expireBefore = optExpireBefore.getAsLong(); - return new DelegatingInternalScanner(scanner) { - - @Override - public boolean next(List result, ScannerContext scannerContext) throws IOException { - boolean moreRows = scanner.next(result, scannerContext); - result.removeIf(c -> c.getTimestamp() < expireBefore); - return moreRows; - } - }; + options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong()); } @Override - public InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { - return wrap(scanner); + public void preFlushScannerOpen(ObserverContext c, Store store, + ScanOptions options, FlushLifeCycleTracker tracker) throws IOException { + resetTTL(options); } @Override - public InternalScanner preCompact(ObserverContext c, Store store, - InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + public void preCompactScannerOpen(ObserverContext c, Store store, + ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, CompactionRequest request) throws IOException { - return wrap(scanner); + resetTTL(options); } } \ No newline at end of file diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java new file mode 100644 index 00000000000..1881c8552f5 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestWriteHeavyIncrementObserver { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName NAME = TableName.valueOf("TestCP"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] ROW = Bytes.toBytes("row"); + + private static byte[] CQ1 = Bytes.toBytes("cq1"); + + private static byte[] CQ2 = Bytes.toBytes("cq2"); + + private static Table TABLE; + + private static long UPPER = 1000; + + private static int THREADS = 10; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L); + UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L); + UTIL.startMiniCluster(3); + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(NAME) + .addCoprocessor(WriteHeavyIncrementObserver.class.getName()) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build()); + TABLE = UTIL.getConnection().getTable(NAME); + } + + @AfterClass + public static void tearDown() throws Exception { + if (TABLE != null) { + TABLE.close(); + } + UTIL.shutdownMiniCluster(); + } + + private static void increment() throws IOException { + for (long i = 1; i <= UPPER; i++) { + TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i)); + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10)); + } catch (InterruptedException e) { + } + } + } + + private void assertSum() throws IOException { + Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2)); + assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1))); + assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2))); + } + + @Test + public void test() throws Exception { + Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> { + try { + increment(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, "increment-" + i)).toArray(Thread[]::new); + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + assertSum(); + // we do not hack scan operation so using scan we could get the original values added into the + // table. + try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW) + .withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) { + Result r = scanner.next(); + assertTrue(r.rawCells().length > 2); + } + UTIL.getAdmin().flush(NAME); + UTIL.getAdmin().majorCompact(NAME); + HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY); + Waiter.waitFor(UTIL.getConfiguration(), 30000, new Waiter.ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + return store.getStorefilesCount() == 1; + } + + @Override + public String explainFailure() throws Exception { + return "Major compaction hangs, there are still " + store.getStorefilesCount() + + " store files"; + } + }); + assertSum(); + // Should only have two cells after flush and major compaction + try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW) + .withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) { + Result r = scanner.next(); + assertEquals(2, r.rawCells().length); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index cfc8e927c4f..1fdd2f31db5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanOptions; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; @@ -129,10 +130,20 @@ public interface RegionObserver { default void preFlush(final ObserverContext c, FlushLifeCycleTracker tracker) throws IOException {} + /** + * Called before we open store scanner for flush. You can use the {@code options} to change max + * versions and TTL for the scanner being opened. + * @param c the environment provided by the region server + * @param store the store where flush is being requested + * @param options used to change max versions and TTL for the scanner being opened + */ + default void preFlushScannerOpen(ObserverContext c, Store store, + ScanOptions options,FlushLifeCycleTracker tracker) throws IOException {} + /** * Called before a Store's memstore is flushed to disk. * @param c the environment provided by the region server - * @param store the store where compaction is being requested + * @param store the store where flush is being requested * @param scanner the scanner over existing data used in the store file * @param tracker tracker used to track the life cycle of a flush * @return the scanner to use during compaction. Should not be {@code null} @@ -188,6 +199,20 @@ public interface RegionObserver { List selected, CompactionLifeCycleTracker tracker, CompactionRequest request) {} + /** + * Called before we open store scanner for compaction. You can use the {@code options} to change max + * versions and TTL for the scanner being opened. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanType type of Scan + * @param options used to change max versions and TTL for the scanner being opened + * @param tracker tracker used to track the life cycle of a compaction + * @param request the requested compaction + */ + default void preCompactScannerOpen(ObserverContext c, Store store, + ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException {} + /** * Called prior to writing the {@link StoreFile}s selected for compaction into a new * {@code StoreFile}. @@ -857,6 +882,27 @@ public interface RegionObserver { default void postScannerClose(ObserverContext ctx, InternalScanner s) throws IOException {} + /** + * Called before a store opens a new scanner. + *

+ * This hook is called when a "user" scanner is opened. Use {@code preFlushScannerOpen} and + * {@code preCompactScannerOpen} to inject flush/compaction. + *

+ * Notice that, this method is used to change the inherent max versions and TTL for a Store. For + * example, you can change the max versions option for a {@link Scan} object to 10 in + * {@code preScannerOpen}, but if the max versions config on the Store is 1, then you still can + * only read 1 version. You need also to inject here to change the max versions to 10 if you want + * to get more versions. + * @param ctx the environment provided by the region server + * @param store the store which we want to get scanner from + * @param options used to change max versions and TTL for the scanner being opened + * @see #preFlushScannerOpen(ObserverContext, Store, ScanOptions, FlushLifeCycleTracker) + * @see #preCompactScannerOpen(ObserverContext, Store, ScanType, ScanOptions, + * CompactionLifeCycleTracker, CompactionRequest) + */ + default void preStoreScannerOpen(ObserverContext ctx, Store store, + ScanOptions options) throws IOException {} + /** * Called before replaying WALs for this region. * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java index 32552da8502..447629b5508 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java @@ -22,7 +22,6 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.OptionalInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ShipperListener; @@ -72,10 +72,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor { } @Override - public InternalScanner createScanner(List scanners, + public InternalScanner createScanner(ScanInfo scanInfo, List scanners, ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { - return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType, - smallestReadPoint, fd.earliestPutTs); + return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, + fd.earliestPutTs); } }; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java new file mode 100644 index 00000000000..c3d5e57cbfb --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Helper class for CP hooks to change max versions and TTL. + */ +@InterfaceAudience.Private +public class CustomizedScanInfoBuilder implements ScanOptions { + + private final ScanInfo scanInfo; + + private Integer maxVersions; + + private Long ttl; + + public CustomizedScanInfoBuilder(ScanInfo scanInfo) { + this.scanInfo = scanInfo; + } + + @Override + public int getMaxVersions() { + return maxVersions != null ? maxVersions.intValue() : scanInfo.getMaxVersions(); + } + + @Override + public void setMaxVersions(int maxVersions) { + this.maxVersions = maxVersions; + } + + @Override + public long getTTL() { + return ttl != null ? ttl.longValue() : scanInfo.getTtl(); + } + + @Override + public void setTTL(long ttl) { + this.ttl = ttl; + } + + public ScanInfo build() { + if (maxVersions == null && ttl == null) { + return scanInfo; + } + return scanInfo.customize(getMaxVersions(), getTTL()); + } + + @Override + public String toString() { + return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() + "]"; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 206c3cdc1d4..5cb1e455a52 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -144,8 +144,8 @@ public class HMobStore extends HStore { * the mob files should be performed after the seek in HBase is done. */ @Override - protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, - long readPt) throws IOException { + protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, + NavigableSet targetCols, long readPt) throws IOException { if (MobUtils.isRefOnlyScan(scan)) { Filter refOnlyFilter = new MobReferenceOnlyFilter(); Filter filter = scan.getFilter(); @@ -155,9 +155,8 @@ public class HMobStore extends HStore { scan.setFilter(refOnlyFilter); } } - return scan.isReversed() - ? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt) - : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); + return scan.isReversed() ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt) + : new MobStoreScanner(this, scanInfo, scan, targetCols, readPt); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 07518e79994..4d0f6d00b61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -7857,6 +7857,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } /** @param coprocessorHost the new coprocessor host */ + @VisibleForTesting public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) { this.coprocessorHost = coprocessorHost; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c0cea4e1526..7b8ca79ab5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1909,7 +1909,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat this.forceMajor = true; } - ////////////////////////////////////////////////////////////////////////////// // File administration ////////////////////////////////////////////////////////////////////////////// @@ -1922,21 +1921,27 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat * @return a scanner over the current key values * @throws IOException on failure */ - public KeyValueScanner getScanner(Scan scan, - final NavigableSet targetCols, long readPt) throws IOException { + public KeyValueScanner getScanner(Scan scan, final NavigableSet targetCols, long readPt) + throws IOException { lock.readLock().lock(); try { - return createScanner(scan, targetCols, readPt); + ScanInfo scanInfo; + if (this.getCoprocessorHost() != null) { + scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this); + } else { + scanInfo = getScanInfo(); + } + return createScanner(scan, scanInfo, targetCols, readPt); } finally { lock.readLock().unlock(); } } - protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, - long readPt) throws IOException { - return scan.isReversed() ? new ReversedStoreScanner(this, - getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, - getScanInfo(), scan, targetCols, readPt); + // HMobStore will override this method to return its own implementation. + protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, + NavigableSet targetCols, long readPt) throws IOException { + return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) + : new StoreScanner(this, scanInfo, scan, targetCols, readPt); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java index b3ba998d116..7ab2fe325e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.OptionalInt; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -108,9 +107,11 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator */ private StoreScanner createScanner(HStore store, List scanners) throws IOException { - // Get all available versions - return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners, - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + // FIXME: This is the old comment 'Get all available versions' + // But actually if we really reset the ScanInfo to get all available versions then lots of UTs + // will fail + return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES, + store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); } /* Refill kev-value set (should be invoked only when KVS is empty) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index c242fd18503..c5a3de37e5f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -622,6 +621,21 @@ public class RegionCoprocessorHost }); } + /** + * Called prior to opening store scanner for compaction. + */ + public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType, + CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException { + CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) { + @Override + public void call(RegionObserver observer) throws IOException { + observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request); + } + }); + return builder.build(); + } + /** * Called prior to rewriting the store files selected for compaction * @param store the store being compacted @@ -665,6 +679,22 @@ public class RegionCoprocessorHost }); } + /** + * Invoked before create StoreScanner for flush. + * @throws IOException + */ + public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker) + throws IOException { + CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + @Override + public void call(RegionObserver observer) throws IOException { + observer.preFlushScannerOpen(this, store, builder, tracker); + } + }); + return builder.build(); + } + /** * Invoked before a memstore flush * @throws IOException @@ -1263,6 +1293,20 @@ public class RegionCoprocessorHost }); } + /** + * Called before open store scanner for user scan. + */ + public ScanInfo preStoreScannerOpen(HStore store) throws IOException { + CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { + @Override + public void call(RegionObserver observer) throws IOException { + observer.preStoreScannerOpen(this, store, builder); + } + }); + return builder.build(); + } + /** * @param info the RegionInfo for this region * @param edits the file of recovered edits diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java index 8e48c696a1d..4e5cb70cc57 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java @@ -49,7 +49,6 @@ public class ScanInfo { private long cellsPerTimeoutCheck; private boolean parallelSeekEnabled; private final long preadMaxBytes; - private final Configuration conf; private final boolean newVersionBehavior; public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT @@ -64,10 +63,18 @@ public class ScanInfo { * major compaction. * @param comparator The store's comparator */ - public ScanInfo(final Configuration conf, final ColumnFamilyDescriptor family, final long ttl, - final long timeToPurgeDeletes, final CellComparator comparator) { + public ScanInfo(Configuration conf, ColumnFamilyDescriptor family, long ttl, + long timeToPurgeDeletes, CellComparator comparator) { this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, - family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator, family.isNewVersionBehavior()); + family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator, + family.isNewVersionBehavior()); + } + + private static long getCellsPerTimeoutCheck(Configuration conf) { + long perHeartbeat = conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, + StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK); + return perHeartbeat > 0 ? perHeartbeat + : StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; } /** @@ -82,10 +89,20 @@ public class ScanInfo { * @param keepDeletedCells Store's keepDeletedCells setting * @param comparator The store's comparator */ - public ScanInfo(final Configuration conf, final byte[] family, final int minVersions, - final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells, - final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator, - final boolean newVersionBehavior) { + public ScanInfo(Configuration conf, byte[] family, int minVersions, int maxVersions, long ttl, + KeepDeletedCells keepDeletedCells, long blockSize, long timeToPurgeDeletes, + CellComparator comparator, boolean newVersionBehavior) { + this(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes, comparator, + conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT), + conf.getBoolean("hbase.storescanner.use.pread", false), getCellsPerTimeoutCheck(conf), + conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false), + conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize), newVersionBehavior); + } + + private ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl, + KeepDeletedCells keepDeletedCells, long timeToPurgeDeletes, CellComparator comparator, + long tableMaxRowSize, boolean usePread, long cellsPerTimeoutCheck, + boolean parallelSeekEnabled, long preadMaxBytes, boolean newVersionBehavior) { this.family = family; this.minVersions = minVersions; this.maxVersions = maxVersions; @@ -93,25 +110,14 @@ public class ScanInfo { this.keepDeletedCells = keepDeletedCells; this.timeToPurgeDeletes = timeToPurgeDeletes; this.comparator = comparator; - this.tableMaxRowSize = - conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT); - this.usePread = conf.getBoolean("hbase.storescanner.use.pread", false); - long perHeartbeat = - conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, - StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK); - this.cellsPerTimeoutCheck = perHeartbeat > 0? - perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; - this.parallelSeekEnabled = - conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false); - this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize); - this.conf = conf; + this.tableMaxRowSize = tableMaxRowSize; + this.usePread = usePread; + this.cellsPerTimeoutCheck = cellsPerTimeoutCheck; + this.parallelSeekEnabled = parallelSeekEnabled; + this.preadMaxBytes = preadMaxBytes; this.newVersionBehavior = newVersionBehavior; } - public Configuration getConfiguration() { - return this.conf; - } - long getTableMaxRowSize() { return this.tableMaxRowSize; } @@ -163,4 +169,12 @@ public class ScanInfo { public boolean isNewVersionBehavior() { return newVersionBehavior; } + + /** + * Used for CP users for customizing max versions and ttl. + */ + ScanInfo customize(int maxVersions, long ttl) { + return new ScanInfo(family, minVersions, maxVersions, ttl, keepDeletedCells, ttl, comparator, + ttl, usePread, maxVersions, parallelSeekEnabled, ttl, newVersionBehavior); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java new file mode 100644 index 00000000000..5a35d51a27f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * This class gives you the ability to change the max versions and TTL options before opening a + * scanner for a Store. And also gives you some information for the scan. + *

+ * Changing max versions and TTL are usually safe even for flush/compaction, so here we provide a + * way to do it for you. If you want to do other complicated stuffs such as filtering, please wrap + * the {@link InternalScanner} in the {@code preCompact} and {@code preFlush} methods in + * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}. + *

+ * For user scans, we also provide this class as a parameter in the {@code preStoreScannerOpen} + * method in {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}. You can use it to change + * the inherent properties for a Store. For example, even if you use {@code Scan.readAllVersions}, + * you still can not read two versions if the max versions property of the Store is one. You need to + * set the max versions to a value greater than two in {@code preStoreScannerOpen}. + * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preFlushScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext, + * Store, ScanOptions, FlushLifeCycleTracker) + * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preCompactScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext, + * Store, ScanType, ScanOptions, + * org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker, + * org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest) + * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preStoreScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext, + * Store, ScanOptions) + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface ScanOptions { + + int getMaxVersions(); + + void setMaxVersions(int maxVersions); + + default void readAllVersions() { + setMaxVersions(Integer.MAX_VALUE); + } + + long getTTL(); + + void setTTL(long ttl); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index b0bff106785..442d47d3a13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -80,9 +80,14 @@ abstract class StoreFlusher { */ protected final InternalScanner createScanner(List snapshotScanners, long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException { - InternalScanner scanner = - new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, - ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); + ScanInfo scanInfo; + if (store.getCoprocessorHost() != null) { + scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker); + } else { + scanInfo = store.getScanInfo(); + } + InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners, + ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); assert scanner != null; if (store.getCoprocessorHost() != null) { try { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 43079a6e778..b2389eb5a61 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -23,7 +23,6 @@ import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; import java.util.NavigableSet; -import java.util.Optional; import java.util.OptionalInt; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; @@ -67,7 +66,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver { private static final Log LOG = LogFactory.getLog(StoreScanner.class); // In unit tests, the store could be null - protected final Optional store; + protected final HStore store; private ScanQueryMatcher matcher; protected KeyValueHeap heap; private boolean cacheBlocks; @@ -160,7 +159,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private boolean topChanged = false; /** An internal constructor. */ - private StoreScanner(Optional store, Scan scan, ScanInfo scanInfo, + private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo, int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) { this.readPt = readPt; this.store = store; @@ -199,15 +198,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.preadMaxBytes = scanInfo.getPreadMaxBytes(); this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); // Parallel seeking is on if the config allows and more there is more than one store file. - this.store.ifPresent(s -> { - if (s.getStorefilesCount() > 1) { - RegionServerServices rsService = ((HStore) s).getHRegion().getRegionServerServices(); - if (rsService != null && scanInfo.isParallelSeekEnabled()) { - this.parallelSeekEnabled = true; - this.executor = rsService.getExecutorService(); - } + if (store != null && store.getStorefilesCount() > 1) { + RegionServerServices rsService = store.getHRegion().getRegionServerServices(); + if (rsService != null && scanInfo.isParallelSeekEnabled()) { + this.parallelSeekEnabled = true; + this.executor = rsService.getExecutorService(); } - }); + } } private void addCurrentScanners(List scanners) { @@ -225,7 +222,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, long readPt) throws IOException { - this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt, + this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt, scan.getCacheBlocks(), ScanType.USER_SCAN); if (columns != null && scan.isRaw()) { throw new DoNotRetryIOException("Cannot specify any column for a raw scan"); @@ -275,11 +272,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @param scanners ancillary scanners * @param smallestReadPoint the readPoint that we should use for tracking versions */ - public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, - List scanners, ScanType scanType, long smallestReadPoint, - long earliestPutTs) throws IOException { - this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null, - null); + public StoreScanner(HStore store, ScanInfo scanInfo, List scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null); } /** @@ -292,21 +287,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW. * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW. */ - public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, - List scanners, long smallestReadPoint, long earliestPutTs, - byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { - this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, + public StoreScanner(HStore store, ScanInfo scanInfo, List scanners, + long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, + byte[] dropDeletesToRow) throws IOException { + this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); } - private StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, - List scanners, ScanType scanType, long smallestReadPoint, - long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { - this(Optional.of(store), - maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt()) - : SCAN_FOR_COMPACTION, - scanInfo, 0, store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), - false, scanType); + private StoreScanner(HStore store, ScanInfo scanInfo, List scanners, + ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, + byte[] dropDeletesToRow) throws IOException { + this(store, SCAN_FOR_COMPACTION, scanInfo, 0, + store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType); assert scanType != ScanType.USER_SCAN; matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs, @@ -333,7 +325,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // For mob compaction only as we do not have a Store instance when doing mob compaction. public StoreScanner(ScanInfo scanInfo, ScanType scanType, List scanners) throws IOException { - this(Optional.empty(), SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); + this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType); assert scanType != ScanType.USER_SCAN; this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L, oldestUnexpiredTS, now, null, null, null); @@ -345,7 +337,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet columns, List scanners) throws IOException { // 0 is passed as readpoint because the test bypasses Store - this(Optional.empty(), scan, scanInfo, columns != null ? columns.size() : 0, 0L, + this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L, scan.getCacheBlocks(), ScanType.USER_SCAN); this.matcher = UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null); @@ -357,7 +349,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType, List scanners) throws IOException { // 0 is passed as readpoint because the test bypasses Store - this(Optional.empty(), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt()) + this(null, maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt()) : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType); this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null); @@ -478,7 +470,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.closing = true; } // For mob compaction, we do not have a store. - this.store.ifPresent(s -> s.deleteChangedReaderObserver(this)); + if (this.store != null) { + this.store.deleteChangedReaderObserver(this); + } if (withDelayedScannersClose) { clearAndClose(scannersForDelayedClose); clearAndClose(memStoreScannersAfterFlush); @@ -550,8 +544,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } // Only do a sanity-check if store and comparator are available. - CellComparator comparator = - store.map(s -> s.getComparator()).orElse(null); + CellComparator comparator = store != null ? store.getComparator() : null; int count = 0; long totalBytesRead = 0; @@ -864,8 +857,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * @return if top of heap has changed (and KeyValueHeap has to try the next KV) */ protected final boolean reopenAfterFlush() throws IOException { - // here we can make sure that we have a Store instance. - HStore store = this.store.get(); + // here we can make sure that we have a Store instance so no null check on store. Cell lastTop = heap.peek(); // When we have the scan object, should we not pass it to getScanners() to get a limited set of // scanners? We did so in the constructor and we could have done it now by storing the scan @@ -992,9 +984,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner List fileScanners = null; List newCurrentScanners; KeyValueHeap newHeap; - // We must have a store instance here - HStore store = this.store.get(); try { + // We must have a store instance here so no null check // recreate the scanners on the current file scanners fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false, matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(), diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 6ed8fefadd2..817ddf87f79 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver.compactions; import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS; import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY; +import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES; +import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES; import java.io.IOException; import java.io.InterruptedIOException; @@ -26,7 +28,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.OptionalInt; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -40,10 +41,12 @@ import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.regionserver.CellSink; +import org.apache.hadoop.hbase.regionserver.CustomizedScanInfoBuilder; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ShipperListener; @@ -230,7 +233,7 @@ public abstract class Compactor { ScanType getScanType(CompactionRequestImpl request); - InternalScanner createScanner(List scanners, ScanType scanType, + InternalScanner createScanner(ScanInfo scanInfo, List scanners, ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException; } @@ -238,14 +241,13 @@ public abstract class Compactor { @Override public ScanType getScanType(CompactionRequestImpl request) { - return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES - : ScanType.COMPACT_RETAIN_DELETES; + return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES; } @Override - public InternalScanner createScanner(List scanners, ScanType scanType, - FileDetails fd, long smallestReadPoint) throws IOException { - return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint, + public InternalScanner createScanner(ScanInfo scanInfo, List scanners, + ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { + return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint, fd.earliestPutTs); } }; @@ -266,6 +268,31 @@ public abstract class Compactor { /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind); } + private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType, + User user) throws IOException { + if (store.getCoprocessorHost() == null) { + return store.getScanInfo(); + } + return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(), + request, user); + } + + /** + * Calls coprocessor, if any, to create scanners - after normal scanner creation. + * @param request Compaction request. + * @param scanType Scan type. + * @param scanner The default scanner created for compaction. + * @return Scanner scanner to use (usually the default); null if compaction should not proceed. + */ + private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType, + InternalScanner scanner, User user) throws IOException { + if (store.getCoprocessorHost() == null) { + return scanner; + } + return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(), + request, user); + } + protected final List compact(final CompactionRequestImpl request, InternalScannerFactory scannerFactory, CellSinkFactory sinkFactory, ThroughputController throughputController, User user) throws IOException { @@ -290,8 +317,9 @@ public abstract class Compactor { try { /* Include deletes, unless we are doing a major compaction */ ScanType scanType = scannerFactory.getScanType(request); - scanner = postCreateCoprocScanner(request, scanType, - scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint), user); + ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user); + scanner = postCompactScannerOpen(request, scanType, + scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user); if (scanner == null) { // NULL scanner returned from coprocessor hooks means skip normal processing. return new ArrayList<>(); @@ -325,22 +353,6 @@ public abstract class Compactor { protected abstract void abortWriter(T writer) throws IOException; - /** - * Calls coprocessor, if any, to create scanners - after normal scanner creation. - * @param request Compaction request. - * @param scanType Scan type. - * @param scanner The default scanner created for compaction. - * @return Scanner scanner to use (usually the default); null if compaction should not proceed. - */ - private InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType, - InternalScanner scanner, User user) throws IOException { - if (store.getCoprocessorHost() == null) { - return scanner; - } - return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(), - request, user); - } - /** * Performs the compaction. * @param fd FileDetails of cell sink writer @@ -475,10 +487,10 @@ public abstract class Compactor { * @param earliestPutTs Earliest put across all files. * @return A compaction scanner. */ - protected InternalScanner createScanner(HStore store, List scanners, - ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { - return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType, - smallestReadPoint, earliestPutTs); + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs) throws IOException { + return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs); } /** @@ -490,10 +502,10 @@ public abstract class Compactor { * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null. * @return A compaction scanner. */ - protected InternalScanner createScanner(HStore store, List scanners, - long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, - byte[] dropDeletesToRow) throws IOException { - return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, - smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow); + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, long smallestReadPoint, long earliestPutTs, + byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { + return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs, + dropDeletesFromRow, dropDeletesToRow); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index f4836a8d709..c9e591ea432 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; @@ -66,13 +67,13 @@ public class StripeCompactor extends AbstractMultiOutputCompactor scanners, ScanType scanType, - FileDetails fd, long smallestReadPoint) throws IOException { + public InternalScanner createScanner(ScanInfo scanInfo, List scanners, + ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException { return (majorRangeFromRow == null) - ? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint, - fd.earliestPutTs) - : StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs, - majorRangeFromRow, majorRangeToRow); + ? StripeCompactor.this.createScanner(store, scanInfo, scanners, scanType, + smallestReadPoint, fd.earliestPutTs) + : StripeCompactor.this.createScanner(store, scanInfo, scanners, smallestReadPoint, + fd.earliestPutTs, majorRangeFromRow, majorRangeToRow); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java index f18ccc0bec3..6b16b08d5a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java @@ -124,11 +124,10 @@ public class TestFromClientSideScanExcpetion { } @Override - protected KeyValueScanner createScanner(Scan scan, NavigableSet targetCols, long readPt) - throws IOException { - return scan.isReversed() - ? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt) - : new MyStoreScanner(this, getScanInfo(), scan, targetCols, readPt); + protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo, + NavigableSet targetCols, long readPt) throws IOException { + return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt) + : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 63168099256..7248f562cec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -211,11 +211,9 @@ public class TestCompaction { // Multiple versions allowed for an entry, so the delete isn't enough // Lower TTL and expire to ensure that all our entries have been wiped final int ttl = 1000; - for (HStore store: this.r.stores.values()) { + for (HStore store : this.r.stores.values()) { ScanInfo old = store.getScanInfo(); - ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(), - old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0, - old.getComparator(), old.isNewVersionBehavior()); + ScanInfo si = old.customize(old.getMaxVersions(), ttl); store.setScanInfo(si); } Thread.sleep(ttl); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java index c6c0bdc440c..6038bb2686f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -158,10 +158,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy { public void testCompactionEmptyHFile() throws IOException { // Set TTL ScanInfo oldScanInfo = store.getScanInfo(); - ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(), - oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600, - oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(), - oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator(), oldScanInfo.isNewVersionBehavior()); + ScanInfo newScanInfo = oldScanInfo.customize(oldScanInfo.getMaxVersions(), 600); store.setScanInfo(newScanInfo); // Do not compact empty store file List candidates = sfCreate(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 4499cd569d8..268b352239a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -403,6 +403,8 @@ public class TestHRegion { RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class), Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null); + when(mockedCPHost.preFlushScannerOpen(Mockito.isA(HStore.class), + Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(store.getScanInfo()); region.setCoprocessorHost(mockedCPHost); region.put(put); region.flush(true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java index f443705e1ec..2a556d7a529 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java @@ -292,9 +292,7 @@ public class TestMajorCompaction { final int ttl = 1000; for (HStore store : r.getStores()) { ScanInfo old = store.getScanInfo(); - ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(), - old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0, - old.getComparator(), old.isNewVersionBehavior()); + ScanInfo si = old.customize(old.getMaxVersions(), ttl); store.setScanInfo(si); } Thread.sleep(1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java index a5a0e7830b7..95c2c56dafb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java @@ -110,15 +110,16 @@ public class TestDateTieredCompactor { return new DateTieredCompactor(conf, store) { @Override - protected InternalScanner createScanner(HStore store, List scanners, - long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, - byte[] dropDeletesToRow) throws IOException { + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, long smallestReadPoint, long earliestPutTs, + byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { return scanner; } @Override - protected InternalScanner createScanner(HStore store, List scanners, - ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs) throws IOException { return scanner; } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index af30b7caf4c..48e560c748f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; @@ -787,15 +788,16 @@ public class TestStripeCompactionPolicy { final Scanner scanner = new Scanner(); return new StripeCompactor(conf, store) { @Override - protected InternalScanner createScanner(HStore store, List scanners, - long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, - byte[] dropDeletesToRow) throws IOException { + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, long smallestReadPoint, long earliestPutTs, + byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { return scanner; } @Override - protected InternalScanner createScanner(HStore store, List scanners, - ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs) throws IOException { return scanner; } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java index dbf95f3b089..772a674f34c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java @@ -207,15 +207,16 @@ public class TestStripeCompactor { return new StripeCompactor(conf, store) { @Override - protected InternalScanner createScanner(HStore store, List scanners, - long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow, - byte[] dropDeletesToRow) throws IOException { + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, long smallestReadPoint, long earliestPutTs, + byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException { return scanner; } @Override - protected InternalScanner createScanner(HStore store, List scanners, - ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException { + protected InternalScanner createScanner(HStore store, ScanInfo scanInfo, + List scanners, ScanType scanType, long smallestReadPoint, + long earliestPutTs) throws IOException { return scanner; } };