HBASE-10575 ReplicationSource thread can't be terminated if it runs into the loop to contact peer's zk ensemble and fails continuously

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1571579 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2014-02-25 06:31:10 +00:00
parent 577c1ee7d7
commit 48d8e996ed
1 changed files with 23 additions and 12 deletions

View File

@ -207,24 +207,43 @@ public class ReplicationSource extends Thread
} }
} }
private void uninitialize() {
if (this.conn != null) {
try {
this.conn.close();
} catch (IOException e) {
LOG.debug("Attempt to close connection failed", e);
}
}
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
}
@Override @Override
public void run() { public void run() {
connectToPeers(); connectToPeers();
// We were stopped while looping to connect to sinks, just abort // We were stopped while looping to connect to sinks, just abort
if (!this.isActive()) { if (!this.isActive()) {
metrics.clear(); uninitialize();
return; return;
} }
int sleepMultiplier = 1; int sleepMultiplier = 1;
// delay this until we are in an asynchronous thread // delay this until we are in an asynchronous thread
while (this.peerClusterId == null) { while (this.isActive() && this.peerClusterId == null) {
this.peerClusterId = replicationPeers.getPeerUUID(this.peerId); this.peerClusterId = replicationPeers.getPeerUUID(this.peerId);
if (this.peerClusterId == null) { if (this.isActive() && this.peerClusterId == null) {
if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) { if (sleepForRetries("Cannot contact the peer's zk ensemble", sleepMultiplier)) {
sleepMultiplier++; sleepMultiplier++;
} }
} }
} }
// We were stopped while looping to contact peer's zk ensemble, just abort
if (!this.isActive()) {
uninitialize();
return;
}
// resetting to 1 to reuse later // resetting to 1 to reuse later
sleepMultiplier = 1; sleepMultiplier = 1;
@ -365,15 +384,7 @@ public class ReplicationSource extends Thread
sleepMultiplier = 1; sleepMultiplier = 1;
shipEdits(currentWALisBeingWrittenTo, entries); shipEdits(currentWALisBeingWrittenTo, entries);
} }
if (this.conn != null) { uninitialize();
try {
this.conn.close();
} catch (IOException e) {
LOG.debug("Attempt to close connection failed", e);
}
}
LOG.debug("Source exiting " + this.peerId);
metrics.clear();
} }
/** /**