HBASE-26639 The implementation of TestMergesSplitsAddToTracker is problematic (#4010)
Signed-off-by: Wellington Ramos Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
f1e263a8fd
commit
47983cf790
|
@ -17,18 +17,15 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.
|
import static org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory.TRACKER_IMPL;
|
||||||
TRACKER_IMPL;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.commons.lang3.mutable.MutableBoolean;
|
import org.apache.commons.lang3.mutable.MutableBoolean;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
@ -37,10 +34,14 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNameTestRule;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
|
import org.apache.hadoop.hbase.regionserver.storefiletracker.TestStoreFileTracker;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
@ -55,7 +56,6 @@ import org.junit.ClassRule;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
|
||||||
|
|
||||||
|
|
||||||
@Category({RegionServerTests.class, LargeTests.class})
|
@Category({RegionServerTests.class, LargeTests.class})
|
||||||
|
@ -67,14 +67,15 @@ public class TestMergesSplitsAddToTracker {
|
||||||
|
|
||||||
private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
private static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
|
||||||
|
|
||||||
public static final byte[] FAMILY_NAME = Bytes.toBytes("info");
|
private static final String FAMILY_NAME_STR = "info";
|
||||||
|
|
||||||
|
private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TableNameTestRule name = new TableNameTestRule();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupClass() throws Exception {
|
public static void setupClass() throws Exception {
|
||||||
TEST_UTIL.getConfiguration().set(TRACKER_IMPL, TestStoreFileTracker.class.getName());
|
|
||||||
TEST_UTIL.startMiniCluster();
|
TEST_UTIL.startMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,13 +86,24 @@ public class TestMergesSplitsAddToTracker {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup(){
|
public void setup(){
|
||||||
TestStoreFileTracker.trackedFiles = new HashMap<>();
|
TestStoreFileTracker.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private TableName createTable(byte[] splitKey) throws IOException {
|
||||||
|
TableDescriptor td = TableDescriptorBuilder.newBuilder(name.getTableName())
|
||||||
|
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY_NAME))
|
||||||
|
.setValue(TRACKER_IMPL, TestStoreFileTracker.class.getName()).build();
|
||||||
|
if (splitKey != null) {
|
||||||
|
TEST_UTIL.getAdmin().createTable(td, new byte[][] { splitKey });
|
||||||
|
} else {
|
||||||
|
TEST_UTIL.getAdmin().createTable(td);
|
||||||
|
}
|
||||||
|
return td.getTableName();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCommitDaughterRegion() throws Exception {
|
public void testCommitDaughterRegion() throws Exception {
|
||||||
TableName table = TableName.valueOf(name.getMethodName());
|
TableName table = createTable(null);
|
||||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
|
||||||
//first put some data in order to have a store file created
|
//first put some data in order to have a store file created
|
||||||
putThreeRowsAndFlush(table);
|
putThreeRowsAndFlush(table);
|
||||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||||
|
@ -125,8 +137,7 @@ public class TestMergesSplitsAddToTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCommitMergedRegion() throws Exception {
|
public void testCommitMergedRegion() throws Exception {
|
||||||
TableName table = TableName.valueOf(name.getMethodName());
|
TableName table = createTable(null);
|
||||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
|
||||||
//splitting the table first
|
//splitting the table first
|
||||||
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
TEST_UTIL.getAdmin().split(table, Bytes.toBytes("002"));
|
||||||
//Add data and flush to create files in the two different regions
|
//Add data and flush to create files in the two different regions
|
||||||
|
@ -163,8 +174,7 @@ public class TestMergesSplitsAddToTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSplitLoadsFromTracker() throws Exception {
|
public void testSplitLoadsFromTracker() throws Exception {
|
||||||
TableName table = TableName.valueOf(name.getMethodName());
|
TableName table = createTable(null);
|
||||||
TEST_UTIL.createTable(table, FAMILY_NAME);
|
|
||||||
//Add data and flush to create files in the two different regions
|
//Add data and flush to create files in the two different regions
|
||||||
putThreeRowsAndFlush(table);
|
putThreeRowsAndFlush(table);
|
||||||
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
HRegion region = TEST_UTIL.getHBaseCluster().getRegions(table).get(0);
|
||||||
|
@ -182,9 +192,7 @@ public class TestMergesSplitsAddToTracker {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMergeLoadsFromTracker() throws Exception {
|
public void testMergeLoadsFromTracker() throws Exception {
|
||||||
TableName table = TableName.valueOf(name.getMethodName());
|
TableName table = createTable(Bytes.toBytes("002"));
|
||||||
TEST_UTIL.createTable(table, new byte[][]{FAMILY_NAME},
|
|
||||||
new byte[][]{Bytes.toBytes("002")});
|
|
||||||
//Add data and flush to create files in the two different regions
|
//Add data and flush to create files in the two different regions
|
||||||
putThreeRowsAndFlush(table);
|
putThreeRowsAndFlush(table);
|
||||||
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
List<HRegion> regions = TEST_UTIL.getHBaseCluster().getRegions(table);
|
||||||
|
@ -232,10 +240,8 @@ public class TestMergesSplitsAddToTracker {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
|
private void verifyFilesAreTracked(Path regionDir, FileSystem fs) throws Exception {
|
||||||
String storeId = regionDir.getName() + "-info";
|
for (FileStatus f : fs.listStatus(new Path(regionDir, FAMILY_NAME_STR))) {
|
||||||
for(FileStatus f : fs.listStatus(new Path(regionDir, Bytes.toString(FAMILY_NAME)))){
|
assertTrue(TestStoreFileTracker.tracked(regionDir.getName(), FAMILY_NAME_STR, f.getPath()));
|
||||||
assertTrue(TestStoreFileTracker.trackedFiles.get(storeId).stream().filter(s ->
|
|
||||||
s.getPath().equals(f.getPath())).findFirst().isPresent());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,11 +20,13 @@ package org.apache.hadoop.hbase.regionserver.storefiletracker;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
import org.apache.hadoop.hbase.regionserver.StoreContext;
|
||||||
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
@ -33,7 +35,8 @@ import org.slf4j.LoggerFactory;
|
||||||
public class TestStoreFileTracker extends DefaultStoreFileTracker {
|
public class TestStoreFileTracker extends DefaultStoreFileTracker {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestStoreFileTracker.class);
|
||||||
public static Map<String, List<StoreFileInfo>> trackedFiles = new HashMap<>();
|
private static ConcurrentMap<String, BlockingQueue<StoreFileInfo>> trackedFiles =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
private String storeId;
|
private String storeId;
|
||||||
|
|
||||||
public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
|
public TestStoreFileTracker(Configuration conf, boolean isPrimaryReplica, StoreContext ctx) {
|
||||||
|
@ -41,7 +44,7 @@ public class TestStoreFileTracker extends DefaultStoreFileTracker {
|
||||||
if (ctx != null && ctx.getRegionFileSystem() != null) {
|
if (ctx != null && ctx.getRegionFileSystem() != null) {
|
||||||
this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString();
|
this.storeId = ctx.getRegionInfo().getEncodedName() + "-" + ctx.getFamily().getNameAsString();
|
||||||
LOG.info("created storeId: {}", storeId);
|
LOG.info("created storeId: {}", storeId);
|
||||||
trackedFiles.computeIfAbsent(storeId, v -> new ArrayList<>());
|
trackedFiles.computeIfAbsent(storeId, v -> new LinkedBlockingQueue<>());
|
||||||
} else {
|
} else {
|
||||||
LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
|
LOG.info("ctx.getRegionFileSystem() returned null. Leaving storeId null.");
|
||||||
}
|
}
|
||||||
|
@ -51,11 +54,19 @@ public class TestStoreFileTracker extends DefaultStoreFileTracker {
|
||||||
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
|
protected void doAddNewStoreFiles(Collection<StoreFileInfo> newFiles) throws IOException {
|
||||||
LOG.info("adding to storeId: {}", storeId);
|
LOG.info("adding to storeId: {}", storeId);
|
||||||
trackedFiles.get(storeId).addAll(newFiles);
|
trackedFiles.get(storeId).addAll(newFiles);
|
||||||
trackedFiles.putIfAbsent(storeId, (List<StoreFileInfo>)newFiles);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<StoreFileInfo> load() throws IOException {
|
public List<StoreFileInfo> load() throws IOException {
|
||||||
return trackedFiles.get(storeId);
|
return new ArrayList<>(trackedFiles.get(storeId));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean tracked(String encodedRegionName, String family, Path file) {
|
||||||
|
BlockingQueue<StoreFileInfo> files = trackedFiles.get(encodedRegionName + "-" + family);
|
||||||
|
return files != null && files.stream().anyMatch(s -> s.getPath().equals(file));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void clear() {
|
||||||
|
trackedFiles.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue