HBASE-21031 Memory leak if replay edits failed during region opening
This commit is contained in:
parent
5f03be4675
commit
c07afa8875
@ -874,7 +874,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||||||
* @return What the next sequence (edit) id should be.
|
* @return What the next sequence (edit) id should be.
|
||||||
* @throws IOException e
|
* @throws IOException e
|
||||||
*/
|
*/
|
||||||
private long initialize(final CancelableProgressable reporter) throws IOException {
|
@VisibleForTesting
|
||||||
|
long initialize(final CancelableProgressable reporter) throws IOException {
|
||||||
|
|
||||||
//Refuse to open the region if there is no column family in the table
|
//Refuse to open the region if there is no column family in the table
|
||||||
if (htableDescriptor.getColumnFamilyCount() == 0) {
|
if (htableDescriptor.getColumnFamilyCount() == 0) {
|
||||||
@ -887,6 +888,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||||||
try {
|
try {
|
||||||
nextSeqId = initializeRegionInternals(reporter, status);
|
nextSeqId = initializeRegionInternals(reporter, status);
|
||||||
return nextSeqId;
|
return nextSeqId;
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed initialize of region= {}, starting to roll back memstore",
|
||||||
|
getRegionInfo().getRegionNameAsString(), e);
|
||||||
|
// global memstore size will be decreased when dropping memstore
|
||||||
|
try {
|
||||||
|
//drop the memory used by memstore if open region fails
|
||||||
|
dropMemStoreContents();
|
||||||
|
} catch (IOException ioE) {
|
||||||
|
if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
|
||||||
|
LOG.warn("Failed drop memstore of region= {}, "
|
||||||
|
+ "some chunks may not released forever since MSLAB is enabled",
|
||||||
|
getRegionInfo().getRegionNameAsString());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
// nextSeqid will be -1 if the initialization fails.
|
// nextSeqid will be -1 if the initialization fails.
|
||||||
// At least it will be 0 otherwise.
|
// At least it will be 0 otherwise.
|
||||||
@ -4510,11 +4527,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// The edits size added into rsAccounting during this replaying will not
|
|
||||||
// be required any more. So just clear it.
|
|
||||||
if (this.rsAccounting != null) {
|
|
||||||
this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName());
|
|
||||||
}
|
|
||||||
if (seqid > minSeqIdForTheRegion) {
|
if (seqid > minSeqIdForTheRegion) {
|
||||||
// Then we added some edits to memory. Flush and cleanup split edit files.
|
// Then we added some edits to memory. Flush and cleanup split edit files.
|
||||||
internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
|
internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
|
||||||
@ -4696,9 +4708,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||||||
editsCount++;
|
editsCount++;
|
||||||
}
|
}
|
||||||
MemStoreSize mss = memStoreSizing.getMemStoreSize();
|
MemStoreSize mss = memStoreSizing.getMemStoreSize();
|
||||||
if (this.rsAccounting != null) {
|
|
||||||
rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss);
|
|
||||||
}
|
|
||||||
incMemStoreSize(mss);
|
incMemStoreSize(mss);
|
||||||
flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
|
flush = isFlushSize(this.memStoreSizing.getMemStoreSize());
|
||||||
if (flush) {
|
if (flush) {
|
||||||
@ -5140,6 +5149,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Be careful, this method will drop all data in the memstore of this region.
|
||||||
|
* Currently, this method is used to drop memstore to prevent memory leak
|
||||||
|
* when replaying recovered.edits while opening region.
|
||||||
|
*/
|
||||||
|
public MemStoreSize dropMemStoreContents() throws IOException {
|
||||||
|
MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing();
|
||||||
|
this.updatesLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
for (HStore s : stores.values()) {
|
||||||
|
MemStoreSize memStoreSize = doDropStoreMemStoreContentsForSeqId(s, HConstants.NO_SEQNUM);
|
||||||
|
LOG.info("Drop memstore for Store " + s.getColumnFamilyName() + " in region "
|
||||||
|
+ this.getRegionInfo().getRegionNameAsString()
|
||||||
|
+ " , dropped memstoresize: [" + memStoreSize + " }");
|
||||||
|
totalFreedSize.incMemStoreSize(memStoreSize);
|
||||||
|
}
|
||||||
|
return totalFreedSize.getMemStoreSize();
|
||||||
|
} finally {
|
||||||
|
this.updatesLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Drops the memstore contents after replaying a flush descriptor or region open event replay
|
* Drops the memstore contents after replaying a flush descriptor or region open event replay
|
||||||
* if the memstore edits have seqNums smaller than the given seq id
|
* if the memstore edits have seqNums smaller than the given seq id
|
||||||
|
@ -2348,6 +2348,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
|
|||||||
@Override
|
@Override
|
||||||
public void abort() throws IOException {
|
public void abort() throws IOException {
|
||||||
if (snapshot != null) {
|
if (snapshot != null) {
|
||||||
|
//We need to close the snapshot when aborting, otherwise, the segment scanner
|
||||||
|
//won't be closed. If we are using MSLAB, the chunk referenced by those scanners
|
||||||
|
//can't be released, thus memory leak
|
||||||
|
snapshot.close();
|
||||||
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
|
HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,14 +19,11 @@
|
|||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.lang.management.MemoryType;
|
import java.lang.management.MemoryType;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
|
||||||
import java.util.concurrent.atomic.LongAdder;
|
import java.util.concurrent.atomic.LongAdder;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
import org.apache.hadoop.hbase.io.util.MemorySizeUtil;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -43,11 +40,6 @@ public class RegionServerAccounting {
|
|||||||
// memstore off-heap size.
|
// memstore off-heap size.
|
||||||
private final LongAdder globalMemStoreOffHeapSize = new LongAdder();
|
private final LongAdder globalMemStoreOffHeapSize = new LongAdder();
|
||||||
|
|
||||||
// Store the edits size during replaying WAL. Use this to roll back the
|
|
||||||
// global memstore size once a region opening failed.
|
|
||||||
private final ConcurrentMap<byte[], MemStoreSizing> replayEditsPerRegion =
|
|
||||||
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
|
|
||||||
|
|
||||||
private long globalMemStoreLimit;
|
private long globalMemStoreLimit;
|
||||||
private final float globalMemStoreLimitLowMarkPercent;
|
private final float globalMemStoreLimitLowMarkPercent;
|
||||||
private long globalMemStoreLimitLowMark;
|
private long globalMemStoreLimitLowMark;
|
||||||
@ -216,48 +208,4 @@ public class RegionServerAccounting {
|
|||||||
getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark);
|
getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/***
|
|
||||||
* Add memStoreSize to replayEditsPerRegion.
|
|
||||||
*
|
|
||||||
* @param regionName region name.
|
|
||||||
* @param memStoreSize the Memstore size will be added to replayEditsPerRegion.
|
|
||||||
*/
|
|
||||||
public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) {
|
|
||||||
MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName);
|
|
||||||
// All ops on the same MemStoreSize object is going to be done by single thread, sequentially
|
|
||||||
// only. First calls to this method to increment the per region reply edits size and then call
|
|
||||||
// to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of
|
|
||||||
// the region open operation. No need to handle multi thread issues on one region's entry in
|
|
||||||
// this Map.
|
|
||||||
if (replayEdistsSize == null) {
|
|
||||||
replayEdistsSize = new ThreadSafeMemStoreSizing();
|
|
||||||
replayEditsPerRegion.put(regionName, replayEdistsSize);
|
|
||||||
}
|
|
||||||
replayEdistsSize.incMemStoreSize(memStoreSize);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Roll back the global MemStore size for a specified region when this region
|
|
||||||
* can't be opened.
|
|
||||||
*
|
|
||||||
* @param regionName the region which could not open.
|
|
||||||
*/
|
|
||||||
public void rollbackRegionReplayEditsSize(byte[] regionName) {
|
|
||||||
MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName);
|
|
||||||
if (replayEditsSizing != null) {
|
|
||||||
clearRegionReplayEditsSize(regionName);
|
|
||||||
decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(),
|
|
||||||
replayEditsSizing.getOffHeapSize());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Clear a region from replayEditsPerRegion.
|
|
||||||
*
|
|
||||||
* @param regionName region name.
|
|
||||||
*/
|
|
||||||
public void clearRegionReplayEditsSize(byte[] regionName) {
|
|
||||||
replayEditsPerRegion.remove(regionName);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.executor.EventHandler;
|
|||||||
import org.apache.hadoop.hbase.executor.EventType;
|
import org.apache.hadoop.hbase.executor.EventType;
|
||||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.Region;
|
import org.apache.hadoop.hbase.regionserver.Region;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerAccounting;
|
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
|
||||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
|
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
|
||||||
@ -300,16 +299,7 @@ public class OpenRegionHandler extends EventHandler {
|
|||||||
// and transition the node back to FAILED_OPEN. If that fails,
|
// and transition the node back to FAILED_OPEN. If that fails,
|
||||||
// we rely on the Timeout Monitor in the master to reassign.
|
// we rely on the Timeout Monitor in the master to reassign.
|
||||||
LOG.error(
|
LOG.error(
|
||||||
"Failed open of region=" + this.regionInfo.getRegionNameAsString()
|
"Failed open of region=" + this.regionInfo.getRegionNameAsString(), t);
|
||||||
+ ", starting to roll back the global memstore size.", t);
|
|
||||||
// Decrease the global memstore size.
|
|
||||||
if (this.rsServices != null) {
|
|
||||||
RegionServerAccounting rsAccounting =
|
|
||||||
this.rsServices.getRegionServerAccounting();
|
|
||||||
if (rsAccounting != null) {
|
|
||||||
rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return region;
|
return region;
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,214 @@
|
|||||||
|
/**
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.ClassRule;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestName;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBASE-21031
|
||||||
|
* If replay edits fails, we need to make sure memstore is rollbacked
|
||||||
|
* And if MSLAB is used, all chunk is released too.
|
||||||
|
*/
|
||||||
|
@Category({RegionServerTests.class, SmallTests.class })
|
||||||
|
public class TestRecoveredEditsReplayAndAbort {
|
||||||
|
@ClassRule
|
||||||
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
|
HBaseClassTestRule.forClass(TestRecoveredEditsReplayAndAbort.class);
|
||||||
|
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(TestRecoveredEditsReplayAndAbort.class);
|
||||||
|
|
||||||
|
protected final byte[] row = Bytes.toBytes("rowA");
|
||||||
|
|
||||||
|
protected final static byte [] fam1 = Bytes.toBytes("colfamily11");
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TestName name = new TestName();
|
||||||
|
|
||||||
|
// Test names
|
||||||
|
protected TableName tableName;
|
||||||
|
protected String method;
|
||||||
|
|
||||||
|
protected static HBaseTestingUtility TEST_UTIL;
|
||||||
|
public static Configuration CONF ;
|
||||||
|
private static FileSystem FILESYSTEM;
|
||||||
|
private HRegion region = null;
|
||||||
|
|
||||||
|
private final Random random = new Random();
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws IOException {
|
||||||
|
TEST_UTIL = new HBaseTestingUtility();
|
||||||
|
FILESYSTEM = TEST_UTIL.getTestFileSystem();
|
||||||
|
CONF = TEST_UTIL.getConfiguration();
|
||||||
|
method = name.getMethodName();
|
||||||
|
tableName = TableName.valueOf(method);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir());
|
||||||
|
TEST_UTIL.cleanupTestDir();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
//set flush size to 10MB
|
||||||
|
CONF.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024 * 10);
|
||||||
|
//set the report interval to a very small value
|
||||||
|
CONF.setInt("hbase.hstore.report.interval.edits", 1);
|
||||||
|
CONF.setInt("hbase.hstore.report.period", 0);
|
||||||
|
//mock a RegionServerServices
|
||||||
|
final RegionServerAccounting rsAccounting = new RegionServerAccounting(CONF);
|
||||||
|
RegionServerServices rs = Mockito.mock(RegionServerServices.class);
|
||||||
|
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
|
||||||
|
Mockito.when(rs.getRegionServerAccounting()).thenReturn(rsAccounting);
|
||||||
|
Mockito.when(rs.isAborted()).thenReturn(false);
|
||||||
|
Mockito.when(rs.getNonceManager()).thenReturn(null);
|
||||||
|
Mockito.when(rs.getServerName()).thenReturn(ServerName
|
||||||
|
.valueOf("test", 0, 111));
|
||||||
|
//create a region
|
||||||
|
TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort");
|
||||||
|
TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable)
|
||||||
|
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam1).build())
|
||||||
|
.build();
|
||||||
|
HRegionInfo info = new HRegionInfo(htd.getTableName(),
|
||||||
|
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false);
|
||||||
|
Path logDir = TEST_UTIL
|
||||||
|
.getDataTestDirOnTestFS("TestRecoveredEidtsReplayAndAbort.log");
|
||||||
|
final WAL wal = HBaseTestingUtility.createWal(CONF, logDir, info);
|
||||||
|
Path rootDir = TEST_UTIL.getDataTestDir();
|
||||||
|
Path tableDir = FSUtils.getTableDir(rootDir, info.getTable());
|
||||||
|
HRegionFileSystem
|
||||||
|
.createRegionOnFileSystem(CONF, TEST_UTIL.getTestFileSystem(), tableDir, info);
|
||||||
|
region = HRegion.newHRegion(tableDir, wal, TEST_UTIL.getTestFileSystem(), CONF, info,
|
||||||
|
htd, rs);
|
||||||
|
//create some recovered.edits
|
||||||
|
final WALFactory wals = new WALFactory(CONF, method);
|
||||||
|
try {
|
||||||
|
Path regiondir = region.getRegionFileSystem().getRegionDir();
|
||||||
|
FileSystem fs = region.getRegionFileSystem().getFileSystem();
|
||||||
|
byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
|
||||||
|
|
||||||
|
Path recoveredEditsDir = WALSplitter
|
||||||
|
.getRegionDirRecoveredEditsDir(regiondir);
|
||||||
|
long maxSeqId = 1200;
|
||||||
|
long minSeqId = 1000;
|
||||||
|
long totalEdits = maxSeqId - minSeqId;
|
||||||
|
for (long i = minSeqId; i <= maxSeqId; i += 100) {
|
||||||
|
Path recoveredEdits = new Path(recoveredEditsDir,
|
||||||
|
String.format("%019d", i));
|
||||||
|
LOG.info("Begin to write recovered.edits : " + recoveredEdits);
|
||||||
|
fs.create(recoveredEdits);
|
||||||
|
WALProvider.Writer writer = wals
|
||||||
|
.createRecoveredEditsWriter(fs, recoveredEdits);
|
||||||
|
for (long j = i; j < i + 100; j++) {
|
||||||
|
long time = System.nanoTime();
|
||||||
|
WALEdit edit = new WALEdit();
|
||||||
|
// 200KB kv
|
||||||
|
byte[] value = new byte[200 * 1024];
|
||||||
|
random.nextBytes(value);
|
||||||
|
edit.add(
|
||||||
|
new KeyValue(row, fam1, Bytes.toBytes(j), time, KeyValue.Type.Put,
|
||||||
|
value));
|
||||||
|
writer.append(new WAL.Entry(
|
||||||
|
new WALKeyImpl(regionName, tableName, j, time,
|
||||||
|
HConstants.DEFAULT_CLUSTER_ID), edit));
|
||||||
|
}
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
MonitoredTask status = TaskMonitor.get().createStatus(method);
|
||||||
|
//try to replay the edits
|
||||||
|
try {
|
||||||
|
region.initialize(new CancelableProgressable() {
|
||||||
|
private long replayedEdits = 0;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean progress() {
|
||||||
|
replayedEdits++;
|
||||||
|
//during replay, rsAccounting should align with global memstore, because
|
||||||
|
//there is only one memstore here
|
||||||
|
Assert.assertEquals(rsAccounting.getGlobalMemStoreDataSize(),
|
||||||
|
region.getMemStoreDataSize());
|
||||||
|
Assert.assertEquals(rsAccounting.getGlobalMemStoreHeapSize(),
|
||||||
|
region.getMemStoreHeapSize());
|
||||||
|
Assert.assertEquals(rsAccounting.getGlobalMemStoreOffHeapSize(),
|
||||||
|
region.getMemStoreOffHeapSize());
|
||||||
|
// abort the replay before finishing, leaving some edits in the memory
|
||||||
|
return replayedEdits < totalEdits - 10;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Assert.fail("Should not reach here");
|
||||||
|
} catch (IOException t) {
|
||||||
|
LOG.info("Current memstore: " + region.getMemStoreDataSize() + ", " + region
|
||||||
|
.getMemStoreHeapSize() + ", " + region
|
||||||
|
.getMemStoreOffHeapSize());
|
||||||
|
}
|
||||||
|
//After aborting replay, there should be no data in the memory
|
||||||
|
Assert.assertEquals(0, rsAccounting.getGlobalMemStoreDataSize());
|
||||||
|
Assert.assertEquals(0, region.getMemStoreDataSize());
|
||||||
|
//All the chunk in the MSLAB should be recycled, otherwise, there might be
|
||||||
|
//a memory leak.
|
||||||
|
Assert.assertEquals(0, ChunkCreator.getInstance().numberOfMappedChunks());
|
||||||
|
} finally {
|
||||||
|
HBaseTestingUtility.closeRegionAndWAL(this.region);
|
||||||
|
this.region = null;
|
||||||
|
wals.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user