diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index 950b18f1a00..344e0e119b0 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -56,6 +56,7 @@ public class PeerSync { private List replicas; private int nUpdates; + private int maxUpdates; // maximum number of updates to request before failing private UpdateHandler uhandler; private UpdateLog ulog; @@ -119,6 +120,7 @@ public class PeerSync { public PeerSync(SolrCore core, List replicas, int nUpdates) { this.replicas = replicas; this.nUpdates = nUpdates; + this.maxUpdates = nUpdates; uhandler = core.getUpdateHandler(); ulog = uhandler.getUpdateLog(); @@ -271,6 +273,8 @@ public class PeerSync { if (otherVersions.size() == 0) { return true; } + + boolean completeList = otherVersions.size() < nUpdates; // do we have their complete list of updates? Collections.sort(otherVersions, absComparator); @@ -295,7 +299,7 @@ public class PeerSync { List toRequest = new ArrayList(); for (Long otherVersion : otherVersions) { // stop when the entries get old enough that reorders may lead us to see updates we don't need - if (Math.abs(otherVersion) < ourLowThreshold) break; + if (!completeList && Math.abs(otherVersion) < ourLowThreshold) break; if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) { // we either have this update, or already requested it @@ -307,11 +311,15 @@ public class PeerSync { } sreq.requestedUpdates = toRequest; - + if (toRequest.isEmpty()) { // we had (or already requested) all the updates referenced by the replica return true; } + + if (toRequest.size() > maxRequests) { + return false; + } return requestUpdates(srsp, toRequest); } diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index 57f3fe8fcc1..af4b7b1ceca 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -111,6 +111,7 @@ public class UpdateLog implements PluginInfoInitialized { private volatile UpdateHandler uhandler; // a core reload can change this reference! private volatile boolean cancelApplyBufferUpdate; + List startingVersions; public static class LogPtr { @@ -174,11 +175,28 @@ public class UpdateLog implements PluginInfoInitialized { newestLogOnStartup = oldLog; versionInfo = new VersionInfo(uhandler, 256); + + UpdateLog.RecentUpdates startingRecentUpdates = getRecentUpdates(); + try { + startingVersions = startingRecentUpdates.getVersions(numRecordsToKeep); + // populate recent deletes list (since we can't get that info from the index) + for (int i=startingRecentUpdates.deleteList.size()-1; i>=0; i--) { + DeleteUpdate du = startingRecentUpdates.deleteList.get(i); + oldDeletes.put(new BytesRef(du.id), new LogPtr(-1,du.version)); + } + } finally { + startingRecentUpdates.close(); + } + } public File getLogDir() { return tlogDir; } + + public List getStartingVersions() { + return startingVersions; + } /* Takes over ownership of the log, keeping it until no longer needed and then decrementing it's reference and dropping it. @@ -623,13 +641,24 @@ public class UpdateLog implements PluginInfoInitialized { TransactionLog log; long version; long pointer; - } + } + + static class DeleteUpdate { + long version; + byte[] id; + + public DeleteUpdate(long version, byte[] id) { + this.version = version; + this.id = id; + } + } public class RecentUpdates { Deque logList; // newest first List> updateList; HashMap updates; List deleteByQueryList; + List deleteList; public List getVersions(int n) { @@ -664,10 +693,12 @@ public class UpdateLog implements PluginInfoInitialized { return result; } + private void update() { int numUpdates = 0; updateList = new ArrayList>(logList.size()); deleteByQueryList = new ArrayList(); + deleteList = new ArrayList(); updates = new HashMap(numRecordsToKeep); for (TransactionLog oldLog : logList) { @@ -703,6 +734,8 @@ public class UpdateLog implements PluginInfoInitialized { if (oper == UpdateLog.DELETE_BY_QUERY) { deleteByQueryList.add(update); + } else if (oper == UpdateLog.DELETE) { + deleteList.add(new DeleteUpdate(version, (byte[])entry.get(2))); } break;