HBASE-4501 [replication] Shutting down a stream leaves recovered
sources running git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1179345 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f973b6081e
commit
1edf8b65e4
|
@ -702,6 +702,8 @@ Release 0.90.5 - Unreleased
|
||||||
HBASE-4515 User.getCurrent() can fail to initialize the current user
|
HBASE-4515 User.getCurrent() can fail to initialize the current user
|
||||||
HBASE-4473 NPE when executors are down but events are still coming in
|
HBASE-4473 NPE when executors are down but events are still coming in
|
||||||
HBASE-4537 TestUser imports breaking build against secure Hadoop
|
HBASE-4537 TestUser imports breaking build against secure Hadoop
|
||||||
|
HBASE-4501 [replication] Shutting down a stream leaves recovered
|
||||||
|
sources running
|
||||||
|
|
||||||
IMPROVEMENT
|
IMPROVEMENT
|
||||||
HBASE-4205 Enhance HTable javadoc (Eric Charles)
|
HBASE-4205 Enhance HTable javadoc (Eric Charles)
|
||||||
|
|
|
@ -354,6 +354,7 @@ public class ReplicationSourceManager {
|
||||||
LOG.info("Closing the following queue " + id + ", currently have "
|
LOG.info("Closing the following queue " + id + ", currently have "
|
||||||
+ sources.size() + " and another "
|
+ sources.size() + " and another "
|
||||||
+ oldsources.size() + " that were recovered");
|
+ oldsources.size() + " that were recovered");
|
||||||
|
String terminateMessage = "Replication stream was removed by a user";
|
||||||
ReplicationSourceInterface srcToRemove = null;
|
ReplicationSourceInterface srcToRemove = null;
|
||||||
List<ReplicationSourceInterface> oldSourcesToDelete =
|
List<ReplicationSourceInterface> oldSourcesToDelete =
|
||||||
new ArrayList<ReplicationSourceInterface>();
|
new ArrayList<ReplicationSourceInterface>();
|
||||||
|
@ -364,6 +365,7 @@ public class ReplicationSourceManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (ReplicationSourceInterface src : oldSourcesToDelete) {
|
for (ReplicationSourceInterface src : oldSourcesToDelete) {
|
||||||
|
src.terminate(terminateMessage);
|
||||||
closeRecoveredQueue((src));
|
closeRecoveredQueue((src));
|
||||||
}
|
}
|
||||||
LOG.info("Number of deleted recovered sources for " + id + ": "
|
LOG.info("Number of deleted recovered sources for " + id + ": "
|
||||||
|
@ -379,7 +381,7 @@ public class ReplicationSourceManager {
|
||||||
LOG.error("The queue we wanted to close is missing " + id);
|
LOG.error("The queue we wanted to close is missing " + id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
srcToRemove.terminate("Replication stream was removed by a user");
|
srcToRemove.terminate(terminateMessage);
|
||||||
this.sources.remove(srcToRemove);
|
this.sources.remove(srcToRemove);
|
||||||
this.zkHelper.deleteSource(id, true);
|
this.zkHelper.deleteSource(id, true);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue