SOLR-9835: Create another replication mode for SolrCloud

Cao Manh Dat 2017-03-14 14:37:47 +07:00
parent faeb1fe8c1
commit 7830462d4b
39 changed files with 1309 additions and 35 deletions

View File

@ -163,7 +163,8 @@ public class TreeMergeOutputFormat extends FileOutputFormat<Text, NullWritable>
// Set Solr's commit data so the created index is usable by SolrCloud. E.g. Currently SolrCloud relies on
// commitTimeMSec in the commit data to do replication.
SolrIndexWriter.setCommitData(writer);
//TODO no commitUpdateCommand
SolrIndexWriter.setCommitData(writer, -1);
timer = new RTimer();
LOG.info("Optimizing Solr: Closing index writer");

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.fs.Path;
@ -420,7 +421,24 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
try {
// we must check LIR before registering as leader
checkLIR(coreName, allReplicasInLine);
boolean onlyLeaderIndexes = zkController.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
if (onlyLeaderIndexes) {
// stop replicating from the old leader
zkController.stopReplicationFromLeader(coreName);
if (weAreReplacement) {
try (SolrCore core = cc.getCore(coreName)) {
Future<UpdateLog.RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog();
if (future != null) {
log.info("Replaying tlog before become new leader");
future.get();
} else {
log.info("New leader does not have old tlog to replay");
}
}
}
}
super.runLeaderProcess(weAreReplacement, 0);
try (SolrCore core = cc.getCore(coreName)) {
if (core != null) {

View File

@ -131,6 +131,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler
ZkStateReader.REPLICATION_FACTOR, "1",
ZkStateReader.MAX_SHARDS_PER_NODE, "1",
ZkStateReader.AUTO_ADD_REPLICAS, "false",
ZkStateReader.REALTIME_REPLICAS, "-1",
DocCollection.RULE, null,
SNITCH, null));

View File

@ -118,7 +118,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
private boolean recoveringAfterStartup;
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
private boolean onlyLeaderIndexes;
protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = cd.getName();
@ -128,6 +129,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
String collection = cd.getCloudDescriptor().getCollectionName();
onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
}
final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() {
@ -260,7 +263,7 @@ public class RecoveryStrategy extends Thread implements Closeable {
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes);
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
client);
}
@ -309,7 +312,8 @@ public class RecoveryStrategy extends Thread implements Closeable {
return;
}
boolean firstTime = true;
// we temporarily ignore peerSync in realtimeReplicas mode
boolean firstTime = !onlyLeaderIndexes;
List<Long> recentVersions;
try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
@ -361,6 +365,10 @@ public class RecoveryStrategy extends Thread implements Closeable {
}
}
if (onlyLeaderIndexes) {
zkController.stopReplicationFromLeader(coreName);
}
Future<RecoveryInfo> replayFuture = null;
while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though
try {
@ -514,6 +522,9 @@ public class RecoveryStrategy extends Thread implements Closeable {
if (successfulRecovery) {
LOG.info("Registering as Active after recovery.");
try {
if (onlyLeaderIndexes) {
zkController.startReplicationFromLeader(coreName);
}
zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE);
} catch (Exception e) {
LOG.error("Could not publish as ACTIVE after succesful recovery", e);
@ -587,8 +598,20 @@ public class RecoveryStrategy extends Thread implements Closeable {
LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery));
}
public static Runnable testing_beforeReplayBufferingUpdates;
final private Future<RecoveryInfo> replay(SolrCore core)
throws InterruptedException, ExecutionException {
if (testing_beforeReplayBufferingUpdates != null) {
testing_beforeReplayBufferingUpdates.run();
}
if (onlyLeaderIndexes) {
// roll over all updates buffered during recovery into a new tlog, so RTG remains available
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false));
return null;
}
Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
if (future == null) {
// no replay needed
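Note: relative to stock recovery, this mode flips two booleans: peerSync is skipped entirely (firstTime starts out false), and the commit requested from the leader opens a searcher, presumably so the newly committed segments are visible to the /replication fetch. A minimal, self-contained sketch of those two decisions (illustrative only, not code from this commit):

public class AppendReplicaRecoverySketch {
  public static void main(String[] args) {
    boolean onlyLeaderIndexes = true;                       // collection created with realtimeReplicas=1
    boolean tryPeerSyncFirst = !onlyLeaderIndexes;          // false: go straight to replication
    boolean openSearcherOnLeaderCommit = onlyLeaderIndexes; // true: expose segments to /replication
    System.out.println(tryPeerSyncFirst + " " + openSearcherOnLeaderCommit);
  }
}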

View File

@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ReplicateFromLeader {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private CoreContainer cc;
private String coreName;
private ReplicationHandler replicationProcess;
private long lastVersion = 0;
public ReplicateFromLeader(CoreContainer cc, String coreName) {
this.cc = cc;
this.coreName = coreName;
}
public void startReplication() throws InterruptedException {
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
if (cc.isShutDown()) {
return;
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames());
}
}
SolrConfig.UpdateHandlerInfo uinfo = core.getSolrConfig().getUpdateHandlerInfo();
String pollIntervalStr = "00:00:03";
if (uinfo.autoCommmitMaxTime != -1) {
pollIntervalStr = toPollIntervalStr(uinfo.autoCommmitMaxTime/2);
} else if (uinfo.autoSoftCommmitMaxTime != -1) {
pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2);
}
NamedList slaveConfig = new NamedList();
slaveConfig.add("fetchFromLeader", true);
slaveConfig.add("pollInterval", pollIntervalStr);
NamedList replicationConfig = new NamedList();
replicationConfig.add("slave", slaveConfig);
String lastCommitVersion = getCommitVersion(core);
if (lastCommitVersion != null) {
lastVersion = Long.parseLong(lastCommitVersion);
}
replicationProcess = new ReplicationHandler();
replicationProcess.setPollListener((solrCore, pollSuccess) -> {
if (pollSuccess) {
String commitVersion = getCommitVersion(solrCore);
if (commitVersion == null) return;
if (Long.parseLong(commitVersion) == lastVersion) return;
UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
cuc.setVersion(Long.parseLong(commitVersion));
updateLog.copyOverOldUpdates(cuc);
lastVersion = Long.parseLong(commitVersion);
}
});
replicationProcess.init(replicationConfig);
replicationProcess.inform(core);
}
}
public static String getCommitVersion(SolrCore solrCore) {
IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
try {
String commitVersion = commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION);
if (commitVersion == null) return null;
else return commitVersion;
} catch (Exception e) {
LOG.warn("Cannot get commit command version from index commit point ",e);
return null;
}
}
private static String toPollIntervalStr(int ms) {
int sec = ms/1000;
int hour = sec / 3600;
sec = sec % 3600;
int min = sec / 60;
sec = sec % 60;
return hour + ":" + min + ":" + sec;
}
public void stopReplication() {
replicationProcess.close();
}
}
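Note: the poll interval above is derived from the leader-side commit cadence: half of autoCommit maxTime if set, else half of autoSoftCommit maxTime, else the 00:00:03 default. A runnable sketch of the conversion (same arithmetic as the private toPollIntervalStr above; sample values are illustrative):

public class PollIntervalSketch {
  private static String toPollIntervalStr(int ms) {
    int sec = ms / 1000;
    int hour = sec / 3600;
    sec = sec % 3600;
    int min = sec / 60;
    sec = sec % 60;
    return hour + ":" + min + ":" + sec;
  }

  public static void main(String[] args) {
    // <autoCommit><maxTime>60000</maxTime> on the leader -> replicas poll every 30 seconds
    System.out.println(toPollIntervalStr(60_000 / 2)); // prints 0:0:30
  }
}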

View File

@ -189,6 +189,7 @@ public class ZkController {
private LeaderElector overseerElector;
private Map<String, ReplicateFromLeader> replicateFromLeaders = new ConcurrentHashMap<>();
// for now, this can be null in tests, in which case recovery will be inactive, and other features
// may accept defaults or use mocks rather than pulling things from a CoreContainer
@ -877,7 +878,7 @@ public class ZkController {
coreName, baseUrl, cloudDesc.getCollectionName(), shardId);
ZkNodeProps leaderProps = new ZkNodeProps(props);
try {
// If we're a preferred leader, insert ourselves at the head of the queue
boolean joinAtHead = false;
@ -913,9 +914,16 @@ public class ZkController {
// leader election perhaps?
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
boolean onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1;
boolean isReplicaInOnlyLeaderIndexes = onlyLeaderIndexes && !isLeader;
if (isReplicaInOnlyLeaderIndexes) {
String commitVersion = ReplicateFromLeader.getCommitVersion(core);
if (commitVersion != null) {
ulog.copyOverOldUpdates(Long.parseLong(commitVersion));
}
}
// we will call register again after zk expiration and on reload
if (!afterExpiration && !core.isReloaded() && ulog != null) {
if (!afterExpiration && !core.isReloaded() && ulog != null && !isReplicaInOnlyLeaderIndexes) {
// disable recovery in case shard is in construction state (for shard splits)
Slice slice = getClusterState().getSlice(collection, shardId);
if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) {
@ -934,6 +942,9 @@ public class ZkController {
boolean didRecovery
= checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration);
if (!didRecovery) {
if (isReplicaInOnlyLeaderIndexes) {
startReplicationFromLeader(coreName);
}
publish(desc, Replica.State.ACTIVE);
}
@ -948,6 +959,20 @@ public class ZkController {
}
}
public void startReplicationFromLeader(String coreName) throws InterruptedException {
ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName);
if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) {
replicateFromLeader.startReplication();
}
}
public void stopReplicationFromLeader(String coreName) {
ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName);
if (replicateFromLeader != null) {
replicateFromLeader.stopReplication();
}
}
// timeoutms is the timeout for the first call to get the leader - there is then
// a longer wait to make sure that leader matches our local state
private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) {
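Note: startReplicationFromLeader/stopReplicationFromLeader lean on ConcurrentHashMap for idempotency: only the caller whose putIfAbsent wins actually starts polling, and stop closes whatever instance was registered. A self-contained sketch of that pattern (Object stands in for ReplicateFromLeader):

import java.util.concurrent.ConcurrentHashMap;

public class ReplicationRegistrySketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Object> replicateFromLeaders = new ConcurrentHashMap<>();
    boolean started = replicateFromLeaders.putIfAbsent("core1", new Object()) == null;
    boolean startedAgain = replicateFromLeaders.putIfAbsent("core1", new Object()) == null;
    System.out.println(started + " " + startedAgain); // true false: second start is a no-op
    System.out.println(replicateFromLeaders.remove("core1") != null); // true: stop closes it
  }
}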

View File

@ -1137,6 +1137,13 @@ public class CoreContainer {
log.info("Reloading SolrCore '{}' using configuration from {}", cd.getName(), coreConfig.getName());
SolrCore newCore = core.reload(coreConfig);
registerCore(cd.getName(), newCore, false, false);
if (getZkController() != null) {
boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1;
if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) {
getZkController().stopReplicationFromLeader(core.getName());
getZkController().startReplicationFromLeader(newCore.getName());
}
}
} catch (SolrCoreState.CoreIsClosedException e) {
throw e;
} catch (Exception e) {

View File

@ -68,8 +68,11 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.ExecutorUtil;
@ -115,7 +118,7 @@ public class IndexFetcher {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final String masterUrl;
private String masterUrl;
final ReplicationHandler replicationHandler;
@ -150,6 +153,8 @@ public class IndexFetcher {
private boolean useExternalCompression = false;
private boolean fetchFromLeader = false;
private final HttpClient myHttpClient;
private Integer connTimeout;
@ -167,11 +172,15 @@ public class IndexFetcher {
public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
solrCore = sc;
Object fetchFromLeader = initArgs.get(FETCH_FROM_LEADER);
if (fetchFromLeader != null && fetchFromLeader instanceof Boolean) {
this.fetchFromLeader = (boolean) fetchFromLeader;
}
String masterUrl = (String) initArgs.get(MASTER_URL);
if (masterUrl == null)
if (masterUrl == null && !this.fetchFromLeader)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"'masterUrl' is required for a slave");
if (masterUrl.endsWith(ReplicationHandler.PATH)) {
if (masterUrl != null && masterUrl.endsWith(ReplicationHandler.PATH)) {
masterUrl = masterUrl.substring(0, masterUrl.length()-12);
LOG.warn("'masterUrl' must be specified without the "+ReplicationHandler.PATH+" suffix");
}
@ -298,6 +307,15 @@ public class IndexFetcher {
}
try {
if (fetchFromLeader) {
Replica replica = getLeaderReplica();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
if (cd.getCoreNodeName().equals(replica.getName())) {
return false;
}
masterUrl = replica.getCoreUrl();
LOG.info("Updated masterUrl to " + masterUrl);
}
//get the current 'replicateable' index version in the master
NamedList response;
try {
@ -404,7 +422,7 @@ public class IndexFetcher {
isFullCopyNeeded = true;
}
if (!isFullCopyNeeded) {
if (!isFullCopyNeeded && !fetchFromLeader) {
// a searcher might be using some flushed but not committed segments
// because of soft commits (which open a searcher on IW's data)
// so we need to close the existing searcher on the last commit
@ -565,6 +583,14 @@ public class IndexFetcher {
}
}
private Replica getLeaderReplica() throws InterruptedException {
ZkController zkController = solrCore.getCoreDescriptor().getCoreContainer().getZkController();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
cd.getCollectionName(), cd.getShardId());
return leaderReplica;
}
private void cleanup(final SolrCore core, Directory tmpIndexDir,
Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
try {
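Note: with fetchFromLeader=true, masterUrl stops being static configuration; it is re-resolved from ZooKeeper before every fetch, and the fetch is skipped when this core is itself the leader. A sketch of that decision with made-up values (the real lookup is getLeaderReplica() above):

public class LeaderUrlSketch {
  public static void main(String[] args) {
    String myCoreNodeName = "core_node2";  // from this core's CloudDescriptor
    String leaderNodeName = "core_node1";  // from ZkStateReader.getLeaderRetry(...)
    String leaderCoreUrl = "http://host:8983/solr/collection1_shard1_replica1/";
    if (myCoreNodeName.equals(leaderNodeName)) {
      System.out.println("we are the leader; nothing to fetch");
    } else {
      String masterUrl = leaderCoreUrl;    // refreshed on every poll
      System.out.println("fetching index from " + masterUrl);
    }
  }
}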

View File

@ -209,6 +209,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
private Long pollIntervalNs;
private String pollIntervalStr;
private PollListener pollListener;
public interface PollListener {
void onComplete(SolrCore solrCore, boolean pollSuccess) throws IOException;
}
/**
* Disable the timer task for polling
*/
@ -218,6 +223,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
return pollIntervalStr;
}
public void setPollListener(PollListener pollListener) {
this.pollListener = pollListener;
}
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
rsp.setHttpCaching(false);
@ -1142,7 +1151,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
try {
LOG.debug("Polling for index modifications");
markScheduledExecutionStart();
doFetch(null, false);
boolean pollSuccess = doFetch(null, false);
if (pollListener != null) pollListener.onComplete(core, pollSuccess);
} catch (Exception e) {
LOG.error("Exception in fetching index", e);
}
@ -1328,6 +1338,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
});
}
public void close() {
if (executorService != null) executorService.shutdown();
if (pollingIndexFetcher != null) {
pollingIndexFetcher.destroy();
}
if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) {
currentIndexFetcher.destroy();
}
ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor);
if (restoreFuture != null) {
restoreFuture.cancel(false);
}
}
/**
* Register a listener for postcommit/optimize
*
@ -1680,6 +1704,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw
public static final String MASTER_URL = "masterUrl";
public static final String FETCH_FROM_LEADER = "fetchFromLeader";
public static final String STATUS = "status";
public static final String COMMAND = "command";

View File

@ -115,6 +115,7 @@ import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
@ -404,7 +405,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission
STATE_FORMAT,
AUTO_ADD_REPLICAS,
RULE,
SNITCH);
SNITCH,
REALTIME_REPLICAS);
if (props.get(STATE_FORMAT) == null) {
props.put(STATE_FORMAT, "2");
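Note: REALTIME_REPLICAS thereby becomes a create-collection parameter. SolrJ usage, mirroring OnlyLeaderIndexesTest further down (collection/config names are the test's):

CollectionAdminRequest
    .createCollection("collection1", "config", 1, 3)
    .setRealtimeReplicas(1)  // only the shard leader maintains a live index
    .setMaxShardsPerNode(1)
    .process(cluster.getSolrClient());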

View File

@ -207,6 +207,11 @@ public final class CommitTracker implements Runnable {
command.openSearcher = openSearcher;
command.waitSearcher = waitSearcher;
command.softCommit = softCommit;
if (core.getCoreDescriptor().getCloudDescriptor() != null
&& core.getCoreDescriptor().getCloudDescriptor().isLeader()
&& !softCommit) {
command.version = core.getUpdateHandler().getUpdateLog().getVersionInfo().getNewClock();
}
// no need for command.maxOptimizeSegments = 1; since it is not optimizing
// we increment this *before* calling commit because it was causing a race

View File

@ -45,7 +45,9 @@ import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.util.BytesRefHash;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SimpleOrderedMap;
@ -123,6 +125,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit;
indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges;
ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController();
if (zkController != null) {
DocCollection dc = zkController.getClusterState().getCollection(core.getCoreDescriptor().getCollectionName());
if (dc.getRealtimeReplicas() == 1) {
commitWithinSoftCommit = false;
commitTracker.setOpenSearcher(true);
}
}
}
@ -233,6 +243,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
cmd.overwrite = false;
}
try {
if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
if (ulog != null) ulog.add(cmd);
return 1;
}
if (cmd.overwrite) {
// Check for delete by query commands newer (i.e. reordered). This
// should always be null on a leader
@ -404,6 +419,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
deleteByIdCommands.increment();
deleteByIdCommandsCumulative.mark();
if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) {
if (ulog != null) ulog.delete(cmd);
return;
}
Term deleteTerm = new Term(idField.getName(), cmd.getIndexedId());
// SolrCore.verbose("deleteDocuments",deleteTerm,writer);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
@ -463,6 +483,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
deleteByQueryCommandsCumulative.mark();
boolean madeIt=false;
try {
if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) {
if (ulog != null) ulog.deleteByQuery(cmd);
madeIt = true;
return;
}
Query q = getQuery(cmd);
boolean delAll = MatchAllDocsQuery.class == q.getClass();
@ -563,7 +588,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
log.info("start "+cmd);
RefCounted<IndexWriter> iw = solrCoreState.getIndexWriter(core);
try {
SolrIndexWriter.setCommitData(iw.get());
SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion());
iw.get().prepareCommit();
} finally {
iw.decref();
@ -647,7 +672,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
// SolrCore.verbose("writer.commit() start writer=",writer);
if (writer.hasUncommittedChanges()) {
SolrIndexWriter.setCommitData(writer);
SolrIndexWriter.setCommitData(writer, cmd.getVersion());
writer.commit();
} else {
log.info("No uncommitted changes. Skipping IW.commit.");
@ -838,7 +863,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
// todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used)
SolrIndexWriter.setCommitData(writer);
SolrIndexWriter.setCommitData(writer, cmd.getVersion());
writer.commit();
synchronized (solrCoreState.getUpdateLock()) {

View File

@ -20,8 +20,10 @@ import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@ -368,6 +370,10 @@ public class HdfsTransactionLog extends TransactionLog {
return new HDFSLogReader(startingPos);
}
public LogReader getSortedReader(long startingPos) {
return new HDFSSortedLogReader(startingPos);
}
/** Returns a single threaded reverse reader */
@Override
public ReverseReader getReverseReader() throws IOException {
@ -477,6 +483,50 @@ public class HdfsTransactionLog extends TransactionLog {
}
public class HDFSSortedLogReader extends HDFSLogReader {
private long startingPos;
private boolean inOrder = true;
private TreeMap<Long, Long> versionToPos;
Iterator<Long> iterator;
public HDFSSortedLogReader(long startingPos) {
super(startingPos);
this.startingPos = startingPos;
}
@Override
public Object next() throws IOException, InterruptedException {
if (versionToPos == null) {
versionToPos = new TreeMap<>();
Object o;
long pos = startingPos;
long lastVersion = Long.MIN_VALUE;
while ( (o = super.next()) != null) {
List entry = (List) o;
long version = (Long) entry.get(UpdateLog.VERSION_IDX);
version = Math.abs(version);
versionToPos.put(version, pos);
pos = currentPos();
if (version < lastVersion) inOrder = false;
lastVersion = version;
}
fis.seek(startingPos);
}
if (inOrder) {
return super.next();
} else {
if (iterator == null) iterator = versionToPos.values().iterator();
if (!iterator.hasNext()) return null;
long pos = iterator.next();
if (pos != currentPos()) fis.seek(pos);
return super.next();
}
}
}
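Note: the sorted reader (duplicated below for the plain TransactionLog) makes one pass recording version -> file position; if versions were already ascending it simply re-reads sequentially, otherwise it seeks through the file in ascending version order. A standalone simulation of that two-pass idea, with plain arrays standing in for tlog records:

import java.util.TreeMap;

public class SortedReplaySketch {
  public static void main(String[] args) {
    long[] versions  = {5, 3, -9, 7};     // tlog order; negative = delete, compared via abs()
    long[] positions = {0, 40, 80, 120};  // byte offset of each record
    TreeMap<Long, Long> versionToPos = new TreeMap<>();
    boolean inOrder = true;
    long lastVersion = Long.MIN_VALUE;
    for (int i = 0; i < versions.length; i++) {
      long version = Math.abs(versions[i]);
      versionToPos.put(version, positions[i]);
      if (version < lastVersion) inOrder = false;
      lastVersion = version;
    }
    // out of order here, so replay seeks by ascending version (3,5,7,9):
    System.out.println(inOrder ? "sequential re-read" : "seek order: " + versionToPos.values());
  }
}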
public class HDFSReverseReader extends ReverseReader {
FSDataFastInputStream fis;
private LogCodec codec = new LogCodec(resolver) {

View File

@ -137,7 +137,8 @@ public class SolrIndexSplitter {
// we commit explicitly instead of sending a CommitUpdateCommand through the processor chain
// because the sub-shard cores will just ignore such a commit because the update log is not
// in active state at this time.
SolrIndexWriter.setCommitData(iw);
//TODO no commitUpdateCommand
SolrIndexWriter.setCommitData(iw, -1);
iw.commit();
success = true;
} finally {

View File

@ -61,6 +61,7 @@ public class SolrIndexWriter extends IndexWriter {
/** Stored into each Lucene commit to record the
* System.currentTimeMillis() when commit was called. */
public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec";
public static final String COMMIT_COMMAND_VERSION = "commitCommandVer";
private final Object CLOSE_LOCK = new Object();
@ -183,10 +184,11 @@ public class SolrIndexWriter extends IndexWriter {
@SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " +
" but currently suspiciously used for replication as well")
public static void setCommitData(IndexWriter iw) {
log.info("Calling setCommitData with IW:" + iw.toString());
public static void setCommitData(IndexWriter iw, long commitCommandVersion) {
log.info("Calling setCommitData with IW:" + iw.toString() + " commitCommandVersion:"+commitCommandVersion);
final Map<String,String> commitData = new HashMap<>();
commitData.put(COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis()));
commitData.put(COMMIT_COMMAND_VERSION, String.valueOf(commitCommandVersion));
iw.setLiveCommitData(commitData.entrySet());
}
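Note: every commit point now carries two user-data entries: the existing commitTimeMSec plus the new commitCommandVer (stamped with -1 where no CommitUpdateCommand exists, per the TODOs above). A sketch of what lands in the commit user data, with a plain Map standing in for iw.setLiveCommitData and an illustrative version value:

import java.util.HashMap;
import java.util.Map;

public class CommitDataSketch {
  public static void main(String[] args) {
    Map<String, String> commitData = new HashMap<>();
    commitData.put("commitTimeMSec", String.valueOf(System.currentTimeMillis()));
    commitData.put("commitCommandVer", String.valueOf(1562891178291200L)); // cmd.getVersion()
    // ReplicateFromLeader.getCommitVersion reads commitCommandVer back from the latest
    // commit point to decide which tlog entries the last commit already covered.
    System.out.println(commitData);
  }
}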

View File

@ -29,9 +29,11 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.BytesRef;
@ -632,6 +634,10 @@ public class TransactionLog implements Closeable {
return new LogReader(startingPos);
}
public LogReader getSortedReader(long startingPos) {
return new SortedLogReader(startingPos);
}
/** Returns a single threaded reverse reader */
public ReverseReader getReverseReader() throws IOException {
return new FSReverseReader();
@ -715,6 +721,50 @@ public class TransactionLog implements Closeable {
}
public class SortedLogReader extends LogReader {
private long startingPos;
private boolean inOrder = true;
private TreeMap<Long, Long> versionToPos;
Iterator<Long> iterator;
public SortedLogReader(long startingPos) {
super(startingPos);
this.startingPos = startingPos;
}
@Override
public Object next() throws IOException, InterruptedException {
if (versionToPos == null) {
versionToPos = new TreeMap<>();
Object o;
long pos = startingPos;
long lastVersion = Long.MIN_VALUE;
while ( (o = super.next()) != null) {
List entry = (List) o;
long version = (Long) entry.get(UpdateLog.VERSION_IDX);
version = Math.abs(version);
versionToPos.put(version, pos);
pos = currentPos();
if (version < lastVersion) inOrder = false;
lastVersion = version;
}
fis.seek(startingPos);
}
if (inOrder) {
return super.next();
} else {
if (iterator == null) iterator = versionToPos.values().iterator();
if (!iterator.hasNext()) return null;
long pos = iterator.next();
if (pos != currentPos()) fis.seek(pos);
return super.next();
}
}
}
public abstract class ReverseReader {
/** Returns the next object from the log, or null if none available.

View File

@ -34,6 +34,7 @@ public abstract class UpdateCommand implements Cloneable {
public static int PEER_SYNC = 0x00000004; // update command is a missing update being provided by a peer.
public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits.
public static int CLEAR_CACHES = 0x00000010; // clear caches associated with the update log. used when applying reordered DBQ updates when doing an add.
public static int IGNORE_INDEXWRITER = 0x00000020;
public UpdateCommand(SolrQueryRequest req) {
this.req = req;
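Note: IGNORE_INDEXWRITER uses the same bit-mask idiom as the other flags; when set, DirectUpdateHandler2 records the update in the tlog and returns without touching the IndexWriter. Minimal illustration (flag value copied from the hunk above):

public class IgnoreIndexWriterFlagSketch {
  static final int IGNORE_INDEXWRITER = 0x00000020;

  public static void main(String[] args) {
    int flags = 0;
    flags |= IGNORE_INDEXWRITER; // set by DistributedUpdateProcessor on non-replayed updates
    if ((flags & IGNORE_INDEXWRITER) != 0) {
      System.out.println("tlog only: skip the IndexWriter");
    }
  }
}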

View File

@ -27,6 +27,7 @@ import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
@ -618,7 +619,7 @@ public static final int VERSION_IDX = 1;
}
// only change our caches if we are not buffering
if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) {
if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) {
// given that we just did a delete-by-query, we don't know what documents were
// affected and hence we must purge our caches.
openRealtimeSearcher();
@ -1095,6 +1096,162 @@ public static final int VERSION_IDX = 1;
return cs.submit(replayer, recoveryInfo);
}
/**
* Replay the current tlog, so all updates will be written to the index.
* This is a required step for an append replica that is becoming the new leader.
* @return future of this task
*/
public Future<RecoveryInfo> recoverFromCurrentLog() {
if (tlog == null) {
return null;
}
map.clear();
recoveryInfo = new RecoveryInfo();
tlog.incref();
ExecutorCompletionService<RecoveryInfo> cs = new ExecutorCompletionService<>(recoveryExecutor);
LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true);
versionInfo.blockUpdates();
try {
state = State.REPLAYING;
} finally {
versionInfo.unblockUpdates();
}
return cs.submit(replayer, recoveryInfo);
}
/**
* Block updates, append a commit to the current tlog,
* then copy buffered updates over to a new tlog and bring the ulog back to the ACTIVE state.
* Any update that has not yet made it to the index is thus preserved in a tlog,
* which also keeps RTG working.
* @param cuc any updates that have version larger than the version of cuc will be copied over
*/
public void copyOverBufferingUpdates(CommitUpdateCommand cuc) {
versionInfo.blockUpdates();
try {
operationFlags &= ~FLAG_GAP;
state = State.ACTIVE;
copyAndSwitchToNewTlog(cuc);
} finally {
versionInfo.unblockUpdates();
}
}
/**
* Block updates, append a commit to the current tlog, then copy updates over to a new tlog.
* Any update that has not yet made it to the index is thus preserved in a tlog.
* @param cuc any updates that have version larger than the version of cuc will be copied over
*/
public void copyOverOldUpdates(CommitUpdateCommand cuc) {
versionInfo.blockUpdates();
try {
copyAndSwitchToNewTlog(cuc);
} finally {
versionInfo.unblockUpdates();
}
}
protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) {
synchronized (this) {
if (tlog == null) return;
preCommit(cuc);
try {
copyOverOldUpdates(cuc.getVersion());
} finally {
postCommit(cuc);
}
}
}
/**
* Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog
* @param commitVersion any updates that have version larger than the commitVersion will be copied over
*/
public void copyOverOldUpdates(long commitVersion) {
TransactionLog oldTlog = prevTlog;
if (oldTlog == null && !logs.isEmpty()) {
oldTlog = logs.getFirst();
}
if (oldTlog == null || oldTlog.refcount.get() == 0) {
return;
}
try {
if (oldTlog.endsWithCommit()) {
return;
}
} catch (IOException e) {
log.warn("Exception reading log", e);
return;
}
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core,
new ModifiableSolrParams());
TransactionLog.LogReader logReader = oldTlog.getReader(0);
Object o = null;
try {
while ( (o = logReader.next()) != null ) {
try {
List entry = (List)o;
int operationAndFlags = (Integer) entry.get(0);
int oper = operationAndFlags & OPERATION_MASK;
long version = (Long) entry.get(1);
if (Math.abs(version) > commitVersion) {
switch (oper) {
case UpdateLog.UPDATE_INPLACE:
case UpdateLog.ADD: {
SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1);
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.solrDoc = sdoc;
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
add(cmd);
break;
}
case UpdateLog.DELETE: {
byte[] idBytes = (byte[]) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.setIndexedId(new BytesRef(idBytes));
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
delete(cmd);
break;
}
case UpdateLog.DELETE_BY_QUERY: {
String query = (String) entry.get(2);
DeleteUpdateCommand cmd = new DeleteUpdateCommand(req);
cmd.query = query;
cmd.setVersion(version);
cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT);
deleteByQuery(cmd);
break;
}
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
}
}
} catch (ClassCastException e) {
log.warn("Unexpected log entry or corrupt log. Entry=" + o, e);
}
}
// Prev tlog will be closed, so nullify prevMap
if (prevTlog == oldTlog) {
prevMap = null;
}
} catch (IOException e) {
log.error("Exception reading versions from log",e);
} catch (InterruptedException e) {
log.warn("Exception reading log", e);
} finally {
if (logReader != null) logReader.close();
}
}
protected void ensureLog() {
if (tlog == null) {
@ -1482,6 +1639,7 @@ public static final int VERSION_IDX = 1;
boolean activeLog;
boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked
boolean debug = loglog.isDebugEnabled();
boolean inSortedOrder;
public LogReplayer(List<TransactionLog> translogs, boolean activeLog) {
this.translogs = new LinkedList<>();
@ -1489,6 +1647,11 @@ public static final int VERSION_IDX = 1;
this.activeLog = activeLog;
}
public LogReplayer(List<TransactionLog> translogs, boolean activeLog, boolean inSortedOrder) {
this(translogs, activeLog);
this.inSortedOrder = inSortedOrder;
}
private SolrQueryRequest req;
@ -1554,7 +1717,11 @@ public static final int VERSION_IDX = 1;
try {
loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart);
long lastStatusTime = System.nanoTime();
tlogReader = translog.getReader(recoveryInfo.positionOfStart);
if (inSortedOrder) {
tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart);
} else {
tlogReader = translog.getReader(recoveryInfo.positionOfStart);
}
// NOTE: we don't currently handle a core reload during recovery. This would cause the core
// to change underneath us.
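Note: copyOverOldUpdates keeps only what the last commit did not cover: entries with abs(version) greater than the commit's version are re-applied into the new tlog (flagged IGNORE_AUTOCOMMIT so the copy cannot itself trigger commits). A toy run of that filter:

import java.util.ArrayList;
import java.util.List;

public class CopyOverSketch {
  public static void main(String[] args) {
    long commitVersion = 105;                          // commitCommandVer of the latest commit point
    long[] tlogVersions = {101, 103, -104, 106, -107}; // negative = delete
    List<Long> copied = new ArrayList<>();
    for (long v : tlogVersions) {
      if (Math.abs(v) > commitVersion) copied.add(v);  // only post-commit updates move over
    }
    System.out.println(copied); // [106, -107]
  }
}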

View File

@ -279,6 +279,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this is set to true in the constructor if the next processors in the chain
// are custom and may modify the SolrInputDocument racing with its serialization for replication
private final boolean cloneRequiredOnLeader;
private final boolean onlyLeaderIndexes;
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
@ -324,8 +325,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
if (cloudDesc != null) {
collection = cloudDesc.getCollectionName();
ClusterState cstate = zkController.getClusterState();
DocCollection coll = cstate.getCollection(collection);
onlyLeaderIndexes = coll.getRealtimeReplicas() == 1;
} else {
collection = null;
onlyLeaderIndexes = false;
}
boolean shouldClone = false;
@ -1186,6 +1191,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
checkDeleteByQueries = true;
}
}
if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
}
@ -1692,6 +1700,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
doLocalDelete(cmd);
}
}
@ -1845,6 +1857,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return true;
}
}
if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
}
}
@ -1876,7 +1892,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
doLocalCommit(cmd);
if (onlyLeaderIndexes) {
try {
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, cloudDesc.getShardId());
isLeader = leaderReplica.getName().equals(
req.getCore().getCoreDescriptor().getCloudDescriptor()
.getCoreNodeName());
if (isLeader) {
long commitVersion = vinfo.getNewClock();
cmd.setVersion(commitVersion);
doLocalCommit(cmd);
} else {
assert TestInjection.waitForInSyncWithLeader(req.getCore(),
zkController, collection, cloudDesc.getShardId());
}
} catch (InterruptedException e) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e);
}
} else {
doLocalCommit(cmd);
}
} else {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
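Note: at the commit end-point the behavior now forks: the leader stamps the commit with a fresh version from the version clock and commits locally, while a passive replica commits nothing and, under test injection, merely waits until its commit point matches the leader's. The branch reduced to a decision sketch (illustrative values):

public class CommitRoutingSketch {
  public static void main(String[] args) {
    boolean onlyLeaderIndexes = true;
    boolean isLeader = false; // resolved via ZkStateReader.getLeaderRetry(...)
    if (!onlyLeaderIndexes) {
      System.out.println("normal mode: doLocalCommit(cmd) on every node");
    } else if (isLeader) {
      System.out.println("leader: cmd.setVersion(vinfo.getNewClock()); doLocalCommit(cmd)");
    } else {
      System.out.println("replica: skip local commit; wait until in sync with the leader");
    }
  }
}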

View File

@ -28,14 +28,28 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.NonExistentCoreException;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.update.SolrIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
import static org.apache.solr.handler.ReplicationHandler.COMMAND;
/**
* Allows random faults to be injected in running code during test runs.
@ -118,6 +132,8 @@ public class TestInjection {
public static int randomDelayMaxInCoreCreationInSec = 10;
public static String splitFailureBeforeReplicaCreation = null;
public static String waitForReplicasInSync = "true:60";
private static Set<Timer> timers = Collections.synchronizedSet(new HashSet<Timer>());
@ -343,6 +359,44 @@ public class TestInjection {
return true;
}
@SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value")
public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) throws InterruptedException {
if (waitForReplicasInSync == null) return true;
Pair<Boolean,Integer> pair = parseValue(waitForReplicasInSync);
boolean enabled = pair.first();
if (!enabled) return true;
long t = System.currentTimeMillis() - 100;
try {
for (int i = 0; i < pair.second(); i++) {
if (core.isClosed()) return true;
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
collection, shardId);
try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl()).build()) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.QT, ReplicationHandler.PATH);
params.set(COMMAND, CMD_DETAILS);
NamedList<Object> response = leaderClient.request(new QueryRequest(params));
long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion");
String localVersion = core.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true;
if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) {
return true;
} else {
Thread.sleep(500);
}
}
}
} catch (Exception e) {
log.error("Exception when wait for replicas in sync with master");
}
return false;
}
private static Pair<Boolean,Integer> parseValue(String raw) {
Matcher m = ENABLED_PERCENT.matcher(raw);

View File

@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<schema name="minimal" version="1.1">
<field name="inplace_updatable_int" type="int" indexed="false" stored="false" docValues="true" />
<dynamicField name="*" type="string" indexed="true" stored="true"/>
<!-- for versioning -->
<field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
<field name="id" type="string" indexed="true" stored="true" docValues="true"/>
<uniqueKey>id</uniqueKey>
<fieldType name="string" class="solr.StrField"/>
<fieldType name="int" class="solr.TrieIntField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
<fieldType name="long" class="solr.TrieLongField" precisionStep="0" omitNorms="true" positionIncrementGap="0"/>
</schema>

View File

@ -0,0 +1,48 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Minimal solrconfig.xml with /select, /admin and /update only -->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<schemaFactory class="ClassicIndexSchemaFactory"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LATEST}</luceneMatchVersion>
<updateHandler class="solr.DirectUpdateHandler2">
<commitWithin>
<softCommit>${solr.commitwithin.softcommit:true}</softCommit>
</commitWithin>
<updateLog class="${solr.ulog:solr.UpdateLog}"></updateLog>
</updateHandler>
<requestHandler name="/select" class="solr.SearchHandler">
<lst name="defaults">
<str name="echoParams">explicit</str>
<str name="indent">true</str>
<str name="df">text</str>
</lst>
</requestHandler>
</config>

View File

@ -54,11 +54,17 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
private static final String SHARD2 = "shard2";
private static final String SHARD1 = "shard1";
private static final String ONE_NODE_COLLECTION = "onenodecollection";
private final boolean onlyLeaderIndexes = random().nextBoolean();
public BasicDistributedZk2Test() {
super();
sliceCount = 2;
}
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
@Test
@ShardsFixed(num = 4)

View File

@ -87,6 +87,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String DEFAULT_COLLECTION = "collection1";
private final boolean onlyLeaderIndexes = random().nextBoolean();
String t1="a_t";
String i1="a_i1";
String tlong = "other_tl1";
@ -114,7 +116,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase {
pending = new HashSet<>();
}
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
@Override
protected void setDistributedParams(ModifiableSolrParams params) {

View File

@ -55,6 +55,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1"));
private final boolean onlyLeaderIndexes = random().nextBoolean();
@BeforeClass
public static void beforeSuperClass() {
schemaString = "schema15.xml"; // we need a string id
@ -109,6 +111,11 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
clientSoTimeout = 5000;
}
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
@Test
public void test() throws Exception {
cloudClient.setSoTimeout(clientSoTimeout);

View File

@ -55,6 +55,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP;
public class ForceLeaderTest extends HttpPartitionTest {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final boolean onlyLeaderIndexes = random().nextBoolean();
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
@Test
@Override

View File

@ -76,12 +76,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
// give plenty of time for replicas to recover when running in slow Jenkins test envs
protected static final int maxWaitSecsToSeeAllActive = 90;
private final boolean onlyLeaderIndexes = random().nextBoolean();
public HttpPartitionTest() {
super();
sliceCount = 2;
fixShardCount(3);
}
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
/**
* We need to turn off directUpdatesToLeadersOnly due to SOLR-9512
*/

View File

@ -37,12 +37,19 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest
private static final long sleepMsBeforeHealPartition = 2000L;
private final boolean onlyLeaderIndexes = random().nextBoolean();
public LeaderInitiatedRecoveryOnCommitTest() {
super();
sliceCount = 1;
fixShardCount(4);
}
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
@Override
@Test
public void test() throws Exception {

View File

@ -0,0 +1,435 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.lucene.index.IndexWriter;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.DirectUpdateHandler2;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateHandler;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.util.RefCounted;
import org.junit.BeforeClass;
import org.junit.Test;
public class OnlyLeaderIndexesTest extends SolrCloudTestCase {
private static final String COLLECTION = "collection1";
@BeforeClass
public static void setupCluster() throws Exception {
System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory");
System.setProperty("solr.ulog.numRecordsToKeep", "1000");
configureCluster(3)
.addConfig("config", TEST_PATH().resolve("configsets")
.resolve("cloud-minimal-inplace-updates").resolve("conf"))
.configure();
CollectionAdminRequest
.createCollection(COLLECTION, "config", 1, 3)
.setRealtimeReplicas(1)
.setMaxShardsPerNode(1)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
}
@Test
public void test() throws Exception {
basicTest();
recoveryTest();
dbiTest();
basicLeaderElectionTest();
outOfOrderDBQWithInPlaceUpdatesTest();
}
public void basicTest() throws Exception {
CloudSolrClient cloudClient = cluster.getSolrClient();
new UpdateRequest()
.add(sdoc("id", "1"))
.add(sdoc("id", "2"))
.add(sdoc("id", "3"))
.add(sdoc("id", "4"))
.process(cloudClient, COLLECTION);
{
UpdateHandler updateHandler = getSolrCore(true).get(0).getUpdateHandler();
RefCounted<IndexWriter> iwRef = updateHandler.getSolrCoreState().getIndexWriter(null);
assertTrue("IndexWriter at leader must see updates ", iwRef.get().hasUncommittedChanges());
iwRef.decref();
}
for (SolrCore solrCore : getSolrCore(false)) {
RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
iwRef.decref();
}
checkRTG(1, 4, cluster.getJettySolrRunners());
new UpdateRequest()
.deleteById("1")
.deleteByQuery("id:2")
.process(cloudClient, COLLECTION);
// The DBQ is not processed on replicas, so we can still get doc 2 and the other docs via RTG
checkRTG(2,4, getSolrRunner(false));
new UpdateRequest()
.commit(cloudClient, COLLECTION);
checkShardConsistency(2, 1);
// Update log roll over
for (SolrCore solrCore : getSolrCore(false)) {
UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
assertFalse(updateLog.hasUncommittedChanges());
}
// UpdateLog copy over old updates
for (int i = 15; i <= 150; i++) {
cloudClient.add(COLLECTION, sdoc("id",String.valueOf(i)));
if (random().nextInt(100) < 15 & i != 150) {
cloudClient.commit(COLLECTION);
}
}
checkRTG(120,150, cluster.getJettySolrRunners());
waitForReplicasCatchUp(20);
}
public void recoveryTest() throws Exception {
CloudSolrClient cloudClient = cluster.getSolrClient();
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
new UpdateRequest()
.add(sdoc("id", "3"))
.add(sdoc("id", "4"))
.commit(cloudClient, COLLECTION);
// Replica recovery
new UpdateRequest()
.add(sdoc("id", "5"))
.process(cloudClient, COLLECTION);
JettySolrRunner solrRunner = getSolrRunner(false).get(0);
ChaosMonkey.stop(solrRunner);
new UpdateRequest()
.add(sdoc("id", "6"))
.process(cloudClient, COLLECTION);
ChaosMonkey.start(solrRunner);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
// We skip peerSync, so the replica will always trigger a commit on the leader
checkShardConsistency(4, 20);
// LIR can be kicked off, so wait for the replicas to recover
new UpdateRequest()
.add(sdoc("id", "7"))
.commit(cloudClient, COLLECTION);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
checkShardConsistency(5, 20);
// More Replica recovery testing
new UpdateRequest()
.add(sdoc("id", "8"))
.process(cloudClient, COLLECTION);
checkRTG(3,8, cluster.getJettySolrRunners());
DirectUpdateHandler2.commitOnClose = false;
ChaosMonkey.stop(solrRunner);
DirectUpdateHandler2.commitOnClose = true;
ChaosMonkey.start(solrRunner);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
checkRTG(3,8, cluster.getJettySolrRunners());
checkShardConsistency(6, 20);
// Test that replica recovery applies buffered updates
Semaphore waitingForBufferUpdates = new Semaphore(0);
Semaphore waitingForReplay = new Semaphore(0);
RecoveryStrategy.testing_beforeReplayBufferingUpdates = () -> {
try {
waitingForReplay.release();
waitingForBufferUpdates.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
};
ChaosMonkey.stop(solrRunner);
ChaosMonkey.start(solrRunner);
waitingForReplay.acquire();
new UpdateRequest()
.add(sdoc("id", "9"))
.add(sdoc("id", "10"))
.process(cloudClient, COLLECTION);
waitingForBufferUpdates.release();
RecoveryStrategy.testing_beforeReplayBufferingUpdates = null;
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
checkRTG(3,10, cluster.getJettySolrRunners());
checkShardConsistency(6, 20);
for (SolrCore solrCore : getSolrCore(false)) {
RefCounted<IndexWriter> iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges());
iwRef.decref();
}
}
public void dbiTest() throws Exception{
CloudSolrClient cloudClient = cluster.getSolrClient();
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
new UpdateRequest()
.add(sdoc("id", "1"))
.commit(cloudClient, COLLECTION);
checkShardConsistency(1, 1);
new UpdateRequest()
.deleteById("1")
.process(cloudClient, COLLECTION);
try {
checkRTG(1, 1, cluster.getJettySolrRunners());
} catch (AssertionError e) {
// expected: the deleted doc must no longer be visible via RTG
return;
}
fail("Doc 1 was deleted but it still exists");
}
public void basicLeaderElectionTest() throws Exception {
CloudSolrClient cloudClient = cluster.getSolrClient();
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
new UpdateRequest()
.add(sdoc("id", "1"))
.add(sdoc("id", "2"))
.process(cloudClient, COLLECTION);
String oldLeader = getLeader();
JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
ChaosMonkey.kill(oldLeaderJetty);
for (int i = 0; i < 60; i++) { // wait till leader is changed
if (!oldLeader.equals(getLeader())) {
break;
}
Thread.sleep(100);
}
new UpdateRequest()
.add(sdoc("id", "3"))
.add(sdoc("id", "4"))
.process(cloudClient, COLLECTION);
ChaosMonkey.start(oldLeaderJetty);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 60);
checkRTG(1,4, cluster.getJettySolrRunners());
new UpdateRequest()
.commit(cloudClient, COLLECTION);
checkShardConsistency(4,1);
}
private String getLeader() throws InterruptedException {
ZkNodeProps props = cluster.getSolrClient().getZkStateReader().getLeaderRetry("collection1", "shard1", 30000);
return props.getStr(ZkStateReader.NODE_NAME_PROP);
}
public void outOfOrderDBQWithInPlaceUpdatesTest() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*")
.commit(cluster.getSolrClient(), COLLECTION);
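// Simulate reordered delivery from the leader: the DBQ (version MAX-98) arrives before the
// in-place update (version MAX-99) that it logically follows.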
List<UpdateRequest> updates = new ArrayList<>();
updates.add(simulatedUpdateRequest(null, "id", 1, "title_s", "title0_new", "inplace_updatable_int", 5, "_version_", Long.MAX_VALUE-100)); // full update
updates.add(simulatedDBQ("inplace_updatable_int:5", Long.MAX_VALUE-98));
updates.add(simulatedUpdateRequest(Long.MAX_VALUE-100, "id", 1, "inplace_updatable_int", 6, "_version_", Long.MAX_VALUE-99));
for (JettySolrRunner solrRunner: getSolrRunner(false)) {
try (SolrClient client = solrRunner.newClient()) {
for (UpdateRequest up : updates) {
up.process(client, COLLECTION);
}
}
}
JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0);
ChaosMonkey.kill(oldLeaderJetty);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
ChaosMonkey.start(oldLeaderJetty);
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
false, true, 30);
new UpdateRequest()
.add(sdoc("id", "2"))
.commit(cluster.getSolrClient(), COLLECTION);
checkShardConsistency(2, 20);
SolrDocument doc = cluster.getSolrClient().getById(COLLECTION, "1");
assertNotNull(doc.get("title_s"));
}
private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException {
SolrInputDocument doc = sdoc(fields);
// get baseUrl of the leader
String baseUrl = getBaseUrl();
UpdateRequest ur = new UpdateRequest();
ur.add(doc);
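// Mark the request as already distributed by the shard leader so the receiving replica
// applies it locally instead of forwarding it again.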
ur.setParam("update.distrib", "FROMLEADER");
if (prevVersion != null) {
ur.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion));
ur.setParam("distrib.inplace.update", "true");
}
ur.setParam("distrib.from", baseUrl);
return ur;
}
private UpdateRequest simulatedDBQ(String query, long version) throws SolrServerException, IOException {
String baseUrl = getBaseUrl();
UpdateRequest ur = new UpdateRequest();
ur.deleteByQuery(query);
ur.setParam("_version_", ""+version);
ur.setParam("update.distrib", "FROMLEADER");
ur.setParam("distrib.from", baseUrl);
return ur;
}
private String getBaseUrl() {
DocCollection collection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION);
Slice slice = collection.getSlice("shard1");
return slice.getLeader().getCoreUrl();
}
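// Verify, via non-distributed real-time get against every node, that each doc id in
// [from, to] is present; RTG reads from the tlog, so uncommitted updates count too.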
private void checkRTG(int from, int to, List<JettySolrRunner> solrRunners) throws Exception {
for (JettySolrRunner solrRunner: solrRunners) {
try (SolrClient client = solrRunner.newClient()) {
for (int i = from; i <= to; i++) {
SolrQuery query = new SolrQuery("*:*");
query.set("distrib", false);
query.setRequestHandler("/get");
query.set("id",i);
QueryResponse res = client.query(COLLECTION, query);
assertNotNull("Can not find doc "+ i + " in " + solrRunner.getBaseUrl(),res.getResponse().get("doc"));
}
}
}
}
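// Poll every core with a non-distributed count query, retrying up to numTry times, until
// all cores report the expected number of documents.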
private void checkShardConsistency(int expected, int numTry) throws Exception {
for (int i = 0; i < numTry; i++) {
boolean inSync = true;
for (JettySolrRunner solrRunner: cluster.getJettySolrRunners()) {
try (SolrClient client = solrRunner.newClient()) {
SolrQuery query = new SolrQuery("*:*");
query.set("distrib", false);
long results = client.query(COLLECTION, query).getResults().getNumFound();
if (expected != results) {
inSync = false;
Thread.sleep(500);
break;
}
}
}
if (inSync) return;
}
fail("Some replicas are not in sync with leader");
}
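// A replica has caught up once the commitTimeMSec user data of its latest commit matches
// the leader's, i.e. it has pulled the leader's newest commit point.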
private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException {
String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
if (leaderTimeCommit == null) return;
for (int i = 0; i < numTry; i++) {
boolean inSync = true;
for (SolrCore solrCore : getSolrCore(false)) {
String replicateTimeCommit = solrCore.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY);
if (!leaderTimeCommit.equals(replicateTimeCommit)) {
inSync = false;
Thread.sleep(500);
break;
}
}
if (inSync) return;
}
fail("Some replicas are not in sync with leader");
}
private List<SolrCore> getSolrCore(boolean isLeader) {
List<SolrCore> rs = new ArrayList<>();
CloudSolrClient cloudClient = cluster.getSolrClient();
DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION);
for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
if (solrRunner.getCoreContainer() == null) continue;
for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
Slice slice = docCollection.getSlice(cloudDescriptor.getShardId());
Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName());
if ((slice.getLeader() == replica) == isLeader) {
rs.add(solrCore);
}
}
}
return rs;
}
private List<JettySolrRunner> getSolrRunner(boolean isLeader) {
List<JettySolrRunner> rs = new ArrayList<>();
CloudSolrClient cloudClient = cluster.getSolrClient();
DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION);
for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) {
if (solrRunner.getCoreContainer() == null) continue;
for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) {
CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor();
Slice slice = docCollection.getSlice(cloudDescriptor.getShardId());
Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName());
if ((slice.getLeader() == replica) == isLeader) {
rs.add(solrRunner);
}
}
}
return rs;
}
}

View File

@@ -33,12 +33,17 @@ import org.junit.Test;
@SolrTestCaseJ4.SuppressSSL
public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase {
private static final int MAX_BUFFERED_DOCS = 2, ULOG_NUM_RECORDS_TO_KEEP = 2;
private final boolean onlyLeaderIndexes = random().nextBoolean();
public RecoveryAfterSoftCommitTest() {
sliceCount = 1;
fixShardCount(2);
}
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
@BeforeClass
public static void beforeTests() {
System.setProperty("solr.tests.maxBufferedDocs", String.valueOf(MAX_BUFFERED_DOCS));

View File

@@ -86,6 +86,12 @@ public class ShardSplitTest extends BasicDistributedZkTest {
useFactory(null);
}
// TODO: for now, onlyLeaderIndexes does not work with ShardSplitTest
@Override
protected int getRealtimeReplicas() {
return -1;
}
@Test
public void test() throws Exception {

View File

@@ -52,6 +52,7 @@ import org.junit.Test;
public class TestCloudRecovery extends SolrCloudTestCase {
private static final String COLLECTION = "collection1";
private static boolean onlyLeaderIndexes;
@BeforeClass
public static void setupCluster() throws Exception {
@@ -63,8 +64,10 @@ public class TestCloudRecovery extends SolrCloudTestCase {
.addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf"))
.configure();
onlyLeaderIndexes = random().nextBoolean();
CollectionAdminRequest
.createCollection(COLLECTION, "config", 2, 2)
.setRealtimeReplicas(onlyLeaderIndexes ? 1 : -1)
.setMaxShardsPerNode(2)
.process(cluster.getSolrClient());
AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(),
@@ -107,7 +110,12 @@ public class TestCloudRecovery extends SolrCloudTestCase {
resp = cloudClient.query(COLLECTION, params);
assertEquals(4, resp.getResults().getNumFound());
// Make sure all nodes recover from the tlog
assertEquals(4, countReplayLog.get());
if (onlyLeaderIndexes) {
// A leader election may have been triggered, so at least 2 append replicas will have
// replayed their tlogs before becoming the new leaders
assertTrue(countReplayLog.get() >= 2);
} else {
assertEquals(4, countReplayLog.get());
}
// check metrics
int replicationCount = 0;
@@ -127,7 +135,11 @@ public class TestCloudRecovery extends SolrCloudTestCase {
skippedCount += skipped.getCount();
}
}
assertEquals(2, replicationCount);
if (onlyLeaderIndexes) {
assertTrue(replicationCount >= 2);
} else {
assertEquals(2, replicationCount);
}
}
@Test

View File

@@ -60,7 +60,10 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
@ShardsFixed(num = 2)
public void test() throws Exception {
try (CloudSolrClient client = createCloudClient(null)) {
createCollection(null, COLLECTION_NAME, 2, 2, 2, client, null, "conf1");
CollectionAdminRequest.Create req = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1", 2, 2);
req.setRealtimeReplicas(1);
req.setMaxShardsPerNode(2);
client.request(req);
createCollection(null, COLLECTION_NAME1, 1, 1, 1, client, null, "conf1");
}
@@ -170,6 +173,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
Map<String, Object> collection = (Map<String, Object>) collections.get(COLLECTION_NAME);
assertNotNull(collection);
assertEquals("conf1", collection.get("configName"));
assertEquals("1", collection.get("realtimeReplicas"));
}
}

View File

@@ -42,7 +42,12 @@ public class HdfsBasicDistributedZkTest extends BasicDistributedZkTest {
System.setProperty("tests.hdfs.numdatanodes", "1");
dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath());
}
@Override
protected int getRealtimeReplicas() {
return -1;
}
@AfterClass
public static void teardownClass() throws Exception {
HdfsTestUtil.teardownClass(dfsCluster);

View File

@@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory;
@Slow
public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final boolean onlyLeaderIndexes = random().nextBoolean();
@BeforeClass
public static void beforeSuperClass() throws Exception {
@@ -108,7 +109,12 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
iw.decref();
}
}
@Override
protected int getRealtimeReplicas() {
return onlyLeaderIndexes ? 1 : -1;
}
@After
public void after() {
System.clearProperty("solr.tests.intClassName");
@@ -265,6 +271,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
}
private void reorderedDBQIndividualReplicaTest() throws Exception {
if (onlyLeaderIndexes) {
log.info("RTG with DBQs are not working in append replicas");
return;
}
clearIndex();
commit();
@@ -595,7 +605,6 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
}
private void outOfOrderUpdatesIndividualReplicaTest() throws Exception {
clearIndex();
commit();
@@ -741,6 +750,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
DV(id=x, val=5, ver=3)
*/
private void reorderedDBQsResurrectionTest() throws Exception {
if (onlyLeaderIndexes) {
log.info("RTG with DBQs are not working in append replicas");
return;
}
clearIndex();
commit();
@@ -1016,7 +1029,7 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
String baseUrl = getBaseUrl(""+id);
UpdateRequest ur = new UpdateRequest();
if (random().nextBoolean()) {
if (random().nextBoolean() || onlyLeaderIndexes) {
ur.deleteById(""+id);
} else {
ur.deleteByQuery("id:"+id);
@@ -1138,6 +1151,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase {
* dbq("inp:14",version=4)
*/
private void reorderedDBQsUsingUpdatedValueFromADroppedUpdate() throws Exception {
if (onlyLeaderIndexes) {
log.info("RTG with DBQs are not working in append replicas");
return;
}
clearIndex();
commit();

View File

@@ -366,6 +366,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
private Properties properties;
protected Boolean autoAddReplicas;
protected Integer realtimeReplicas;
protected Integer stateFormat;
private String[] rule , snitch;
@@ -407,6 +408,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
public Create setNumShards(Integer numShards) {this.numShards = numShards; return this; }
public Create setMaxShardsPerNode(Integer numShards) { this.maxShardsPerNode = numShards; return this; }
public Create setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; }
public Create setRealtimeReplicas(Integer realtimeReplicas) { this.realtimeReplicas = realtimeReplicas; return this; }
@Deprecated
public Create setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; }
public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; }
@@ -421,6 +423,7 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
public Integer getMaxShardsPerNode() { return maxShardsPerNode; }
public Integer getReplicationFactor() { return replicationFactor; }
public Boolean getAutoAddReplicas() { return autoAddReplicas; }
public Integer getRealtimeReplicas() { return realtimeReplicas; }
public Integer getStateFormat() { return stateFormat; }
/**
@@ -507,6 +510,9 @@ public abstract class CollectionAdminRequest<T extends CollectionAdminResponse>
if (autoAddReplicas != null) {
params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas);
}
if (realtimeReplicas != null) {
params.set(ZkStateReader.REALTIME_REPLICAS, realtimeReplicas);
}
if (properties != null) {
addProperties(params, properties);
}
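// Illustrative usage (not part of this patch; the collection/config names and the
// cloudClient handle are assumptions): a client could opt a new collection into the
// leader-only indexing mode via the new setter above:
//
//   CollectionAdminRequest.createCollection("myColl", "conf1", 2, 2)
//       .setRealtimeReplicas(1)
//       .setMaxShardsPerNode(2)
//       .process(cloudClient);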

View File

@@ -33,6 +33,7 @@ import org.noggit.JSONWriter;
import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
/**
@@ -59,6 +60,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
private final Integer replicationFactor;
private final Integer maxShardsPerNode;
private final Boolean autoAddReplicas;
private final Integer realtimeReplicas;
public DocCollection(String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
@@ -84,6 +86,11 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE);
Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS);
this.autoAddReplicas = autoAddReplicas == null ? false : autoAddReplicas;
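// realtimeReplicas is two-valued: -1 (default; every replica indexes locally) or 1 (only
// the shard leader indexes; the other replicas replicate its segments).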
Integer realtimeReplicas = (Integer) verifyProp(props, REALTIME_REPLICAS);
this.realtimeReplicas = realtimeReplicas == null ? -1 : realtimeReplicas;
if (this.realtimeReplicas != -1 && this.realtimeReplicas != 1) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid realtimeReplicas: must be 1 or -1, found: " + this.realtimeReplicas);
}
verifyProp(props, RULE);
verifyProp(props, SNITCH);
Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
@@ -126,6 +133,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
switch (propName) {
case MAX_SHARDS_PER_NODE:
case REPLICATION_FACTOR:
case REALTIME_REPLICAS:
return Integer.parseInt(o.toString());
case AUTO_ADD_REPLICAS:
return Boolean.parseBoolean(o.toString());
@@ -226,6 +234,10 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
return maxShardsPerNode;
}
public int getRealtimeReplicas() {
return realtimeReplicas;
}
public String getZNode(){
return znode;
}

View File

@@ -96,6 +96,7 @@ public class ZkStateReader implements Closeable {
public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode";
public static final String AUTO_ADD_REPLICAS = "autoAddReplicas";
public static final String MAX_CORES_PER_NODE = "maxCoresPerNode";
public static final String REALTIME_REPLICAS = "realtimeReplicas";
public static final String ROLES = "/roles.json";

View File

@@ -272,6 +272,10 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
chaosMonkey = new ChaosMonkey(zkServer, zkStateReader, DEFAULT_COLLECTION,
shardToJetty, shardToLeaderJetty);
}
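// Default: all replicas index locally (realtimeReplicas == -1); tests opt into the
// leader-only indexing mode by overriding this to return 1.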
protected int getRealtimeReplicas() {
return -1;
}
protected CloudSolrClient createCloudClient(String defaultCollection) {
CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean());
@@ -383,7 +387,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
Utils.toJSON(Utils.makeMap(Overseer.QUEUE_OPERATION,
CollectionParams.CollectionAction.CREATE.toLower(), "name",
DEFAULT_COLLECTION, "numShards", String.valueOf(sliceCount),
DocCollection.STATE_FORMAT, getStateFormat())));
DocCollection.STATE_FORMAT, getStateFormat(),
ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas())));
zkClient.close();
}
@@ -1619,7 +1624,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
NUM_SLICES, numShards,
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
CREATE_NODE_SET, createNodeSetStr,
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas()),
client);
}
@@ -1631,7 +1637,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
NUM_SLICES, numShards,
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
CREATE_NODE_SET, createNodeSetStr,
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode),
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas()),
client, configName);
}
@@ -1814,6 +1821,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
Map<String, Object> props = makeMap(
ZkStateReader.REPLICATION_FACTOR, replicationFactor,
ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode,
ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas(),
NUM_SLICES, numShards);
Map<String,List<Integer>> collectionInfos = new HashMap<>();
createCollection(collectionInfos, collName, props, client);