HBASE-20475 Fix the flaky TestReplicationDroppedTables unit test.
Signed-off-by: zhangduo <zhangduo@apache.org>
parent e8603e1d7c
commit 72093178fb
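
Context for the fix (a minimal sketch, not code from this patch; all names below are hypothetical): a peer can be removed and re-added under the same peerId while a NodeFailoverWorker is claiming a dead server's queues, so checking that the peerId still exists is not enough. The patch snapshots the peer map when the worker starts and later compares peer references by identity.

import java.util.HashMap;
import java.util.Map;

class PeerSnapshotSketch {
  public static void main(String[] args) {
    Map<String, Object> livePeers = new HashMap<>();
    livePeers.put("2", new Object());                        // peer "2" exists
    Map<String, Object> snapshot = new HashMap<>(livePeers); // worker starts, copies the map

    livePeers.remove("2");                                   // peer "2" removed ...
    livePeers.put("2", new Object());                        // ... and re-added with the same id

    Object latest = livePeers.get("2");
    // the id-only check passes, but the reference-identity check correctly fails
    System.out.println("id present: " + (latest != null));
    System.out.println("same peer:  " + (latest == snapshot.get("2")));
  }
}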
ReplicationPeers.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -103,6 +104,10 @@ public class ReplicationPeers {
     return Collections.unmodifiableSet(peerCache.keySet());
   }
 
+  public Map<String, ReplicationPeerImpl> getPeerCache() {
+    return Collections.unmodifiableMap(peerCache);
+  }
+
   public PeerState refreshPeerState(String peerId) throws ReplicationException {
     ReplicationPeerImpl peer = peerCache.get(peerId);
     if (peer == null) {
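A note on the getPeerCache() addition above: Collections.unmodifiableMap returns a read-through view, not a snapshot, so later changes to the backing map stay visible through it. That is why the failover worker below copies the returned map into a fresh HashMap. A small self-contained illustration (hypothetical names, not from this patch):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UnmodifiableViewSketch {
  public static void main(String[] args) {
    Map<String, String> backing = new HashMap<>();
    backing.put("2", "peerA");
    Map<String, String> view = Collections.unmodifiableMap(backing);
    Map<String, String> snapshot = new HashMap<>(view); // entries copied at this moment

    backing.put("2", "peerB");                  // mutate the backing map
    System.out.println(view.get("2"));          // peerB: the view tracks the change
    System.out.println(snapshot.get("2"));      // peerA: the snapshot is frozen
  }
}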
ReplicationSourceManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -627,11 +628,25 @@ public class ReplicationSourceManager implements ReplicationListener {
   class NodeFailoverWorker extends Thread {
 
     private final ServerName deadRS;
+    // After claiming the queues from a dead region server, the NodeFailoverWorker will skip
+    // starting the RecoveredReplicationSource if the peer has been removed. But it is possible
+    // that a peer with peerId = 2 is removed and a new peer with peerId = 2 is added again while
+    // the NodeFailoverWorker is running. So we need a copied <peerId, peer> map to decide
+    // whether we should start the RecoveredReplicationSource. If the latest peer is not the same
+    // instance as the one seen when the NodeFailoverWorker began, we should skip starting the
+    // RecoveredReplicationSource; otherwise the RS will abort (see HBASE-20475).
+    private final Map<String, ReplicationPeerImpl> peersSnapshot;
 
     @VisibleForTesting
     public NodeFailoverWorker(ServerName deadRS) {
       super("Failover-for-" + deadRS);
       this.deadRS = deadRS;
+      peersSnapshot = new HashMap<>(replicationPeers.getPeerCache());
+    }
+
+    private boolean isOldPeer(String peerId, ReplicationPeerImpl newPeerRef) {
+      ReplicationPeerImpl oldPeerRef = peersSnapshot.get(peerId);
+      return oldPeerRef != null && oldPeerRef == newPeerRef;
     }
 
     @Override
@@ -691,16 +706,16 @@ public class ReplicationSourceManager implements ReplicationListener {
         ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
         String actualPeerId = replicationQueueInfo.getPeerId();
 
-        ReplicationPeer peer = replicationPeers.getPeer(actualPeerId);
-        if (peer == null) {
-          LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS +
-            ", peer is null");
+        ReplicationPeerImpl peer = replicationPeers.getPeer(actualPeerId);
+        if (peer == null || !isOldPeer(actualPeerId, peer)) {
+          LOG.warn("Skipping failover for peer {} of node {}, peer is null", actualPeerId,
+            deadRS);
           abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), queueId));
           continue;
         }
         if (server instanceof ReplicationSyncUp.DummyServer
             && peer.getPeerState().equals(PeerState.DISABLED)) {
-          LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip "
+          LOG.warn("Peer {} is disabled. ReplicationSyncUp tool will skip "
              + "replicating data to this peer.",
            actualPeerId);
          continue;
@@ -721,7 +736,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         ReplicationSourceInterface src = createSource(queueId, peer);
         // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
         synchronized (oldsources) {
-          if (!replicationPeers.getAllPeerIds().contains(src.getPeerId())) {
+          peer = replicationPeers.getPeer(src.getPeerId());
+          if (peer == null || !isOldPeer(src.getPeerId(), peer)) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             removeRecoveredSource(src);
             continue;
TestReplicationDroppedTables.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -59,8 +60,8 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
   public void setUp() throws Exception {
     // Starting and stopping replication can make us miss new logs,
     // rolling like this makes sure the most recent one gets added to the queue
-    for ( JVMClusterUtil.RegionServerThread r :
-        utility1.getHBaseCluster().getRegionServerThreads()) {
+    for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+        .getRegionServerThreads()) {
       utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
     }
     int rowCount = utility1.countRows(tableName);
@@ -73,7 +74,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
     Scan scan = new Scan();
     int lastCount = 0;
     for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
+      if (i == NB_RETRIES - 1) {
         fail("Waited too much time for truncate");
       }
       ResultScanner scanner = htable2.getScanner(scan);
@@ -90,6 +91,12 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
         break;
       }
     }
+    // Set the max request size to a tiny 10K to split the replication WAL entries into multiple
+    // batches. The default max request size is 256M, so all entries would go in a single batch;
+    // on the sink side the entries are applied per region server, grouped by table name, so the
+    // test table's WAL entries may be applied first, then the test_dropped table's, and we would
+    // wrongly conclude that replication is not stuck (HBASE-20475).
+    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
   }
 
   @Test
@@ -158,15 +165,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
     // put some data (lead with 0 so the edit gets sorted before the other table's edits
     // in the replication batch)
     // write a bunch of edits, making sure we fill a batch
-    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
-    Put put = new Put(rowkey);
+    byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
+    Put put = new Put(rowKey);
     put.addColumn(familyname, row, row);
     lHtable1.put(put);
 
-    rowkey = Bytes.toBytes("normal put");
-    put = new Put(rowkey);
-    put.addColumn(famName, row, row);
-    htable1.put(put);
+    for (int i = 0; i < 1000; i++) {
+      rowKey = Bytes.toBytes("NormalPut" + i);
+      put = new Put(rowKey).addColumn(famName, row, row);
+      htable1.put(put);
+    }
 
     try (Admin admin1 = connection1.getAdmin()) {
       admin1.disableTable(tablename);
@@ -180,9 +188,9 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
     admin.enablePeer("2");
     if (allowProceeding) {
       // in this we'd expect the key to make it over
-      verifyReplicationProceeded(rowkey);
+      verifyReplicationProceeded(rowKey);
     } else {
-      verifyReplicationStuck(rowkey);
+      verifyReplicationStuck(rowKey);
     }
     // just to be safe
     conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
@@ -224,17 +232,18 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
     admin.disablePeer("2");
 
     // put some data (lead with 0 so the edit gets sorted before the other table's edits
     // in the replication batch)
     // write a bunch of edits, making sure we fill a batch
-    byte[] rowkey = Bytes.toBytes(0+" put on table to be dropped");
-    Put put = new Put(rowkey);
+    byte[] rowKey = Bytes.toBytes(0 + " put on table to be dropped");
+    Put put = new Put(rowKey);
     put.addColumn(familyname, row, row);
     lHtable1.put(put);
 
-    rowkey = Bytes.toBytes("normal put");
-    put = new Put(rowkey);
-    put.addColumn(famName, row, row);
-    htable1.put(put);
+    for (int i = 0; i < 1000; i++) {
+      rowKey = Bytes.toBytes("NormalPut" + i);
+      put = new Put(rowKey).addColumn(famName, row, row);
+      htable1.put(put);
+    }
 
     try (Admin admin2 = connection2.getAdmin()) {
       admin2.disableTable(tablename);
@@ -246,16 +255,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
 
     try (Admin admin1 = connection1.getAdmin()) {
       // the source table still exists, replication should be stalled
-      verifyReplicationStuck(rowkey);
+      verifyReplicationStuck(rowKey);
 
       admin1.disableTable(tablename);
       // still stuck, source table still exists
-      verifyReplicationStuck(rowkey);
+      verifyReplicationStuck(rowKey);
 
       admin1.deleteTable(tablename);
       // now the source table is gone, replication should proceed, the
       // offending edits be dropped
-      verifyReplicationProceeded(rowkey);
+      verifyReplicationProceeded(rowKey);
     }
     // just to be safe
     conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
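
Why the test shrinks the RPC request size: with the 256M default, every WAL entry ships in one replication batch, and the sink may apply the healthy table's edits before the dropped table's, so the test can no longer tell that replication is stuck. Setting RpcServer.MAX_REQUEST_SIZE to 10K (as the patch does in setUp()) forces many small batches. A standalone sketch of that configuration step, assuming only the HBase classes already imported by the patch above (the class name here is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ipc.RpcServer;

public class SmallBatchConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // 10 KB instead of the 256 MB default: a 1000-row WAL shipment now spans
    // many batches, so one stuck edit reliably blocks the edits behind it.
    conf.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
    System.out.println(RpcServer.MAX_REQUEST_SIZE + " = "
        + conf.getInt(RpcServer.MAX_REQUEST_SIZE, -1));
  }
}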