SOLR-4829: fix tlog leaks

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1483561 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2013-05-16 20:49:59 +00:00
parent 439c28f123
commit 069387266e
4 changed files with 58 additions and 18 deletions

View File

@ -180,6 +180,10 @@ Bug Fixes
* SOLR-4813: Fix SynonymFilterFactory to allow init parameters for
tokenizer factory used when parsing synonyms file. (Shingo Sasaki, hossman)
* SOLR-4829: Fix transaction log leaks (a failure to clean up some old logs)
on a shard leader, or when unexpected exceptions are thrown during log
recovery. (Steven Bower, Mark Miller, yonik)
Other Changes

View File

@ -186,15 +186,29 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
}
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (!success && (ulog == null || ulog.getRecentUpdates().getVersions(1).isEmpty())) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway
log.info("We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
if (!success) {
boolean hasRecentUpdates = false;
if (ulog != null) {
// TODO: we could optimize this if necessary
UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
try {
hasRecentUpdates = !recentUpdates.getVersions(1).isEmpty();
} finally {
recentUpdates.close();
}
}
if (!hasRecentUpdates) {
// we failed sync, but we have no versions - we can't sync in that case
// - we were active
// before, so become leader anyway
log.info("We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway");
success = true;
}
}
// if !success but no one else is in active mode,
// we are the leader anyway
// TODO: should we also be leader if there is only one other active?

View File

@ -554,13 +554,13 @@ public class RealTimeGetComponent extends SearchComponent
List<String> versions = StrUtils.splitSmart(versionsStr, ",", true);
// TODO: get this from cache instead of rebuilding?
UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
List<Object> updates = new ArrayList<Object>(versions.size());
long minVersion = Long.MAX_VALUE;
// TODO: get this from cache instead of rebuilding?
UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
try {
for (String versionStr : versions) {
long version = Long.parseLong(versionStr);

View File

@ -989,7 +989,7 @@ public class UpdateLog implements PluginInfoInitialized {
}
}
/** The RecentUpdates object returned must be closed after use */
public RecentUpdates getRecentUpdates() {
Deque<TransactionLog> logList;
synchronized (this) {
@ -1009,9 +1009,21 @@ public class UpdateLog implements PluginInfoInitialized {
// TODO: what if I hand out a list of updates, then do an update, then hand out another list (and
// one of the updates I originally handed out fell off the list). Over-request?
RecentUpdates recentUpdates = new RecentUpdates();
recentUpdates.logList = logList;
recentUpdates.update();
boolean success = false;
RecentUpdates recentUpdates = null;
try {
recentUpdates = new RecentUpdates();
recentUpdates.logList = logList;
recentUpdates.update();
success = true;
} finally {
// defensive: if some unknown exception is thrown,
// make sure we close so that the tlogs are decref'd
if (!success && recentUpdates != null) {
recentUpdates.close();
}
}
return recentUpdates;
}
@ -1132,14 +1144,15 @@ public class UpdateLog implements PluginInfoInitialized {
class LogReplayer implements Runnable {
private Logger loglog = log; // set to something different?
List<TransactionLog> translogs;
Deque<TransactionLog> translogs;
TransactionLog.LogReader tlogReader;
boolean activeLog;
boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked
boolean debug = loglog.isDebugEnabled();
public LogReplayer(List<TransactionLog> translogs, boolean activeLog) {
this.translogs = translogs;
this.translogs = new LinkedList<TransactionLog>();
this.translogs.addAll(translogs);
this.activeLog = activeLog;
}
@ -1159,7 +1172,9 @@ public class UpdateLog implements PluginInfoInitialized {
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging
try {
for (TransactionLog translog : translogs) {
for(;;) {
TransactionLog translog = translogs.pollFirst();
if (translog == null) break;
doReplay(translog);
}
} catch (SolrException e) {
@ -1179,6 +1194,13 @@ public class UpdateLog implements PluginInfoInitialized {
if (finishing) {
versionInfo.unblockUpdates();
}
// clean up in case we hit some unexpected exception and didn't get
// to more transaction logs
for (TransactionLog translog : translogs) {
log.error("ERROR: didn't get to recover from tlog " + translog);
translog.decref();
}
}
loglog.warn("Log replay finished. recoveryInfo=" + recoveryInfo);