SOLR-3180: harden erratic shutdowns, allow more than one tlog to recover on startup

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1296493 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-03-02 22:12:44 +00:00
parent 03d560ee05
commit fbe25c380d
3 changed files with 223 additions and 73 deletions

View File

@ -538,14 +538,33 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
return; return;
} }
if (writer != null) { // if we are later going to mark everything in the tlog as committed, then we
writer.close(); // need to block all updates from coming in so we can be sure that the close
// will contain all of the updates.
VersionInfo vinfo = ulog == null ? null : ulog.getVersionInfo();
if (vinfo != null) {
// TODO: move the RW update lock somewhere else?
vinfo.blockUpdates();
}
try {
boolean succeeded = false;
try {
if (writer != null) {
writer.close();
}
succeeded = true;
} finally {
if (ulog != null) ulog.close(succeeded);
}
} finally {
if (vinfo != null) {
vinfo.unblockUpdates();
}
} }
// if the writer hits an exception, it's OK (and perhaps desirable)
// to not close the ulog.
if (ulog != null) ulog.close(true);
} finally { } finally {
commitLock.unlock(); commitLock.unlock();
} }

View File

@ -86,7 +86,7 @@ public class UpdateLog implements PluginInfoInitialized {
private TransactionLog tlog; private TransactionLog tlog;
private TransactionLog prevTlog; private TransactionLog prevTlog;
private Deque<TransactionLog> logs = new LinkedList<TransactionLog>(); // list of recent logs, newest first private Deque<TransactionLog> logs = new LinkedList<TransactionLog>(); // list of recent logs, newest first
private TransactionLog newestLogOnStartup; private LinkedList<TransactionLog> newestLogsOnStartup = new LinkedList<TransactionLog>();
private int numOldRecords; // number of records in the recent logs private int numOldRecords; // number of records in the recent logs
private Map<BytesRef,LogPtr> map = new HashMap<BytesRef, LogPtr>(); private Map<BytesRef,LogPtr> map = new HashMap<BytesRef, LogPtr>();
@ -172,16 +172,23 @@ public class UpdateLog implements PluginInfoInitialized {
File f = new File(tlogDir, oldLogName); File f = new File(tlogDir, oldLogName);
try { try {
oldLog = new TransactionLog( f, null, true ); oldLog = new TransactionLog( f, null, true );
addOldLog(oldLog); addOldLog(oldLog, false); // don't remove old logs on startup since more than one may be uncapped.
} catch (Exception e) { } catch (Exception e) {
SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e); SolrException.log(log, "Failure to open existing log file (non fatal) " + f, e);
f.delete(); f.delete();
} }
} }
newestLogOnStartup = oldLog;
// Record first two logs (oldest first) at startup for potential tlog recovery.
// It's possible that at abnormal shutdown both "tlog" and "prevTlog" were uncapped.
for (TransactionLog ll : logs) {
newestLogsOnStartup.addFirst(ll);
if (newestLogsOnStartup.size() >= 2) break;
}
versionInfo = new VersionInfo(uhandler, 256); versionInfo = new VersionInfo(uhandler, 256);
// TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates(); UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates();
try { try {
startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep); startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep);
@ -207,7 +214,7 @@ public class UpdateLog implements PluginInfoInitialized {
/* Takes over ownership of the log, keeping it until no longer needed /* Takes over ownership of the log, keeping it until no longer needed
and then decrementing it's reference and dropping it. and then decrementing it's reference and dropping it.
*/ */
private void addOldLog(TransactionLog oldLog) { private void addOldLog(TransactionLog oldLog, boolean removeOld) {
if (oldLog == null) return; if (oldLog == null) return;
numOldRecords += oldLog.numRecords(); numOldRecords += oldLog.numRecords();
@ -218,7 +225,7 @@ public class UpdateLog implements PluginInfoInitialized {
currRecords += tlog.numRecords(); currRecords += tlog.numRecords();
} }
while (logs.size() > 0) { while (removeOld && logs.size() > 0) {
TransactionLog log = logs.peekLast(); TransactionLog log = logs.peekLast();
int nrec = log.numRecords(); int nrec = log.numRecords();
// remove oldest log if we don't need it to keep at least numRecordsToKeep, or if // remove oldest log if we don't need it to keep at least numRecordsToKeep, or if
@ -386,18 +393,16 @@ public class UpdateLog implements PluginInfoInitialized {
// since we're changing the log, we must change the map. // since we're changing the log, we must change the map.
newMap(); newMap();
if (prevTlog != null) {
globalStrings = prevTlog.getGlobalStrings();
}
// since document additions can happen concurrently with commit, create // since document additions can happen concurrently with commit, create
// a new transaction log first so that we know the old one is definitely // a new transaction log first so that we know the old one is definitely
// in the index. // in the index.
prevTlog = tlog; prevTlog = tlog;
tlog = null; tlog = null;
id++; id++;
if (prevTlog != null) {
globalStrings = prevTlog.getGlobalStrings();
}
addOldLog(prevTlog);
} }
} }
@ -410,6 +415,8 @@ public class UpdateLog implements PluginInfoInitialized {
// if we made it through the commit, write a commit command to the log // if we made it through the commit, write a commit command to the log
// TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup. // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
prevTlog.writeCommit(cmd); prevTlog.writeCommit(cmd);
addOldLog(prevTlog, true);
// the old log list will decref when no longer needed // the old log list will decref when no longer needed
// prevTlog.decref(); // prevTlog.decref();
prevTlog = null; prevTlog = null;
@ -562,26 +569,32 @@ public class UpdateLog implements PluginInfoInitialized {
} }
} }
public Future<RecoveryInfo> recoverFromLog() { public Future<RecoveryInfo> recoverFromLog() {
recoveryInfo = new RecoveryInfo(); recoveryInfo = new RecoveryInfo();
if (newestLogOnStartup == null) return null;
if (!newestLogOnStartup.try_incref()) return null; // log file was already closed List<TransactionLog> recoverLogs = new ArrayList<TransactionLog>(1);
for (TransactionLog ll : newestLogsOnStartup) {
if (!ll.try_incref()) continue;
// now that we've incremented the reference, the log shouldn't go away. try {
try { if (ll.endsWithCommit()) {
if (newestLogOnStartup.endsWithCommit()) { ll.decref();
newestLogOnStartup.decref(); continue;
return null; }
} catch (IOException e) {
log.error("Error inspecting tlog " + ll);
ll.decref();
continue;
} }
} catch (IOException e) {
log.error("Error inspecting tlog " + newestLogOnStartup); recoverLogs.add(ll);
newestLogOnStartup.decref();
return null;
} }
if (recoverLogs.isEmpty()) return null;
ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor); ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
LogReplayer replayer = new LogReplayer(newestLogOnStartup, false); LogReplayer replayer = new LogReplayer(recoverLogs, false);
versionInfo.blockUpdates(); versionInfo.blockUpdates();
try { try {
@ -590,8 +603,9 @@ public class UpdateLog implements PluginInfoInitialized {
versionInfo.unblockUpdates(); versionInfo.unblockUpdates();
} }
return cs.submit(replayer, recoveryInfo); // At this point, we are guaranteed that any new updates coming in will see the state as "replaying"
return cs.submit(replayer, recoveryInfo);
} }
@ -606,6 +620,22 @@ public class UpdateLog implements PluginInfoInitialized {
} }
} }
private void doClose(TransactionLog theLog, boolean writeCommit) {
if (theLog != null) {
if (writeCommit) {
// record a commit
log.info("Recording current closed for " + uhandler.core + " log=" + theLog);
CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
theLog.writeCommit(cmd);
}
theLog.deleteOnClose = false;
theLog.decref();
theLog.forceClose();
}
}
public void close(boolean committed) { public void close(boolean committed) {
synchronized (this) { synchronized (this) {
try { try {
@ -616,25 +646,11 @@ public class UpdateLog implements PluginInfoInitialized {
// Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions // Don't delete the old tlogs, we want to be able to replay from them and retrieve old versions
if (prevTlog != null) { doClose(prevTlog, committed);
prevTlog.deleteOnClose = false; doClose(tlog, committed);
prevTlog.decref();
prevTlog.forceClose();
}
if (tlog != null) {
if (committed) {
// record a commit
log.info("Recording current log as closed for " + uhandler.core);
CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
tlog.writeCommit(cmd);
}
tlog.deleteOnClose = false;
tlog.decref();
tlog.forceClose();
}
for (TransactionLog log : logs) { for (TransactionLog log : logs) {
if (log == prevTlog || log == tlog) continue;
log.deleteOnClose = false; log.deleteOnClose = false;
log.decref(); log.decref();
log.forceClose(); log.forceClose();
@ -894,7 +910,7 @@ public class UpdateLog implements PluginInfoInitialized {
throw new RuntimeException("executor is not running..."); throw new RuntimeException("executor is not running...");
} }
ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor); ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<RecoveryInfo>(recoveryExecutor);
LogReplayer replayer = new LogReplayer(tlog, true); LogReplayer replayer = new LogReplayer(Arrays.asList(new TransactionLog[]{tlog}), true);
return cs.submit(replayer, recoveryInfo); return cs.submit(replayer, recoveryInfo);
} }
@ -914,31 +930,59 @@ public class UpdateLog implements PluginInfoInitialized {
private RecoveryInfo recoveryInfo; private RecoveryInfo recoveryInfo;
// TODO: do we let the log replayer run across core reloads?
class LogReplayer implements Runnable { class LogReplayer implements Runnable {
private Logger loglog = log; // set to something different? private Logger loglog = log; // set to something different?
TransactionLog translog; List<TransactionLog> translogs;
TransactionLog.LogReader tlogReader; TransactionLog.LogReader tlogReader;
boolean activeLog; boolean activeLog;
boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked
boolean debug = loglog.isDebugEnabled(); boolean debug = loglog.isDebugEnabled();
public LogReplayer(TransactionLog translog, boolean activeLog) { public LogReplayer(List<TransactionLog> translogs, boolean activeLog) {
this.translog = translog; this.translogs = translogs;
this.activeLog = activeLog; this.activeLog = activeLog;
} }
private SolrQueryRequest req;
private SolrQueryResponse rsp;
@Override @Override
public void run() { public void run() {
ModifiableSolrParams params = new ModifiableSolrParams(); ModifiableSolrParams params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.SEEN_LEADER, true); params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params); req = new LocalSolrQueryRequest(uhandler.core, params);
SolrQueryResponse rsp = new SolrQueryResponse(); rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging
try { try {
for (TransactionLog translog : translogs) {
doReplay(translog);
}
} catch (Throwable e) {
recoveryInfo.errors++;
SolrException.log(log,e);
} finally {
// change the state while updates are still blocked to prevent races
state = State.ACTIVE;
if (finishing) {
versionInfo.unblockUpdates();
}
}
loglog.warn("Log replay finished. recoveryInfo=" + recoveryInfo);
if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();
SolrRequestInfo.clearRequestInfo();
}
public void doReplay(TransactionLog translog) {
try {
loglog.warn("Starting log replay " + translog + " active="+activeLog + " starting pos=" + recoveryInfo.positionOfStart); loglog.warn("Starting log replay " + translog + " active="+activeLog + " starting pos=" + recoveryInfo.positionOfStart);
tlogReader = translog.getReader(recoveryInfo.positionOfStart); tlogReader = translog.getReader(recoveryInfo.positionOfStart);
@ -1099,25 +1143,10 @@ public class UpdateLog implements PluginInfoInitialized {
loglog.error("Replay exception: finish()", ex); loglog.error("Replay exception: finish()", ex);
} }
tlogReader.close();
translog.decref();
} catch (Throwable e) {
recoveryInfo.errors++;
SolrException.log(log,e);
} finally { } finally {
// change the state while updates are still blocked to prevent races if (tlogReader != null) tlogReader.close();
state = State.ACTIVE; translog.decref();
if (finishing) {
versionInfo.unblockUpdates();
}
} }
loglog.warn("Ending log replay " + tlogReader + " recoveryInfo=" + recoveryInfo);
if (testing_logReplayFinishHook != null) testing_logReplayFinishHook.run();
SolrRequestInfo.clearRequestInfo();
} }
} }

View File

@ -17,9 +17,11 @@
package org.apache.solr.search; package org.apache.solr.search;
import org.apache.lucene.util.BytesRef;
import org.apache.noggit.JSONUtil; import org.apache.noggit.JSONUtil;
import org.apache.noggit.ObjectBuilder; import org.apache.noggit.ObjectBuilder;
import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.util.ByteUtils;
import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.DirectUpdateHandler2; import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.UpdateLog; import org.apache.solr.update.UpdateLog;
@ -727,6 +729,106 @@ public class TestRecovery extends SolrTestCaseJ4 {
} }
} }
// in rare circumstances, two logs can be left uncapped (lacking a commit at the end signifying that all the content in the log was committed)
@Test
public void testRecoveryMultipleLogs() throws Exception {
try {
DirectUpdateHandler2.commitOnClose = false;
final Semaphore logReplay = new Semaphore(0);
final Semaphore logReplayFinish = new Semaphore(0);
UpdateLog.testing_logReplayHook = new Runnable() {
@Override
public void run() {
try {
assertTrue(logReplay.tryAcquire(timeout, TimeUnit.SECONDS));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
UpdateLog.testing_logReplayFinishHook = new Runnable() {
@Override
public void run() {
logReplayFinish.release();
}
};
File logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();
clearIndex();
assertU(commit());
assertU(adoc("id","AAAAAA"));
assertU(adoc("id","BBBBBB"));
assertU(adoc("id","CCCCCC"));
h.close();
String[] files = UpdateLog.getLogList(logDir);
Arrays.sort(files);
String fname = files[files.length-1];
RandomAccessFile raf = new RandomAccessFile(new File(logDir, fname), "rw");
raf.seek(raf.length()); // seek to end
raf.writeLong(0xffffffffffffffffL);
raf.writeChars("This should be appended to a good log file, representing a bad partially written record.");
byte[] content = new byte[(int)raf.length()];
raf.seek(0);
raf.readFully(content);
raf.close();
// Now make a newer log file with just the IDs changed. NOTE: this may not work if log format changes too much!
findReplace("AAAAAA".getBytes("UTF-8"), "aaaaaa".getBytes("UTF-8"), content);
findReplace("BBBBBB".getBytes("UTF-8"), "bbbbbb".getBytes("UTF-8"), content);
findReplace("CCCCCC".getBytes("UTF-8"), "cccccc".getBytes("UTF-8"), content);
// WARNING... assumes format of .00000n where n is less than 9
String fname2 = fname.substring(0, fname.length()-1) + (char)(fname.charAt(fname.length()-1)+1);
raf = new RandomAccessFile(new File(logDir, fname2), "rw");
raf.write(content);
raf.close();
logReplay.release(1000);
logReplayFinish.drainPermits();
ignoreException("OutOfBoundsException"); // this is what the corrupted log currently produces... subject to change.
createCore();
assertTrue(logReplayFinish.tryAcquire(timeout, TimeUnit.SECONDS));
resetExceptionIgnores();
assertJQ(req("q","*:*") ,"/response/numFound==6");
} finally {
DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null;
UpdateLog.testing_logReplayFinishHook = null;
}
}
// NOTE: replacement must currently be same size
private static void findReplace(byte[] from, byte[] to, byte[] data) {
int idx = -from.length;
for(;;) {
idx = indexOf(from, data, idx + from.length); // skip over previous match
if (idx < 0) break;
for (int i=0; i<to.length; i++) {
data[idx+i] = to[i];
}
}
}
private static int indexOf(byte[] target, byte[] data, int start) {
outer: for (int i=start; i<data.length - target.length; i++) {
for (int j=0; j<target.length; j++) {
if (data[i+j] != target[j]) continue outer;
}
return i;
}
return -1;
}
// stops the core, removes the transaction logs, restarts the core. // stops the core, removes the transaction logs, restarts the core.
void deleteLogs() throws Exception { void deleteLogs() throws Exception {
File logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir(); File logDir = h.getCore().getUpdateHandler().getUpdateLog().getLogDir();