HBASE-25349 [Flakey Tests] branch-2 TestRefreshRecoveredReplication.testReplicationRefreshSource:141 Waiting timed out after [60,000] msec (#2731)
Start the check for recovered queue presence earlier.

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
This commit is contained in:
parent
34721c42ec
commit
9cc2f76248
|
@ -812,6 +812,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
oldsources.add(src);
|
oldsources.add(src);
|
||||||
|
LOG.info("Added recovered source {}", src.getQueueId());
|
||||||
for (String wal : walsSet) {
|
for (String wal : walsSet) {
|
||||||
src.enqueueLog(new Path(oldLogDir, wal));
|
src.enqueueLog(new Path(oldLogDir, wal));
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,10 @@
|
||||||
package org.apache.hadoop.hbase.replication.regionserver;
|
package org.apache.hadoop.hbase.replication.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
@ -32,6 +35,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||||
import org.apache.hadoop.hbase.replication.TestReplicationBase;
|
import org.apache.hadoop.hbase.replication.TestReplicationBase;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
@ -51,6 +55,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Testcase for HBASE-24871.
|
* Testcase for HBASE-24871.
|
||||||
|
@ -75,6 +80,7 @@ public class TestRefreshRecoveredReplication extends TestReplicationBase {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
// The code below presumes NUM_SLAVES1 is 2.
|
||||||
NUM_SLAVES1 = 2;
|
NUM_SLAVES1 = 2;
|
||||||
// replicate slowly
|
// replicate slowly
|
||||||
Configuration conf1 = UTIL1.getConfiguration();
|
Configuration conf1 = UTIL1.getConfiguration();
|
||||||
|
@ -121,22 +127,25 @@ public class TestRefreshRecoveredReplication extends TestReplicationBase {
|
||||||
table1.put(new Put(r).addColumn(famName, famName, r));
|
table1.put(new Put(r).addColumn(famName, famName, r));
|
||||||
}
|
}
|
||||||
|
|
||||||
// kill rs holding table region
|
// Kill rs holding table region. There are only TWO servers. We depend on it.
|
||||||
Optional<RegionServerThread> server = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads()
|
List<RegionServerThread> rss = UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads();
|
||||||
.stream()
|
assertEquals(2, rss.size());
|
||||||
|
Optional<RegionServerThread> server = rss.stream()
|
||||||
.filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
|
.filter(rst -> CollectionUtils.isNotEmpty(rst.getRegionServer().getRegions(tablename)))
|
||||||
.findAny();
|
.findAny();
|
||||||
Assert.assertTrue(server.isPresent());
|
Assert.assertTrue(server.isPresent());
|
||||||
|
HRegionServer otherServer = rss.get(0).getRegionServer() == server.get().getRegionServer()?
|
||||||
|
rss.get(1).getRegionServer(): rss.get(0).getRegionServer();
|
||||||
server.get().getRegionServer().abort("stopping for test");
|
server.get().getRegionServer().abort("stopping for test");
|
||||||
|
// Wait for a recovered replication source to appear on the surviving server.
|
||||||
|
Replication replication = (Replication)otherServer.getReplicationSourceService();
|
||||||
|
UTIL1.waitFor(60000, () -> !replication.getReplicationManager().getOldSources().isEmpty());
|
||||||
|
// Wait on only one server being up.
|
||||||
UTIL1.waitFor(60000, () ->
|
UTIL1.waitFor(60000, () ->
|
||||||
|
// Re-query the cluster here: getLiveRegionServerThreads builds a new list on each call.
|
||||||
UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
|
UTIL1.getMiniHBaseCluster().getLiveRegionServerThreads().size() == NUM_SLAVES1 - 1);
|
||||||
UTIL1.waitTableAvailable(tablename);
|
UTIL1.waitTableAvailable(tablename);
|
||||||
|
LOG.info("Available {}", tablename);
|
||||||
// waiting for recovered peer to start
|
|
||||||
Replication replication = (Replication) UTIL1.getMiniHBaseCluster()
|
|
||||||
.getLiveRegionServerThreads().get(0).getRegionServer().getReplicationSourceService();
|
|
||||||
UTIL1.waitFor(60000, () ->
|
|
||||||
!replication.getReplicationManager().getOldSources().isEmpty());
|
|
||||||
|
|
||||||
// disable peer to trigger refreshSources
|
// disable peer to trigger refreshSources
|
||||||
hbaseAdmin.disableReplicationPeer(PEER_ID2);
|
hbaseAdmin.disableReplicationPeer(PEER_ID2);
|
||||||
|
|
Loading…
Reference in New Issue