HBASE-20940 HStore.canSplit should not allow a split to happen if the store has references (Vishal Khandelwal)

commit e8eb366514
parent f9793fafb7
@@ -1701,7 +1701,17 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
   @Override
   public boolean hasReferences() {
-    return StoreUtils.hasReferences(this.storeEngine.getStoreFileManager().getStorefiles());
+    List<HStoreFile> reloadedStoreFiles = null;
+    try {
+      // Reloading the store files from the file system due to HBASE-20940, as a split
+      // can happen on a region which has references
+      reloadedStoreFiles = loadStoreFiles();
+      return StoreUtils.hasReferences(reloadedStoreFiles);
+    } catch (IOException ioe) {
+      LOG.error("Error trying to determine if store has references, assuming references exist",
+        ioe);
+      return true;
+    }
   }

   /**
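Why reloading matters: a split seeds each daughter region with Reference ("half") files that point into the parent's HFiles, and a region must not be split again until compaction has rewritten those references into ordinary files. The old implementation answered hasReferences() from the in-memory store file list, which can be stale right after a split; reloading from the file system gives the authoritative view, and an IOException is treated conservatively as "references exist" so the split is refused. As a rough sketch (not the committed code) of what the StoreUtils.hasReferences(...) call above boils down to:

    // Sketch only: a store has references while any of its files is a Reference file.
    static boolean hasReferences(Collection<HStoreFile> files) {
      return files != null && files.stream().anyMatch(HStoreFile::isReference);
    }

The remaining hunks adapt tests that must now compact regions and archive the compacted files before a region becomes splittable again.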
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIM
 import static org.apache.hadoop.hbase.master.LoadBalancer.TABLES_ON_MASTER;
 import static org.junit.Assert.assertEquals;

-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -35,18 +34,21 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.MemoryCompactionPolicy;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.io.ByteBufferPool;
 import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -124,7 +126,7 @@ public class TestAsyncTableGetMultiThreaded {
   }

   @Test
-  public void test() throws IOException, InterruptedException, ExecutionException {
+  public void test() throws Exception {
     int numThreads = 20;
     AtomicBoolean stop = new AtomicBoolean(false);
     ExecutorService executor =
@@ -137,9 +139,31 @@
     Collections.shuffle(Arrays.asList(SPLIT_KEYS), new Random(123));
     Admin admin = TEST_UTIL.getAdmin();
     for (byte[] splitPoint : SPLIT_KEYS) {
+      int oldRegionCount = admin.getRegions(TABLE_NAME).size();
       admin.split(TABLE_NAME, splitPoint);
+      TEST_UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return TEST_UTIL.getMiniHBaseCluster().getRegions(TABLE_NAME).size() > oldRegionCount;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "Split has not finished yet";
+        }
+      });
+
       for (HRegion region : TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME)) {
         region.compact(true);
+
+        // Waiting for the compaction to complete and the references to be cleaned up
+        RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
+        while (CompactionState.NONE != admin
+            .getCompactionStateForRegion(region.getRegionInfo().getRegionName())
+            && retrier.shouldRetry()) {
+          retrier.sleepUntilNextRetry();
+        }
+        region.getStores().get(0).closeAndArchiveCompactedFiles();
       }
       Thread.sleep(5000);
       admin.balance(true);
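In TestAsyncTableGetMultiThreaded (hunks above), each split is now followed by an explicit wait for the region count to grow, and each compaction by a bounded poll of the compaction state before compacted files are archived; otherwise leftover reference files would keep the regions unsplittable under the new check. This poll-and-archive idiom recurs in the tests below. A consolidated sketch, using only the imports added in the hunks above (waitForCompactionAndArchive is a hypothetical helper name; the commit inlines the loop each time):

    // Sketch: block until the region's compaction state returns to NONE (bounded by
    // ~30s of retries), then archive the compacted-away files so that no reference
    // files remain in the store.
    private static void waitForCompactionAndArchive(Admin admin, HRegion region) throws Exception {
      RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
      while (CompactionState.NONE != admin
          .getCompactionStateForRegion(region.getRegionInfo().getRegionName())
          && retrier.shouldRetry()) {
        retrier.sleepUntilNextRetry();
      }
      region.getStores().get(0).closeAndArchiveCompactedFiles();
    }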
@@ -107,6 +107,10 @@ public class TestChangingEncoding {
   public static void setUpBeforeClass() throws Exception {
     // Use a small flush size to create more HFiles.
     conf.setInt(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 1024 * 1024);
+    // Disabling splits so that a split does not force the modify-column operation
+    // to wait, which sometimes times out the test
+    conf.set(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
+      "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy");
     // ((Log4JLogger)RpcServerImplementation.LOG).getLogger().setLevel(Level.TRACE);
     // ((Log4JLogger)RpcClient.LOG).getLogger().setLevel(Level.TRACE);
     TEST_UTIL.startMiniCluster();
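TestChangingEncoding (hunk above) does not exercise splits at all, so it simply turns them off: with the stricter reference check, an automatic split during the test's many small flushes could leave the modify-column operation waiting long enough to time the test out. The commit sets DisabledRegionSplitPolicy cluster-wide through the configuration; a per-table variant (a sketch of an alternative, not what the commit does) would put the policy on the table descriptor instead:

    // Sketch: disabling automatic splits for a single table via its descriptor.
    TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
        .setRegionSplitPolicyClassName(
          "org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy")
        .build();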
@@ -48,7 +48,9 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
@@ -81,6 +83,7 @@ import org.apache.hadoop.hbase.snapshot.RestoreSnapshotException;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
@@ -365,6 +368,23 @@ public class TestNamespaceAuditor {
     HRegion regionToSplit = UTIL.getMiniHBaseCluster().getRegions(tableTwo).stream()
       .filter(r -> r.getRegionInfo().containsRow(splitKey)).findFirst().get();
     regionToSplit.compact(true);
+    // Waiting for the compaction to finish
+    UTIL.waitFor(30000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        return (CompactionState.NONE == ADMIN
+          .getCompactionStateForRegion(regionToSplit.getRegionInfo().getRegionName()));
+      }
+    });
+
+    // Cleaning up compacted references so that the split can proceed
+    regionToSplit.getStores().stream().forEach(s -> {
+      try {
+        s.closeAndArchiveCompactedFiles();
+      } catch (IOException e1) {
+        LOG.error("Error while cleaning up compacted file", e1);
+      }
+    });
     // the above compact may quit immediately if there is a compaction ongoing, so here we need to
     // wait a while to let the ongoing compaction finish.
     UTIL.waitFor(10000, regionToSplit::isSplittable);
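TestNamespaceAuditor's region-quota test (hunk above) splits a freshly compacted region, so it must first let the compaction finish and archive the compacted files from every store, or isSplittable() would keep returning false under the new check. Waiter.Predicate declares a single evaluate() method, so the same wait can also be written as a lambda (a sketch equivalent to the anonymous class above, reusing the ADMIN and regionToSplit names from that hunk):

    // Sketch: UTIL.waitFor accepts a Waiter.Predicate, a single-method interface.
    UTIL.waitFor(30000, () -> CompactionState.NONE == ADMIN
        .getCompactionStateForRegion(regionToSplit.getRegionInfo().getRegionName()));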
@@ -24,10 +24,13 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;

 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ChoreService;
@@ -41,6 +44,8 @@ import org.apache.hadoop.hbase.ScheduledChore;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
@@ -49,13 +54,19 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.StoppableImplementation;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -65,8 +76,6 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
-
 @Category(LargeTests.class)
 public class TestEndToEndSplitTransaction {

@@ -92,6 +101,78 @@ public class TestEndToEndSplitTransaction {
     TEST_UTIL.shutdownMiniCluster();
   }

+
+  /*
+   * Test for HBASE-20940. This test splits a region and opens a reader on a reference
+   * to one of its store files; while any store file has such a reference, the region
+   * must not be splittable.
+   * @throws Exception
+   */
+  @Test
+  public void testCanSplitJustAfterASplit() throws Exception {
+    LOG.info("Starting testCanSplitJustAfterASplit");
+    byte[] fam = Bytes.toBytes("cf_split");
+
+    TableName tableName = TableName.valueOf("CanSplitTable");
+    Table source = TEST_UTIL.getConnection().getTable(tableName);
+    Admin admin = TEST_UTIL.getAdmin();
+    Map<String, StoreFileReader> scanner = Maps.newHashMap();
+
+    try {
+      TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(fam)).build();
+
+      admin.createTable(htd);
+      TEST_UTIL.loadTable(source, fam);
+      List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
+      regions.get(0).forceSplit(null);
+      admin.split(tableName);
+
+      while (regions.size() <= 1) {
+        regions = TEST_UTIL.getHBaseCluster().getRegions(tableName);
+        regions.stream()
+          .forEach(r -> r.getStores().get(0).getStorefiles().stream()
+            .filter(
+              s -> s.isReference() && !scanner.containsKey(r.getRegionInfo().getEncodedName()))
+            .forEach(sf -> {
+              StoreFileReader reader = ((HStoreFile) sf).getReader();
+              reader.getStoreFileScanner(true, false, false, 0, 0, false);
+              scanner.put(r.getRegionInfo().getEncodedName(), reader);
+              LOG.info("Got reference to file = " + sf.getPath() + ", for region = "
+                + r.getRegionInfo().getEncodedName());
+            }));
+      }
+
+      Assert.assertTrue("Regions did not split properly", regions.size() > 1);
+      Assert.assertTrue("Could not get a reference to any store file", scanner.size() > 1);
+
+      RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
+      while (CompactionState.NONE != admin.getCompactionState(tableName) && retrier.shouldRetry()) {
+        retrier.sleepUntilNextRetry();
+      }
+
+      Assert.assertEquals("Compaction did not complete in 30 secs", CompactionState.NONE,
+        admin.getCompactionState(tableName));
+
+      regions.stream()
+        .filter(region -> scanner.containsKey(region.getRegionInfo().getEncodedName()))
+        .forEach(r -> Assert.assertTrue("A store with an open reference file must not be splittable",
+          !r.getStores().get(0).canSplit()));
+    } finally {
+      scanner.values().stream().forEach(s -> {
+        try {
+          s.close(true);
+        } catch (IOException ioe) {
+          LOG.error("Failed while closing store file", ioe);
+        }
+      });
+      scanner.clear();
+      if (source != null) {
+        source.close();
+      }
+      TEST_UTIL.deleteTableIfAny(tableName);
+    }
+  }
+
   /**
    * Tests that the client sees meta table changes as atomic during splits
    */
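The new testCanSplitJustAfterASplit (hunk above) is the regression test for the bug itself: it forces a split, grabs a StoreFileReader on a daughter's reference file while the split is still materializing, and then asserts that no store holding such a reference reports canSplit() == true, even once table-level compaction has finished. The invariant it pins down, as a hedged sketch (canSplit() is assumed to delegate to the reloading hasReferences() from the first hunk; this is not the committed method body):

    // Sketch of the assumed relationship only.
    public boolean canSplit() {
      // a store that still holds reference (half) files must be compacted
      // before its region may be split again
      return !hasReferences();
    }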
@@ -151,18 +232,17 @@
     public void run() {
       try {
         Random random = new Random();
-        for (int i= 0; i< 5; i++) {
-          List<RegionInfo> regions =
-              MetaTableAccessor.getTableRegions(connection, tableName, true);
+        for (int i = 0; i < 5; i++) {
+          List<RegionInfo> regions = MetaTableAccessor.getTableRegions(connection, tableName, true);
           if (regions.isEmpty()) {
             continue;
           }
           int regionIndex = random.nextInt(regions.size());

-          //pick a random region and split it into two
+          // pick a random region and split it into two
           RegionInfo region = Iterators.get(regions.iterator(), regionIndex);

-          //pick the mid split point
+          // pick the mid split point
           int start = 0, end = Integer.MAX_VALUE;
           if (region.getStartKey().length > 0) {
             start = Bytes.toInt(region.getStartKey());
@@ -173,7 +253,7 @@
           int mid = start + ((end - start) / 2);
           byte[] splitPoint = Bytes.toBytes(mid);

-          //put some rows to the regions
+          // put some rows to the regions
           addData(start);
           addData(mid);
@@ -183,11 +263,11 @@
           log("Initiating region split for:" + region.getRegionNameAsString());
           try {
             admin.splitRegion(region.getRegionName(), splitPoint);
-            //wait until the split is complete
+            // wait until the split is complete
             blockUntilRegionSplit(CONF, 50000, region.getRegionName(), true);

           } catch (NotServingRegionException ex) {
-            //ignore
+            // ignore
           }
         }
       } catch (Throwable ex) {
@@ -226,9 +306,11 @@
   /** verify region boundaries obtained from MetaScanner */
   void verifyRegionsUsingMetaTableAccessor() throws Exception {
     List<RegionInfo> regionList = MetaTableAccessor.getTableRegions(connection, tableName, true);
-    verifyTableRegions(regionList.stream().collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
+    verifyTableRegions(regionList.stream()
+        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
     regionList = MetaTableAccessor.getAllRegions(connection, true);
-    verifyTableRegions(regionList.stream().collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
+    verifyTableRegions(regionList.stream()
+        .collect(Collectors.toCollection(() -> new TreeSet<>(RegionInfo.COMPARATOR))));
   }

   /** verify region boundaries obtained from HTable.getStartEndKeys() */
@@ -343,7 +425,9 @@
     }
   }

-  /** Blocks until the region split is complete in hbase:meta and region server opens the daughters */
+  /**
+   * Blocks until the region split is complete in hbase:meta and region server opens the daughters
+   */
   public static void blockUntilRegionSplit(Configuration conf, long timeout,
       final byte[] regionName, boolean waitForDaughters)
       throws IOException, InterruptedException {
@@ -389,10 +473,32 @@

       rem = timeout - (System.currentTimeMillis() - start);
       blockUntilRegionIsOpened(conf, rem, daughterB);
+
+      // Compacting the new regions to make sure the references can be cleaned up
+      compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
+        TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterA.getRegionName());
+      compactAndBlockUntilDone(TEST_UTIL.getAdmin(),
+        TEST_UTIL.getMiniHBaseCluster().getRegionServer(0), daughterB.getRegionName());
+
+      removeCompactedFiles(conn, timeout, daughterA);
+      removeCompactedFiles(conn, timeout, daughterB);
       }
     }
   }

+  public static void removeCompactedFiles(Connection conn, long timeout, RegionInfo hri)
+      throws IOException, InterruptedException {
+    log("remove compacted files for: " + hri.getRegionNameAsString());
+    List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(hri.getTable());
+    regions.stream().forEach(r -> {
+      try {
+        r.getStores().get(0).closeAndArchiveCompactedFiles();
+      } catch (IOException ioe) {
+        LOG.error("Failed to remove compacted files", ioe);
+      }
+    });
+  }
+
   public static void blockUntilRegionIsInMeta(Connection conn, long timeout, RegionInfo hri)
       throws IOException, InterruptedException {
     log("blocking until region is in META: " + hri.getRegionNameAsString());
@@ -415,7 +521,9 @@
         Table table = conn.getTable(hri.getTable())) {
       byte[] row = hri.getStartKey();
       // Check for null/empty row. If we find one, use a key that is likely to be in first region.
-      if (row == null || row.length <= 0) row = new byte[] { '0' };
+      if (row == null || row.length <= 0) {
+        row = new byte[] { '0' };
+      }
       Get get = new Get(row);
       while (System.currentTimeMillis() - start < timeout) {
         try {
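The remaining hunks apply the same discipline to TestSplitTransactionOnCluster: compaction requests issued through Admin are asynchronous, so after each compactRegion()/compact(true) the tests poll the admin-visible state down to NONE and archive the compacted files before asserting assertFalse(daughter.hasReferences()). In outline (a sketch of the idiom the hunks below inline, with the RetryCounter bound used throughout the commit):

    // Sketch: Admin.compactRegion returns as soon as the request is queued, so the
    // caller must poll; CompactionState drops back to NONE once compaction is done.
    admin.compactRegion(daughter.getRegionName());
    RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
    while (CompactionState.NONE != admin.getCompactionStateForRegion(daughter.getRegionName())
        && retrier.shouldRetry()) {
      retrier.sleepUntilNextRetry();
    }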
@@ -31,7 +31,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownRegionException;
 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.DoNotRetryRegionException;
@@ -78,6 +81,10 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates;
 import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -85,7 +92,10 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.junit.After;
@@ -101,14 +111,6 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-
 /**
  * The below tests are testing split region against a running cluster
  */
@@ -386,11 +388,18 @@ public class TestSplitTransactionOnCluster {
       // Compact first to ensure we have cleaned up references -- else the split
       // will fail.
       this.admin.compactRegion(daughter.getRegionName());
+      RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
+      while (CompactionState.NONE != admin.getCompactionStateForRegion(daughter.getRegionName())
+          && retrier.shouldRetry()) {
+        retrier.sleepUntilNextRetry();
+      }
       daughters = cluster.getRegions(tableName);
       HRegion daughterRegion = null;
-      for (HRegion r: daughters) {
+      for (HRegion r : daughters) {
         if (RegionInfo.COMPARATOR.compare(r.getRegionInfo(), daughter) == 0) {
           daughterRegion = r;
+          // Archiving the compacted reference files
+          r.getStores().get(0).closeAndArchiveCompactedFiles();
           LOG.info("Found matching HRI: " + daughterRegion);
           break;
         }
@@ -533,11 +542,19 @@ public class TestSplitTransactionOnCluster {
     // Call split.
     this.admin.splitRegion(hri.getRegionName());
     List<HRegion> daughters = checkAndGetDaughters(tableName);

    // Before cleanup, get a new master.
     HMaster master = abortAndWaitForMaster();
     // Now call compact on the daughters and clean up any references.
-    for (HRegion daughter: daughters) {
+    for (HRegion daughter : daughters) {
       daughter.compact(true);
+      RetryCounter retrier = new RetryCounter(30, 1, TimeUnit.SECONDS);
+      while (CompactionState.NONE != admin
+          .getCompactionStateForRegion(daughter.getRegionInfo().getRegionName())
+          && retrier.shouldRetry()) {
+        retrier.sleepUntilNextRetry();
+      }
+      daughter.getStores().get(0).closeAndArchiveCompactedFiles();
       assertFalse(daughter.hasReferences());
     }
     // BUT calling compact on the daughters is not enough. The CatalogJanitor looks