HBASE-22169 Open region failed cause memory leak
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
13d5a67094
commit
963cb45f1c
|
@ -7234,21 +7234,29 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
|
|||
*/
|
||||
protected HRegion openHRegion(final CancelableProgressable reporter)
|
||||
throws IOException {
|
||||
// Refuse to open the region if we are missing local compression support
|
||||
checkCompressionCodecs();
|
||||
// Refuse to open the region if encryption configuration is incorrect or
|
||||
// codec support is missing
|
||||
checkEncryption();
|
||||
// Refuse to open the region if a required class cannot be loaded
|
||||
checkClassLoading();
|
||||
this.openSeqNum = initialize(reporter);
|
||||
this.mvcc.advanceTo(openSeqNum);
|
||||
// The openSeqNum must be increased every time when a region is assigned, as we rely on it to
|
||||
// determine whether a region has been successfully reopened. So here we always write open
|
||||
// marker, even if the table is read only.
|
||||
if (wal != null && getRegionServerServices() != null &&
|
||||
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
||||
writeRegionOpenMarker(wal, openSeqNum);
|
||||
try {
|
||||
// Refuse to open the region if we are missing local compression support
|
||||
checkCompressionCodecs();
|
||||
// Refuse to open the region if encryption configuration is incorrect or
|
||||
// codec support is missing
|
||||
checkEncryption();
|
||||
// Refuse to open the region if a required class cannot be loaded
|
||||
checkClassLoading();
|
||||
this.openSeqNum = initialize(reporter);
|
||||
this.mvcc.advanceTo(openSeqNum);
|
||||
// The openSeqNum must be increased every time when a region is assigned, as we rely on it to
|
||||
// determine whether a region has been successfully reopened. So here we always write open
|
||||
// marker, even if the table is read only.
|
||||
if (wal != null && getRegionServerServices() != null &&
|
||||
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
|
||||
writeRegionOpenMarker(wal, openSeqNum);
|
||||
}
|
||||
}catch(Throwable t){
|
||||
// By coprocessor path wrong region will open failed,
|
||||
// MetricsRegionWrapperImpl is already init and not close,
|
||||
// add region close when open failed
|
||||
this.close();
|
||||
throw t;
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
|
|
@ -41,6 +41,7 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.lang.reflect.Field;
|
||||
import java.math.BigDecimal;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
@ -52,11 +53,14 @@ import java.util.Map;
|
|||
import java.util.NavigableMap;
|
||||
import java.util.Objects;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -162,6 +166,7 @@ import org.apache.hadoop.hbase.wal.WALKeyImpl;
|
|||
import org.apache.hadoop.hbase.wal.WALProvider;
|
||||
import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.apache.hadoop.metrics2.MetricsExecutor;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -6595,6 +6600,45 @@ public class TestHRegion {
|
|||
region.close();
|
||||
}
|
||||
|
||||
// make sure region is success close when coprocessor wrong region open failed
|
||||
@Test
|
||||
public void testOpenRegionFailedMemoryLeak() throws Exception {
|
||||
final ServerName serverName = ServerName.valueOf("testOpenRegionFailed", 100, 42);
|
||||
final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName));
|
||||
|
||||
HTableDescriptor htd
|
||||
= new HTableDescriptor(TableName.valueOf("testOpenRegionFailed"));
|
||||
htd.addFamily(new HColumnDescriptor(fam1));
|
||||
htd.setValue("COPROCESSOR$1", "hdfs://test/test.jar|test||");
|
||||
|
||||
HRegionInfo hri = new HRegionInfo(htd.getTableName(),
|
||||
HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY);
|
||||
ScheduledExecutorService executor = CompatibilitySingletonFactory.
|
||||
getInstance(MetricsExecutor.class).getExecutor();
|
||||
for (int i = 0; i < 20 ; i++) {
|
||||
try {
|
||||
HRegion.openHRegion(hri, htd, rss.getWAL(hri),
|
||||
TEST_UTIL.getConfiguration(), rss, null);
|
||||
}catch(Throwable t){
|
||||
LOG.info("Expected exception, continue");
|
||||
}
|
||||
}
|
||||
TimeUnit.SECONDS.sleep(MetricsRegionWrapperImpl.PERIOD);
|
||||
Field[] fields = ThreadPoolExecutor.class.getDeclaredFields();
|
||||
boolean found = false;
|
||||
for(Field field : fields){
|
||||
if(field.getName().equals("workQueue")){
|
||||
field.setAccessible(true);
|
||||
BlockingQueue<Runnable> workQueue = (BlockingQueue<Runnable>)field.get(executor);
|
||||
//there are still two task not cancel, can not cause to memory lack
|
||||
Assert.assertTrue("ScheduledExecutor#workQueue should equals 2, now is " +
|
||||
workQueue.size() + ", please check region is close", 2 == workQueue.size());
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
Assert.assertTrue("can not find workQueue, test failed", found);
|
||||
}
|
||||
|
||||
/**
|
||||
* The same as HRegion class, the only difference is that instantiateHStore will
|
||||
* create a different HStore - HStoreForTesting. [HBASE-8518]
|
||||
|
|
|
@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -74,6 +76,11 @@ public class TestOpenSeqNumUnexpectedIncrease {
|
|||
throw new IOException("Inject error for testing");
|
||||
}
|
||||
}
|
||||
|
||||
public Map<byte[], List<HStoreFile>> close() throws IOException {
|
||||
//skip close
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
|
Loading…
Reference in New Issue