diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9ccf33a509a..9581b2ad974 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -874,7 +874,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @return What the next sequence (edit) id should be. * @throws IOException e */ - private long initialize(final CancelableProgressable reporter) throws IOException { + @VisibleForTesting + long initialize(final CancelableProgressable reporter) throws IOException { //Refuse to open the region if there is no column family in the table if (htableDescriptor.getColumnFamilyCount() == 0) { @@ -887,6 +888,22 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { nextSeqId = initializeRegionInternals(reporter, status); return nextSeqId; + } catch (IOException e) { + LOG.warn("Failed initialize of region= {}, starting to roll back memstore", + getRegionInfo().getRegionNameAsString(), e); + // global memstore size will be decreased when dropping memstore + try { + //drop the memory used by memstore if open region fails + dropMemStoreContents(); + } catch (IOException ioE) { + if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) { + LOG.warn("Failed drop memstore of region= {}, " + + "some chunks may not be released forever since MSLAB is enabled", + getRegionInfo().getRegionNameAsString()); + } + + } + throw e; } finally { // nextSeqid will be -1 if the initialization fails. // At least it will be 0 otherwise. @@ -4510,11 +4527,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } } - // The edits size added into rsAccounting during this replaying will not - // be required any more. So just clear it. 
- if (this.rsAccounting != null) { - this.rsAccounting.clearRegionReplayEditsSize(getRegionInfo().getRegionName()); - } if (seqid > minSeqIdForTheRegion) { // Then we added some edits to memory. Flush and cleanup split edit files. internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); @@ -4696,9 +4708,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi editsCount++; } MemStoreSize mss = memStoreSizing.getMemStoreSize(); - if (this.rsAccounting != null) { - rsAccounting.addRegionReplayEditsSize(getRegionInfo().getRegionName(), mss); - } incMemStoreSize(mss); flush = isFlushSize(this.memStoreSizing.getMemStoreSize()); if (flush) { @@ -5140,6 +5149,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + /** + * Be careful, this method will drop all data in the memstore of this region. + * Currently, this method is used to drop memstore to prevent memory leak + * when replaying recovered.edits while opening region. 
+ */ + public MemStoreSize dropMemStoreContents() throws IOException { + MemStoreSizing totalFreedSize = new NonThreadSafeMemStoreSizing(); + this.updatesLock.writeLock().lock(); + try { + for (HStore s : stores.values()) { + MemStoreSize memStoreSize = doDropStoreMemStoreContentsForSeqId(s, HConstants.NO_SEQNUM); + LOG.info("Drop memstore for Store " + s.getColumnFamilyName() + " in region " + + this.getRegionInfo().getRegionNameAsString() + + ", dropped memstoresize: [" + memStoreSize + "]"); + totalFreedSize.incMemStoreSize(memStoreSize); + } + return totalFreedSize.getMemStoreSize(); + } finally { + this.updatesLock.writeLock().unlock(); + } + } + /** + * Drops the memstore contents after replaying a flush descriptor or region open event replay + * if the memstore edits have seqNums smaller than the given seq id diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c8d4255fe0b..cb257ede5a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -2348,6 +2348,10 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat @Override public void abort() throws IOException { if (snapshot != null) { + //We need to close the snapshot when aborting, otherwise, the segment scanner + //won't be closed. 
If we are using MSLAB, the chunk referenced by those scanners + //can't be released, thus memory leak + snapshot.close(); HStore.this.updateStorefiles(Collections.emptyList(), snapshot.getId()); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java index 4e66fc7583a..baa9a6a0d2e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerAccounting.java @@ -19,14 +19,11 @@ package org.apache.hadoop.hbase.regionserver; import java.lang.management.MemoryType; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.LongAdder; import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.util.MemorySizeUtil; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; /** @@ -43,11 +40,6 @@ public class RegionServerAccounting { // memstore off-heap size. private final LongAdder globalMemStoreOffHeapSize = new LongAdder(); - // Store the edits size during replaying WAL. Use this to roll back the - // global memstore size once a region opening failed. - private final ConcurrentMap replayEditsPerRegion = - new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); - private long globalMemStoreLimit; private final float globalMemStoreLimitLowMarkPercent; private long globalMemStoreLimitLowMark; @@ -216,48 +208,4 @@ public class RegionServerAccounting { getGlobalMemStoreHeapSize() * 1.0 / globalOnHeapMemstoreLimitLowMark); } } - - /*** - * Add memStoreSize to replayEditsPerRegion. - * - * @param regionName region name. - * @param memStoreSize the Memstore size will be added to replayEditsPerRegion. 
- */ - public void addRegionReplayEditsSize(byte[] regionName, MemStoreSize memStoreSize) { - MemStoreSizing replayEdistsSize = replayEditsPerRegion.get(regionName); - // All ops on the same MemStoreSize object is going to be done by single thread, sequentially - // only. First calls to this method to increment the per region reply edits size and then call - // to either rollbackRegionReplayEditsSize or clearRegionReplayEditsSize as per the result of - // the region open operation. No need to handle multi thread issues on one region's entry in - // this Map. - if (replayEdistsSize == null) { - replayEdistsSize = new ThreadSafeMemStoreSizing(); - replayEditsPerRegion.put(regionName, replayEdistsSize); - } - replayEdistsSize.incMemStoreSize(memStoreSize); - } - - /** - * Roll back the global MemStore size for a specified region when this region - * can't be opened. - * - * @param regionName the region which could not open. - */ - public void rollbackRegionReplayEditsSize(byte[] regionName) { - MemStoreSizing replayEditsSizing = replayEditsPerRegion.get(regionName); - if (replayEditsSizing != null) { - clearRegionReplayEditsSize(regionName); - decGlobalMemStoreSize(replayEditsSizing.getDataSize(), replayEditsSizing.getHeapSize(), - replayEditsSizing.getOffHeapSize()); - } - } - - /** - * Clear a region from replayEditsPerRegion. - * - * @param regionName region name. 
- */ - public void clearRegionReplayEditsSize(byte[] regionName) { - replayEditsPerRegion.remove(regionName); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java index f408629534a..970911fde70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java @@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventType; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext; import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext; @@ -300,16 +299,7 @@ public class OpenRegionHandler extends EventHandler { // and transition the node back to FAILED_OPEN. If that fails, // we rely on the Timeout Monitor in the master to reassign. LOG.error( - "Failed open of region=" + this.regionInfo.getRegionNameAsString() - + ", starting to roll back the global memstore size.", t); - // Decrease the global memstore size. 
- if (this.rsServices != null) { - RegionServerAccounting rsAccounting = - this.rsServices.getRegionServerAccounting(); - if (rsAccounting != null) { - rsAccounting.rollbackRegionReplayEditsSize(this.regionInfo.getRegionName()); - } - } + "Failed open of region=" + this.regionInfo.getRegionNameAsString(), t); } return region; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java new file mode 100644 index 00000000000..b0cbd58f730 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java @@ -0,0 +1,214 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALEdit; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALProvider; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * HBASE-21031 + * If replaying edits fails, we need to make sure the memstore is rolled back, + * and if MSLAB is used, all chunks are released too. 
+ */ +@Category({RegionServerTests.class, SmallTests.class }) +public class TestRecoveredEditsReplayAndAbort { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRecoveredEditsReplayAndAbort.class); + + private static final Logger LOG = LoggerFactory + .getLogger(TestRecoveredEditsReplayAndAbort.class); + + protected final byte[] row = Bytes.toBytes("rowA"); + + protected final static byte [] fam1 = Bytes.toBytes("colfamily11"); + + @Rule + public TestName name = new TestName(); + + // Test names + protected TableName tableName; + protected String method; + + protected static HBaseTestingUtility TEST_UTIL; + public static Configuration CONF ; + private static FileSystem FILESYSTEM; + private HRegion region = null; + + private final Random random = new Random(); + + @Before + public void setup() throws IOException { + TEST_UTIL = new HBaseTestingUtility(); + FILESYSTEM = TEST_UTIL.getTestFileSystem(); + CONF = TEST_UTIL.getConfiguration(); + method = name.getMethodName(); + tableName = TableName.valueOf(method); + } + + @After + public void tearDown() throws Exception { + LOG.info("Cleaning test directory: " + TEST_UTIL.getDataTestDir()); + TEST_UTIL.cleanupTestDir(); + } + + @Test + public void test() throws Exception { + //set flush size to 10MB + CONF.setInt("hbase.hregion.memstore.flush.size", 1024 * 1024 * 10); + //set the report interval to a very small value + CONF.setInt("hbase.hstore.report.interval.edits", 1); + CONF.setInt("hbase.hstore.report.period", 0); + //mock a RegionServerServices + final RegionServerAccounting rsAccounting = new RegionServerAccounting(CONF); + RegionServerServices rs = Mockito.mock(RegionServerServices.class); + ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null); + Mockito.when(rs.getRegionServerAccounting()).thenReturn(rsAccounting); + Mockito.when(rs.isAborted()).thenReturn(false); + Mockito.when(rs.getNonceManager()).thenReturn(null); + 
Mockito.when(rs.getServerName()).thenReturn(ServerName + .valueOf("test", 0, 111)); + //create a region + TableName testTable = TableName.valueOf("testRecoveredEidtsReplayAndAbort"); + TableDescriptor htd = TableDescriptorBuilder.newBuilder(testTable) + .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(fam1).build()) + .build(); + HRegionInfo info = new HRegionInfo(htd.getTableName(), + HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY, false); + Path logDir = TEST_UTIL + .getDataTestDirOnTestFS("TestRecoveredEidtsReplayAndAbort.log"); + final WAL wal = HBaseTestingUtility.createWal(CONF, logDir, info); + Path rootDir = TEST_UTIL.getDataTestDir(); + Path tableDir = FSUtils.getTableDir(rootDir, info.getTable()); + HRegionFileSystem + .createRegionOnFileSystem(CONF, TEST_UTIL.getTestFileSystem(), tableDir, info); + region = HRegion.newHRegion(tableDir, wal, TEST_UTIL.getTestFileSystem(), CONF, info, + htd, rs); + //create some recovered.edits + final WALFactory wals = new WALFactory(CONF, method); + try { + Path regiondir = region.getRegionFileSystem().getRegionDir(); + FileSystem fs = region.getRegionFileSystem().getFileSystem(); + byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); + + Path recoveredEditsDir = WALSplitter + .getRegionDirRecoveredEditsDir(regiondir); + long maxSeqId = 1200; + long minSeqId = 1000; + long totalEdits = maxSeqId - minSeqId; + for (long i = minSeqId; i <= maxSeqId; i += 100) { + Path recoveredEdits = new Path(recoveredEditsDir, + String.format("%019d", i)); + LOG.info("Begin to write recovered.edits : " + recoveredEdits); + fs.create(recoveredEdits); + WALProvider.Writer writer = wals + .createRecoveredEditsWriter(fs, recoveredEdits); + for (long j = i; j < i + 100; j++) { + long time = System.nanoTime(); + WALEdit edit = new WALEdit(); + // 200KB kv + byte[] value = new byte[200 * 1024]; + random.nextBytes(value); + edit.add( + new KeyValue(row, fam1, Bytes.toBytes(j), time, KeyValue.Type.Put, + value)); 
+ writer.append(new WAL.Entry( + new WALKeyImpl(regionName, tableName, j, time, + HConstants.DEFAULT_CLUSTER_ID), edit)); + } + writer.close(); + } + MonitoredTask status = TaskMonitor.get().createStatus(method); + //try to replay the edits + try { + region.initialize(new CancelableProgressable() { + private long replayedEdits = 0; + + @Override + public boolean progress() { + replayedEdits++; + //during replay, rsAccounting should align with global memstore, because + //there is only one memstore here + Assert.assertEquals(rsAccounting.getGlobalMemStoreDataSize(), + region.getMemStoreDataSize()); + Assert.assertEquals(rsAccounting.getGlobalMemStoreHeapSize(), + region.getMemStoreHeapSize()); + Assert.assertEquals(rsAccounting.getGlobalMemStoreOffHeapSize(), + region.getMemStoreOffHeapSize()); + // abort the replay before finishing, leaving some edits in the memory + return replayedEdits < totalEdits - 10; + } + }); + Assert.fail("Should not reach here"); + } catch (IOException t) { + LOG.info("Current memstore: " + region.getMemStoreDataSize() + ", " + region + .getMemStoreHeapSize() + ", " + region + .getMemStoreOffHeapSize()); + } + //After aborting replay, there should be no data in the memory + Assert.assertEquals(0, rsAccounting.getGlobalMemStoreDataSize()); + Assert.assertEquals(0, region.getMemStoreDataSize()); + //All the chunk in the MSLAB should be recycled, otherwise, there might be + //a memory leak. + Assert.assertEquals(0, ChunkCreator.getInstance().numberOfMappedChunks()); + } finally { + HBaseTestingUtility.closeRegionAndWAL(this.region); + this.region = null; + wals.close(); + } + } +}