HBASE-14229 Flushing canceled by coprocessor still leads to memstoreSize set down (Yerui Sun)
This commit is contained in:
parent
f8dd99d738
commit
15a88d2e1b
|
@ -521,6 +521,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
final FlushResult result; // indicating a failure result from prepare
|
final FlushResult result; // indicating a failure result from prepare
|
||||||
final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
|
final TreeMap<byte[], StoreFlushContext> storeFlushCtxs;
|
||||||
final TreeMap<byte[], List<Path>> committedFiles;
|
final TreeMap<byte[], List<Path>> committedFiles;
|
||||||
|
final TreeMap<byte[], Long> storeFlushableSize;
|
||||||
final long startTime;
|
final long startTime;
|
||||||
final long flushOpSeqId;
|
final long flushOpSeqId;
|
||||||
final long flushedSeqId;
|
final long flushedSeqId;
|
||||||
|
@ -528,26 +529,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
|
|
||||||
/** Constructs an early exit case */
|
/** Constructs an early exit case */
|
||||||
PrepareFlushResult(FlushResult result, long flushSeqId) {
|
PrepareFlushResult(FlushResult result, long flushSeqId) {
|
||||||
this(result, null, null, Math.max(0, flushSeqId), 0, 0, 0);
|
this(result, null, null, null, Math.max(0, flushSeqId), 0, 0, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Constructs a successful prepare flush result */
|
/** Constructs a successful prepare flush result */
|
||||||
PrepareFlushResult(
|
PrepareFlushResult(
|
||||||
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
|
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
|
||||||
TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
|
TreeMap<byte[], List<Path>> committedFiles,
|
||||||
|
TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
|
||||||
long flushedSeqId, long totalFlushableSize) {
|
long flushedSeqId, long totalFlushableSize) {
|
||||||
this(null, storeFlushCtxs, committedFiles, startTime,
|
this(null, storeFlushCtxs, committedFiles, storeFlushableSize, startTime,
|
||||||
flushSeqId, flushedSeqId, totalFlushableSize);
|
flushSeqId, flushedSeqId, totalFlushableSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
private PrepareFlushResult(
|
private PrepareFlushResult(
|
||||||
FlushResult result,
|
FlushResult result,
|
||||||
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
|
TreeMap<byte[], StoreFlushContext> storeFlushCtxs,
|
||||||
TreeMap<byte[], List<Path>> committedFiles, long startTime, long flushSeqId,
|
TreeMap<byte[], List<Path>> committedFiles,
|
||||||
|
TreeMap<byte[], Long> storeFlushableSize, long startTime, long flushSeqId,
|
||||||
long flushedSeqId, long totalFlushableSize) {
|
long flushedSeqId, long totalFlushableSize) {
|
||||||
this.result = result;
|
this.result = result;
|
||||||
this.storeFlushCtxs = storeFlushCtxs;
|
this.storeFlushCtxs = storeFlushCtxs;
|
||||||
this.committedFiles = committedFiles;
|
this.committedFiles = committedFiles;
|
||||||
|
this.storeFlushableSize = storeFlushableSize;
|
||||||
this.startTime = startTime;
|
this.startTime = startTime;
|
||||||
this.flushOpSeqId = flushSeqId;
|
this.flushOpSeqId = flushSeqId;
|
||||||
this.flushedSeqId = flushedSeqId;
|
this.flushedSeqId = flushedSeqId;
|
||||||
|
@ -2156,6 +2160,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
= new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
|
= new TreeMap<byte[], StoreFlushContext>(Bytes.BYTES_COMPARATOR);
|
||||||
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
|
TreeMap<byte[], List<Path>> committedFiles = new TreeMap<byte[], List<Path>>(
|
||||||
Bytes.BYTES_COMPARATOR);
|
Bytes.BYTES_COMPARATOR);
|
||||||
|
TreeMap<byte[], Long> storeFlushableSize
|
||||||
|
= new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
|
||||||
// The sequence id of this flush operation which is used to log FlushMarker and pass to
|
// The sequence id of this flush operation which is used to log FlushMarker and pass to
|
||||||
// createFlushContext to use as the store file's sequence id. It can be in advance of edits
|
// createFlushContext to use as the store file's sequence id. It can be in advance of edits
|
||||||
// still in the memstore, edits that are in other column families yet to be flushed.
|
// still in the memstore, edits that are in other column families yet to be flushed.
|
||||||
|
@ -2194,6 +2200,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
totalFlushableSizeOfFlushableStores += s.getFlushableSize();
|
totalFlushableSizeOfFlushableStores += s.getFlushableSize();
|
||||||
storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
|
storeFlushCtxs.put(s.getFamily().getName(), s.createFlushContext(flushOpSeqId));
|
||||||
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
|
committedFiles.put(s.getFamily().getName(), null); // for writing stores to WAL
|
||||||
|
storeFlushableSize.put(s.getFamily().getName(), s.getFlushableSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
// write the snapshot start to WAL
|
// write the snapshot start to WAL
|
||||||
|
@ -2260,7 +2267,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
mvcc.advanceMemstore(w);
|
mvcc.advanceMemstore(w);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return new PrepareFlushResult(storeFlushCtxs, committedFiles, startTime, flushOpSeqId,
|
return new PrepareFlushResult(storeFlushCtxs, committedFiles, storeFlushableSize, startTime, flushOpSeqId,
|
||||||
flushedSeqId, totalFlushableSizeOfFlushableStores);
|
flushedSeqId, totalFlushableSizeOfFlushableStores);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2335,7 +2342,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
||||||
if (needsCompaction) {
|
if (needsCompaction) {
|
||||||
compactionRequested = true;
|
compactionRequested = true;
|
||||||
}
|
}
|
||||||
committedFiles.put(it.next().getFamily().getName(), flush.getCommittedFiles());
|
byte[] storeName = it.next().getFamily().getName();
|
||||||
|
List<Path> storeCommittedFiles = flush.getCommittedFiles();
|
||||||
|
committedFiles.put(storeName, storeCommittedFiles);
|
||||||
|
// Flush committed no files, indicating flush is empty or flush was canceled
|
||||||
|
if (storeCommittedFiles == null || storeCommittedFiles.isEmpty()) {
|
||||||
|
totalFlushableSizeOfFlushableStores -= prepareResult.storeFlushableSize.get(storeName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
storeFlushCtxs.clear();
|
storeFlushCtxs.clear();
|
||||||
|
|
||||||
|
|
|
@ -33,10 +33,7 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.*;
|
||||||
import static org.mockito.Matchers.anyBoolean;
|
|
||||||
import static org.mockito.Matchers.anyLong;
|
|
||||||
import static org.mockito.Matchers.argThat;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
@ -131,12 +128,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
|
import org.apache.hadoop.hbase.regionserver.Region.RowLock;
|
||||||
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
|
import org.apache.hadoop.hbase.regionserver.TestStore.FaultyFileSystem;
|
||||||
import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
|
import org.apache.hadoop.hbase.regionserver.handler.FinishRegionRecoveringHandler;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
|
import org.apache.hadoop.hbase.regionserver.wal.*;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.MetricsWALSource;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
@ -321,7 +313,7 @@ public class TestHRegion {
|
||||||
Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
|
Path rootDir = new Path(dir + "testMemstoreSnapshotSize");
|
||||||
MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
|
MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF);
|
||||||
HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
|
HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
|
||||||
CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
|
CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES);
|
||||||
|
|
||||||
Store store = region.getStore(COLUMN_FAMILY_BYTES);
|
Store store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||||
// Get some random bytes.
|
// Get some random bytes.
|
||||||
|
@ -362,6 +354,48 @@ public class TestHRegion {
|
||||||
.getWAL(tableName.toBytes());
|
.getWAL(tableName.toBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for HBASE-14229: Flushing canceled by coprocessor still leads to memstoreSize set down
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMemstoreSizeWithFlushCanceling() throws IOException {
|
||||||
|
FileSystem fs = FileSystem.get(CONF);
|
||||||
|
Path rootDir = new Path(dir + "testMemstoreSizeWithFlushCanceling");
|
||||||
|
FSHLog hLog = new FSHLog(fs, rootDir, "testMemstoreSizeWithFlushCanceling", CONF);
|
||||||
|
HRegion region = initHRegion(tableName, null, null, name.getMethodName(),
|
||||||
|
CONF, false, Durability.SYNC_WAL, hLog, COLUMN_FAMILY_BYTES);
|
||||||
|
Store store = region.getStore(COLUMN_FAMILY_BYTES);
|
||||||
|
assertEquals(0, region.getMemstoreSize());
|
||||||
|
|
||||||
|
// Put some value and make sure flush could be completed normally
|
||||||
|
byte [] value = Bytes.toBytes(name.getMethodName());
|
||||||
|
Put put = new Put(value);
|
||||||
|
put.add(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
|
||||||
|
region.put(put);
|
||||||
|
long onePutSize = region.getMemstoreSize();
|
||||||
|
assertTrue(onePutSize > 0);
|
||||||
|
region.flush(true);
|
||||||
|
assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
|
||||||
|
assertEquals("flushable size should be zero", 0, store.getFlushableSize());
|
||||||
|
|
||||||
|
// save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
|
||||||
|
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
|
||||||
|
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
|
||||||
|
when(mockedCPHost.preFlush(isA(HStore.class), isA(InternalScanner.class))).thenReturn(null);
|
||||||
|
region.setCoprocessorHost(mockedCPHost);
|
||||||
|
region.put(put);
|
||||||
|
region.flush(true);
|
||||||
|
assertEquals("memstoreSize should NOT be zero", onePutSize, region.getMemstoreSize());
|
||||||
|
assertEquals("flushable size should NOT be zero", onePutSize, store.getFlushableSize());
|
||||||
|
|
||||||
|
// set normalCPHost and flush again, the snapshot will be flushed
|
||||||
|
region.setCoprocessorHost(normalCPHost);
|
||||||
|
region.flush(true);
|
||||||
|
assertEquals("memstoreSize should be zero", 0, region.getMemstoreSize());
|
||||||
|
assertEquals("flushable size should be zero", 0, store.getFlushableSize());
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(region);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test we do not lose data if we fail a flush and then close.
|
* Test we do not lose data if we fail a flush and then close.
|
||||||
* Part of HBase-10466. Tests the following from the issue description:
|
* Part of HBase-10466. Tests the following from the issue description:
|
||||||
|
|
Loading…
Reference in New Issue