HBASE-20561 The way we stop a ReplicationSource may cause the RS down

This commit is contained in:
Guanghao Zhang 2018-06-12 22:19:39 +08:00
parent 8648af07d4
commit ec66434380
3 changed files with 49 additions and 7 deletions

View File

@ -499,9 +499,29 @@ public class ReplicationSource implements ReplicationSourceInterface {
Collection<ReplicationSourceShipper> workers = workerThreads.values(); Collection<ReplicationSourceShipper> workers = workerThreads.values();
for (ReplicationSourceShipper worker : workers) { for (ReplicationSourceShipper worker : workers) {
worker.stopWorker(); worker.stopWorker();
worker.entryReader.interrupt(); worker.entryReader.setReaderRunning(false);
worker.interrupt();
} }
for (ReplicationSourceShipper worker : workers) {
if (worker.isAlive() || worker.entryReader.isAlive()) {
try {
// Wait worker to stop
Thread.sleep(this.sleepForRetries);
} catch (InterruptedException e) {
LOG.info("Interrupted while waiting " + worker.getName() + " to stop");
Thread.currentThread().interrupt();
}
// If worker still is alive after waiting, interrupt it
if (worker.isAlive()) {
worker.interrupt();
}
// If entry reader is alive after waiting, interrupt it
if (worker.entryReader.isAlive()) {
worker.entryReader.interrupt();
}
}
}
if (this.replicationEndpoint != null) { if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop(); this.replicationEndpoint.stop();
} }

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -449,6 +450,24 @@ public class ReplicationSourceManager implements ReplicationListener {
void exec() throws ReplicationException; void exec() throws ReplicationException;
} }
/**
* Refresh replication source will terminate the old source first, then the source thread will be
* interrupted. Need to handle it instead of abort the region server.
*/
private void interruptOrAbortWhenFail(ReplicationQueueOperation op) {
try {
op.exec();
} catch (ReplicationException e) {
if (e.getCause() != null && e.getCause() instanceof KeeperException.SystemErrorException
&& e.getCause().getCause() != null && e.getCause()
.getCause() instanceof InterruptedException) {
throw new RuntimeException(
"Thread is interrupted, the replication source may be terminated");
}
server.abort("Failed to operate on replication queue", e);
}
}
private void abortWhenFail(ReplicationQueueOperation op) { private void abortWhenFail(ReplicationQueueOperation op) {
try { try {
op.exec(); op.exec();
@ -484,8 +503,9 @@ public class ReplicationSourceManager implements ReplicationListener {
public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered, public void logPositionAndCleanOldLogs(String queueId, boolean queueRecovered,
WALEntryBatch entryBatch) { WALEntryBatch entryBatch) {
String fileName = entryBatch.getLastWalPath().getName(); String fileName = entryBatch.getLastWalPath().getName();
abortWhenFail(() -> this.queueStorage.setWALPosition(server.getServerName(), queueId, fileName, interruptOrAbortWhenFail(() -> this.queueStorage
entryBatch.getLastWalPosition(), entryBatch.getLastSeqIds())); .setWALPosition(server.getServerName(), queueId, fileName, entryBatch.getLastWalPosition(),
entryBatch.getLastSeqIds()));
cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered); cleanOldLogs(fileName, entryBatch.isEndOfFile(), queueId, queueRecovered);
} }
@ -523,7 +543,7 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet); LOG.debug("Removing {} logs in the list: {}", walSet.size(), walSet);
for (String wal : walSet) { for (String wal : walSet) {
abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal)); interruptOrAbortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
} }
walSet.clear(); walSet.clear();
} }
@ -886,7 +906,7 @@ public class ReplicationSourceManager implements ReplicationListener {
} }
public void cleanUpHFileRefs(String peerId, List<String> files) { public void cleanUpHFileRefs(String peerId, List<String> files) {
abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files)); interruptOrAbortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
} }
int activeFailoverTaskCount() { int activeFailoverTaskCount() {

View File

@ -607,7 +607,9 @@ public class ZKWatcher implements Watcher, Abortable, Closeable {
public void interruptedException(InterruptedException ie) throws KeeperException { public void interruptedException(InterruptedException ie) throws KeeperException {
interruptedExceptionNoThrow(ie, true); interruptedExceptionNoThrow(ie, true);
// Throw a system error exception to let upper level handle it // Throw a system error exception to let upper level handle it
throw new KeeperException.SystemErrorException(); KeeperException keeperException = new KeeperException.SystemErrorException();
keeperException.initCause(ie);
throw keeperException;
} }
/** /**