HBASE-19122 preCompact and preFlush can bypass by returning null scanner; shut it down
Removed TestHRegion#testMemstoreSizeWithFlushCanceling test. CPs are not able to cancel a flush any more so this test was failing; removed it (It was testing memory accounting kept working across a cancelled flush). Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
parent
7acf3f9a9c
commit
194efe3e5a
|
@ -110,9 +110,6 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
|
||||||
// Use a store scanner to find which rows to flush.
|
// Use a store scanner to find which rows to flush.
|
||||||
long smallestReadPoint = store.getSmallestReadPoint();
|
long smallestReadPoint = store.getSmallestReadPoint();
|
||||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
||||||
if (scanner == null) {
|
|
||||||
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
|
|
||||||
}
|
|
||||||
StoreFileWriter writer;
|
StoreFileWriter writer;
|
||||||
try {
|
try {
|
||||||
// TODO: We can fail in the below block before we complete adding this flush to
|
// TODO: We can fail in the below block before we complete adding this flush to
|
||||||
|
|
|
@ -54,10 +54,6 @@ public class DefaultStoreFlusher extends StoreFlusher {
|
||||||
// Use a store scanner to find which rows to flush.
|
// Use a store scanner to find which rows to flush.
|
||||||
long smallestReadPoint = store.getSmallestReadPoint();
|
long smallestReadPoint = store.getSmallestReadPoint();
|
||||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
||||||
if (scanner == null) {
|
|
||||||
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
|
|
||||||
}
|
|
||||||
|
|
||||||
StoreFileWriter writer;
|
StoreFileWriter writer;
|
||||||
try {
|
try {
|
||||||
// TODO: We can fail in the below block before we complete adding this flush to
|
// TODO: We can fail in the below block before we complete adding this flush to
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.BaseEnvironment;
|
||||||
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
|
import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorServiceBackwardCompatiblity;
|
||||||
|
@ -655,9 +656,9 @@ public class RegionCoprocessorHost
|
||||||
* @param tracker used to track the life cycle of a compaction
|
* @param tracker used to track the life cycle of a compaction
|
||||||
* @param request the compaction request
|
* @param request the compaction request
|
||||||
* @param user the user
|
* @param user the user
|
||||||
|
* @return Scanner to use (cannot be null!)
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
// A Coprocessor can return null to cancel Compact. Leaving for now but this is form of 'bypass'.
|
|
||||||
public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
|
public InternalScanner preCompact(final HStore store, final InternalScanner scanner,
|
||||||
final ScanType scanType, final CompactionLifeCycleTracker tracker,
|
final ScanType scanType, final CompactionLifeCycleTracker tracker,
|
||||||
final CompactionRequest request, final User user) throws IOException {
|
final CompactionRequest request, final User user) throws IOException {
|
||||||
|
@ -670,7 +671,12 @@ public class RegionCoprocessorHost
|
||||||
defaultResult, user) {
|
defaultResult, user) {
|
||||||
@Override
|
@Override
|
||||||
public InternalScanner call(RegionObserver observer) throws IOException {
|
public InternalScanner call(RegionObserver observer) throws IOException {
|
||||||
return observer.preCompact(this, store, getResult(), scanType, tracker, request);
|
InternalScanner scanner =
|
||||||
|
observer.preCompact(this, store, getResult(), scanType, tracker, request);
|
||||||
|
if (scanner == null) {
|
||||||
|
throw new CoprocessorException("Null Scanner return disallowed!");
|
||||||
|
}
|
||||||
|
return scanner;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -715,9 +721,9 @@ public class RegionCoprocessorHost
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked before a memstore flush
|
* Invoked before a memstore flush
|
||||||
|
* @return Scanner to use (cannot be null!)
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
// A Coprocessor can return null to cancel Flush. Leaving for now but this is a form of 'bypass'.
|
|
||||||
public InternalScanner preFlush(HStore store, InternalScanner scanner,
|
public InternalScanner preFlush(HStore store, InternalScanner scanner,
|
||||||
FlushLifeCycleTracker tracker) throws IOException {
|
FlushLifeCycleTracker tracker) throws IOException {
|
||||||
if (coprocEnvironments.isEmpty()) {
|
if (coprocEnvironments.isEmpty()) {
|
||||||
|
@ -727,7 +733,11 @@ public class RegionCoprocessorHost
|
||||||
new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
|
new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter, scanner) {
|
||||||
@Override
|
@Override
|
||||||
public InternalScanner call(RegionObserver observer) throws IOException {
|
public InternalScanner call(RegionObserver observer) throws IOException {
|
||||||
return observer.preFlush(this, store, getResult(), tracker);
|
InternalScanner scanner = observer.preFlush(this, store, getResult(), tracker);
|
||||||
|
if (scanner == null) {
|
||||||
|
throw new CoprocessorException("Null Scanner return disallowed!");
|
||||||
|
}
|
||||||
|
return scanner;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,9 +64,6 @@ public class StripeStoreFlusher extends StoreFlusher {
|
||||||
|
|
||||||
long smallestReadPoint = store.getSmallestReadPoint();
|
long smallestReadPoint = store.getSmallestReadPoint();
|
||||||
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
|
||||||
if (scanner == null) {
|
|
||||||
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Let policy select flush method.
|
// Let policy select flush method.
|
||||||
StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
|
StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
|
||||||
|
|
|
@ -320,10 +320,6 @@ public abstract class Compactor<T extends CellSink> {
|
||||||
ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
|
ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
|
||||||
scanner = postCompactScannerOpen(request, scanType,
|
scanner = postCompactScannerOpen(request, scanType,
|
||||||
scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
|
scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
|
||||||
if (scanner == null) {
|
|
||||||
// NULL scanner returned from coprocessor hooks means skip normal processing.
|
|
||||||
return new ArrayList<>();
|
|
||||||
}
|
|
||||||
boolean cleanSeqId = false;
|
boolean cleanSeqId = false;
|
||||||
if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
|
if (fd.minSeqIdToKeep > 0 && !store.getColumnFamilyDescriptor().isNewVersionBehavior()) {
|
||||||
// For mvcc-sensitive family, we never set mvcc to 0.
|
// For mvcc-sensitive family, we never set mvcc to 0.
|
||||||
|
|
|
@ -0,0 +1,123 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.junit.rules.TestRule;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Optional;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCESSOR_CONF_KEY;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that we fail if a Coprocessor tries to return a null scanner out
|
||||||
|
* {@link RegionObserver#preFlush(ObserverContext, Store, InternalScanner, FlushLifeCycleTracker)}
|
||||||
|
* or {@link RegionObserver#preCompact(ObserverContext, Store, InternalScanner, ScanType,
|
||||||
|
* CompactionLifeCycleTracker, CompactionRequest)}
|
||||||
|
* @see <a href=https://issues.apache.org/jira/browse/HBASE-19122>HBASE-19122</a>
|
||||||
|
*/
|
||||||
|
@Category({CoprocessorTests.class, SmallTests.class})
|
||||||
|
public class TestRegionObserverPreFlushAndPreCompact {
|
||||||
|
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().
|
||||||
|
withTimeout(this.getClass()).withLookingForStuckThread(true).build();
|
||||||
|
@Rule public TestName name = new TestName();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Coprocessor that returns null when preCompact or preFlush is called.
|
||||||
|
*/
|
||||||
|
public static class TestRegionObserver implements RegionObserver, RegionCoprocessor {
|
||||||
|
@Override
|
||||||
|
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
|
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
|
InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
|
||||||
|
CompactionRequest request) throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Optional<RegionObserver> getRegionObserver() {
|
||||||
|
return Optional.of(this);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure we get expected exception when we try to return null from a preFlush call.
|
||||||
|
* @throws IOException We expect it to throw {@link CoprocessorException}
|
||||||
|
*/
|
||||||
|
@Test (expected = CoprocessorException.class)
|
||||||
|
public void testPreFlushReturningNull() throws IOException {
|
||||||
|
RegionCoprocessorHost rch = getRegionCoprocessorHost();
|
||||||
|
rch.preFlush(null, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensure we get expected exception when we try to return null from a preCompact call.
|
||||||
|
* @throws IOException We expect it to throw {@link CoprocessorException}
|
||||||
|
*/
|
||||||
|
@Test (expected = CoprocessorException.class)
|
||||||
|
public void testPreCompactReturningNull() throws IOException {
|
||||||
|
RegionCoprocessorHost rch = getRegionCoprocessorHost();
|
||||||
|
rch.preCompact(null, null, null, null, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private RegionCoprocessorHost getRegionCoprocessorHost() {
|
||||||
|
// Make up an HRegion instance. Use the hbase:meta first region as our RegionInfo. Use
|
||||||
|
// hbase:meta table name for building the TableDescriptor our mock returns when asked schema
|
||||||
|
// down inside RegionCoprocessorHost. Pass in mocked RegionServerServices too.
|
||||||
|
RegionInfo ri = RegionInfoBuilder.FIRST_META_REGIONINFO;
|
||||||
|
HRegion mockedHRegion = Mockito.mock(HRegion.class);
|
||||||
|
Mockito.when(mockedHRegion.getRegionInfo()).thenReturn(ri);
|
||||||
|
TableDescriptor td = TableDescriptorBuilder.newBuilder(ri.getTable()).build();
|
||||||
|
Mockito.when(mockedHRegion.getTableDescriptor()).thenReturn(td);
|
||||||
|
RegionServerServices mockedServices = Mockito.mock(RegionServerServices.class);
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
// Load our test coprocessor defined above.
|
||||||
|
conf.set(REGION_COPROCESSOR_CONF_KEY, TestRegionObserver.class.getName());
|
||||||
|
return new RegionCoprocessorHost(mockedHRegion, mockedServices, conf);
|
||||||
|
}
|
||||||
|
}
|
|
@ -378,52 +378,6 @@ public class TestHRegion {
|
||||||
.getWAL(tableName.toBytes(), tableName.getNamespace());
|
.getWAL(tableName.toBytes(), tableName.getNamespace());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testMemstoreSizeWithFlushCanceling() throws IOException {
|
|
||||||
FileSystem fs = FileSystem.get(CONF);
|
|
||||||
Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling");
|
|
||||||
FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF);
|
|
||||||
HRegion region = initHRegion(tableName, null, null, false, Durability.SYNC_WAL, hLog,
|
|
||||||
COLUMN_FAMILY_BYTES);
|
|
||||||
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
|
|
||||||
assertEquals(0, region.getMemStoreSize());
|
|
||||||
|
|
||||||
// Put some value and make sure flush could be completed normally
|
|
||||||
byte [] value = Bytes.toBytes(method);
|
|
||||||
Put put = new Put(value);
|
|
||||||
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
|
|
||||||
region.put(put);
|
|
||||||
long onePutSize = region.getMemStoreSize();
|
|
||||||
assertTrue(onePutSize > 0);
|
|
||||||
region.flush(true);
|
|
||||||
assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize());
|
|
||||||
assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize());
|
|
||||||
|
|
||||||
// save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
|
|
||||||
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
|
|
||||||
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
|
|
||||||
when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),
|
|
||||||
Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
|
|
||||||
when(mockedCPHost.preFlushScannerOpen(Mockito.isA(HStore.class),
|
|
||||||
Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(store.getScanInfo());
|
|
||||||
region.setCoprocessorHost(mockedCPHost);
|
|
||||||
region.put(put);
|
|
||||||
region.flush(true);
|
|
||||||
assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemStoreSize());
|
|
||||||
assertEquals("flushable size should NOT be zero", onePutSize,
|
|
||||||
store.getFlushableSize().getDataSize());
|
|
||||||
|
|
||||||
// set normalCPHost and flush again, the snapshot will be flushed
|
|
||||||
region.setCoprocessorHost(normalCPHost);
|
|
||||||
region.flush(true);
|
|
||||||
assertEquals("memstoreSize should be zero", 0, region.getMemStoreSize());
|
|
||||||
assertEquals("flushable size should be zero", 0, store.getFlushableSize().getDataSize());
|
|
||||||
HBaseTestingUtility.closeRegionAndWAL(region);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
|
public void testMemstoreSizeAccountingWithFailedPostBatchMutate() throws IOException {
|
||||||
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
|
String testName = "testMemstoreSizeAccountingWithFailedPostBatchMutate";
|
||||||
|
|
Loading…
Reference in New Issue