diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 4dbef8a2c9b..6287e1940ec 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -19,6 +19,7 @@ package org.apache.solr.cloud; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeoutException; @@ -162,6 +163,22 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { public void run() { boolean replayed = false; boolean succesfulRecovery = false; + + UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); + if (ulog == null) { + SolrException.log(log, "No UpdateLog found - cannot recover"); + recoveryFailed(core, zkController, baseUrl, coreZkNodeName, + core.getCoreDescriptor()); + return; + } + + List startingRecentVersions; + UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates(); + try { + startingRecentVersions = startingRecentUpdates.getVersions(100); + } finally { + startingRecentUpdates.close(); + } while (!succesfulRecovery && !close && !isInterrupted()) { try { @@ -175,10 +192,12 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { cloudDesc.getCollectionName(), cloudDesc.getShardId()); String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP)); - + + // TODO: we should only try this the first time through the loop? log.info("Attempting to PeerSync from " + leaderUrl); PeerSync peerSync = new PeerSync(core, Collections.singletonList(leaderUrl), 100); + peerSync.setStartingVersions(startingRecentVersions); boolean syncSuccess = peerSync.sync(); if (syncSuccess) { SolrQueryRequest req = new LocalSolrQueryRequest(core, @@ -191,13 +210,6 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { return; } log.info("Sync Recovery was not successful - trying replication"); - UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); - if (ulog == null) { - SolrException.log(log, "No UpdateLog found - cannot recover"); - recoveryFailed(core, zkController, baseUrl, coreZkNodeName, - core.getCoreDescriptor()); - return; - } log.info("Begin buffering updates"); ulog.bufferUpdates(); diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index b6e0efddff6..950b18f1a00 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -63,6 +63,7 @@ public class PeerSync { private ShardHandler shardHandler; private UpdateLog.RecentUpdates recentUpdates; + private List startingVersions; private List ourUpdates; private Set ourUpdateSet; @@ -109,6 +110,12 @@ public class PeerSync { } + /** + * + * @param core + * @param replicas + * @param nUpdates + */ public PeerSync(SolrCore core, List replicas, int nUpdates) { this.replicas = replicas; this.nUpdates = nUpdates; @@ -119,6 +126,11 @@ public class PeerSync { shardHandler = shardHandlerFactory.getShardHandler(); } + /** optional list of updates we had before possibly receiving new updates */ + public void setStartingVersions(List startingVersions) { + this.startingVersions = startingVersions; + } + public long percentile(List arr, float frac) { int elem = (int) (arr.size() * frac); return Math.abs(arr.get(elem)); @@ -132,7 +144,9 @@ public class PeerSync { return false; } - // fire off the requests before getting our own recent updates (for better concurrency) + // Fire off the requests before getting our own recent updates (for better concurrency) + // This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates, they would + // have newer stuff that we also had (assuming updates are going on and are being forwarded). for (String replica : replicas) { requestVersions(replica); } @@ -143,17 +157,48 @@ public class PeerSync { } finally { recentUpdates.close(); } - - + Collections.sort(ourUpdates, absComparator); - if (ourUpdates.size() > 0) { - ourLowThreshold = percentile(ourUpdates, 0.8f); - ourHighThreshold = percentile(ourUpdates, 0.2f); + if (startingVersions != null) { + if (startingVersions.size() == 0) { + // no frame of reference to tell of we've missed updates + return false; + } + Collections.sort(startingVersions, absComparator); + + ourLowThreshold = percentile(startingVersions, 0.8f); + ourHighThreshold = percentile(startingVersions, 0.2f); + + // now make sure that the starting updates overlap our updates + // there shouldn't be reorders, so any overlap will do. + + long smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size()-1)); + + if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) { + log.warn("PeerSync: too many updates received since start - startingUpdates no longer overlaps with cour urrentUpdates"); + return false; + } + + // let's merge the lists + List newList = new ArrayList(ourUpdates); + for (Long ver : startingVersions) { + if (Math.abs(ver) < smallestNewUpdate) { + newList.add(ver); + } + } + + ourUpdates = newList; } else { - // we have no versions and hence no frame of reference to tell if we can use a peers - // updates to bring us into sync - return false; + + if (ourUpdates.size() > 0) { + ourLowThreshold = percentile(ourUpdates, 0.8f); + ourHighThreshold = percentile(ourUpdates, 0.2f); + } else { + // we have no versions and hence no frame of reference to tell if we can use a peers + // updates to bring us into sync + return false; + } } ourUpdateSet = new HashSet(ourUpdates);