HBASE-24552 Replica region needs to check if primary region directory exists at file system in TransitRegionStateProcedure (#1924) (#1971)

Signed-off-by:  stack <stack@apache.org>
This commit is contained in:
huaxiangsun 2020-06-26 11:27:30 -07:00 committed by GitHub
parent a499eae1aa
commit 2c201cc034
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 104 additions and 52 deletions

View File

@ -338,6 +338,19 @@ public class TransitRegionStateProcedure
try { try {
switch (state) { switch (state) {
case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE: case REGION_STATE_TRANSITION_GET_ASSIGN_CANDIDATE:
// Need to do some sanity check for replica region, if the region does not exist at
// master, do not try to assign the replica region, log error and return.
if (!RegionReplicaUtil.isDefaultReplica(regionNode.getRegionInfo())) {
RegionInfo defaultRI =
RegionReplicaUtil.getRegionInfoForDefaultReplica(regionNode.getRegionInfo());
if (env.getMasterServices().getAssignmentManager().getRegionStates().
getRegionStateNode(defaultRI) == null) {
LOG.error(
"Cannot assign replica region {} because its primary region {} does not exist.",
regionNode.getRegionInfo(), defaultRI);
return Flow.NO_MORE_STATE;
}
}
queueAssign(env, regionNode); queueAssign(env, regionNode);
return Flow.HAS_MORE_STATE; return Flow.HAS_MORE_STATE;
case REGION_STATE_TRANSITION_OPEN: case REGION_STATE_TRANSITION_OPEN:

View File

@ -51,6 +51,7 @@ import java.util.TreeSet;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.function.BooleanSupplier;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -4467,4 +4468,22 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next())); ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()));
} }
} }
/**
* Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
* invocations.
*/
public static void await(final long sleepMillis, final BooleanSupplier condition)
throws InterruptedException {
try {
while (!condition.getAsBoolean()) {
Thread.sleep(sleepMillis);
}
} catch (RuntimeException e) {
if (e.getCause() instanceof AssertionError) {
throw (AssertionError) e.getCause();
}
throw e;
}
}
} }

View File

@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.BooleanSupplier;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellBuilderType;
@ -114,7 +113,8 @@ public class TestMetaFixer {
// wait for RITs to settle -- those are the fixed regions being assigned -- or until the // wait for RITs to settle -- those are the fixed regions being assigned -- or until the
// watchdog TestRule terminates the test. // watchdog TestRule terminates the test.
await(50, () -> isNotEmpty(services.getAssignmentManager().getRegionsInTransition())); HBaseTestingUtility.await(50,
() -> isNotEmpty(services.getAssignmentManager().getRegionsInTransition()));
ris = MetaTableAccessor.getTableRegions(TEST_UTIL.getConnection(), tn); ris = MetaTableAccessor.getTableRegions(TEST_UTIL.getConnection(), tn);
assertEquals(originalCount, ris.size()); assertEquals(originalCount, ris.size());
@ -191,7 +191,7 @@ public class TestMetaFixer {
MetaFixer fixer = new MetaFixer(services); MetaFixer fixer = new MetaFixer(services);
fixer.fixOverlaps(report); fixer.fixOverlaps(report);
await(10, () -> { HBaseTestingUtility. await(10, () -> {
try { try {
if (cj.scan() > 0) { if (cj.scan() > 0) {
// It submits GC once, then it will immediately kick off another GC to test if // It submits GC once, then it will immediately kick off another GC to test if
@ -215,7 +215,7 @@ public class TestMetaFixer {
}); });
// Wait until all GCs settled down // Wait until all GCs settled down
await(10, () -> { HBaseTestingUtility.await(10, () -> {
return services.getMasterProcedureExecutor().getActiveProcIds().isEmpty(); return services.getMasterProcedureExecutor().getActiveProcIds().isEmpty();
}); });
@ -258,7 +258,7 @@ public class TestMetaFixer {
fixer.fixOverlaps(report); fixer.fixOverlaps(report);
AssignmentManager am = services.getAssignmentManager(); AssignmentManager am = services.getAssignmentManager();
await(200, () -> { HBaseTestingUtility.await(200, () -> {
try { try {
cj.scan(); cj.scan();
final CatalogJanitor.Report postReport = cj.getLastReport(); final CatalogJanitor.Report postReport = cj.getLastReport();
@ -291,7 +291,7 @@ public class TestMetaFixer {
report = cj.getLastReport(); report = cj.getLastReport();
fixer.fixOverlaps(report); fixer.fixOverlaps(report);
await(20, () -> { HBaseTestingUtility.await(20, () -> {
try { try {
// Make sure it GC only once. // Make sure it GC only once.
return (cj.scan() > 0); return (cj.scan() > 0);
@ -359,7 +359,7 @@ public class TestMetaFixer {
fixer.fixOverlaps(report); fixer.fixOverlaps(report);
// Wait until all procedures settled down // Wait until all procedures settled down
await(200, () -> { HBaseTestingUtility.await(200, () -> {
return services.getMasterProcedureExecutor().getActiveProcIds().isEmpty(); return services.getMasterProcedureExecutor().getActiveProcIds().isEmpty();
}); });
@ -371,7 +371,7 @@ public class TestMetaFixer {
fixer.fixOverlaps(report); fixer.fixOverlaps(report);
// Wait until all procedures settled down // Wait until all procedures settled down
await(200, () -> { HBaseTestingUtility.await(200, () -> {
return services.getMasterProcedureExecutor().getActiveProcIds().isEmpty(); return services.getMasterProcedureExecutor().getActiveProcIds().isEmpty();
}); });
@ -414,7 +414,7 @@ public class TestMetaFixer {
assertEquals(1, MetaFixer.calculateMerges(10, report.getOverlaps()).size()); assertEquals(1, MetaFixer.calculateMerges(10, report.getOverlaps()).size());
MetaFixer fixer = new MetaFixer(services); MetaFixer fixer = new MetaFixer(services);
fixer.fixOverlaps(report); fixer.fixOverlaps(report);
await(10, () -> { HBaseTestingUtility.await(10, () -> {
try { try {
services.getCatalogJanitor().scan(); services.getCatalogJanitor().scan();
final CatalogJanitor.Report postReport = services.getCatalogJanitor().getLastReport(); final CatalogJanitor.Report postReport = services.getCatalogJanitor().getLastReport();
@ -424,22 +424,4 @@ public class TestMetaFixer {
} }
}); });
} }
/**
* Await the successful return of {@code condition}, sleeping {@code sleepMillis} between
* invocations.
*/
private static void await(final long sleepMillis, final BooleanSupplier condition)
throws InterruptedException {
try {
while (!condition.getAsBoolean()) {
Thread.sleep(sleepMillis);
}
} catch (RuntimeException e) {
if (e.getCause() instanceof AssertionError) {
throw (AssertionError) e.getCause();
}
throw e;
}
}
} }

View File

@ -18,6 +18,7 @@
*/ */
package org.apache.hadoop.hbase.master.assignment; package org.apache.hadoop.hbase.master.assignment;
import static org.junit.Assert.assertNotEquals;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -28,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionReplicaTestHelper; import org.apache.hadoop.hbase.client.RegionReplicaTestHelper;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@ -56,7 +58,6 @@ public class TestRegionReplicaSplit {
private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaSplit.class); private static final Logger LOG = LoggerFactory.getLogger(TestRegionReplicaSplit.class);
private static final int NB_SERVERS = 4; private static final int NB_SERVERS = 4;
private static Table table;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility(); private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY; private static final byte[] f = HConstants.CATALOG_FAMILY;
@ -65,21 +66,19 @@ public class TestRegionReplicaSplit {
public static void beforeClass() throws Exception { public static void beforeClass() throws Exception {
HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", 3); HTU.getConfiguration().setInt("hbase.master.wait.on.regionservers.mintostart", 3);
HTU.startMiniCluster(NB_SERVERS); HTU.startMiniCluster(NB_SERVERS);
final TableName tableName = TableName.valueOf(TestRegionReplicaSplit.class.getSimpleName());
// Create table then get the single region for our new table.
createTable(tableName);
} }
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
private static void createTable(final TableName tableName) throws IOException { private static Table createTableAndLoadData(final TableName tableName) throws IOException {
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
builder.setRegionReplication(3); builder.setRegionReplication(3);
// create a table with 3 replication // create a table with 3 replication
table = HTU.createTable(builder.build(), new byte[][] { f }, getSplits(2), Table table = HTU.createTable(builder.build(), new byte[][] { f }, getSplits(2),
new Configuration(HTU.getConfiguration())); new Configuration(HTU.getConfiguration()));
HTU.loadTable(HTU.getConnection().getTable(tableName), f);
return table;
} }
private static byte[][] getSplits(int numRegions) { private static byte[][] getSplits(int numRegions) {
@ -92,35 +91,74 @@ public class TestRegionReplicaSplit {
@AfterClass @AfterClass
public static void afterClass() throws Exception { public static void afterClass() throws Exception {
HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false; HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
table.close();
HTU.shutdownMiniCluster(); HTU.shutdownMiniCluster();
} }
@Test @Test
public void testRegionReplicaSplitRegionAssignment() throws Exception { public void testRegionReplicaSplitRegionAssignment() throws Exception {
HTU.loadNumericRows(table, f, 0, 3); TableName tn = TableName.valueOf(this.name.getMethodName());
// split the table Table table = null;
List<RegionInfo> regions = new ArrayList<RegionInfo>(); try {
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) { table = createTableAndLoadData(tn);
for (Region r : rs.getRegionServer().getRegions(table.getName())) { HTU.loadNumericRows(table, f, 0, 3);
regions.add(r.getRegionInfo()); // split the table
} List<RegionInfo> regions = new ArrayList<RegionInfo>();
}
// There are 6 regions before split, 9 regions after split.
HTU.getAdmin().split(table.getName(), Bytes.toBytes(1));
int count = 0;
while (true) {
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) { for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
for (Region r : rs.getRegionServer().getRegions(table.getName())) { for (Region r : rs.getRegionServer().getRegions(table.getName())) {
count++; regions.add(r.getRegionInfo());
} }
} }
if (count >= 9) { // There are 6 regions before split, 9 regions after split.
break; HTU.getAdmin().split(table.getName(), Bytes.toBytes(1));
int count = 0;
while (true) {
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
for (Region r : rs.getRegionServer().getRegions(table.getName())) {
count++;
}
}
if (count >= 9) {
break;
}
count = 0;
}
RegionReplicaTestHelper.assertReplicaDistributed(HTU, table);
} finally {
if (table != null) {
HTU.deleteTable(tn);
} }
count = 0;
} }
}
RegionReplicaTestHelper.assertReplicaDistributed(HTU, table); @Test
public void testAssignFakeReplicaRegion() throws Exception {
TableName tn = TableName.valueOf(this.name.getMethodName());
Table table = null;
try {
table = createTableAndLoadData(tn);
final RegionInfo fakeHri =
RegionInfoBuilder.newBuilder(table.getName()).setStartKey(Bytes.toBytes("a"))
.setEndKey(Bytes.toBytes("b")).setReplicaId(1)
.setRegionId(System.currentTimeMillis()).build();
// To test AssignProcedure can defend this case.
HTU.getMiniHBaseCluster().getMaster().getAssignmentManager().assign(fakeHri);
// Wait until all assigns are done.
HBaseTestingUtility.await(50, () -> {
return HTU.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getActiveProcIds()
.isEmpty();
});
// Make sure the region is not online.
for (RegionServerThread rs : HTU.getMiniHBaseCluster().getRegionServerThreads()) {
for (Region r : rs.getRegionServer().getRegions(table.getName())) {
assertNotEquals(r.getRegionInfo(), fakeHri);
}
}
} finally {
if (table != null) {
HTU.deleteTable(tn);
}
}
} }
} }