HBASE-8599 HLogs in ZK are not cleaned up when replication lag is minimal (Varun Sharma)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1504661 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2013-07-18 21:09:43 +00:00
parent 3ea120a69b
commit f4872d7ef8
2 changed files with 24 additions and 6 deletions

View File

@ -282,6 +282,10 @@ public class ReplicationSource extends Thread
sleepMultiplier++;
}
continue;
} else if (oldPath != null && !oldPath.getName().equals(getCurrentPath().getName())) {
this.manager.cleanOldLogs(getCurrentPath().getName(),
this.peerId,
this.replicationQueueInfo.isQueueRecovered());
}
boolean currentWALisBeingWrittenTo = false;
//For WAL files we own (rather than recovered), take a snapshot of whether the

View File

@ -155,15 +155,29 @@ public class ReplicationSourceManager {
if (holdLogInZK) {
return;
}
cleanOldLogs(key, id, queueRecovered);
}
/**
* Cleans a log file and all older files from ZK. Called when we are sure that a
* log file is closed and has no more entries.
* @param key Path to the log
* @param id id of the peer cluster
* @param queueRecovered Whether this is a recovered queue
*/
public void cleanOldLogs(String key,
String id,
boolean queueRecovered) {
synchronized (this.hlogsById) {
SortedSet<String> hlogs = this.hlogsById.get(id);
if (!queueRecovered && !hlogs.first().equals(key)) {
SortedSet<String> hlogSet = hlogs.headSet(key);
for (String hlog : hlogSet) {
this.zkHelper.removeLogFromList(hlog, id);
}
hlogSet.clear();
if (queueRecovered || hlogs.first().equals(key)) {
return;
}
SortedSet<String> hlogSet = hlogs.headSet(key);
for (String hlog : hlogSet) {
this.zkHelper.removeLogFromList(hlog, id);
}
hlogSet.clear();
}
}