diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 1bc80688de9..27809c4e00a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -110,9 +110,6 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); - if (scanner == null) { - return result; // NULL scanner returned from coprocessor hooks means skip normal processing - } StoreFileWriter writer; try { // TODO: We can fail in the below block before we complete adding this flush to diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 06d47522ae1..d666ba9e95b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -54,10 +54,6 @@ public class DefaultStoreFlusher extends StoreFlusher { // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); - if (scanner == null) { - return result; // NULL scanner returned from coprocessor hooks means skip normal processing - } - StoreFileWriter writer; try { // TODO: We can fail in the below block before we complete adding this flush to diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 347ee84eee0..0e4131e3723 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.coprocessor.BaseEnvironment; import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; +import org.apache.hadoop.hbase.coprocessor.CoprocessorException; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity; @@ -655,9 +656,9 @@ public class RegionCoprocessorHost * @param tracker used to track the life cycle of a compaction * @param request the compaction request * @param user the user + * @return Scanner to use (cannot be null!) * @throws IOException */ - // A Coprocessor can return null to cancel Compact. Leaving for now but this is form of 'bypass'. public InternalScanner preCompact(final HStore store, final InternalScanner scanner, final ScanType scanType, final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user) throws IOException { @@ -670,7 +671,12 @@ public class RegionCoprocessorHost defaultResult, user) { @Override public InternalScanner call(RegionObserver observer) throws IOException { - return observer.preCompact(this, store, getResult(), scanType, tracker, request); + InternalScanner scanner = + observer.preCompact(this, store, getResult(), scanType, tracker, request); + if (scanner == null) { + throw new CoprocessorException("Null Scanner return disallowed!"); + } + return scanner; } }); } @@ -715,9 +721,9 @@ public class RegionCoprocessorHost /** * Invoked before a memstore flush + * @return Scanner to use (cannot be null!) * @throws IOException */ - // A Coprocessor can return null to cancel Flush. Leaving for now but this is a form of 'bypass'. public InternalScanner preFlush(HStore store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { if (coprocEnvironments.isEmpty()) { @@ -727,7 +733,11 @@ public class RegionCoprocessorHost new ObserverOperationWithResult(regionObserverGetter, scanner) { @Override public InternalScanner call(RegionObserver observer) throws IOException { - return observer.preFlush(this, store, getResult(), tracker); + InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker); + if (scanner == null) { + throw new CoprocessorException("Null Scanner return disallowed!"); + } + return scanner; } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 259b3334fec..a227979e579 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -64,9 +64,6 @@ public class StripeStoreFlusher extends StoreFlusher { long smallestReadPoint = store.getSmallestReadPoint(); InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); - if (scanner == null) { - return result; // NULL scanner returned from coprocessor hooks means skip normal processing - } // Let policy select flush method. StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 817ddf87f79..014d4d1daf8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -320,10 +320,6 @@ public abstract class Compactor { ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user); scanner = postCompactScannerOpen(request, scanType, scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user); - if (scanner == null) { - // NULL scanner returned from coprocessor hooks means skip normal processing. - return new ArrayList<>(); - } boolean cleanSeqId = false; if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) { // For mvcc-sensitive family, we never set mvcc to 0. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverPreFlushAndPreCompact.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverPreFlushAndPreCompact.java new file mode 100644 index 00000000000..cea0833e285 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverPreFlushAndPreCompact.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CategoryBasedTimeout; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.rules.TestRule; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY; + + + +/** + * Test that we fail if a Coprocessor tries to return a null scanner out + * {@link RegionObserver#preFlush(ObserverContext, Store, InternalScanner, FlushLifeCycleTracker)} + * or {@link RegionObserver#preCompact(ObserverContext, Store, InternalScanner, ScanType, + * CompactionLifeCycleTracker, CompactionRequest)} + * @see HBASE-19122 + */ +@Category({CoprocessorTests.class, SmallTests.class}) +public class TestRegionObserverPreFlushAndPreCompact { + @Rule public final TestRule timeout = CategoryBasedTimeout.builder(). + withTimeout(this.getClass()).withLookingForStuckThread(true).build(); + @Rule public TestName name = new TestName(); + + /** + * Coprocessor that returns null when preCompact or preFlush is called. + */ + public static class TestRegionObserver implements RegionObserver, RegionCoprocessor { + @Override + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { + return null; + } + + @Override + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return null; + } + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + } + + /** + * Ensure we get expected exception when we try to return null from a preFlush call. + * @throws IOException We expect it to throw {@link CoprocessorException} + */ + @Test (expected = CoprocessorException.class) + public void testPreFlushReturningNull() throws IOException { + RegionCoprocessorHost rch = getRegionCoprocessorHost(); + rch.preFlush(null, null, null); + } + + /** + * Ensure we get expected exception when we try to return null from a preCompact call. + * @throws IOException We expect it to throw {@link CoprocessorException} + */ + @Test (expected = CoprocessorException.class) + public void testPreCompactReturningNull() throws IOException { + RegionCoprocessorHost rch = getRegionCoprocessorHost(); + rch.preCompact(null, null, null, null, null, null); + } + + private RegionCoprocessorHost getRegionCoprocessorHost() { + // Make up an HRegion instance. Use the hbase:meta first region as our RegionInfo. Use + // hbase:meta table name for building the TableDescriptor our mock returns when asked schema + // down inside RegionCoprocessorHost. Pass in mocked RegionServerServices too. + RegionInfo ri = RegionInfoBuilder.FIRST_META_REGIONINFO; + HRegion mockedHRegion = Mockito.mock(HRegion.class); + Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(ri); + TableDescriptor td = TableDescriptorBuilder.newBuilder(ri.getTable()).build(); + Mockito.when(mockedHRegion.getTableDescriptor()).thenReturn(td); + RegionServerServices mockedServices = Mockito.mock(RegionServerServices.class); + Configuration conf = HBaseConfiguration.create(); + // Load our test coprocessor defined above. + conf.set(REGION_COPROCESSOR_CONF_KEY, TestRegionObserver.class.getName()); + return new RegionCoprocessorHost(mockedHRegion, mockedServices, conf); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index d538b15bbec..0f46f69aa10 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -378,52 +378,6 @@ public class TestHRegion { .getWAL(tableName.toBytes(), tableName.getNamespace()); } - /** - * Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down - */ - @Test - public void testMemstoreSizeWithFlushCanceling() throws IOException { - FileSystem fs = FileSystem.get(CONF); - Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling"); - FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF); - HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog, - COLUMN_FAMILY_BYTES); - HStore store = region.getStore(COLUMN_FAMILY_BYTES); - assertEquals(0, region.getMemStoreSize()); - - // Put some value and make sure flush could be completed normally - byte [] value = Bytes.toBytes(method); - Put put = new Put(value); - put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); - region.put(put); - long onePutSize = region.getMemStoreSize(); - assertTrue(onePutSize > 0); - region.flush(true); - assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize()); - assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize()); - - // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests - RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); - RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); - when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class), - Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null); - when(mockedCPHost.preFlushScannerOpen(Mockito.isA(HStore.class), - Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(store.getScanInfo()); - region.setCoprocessorHost(mockedCPHost); - region.put(put); - region.flush(true); - assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemStoreSize()); - assertEquals("flushable size should NOT be zero", onePutSize, - store.getFlushableSize().getDataSize()); - - // set normalCPHost and flush again, the snapshot will be flushed - region.setCoprocessorHost(normalCPHost); - region.flush(true); - assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize()); - assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize()); - HBaseTestingUtility.closeRegionAndWAL(region); - } - @Test public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException { String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";