HBASE-4363 [replication] ReplicationSource won't close if failing

to contact the sink (JD and Lars Hofhansl)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1171286 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2011-09-15 21:48:59 +00:00
parent 067b987b3c
commit 419490d69b
2 changed files with 14 additions and 6 deletions

View File

@ -266,6 +266,8 @@ Release 0.91.0 - Unreleased
HBASE-4351 If from Admin we try to unassign a region forcefully, HBASE-4351 If from Admin we try to unassign a region forcefully,
though a valid region name is given the master is not able though a valid region name is given the master is not able
to identify the region to unassign (Ramkrishna) to identify the region to unassign (Ramkrishna)
HBASE-4363 [replication] ReplicationSource won't close if failing
to contact the sink (JD and Lars Hofhansl)
IMPROVEMENTS IMPROVEMENTS
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)

View File

@ -240,7 +240,7 @@ public class ReplicationSource extends Thread
public void run() { public void run() {
connectToPeers(); connectToPeers();
// We were stopped while looping to connect to sinks, just abort // We were stopped while looping to connect to sinks, just abort
if (this.stopper.isStopped()) { if (!this.isActive()) {
return; return;
} }
// delay this until we are in an asynchronous thread // delay this until we are in an asynchronous thread
@ -265,7 +265,7 @@ public class ReplicationSource extends Thread
} }
int sleepMultiplier = 1; int sleepMultiplier = 1;
// Loop until we close down // Loop until we close down
while (!stopper.isStopped() && this.running) { while (isActive()) {
// Sleep until replication is enabled again // Sleep until replication is enabled again
if (!this.replicating.get() || !this.sourceEnabled.get()) { if (!this.replicating.get() || !this.sourceEnabled.get()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) { if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@ -348,7 +348,7 @@ public class ReplicationSource extends Thread
// If we didn't get anything to replicate, or if we hit a IOE, // If we didn't get anything to replicate, or if we hit a IOE,
// wait a bit and retry. // wait a bit and retry.
// But if we need to stop, don't bother sleeping // But if we need to stop, don't bother sleeping
if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) { if (this.isActive() && (gotIOE || currentNbEntries == 0)) {
this.manager.logPositionAndCleanOldLogs(this.currentPath, this.manager.logPositionAndCleanOldLogs(this.currentPath,
this.peerClusterZnode, this.position, queueRecovered); this.peerClusterZnode, this.position, queueRecovered);
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) { if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@ -428,7 +428,8 @@ public class ReplicationSource extends Thread
private void connectToPeers() { private void connectToPeers() {
// Connect to peer cluster first, unless we have to stop // Connect to peer cluster first, unless we have to stop
while (!this.stopper.isStopped() && this.currentPeers.size() == 0) { while (this.isActive() && this.currentPeers.size() == 0) {
try { try {
chooseSinks(); chooseSinks();
Thread.sleep(this.sleepForRetries); Thread.sleep(this.sleepForRetries);
@ -586,7 +587,7 @@ public class ReplicationSource extends Thread
LOG.warn("Was given 0 edits to ship"); LOG.warn("Was given 0 edits to ship");
return; return;
} }
while (!this.stopper.isStopped()) { while (this.isActive()) {
try { try {
HRegionInterface rrs = getRS(); HRegionInterface rrs = getRS();
LOG.debug("Replicating " + currentNbEntries); LOG.debug("Replicating " + currentNbEntries);
@ -613,6 +614,7 @@ public class ReplicationSource extends Thread
} }
try { try {
boolean down; boolean down;
// Spin while the slave is down and we're not asked to shutdown/close
do { do {
down = isSlaveDown(); down = isSlaveDown();
if (down) { if (down) {
@ -622,7 +624,7 @@ public class ReplicationSource extends Thread
chooseSinks(); chooseSinks();
} }
} }
} while (!this.stopper.isStopped() && down); } while (this.isActive() && down );
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.debug("Interrupted while trying to contact the peer cluster"); LOG.debug("Interrupted while trying to contact the peer cluster");
} catch (KeeperException e) { } catch (KeeperException e) {
@ -742,6 +744,10 @@ public class ReplicationSource extends Thread
this.sourceEnabled.set(status); this.sourceEnabled.set(status);
} }
private boolean isActive() {
return !this.stopper.isStopped() && this.running;
}
/** /**
* Comparator used to compare logs together based on their start time * Comparator used to compare logs together based on their start time
*/ */