HBASE-19122 preCompact and preFlush can bypass by returning null scanner; shut it down
Removed TestHRegion#testMemstoreSizeWithFlushCanceling test. CPs are not able to cancel a flush any more so this test was failing; removed it (It was testing memory accounting kept working across a cancelled flush). Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
8c2a962d1c
commit
3c24c6eda6
|
@ -110,9 +110,6 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
|||
// Use a store scanner to find which rows to flush.
|
||||
long smallestReadPoint = store.getSmallestReadPoint();
|
||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
||||
if (scanner == null) {
|
||||
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
|
||||
}
|
||||
StoreFileWriter writer;
|
||||
try {
|
||||
// TODO: We can fail in the below block before we complete adding this flush to
|
||||
|
|
|
@ -54,10 +54,6 @@ public class DefaultStoreFlusher extends StoreFlusher {
|
|||
// Use a store scanner to find which rows to flush.
|
||||
long smallestReadPoint = store.getSmallestReadPoint();
|
||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
||||
if (scanner == null) {
|
||||
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
|
||||
}
|
||||
|
||||
StoreFileWriter writer;
|
||||
try {
|
||||
// TODO: We can fail in the below block before we complete adding this flush to
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
|||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
|
||||
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
|
||||
|
@ -655,9 +656,9 @@ public class RegionCoprocessorHost
|
|||
* @param tracker used to track the life cycle of a compaction
|
||||
* @param request the compaction request
|
||||
* @param user the user
|
||||
* @return Scanner to use (cannot be null!)
|
||||
* @throws IOException
|
||||
*/
|
||||
// A Coprocessor can return null to cancel Compact. Leaving for now but this is form of 'bypass'.
|
||||
public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
|
||||
final ScanType scanType, final CompactionLifeCycleTracker tracker,
|
||||
final CompactionRequest request, final User user) throws IOException {
|
||||
|
@ -670,7 +671,12 @@ public class RegionCoprocessorHost
|
|||
defaultResult, user) {
|
||||
@Override
|
||||
public InternalScanner call(RegionObserver observer) throws IOException {
|
||||
return observer.preCompact(this, store, getResult(), scanType, tracker, request);
|
||||
InternalScanner scanner =
|
||||
observer.preCompact(this, store, getResult(), scanType, tracker, request);
|
||||
if (scanner == null) {
|
||||
throw new CoprocessorException("Null Scanner return disallowed!");
|
||||
}
|
||||
return scanner;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -715,9 +721,9 @@ public class RegionCoprocessorHost
|
|||
|
||||
/**
|
||||
* Invoked before a memstore flush
|
||||
* @return Scanner to use (cannot be null!)
|
||||
* @throws IOException
|
||||
*/
|
||||
// A Coprocessor can return null to cancel Flush. Leaving for now but this is a form of 'bypass'.
|
||||
public InternalScanner preFlush(HStore store, InternalScanner scanner,
|
||||
FlushLifeCycleTracker tracker) throws IOException {
|
||||
if (coprocEnvironments.isEmpty()) {
|
||||
|
@ -727,7 +733,11 @@ public class RegionCoprocessorHost
|
|||
new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
|
||||
@Override
|
||||
public InternalScanner call(RegionObserver observer) throws IOException {
|
||||
return observer.preFlush(this, store, getResult(), tracker);
|
||||
InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
|
||||
if (scanner == null) {
|
||||
throw new CoprocessorException("Null Scanner return disallowed!");
|
||||
}
|
||||
return scanner;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -64,9 +64,6 @@ public class StripeStoreFlusher extends StoreFlusher {
|
|||
|
||||
long smallestReadPoint = store.getSmallestReadPoint();
|
||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
||||
if (scanner == null) {
|
||||
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
|
||||
}
|
||||
|
||||
// Let policy select flush method.
|
||||
StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
|
||||
|
|
|
@ -320,10 +320,6 @@ public abstract class Compactor<T extends CellSink> {
|
|||
ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
|
||||
scanner = postCompactScannerOpen(request, scanType,
|
||||
scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
|
||||
if (scanner == null) {
|
||||
// NULL scanner returned from coprocessor hooks means skip normal processing.
|
||||
return new ArrayList<>();
|
||||
}
|
||||
boolean cleanSeqId = false;
|
||||
if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
|
||||
// For mvcc-sensitive family, we never set mvcc to 0.
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.coprocessor;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import org.junit.rules.TestRule;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Test that we fail if a Coprocessor tries to return a null scanner out
|
||||
* {@link RegionObserver#preFlush(ObserverContext, Store, InternalScanner, FlushLifeCycleTracker)}
|
||||
* or {@link RegionObserver#preCompact(ObserverContext, Store, InternalScanner, ScanType,
|
||||
* CompactionLifeCycleTracker, CompactionRequest)}
|
||||
* @see <a href=https://issues.apache.org/jira/browse/HBASE-19122>HBASE-19122</a>
|
||||
*/
|
||||
@Category({CoprocessorTests.class, SmallTests.class})
|
||||
public class TestRegionObserverPreFlushAndPreCompact {
|
||||
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().
|
||||
withTimeout(this.getClass()).withLookingForStuckThread(true).build();
|
||||
@Rule public TestName name = new TestName();
|
||||
|
||||
/**
|
||||
* Coprocessor that returns null when preCompact or preFlush is called.
|
||||
*/
|
||||
public static class TestRegionObserver implements RegionObserver, RegionCoprocessor {
|
||||
@Override
|
||||
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
|
||||
CompactionRequest request) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<RegionObserver> getRegionObserver() {
|
||||
return Optional.of(this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure we get expected exception when we try to return null from a preFlush call.
|
||||
* @throws IOException We expect it to throw {@link CoprocessorException}
|
||||
*/
|
||||
@Test (expected = CoprocessorException.class)
|
||||
public void testPreFlushReturningNull() throws IOException {
|
||||
RegionCoprocessorHost rch = getRegionCoprocessorHost();
|
||||
rch.preFlush(null, null, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Ensure we get expected exception when we try to return null from a preCompact call.
|
||||
* @throws IOException We expect it to throw {@link CoprocessorException}
|
||||
*/
|
||||
@Test (expected = CoprocessorException.class)
|
||||
public void testPreCompactReturningNull() throws IOException {
|
||||
RegionCoprocessorHost rch = getRegionCoprocessorHost();
|
||||
rch.preCompact(null, null, null, null, null, null);
|
||||
}
|
||||
|
||||
private RegionCoprocessorHost getRegionCoprocessorHost() {
|
||||
// Make up an HRegion instance. Use the hbase:meta first region as our RegionInfo. Use
|
||||
// hbase:meta table name for building the TableDescriptor our mock returns when asked schema
|
||||
// down inside RegionCoprocessorHost. Pass in mocked RegionServerServices too.
|
||||
RegionInfo ri = RegionInfoBuilder.FIRST_META_REGIONINFO;
|
||||
HRegion mockedHRegion = Mockito.mock(HRegion.class);
|
||||
Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(ri);
|
||||
TableDescriptor td = TableDescriptorBuilder.newBuilder(ri.getTable()).build();
|
||||
Mockito.when(mockedHRegion.getTableDescriptor()).thenReturn(td);
|
||||
RegionServerServices mockedServices = Mockito.mock(RegionServerServices.class);
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
// Load our test coprocessor defined above.
|
||||
conf.set(REGION_COPROCESSOR_CONF_KEY, TestRegionObserver.class.getName());
|
||||
return new RegionCoprocessorHost(mockedHRegion, mockedServices, conf);
|
||||
}
|
||||
}
|
|
@ -378,52 +378,6 @@ public class TestHRegion {
|
|||
.getWAL(tableName.toBytes(), tableName.getNamespace());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
|
||||
*/
|
||||
@Test
|
||||
public void testMemstoreSizeWithFlushCanceling() throws IOException {
|
||||
FileSystem fs = FileSystem.get(CONF);
|
||||
Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling");
|
||||
FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF);
|
||||
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
|
||||
COLUMN_FAMILY_BYTES);
|
||||
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||
assertEquals(0, region.getMemStoreSize());
|
||||
|
||||
// Put some value and make sure flush could be completed normally
|
||||
byte [] value = Bytes.toBytes(method);
|
||||
Put put = new Put(value);
|
||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
|
||||
region.put(put);
|
||||
long onePutSize = region.getMemStoreSize();
|
||||
assertTrue(onePutSize > 0);
|
||||
region.flush(true);
|
||||
assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize());
|
||||
assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize());
|
||||
|
||||
// save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
|
||||
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
|
||||
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
|
||||
when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),
|
||||
Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
|
||||
when(mockedCPHost.preFlushScannerOpen(Mockito.isA(HStore.class),
|
||||
Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(store.getScanInfo());
|
||||
region.setCoprocessorHost(mockedCPHost);
|
||||
region.put(put);
|
||||
region.flush(true);
|
||||
assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemStoreSize());
|
||||
assertEquals("flushable size should NOT be zero", onePutSize,
|
||||
store.getFlushableSize().getDataSize());
|
||||
|
||||
// set normalCPHost and flush again, the snapshot will be flushed
|
||||
region.setCoprocessorHost(normalCPHost);
|
||||
region.flush(true);
|
||||
assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize());
|
||||
assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize());
|
||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
|
||||
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
|
||||
|
|
Loading…
Reference in New Issue