HBASE-3041 [replication] ReplicationSink shouldn't kill the whole RS when
it fails to replicate HBASE-3044 [replication] ReplicationSource won't cleanup logs if there's nothing to replicate git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1022770 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bcaa2dea49
commit
a5a00687ef
|
@ -581,6 +581,10 @@ Release 0.21.0 - Unreleased
|
||||||
HBASE-3063 TestThriftServer failing in TRUNK
|
HBASE-3063 TestThriftServer failing in TRUNK
|
||||||
HBASE-3094 Fixes for miscellaneous broken tests
|
HBASE-3094 Fixes for miscellaneous broken tests
|
||||||
HBASE-3060 [replication] Reenable replication on trunk with unit tests
|
HBASE-3060 [replication] Reenable replication on trunk with unit tests
|
||||||
|
HBASE-3041 [replication] ReplicationSink shouldn't kill the whole RS when
|
||||||
|
it fails to replicate
|
||||||
|
HBASE-3044 [replication] ReplicationSource won't cleanup logs if there's
|
||||||
|
nothing to replicate
|
||||||
|
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
|
@ -146,22 +146,9 @@ public class ReplicationSink {
|
||||||
this.metrics.appliedBatchesRate.inc(1);
|
this.metrics.appliedBatchesRate.inc(1);
|
||||||
LOG.info("Total replicated: " + totalReplicated);
|
LOG.info("Total replicated: " + totalReplicated);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
if (ex.getCause() instanceof TableNotFoundException) {
|
LOG.error("Unable to accept edit because:", ex);
|
||||||
LOG.warn("Losing edits because: ", ex);
|
|
||||||
} else {
|
|
||||||
// Should we log rejected edits in a file for replay?
|
|
||||||
LOG.error("Unable to accept edit because", ex);
|
|
||||||
this.stopper.stop("Unable to accept edit because " + ex.getMessage());
|
|
||||||
throw ex;
|
throw ex;
|
||||||
}
|
}
|
||||||
} catch (RuntimeException re) {
|
|
||||||
if (re.getCause() instanceof TableNotFoundException) {
|
|
||||||
LOG.warn("Losing edits because: ", re);
|
|
||||||
} else {
|
|
||||||
this.stopper.stop("Replication stopped us because " + re.getMessage());
|
|
||||||
throw re;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -320,6 +320,8 @@ public class ReplicationSource extends Thread
|
||||||
// wait a bit and retry.
|
// wait a bit and retry.
|
||||||
// But if we need to stop, don't bother sleeping
|
// But if we need to stop, don't bother sleeping
|
||||||
if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
|
if (!stopper.isStopped() && (gotIOE || currentNbEntries == 0)) {
|
||||||
|
this.manager.logPositionAndCleanOldLogs(this.currentPath,
|
||||||
|
this.peerClusterZnode, this.position, queueRecovered);
|
||||||
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
|
if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
|
||||||
sleepMultiplier++;
|
sleepMultiplier++;
|
||||||
}
|
}
|
||||||
|
@ -527,6 +529,10 @@ public class ReplicationSource extends Thread
|
||||||
*/
|
*/
|
||||||
protected void shipEdits() {
|
protected void shipEdits() {
|
||||||
int sleepMultiplier = 1;
|
int sleepMultiplier = 1;
|
||||||
|
if (this.currentNbEntries == 0) {
|
||||||
|
LOG.warn("Was given 0 edits to ship");
|
||||||
|
return;
|
||||||
|
}
|
||||||
while (!this.stopper.isStopped()) {
|
while (!this.stopper.isStopped()) {
|
||||||
try {
|
try {
|
||||||
HRegionInterface rrs = getRS();
|
HRegionInterface rrs = getRS();
|
||||||
|
|
Loading…
Reference in New Issue