SOLR-3469: prevent false peersync recovery by recording buffering flags in tlog

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1341283 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-05-22 00:36:11 +00:00
parent 467717e475
commit 964e90adb3
6 changed files with 170 additions and 44 deletions

View File

@ -226,40 +226,48 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
} }
List<Long> startingRecentVersions; List<Long> recentVersions;
UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates(); UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates();
try { try {
startingRecentVersions = startingRecentUpdates.getVersions(ulog.numRecordsToKeep); recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep);
} finally { } finally {
startingRecentUpdates.close(); recentUpdates.close();
} }
List<Long> reallyStartingVersions = ulog.getStartingVersions(); List<Long> startingVersions = ulog.getStartingVersions();
if (reallyStartingVersions != null && recoveringAfterStartup) { if (startingVersions != null && recoveringAfterStartup) {
int oldIdx = 0; // index of the start of the old list in the current list int oldIdx = 0; // index of the start of the old list in the current list
long firstStartingVersion = reallyStartingVersions.size() > 0 ? reallyStartingVersions.get(0) : 0; long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
for (; oldIdx<startingRecentVersions.size(); oldIdx++) { for (; oldIdx<recentVersions.size(); oldIdx++) {
if (startingRecentVersions.get(oldIdx) == firstStartingVersion) break; if (recentVersions.get(oldIdx) == firstStartingVersion) break;
} }
if (oldIdx > 0) { if (oldIdx > 0) {
log.info("####### Found new versions added after startup: num=" + oldIdx); log.info("####### Found new versions added after startup: num=" + oldIdx);
log.info("###### currentVersions=" + startingRecentVersions); log.info("###### currentVersions=" + recentVersions);
} }
log.info("###### startupVersions=" + reallyStartingVersions); log.info("###### startupVersions=" + startingVersions);
} }
boolean firstTime = true;
if (recoveringAfterStartup) { if (recoveringAfterStartup) {
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were // if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
// when we went down. // when we went down. We may have received updates since then.
startingRecentVersions = reallyStartingVersions; recentVersions = startingVersions;
}
boolean firstTime = true; if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
// last operation at the time of startup had the GAP flag set...
// this means we were previously doing a full index replication
// that probably didn't complete and buffering updates in the meantime.
firstTime = false; // skip peersync
}
}
while (!successfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though while (!successfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
try { try {
@ -287,7 +295,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
// + " i am:" + zkController.getNodeName()); // + " i am:" + zkController.getNodeName());
PeerSync peerSync = new PeerSync(core, PeerSync peerSync = new PeerSync(core,
Collections.singletonList(leaderUrl), ulog.numRecordsToKeep); Collections.singletonList(leaderUrl), ulog.numRecordsToKeep);
peerSync.setStartingVersions(startingRecentVersions); peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync(); boolean syncSuccess = peerSync.sync();
if (syncSuccess) { if (syncSuccess) {
SolrQueryRequest req = new LocalSolrQueryRequest(core, SolrQueryRequest req = new LocalSolrQueryRequest(core,

View File

@ -141,7 +141,7 @@ public class RealTimeGetComponent extends SearchComponent
// should currently be a List<Oper,Ver,Doc/Id> // should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o; List entry = (List)o;
assert entry.size() >= 3; assert entry.size() >= 3;
int oper = (Integer)entry.get(0); int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
switch (oper) { switch (oper) {
case UpdateLog.ADD: case UpdateLog.ADD:
SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), req.getSchema()); SolrDocument doc = toSolrDoc((SolrInputDocument)entry.get(entry.size()-1), req.getSchema());
@ -211,7 +211,7 @@ public class RealTimeGetComponent extends SearchComponent
// should currently be a List<Oper,Ver,Doc/Id> // should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o; List entry = (List)o;
assert entry.size() >= 3; assert entry.size() >= 3;
int oper = (Integer)entry.get(0); int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
switch (oper) { switch (oper) {
case UpdateLog.ADD: case UpdateLog.ADD:
sid = (SolrInputDocument)entry.get(entry.size()-1); sid = (SolrInputDocument)entry.get(entry.size()-1);

View File

@ -233,7 +233,7 @@ public class PeerSync {
} }
// let's merge the lists // let's merge the lists
List<Long> newList = new ArrayList(ourUpdates); List<Long> newList = new ArrayList<Long>(ourUpdates);
for (Long ver : startingVersions) { for (Long ver : startingVersions) {
if (Math.abs(ver) < smallestNewUpdate) { if (Math.abs(ver) < smallestNewUpdate) {
newList.add(ver); newList.add(ver);
@ -458,7 +458,7 @@ public class PeerSync {
log.debug(msg() + "raw update record " + o); log.debug(msg() + "raw update record " + o);
} }
int oper = (Integer)entry.get(0); int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
long version = (Long) entry.get(1); long version = (Long) entry.get(1);
if (version == lastVersion && version != 0) continue; if (version == lastVersion && version != 0) continue;
lastVersion = version; lastVersion = version;

View File

@ -296,7 +296,7 @@ public class TransactionLog {
} }
public long write(AddUpdateCommand cmd) { public long write(AddUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec(); LogCodec codec = new LogCodec();
long pos = 0; long pos = 0;
synchronized (this) { synchronized (this) {
@ -319,7 +319,7 @@ public class TransactionLog {
codec.init(fos); codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3); codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.ADD); // should just take one byte codec.writeInt(UpdateLog.ADD | flags); // should just take one byte
codec.writeLong(cmd.getVersion()); codec.writeLong(cmd.getVersion());
codec.writeSolrInputDocument(cmd.getSolrInputDocument()); codec.writeSolrInputDocument(cmd.getSolrInputDocument());
@ -333,7 +333,7 @@ public class TransactionLog {
} }
} }
public long writeDelete(DeleteUpdateCommand cmd) { public long writeDelete(DeleteUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec(); LogCodec codec = new LogCodec();
synchronized (this) { synchronized (this) {
try { try {
@ -344,7 +344,7 @@ public class TransactionLog {
} }
codec.init(fos); codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3); codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.DELETE); // should just take one byte codec.writeInt(UpdateLog.DELETE | flags); // should just take one byte
codec.writeLong(cmd.getVersion()); codec.writeLong(cmd.getVersion());
BytesRef br = cmd.getIndexedId(); BytesRef br = cmd.getIndexedId();
codec.writeByteArray(br.bytes, br.offset, br.length); codec.writeByteArray(br.bytes, br.offset, br.length);
@ -359,7 +359,7 @@ public class TransactionLog {
} }
} }
public long writeDeleteByQuery(DeleteUpdateCommand cmd) { public long writeDeleteByQuery(DeleteUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec(); LogCodec codec = new LogCodec();
synchronized (this) { synchronized (this) {
try { try {
@ -370,7 +370,7 @@ public class TransactionLog {
} }
codec.init(fos); codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3); codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.DELETE_BY_QUERY); // should just take one byte codec.writeInt(UpdateLog.DELETE_BY_QUERY | flags); // should just take one byte
codec.writeLong(cmd.getVersion()); codec.writeLong(cmd.getVersion());
codec.writeStr(cmd.query); codec.writeStr(cmd.query);
@ -385,7 +385,7 @@ public class TransactionLog {
} }
public long writeCommit(CommitUpdateCommand cmd) { public long writeCommit(CommitUpdateCommand cmd, int flags) {
LogCodec codec = new LogCodec(); LogCodec codec = new LogCodec();
synchronized (this) { synchronized (this) {
try { try {
@ -397,7 +397,7 @@ public class TransactionLog {
} }
codec.init(fos); codec.init(fos);
codec.writeTag(JavaBinCodec.ARR, 3); codec.writeTag(JavaBinCodec.ARR, 3);
codec.writeInt(UpdateLog.COMMIT); // should just take one byte codec.writeInt(UpdateLog.COMMIT | flags); // should just take one byte
codec.writeLong(cmd.getVersion()); codec.writeLong(cmd.getVersion());
codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file codec.writeStr(END_MESSAGE); // ensure these bytes are (almost) last in the file

View File

@ -64,6 +64,12 @@ public class UpdateLog implements PluginInfoInitialized {
public static final int DELETE = 0x02; public static final int DELETE = 0x02;
public static final int DELETE_BY_QUERY = 0x03; public static final int DELETE_BY_QUERY = 0x03;
public static final int COMMIT = 0x04; public static final int COMMIT = 0x04;
// Flag indicating that this is a buffered operation, and that a gap exists before buffering started.
// for example, if full index replication starts and we are buffering updates, then this flag should
// be set to indicate that replaying the log would not bring us into sync (i.e. peersync should
// fail if this flag is set on the last update in the tlog).
public static final int FLAG_GAP = 0x10;
public static final int OPERATION_MASK = 0x0f; // mask off flags to get the operation
public static class RecoveryInfo { public static class RecoveryInfo {
public long positionOfStart; public long positionOfStart;
@ -81,6 +87,7 @@ public class UpdateLog implements PluginInfoInitialized {
long id = -1; long id = -1;
private State state = State.ACTIVE; private State state = State.ACTIVE;
private int operationFlags; // flags to write in the transaction log with operations (i.e. FLAG_GAP)
private TransactionLog tlog; private TransactionLog tlog;
private TransactionLog prevTlog; private TransactionLog prevTlog;
@ -117,7 +124,7 @@ public class UpdateLog implements PluginInfoInitialized {
private volatile UpdateHandler uhandler; // a core reload can change this reference! private volatile UpdateHandler uhandler; // a core reload can change this reference!
private volatile boolean cancelApplyBufferUpdate; private volatile boolean cancelApplyBufferUpdate;
List<Long> startingVersions; List<Long> startingVersions;
int startingOperation; // last operation in the logs on startup
public static class LogPtr { public static class LogPtr {
final long pointer; final long pointer;
@ -188,16 +195,18 @@ public class UpdateLog implements PluginInfoInitialized {
versionInfo = new VersionInfo(uhandler, 256); versionInfo = new VersionInfo(uhandler, 256);
// TODO: these startingVersions assume that we successfully recover from all non-complete tlogs. // TODO: these startingVersions assume that we successfully recover from all non-complete tlogs.
UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates(); UpdateLog.RecentUpdates startingUpdates = getRecentUpdates();
try { try {
startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep); startingVersions = startingUpdates.getVersions(numRecordsToKeep);
startingOperation = startingUpdates.getLatestOperation();
// populate recent deletes list (since we can't get that info from the index) // populate recent deletes list (since we can't get that info from the index)
for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) { for (int i=startingUpdates.deleteList.size()-1; i>=0; i--) {
DeleteUpdate du = startingRecentUpdates.deleteList.get(i); DeleteUpdate du = startingUpdates.deleteList.get(i);
oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version)); oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
} }
} finally { } finally {
startingRecentUpdates.close(); startingUpdates.close();
} }
} }
@ -210,6 +219,10 @@ public class UpdateLog implements PluginInfoInitialized {
return startingVersions; return startingVersions;
} }
public int getStartingOperation() {
return startingOperation;
}
/* Takes over ownership of the log, keeping it until no longer needed /* Takes over ownership of the log, keeping it until no longer needed
and then decrementing it's reference and dropping it. and then decrementing it's reference and dropping it.
*/ */
@ -275,7 +288,7 @@ public class UpdateLog implements PluginInfoInitialized {
// don't log if we are replaying from another log // don't log if we are replaying from another log
if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
ensureLog(); ensureLog();
pos = tlog.write(cmd); pos = tlog.write(cmd, operationFlags);
} }
// TODO: in the future we could support a real position for a REPLAY update. // TODO: in the future we could support a real position for a REPLAY update.
@ -302,7 +315,7 @@ public class UpdateLog implements PluginInfoInitialized {
// don't log if we are replaying from another log // don't log if we are replaying from another log
if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
ensureLog(); ensureLog();
pos = tlog.writeDelete(cmd); pos = tlog.writeDelete(cmd, operationFlags);
} }
LogPtr ptr = new LogPtr(pos, cmd.version); LogPtr ptr = new LogPtr(pos, cmd.version);
@ -326,7 +339,7 @@ public class UpdateLog implements PluginInfoInitialized {
// don't log if we are replaying from another log // don't log if we are replaying from another log
if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) { if ((cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
ensureLog(); ensureLog();
pos = tlog.writeDeleteByQuery(cmd); pos = tlog.writeDeleteByQuery(cmd, operationFlags);
} }
// only change our caches if we are not buffering // only change our caches if we are not buffering
@ -417,7 +430,7 @@ public class UpdateLog implements PluginInfoInitialized {
if (prevTlog != null) { if (prevTlog != null) {
// if we made it through the commit, write a commit command to the log // if we made it through the commit, write a commit command to the log
// TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup. // TODO: check that this works to cap a tlog we were using to buffer so we don't replay on startup.
prevTlog.writeCommit(cmd); prevTlog.writeCommit(cmd, operationFlags);
addOldLog(prevTlog, true); addOldLog(prevTlog, true);
// the old log list will decref when no longer needed // the old log list will decref when no longer needed
@ -630,7 +643,7 @@ public class UpdateLog implements PluginInfoInitialized {
// record a commit // record a commit
log.info("Recording current closed for " + uhandler.core + " log=" + theLog); log.info("Recording current closed for " + uhandler.core + " log=" + theLog);
CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false); CommitUpdateCommand cmd = new CommitUpdateCommand(new LocalSolrQueryRequest(uhandler.core, new ModifiableSolrParams((SolrParams)null)), false);
theLog.writeCommit(cmd); theLog.writeCommit(cmd, operationFlags);
} }
theLog.deleteOnClose = false; theLog.deleteOnClose = false;
@ -685,7 +698,7 @@ public class UpdateLog implements PluginInfoInitialized {
HashMap<Long, Update> updates; HashMap<Long, Update> updates;
List<Update> deleteByQueryList; List<Update> deleteByQueryList;
List<DeleteUpdate> deleteList; List<DeleteUpdate> deleteList;
int latestOperation;
public List<Long> getVersions(int n) { public List<Long> getVersions(int n) {
List<Long> ret = new ArrayList(n); List<Long> ret = new ArrayList(n);
@ -719,6 +732,10 @@ public class UpdateLog implements PluginInfoInitialized {
return result; return result;
} }
public int getLatestOperation() {
return latestOperation;
}
private void update() { private void update() {
int numUpdates = 0; int numUpdates = 0;
@ -743,7 +760,11 @@ public class UpdateLog implements PluginInfoInitialized {
List entry = (List)o; List entry = (List)o;
// TODO: refactor this out so we get common error handling // TODO: refactor this out so we get common error handling
int oper = (Integer)entry.get(0); int opAndFlags = (Integer)entry.get(0);
if (latestOperation == 0) {
latestOperation = opAndFlags;
}
int oper = opAndFlags & UpdateLog.OPERATION_MASK;
long version = (Long) entry.get(1); long version = (Long) entry.get(1);
switch (oper) { switch (oper) {
@ -849,6 +870,9 @@ public class UpdateLog implements PluginInfoInitialized {
} }
state = State.BUFFERING; state = State.BUFFERING;
// currently, buffering is only called by recovery, meaning that there is most likely a gap in updates
operationFlags |= FLAG_GAP;
} finally { } finally {
versionInfo.unblockUpdates(); versionInfo.unblockUpdates();
} }
@ -872,6 +896,7 @@ public class UpdateLog implements PluginInfoInitialized {
} }
state = State.ACTIVE; state = State.ACTIVE;
operationFlags &= ~FLAG_GAP;
} catch (IOException e) { } catch (IOException e) {
SolrException.log(log,"Error attempting to roll back log", e); SolrException.log(log,"Error attempting to roll back log", e);
return false; return false;
@ -904,6 +929,7 @@ public class UpdateLog implements PluginInfoInitialized {
} }
tlog.incref(); tlog.incref();
state = State.APPLYING_BUFFERED; state = State.APPLYING_BUFFERED;
operationFlags &= ~FLAG_GAP;
} finally { } finally {
versionInfo.unblockUpdates(); versionInfo.unblockUpdates();
} }
@ -1002,6 +1028,7 @@ public class UpdateLog implements PluginInfoInitialized {
UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null)); UpdateRequestProcessor proc = magicFac.getInstance(req, rsp, runFac.getInstance(req, rsp, null));
long commitVersion = 0; long commitVersion = 0;
int operationAndFlags = 0;
for(;;) { for(;;) {
Object o = null; Object o = null;
@ -1046,7 +1073,8 @@ public class UpdateLog implements PluginInfoInitialized {
// should currently be a List<Oper,Ver,Doc/Id> // should currently be a List<Oper,Ver,Doc/Id>
List entry = (List)o; List entry = (List)o;
int oper = (Integer)entry.get(0); operationAndFlags = (Integer)entry.get(0);
int oper = operationAndFlags & OPERATION_MASK;
long version = (Long) entry.get(1); long version = (Long) entry.get(1);
switch (oper) { switch (oper) {
@ -1136,7 +1164,10 @@ public class UpdateLog implements PluginInfoInitialized {
if (!activeLog) { if (!activeLog) {
// if we are replaying an old tlog file, we need to add a commit to the end // if we are replaying an old tlog file, we need to add a commit to the end
// so we don't replay it again if we restart right after. // so we don't replay it again if we restart right after.
translog.writeCommit(cmd);
// if the last operation we replayed had FLAG_GAP set, we want to use that again so we don't lose it
// as the flag on the last operation.
translog.writeCommit(cmd, operationFlags | (operationAndFlags & ~OPERATION_MASK));
} }
try { try {

View File

@ -468,6 +468,93 @@ public class TestRecovery extends SolrTestCaseJ4 {
} }
@Test
public void testBufferingFlags() throws Exception {
DirectUpdateHandler2.commitOnClose = false;
final Semaphore logReplayFinish = new Semaphore(0);
UpdateLog.testing_logReplayFinishHook = new Runnable() {
@Override
public void run() {
logReplayFinish.release();
}
};
SolrQueryRequest req = req();
UpdateHandler uhandler = req.getCore().getUpdateHandler();
UpdateLog ulog = uhandler.getUpdateLog();
try {
clearIndex();
assertU(commit());
assertEquals(UpdateLog.State.ACTIVE, ulog.getState());
ulog.bufferUpdates();
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL));
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
req.close();
h.close();
createCore();
req = req();
uhandler = req.getCore().getUpdateHandler();
ulog = uhandler.getUpdateLog();
logReplayFinish.acquire(); // wait for replay to finish
assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // since we died while buffering, we should see this last
//
// Try again to ensure that the previous log replay didn't wipe out our flags
//
req.close();
h.close();
createCore();
req = req();
uhandler = req.getCore().getUpdateHandler();
ulog = uhandler.getUpdateLog();
assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
// now do some normal non-buffered adds
updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(SEEN_LEADER,SEEN_LEADER_VAL));
assertU(commit());
req.close();
h.close();
createCore();
req = req();
uhandler = req.getCore().getUpdateHandler();
ulog = uhandler.getUpdateLog();
assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) == 0);
assertEquals(UpdateLog.State.ACTIVE, ulog.getState()); // leave each test method in a good state
} finally {
DirectUpdateHandler2.commitOnClose = true;
UpdateLog.testing_logReplayHook = null;
UpdateLog.testing_logReplayFinishHook = null;
req().close();
}
}
// make sure that on a restart, versions don't start too low // make sure that on a restart, versions don't start too low
@Test @Test
public void testVersionsOnRestart() throws Exception { public void testVersionsOnRestart() throws Exception {