SOLR-3126: adjust peersync to avoid false positive syncs during updates

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1244806 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2012-02-16 00:30:16 +00:00
parent 783404c19b
commit 9b7945f981
2 changed files with 74 additions and 17 deletions

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
@ -163,6 +164,22 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
boolean replayed = false; boolean replayed = false;
boolean succesfulRecovery = false; boolean succesfulRecovery = false;
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
}
List<Long> startingRecentVersions;
UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates();
try {
startingRecentVersions = startingRecentUpdates.getVersions(100);
} finally {
startingRecentUpdates.close();
}
while (!succesfulRecovery && !close && !isInterrupted()) { while (!succesfulRecovery && !close && !isInterrupted()) {
try { try {
// first thing we just try to sync // first thing we just try to sync
@ -176,9 +193,11 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP)); String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP));
// TODO: we should only try this the first time through the loop?
log.info("Attempting to PeerSync from " + leaderUrl); log.info("Attempting to PeerSync from " + leaderUrl);
PeerSync peerSync = new PeerSync(core, PeerSync peerSync = new PeerSync(core,
Collections.singletonList(leaderUrl), 100); Collections.singletonList(leaderUrl), 100);
peerSync.setStartingVersions(startingRecentVersions);
boolean syncSuccess = peerSync.sync(); boolean syncSuccess = peerSync.sync();
if (syncSuccess) { if (syncSuccess) {
SolrQueryRequest req = new LocalSolrQueryRequest(core, SolrQueryRequest req = new LocalSolrQueryRequest(core,
@ -191,13 +210,6 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
return; return;
} }
log.info("Sync Recovery was not successful - trying replication"); log.info("Sync Recovery was not successful - trying replication");
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover");
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
}
log.info("Begin buffering updates"); log.info("Begin buffering updates");
ulog.bufferUpdates(); ulog.bufferUpdates();

View File

@ -63,6 +63,7 @@ public class PeerSync {
private ShardHandler shardHandler; private ShardHandler shardHandler;
private UpdateLog.RecentUpdates recentUpdates; private UpdateLog.RecentUpdates recentUpdates;
private List<Long> startingVersions;
private List<Long> ourUpdates; private List<Long> ourUpdates;
private Set<Long> ourUpdateSet; private Set<Long> ourUpdateSet;
@ -109,6 +110,12 @@ public class PeerSync {
} }
/**
*
* @param core
* @param replicas
* @param nUpdates
*/
public PeerSync(SolrCore core, List<String> replicas, int nUpdates) { public PeerSync(SolrCore core, List<String> replicas, int nUpdates) {
this.replicas = replicas; this.replicas = replicas;
this.nUpdates = nUpdates; this.nUpdates = nUpdates;
@ -119,6 +126,11 @@ public class PeerSync {
shardHandler = shardHandlerFactory.getShardHandler(); shardHandler = shardHandlerFactory.getShardHandler();
} }
/** optional list of updates we had before possibly receiving new updates */
public void setStartingVersions(List<Long> startingVersions) {
this.startingVersions = startingVersions;
}
public long percentile(List<Long> arr, float frac) { public long percentile(List<Long> arr, float frac) {
int elem = (int) (arr.size() * frac); int elem = (int) (arr.size() * frac);
return Math.abs(arr.get(elem)); return Math.abs(arr.get(elem));
@ -132,7 +144,9 @@ public class PeerSync {
return false; return false;
} }
// fire off the requests before getting our own recent updates (for better concurrency) // Fire off the requests before getting our own recent updates (for better concurrency)
// This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates, they would
// have newer stuff that we also had (assuming updates are going on and are being forwarded).
for (String replica : replicas) { for (String replica : replicas) {
requestVersions(replica); requestVersions(replica);
} }
@ -144,9 +158,39 @@ public class PeerSync {
recentUpdates.close(); recentUpdates.close();
} }
Collections.sort(ourUpdates, absComparator); Collections.sort(ourUpdates, absComparator);
if (startingVersions != null) {
if (startingVersions.size() == 0) {
// no frame of reference to tell of we've missed updates
return false;
}
Collections.sort(startingVersions, absComparator);
ourLowThreshold = percentile(startingVersions, 0.8f);
ourHighThreshold = percentile(startingVersions, 0.2f);
// now make sure that the starting updates overlap our updates
// there shouldn't be reorders, so any overlap will do.
long smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size()-1));
if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
log.warn("PeerSync: too many updates received since start - startingUpdates no longer overlaps with cour urrentUpdates");
return false;
}
// let's merge the lists
List<Long> newList = new ArrayList(ourUpdates);
for (Long ver : startingVersions) {
if (Math.abs(ver) < smallestNewUpdate) {
newList.add(ver);
}
}
ourUpdates = newList;
} else {
if (ourUpdates.size() > 0) { if (ourUpdates.size() > 0) {
ourLowThreshold = percentile(ourUpdates, 0.8f); ourLowThreshold = percentile(ourUpdates, 0.8f);
ourHighThreshold = percentile(ourUpdates, 0.2f); ourHighThreshold = percentile(ourUpdates, 0.2f);
@ -155,6 +199,7 @@ public class PeerSync {
// updates to bring us into sync // updates to bring us into sync
return false; return false;
} }
}
ourUpdateSet = new HashSet<Long>(ourUpdates); ourUpdateSet = new HashSet<Long>(ourUpdates);
requestedUpdateSet = new HashSet<Long>(ourUpdates); requestedUpdateSet = new HashSet<Long>(ourUpdates);