SOLR-9045: Make RecoveryStrategy settings configurable.

This commit is contained in:
Christine Poerschke 2017-03-13 15:49:01 +00:00
parent ceffbf9844
commit c8bad8c10a
9 changed files with 300 additions and 24 deletions

View File

@ -173,6 +173,8 @@ New Features
However, if there is a leader election while this request is in transit, the versions may not be returned from that
shard. (Boris Naguet, Ishan Chattopadhyaya)
* SOLR-9045: Make RecoveryStrategy settings configurable. (Christine Poerschke)
Bug Fixes
----------------------

View File

@ -46,6 +46,7 @@ import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory.DirContext;
@ -62,17 +63,43 @@ import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.SolrPluginUtils;
import org.apache.solr.util.plugin.NamedListInitializedPlugin;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class may change in future and customisations are not supported
* between versions in terms of API or back compat behaviour.
* @lucene.experimental
*/
public class RecoveryStrategy extends Thread implements Closeable {
/**
 * Factory for {@link RecoveryStrategy} instances.
 * <p>
 * The arguments received through {@link #init(NamedList)} are retained and handed to
 * {@link SolrPluginUtils#invokeSetters} for every strategy returned by {@link #create}.
 * Subclasses can substitute their own strategy class by overriding
 * {@link #newRecoveryStrategy}.
 */
public static class Builder implements NamedListInitializedPlugin {

  /** Arguments as last passed to {@link #init(NamedList)}; remains {@code null} if init was never called. */
  private NamedList args;

  /** Stores the given arguments so they can be applied to each created strategy. */
  @Override
  public void init(NamedList args) {
    this.args = args;
  }

  /**
   * Extension point: instantiates the (not yet configured) strategy object.
   * The default implementation creates a plain {@link RecoveryStrategy}.
   */
  protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
      RecoveryStrategy.RecoveryListener recoveryListener) {
    return new RecoveryStrategy(cc, cd, recoveryListener);
  }

  /**
   * Obtains a strategy from {@link #newRecoveryStrategy} and applies the stored arguments to it.
   * This should only be used from SolrCoreState.
   *
   * @return the newly created strategy after the stored arguments have been applied
   */
  public RecoveryStrategy create(CoreContainer cc, CoreDescriptor cd,
      RecoveryStrategy.RecoveryListener recoveryListener) {
    final RecoveryStrategy strategy = newRecoveryStrategy(cc, cd, recoveryListener);
    SolrPluginUtils.invokeSetters(strategy, args);
    return strategy;
  }
}
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
private static final int MAX_RETRIES = 500;
private static final int STARTING_RECOVERY_DELAY = 5000;
private int waitForUpdatesWithStaleStatePauseMilliSeconds = Integer.getInteger("solr.cloud.wait-for-updates-with-stale-state-pause", 2500);
private int maxRetries = 500;
private int startingRecoveryDelayMilliSeconds = 5000;
public static interface RecoveryListener {
public void recovered();
@ -92,8 +119,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
// this should only be used from SolrCoreState
public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = cd.getName();
this.recoveryListener = recoveryListener;
@ -104,13 +130,41 @@ public class RecoveryStrategy extends Thread implements Closeable {
coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
}
public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
/**
 * Returns the pause, in milliseconds, that recovery sleeps for while waiting for
 * updates sent with stale state to finish.
 */
public final int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
  return this.waitForUpdatesWithStaleStatePauseMilliSeconds;
}
/**
 * Sets the pause, in milliseconds, that recovery sleeps for while waiting for
 * updates sent with stale state to finish. When never set, the value of the
 * {@code solr.cloud.wait-for-updates-with-stale-state-pause} system property
 * (default 2500) applies.
 */
public final void setWaitForUpdatesWithStaleStatePauseMilliSeconds(
    int waitForUpdatesWithStaleStatePauseMilliSeconds) {
  this.waitForUpdatesWithStaleStatePauseMilliSeconds = waitForUpdatesWithStaleStatePauseMilliSeconds;
}
/** Returns the retry count at which recovery logs "max retries exceeded" and gives up. */
public final int getMaxRetries() {
  return this.maxRetries;
}
/**
 * Sets the retry count at which recovery gives up. The default, when never set, is 500.
 */
public final void setMaxRetries(int maxRetries) {
  this.maxRetries = maxRetries;
}
/** Returns the delay, in milliseconds, slept between recovery attempts. */
public final int getStartingRecoveryDelayMilliSeconds() {
  return this.startingRecoveryDelayMilliSeconds;
}
/**
 * Sets the delay, in milliseconds, slept between recovery attempts.
 * The default, when never set, is 5000.
 */
public final void setStartingRecoveryDelayMilliSeconds(int startingRecoveryDelayMilliSeconds) {
  this.startingRecoveryDelayMilliSeconds = startingRecoveryDelayMilliSeconds;
}
/** Returns the current value of the {@code recoveringAfterStartup} flag. */
public final boolean getRecoveringAfterStartup() {
  return this.recoveringAfterStartup;
}
/** Sets the {@code recoveringAfterStartup} flag. */
public final void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
  this.recoveringAfterStartup = recoveringAfterStartup;
}
// make sure any threads stop retrying
@Override
public void close() {
final public void close() {
close = true;
if (prevSendPreRecoveryHttpUriRequest != null) {
prevSendPreRecoveryHttpUriRequest.abort();
@ -118,7 +172,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
LOG.warn("Stopping recovery for core=[{}] coreNodeName=[{}]", coreName, coreZkNodeName);
}
private void recoveryFailed(final SolrCore core,
final private void recoveryFailed(final SolrCore core,
final ZkController zkController, final String baseUrl,
final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
SolrException.log(LOG, "Recovery failed - I give up.");
@ -130,11 +184,19 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
}
private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
/**
 * Determines the URL of the leader core that replication will pull from,
 * derived from the leader's properties via {@link ZkCoreNodeProps#getCoreUrl()}.
 * <p>
 * This method may change in future and customisations are not supported
 * between versions in terms of API or back compat behaviour.
 *
 * @param leaderprops the properties describing the leader
 * @return the core URL computed from {@code leaderprops}
 * @lucene.experimental
 */
protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
  final ZkCoreNodeProps leaderCoreNodeProps = new ZkCoreNodeProps(leaderprops);
  return leaderCoreNodeProps.getCoreUrl();
}
final private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
throws SolrServerException, IOException {
ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
String leaderUrl = leaderCNodeProps.getCoreUrl();
final String leaderUrl = getReplicateLeaderUrl(leaderprops);
LOG.info("Attempting to replicate from [{}].", leaderUrl);
@ -191,7 +253,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
private void commitOnLeader(String leaderUrl) throws SolrServerException,
final private void commitOnLeader(String leaderUrl) throws SolrServerException,
IOException {
try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl).build()) {
client.setConnectionTimeout(30000);
@ -205,7 +267,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
@Override
public void run() {
final public void run() {
// set request info for logging
try (SolrCore core = cc.getCore(coreName)) {
@ -234,7 +296,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
final public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
boolean replayed = false;
boolean successfulRecovery = false;
@ -360,7 +422,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
// are sure to have finished (see SOLR-7141 for
// discussion around current value)
try {
Thread.sleep(WAIT_FOR_UPDATES_WITH_STALE_STATE_PAUSE);
Thread.sleep(waitForUpdatesWithStaleStatePauseMilliSeconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@ -479,7 +541,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
LOG.error("Recovery failed - trying again... (" + retries + ")");
retries++;
if (retries >= MAX_RETRIES) {
if (retries >= maxRetries) {
SolrException.log(LOG, "Recovery failed - max retries exceeded (" + retries + ").");
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName, core.getCoreDescriptor());
@ -504,7 +566,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
LOG.info("RecoveryStrategy has been closed");
break; // check if someone closed us
}
Thread.sleep(STARTING_RECOVERY_DELAY);
Thread.sleep(startingRecoveryDelayMilliSeconds);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -525,7 +587,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
private Future<RecoveryInfo> replay(SolrCore core)
final private Future<RecoveryInfo> replay(SolrCore core)
throws InterruptedException, ExecutionException {
Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
if (future == null) {
@ -547,7 +609,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
return future;
}
private void cloudDebugLog(SolrCore core, String op) {
final private void cloudDebugLog(SolrCore core, String op) {
if (!LOG.isDebugEnabled()) {
return;
}
@ -566,11 +628,11 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
}
public boolean isClosed() {
final public boolean isClosed() {
return close;
}
private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
final private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
throws SolrServerException, IOException, InterruptedException, ExecutionException {
WaitForState prepCmd = new WaitForState();
@ -603,7 +665,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
}
private void sendPrepRecoveryCmd(String leaderBaseUrl, WaitForState prepCmd)
final private void sendPrepRecoveryCmd(String leaderBaseUrl, WaitForState prepCmd)
throws SolrServerException, IOException, InterruptedException, ExecutionException {
try (HttpSolrClient client = new HttpSolrClient.Builder(leaderBaseUrl).build()) {
client.setConnectionTimeout(10000);

View File

@ -58,6 +58,7 @@ import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.util.Version;
import org.apache.solr.client.solrj.io.stream.expr.Expressible;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.MapSerializable;
import org.apache.solr.common.SolrException;
@ -357,6 +358,7 @@ public class SolrConfig extends Config implements MapSerializable {
.add(new SolrPluginInfo(SolrEventListener.class, "//listener", REQUIRE_CLASS, MULTI_OK, REQUIRE_NAME_IN_OVERLAY))
.add(new SolrPluginInfo(DirectoryFactory.class, "directoryFactory", REQUIRE_CLASS))
.add(new SolrPluginInfo(RecoveryStrategy.Builder.class, "recoveryStrategy"))
.add(new SolrPluginInfo(IndexDeletionPolicy.class, "indexConfig/deletionPolicy", REQUIRE_CLASS))
.add(new SolrPluginInfo(CodecFactory.class, "codecFactory", REQUIRE_CLASS))
.add(new SolrPluginInfo(IndexReaderFactory.class, "indexReaderFactory", REQUIRE_CLASS))

View File

@ -74,6 +74,7 @@ import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.solr.client.solrj.impl.BinaryResponseParser;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.cloud.ZkSolrResourceLoader;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
@ -203,6 +204,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
private final IndexDeletionPolicyWrapper solrDelPolicy;
private final SolrSnapshotMetaDataManager snapshotMgr;
private final DirectoryFactory directoryFactory;
private final RecoveryStrategy.Builder recoveryStrategyBuilder;
private IndexReaderFactory indexReaderFactory;
private final Codec codec;
private final MemClassLoader memClassLoader;
@ -657,6 +659,22 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
return DirectoryFactory.loadDirectoryFactory(solrConfig, getCoreDescriptor().getCoreContainer(), coreMetricManager.getRegistryName());
}
/**
 * Creates the {@link RecoveryStrategy.Builder} for this core from the plugin info
 * registered under the builder's class name in the solr config.
 * <p>
 * With no plugin info, or plugin info that names no class, the stock builder is used;
 * otherwise the named class is loaded through the resource loader. Whenever plugin
 * info exists, its init args are passed to the builder.
 *
 * @return the builder, never {@code null}
 */
private RecoveryStrategy.Builder initRecoveryStrategyBuilder() {
  final PluginInfo pluginInfo = solrConfig.getPluginInfo(RecoveryStrategy.Builder.class.getName());

  // nothing configured at all: stock builder, nothing to initialise
  if (pluginInfo == null) {
    log.info("solr.RecoveryStrategy.Builder");
    return new RecoveryStrategy.Builder();
  }

  final RecoveryStrategy.Builder builder;
  if (pluginInfo.className == null) {
    // settings only, no custom class
    log.info("solr.RecoveryStrategy.Builder");
    builder = new RecoveryStrategy.Builder();
  } else {
    log.info(pluginInfo.className);
    builder = getResourceLoader().newInstance(pluginInfo.className, RecoveryStrategy.Builder.class);
  }
  builder.init(pluginInfo.initArgs);
  return builder;
}
private void initIndexReaderFactory() {
IndexReaderFactory indexReaderFactory;
PluginInfo info = solrConfig.getPluginInfo(IndexReaderFactory.class.getName());
@ -864,10 +882,12 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
if (updateHandler == null) {
directoryFactory = initDirectoryFactory();
solrCoreState = new DefaultSolrCoreState(directoryFactory);
recoveryStrategyBuilder = initRecoveryStrategyBuilder();
solrCoreState = new DefaultSolrCoreState(directoryFactory, recoveryStrategyBuilder);
} else {
solrCoreState = updateHandler.getSolrCoreState();
directoryFactory = solrCoreState.getDirectoryFactory();
recoveryStrategyBuilder = solrCoreState.getRecoveryStrategyBuilder();
isReloaded = true;
}

View File

@ -63,6 +63,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
private SolrIndexWriter indexWriter = null;
private DirectoryFactory directoryFactory;
private final RecoveryStrategy.Builder recoveryStrategyBuilder;
private volatile RecoveryStrategy recoveryStrat;
@ -76,8 +77,15 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
protected final ReentrantLock commitLock = new ReentrantLock();
/**
 * Creates a core state for the given directory factory, delegating to the two-argument
 * constructor with a newly constructed {@link RecoveryStrategy.Builder}.
 *
 * @deprecated use {@link #DefaultSolrCoreState(DirectoryFactory, RecoveryStrategy.Builder)} instead
 */
@Deprecated
public DefaultSolrCoreState(DirectoryFactory directoryFactory) {
this(directoryFactory, new RecoveryStrategy.Builder());
}
/**
 * Creates a core state for the given directory factory.
 *
 * @param directoryFactory the directory factory returned by {@code getDirectoryFactory()}
 * @param recoveryStrategyBuilder the builder used to create the {@link RecoveryStrategy}
 *        whenever recovery is started
 */
public DefaultSolrCoreState(DirectoryFactory directoryFactory, RecoveryStrategy.Builder recoveryStrategyBuilder) {
  this.recoveryStrategyBuilder = recoveryStrategyBuilder;
  this.directoryFactory = directoryFactory;
}
private void closeIndexWriter(IndexWriterCloser closer) {
@ -262,6 +270,11 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
return directoryFactory;
}
/** Returns the builder this core state was constructed with. */
@Override
public RecoveryStrategy.Builder getRecoveryStrategyBuilder() {
  return this.recoveryStrategyBuilder;
}
@Override
public void doRecovery(CoreContainer cc, CoreDescriptor cd) {
@ -310,7 +323,7 @@ public final class DefaultSolrCoreState extends SolrCoreState implements Recover
recoveryThrottle.minimumWaitBetweenActions();
recoveryThrottle.markAttemptingAction();
recoveryStrat = new RecoveryStrategy(cc, cd, DefaultSolrCoreState.this);
recoveryStrat = recoveryStrategyBuilder.create(cc, cd, DefaultSolrCoreState.this);
recoveryStrat.setRecoveringAfterStartup(recoveringAfterStartup);
Future<?> future = cc.getUpdateShardHandler().getRecoveryExecutor().submit(recoveryStrat);
try {

View File

@ -23,6 +23,7 @@ import java.util.concurrent.locks.Lock;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.search.Sort;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory;
@ -144,6 +145,11 @@ public abstract class SolrCoreState {
*/
public abstract DirectoryFactory getDirectoryFactory();
/**
 * Gives access to the builder from which {@link RecoveryStrategy} instances are created.
 *
 * @return the {@link org.apache.solr.cloud.RecoveryStrategy.Builder} that should be used.
 */
public abstract RecoveryStrategy.Builder getRecoveryStrategyBuilder();
public interface IndexWriterCloser {
void closeWriter(IndexWriter writer) throws IOException;

View File

@ -0,0 +1,28 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
<recoveryStrategy>
<int name="maxRetries">250</int>
</recoveryStrategy>
<schemaFactory class="ClassicIndexSchemaFactory"/>
</config>

View File

@ -0,0 +1,32 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<requestHandler name="standard" class="solr.StandardRequestHandler"></requestHandler>
<!--
The RecoveryStrategy and RecoveryStrategy.Builder classes may change in future and customisations
are not supported between versions in terms of API or back compat behaviour.
-->
<recoveryStrategy class="org.apache.solr.core.ConfigureRecoveryStrategyTest$CustomRecoveryStrategyBuilder">
<str name="alternativeBaseUrlProp">recovery_base_url</str>
</recoveryStrategy>
<schemaFactory class="ClassicIndexSchemaFactory"/>
</config>

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cloud.RecoveryStrategy;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.junit.BeforeClass;
/**
 * Tests that a solrconfig can configure, or entirely replace, the
 * {@link RecoveryStrategy.Builder} used by a core.
 * <p>
 * One of two configs is chosen at random for the whole class: one that only sets a
 * property on the stock builder, and one that names {@link CustomRecoveryStrategyBuilder}.
 */
public class ConfigureRecoveryStrategyTest extends SolrTestCaseJ4 {

  private static final String solrConfigFileNameConfigure = "solrconfig-configurerecoverystrategy.xml";
  private static final String solrConfigFileNameCustom = "solrconfig-customrecoverystrategy.xml";

  /** The config actually selected for this run; one of the two constants above. */
  private static String solrConfigFileName;

  @BeforeClass
  public static void beforeClass() throws Exception {
    solrConfigFileName = (random().nextBoolean()
        ? solrConfigFileNameConfigure : solrConfigFileNameCustom);
    initCore(solrConfigFileName, "schema.xml");
  }

  /** Verifies that the core exposes a builder of the class implied by the selected config. */
  public void testBuilder() throws Exception {
    final RecoveryStrategy.Builder recoveryStrategyBuilder =
        h.getCore().getSolrCoreState().getRecoveryStrategyBuilder();
    assertNotNull("recoveryStrategyBuilder is null", recoveryStrategyBuilder);

    final String expectedClassName;

    if (solrConfigFileName.equals(solrConfigFileNameConfigure)) {
      expectedClassName = RecoveryStrategy.Builder.class.getName();
    } else if (solrConfigFileName.equals(solrConfigFileNameCustom)) {
      assertTrue("recoveryStrategyBuilder is wrong class (instanceof)",
          recoveryStrategyBuilder instanceof CustomRecoveryStrategyBuilder);
      expectedClassName = ConfigureRecoveryStrategyTest.CustomRecoveryStrategyBuilder.class.getName();
    } else {
      expectedClassName = null;
    }

    assertEquals("recoveryStrategyBuilder is wrong class (name)",
        expectedClassName, recoveryStrategyBuilder.getClass().getName());
  }

  /**
   * Guards the intended extension surface of {@link RecoveryStrategy}: every method a
   * subclass could override must be final, except {@code getReplicateLeaderUrl}.
   */
  public void testAlmostAllMethodsAreFinal() throws Exception {
    for (Method m : RecoveryStrategy.class.getDeclaredMethods()) {
      // getDeclaredMethods() also reports compiler-generated methods (e.g. lambda bodies)
      // and static methods; neither can be overridden by a subclass and generated ones are
      // never final, so checking them would only produce spurious failures.
      if (m.isSynthetic() || Modifier.isStatic(m.getModifiers())) {
        continue;
      }
      final String methodName = m.getName();
      if ("getReplicateLeaderUrl".equals(methodName)) {
        assertFalse(m.toString(), Modifier.isFinal(m.getModifiers()));
      } else {
        assertTrue(m.toString(), Modifier.isFinal(m.getModifiers()));
      }
    }
  }

  /**
   * Example customisation: builds the replication URL from an alternative base-url
   * property of the leader instead of the default one.
   */
  static public class CustomRecoveryStrategy extends RecoveryStrategy {

    /** Name of the leader property whose value is used as the base URL. */
    private String alternativeBaseUrlProp;

    public String getAlternativeBaseUrlProp() {
      return alternativeBaseUrlProp;
    }

    public void setAlternativeBaseUrlProp(String alternativeBaseUrlProp) {
      this.alternativeBaseUrlProp = alternativeBaseUrlProp;
    }

    public CustomRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
        RecoveryStrategy.RecoveryListener recoveryListener) {
      super(cc, cd, recoveryListener);
    }

    @Override
    protected String getReplicateLeaderUrl(ZkNodeProps leaderprops) {
      return ZkCoreNodeProps.getCoreUrl(
          leaderprops.getStr(alternativeBaseUrlProp),
          leaderprops.getStr(ZkStateReader.CORE_NAME_PROP));
    }
  }

  /** Builder that hands out {@link CustomRecoveryStrategy} instead of the stock strategy. */
  static public class CustomRecoveryStrategyBuilder extends RecoveryStrategy.Builder {
    @Override
    protected RecoveryStrategy newRecoveryStrategy(CoreContainer cc, CoreDescriptor cd,
        RecoveryStrategy.RecoveryListener recoveryListener) {
      return new CustomRecoveryStrategy(cc, cd, recoveryListener);
    }
  }

}