diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 5f66c03f1c1..376c61ac567 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.regionserver.Region.Operation; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScanOptions; import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileReader; @@ -277,6 +278,15 @@ public interface RegionObserver { * {@link InternalScanner} with a custom implementation that is returned from this method. The * custom scanner can then inspect {@link org.apache.hadoop.hbase.Cell}s from the wrapped scanner, * applying its own policy to what gets written. + *

+ * If implementations are wrapping the passed in {@link InternalScanner}, they can also have their + * implementation implement {@link Shipper} and delegate to the original scanner. This will cause + * compactions to free up memory as they progress, which is especially important for people using + * off-heap memory pools. + *

+ * Keep in mind that when {@link Shipper#shipped()} is called, any cell references you maintain in + * your implementation may get corrupted. As such you should make sure to deep clone any cells + * that you need to keep reference to across invocations of shipped. * @param c the environment provided by the region server * @param store the store being compacted * @param scanner the scanner over existing data used in the store file rewriting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java index a7e60c07d29..f4f723ed9ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Shipper.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.yetus.audience.InterfaceAudience; /** @@ -25,7 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience; * to server and fetch N rows/RPC. These are then shipped to client. At the end of every such batch * {@link #shipped()} will get called. */ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) public interface Shipper { /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 566e708b06d..726bb3848de 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -45,10 +45,10 @@ import org.apache.hadoop.hbase.regionserver.CreateStoreFileWriterParams; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.Shipper; import org.apache.hadoop.hbase.regionserver.ShipperListener; import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreFileScanner; @@ -432,7 +432,7 @@ public abstract class Compactor { ScannerContext.newBuilder().setBatchLimit(compactionKVMax).build(); throughputController.start(compactionName); - KeyValueScanner kvs = (scanner instanceof KeyValueScanner) ? (KeyValueScanner) scanner : null; + Shipper shipper = (scanner instanceof Shipper) ? (Shipper) scanner : null; long shippedCallSizeLimit = (long) request.getFiles().size() * this.store.getColumnFamilyDescriptor().getBlocksize(); try { @@ -472,7 +472,7 @@ public abstract class Compactor { return false; } } - if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { + if (shipper != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) { if (lastCleanCell != null) { // HBASE-16931, set back sequence id to avoid affecting scan order unexpectedly. // ShipperListener will do a clone of the last cells it refer, so need to set back @@ -488,7 +488,7 @@ public abstract class Compactor { // we are doing the similar thing. In between the compaction (after every N cells // written with collective size of 'shippedCallSizeLimit') we will call shipped which // may clear prevBlocks list. - kvs.shipped(); + shipper.shipped(); bytesWrittenProgressForShippedCall = 0; } if (lastCleanCell != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java new file mode 100644 index 00000000000..d8c5c8aa595 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionWithShippingCoprocessor.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.hamcrest.MatcherAssert.assertThat; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.hamcrest.Matchers; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MediumTests.class, CoprocessorTests.class }) +public class TestCompactionWithShippingCoprocessor { + + private static final AtomicInteger SHIPPED_COUNT = new AtomicInteger(); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestCompactionWithShippingCoprocessor.class); + + protected final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);// do not retry + TEST_UTIL.startMiniCluster(1); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * Verifies that if a coproc returns an InternalScanner which implements Shipper, the shippped + * method is appropriately called in Compactor. + */ + @Test + public void testCoprocScannersExtendingShipperGetShipped() throws Exception { + int shippedCountBefore = SHIPPED_COUNT.get(); + final TableName tableName = TableName.valueOf(name.getMethodName()); + // Create a table with block size as 1024 + final Table table = TEST_UTIL.createTable(tableName, new byte[][] { FAMILY }, 1, 1024, + CompactionObserver.class.getName()); + TEST_UTIL.loadTable(table, FAMILY); + TEST_UTIL.flush(); + try { + // get the block cache and region + RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); + HRegion region = TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); + // trigger a major compaction + TEST_UTIL.compact(true); + assertThat(SHIPPED_COUNT.get(), Matchers.greaterThan(shippedCountBefore)); + } finally { + table.close(); + } + } + + public static class CompactionObserver implements RegionCoprocessor, RegionObserver { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return new ShippedObservingScanner(scanner); + } + } + + public static class ShippedObservingScanner implements InternalScanner, Shipper { + + protected final InternalScanner scanner; + + public ShippedObservingScanner(InternalScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return scanner.next(result, scannerContext); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + + @Override + public void shipped() throws IOException { + if (scanner instanceof Shipper) { + SHIPPED_COUNT.incrementAndGet(); + ((Shipper) scanner).shipped(); + } + } + } +}