HBASE-25741: Deadlock during peer cleanup with NoNodeException (#3204)

Introduced due to commit from  HBASE-25583. Fix is to issue the cleanup asynchronously.

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Sandeep Pal 2021-05-05 17:21:25 -07:00 committed by GitHub
parent 852b1b0826
commit a0dd384691
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 11 deletions

View File

@ -241,7 +241,7 @@ public class ReplicationSourceManager implements ReplicationListener {
if (peerId.contains("-")) {
peerId = peerId.split("-")[0];
}
peerRemoved(peerId);
schedulePeerRemoval(peerId);
}
walSet.clear();
}
@ -653,6 +653,23 @@ public class ReplicationSourceManager implements ReplicationListener {
transferQueues(regionserver);
}
/**
* We want to run the peer removal in a separate thread when the peer removal
* is called from ReplicationSource shipper thread on encountering NoNodeException
* because peerRemoved terminate the source which might leave replication source
* in orphaned state.
* See HBASE-25741.
* @param peerId peer ID to be removed.
*/
private void schedulePeerRemoval(final String peerId) {
LOG.info(String.format("Scheduling an async peer removal for peer %s", peerId));
this.executor.submit(new Runnable() {
@Override public void run() {
peerRemoved(peerId);
}
});
}
@Override
public void peerRemoved(String peerId) {
removePeer(peerId);

View File

@ -181,7 +181,7 @@ public class TestReplicationSource {
TEST_UTIL.getConfiguration());
for(int i = 0; i < 3; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b);
KeyValue kv = new KeyValue(b, b, b);
WALEdit edit = new WALEdit();
edit.add(kv);
WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
@ -256,11 +256,10 @@ public class TestReplicationSource {
private void appendEntries(WALProvider.Writer writer, int numEntries) throws IOException {
for (int i = 0; i < numEntries; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b,b,b);
KeyValue kv = new KeyValue(b, b, b);
WALEdit edit = new WALEdit();
edit.add(kv);
WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0,
HConstants.DEFAULT_CLUSTER_ID);
WALKey key = new WALKey(b, TableName.valueOf(b), 0, 0, HConstants.DEFAULT_CLUSTER_ID);
NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
scopes.put(b, HConstants.REPLICATION_SCOPE_GLOBAL);
key.setScopes(scopes);
@ -565,7 +564,13 @@ public class TestReplicationSource {
};
final ReplicationSource source = mocks.createReplicationSourceAndManagerWithMocks(endpoint);
source.run();
source.startup();
// source thread should be active
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
return source.isAlive();
}
});
source.enqueueLog(log1);
// Wait for source to replicate
@ -588,6 +593,14 @@ public class TestReplicationSource {
return !source.isSourceActive();
}
});
// And the source thread be terminated
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
return !source.isAlive();
}
});
assertTrue("Source should be removed", mocks.manager.getSources().isEmpty());
}
@Test

View File

@ -22,6 +22,7 @@ import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -78,19 +79,27 @@ public class TestReplicationSourceWithoutReplicationZnodes
wal.sync(txid);
wal.rollWriter();
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + server.getServerName() + "/1");
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
return !manager.getSources().isEmpty();
}
});
Assert.assertEquals("There should be exactly one source",
1, manager.getSources().size());
Assert.assertEquals("Replication source is not correct",
ReplicationSourceDummyWithNoTermination.class,
manager.getSources().get(0).getClass());
// delete the znodes for peer
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + server.getServerName() + "/1");
manager
.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,
false);
Assert.assertTrue("Replication source should be terminated and removed",
manager.getSources().isEmpty());
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
return manager.getSources().isEmpty();
}
});
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
}