mirror of https://github.com/apache/lucene.git
SOLR-3126: get num versions from updatelog, fail sync if comm fail when retrieving updates, use starting versions if syncing aftger startup, only sync first time in recovery loop, more sync logging
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1291350 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f1345de257
commit
630addb415
|
@ -70,6 +70,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
private volatile String coreName;
|
||||
private int retries;
|
||||
private SolrCore core;
|
||||
private boolean recoveringAfterStartup;
|
||||
|
||||
public RecoveryStrategy(SolrCore core) {
|
||||
this.core = core;
|
||||
|
@ -79,9 +80,12 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
zkStateReader = zkController.getZkStateReader();
|
||||
baseUrl = zkController.getBaseUrl();
|
||||
coreZkNodeName = zkController.getNodeName() + "_" + coreName;
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
|
||||
this.recoveringAfterStartup = recoveringAfterStartup;
|
||||
}
|
||||
|
||||
// make sure any threads stop retrying
|
||||
public void close() {
|
||||
close = true;
|
||||
|
@ -201,48 +205,80 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
List<Long> startingRecentVersions;
|
||||
UpdateLog.RecentUpdates startingRecentUpdates = ulog.getRecentUpdates();
|
||||
try {
|
||||
startingRecentVersions = startingRecentUpdates.getVersions(100);
|
||||
startingRecentVersions = startingRecentUpdates.getVersions(ulog.numRecordsToKeep);
|
||||
} finally {
|
||||
startingRecentUpdates.close();
|
||||
}
|
||||
|
||||
List<Long> reallyStartingVersions = ulog.getStartingVersions();
|
||||
|
||||
|
||||
if (reallyStartingVersions != null && recoveringAfterStartup) {
|
||||
int oldIdx = 0; // index of the start of the old list in the current list
|
||||
long firstStartingVersion = reallyStartingVersions.size() > 0 ? reallyStartingVersions.get(0) : 0;
|
||||
|
||||
for (; oldIdx<startingRecentVersions.size(); oldIdx++) {
|
||||
if (startingRecentVersions.get(oldIdx) == firstStartingVersion) break;
|
||||
}
|
||||
|
||||
if (oldIdx < startingRecentVersions.size()) {
|
||||
log.info("####### Found new versions added after startup: num=" + (startingRecentVersions.size()-oldIdx));
|
||||
}
|
||||
|
||||
log.info("###### startupVersions=" + reallyStartingVersions);
|
||||
log.info("###### currentVersions=" + startingRecentVersions);
|
||||
}
|
||||
|
||||
if (recoveringAfterStartup) {
|
||||
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
|
||||
// when we went down.
|
||||
startingRecentVersions = reallyStartingVersions;
|
||||
}
|
||||
|
||||
boolean firstTime = true;
|
||||
|
||||
while (!succesfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
|
||||
try {
|
||||
// first thing we just try to sync
|
||||
|
||||
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
|
||||
|
||||
|
||||
CloudDescriptor cloudDesc = core.getCoreDescriptor()
|
||||
.getCloudDescriptor();
|
||||
ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
|
||||
cloudDesc.getCollectionName(), cloudDesc.getShardId());
|
||||
|
||||
|
||||
String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
|
||||
String leaderCoreName = leaderprops.get(ZkStateReader.CORE_NAME_PROP);
|
||||
|
||||
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
|
||||
|
||||
|
||||
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
|
||||
|
||||
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
|
||||
|
||||
log.info("Attempting to PeerSync from " + leaderUrl);
|
||||
PeerSync peerSync = new PeerSync(core,
|
||||
Collections.singletonList(leaderUrl), 100);
|
||||
peerSync.setStartingVersions(startingRecentVersions);
|
||||
boolean syncSuccess = peerSync.sync();
|
||||
if (syncSuccess) {
|
||||
SolrQueryRequest req = new LocalSolrQueryRequest(core,
|
||||
new ModifiableSolrParams());
|
||||
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
|
||||
log.info("Sync Recovery was succesful - registering as Active");
|
||||
// sync success - register as active and return
|
||||
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
|
||||
coreZkNodeName, coreName);
|
||||
succesfulRecovery = true;
|
||||
close = true;
|
||||
return;
|
||||
|
||||
|
||||
// first thing we just try to sync
|
||||
if (firstTime) {
|
||||
firstTime = false; // only try sync the first time through the loop
|
||||
log.info("Attempting to PeerSync from " + leaderUrl + " recoveringAfterStartup="+recoveringAfterStartup);
|
||||
PeerSync peerSync = new PeerSync(core,
|
||||
Collections.singletonList(leaderUrl), ulog.numRecordsToKeep);
|
||||
peerSync.setStartingVersions(startingRecentVersions);
|
||||
boolean syncSuccess = peerSync.sync();
|
||||
if (syncSuccess) {
|
||||
SolrQueryRequest req = new LocalSolrQueryRequest(core,
|
||||
new ModifiableSolrParams());
|
||||
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
|
||||
log.info("Sync Recovery was succesful - registering as Active");
|
||||
// sync success - register as active and return
|
||||
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
|
||||
coreZkNodeName, coreName);
|
||||
succesfulRecovery = true;
|
||||
close = true;
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Sync Recovery was not successful - trying replication");
|
||||
}
|
||||
|
||||
log.info("Sync Recovery was not successful - trying replication");
|
||||
|
||||
log.info("Begin buffering updates");
|
||||
ulog.bufferUpdates();
|
||||
replayed = false;
|
||||
|
|
|
@ -122,8 +122,12 @@ public final class DefaultSolrCoreState extends SolrCoreState {
|
|||
}
|
||||
if (closed) return;
|
||||
}
|
||||
|
||||
|
||||
// if true, we are recovering after startup and shouldn't have (or be receiving) additional updates (except for local tlog recovery)
|
||||
boolean recoveringAfterStartup = recoveryStrat == null;
|
||||
|
||||
recoveryStrat = new RecoveryStrategy(core);
|
||||
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
|
||||
recoveryStrat.start();
|
||||
recoveryRunning = true;
|
||||
}
|
||||
|
|
|
@ -29,11 +29,14 @@ import java.util.Set;
|
|||
import org.apache.commons.httpclient.NoHttpResponseException;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.cloud.CloudDescriptor;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrInputDocument;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.util.NamedList;
|
||||
import org.apache.solr.common.util.StrUtils;
|
||||
import org.apache.solr.core.CoreDescriptor;
|
||||
import org.apache.solr.core.SolrCore;
|
||||
import org.apache.solr.handler.component.ShardHandler;
|
||||
import org.apache.solr.handler.component.ShardHandlerFactory;
|
||||
|
@ -138,7 +141,22 @@ public class PeerSync {
|
|||
return Math.abs(arr.get(elem));
|
||||
}
|
||||
|
||||
/** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates.
|
||||
// start of peersync related debug messages. includes the core name for correlation.
|
||||
private String msg() {
|
||||
ZkController zkController = uhandler.core.getCoreDescriptor().getCoreContainer().getZkController();
|
||||
|
||||
String myURL = "";
|
||||
|
||||
if (zkController != null) {
|
||||
myURL = zkController.getZkServerAddress();
|
||||
}
|
||||
|
||||
// TODO: core name turns up blank in many tests - find URL if cloud enabled?
|
||||
return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" ";
|
||||
}
|
||||
|
||||
/** Returns true if peer sync was successful, meaning that this core may not be considered to have the latest updates
|
||||
* when considering the last N updates between it and it's peers.
|
||||
* A commit is not performed.
|
||||
*/
|
||||
public boolean sync() {
|
||||
|
@ -146,6 +164,14 @@ public class PeerSync {
|
|||
return false;
|
||||
}
|
||||
|
||||
log.info(msg() + "START replicas=" + replicas + " nUpdates=" + nUpdates);
|
||||
|
||||
if (debug) {
|
||||
if (startingVersions != null) {
|
||||
log.debug(msg() + "startingVersions=" + startingVersions.size() + " " + startingVersions);
|
||||
}
|
||||
}
|
||||
|
||||
// Fire off the requests before getting our own recent updates (for better concurrency)
|
||||
// This also allows us to avoid getting updates we don't need... if we got our updates and then got their updates, they would
|
||||
// have newer stuff that we also had (assuming updates are going on and are being forwarded).
|
||||
|
@ -178,7 +204,7 @@ public class PeerSync {
|
|||
long smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size()-1));
|
||||
|
||||
if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
|
||||
log.warn("PeerSync: too many updates received since start - startingUpdates no longer overlaps with cour urrentUpdates");
|
||||
log.warn(msg() + "too many updates received since start - startingUpdates no longer overlaps with our currentUpdates");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -199,6 +225,7 @@ public class PeerSync {
|
|||
} else {
|
||||
// we have no versions and hence no frame of reference to tell if we can use a peers
|
||||
// updates to bring us into sync
|
||||
log.info(msg() + "DONE. We have no versions. sync failed.");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -211,11 +238,13 @@ public class PeerSync {
|
|||
if (srsp == null) break;
|
||||
boolean success = handleResponse(srsp);
|
||||
if (!success) {
|
||||
log.info(msg() + "DONE. sync failed");
|
||||
shardHandler.cancelAll();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
log.info(msg() + "DONE. sync succeeded");
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -235,26 +264,35 @@ public class PeerSync {
|
|||
}
|
||||
|
||||
private boolean handleResponse(ShardResponse srsp) {
|
||||
ShardRequest sreq = srsp.getShardRequest();
|
||||
|
||||
if (srsp.getException() != null) {
|
||||
|
||||
// TODO: look at this more thoroughly - we don't want
|
||||
// to fail on connection exceptions, but it may make sense
|
||||
// to determine this based on the number of fails
|
||||
if (srsp.getException() instanceof SolrServerException) {
|
||||
//
|
||||
// If the replica went down between asking for versions and asking for specific updates, that
|
||||
// shouldn't be treated as success since we counted on getting those updates back (and avoided
|
||||
// redundantly asking other replicas for them).
|
||||
if (sreq.purpose == 1 && srsp.getException() instanceof SolrServerException) {
|
||||
Throwable solrException = ((SolrServerException) srsp.getException())
|
||||
.getRootCause();
|
||||
if (solrException instanceof ConnectException
|
||||
|| solrException instanceof NoHttpResponseException) {
|
||||
log.info(msg() + " couldn't connect to " + srsp.getShardAddress() + ", counting as success");
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
// TODO: at least log???
|
||||
// srsp.getException().printStackTrace(System.out);
|
||||
|
||||
log.warn(msg() + " exception talking to " + srsp.getShardAddress() + ", counting as success");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ShardRequest sreq = srsp.getShardRequest();
|
||||
if (sreq.purpose == 1) {
|
||||
return handleVersions(srsp);
|
||||
} else {
|
||||
|
@ -270,6 +308,8 @@ public class PeerSync {
|
|||
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
|
||||
sreq.reportedVersions = otherVersions;
|
||||
|
||||
log.info(msg() + " Received " + otherVersions.size() + " versions from " + sreq.shards[0] );
|
||||
|
||||
if (otherVersions.size() == 0) {
|
||||
return true;
|
||||
}
|
||||
|
@ -278,6 +318,10 @@ public class PeerSync {
|
|||
|
||||
Collections.sort(otherVersions, absComparator);
|
||||
|
||||
if (debug) {
|
||||
log.debug(msg() + " sorted versions from " + sreq.shards[0] + " = " + otherVersions);
|
||||
}
|
||||
|
||||
long otherHigh = percentile(otherVersions, .2f);
|
||||
long otherLow = percentile(otherVersions, .8f);
|
||||
|
||||
|
@ -286,6 +330,7 @@ public class PeerSync {
|
|||
// This means that we might miss updates if we attempted to use this method.
|
||||
// Since there exists just one replica that is so much newer, we must
|
||||
// fail the sync.
|
||||
log.info(msg() + " Our versions are too old. ourHighThreshold="+ourHighThreshold + " otherLowThreshold="+otherLow);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -293,6 +338,7 @@ public class PeerSync {
|
|||
// Small overlap between windows and ours is newer.
|
||||
// Using this list to sync would result in requesting/replaying results we don't need
|
||||
// and possibly bringing deleted docs back to life.
|
||||
log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -303,6 +349,8 @@ public class PeerSync {
|
|||
|
||||
if (ourUpdateSet.contains(otherVersion) || requestedUpdateSet.contains(otherVersion)) {
|
||||
// we either have this update, or already requested it
|
||||
// TODO: what if the shard we previously requested this from returns failure (because it goes
|
||||
// down)
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -313,11 +361,14 @@ public class PeerSync {
|
|||
sreq.requestedUpdates = toRequest;
|
||||
|
||||
if (toRequest.isEmpty()) {
|
||||
log.info(msg() + " Our versions are newer. ourLowThreshold="+ourLowThreshold + " otherHigh="+otherHigh);
|
||||
|
||||
// we had (or already requested) all the updates referenced by the replica
|
||||
return true;
|
||||
}
|
||||
|
||||
if (toRequest.size() > maxUpdates) {
|
||||
log.info(msg() + " Failing due to needing too many updates:" + maxUpdates);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -327,9 +378,7 @@ public class PeerSync {
|
|||
private boolean requestUpdates(ShardResponse srsp, List<Long> toRequest) {
|
||||
String replica = srsp.getShardRequest().shards[0];
|
||||
|
||||
log.info("Requesting updates from " + replica + " versions=" + toRequest);
|
||||
|
||||
|
||||
log.info(msg() + "Requesting updates from " + replica + " versions=" + toRequest);
|
||||
|
||||
// reuse our original request object
|
||||
ShardRequest sreq = srsp.getShardRequest();
|
||||
|
@ -353,7 +402,7 @@ public class PeerSync {
|
|||
|
||||
SyncShardRequest sreq = (SyncShardRequest) srsp.getShardRequest();
|
||||
if (updates.size() < sreq.requestedUpdates.size()) {
|
||||
log.error("PeerSync: Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
|
||||
log.error(msg() + " Requested " + sreq.requestedUpdates.size() + " updates from " + sreq.shards[0] + " but retrieved " + updates.size());
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -380,6 +429,10 @@ public class PeerSync {
|
|||
o = obj;
|
||||
List<Object> entry = (List<Object>)o;
|
||||
|
||||
if (debug) {
|
||||
log.debug(msg() + "raw update record " + o);
|
||||
}
|
||||
|
||||
int oper = (Integer)entry.get(0);
|
||||
long version = (Long) entry.get(1);
|
||||
if (version == lastVersion && version != 0) continue;
|
||||
|
@ -395,6 +448,9 @@ public class PeerSync {
|
|||
cmd.solrDoc = sdoc;
|
||||
cmd.setVersion(version);
|
||||
cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
|
||||
if (debug) {
|
||||
log.debug(msg() + "add " + cmd);
|
||||
}
|
||||
proc.processAdd(cmd);
|
||||
break;
|
||||
}
|
||||
|
@ -405,6 +461,9 @@ public class PeerSync {
|
|||
cmd.setIndexedId(new BytesRef(idBytes));
|
||||
cmd.setVersion(version);
|
||||
cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
|
||||
if (debug) {
|
||||
log.debug(msg() + "delete " + cmd);
|
||||
}
|
||||
proc.processDelete(cmd);
|
||||
break;
|
||||
}
|
||||
|
@ -416,6 +475,9 @@ public class PeerSync {
|
|||
cmd.query = query;
|
||||
cmd.setVersion(version);
|
||||
cmd.setFlags(UpdateCommand.PEER_SYNC | UpdateCommand.IGNORE_AUTOCOMMIT);
|
||||
if (debug) {
|
||||
log.debug(msg() + "deleteByQuery " + cmd);
|
||||
}
|
||||
proc.processDelete(cmd);
|
||||
break;
|
||||
}
|
||||
|
@ -431,12 +493,12 @@ public class PeerSync {
|
|||
// TODO: should this be handled separately as a problem with us?
|
||||
// I guess it probably already will by causing replication to be kicked off.
|
||||
sreq.updateException = e;
|
||||
log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
|
||||
log.error(msg() + "Error applying updates from " + sreq.shards + " ,update=" + o, e);
|
||||
return false;
|
||||
}
|
||||
catch (Exception e) {
|
||||
sreq.updateException = e;
|
||||
log.error("Error applying updates from " + sreq.shards + " ,update=" + o, e);
|
||||
log.error(msg() + "Error applying updates from " + sreq.shards + " ,update=" + o, e);
|
||||
return false;
|
||||
}
|
||||
finally {
|
||||
|
@ -444,7 +506,7 @@ public class PeerSync {
|
|||
proc.finish();
|
||||
} catch (Exception e) {
|
||||
sreq.updateException = e;
|
||||
log.error("Error applying updates from " + sreq.shards + " ,finish()", e);
|
||||
log.error(msg() + "Error applying updates from " + sreq.shards + " ,finish()", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -90,7 +90,7 @@ public class UpdateLog implements PluginInfoInitialized {
|
|||
private TransactionLog prevMapLog2; // the transaction log used to look up entries found in prevMap
|
||||
|
||||
private final int numDeletesToKeep = 1000;
|
||||
private final int numRecordsToKeep = 100;
|
||||
public final int numRecordsToKeep = 100;
|
||||
// keep track of deletes only... this is not updated on an add
|
||||
private LinkedHashMap<BytesRef, LogPtr> oldDeletes = new LinkedHashMap<BytesRef, LogPtr>(numDeletesToKeep) {
|
||||
protected boolean removeEldestEntry(Map.Entry eldest) {
|
||||
|
|
Loading…
Reference in New Issue