mirror of https://github.com/apache/lucene.git
SOLR-3126: tweaks on this + reenable test
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1245004 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
242092c929
commit
c2eb461acc
|
@ -18,6 +18,7 @@ package org.apache.solr.cloud;
|
|||
*/
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.MalformedURLException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
@ -26,7 +27,9 @@ import java.util.concurrent.TimeoutException;
|
|||
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
|
||||
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.SolrException.ErrorCode;
|
||||
import org.apache.solr.common.cloud.SafeStopThread;
|
||||
|
@ -45,6 +48,7 @@ import org.apache.solr.update.CommitUpdateCommand;
|
|||
import org.apache.solr.update.PeerSync;
|
||||
import org.apache.solr.update.UpdateLog;
|
||||
import org.apache.solr.update.UpdateLog.RecoveryInfo;
|
||||
import org.apache.solr.update.processor.DistributedUpdateProcessor;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -102,24 +106,14 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
|
||||
ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
|
||||
String leaderUrl = leaderCNodeProps.getCoreUrl();
|
||||
String leaderCoreName = leaderCNodeProps.getCoreName();
|
||||
|
||||
log.info("Attempting to replicate from " + leaderUrl);
|
||||
|
||||
// if we are the leader, either we are trying to recover faster
|
||||
// then our ephemeral timed out or we are the only node
|
||||
if (!leaderBaseUrl.equals(baseUrl)) {
|
||||
|
||||
CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl);
|
||||
server.setConnectionTimeout(30000);
|
||||
server.setSoTimeout(30000);
|
||||
PrepRecovery prepCmd = new PrepRecovery();
|
||||
prepCmd.setCoreName(leaderCoreName);
|
||||
prepCmd.setNodeName(nodeName);
|
||||
prepCmd.setCoreNodeName(coreZkNodeName);
|
||||
|
||||
server.request(prepCmd);
|
||||
server.shutdown();
|
||||
// send commit
|
||||
commitOnLeader(leaderUrl);
|
||||
|
||||
// use rep handler directly, so we can do this sync rather than async
|
||||
SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
|
||||
|
@ -137,7 +131,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication");
|
||||
|
||||
if (close) retries = INTERRUPTED;
|
||||
boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure fore=true does not download files we already have
|
||||
boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have
|
||||
|
||||
if (!success) {
|
||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed.");
|
||||
|
@ -158,6 +152,35 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
// }
|
||||
}
|
||||
}
|
||||
|
||||
private void commitOnLeader(String leaderUrl) throws MalformedURLException,
|
||||
SolrServerException, IOException {
|
||||
CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderUrl);
|
||||
server.setConnectionTimeout(30000);
|
||||
server.setSoTimeout(30000);
|
||||
UpdateRequest ureq = new UpdateRequest();
|
||||
ureq.setParams(new ModifiableSolrParams());
|
||||
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
|
||||
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
|
||||
server);
|
||||
server.commit();
|
||||
server.shutdown();
|
||||
}
|
||||
|
||||
private void sendPrepRecoveryCmd(String leaderBaseUrl,
|
||||
String leaderCoreName) throws MalformedURLException, SolrServerException,
|
||||
IOException {
|
||||
CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl);
|
||||
server.setConnectionTimeout(45000);
|
||||
server.setSoTimeout(45000);
|
||||
PrepRecovery prepCmd = new PrepRecovery();
|
||||
prepCmd.setCoreName(leaderCoreName);
|
||||
prepCmd.setNodeName(zkController.getNodeName());
|
||||
prepCmd.setCoreNodeName(coreZkNodeName);
|
||||
|
||||
server.request(prepCmd);
|
||||
server.shutdown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -180,20 +203,23 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
startingRecentUpdates.close();
|
||||
}
|
||||
|
||||
while (!succesfulRecovery && !close && !isInterrupted()) {
|
||||
while (!succesfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though
|
||||
try {
|
||||
// first thing we just try to sync
|
||||
zkController.publish(core, ZkStateReader.RECOVERING);
|
||||
|
||||
CloudDescriptor cloudDesc = core.getCoreDescriptor()
|
||||
.getCloudDescriptor();
|
||||
ZkNodeProps leaderprops = null;
|
||||
|
||||
leaderprops = zkStateReader.getLeaderProps(
|
||||
ZkNodeProps leaderprops = zkStateReader.getLeaderProps(
|
||||
cloudDesc.getCollectionName(), cloudDesc.getShardId());
|
||||
|
||||
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP));
|
||||
|
||||
// TODO: we should only try this the first time through the loop?
|
||||
String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP);
|
||||
String leaderCoreName = leaderprops.get(ZkStateReader.CORE_NAME_PROP);
|
||||
|
||||
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
|
||||
|
||||
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName);
|
||||
|
||||
log.info("Attempting to PeerSync from " + leaderUrl);
|
||||
PeerSync peerSync = new PeerSync(core,
|
||||
Collections.singletonList(leaderUrl), 100);
|
||||
|
@ -207,8 +233,11 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
// sync success - register as active and return
|
||||
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
|
||||
coreZkNodeName, coreName);
|
||||
succesfulRecovery = true;
|
||||
close = true;
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("Sync Recovery was not successful - trying replication");
|
||||
|
||||
log.info("Begin buffering updates");
|
||||
|
@ -227,7 +256,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
// if there are pending recovery requests, don't advert as active
|
||||
zkController.publishAsActive(baseUrl, core.getCoreDescriptor(),
|
||||
coreZkNodeName, coreName);
|
||||
|
||||
close = true;
|
||||
succesfulRecovery = true;
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
|
@ -294,6 +323,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread {
|
|||
// no replay needed\
|
||||
log.info("No replay needed");
|
||||
} else {
|
||||
log.info("Replaying buffered documents");
|
||||
// wait for replay
|
||||
future.get();
|
||||
}
|
||||
|
|
|
@ -30,9 +30,7 @@ import org.junit.After;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
|
||||
@Ignore
|
||||
public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest {
|
||||
|
||||
@BeforeClass
|
||||
|
|
Loading…
Reference in New Issue