diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java index e9b590deda0..55d9ac324d1 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java @@ -162,6 +162,20 @@ public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObs return wrap(store.getColumnFamilyDescriptor().getName(), scanner); } + @Override + public void preMemStoreCompactionCompactScannerOpen( + ObserverContext c, Store store, ScanOptions options) + throws IOException { + options.readAllVersions(); + } + + @Override + public InternalScanner preMemStoreCompactionCompact( + ObserverContext c, Store store, InternalScanner scanner) + throws IOException { + return wrap(store.getColumnFamilyDescriptor().getName(), scanner); + } + @Override public void preGetOp(ObserverContext c, Get get, List result) throws IOException { diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java index 1881c8552f5..18e819f2d1f 100644 --- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java @@ -20,58 +20,25 @@ package org.apache.hadoop.hbase.coprocessor.example; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.IntStream; - -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @Category({ CoprocessorTests.class, MediumTests.class }) -public class TestWriteHeavyIncrementObserver { - - private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); - - private static TableName NAME = TableName.valueOf("TestCP"); - - private static byte[] FAMILY = Bytes.toBytes("cf"); - - private static byte[] ROW = Bytes.toBytes("row"); - - private static byte[] CQ1 = Bytes.toBytes("cq1"); - - private static byte[] CQ2 = Bytes.toBytes("cq2"); - - private static Table TABLE; - - private static long UPPER = 1000; - - private static int THREADS = 10; +public class TestWriteHeavyIncrementObserver extends WriteHeavyIncrementObserverTestBase { @BeforeClass public static void setUp() throws Exception { - UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L); - UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L); - UTIL.startMiniCluster(3); + WriteHeavyIncrementObserverTestBase.setUp(); UTIL.getAdmin() .createTable(TableDescriptorBuilder.newBuilder(NAME) .addCoprocessor(WriteHeavyIncrementObserver.class.getName()) @@ -79,45 +46,9 @@ public class TestWriteHeavyIncrementObserver { TABLE = UTIL.getConnection().getTable(NAME); } - @AfterClass - public static void tearDown() throws Exception { - if (TABLE != null) { - TABLE.close(); - } - UTIL.shutdownMiniCluster(); - } - - private static void increment() throws IOException { - for (long i = 1; i <= UPPER; i++) { - TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i)); - try { - Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10)); - } catch (InterruptedException e) { - } - } - } - - private void assertSum() throws IOException { - Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2)); - assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1))); - assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2))); - } - @Test public void test() throws Exception { - Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> { - try { - increment(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }, "increment-" + i)).toArray(Thread[]::new); - for (Thread thread : threads) { - thread.start(); - } - for (Thread thread : threads) { - thread.join(); - } + doIncrement(0); assertSum(); // we do not hack scan operation so using scan we could get the original values added into the // table. diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java new file mode 100644 index 00000000000..eeb1fa81563 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserverWithMemStoreCompaction.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.hbase.MemoryCompactionPolicy; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.CompactingMemStore; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestWriteHeavyIncrementObserverWithMemStoreCompaction + extends WriteHeavyIncrementObserverTestBase { + + @BeforeClass + public static void setUp() throws Exception { + WriteHeavyIncrementObserverTestBase.setUp(); + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(NAME) + .addCoprocessor(WriteHeavyIncrementObserver.class.getName()) + .setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY, + MemoryCompactionPolicy.EAGER.name()) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build()); + TABLE = UTIL.getConnection().getTable(NAME); + } + + @Test + public void test() throws Exception { + // sleep every 10 loops to give memstore compaction enough time to finish before reaching the + // flush size. + doIncrement(10); + assertSum(); + HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY); + // should have no store files created as we have done aggregating all in memory + assertEquals(0, store.getStorefilesCount()); + } +} diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java new file mode 100644 index 00000000000..35832302169 --- /dev/null +++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserverTestBase.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.example; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.stream.IntStream; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class WriteHeavyIncrementObserverTestBase { + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + protected static TableName NAME = TableName.valueOf("TestCP"); + + protected static byte[] FAMILY = Bytes.toBytes("cf"); + + protected static byte[] ROW = Bytes.toBytes("row"); + + protected static byte[] CQ1 = Bytes.toBytes("cq1"); + + protected static byte[] CQ2 = Bytes.toBytes("cq2"); + + protected static Table TABLE; + + protected static long UPPER = 1000; + + protected static int THREADS = 10; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L); + UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L); + UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDown() throws Exception { + if (TABLE != null) { + TABLE.close(); + } + UTIL.shutdownMiniCluster(); + } + + private static void increment(int sleepSteps) throws IOException { + for (long i = 1; i <= UPPER; i++) { + TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i)); + if (sleepSteps > 0 && i % sleepSteps == 0) { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + } + + protected final void assertSum() throws IOException { + Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2)); + assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1))); + assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2))); + } + + protected final void doIncrement(int sleepSteps) throws InterruptedException { + Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> { + try { + increment(sleepSteps); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, "increment-" + i)).toArray(Thread[]::new); + for (Thread thread : threads) { + thread.start(); + } + for (Thread thread : threads) { + thread.join(); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 9546116d6dc..83cd35888ae 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.coprocessor; +import edu.umd.cs.findbugs.annotations.NonNull; + import java.io.IOException; import java.util.List; import java.util.Map; @@ -144,10 +146,10 @@ public interface RegionObserver { * Called before a Store's memstore is flushed to disk. * @param c the environment provided by the region server * @param store the store where flush is being requested - * @param scanner the scanner over existing data used in the store file + * @param scanner the scanner over existing data used in the memstore * @param tracker tracker used to track the life cycle of a flush - * @return the scanner to use during compaction. Should not be {@code null} - * unless the implementation is writing new store files on its own. + * @return the scanner to use during flush. Should not be {@code null} unless the implementation + * is writing new store files on its own. */ default InternalScanner preFlush(ObserverContext c, Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { @@ -173,6 +175,51 @@ public interface RegionObserver { default void postFlush(ObserverContext c, Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {} + /** + * Called before in memory compaction started. + * @param c the environment provided by the region server + * @param store the store where in memory compaction is being requested + */ + default void preMemStoreCompaction(ObserverContext c, Store store) + throws IOException {} + + /** + * Called before we open store scanner for in memory compaction. You can use the {@code options} + * to change max versions and TTL for the scanner being opened. Notice that this method will only + * be called when you use {@code eager} mode. For {@code basic} mode we will not drop any cells + * thus we do not open a store scanner. + * @param c the environment provided by the region server + * @param store the store where in memory compaction is being requested + * @param options used to change max versions and TTL for the scanner being opened + */ + default void preMemStoreCompactionCompactScannerOpen( + ObserverContext c, Store store, ScanOptions options) + throws IOException {} + + /** + * Called before we do in memory compaction. Notice that this method will only be called when you + * use {@code eager} mode. For {@code basic} mode we will not drop any cells thus there is no + * {@link InternalScanner}. + * @param c the environment provided by the region server + * @param store the store where in memory compaction is being executed + * @param scanner the scanner over existing data used in the memstore segments being compact + * @return the scanner to use during in memory compaction. Must be non-null. + */ + @NonNull + default InternalScanner preMemStoreCompactionCompact( + ObserverContext c, Store store, InternalScanner scanner) + throws IOException { + return scanner; + } + + /** + * Called after the in memory compaction is finished. + * @param c the environment provided by the region server + * @param store the store where in memory compaction is being executed + */ + default void postMemStoreCompaction(ObserverContext c, Store store) + throws IOException {} + /** * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of * available candidates. To alter the files used for compaction, you may mutate the passed in list diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java index fea9f1741c7..4d97411a777 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactor.java @@ -96,8 +96,18 @@ public class MemStoreCompactor { LOG.debug("Starting the In-Memory Compaction for store " + compactingMemStore.getStore().getColumnFamilyName()); } - - doCompaction(); + HStore store = compactingMemStore.getStore(); + RegionCoprocessorHost cpHost = store.getCoprocessorHost(); + if (cpHost != null) { + cpHost.preMemStoreCompaction(store); + } + try { + doCompaction(); + } finally { + if (cpHost != null) { + cpHost.postMemStoreCompaction(store); + } + } return true; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java index 7ab2fe325e7..0f96936d981 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java @@ -23,12 +23,18 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables; + /** * The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator * and performs the scan for compaction operation meaning it is based on SQM @@ -36,12 +42,14 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator { - private List kvs = new ArrayList<>(); - private boolean hasMore; + private static final Log LOG = LogFactory.getLog(MemStoreCompactorSegmentsIterator.class); + + private final List kvs = new ArrayList<>(); + private boolean hasMore = true; private Iterator kvsIterator; // scanner on top of pipeline scanner that uses ScanQueryMatcher - private StoreScanner compactingScanner; + private InternalScanner compactingScanner; // C-tor public MemStoreCompactorSegmentsIterator(List segments, @@ -56,44 +64,34 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator // build the scanner based on Query Matcher // reinitialize the compacting scanner for each instance of iterator compactingScanner = createScanner(store, scanners); - - hasMore = compactingScanner.next(kvs, scannerContext); - - if (!kvs.isEmpty()) { - kvsIterator = kvs.iterator(); - } - + refillKVS(); } @Override public boolean hasNext() { - if (kvsIterator == null) { // for the case when the result is empty + if (kvsIterator == null) { // for the case when the result is empty return false; } - if (!kvsIterator.hasNext()) { - // refillKVS() method should be invoked only if !kvsIterator.hasNext() - if (!refillKVS()) { - return false; - } - } - return kvsIterator.hasNext(); + // return true either we have cells in buffer or we can get more. + return kvsIterator.hasNext() || refillKVS(); } @Override - public Cell next() { - if (kvsIterator == null) { // for the case when the result is empty - return null; + public Cell next() { + if (!hasNext()) { + throw new NoSuchElementException(); } - if (!kvsIterator.hasNext()) { - // refillKVS() method should be invoked only if !kvsIterator.hasNext() - if (!refillKVS()) return null; - } - return (!hasMore) ? null : kvsIterator.next(); + return kvsIterator.next(); } public void close() { - compactingScanner.close(); + try { + compactingScanner.close(); + } catch (IOException e) { + LOG.warn("close store scanner failed", e); + } compactingScanner = null; + kvs.clear(); } @Override @@ -105,39 +103,64 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator * Creates the scanner for compacting the pipeline. * @return the scanner */ - private StoreScanner createScanner(HStore store, List scanners) + private InternalScanner createScanner(HStore store, List scanners) throws IOException { - // FIXME: This is the old comment 'Get all available versions' - // But actually if we really reset the ScanInfo to get all available versions then lots of UTs - // will fail - return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES, - store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + InternalScanner scanner = null; + boolean success = false; + try { + RegionCoprocessorHost cpHost = store.getCoprocessorHost(); + ScanInfo scanInfo; + if (cpHost != null) { + scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store); + } else { + scanInfo = store.getScanInfo(); + } + scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, + store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + if (cpHost != null) { + InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner); + if (scannerFromCp == null) { + throw new CoprocessorException("Got a null InternalScanner when calling" + + " preMemStoreCompactionCompact which is not acceptable"); + } + success = true; + return scannerFromCp; + } else { + success = true; + return scanner; + } + } finally { + if (!success) { + Closeables.close(scanner, true); + scanners.forEach(KeyValueScanner::close); + } + } } - /* Refill kev-value set (should be invoked only when KVS is empty) - * Returns true if KVS is non-empty */ + /* + * Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is + * non-empty + */ private boolean refillKVS() { - kvs.clear(); // clear previous KVS, first initiated in the constructor - if (!hasMore) { // if there is nothing expected next in compactingScanner + // if there is nothing expected next in compactingScanner + if (!hasMore) { return false; } - - try { // try to get next KVS - hasMore = compactingScanner.next(kvs, scannerContext); - } catch (IOException ie) { - throw new IllegalStateException(ie); - } - - if (!kvs.isEmpty() ) {// is the new KVS empty ? - kvsIterator = kvs.iterator(); - return true; - } else { - // KVS is empty, but hasMore still true? - if (hasMore) { // try to move to next row - return refillKVS(); + // clear previous KVS, first initiated in the constructor + kvs.clear(); + for (;;) { + try { + hasMore = compactingScanner.next(kvs, scannerContext); + } catch (IOException e) { + // should not happen as all data are in memory + throw new IllegalStateException(e); + } + if (!kvs.isEmpty()) { + kvsIterator = kvs.iterator(); + return true; + } else if (!hasMore) { + return false; } - } - return hasMore; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 43a01bac220..7c3bf852ad9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -776,6 +776,61 @@ public class RegionCoprocessorHost }); } + /** + * Invoked before in memory compaction. + */ + public void preMemStoreCompaction(HStore store) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { + @Override + public void call(RegionObserver observer) throws IOException { + observer.preMemStoreCompaction(this, store); + } + }); + } + + /** + * Invoked before create StoreScanner for in memory compaction. + */ + public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException { + CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo()); + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { + @Override + public void call(RegionObserver observer) throws IOException { + observer.preMemStoreCompactionCompactScannerOpen(this, store, builder); + } + }); + return builder.build(); + } + + /** + * Invoked before compacting memstore. + */ + public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner) + throws IOException { + if (coprocEnvironments.isEmpty()) { + return scanner; + } + return execOperationWithResult(new ObserverOperationWithResult( + regionObserverGetter, scanner) { + @Override + public InternalScanner call(RegionObserver observer) throws IOException { + return observer.preMemStoreCompactionCompact(this, store, getResult()); + } + }); + } + + /** + * Invoked after in memory compaction. + */ + public void postMemStoreCompaction(HStore store) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() { + @Override + public void call(RegionObserver observer) throws IOException { + observer.postMemStoreCompaction(this, store); + } + }); + } + /** * Invoked after a memstore flush * @throws IOException diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 09be65c799f..6abca134d7e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -268,9 +268,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private static final Scan SCAN_FOR_COMPACTION = new Scan(); /** - * Used for compactions. + * Used for store file compaction and memstore compaction. *

- * Opens a scanner across specified StoreFiles. + * Opens a scanner across specified StoreFiles/MemStoreSegments. * @param store who we scan * @param scanners ancillary scanners * @param smallestReadPoint the readPoint that we should use for tracking versions