HBASE-3515 [replication] ReplicationSource can miss a log after RS comes out of GC

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1196398 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2011-11-01 23:44:02 +00:00
parent 850bb134a6
commit 08a9170236
3 changed files with 20 additions and 11 deletions

View File

@ -438,6 +438,7 @@ Release 0.92.0 - Unreleased
HBASE-4695 WAL logs get deleted before region server can fully flush HBASE-4695 WAL logs get deleted before region server can fully flush
(gaojinchao) (gaojinchao)
HBASE-4708 Revert safemode related pieces of hbase-4510 (Harsh J) HBASE-4708 Revert safemode related pieces of hbase-4510 (Harsh J)
HBASE-3515 [replication] ReplicationSource can miss a log after RS comes out of GC
TESTS TESTS
HBASE-4450 test for number of blocks read: to serve as baseline for expected HBASE-4450 test for number of blocks read: to serve as baseline for expected

View File

@ -432,14 +432,11 @@ public class ReplicationZookeeper {
* @param filename name of the hlog's znode * @param filename name of the hlog's znode
* @param peerId name of the cluster's znode * @param peerId name of the cluster's znode
*/ */
public void addLogToList(String filename, String peerId) { public void addLogToList(String filename, String peerId)
try { throws KeeperException {
String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId); String znode = ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
znode = ZKUtil.joinZNode(znode, filename); znode = ZKUtil.joinZNode(znode, filename);
ZKUtil.createWithParents(this.zookeeper, znode); ZKUtil.createWithParents(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed add log to list", e);
}
} }
/** /**

View File

@ -210,7 +210,14 @@ public class ReplicationSourceManager {
if (this.latestPath != null) { if (this.latestPath != null) {
String name = this.latestPath.getName(); String name = this.latestPath.getName();
this.hlogsById.get(id).add(name); this.hlogsById.get(id).add(name);
try {
this.zkHelper.addLogToList(name, src.getPeerClusterZnode()); this.zkHelper.addLogToList(name, src.getPeerClusterZnode());
} catch (KeeperException ke) {
String message = "Cannot add log to zk for" +
" replication when creating a new source";
stopper.stop(message);
throw new IOException(message, ke);
}
src.enqueueLog(this.latestPath); src.enqueueLog(this.latestPath);
} }
} }
@ -247,7 +254,7 @@ public class ReplicationSourceManager {
return this.sources; return this.sources;
} }
void logRolled(Path newLog) { void logRolled(Path newLog) throws IOException {
if (!this.replicating.get()) { if (!this.replicating.get()) {
LOG.warn("Replication stopped, won't add new log"); LOG.warn("Replication stopped, won't add new log");
return; return;
@ -256,7 +263,11 @@ public class ReplicationSourceManager {
synchronized (this.hlogsById) { synchronized (this.hlogsById) {
String name = newLog.getName(); String name = newLog.getName();
for (ReplicationSourceInterface source : this.sources) { for (ReplicationSourceInterface source : this.sources) {
try {
this.zkHelper.addLogToList(name, source.getPeerClusterZnode()); this.zkHelper.addLogToList(name, source.getPeerClusterZnode());
} catch (KeeperException ke) {
throw new IOException("Cannot add log to zk for replication", ke);
}
} }
for (SortedSet<String> hlogs : this.hlogsById.values()) { for (SortedSet<String> hlogs : this.hlogsById.values()) {
if (this.sources.isEmpty()) { if (this.sources.isEmpty()) {