From 964e90adb3384a86130a5f626585677ab6561dd0 Mon Sep 17 00:00:00 2001 From: Yonik Seeley Date: Tue, 22 May 2012 00:36:11 +0000 Subject: [PATCH] SOLR-3469: prevent false peersync recovery by recording buffering flags in tlog git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1341283 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/solr/cloud/RecoveryStrategy.java | 40 +++++---- .../component/RealTimeGetComponent.java | 4 +- .../java/org/apache/solr/update/PeerSync.java | 6 +- .../apache/solr/update/TransactionLog.java | 16 ++-- .../org/apache/solr/update/UpdateLog.java | 61 +++++++++---- .../org/apache/solr/search/TestRecovery.java | 87 +++++++++++++++++++ 6 files changed, 170 insertions(+), 44 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index e3f7a9111a9..e3d7eff3d51 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -226,40 +226,48 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { } - List startingRecentVersions; - UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates(); + List recentVersions; + UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates(); try { - startingRecentVersions = startingRecentUpdates.getVersions(ulog.numRecordsToKeep); + recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep); } finally { - startingRecentUpdates.close(); + recentUpdates.close(); } - List reallyStartingVersions = ulog.getStartingVersions(); + List startingVersions = ulog.getStartingVersions(); - if (reallyStartingVersions != null && recoveringAfterStartup) { + if (startingVersions != null && recoveringAfterStartup) { int oldIdx = 0; // index of the start of the old list in the current list - long firstStartingVersion = reallyStartingVersions.size() > 0 ? reallyStartingVersions.get(0) : 0; + long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0; - for (; oldIdx 0) { log.info("####### Found new versions added after startup: num=" + oldIdx); - log.info("###### currentVersions=" + startingRecentVersions); + log.info("###### currentVersions=" + recentVersions); } - log.info("###### startupVersions=" + reallyStartingVersions); + log.info("###### startupVersions=" + startingVersions); } + + boolean firstTime = true; + if (recoveringAfterStartup) { // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were - // when we went down. - startingRecentVersions = reallyStartingVersions; - } + // when we went down. We may have received updates since then. + recentVersions = startingVersions; - boolean firstTime = true; + if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) { + // last operation at the time of startup had the GAP flag set... + // this means we were previously doing a full index replication + // that probably didn't complete and buffering updates in the meantime. + firstTime = false; // skip peersync + } + } while (!successfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though try { @@ -287,7 +295,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { // + " i am:" + zkController.getNodeName()); PeerSync peerSync = new PeerSync(core, Collections.singletonList(leaderUrl), ulog.numRecordsToKeep); - peerSync.setStartingVersions(startingRecentVersions); + peerSync.setStartingVersions(recentVersions); boolean syncSuccess = peerSync.sync(); if (syncSuccess) { SolrQueryRequest req = new LocalSolrQueryRequest(core, diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index 0d5f963d918..5efeff52d49 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -141,7 +141,7 @@ public class RealTimeGetComponent extends SearchComponent // should currently be a List List entry = (List)o; assert entry.size() >= 3; - int oper = (Integer)entry.get(0); + int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK; switch (oper) { case UpdateLog.ADD: SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), req.getSchema()); @@ -211,7 +211,7 @@ public class RealTimeGetComponent extends SearchComponent // should currently be a List List entry = (List)o; assert entry.size() >= 3; - int oper = (Integer)entry.get(0); + int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK; switch (oper) { case UpdateLog.ADD: sid = (SolrInputDocument)entry.get(entry.size()-1); diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index b068b8690e7..6f119880600 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -233,7 +233,7 @@ public class PeerSync { } // let's merge the lists - List newList = new ArrayList(ourUpdates); + List newList = new ArrayList(ourUpdates); for (Long ver : startingVersions) { if (Math.abs(ver) < smallestNewUpdate) { newList.add(ver); @@ -457,8 +457,8 @@ public class PeerSync { if (debug) { log.debug(msg() + "raw update record " + o); } - - int oper = (Integer)entry.get(0); + + int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK; long version = (Long) entry.get(1); if (version == lastVersion && version != 0) continue; lastVersion = version; diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java index 85eeb7c40ab..618b83ac3a9 100644 --- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java +++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java @@ -296,7 +296,7 @@ public class TransactionLog { } - public long write(AddUpdateCommand cmd) { + public long write(AddUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); long pos = 0; synchronized (this) { @@ -319,7 +319,7 @@ public class TransactionLog { codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.ADD); // should just take one byte + codec.writeInt(UpdateLog.ADD | flags); // should just take one byte codec.writeLong(cmd.getVersion()); codec.writeSolrInputDocument(cmd.getSolrInputDocument()); @@ -333,7 +333,7 @@ public class TransactionLog { } } - public long writeDelete(DeleteUpdateCommand cmd) { + public long writeDelete(DeleteUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); synchronized (this) { try { @@ -344,7 +344,7 @@ public class TransactionLog { } codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.DELETE); // should just take one byte + codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte codec.writeLong(cmd.getVersion()); BytesRef br = cmd.getIndexedId(); codec.writeByteArray(br.bytes, br.offset, br.length); @@ -359,7 +359,7 @@ public class TransactionLog { } } - public long writeDeleteByQuery(DeleteUpdateCommand cmd) { + public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); synchronized (this) { try { @@ -370,7 +370,7 @@ public class TransactionLog { } codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte + codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte codec.writeLong(cmd.getVersion()); codec.writeStr(cmd.query); @@ -385,7 +385,7 @@ public class TransactionLog { } - public long writeCommit(CommitUpdateCommand cmd) { + public long writeCommit(CommitUpdateCommand cmd, int flags) { LogCodec codec = new LogCodec(); synchronized (this) { try { @@ -397,7 +397,7 @@ public class TransactionLog { } codec.init(fos); codec.writeTag(JavaBinCodec.ARR, 3); - codec.writeInt(UpdateLog.COMMIT); // should just take one byte + codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte codec.writeLong(cmd.getVersion()); codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index 0ab9735ff41..c4838b614a6 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -64,6 +64,12 @@ public class UpdateLog implements PluginInfoInitialized { public static final int DELETE = 0x02; public static final int DELETE_BY_QUERY = 0x03; public static final int COMMIT = 0x04; + // Flag indicating that this is a buffered operation, and that a gap exists before buffering started. + // for example, if full index replication starts and we are buffering updates, then this flag should + // be set to indicate that replaying the log would not bring us into sync (i.e. peersync should + // fail if this flag is set on the last update in the tlog). + public static final int FLAG_GAP = 0x10; + public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation public static class RecoveryInfo { public long positionOfStart; @@ -81,6 +87,7 @@ public class UpdateLog implements PluginInfoInitialized { long id = -1; private State state = State.ACTIVE; + private int operationFlags; // flags to write in the transaction log with operations (i.e. FLAG_GAP) private TransactionLog tlog; private TransactionLog prevTlog; @@ -117,7 +124,7 @@ public class UpdateLog implements PluginInfoInitialized { private volatile UpdateHandler uhandler; // a core reload can change this reference! private volatile boolean cancelApplyBufferUpdate; List startingVersions; - + int startingOperation; // last operation in the logs on startup public static class LogPtr { final long pointer; @@ -188,16 +195,18 @@ public class UpdateLog implements PluginInfoInitialized { versionInfo = new VersionInfo(uhandler, 256); // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs. - UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates(); + UpdateLog.RecentUpdates startingUpdates = getRecentUpdates(); try { - startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep); + startingVersions = startingUpdates.getVersions(numRecordsToKeep); + startingOperation = startingUpdates.getLatestOperation(); + // populate recent deletes list (since we can't get that info from the index) - for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) { - DeleteUpdate du = startingRecentUpdates.deleteList.get(i); + for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) { + DeleteUpdate du = startingUpdates.deleteList.get(i); oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version)); } } finally { - startingRecentUpdates.close(); + startingUpdates.close(); } } @@ -210,6 +219,10 @@ public class UpdateLog implements PluginInfoInitialized { return startingVersions; } + public int getStartingOperation() { + return startingOperation; + } + /* Takes over ownership of the log, keeping it until no longer needed and then decrementing it's reference and dropping it. */ @@ -275,7 +288,7 @@ public class UpdateLog implements PluginInfoInitialized { // don't log if we are replaying from another log if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ensureLog(); - pos = tlog.write(cmd); + pos = tlog.write(cmd, operationFlags); } // TODO: in the future we could support a real position for a REPLAY update. @@ -302,7 +315,7 @@ public class UpdateLog implements PluginInfoInitialized { // don't log if we are replaying from another log if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ensureLog(); - pos = tlog.writeDelete(cmd); + pos = tlog.writeDelete(cmd, operationFlags); } LogPtr ptr = new LogPtr(pos, cmd.version); @@ -326,7 +339,7 @@ public class UpdateLog implements PluginInfoInitialized { // don't log if we are replaying from another log if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { ensureLog(); - pos = tlog.writeDeleteByQuery(cmd); + pos = tlog.writeDeleteByQuery(cmd, operationFlags); } // only change our caches if we are not buffering @@ -417,7 +430,7 @@ public class UpdateLog implements PluginInfoInitialized { if (prevTlog != null) { // if we made it through the commit, write a commit command to the log // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup. - prevTlog.writeCommit(cmd); + prevTlog.writeCommit(cmd, operationFlags); addOldLog(prevTlog, true); // the old log list will decref when no longer needed @@ -630,7 +643,7 @@ public class UpdateLog implements PluginInfoInitialized { // record a commit log.info("Recording current closed for " + uhandler.core + " log=" + theLog); CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false); - theLog.writeCommit(cmd); + theLog.writeCommit(cmd, operationFlags); } theLog.deleteOnClose = false; @@ -685,7 +698,7 @@ public class UpdateLog implements PluginInfoInitialized { HashMap updates; List deleteByQueryList; List deleteList; - + int latestOperation; public List getVersions(int n) { List ret = new ArrayList(n); @@ -719,6 +732,10 @@ public class UpdateLog implements PluginInfoInitialized { return result; } + public int getLatestOperation() { + return latestOperation; + } + private void update() { int numUpdates = 0; @@ -743,7 +760,11 @@ public class UpdateLog implements PluginInfoInitialized { List entry = (List)o; // TODO: refactor this out so we get common error handling - int oper = (Integer)entry.get(0); + int opAndFlags = (Integer)entry.get(0); + if (latestOperation == 0) { + latestOperation = opAndFlags; + } + int oper = opAndFlags & UpdateLog.OPERATION_MASK; long version = (Long) entry.get(1); switch (oper) { @@ -849,6 +870,9 @@ public class UpdateLog implements PluginInfoInitialized { } state = State.BUFFERING; + + // currently, buffering is only called by recovery, meaning that there is most likely a gap in updates + operationFlags |= FLAG_GAP; } finally { versionInfo.unblockUpdates(); } @@ -872,6 +896,7 @@ public class UpdateLog implements PluginInfoInitialized { } state = State.ACTIVE; + operationFlags &= ~FLAG_GAP; } catch (IOException e) { SolrException.log(log,"Error attempting to roll back log", e); return false; @@ -904,6 +929,7 @@ public class UpdateLog implements PluginInfoInitialized { } tlog.incref(); state = State.APPLYING_BUFFERED; + operationFlags &= ~FLAG_GAP; } finally { versionInfo.unblockUpdates(); } @@ -1002,6 +1028,7 @@ public class UpdateLog implements PluginInfoInitialized { UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null)); long commitVersion = 0; + int operationAndFlags = 0; for(;;) { Object o = null; @@ -1046,7 +1073,8 @@ public class UpdateLog implements PluginInfoInitialized { // should currently be a List List entry = (List)o; - int oper = (Integer)entry.get(0); + operationAndFlags = (Integer)entry.get(0); + int oper = operationAndFlags & OPERATION_MASK; long version = (Long) entry.get(1); switch (oper) { @@ -1136,7 +1164,10 @@ public class UpdateLog implements PluginInfoInitialized { if (!activeLog) { // if we are replaying an old tlog file, we need to add a commit to the end // so we don't replay it again if we restart right after. - translog.writeCommit(cmd); + + // if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it + // as the flag on the last operation. + translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK)); } try { diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java index 4fd5e2621a5..2ad9a83b8af 100644 --- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java +++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java @@ -468,6 +468,93 @@ public class TestRecovery extends SolrTestCaseJ4 { } + @Test + public void testBufferingFlags() throws Exception { + + DirectUpdateHandler2.commitOnClose = false; + final Semaphore logReplayFinish = new Semaphore(0); + + UpdateLog.testing_logReplayFinishHook = new Runnable() { + @Override + public void run() { + logReplayFinish.release(); + } + }; + + + SolrQueryRequest req = req(); + UpdateHandler uhandler = req.getCore().getUpdateHandler(); + UpdateLog ulog = uhandler.getUpdateLog(); + + try { + clearIndex(); + assertU(commit()); + + assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); + ulog.bufferUpdates(); + + // simulate updates from a leader + updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + assertEquals(UpdateLog.State.BUFFERING, ulog.getState()); + + req.close(); + h.close(); + createCore(); + + req = req(); + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + + logReplayFinish.acquire(); // wait for replay to finish + + assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last + + // + // Try again to ensure that the previous log replay didn't wipe out our flags + // + + req.close(); + h.close(); + createCore(); + + req = req(); + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + + assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); + + // now do some normal non-buffered adds + updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + assertU(commit()); + + req.close(); + h.close(); + createCore(); + + req = req(); + uhandler = req.getCore().getUpdateHandler(); + ulog = uhandler.getUpdateLog(); + + assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0); + + + assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state + } finally { + DirectUpdateHandler2.commitOnClose = true; + UpdateLog.testing_logReplayHook = null; + UpdateLog.testing_logReplayFinishHook = null; + + req().close(); + } + + } + + + // make sure that on a restart, versions don't start too low @Test public void testVersionsOnRestart() throws Exception {