SOLR-3126: restore old deletes via tlog so peersync won't reorder

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1290938 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-02-19 03:57:43 +00:00
parent 777a357934
commit 3bbd90ebd5
2 changed files with 44 additions and 3 deletions

View File

@ -56,6 +56,7 @@ public class PeerSync {
private List<String> replicas;
private int nUpdates;
private int maxUpdates; // maximum number of updates to request before failing
private UpdateHandler uhandler;
private UpdateLog ulog;
@ -119,6 +120,7 @@ public class PeerSync {
public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
this.replicas = replicas;
this.nUpdates = nUpdates;
this.maxUpdates = nUpdates;
uhandler = core.getUpdateHandler();
ulog = uhandler.getUpdateLog();
@ -271,6 +273,8 @@ public class PeerSync {
if (otherVersions.size() == 0) {
return true;
}
boolean completeList = otherVersions.size() < nUpdates; // do we have their complete list of updates?
Collections.sort(otherVersions, absComparator);
@ -295,7 +299,7 @@ public class PeerSync {
List<Long> toRequest = new ArrayList<Long>();
for (Long otherVersion : otherVersions) {
// stop when the entries get old enough that reorders may lead us to see updates we don't need
if (Math.abs(otherVersion) < ourLowThreshold) break;
if (!completeList && Math.abs(otherVersion) < ourLowThreshold) break;
if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
// we either have this update, or already requested it
@ -307,11 +311,15 @@ public class PeerSync {
}
sreq.requestedUpdates = toRequest;
if (toRequest.isEmpty()) {
// we had (or already requested) all the updates referenced by the replica
return true;
}
if (toRequest.size() > maxRequests) {
return false;
}
return requestUpdates(srsp, toRequest);
}

View File

@ -111,6 +111,7 @@ public class UpdateLog implements PluginInfoInitialized {
private volatile UpdateHandler uhandler; // a core reload can change this reference!
private volatile boolean cancelApplyBufferUpdate;
List<Long> startingVersions;
public static class LogPtr {
@ -174,11 +175,28 @@ public class UpdateLog implements PluginInfoInitialized {
newestLogOnStartup = oldLog;
versionInfo = new VersionInfo(uhandler, 256);
UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates();
try {
startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep);
// populate recent deletes list (since we can't get that info from the index)
for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) {
DeleteUpdate du = startingRecentUpdates.deleteList.get(i);
oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version));
}
} finally {
startingRecentUpdates.close();
}
}
public File getLogDir() {
return tlogDir;
}
public List<Long> getStartingVersions() {
return startingVersions;
}
/* Takes over ownership of the log, keeping it until no longer needed
and then decrementing it's reference and dropping it.
@ -623,13 +641,24 @@ public class UpdateLog implements PluginInfoInitialized {
TransactionLog log;
long version;
long pointer;
}
}
static class DeleteUpdate {
long version;
byte[] id;
public DeleteUpdate(long version, byte[] id) {
this.version = version;
this.id = id;
}
}
public class RecentUpdates {
Deque<TransactionLog> logList; // newest first
List<List<Update>> updateList;
HashMap<Long, Update> updates;
List<Update> deleteByQueryList;
List<DeleteUpdate> deleteList;
public List<Long> getVersions(int n) {
@ -664,10 +693,12 @@ public class UpdateLog implements PluginInfoInitialized {
return result;
}
private void update() {
int numUpdates = 0;
updateList = new ArrayList<List<Update>>(logList.size());
deleteByQueryList = new ArrayList<Update>();
deleteList = new ArrayList<DeleteUpdate>();
updates = new HashMap<Long,Update>(numRecordsToKeep);
for (TransactionLog oldLog : logList) {
@ -703,6 +734,8 @@ public class UpdateLog implements PluginInfoInitialized {
if (oper == UpdateLog.DELETE_BY_QUERY) {
deleteByQueryList.add(update);
} else if (oper == UpdateLog.DELETE) {
deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2)));
}
break;