diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 5d6d9d7096c..1469d3e4da5 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -173,6 +173,8 @@ New Features However, if there is a leader election while this request is in transit, the versions may not be returned from that shard. (Boris Naguet, Ishan Chattopadhyaya) +* SOLR-9045: Make RecoveryStrategy settings configurable. (Christine Poerschke) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 3bd2e7440c4..8865c08ddb1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -46,6 +46,7 @@ import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.cloud.ZooKeeperException; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.util.NamedList; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.DirectoryFactory.DirContext; @@ -62,17 +63,43 @@ import org.apache.solr.update.UpdateLog; import org.apache.solr.update.UpdateLog.RecoveryInfo; import org.apache.solr.update.processor.DistributedUpdateProcessor; import org.apache.solr.util.RefCounted; +import org.apache.solr.util.SolrPluginUtils; +import org.apache.solr.util.plugin.NamedListInitializedPlugin; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * This class may change in future and customisations are not supported + * between versions in terms of API or back compat behaviour. + * @lucene.experimental + */ public class RecoveryStrategy extends Thread implements Closeable { + public static class Builder implements NamedListInitializedPlugin { + private NamedList args; + @Override + public void init(NamedList args) { + this.args = args; + } + // this should only be used from SolrCoreState + public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd, + RecoveryStrategy.RecoveryListener recoveryListener) { + final RecoveryStrategy recoveryStrategy = newRecoveryStrategy(cc, cd, recoveryListener); + SolrPluginUtils.invokeSetters(recoveryStrategy, args); + return recoveryStrategy; + } + protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, + RecoveryStrategy.RecoveryListener recoveryListener) { + return new RecoveryStrategy(cc, cd, recoveryListener); + } + } + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500); - private static final int MAX_RETRIES = 500; - private static final int STARTING_RECOVERY_DELAY = 5000; + private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500); + private int maxRetries = 500; + private int startingRecoveryDelayMilliSeconds = 5000; public static interface RecoveryListener { public void recovered(); @@ -92,8 +119,7 @@ public class RecoveryStrategy extends Thread implements Closeable { private CoreContainer cc; private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest; - // this should only be used from SolrCoreState - public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) { + protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) { this.cc = cc; this.coreName = cd.getName(); this.recoveryListener = recoveryListener; @@ -104,13 +130,41 @@ public class RecoveryStrategy extends Thread implements Closeable { coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName(); } - public void setRecoveringAfterStartup(boolean recoveringAfterStartup) { + final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() { + return waitForUpdatesWithStaleStatePauseMilliSeconds; + } + + final public void setWaitForUpdatesWithStaleStatePauseMilliSeconds(int waitForUpdatesWithStaleStatePauseMilliSeconds) { + this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds; + } + + final public int getMaxRetries() { + return maxRetries; + } + + final public void setMaxRetries(int maxRetries) { + this.maxRetries = maxRetries; + } + + final public int getStartingRecoveryDelayMilliSeconds() { + return startingRecoveryDelayMilliSeconds; + } + + final public void setStartingRecoveryDelayMilliSeconds(int startingRecoveryDelayMilliSeconds) { + this.startingRecoveryDelayMilliSeconds = startingRecoveryDelayMilliSeconds; + } + + final public boolean getRecoveringAfterStartup() { + return recoveringAfterStartup; + } + + final public void setRecoveringAfterStartup(boolean recoveringAfterStartup) { this.recoveringAfterStartup = recoveringAfterStartup; } // make sure any threads stop retrying @Override - public void close() { + final public void close() { close = true; if (prevSendPreRecoveryHttpUriRequest != null) { prevSendPreRecoveryHttpUriRequest.abort(); @@ -118,7 +172,7 @@ public class RecoveryStrategy extends Thread implements Closeable { LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName); } - private void recoveryFailed(final SolrCore core, + final private void recoveryFailed(final SolrCore core, final ZkController zkController, final String baseUrl, final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException { SolrException.log(LOG, "Recovery failed - I give up."); @@ -130,11 +184,19 @@ public class RecoveryStrategy extends Thread implements Closeable { } } - private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops) + /** + * This method may change in future and customisations are not supported + * between versions in terms of API or back compat behaviour. + * @lucene.experimental + */ + protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) { + return new ZkCoreNodeProps(leaderprops).getCoreUrl(); + } + + final private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops) throws SolrServerException, IOException { - ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops); - String leaderUrl = leaderCNodeProps.getCoreUrl(); + final String leaderUrl = getReplicateLeaderUrl(leaderprops); LOG.info("Attempting to replicate from [{}].", leaderUrl); @@ -191,7 +253,7 @@ public class RecoveryStrategy extends Thread implements Closeable { } - private void commitOnLeader(String leaderUrl) throws SolrServerException, + final private void commitOnLeader(String leaderUrl) throws SolrServerException, IOException { try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) { client.setConnectionTimeout(30000); @@ -205,7 +267,7 @@ public class RecoveryStrategy extends Thread implements Closeable { } @Override - public void run() { + final public void run() { // set request info for logging try (SolrCore core = cc.getCore(coreName)) { @@ -234,7 +296,7 @@ public class RecoveryStrategy extends Thread implements Closeable { } // TODO: perhaps make this grab a new core each time through the loop to handle core reloads? - public void doRecovery(SolrCore core) throws KeeperException, InterruptedException { + final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException { boolean replayed = false; boolean successfulRecovery = false; @@ -360,7 +422,7 @@ public class RecoveryStrategy extends Thread implements Closeable { // are sure to have finished (see SOLR-7141 for // discussion around current value) try { - Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE); + Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -479,7 +541,7 @@ public class RecoveryStrategy extends Thread implements Closeable { LOG.error("Recovery failed - trying again... (" + retries + ")"); retries++; - if (retries >= MAX_RETRIES) { + if (retries >= maxRetries) { SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ")."); try { recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor()); @@ -504,7 +566,7 @@ public class RecoveryStrategy extends Thread implements Closeable { LOG.info("RecoveryStrategy has been closed"); break; // check if someone closed us } - Thread.sleep(STARTING_RECOVERY_DELAY); + Thread.sleep(startingRecoveryDelayMilliSeconds); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -525,7 +587,7 @@ public class RecoveryStrategy extends Thread implements Closeable { LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery)); } - private Future replay(SolrCore core) + final private Future replay(SolrCore core) throws InterruptedException, ExecutionException { Future future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates(); if (future == null) { @@ -547,7 +609,7 @@ public class RecoveryStrategy extends Thread implements Closeable { return future; } - private void cloudDebugLog(SolrCore core, String op) { + final private void cloudDebugLog(SolrCore core, String op) { if (!LOG.isDebugEnabled()) { return; } @@ -566,11 +628,11 @@ public class RecoveryStrategy extends Thread implements Closeable { } } - public boolean isClosed() { + final public boolean isClosed() { return close; } - private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice) + final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice) throws SolrServerException, IOException, InterruptedException, ExecutionException { WaitForState prepCmd = new WaitForState(); @@ -603,7 +665,7 @@ public class RecoveryStrategy extends Thread implements Closeable { } } - private void sendPrepRecoveryCmd(String leaderBaseUrl, WaitForState prepCmd) + final private void sendPrepRecoveryCmd(String leaderBaseUrl, WaitForState prepCmd) throws SolrServerException, IOException, InterruptedException, ExecutionException { try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) { client.setConnectionTimeout(10000); diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java index bd980750e55..a2444203772 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java +++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java @@ -58,6 +58,7 @@ import org.apache.lucene.index.IndexDeletionPolicy; import org.apache.lucene.search.BooleanQuery; import org.apache.lucene.util.Version; import org.apache.solr.client.solrj.io.stream.expr.Expressible; +import org.apache.solr.cloud.RecoveryStrategy; import org.apache.solr.cloud.ZkSolrResourceLoader; import org.apache.solr.common.MapSerializable; import org.apache.solr.common.SolrException; @@ -357,6 +358,7 @@ public class SolrConfig extends Config implements MapSerializable { .add(new SolrPluginInfo(SolrEventListener.class, "//listener", REQUIRE_CLASS, MULTI_OK, REQUIRE_NAME_IN_OVERLAY)) .add(new SolrPluginInfo(DirectoryFactory.class, "directoryFactory", REQUIRE_CLASS)) + .add(new SolrPluginInfo(RecoveryStrategy.Builder.class, "recoveryStrategy")) .add(new SolrPluginInfo(IndexDeletionPolicy.class, "indexConfig/deletionPolicy", REQUIRE_CLASS)) .add(new SolrPluginInfo(CodecFactory.class, "codecFactory", REQUIRE_CLASS)) .add(new SolrPluginInfo(IndexReaderFactory.class, "indexReaderFactory", REQUIRE_CLASS)) diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index 13c3bdd8a1e..70203d4f2bd 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -74,6 +74,7 @@ import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.LockObtainFailedException; import org.apache.solr.client.solrj.impl.BinaryResponseParser; import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.RecoveryStrategy; import org.apache.solr.cloud.ZkSolrResourceLoader; import org.apache.solr.common.SolrException; import org.apache.solr.common.cloud.ClusterState; @@ -203,6 +204,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { private final IndexDeletionPolicyWrapper solrDelPolicy; private final SolrSnapshotMetaDataManager snapshotMgr; private final DirectoryFactory directoryFactory; + private final RecoveryStrategy.Builder recoveryStrategyBuilder; private IndexReaderFactory indexReaderFactory; private final Codec codec; private final MemClassLoader memClassLoader; @@ -657,6 +659,22 @@ public final class SolrCore implements SolrInfoMBean, Closeable { return DirectoryFactory.loadDirectoryFactory(solrConfig, getCoreDescriptor().getCoreContainer(), coreMetricManager.getRegistryName()); } + private RecoveryStrategy.Builder initRecoveryStrategyBuilder() { + final PluginInfo info = solrConfig.getPluginInfo(RecoveryStrategy.Builder.class.getName()); + final RecoveryStrategy.Builder rsBuilder; + if (info != null && info.className != null) { + log.info(info.className); + rsBuilder = getResourceLoader().newInstance(info.className, RecoveryStrategy.Builder.class); + } else { + log.info("solr.RecoveryStrategy.Builder"); + rsBuilder = new RecoveryStrategy.Builder(); + } + if (info != null) { + rsBuilder.init(info.initArgs); + } + return rsBuilder; + } + private void initIndexReaderFactory() { IndexReaderFactory indexReaderFactory; PluginInfo info = solrConfig.getPluginInfo(IndexReaderFactory.class.getName()); @@ -864,10 +882,12 @@ public final class SolrCore implements SolrInfoMBean, Closeable { if (updateHandler == null) { directoryFactory = initDirectoryFactory(); - solrCoreState = new DefaultSolrCoreState(directoryFactory); + recoveryStrategyBuilder = initRecoveryStrategyBuilder(); + solrCoreState = new DefaultSolrCoreState(directoryFactory, recoveryStrategyBuilder); } else { solrCoreState = updateHandler.getSolrCoreState(); directoryFactory = solrCoreState.getDirectoryFactory(); + recoveryStrategyBuilder = solrCoreState.getRecoveryStrategyBuilder(); isReloaded = true; } diff --git a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java index 59e35f03d8f..d0daebbb954 100644 --- a/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java +++ b/solr/core/src/java/org/apache/solr/update/DefaultSolrCoreState.java @@ -63,6 +63,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover private SolrIndexWriter indexWriter = null; private DirectoryFactory directoryFactory; + private final RecoveryStrategy.Builder recoveryStrategyBuilder; private volatile RecoveryStrategy recoveryStrat; @@ -76,8 +77,15 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover protected final ReentrantLock commitLock = new ReentrantLock(); + @Deprecated public DefaultSolrCoreState(DirectoryFactory directoryFactory) { + this(directoryFactory, new RecoveryStrategy.Builder()); + } + + public DefaultSolrCoreState(DirectoryFactory directoryFactory, + RecoveryStrategy.Builder recoveryStrategyBuilder) { this.directoryFactory = directoryFactory; + this.recoveryStrategyBuilder = recoveryStrategyBuilder; } private void closeIndexWriter(IndexWriterCloser closer) { @@ -262,6 +270,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover return directoryFactory; } + @Override + public RecoveryStrategy.Builder getRecoveryStrategyBuilder() { + return recoveryStrategyBuilder; + } + @Override public void doRecovery(CoreContainer cc, CoreDescriptor cd) { @@ -310,7 +323,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover recoveryThrottle.minimumWaitBetweenActions(); recoveryThrottle.markAttemptingAction(); - recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this); + recoveryStrat = recoveryStrategyBuilder.create(cc, cd, DefaultSolrCoreState.this); recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup); Future future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat); try { diff --git a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java index f775b72f2e3..31dd66af773 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrCoreState.java +++ b/solr/core/src/java/org/apache/solr/update/SolrCoreState.java @@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.search.Sort; import org.apache.solr.cloud.ActionThrottle; +import org.apache.solr.cloud.RecoveryStrategy; import org.apache.solr.core.CoreContainer; import org.apache.solr.core.CoreDescriptor; import org.apache.solr.core.DirectoryFactory; @@ -144,6 +145,11 @@ public abstract class SolrCoreState { */ public abstract DirectoryFactory getDirectoryFactory(); + /** + * @return the {@link org.apache.solr.cloud.RecoveryStrategy.Builder} that should be used. + */ + public abstract RecoveryStrategy.Builder getRecoveryStrategyBuilder(); + public interface IndexWriterCloser { void closeWriter(IndexWriter writer) throws IOException; diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-configurerecoverystrategy.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-configurerecoverystrategy.xml new file mode 100644 index 00000000000..62e671d5671 --- /dev/null +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-configurerecoverystrategy.xml @@ -0,0 +1,28 @@ + + + + + + ${tests.luceneMatchVersion:LATEST} + + + + 250 + + + diff --git a/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategy.xml b/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategy.xml new file mode 100644 index 00000000000..d43ed29a95b --- /dev/null +++ b/solr/core/src/test-files/solr/collection1/conf/solrconfig-customrecoverystrategy.xml @@ -0,0 +1,32 @@ + + + + + + ${tests.luceneMatchVersion:LATEST} + + + + + recovery_base_url + + + diff --git a/solr/core/src/test/org/apache/solr/core/ConfigureRecoveryStrategyTest.java b/solr/core/src/test/org/apache/solr/core/ConfigureRecoveryStrategyTest.java new file mode 100644 index 00000000000..80032afe2a0 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/core/ConfigureRecoveryStrategyTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.core; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.cloud.RecoveryStrategy; +import org.apache.solr.common.cloud.ZkCoreNodeProps; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.junit.BeforeClass; + +/** + * test that configs can override the RecoveryStrategy + */ +public class ConfigureRecoveryStrategyTest extends SolrTestCaseJ4 { + + private static final String solrConfigFileNameConfigure = "solrconfig-configurerecoverystrategy.xml"; + private static final String solrConfigFileNameCustom = "solrconfig-customrecoverystrategy.xml"; + + private static String solrConfigFileName; + + @BeforeClass + public static void beforeClass() throws Exception { + solrConfigFileName = (random().nextBoolean() + ? solrConfigFileNameConfigure : solrConfigFileNameCustom); + initCore(solrConfigFileName, "schema.xml"); + } + + public void testBuilder() throws Exception { + final RecoveryStrategy.Builder recoveryStrategyBuilder = + h.getCore().getSolrCoreState().getRecoveryStrategyBuilder(); + assertNotNull("recoveryStrategyBuilder is null", recoveryStrategyBuilder); + + final String expectedClassName; + + if (solrConfigFileName.equals(solrConfigFileNameConfigure)) { + expectedClassName = RecoveryStrategy.Builder.class.getName(); + } else if (solrConfigFileName.equals(solrConfigFileNameCustom)) { + assertTrue("recoveryStrategyBuilder is wrong class (instanceof)", + recoveryStrategyBuilder instanceof CustomRecoveryStrategyBuilder); + expectedClassName = ConfigureRecoveryStrategyTest.CustomRecoveryStrategyBuilder.class.getName(); + } else { + expectedClassName = null; + } + + assertEquals("recoveryStrategyBuilder is wrong class (name)", + expectedClassName, recoveryStrategyBuilder.getClass().getName()); + } + + public void testAlmostAllMethodsAreFinal() throws Exception { + for (Method m : RecoveryStrategy.class.getDeclaredMethods()) { + final String methodName = m.getName(); + if ("getReplicateLeaderUrl".equals(methodName)) { + assertFalse(m.toString(), Modifier.isFinal(m.getModifiers())); + } else { + assertTrue(m.toString(), Modifier.isFinal(m.getModifiers())); + } + } + } + + static public class CustomRecoveryStrategy extends RecoveryStrategy { + + private String alternativeBaseUrlProp; + + public String getAlternativeBaseUrlProp() { + return alternativeBaseUrlProp; + } + + public void setAlternativeBaseUrlProp(String alternativeBaseUrlProp) { + this.alternativeBaseUrlProp = alternativeBaseUrlProp; + } + + public CustomRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, + RecoveryStrategy.RecoveryListener recoveryListener) { + super(cc, cd, recoveryListener); + } + + @Override + protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) { + return ZkCoreNodeProps.getCoreUrl( + leaderprops.getStr(alternativeBaseUrlProp), + leaderprops.getStr(ZkStateReader.CORE_NAME_PROP)); + } + } + + static public class CustomRecoveryStrategyBuilder extends RecoveryStrategy.Builder { + @Override + protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd, + RecoveryStrategy.RecoveryListener recoveryListener) { + return new CustomRecoveryStrategy(cc, cd, recoveryListener); + } + } + +}