From c2eb461accad85424ec93bcd57f65a3240dc254b Mon Sep 17 00:00:00 2001 From: Mark Robert Miller Date: Thu, 16 Feb 2012 14:51:14 +0000 Subject: [PATCH] SOLR-3126: tweaks on this + reenable test git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1245004 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/solr/cloud/RecoveryStrategy.java | 72 +++++++++++++------ .../solr/cloud/ChaosMonkeySafeLeaderTest.java | 2 - 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 6287e1940ec..ee77fb78c71 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -18,6 +18,7 @@ package org.apache.solr.cloud; */ import java.io.IOException; +import java.net.MalformedURLException; import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; @@ -26,7 +27,9 @@ import java.util.concurrent.TimeoutException; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer; +import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.PrepRecovery; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.cloud.SafeStopThread; @@ -45,6 +48,7 @@ import org.apache.solr.update.CommitUpdateCommand; import org.apache.solr.update.PeerSync; import org.apache.solr.update.UpdateLog; import org.apache.solr.update.UpdateLog.RecoveryInfo; +import org.apache.solr.update.processor.DistributedUpdateProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -102,24 +106,14 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP); ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops); String leaderUrl = leaderCNodeProps.getCoreUrl(); - String leaderCoreName = leaderCNodeProps.getCoreName(); log.info("Attempting to replicate from " + leaderUrl); // if we are the leader, either we are trying to recover faster // then our ephemeral timed out or we are the only node if (!leaderBaseUrl.equals(baseUrl)) { - - CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl); - server.setConnectionTimeout(30000); - server.setSoTimeout(30000); - PrepRecovery prepCmd = new PrepRecovery(); - prepCmd.setCoreName(leaderCoreName); - prepCmd.setNodeName(nodeName); - prepCmd.setCoreNodeName(coreZkNodeName); - - server.request(prepCmd); - server.shutdown(); + // send commit + commitOnLeader(leaderUrl); // use rep handler directly, so we can do this sync rather than async SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER); @@ -137,7 +131,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl + "replication"); if (close) retries = INTERRUPTED; - boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure fore=true does not download files we already have + boolean success = replicationHandler.doFetch(solrParams, true); // TODO: look into making sure force=true does not download files we already have if (!success) { throw new SolrException(ErrorCode.SERVER_ERROR, "Replication for recovery failed."); @@ -158,6 +152,35 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { // } } } + + private void commitOnLeader(String leaderUrl) throws MalformedURLException, + SolrServerException, IOException { + CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderUrl); + server.setConnectionTimeout(30000); + server.setSoTimeout(30000); + UpdateRequest ureq = new UpdateRequest(); + ureq.setParams(new ModifiableSolrParams()); + ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true); + ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process( + server); + server.commit(); + server.shutdown(); + } + + private void sendPrepRecoveryCmd(String leaderBaseUrl, + String leaderCoreName) throws MalformedURLException, SolrServerException, + IOException { + CommonsHttpSolrServer server = new CommonsHttpSolrServer(leaderBaseUrl); + server.setConnectionTimeout(45000); + server.setSoTimeout(45000); + PrepRecovery prepCmd = new PrepRecovery(); + prepCmd.setCoreName(leaderCoreName); + prepCmd.setNodeName(zkController.getNodeName()); + prepCmd.setCoreNodeName(coreZkNodeName); + + server.request(prepCmd); + server.shutdown(); + } @Override public void run() { @@ -180,20 +203,23 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { startingRecentUpdates.close(); } - while (!succesfulRecovery && !close && !isInterrupted()) { + while (!succesfulRecovery && !close && !isInterrupted()) { // don't use interruption or it will close channels though try { // first thing we just try to sync zkController.publish(core, ZkStateReader.RECOVERING); + CloudDescriptor cloudDesc = core.getCoreDescriptor() .getCloudDescriptor(); - ZkNodeProps leaderprops = null; - - leaderprops = zkStateReader.getLeaderProps( + ZkNodeProps leaderprops = zkStateReader.getLeaderProps( cloudDesc.getCollectionName(), cloudDesc.getShardId()); - String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderprops.get(ZkStateReader.BASE_URL_PROP), leaderprops.get(ZkStateReader.CORE_NAME_PROP)); - - // TODO: we should only try this the first time through the loop? + String leaderBaseUrl = leaderprops.get(ZkStateReader.BASE_URL_PROP); + String leaderCoreName = leaderprops.get(ZkStateReader.CORE_NAME_PROP); + + String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName); + + sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName); + log.info("Attempting to PeerSync from " + leaderUrl); PeerSync peerSync = new PeerSync(core, Collections.singletonList(leaderUrl), 100); @@ -207,8 +233,11 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { // sync success - register as active and return zkController.publishAsActive(baseUrl, core.getCoreDescriptor(), coreZkNodeName, coreName); + succesfulRecovery = true; + close = true; return; } + log.info("Sync Recovery was not successful - trying replication"); log.info("Begin buffering updates"); @@ -227,7 +256,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { // if there are pending recovery requests, don't advert as active zkController.publishAsActive(baseUrl, core.getCoreDescriptor(), coreZkNodeName, coreName); - + close = true; succesfulRecovery = true; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -294,6 +323,7 @@ public class RecoveryStrategy extends Thread implements SafeStopThread { // no replay needed\ log.info("No replay needed"); } else { + log.info("Replaying buffered documents"); // wait for replay future.get(); } diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java index e28536c68c1..e3a313c19a0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeySafeLeaderTest.java @@ -30,9 +30,7 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Ignore; -@Ignore public class ChaosMonkeySafeLeaderTest extends FullSolrCloudTest { @BeforeClass