From 7830462d4b7da3acefff6353419e71cde62d5fee Mon Sep 17 00:00:00 2001 From: Cao Manh Dat Date: Tue, 14 Mar 2017 14:37:47 +0700 Subject: [PATCH] SOLR-9835: Create another replication mode for SolrCloud --- .../solr/hadoop/TreeMergeOutputFormat.java | 3 +- .../apache/solr/cloud/ElectionContext.java | 20 +- .../OverseerCollectionMessageHandler.java | 1 + .../apache/solr/cloud/RecoveryStrategy.java | 29 +- .../solr/cloud/ReplicateFromLeader.java | 124 +++++ .../org/apache/solr/cloud/ZkController.java | 31 +- .../org/apache/solr/core/CoreContainer.java | 7 + .../org/apache/solr/handler/IndexFetcher.java | 34 +- .../solr/handler/ReplicationHandler.java | 28 +- .../handler/admin/CollectionsHandler.java | 4 +- .../org/apache/solr/update/CommitTracker.java | 5 + .../solr/update/DirectUpdateHandler2.java | 31 +- .../solr/update/HdfsTransactionLog.java | 50 ++ .../apache/solr/update/SolrIndexSplitter.java | 3 +- .../apache/solr/update/SolrIndexWriter.java | 6 +- .../apache/solr/update/TransactionLog.java | 50 ++ .../org/apache/solr/update/UpdateCommand.java | 1 + .../org/apache/solr/update/UpdateLog.java | 171 ++++++- .../processor/DistributedUpdateProcessor.java | 38 +- .../org/apache/solr/util/TestInjection.java | 54 +++ .../conf/schema.xml | 31 ++ .../conf/solrconfig.xml | 48 ++ .../solr/cloud/BasicDistributedZk2Test.java | 6 + .../solr/cloud/BasicDistributedZkTest.java | 9 +- .../cloud/ChaosMonkeyNothingIsSafeTest.java | 7 + .../apache/solr/cloud/ForceLeaderTest.java | 6 + .../apache/solr/cloud/HttpPartitionTest.java | 7 + .../LeaderInitiatedRecoveryOnCommitTest.java | 7 + .../solr/cloud/OnlyLeaderIndexesTest.java | 435 ++++++++++++++++++ .../cloud/RecoveryAfterSoftCommitTest.java | 7 +- .../org/apache/solr/cloud/ShardSplitTest.java | 6 + .../apache/solr/cloud/TestCloudRecovery.java | 16 +- .../apache/solr/cloud/TestCollectionAPI.java | 6 +- .../hdfs/HdfsBasicDistributedZkTest.java | 7 +- .../update/TestInPlaceUpdatesDistrib.java | 23 +- .../solrj/request/CollectionAdminRequest.java | 6 + .../solr/common/cloud/DocCollection.java | 12 + .../solr/common/cloud/ZkStateReader.java | 1 + .../cloud/AbstractFullDistribZkTestBase.java | 14 +- 39 files changed, 1309 insertions(+), 35 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java create mode 100644 solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml create mode 100644 solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml create mode 100644 solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java diff --git a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java index e3487adad21..cac57c35aae 100644 --- a/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java +++ b/solr/contrib/map-reduce/src/java/org/apache/solr/hadoop/TreeMergeOutputFormat.java @@ -163,7 +163,8 @@ public class TreeMergeOutputFormat extends FileOutputFormat // Set Solr's commit data so the created index is usable by SolrCloud. E.g. Currently SolrCloud relies on // commitTimeMSec in the commit data to do replication. - SolrIndexWriter.setCommitData(writer); + //TODO no commitUpdateCommand + SolrIndexWriter.setCommitData(writer, -1); timer = new RTimer(); LOG.info("Optimizing Solr: Closing index writer"); diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index d3ad3224382..223a5395201 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.Path; @@ -420,7 +421,24 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { try { // we must check LIR before registering as leader checkLIR(coreName, allReplicasInLine); - + + boolean onlyLeaderIndexes = zkController.getClusterState().getCollection(collection).getRealtimeReplicas() == 1; + if (onlyLeaderIndexes) { + // stop replicate from old leader + zkController.stopReplicationFromLeader(coreName); + if (weAreReplacement) { + try (SolrCore core = cc.getCore(coreName)) { + Future future = core.getUpdateHandler().getUpdateLog().recoverFromCurrentLog(); + if (future != null) { + log.info("Replaying tlog before become new leader"); + future.get(); + } else { + log.info("New leader does not have old tlog to replay"); + } + } + } + } + super.runLeaderProcess(weAreReplacement, 0); try (SolrCore core = cc.getCore(coreName)) { if (core != null) { diff --git a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java index 00eb12db692..4d64a0087e5 100644 --- a/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java +++ b/solr/core/src/java/org/apache/solr/cloud/OverseerCollectionMessageHandler.java @@ -131,6 +131,7 @@ public class OverseerCollectionMessageHandler implements OverseerMessageHandler ZkStateReader.REPLICATION_FACTOR, "1", ZkStateReader.MAX_SHARDS_PER_NODE, "1", ZkStateReader.AUTO_ADD_REPLICAS, "false", + ZkStateReader.REALTIME_REPLICAS, "-1", DocCollection.RULE, null, SNITCH, null)); diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index 8865c08ddb1..cb6c69c8aa9 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -118,7 +118,8 @@ public class RecoveryStrategy extends Thread implements Closeable { private boolean recoveringAfterStartup; private CoreContainer cc; private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest; - + private boolean onlyLeaderIndexes; + protected RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) { this.cc = cc; this.coreName = cd.getName(); @@ -128,6 +129,8 @@ public class RecoveryStrategy extends Thread implements Closeable { zkStateReader = zkController.getZkStateReader(); baseUrl = zkController.getBaseUrl(); coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName(); + String collection = cd.getCloudDescriptor().getCollectionName(); + onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1; } final public int getWaitForUpdatesWithStaleStatePauseMilliSeconds() { @@ -260,7 +263,7 @@ public class RecoveryStrategy extends Thread implements Closeable { UpdateRequest ureq = new UpdateRequest(); ureq.setParams(new ModifiableSolrParams()); ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true); - ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false); + ureq.getParams().set(UpdateParams.OPEN_SEARCHER, onlyLeaderIndexes); ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process( client); } @@ -309,7 +312,8 @@ public class RecoveryStrategy extends Thread implements Closeable { return; } - boolean firstTime = true; + // we temporary ignore peersync for realtimeReplicas mode + boolean firstTime = !onlyLeaderIndexes; List recentVersions; try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) { @@ -361,6 +365,10 @@ public class RecoveryStrategy extends Thread implements Closeable { } } + if (onlyLeaderIndexes) { + zkController.stopReplicationFromLeader(coreName); + } + Future replayFuture = null; while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption or it will close channels though try { @@ -514,6 +522,9 @@ public class RecoveryStrategy extends Thread implements Closeable { if (successfulRecovery) { LOG.info("Registering as Active after recovery."); try { + if (onlyLeaderIndexes) { + zkController.startReplicationFromLeader(coreName); + } zkController.publish(core.getCoreDescriptor(), Replica.State.ACTIVE); } catch (Exception e) { LOG.error("Could not publish as ACTIVE after succesful recovery", e); @@ -587,8 +598,20 @@ public class RecoveryStrategy extends Thread implements Closeable { LOG.info("Finished recovery process, successful=[{}]", Boolean.toString(successfulRecovery)); } + public static Runnable testing_beforeReplayBufferingUpdates; + final private Future replay(SolrCore core) throws InterruptedException, ExecutionException { + if (testing_beforeReplayBufferingUpdates != null) { + testing_beforeReplayBufferingUpdates.run(); + } + if (onlyLeaderIndexes) { + // roll over all updates during buffering to new tlog, make RTG available + SolrQueryRequest req = new LocalSolrQueryRequest(core, + new ModifiableSolrParams()); + core.getUpdateHandler().getUpdateLog().copyOverBufferingUpdates(new CommitUpdateCommand(req, false)); + return null; + } Future future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates(); if (future == null) { // no replay needed\ diff --git a/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java new file mode 100644 index 00000000000..d7fded907a9 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/cloud/ReplicateFromLeader.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.lang.invoke.MethodHandles; + +import org.apache.lucene.index.IndexCommit; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrConfig; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.ReplicationHandler; +import org.apache.solr.request.LocalSolrQueryRequest; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.update.CommitUpdateCommand; +import org.apache.solr.update.SolrIndexWriter; +import org.apache.solr.update.UpdateLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReplicateFromLeader { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private CoreContainer cc; + private String coreName; + + private ReplicationHandler replicationProcess; + private long lastVersion = 0; + + public ReplicateFromLeader(CoreContainer cc, String coreName) { + this.cc = cc; + this.coreName = coreName; + } + + public void startReplication() throws InterruptedException { + try (SolrCore core = cc.getCore(coreName)) { + if (core == null) { + if (cc.isShutDown()) { + return; + } else { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in " + cc.getCoreNames()); + } + } + SolrConfig.UpdateHandlerInfo uinfo = core.getSolrConfig().getUpdateHandlerInfo(); + String pollIntervalStr = "00:00:03"; + if (uinfo.autoCommmitMaxTime != -1) { + pollIntervalStr = toPollIntervalStr(uinfo.autoCommmitMaxTime/2); + } else if (uinfo.autoSoftCommmitMaxTime != -1) { + pollIntervalStr = toPollIntervalStr(uinfo.autoSoftCommmitMaxTime/2); + } + + NamedList slaveConfig = new NamedList(); + slaveConfig.add("fetchFromLeader", true); + slaveConfig.add("pollInterval", pollIntervalStr); + NamedList replicationConfig = new NamedList(); + replicationConfig.add("slave", slaveConfig); + + String lastCommitVersion = getCommitVersion(core); + if (lastCommitVersion != null) { + lastVersion = Long.parseLong(lastCommitVersion); + } + + replicationProcess = new ReplicationHandler(); + replicationProcess.setPollListener((solrCore, pollSuccess) -> { + if (pollSuccess) { + String commitVersion = getCommitVersion(core); + if (commitVersion == null) return; + if (Long.parseLong(commitVersion) == lastVersion) return; + UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog(); + SolrQueryRequest req = new LocalSolrQueryRequest(core, + new ModifiableSolrParams()); + CommitUpdateCommand cuc = new CommitUpdateCommand(req, false); + cuc.setVersion(Long.parseLong(commitVersion)); + updateLog.copyOverOldUpdates(cuc); + lastVersion = Long.parseLong(commitVersion); + } + }); + replicationProcess.init(replicationConfig); + replicationProcess.inform(core); + } + } + + public static String getCommitVersion(SolrCore solrCore) { + IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit(); + try { + String commitVersion = commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION); + if (commitVersion == null) return null; + else return commitVersion; + } catch (Exception e) { + LOG.warn("Cannot get commit command version from index commit point ",e); + return null; + } + } + + private static String toPollIntervalStr(int ms) { + int sec = ms/1000; + int hour = sec / 3600; + sec = sec % 3600; + int min = sec / 60; + sec = sec % 60; + return hour + ":" + min + ":" + sec; + } + + public void stopReplication() { + replicationProcess.close(); + } +} diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index 333acd419d7..a19b35111b1 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -189,6 +189,7 @@ public class ZkController { private LeaderElector overseerElector; + private Map replicateFromLeaders = new ConcurrentHashMap<>(); // for now, this can be null in tests, in which case recovery will be inactive, and other features // may accept defaults or use mocks rather than pulling things from a CoreContainer @@ -877,7 +878,7 @@ public class ZkController { coreName, baseUrl, cloudDesc.getCollectionName(), shardId); ZkNodeProps leaderProps = new ZkNodeProps(props); - + try { // If we're a preferred leader, insert ourselves at the head of the queue boolean joinAtHead = false; @@ -913,9 +914,16 @@ public class ZkController { // leader election perhaps? UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); - + boolean onlyLeaderIndexes = zkStateReader.getClusterState().getCollection(collection).getRealtimeReplicas() == 1; + boolean isReplicaInOnlyLeaderIndexes = onlyLeaderIndexes && !isLeader; + if (isReplicaInOnlyLeaderIndexes) { + String commitVersion = ReplicateFromLeader.getCommitVersion(core); + if (commitVersion != null) { + ulog.copyOverOldUpdates(Long.parseLong(commitVersion)); + } + } // we will call register again after zk expiration and on reload - if (!afterExpiration && !core.isReloaded() && ulog != null) { + if (!afterExpiration && !core.isReloaded() && ulog != null && !isReplicaInOnlyLeaderIndexes) { // disable recovery in case shard is in construction state (for shard splits) Slice slice = getClusterState().getSlice(collection, shardId); if (slice.getState() != Slice.State.CONSTRUCTION || !isLeader) { @@ -934,6 +942,9 @@ public class ZkController { boolean didRecovery = checkRecovery(recoverReloadedCores, isLeader, skipRecovery, collection, coreZkNodeName, core, cc, afterExpiration); if (!didRecovery) { + if (isReplicaInOnlyLeaderIndexes) { + startReplicationFromLeader(coreName); + } publish(desc, Replica.State.ACTIVE); } @@ -948,6 +959,20 @@ public class ZkController { } } + public void startReplicationFromLeader(String coreName) throws InterruptedException { + ReplicateFromLeader replicateFromLeader = new ReplicateFromLeader(cc, coreName); + if (replicateFromLeaders.putIfAbsent(coreName, replicateFromLeader) == null) { + replicateFromLeader.startReplication(); + } + } + + public void stopReplicationFromLeader(String coreName) { + ReplicateFromLeader replicateFromLeader = replicateFromLeaders.remove(coreName); + if (replicateFromLeader != null) { + replicateFromLeader.stopReplication(); + } + } + // timeoutms is the timeout for the first call to get the leader - there is then // a longer wait to make sure that leader matches our local state private String getLeader(final CloudDescriptor cloudDesc, int timeoutms) { diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index b9597ae985f..0de671e839d 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -1137,6 +1137,13 @@ public class CoreContainer { log.info("Reloading SolrCore '{}' using configuration from {}", cd.getName(), coreConfig.getName()); SolrCore newCore = core.reload(coreConfig); registerCore(cd.getName(), newCore, false, false); + if (getZkController() != null) { + boolean onlyLeaderIndexes = getZkController().getClusterState().getCollection(cd.getCollectionName()).getRealtimeReplicas() == 1; + if (onlyLeaderIndexes && !cd.getCloudDescriptor().isLeader()) { + getZkController().stopReplicationFromLeader(core.getName()); + getZkController().startReplicationFromLeader(newCore.getName()); + } + } } catch (SolrCoreState.CoreIsClosedException e) { throw e; } catch (Exception e) { diff --git a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java index 8634aeeb002..a07496fcdf1 100644 --- a/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java +++ b/solr/core/src/java/org/apache/solr/handler/IndexFetcher.java @@ -68,8 +68,11 @@ import org.apache.solr.client.solrj.impl.HttpClientUtil; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient.Builder; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.CloudDescriptor; +import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.ExecutorUtil; @@ -115,7 +118,7 @@ public class IndexFetcher { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private final String masterUrl; + private String masterUrl; final ReplicationHandler replicationHandler; @@ -150,6 +153,8 @@ public class IndexFetcher { private boolean useExternalCompression = false; + private boolean fetchFromLeader = false; + private final HttpClient myHttpClient; private Integer connTimeout; @@ -167,11 +172,15 @@ public class IndexFetcher { public IndexFetcher(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) { solrCore = sc; + Object fetchFromLeader = initArgs.get(FETCH_FROM_LEADER); + if (fetchFromLeader != null && fetchFromLeader instanceof Boolean) { + this.fetchFromLeader = (boolean) fetchFromLeader; + } String masterUrl = (String) initArgs.get(MASTER_URL); - if (masterUrl == null) + if (masterUrl == null && !this.fetchFromLeader) throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "'masterUrl' is required for a slave"); - if (masterUrl.endsWith(ReplicationHandler.PATH)) { + if (masterUrl != null && masterUrl.endsWith(ReplicationHandler.PATH)) { masterUrl = masterUrl.substring(0, masterUrl.length()-12); LOG.warn("'masterUrl' must be specified without the "+ReplicationHandler.PATH+" suffix"); } @@ -298,6 +307,15 @@ public class IndexFetcher { } try { + if (fetchFromLeader) { + Replica replica = getLeaderReplica(); + CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor(); + if (cd.getCoreNodeName().equals(replica.getName())) { + return false; + } + masterUrl = replica.getCoreUrl(); + LOG.info("Updated masterUrl to " + masterUrl); + } //get the current 'replicateable' index version in the master NamedList response; try { @@ -404,7 +422,7 @@ public class IndexFetcher { isFullCopyNeeded = true; } - if (!isFullCopyNeeded) { + if (!isFullCopyNeeded && !fetchFromLeader) { // a searcher might be using some flushed but not committed segments // because of soft commits (which open a searcher on IW's data) // so we need to close the existing searcher on the last commit @@ -565,6 +583,14 @@ public class IndexFetcher { } } + private Replica getLeaderReplica() throws InterruptedException { + ZkController zkController = solrCore.getCoreDescriptor().getCoreContainer().getZkController(); + CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor(); + Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( + cd.getCollectionName(), cd.getShardId()); + return leaderReplica; + } + private void cleanup(final SolrCore core, Directory tmpIndexDir, Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException { try { diff --git a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java index cdbadc4eeca..e40b2c38311 100644 --- a/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/ReplicationHandler.java @@ -209,6 +209,11 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw private Long pollIntervalNs; private String pollIntervalStr; + private PollListener pollListener; + public interface PollListener { + void onComplete(SolrCore solrCore, boolean pollSuccess) throws IOException; + } + /** * Disable the timer task for polling */ @@ -218,6 +223,10 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw return pollIntervalStr; } + public void setPollListener(PollListener pollListener) { + this.pollListener = pollListener; + } + @Override public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { rsp.setHttpCaching(false); @@ -1142,7 +1151,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw try { LOG.debug("Polling for index modifications"); markScheduledExecutionStart(); - doFetch(null, false); + boolean pollSuccess = doFetch(null, false); + if (pollListener != null) pollListener.onComplete(core, pollSuccess); } catch (Exception e) { LOG.error("Exception in fetching index", e); } @@ -1328,6 +1338,20 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw }); } + public void close() { + if (executorService != null) executorService.shutdown(); + if (pollingIndexFetcher != null) { + pollingIndexFetcher.destroy(); + } + if (currentIndexFetcher != null && currentIndexFetcher != pollingIndexFetcher) { + currentIndexFetcher.destroy(); + } + ExecutorUtil.shutdownAndAwaitTermination(restoreExecutor); + if (restoreFuture != null) { + restoreFuture.cancel(false); + } + } + /** * Register a listener for postcommit/optimize * @@ -1680,6 +1704,8 @@ public class ReplicationHandler extends RequestHandlerBase implements SolrCoreAw public static final String MASTER_URL = "masterUrl"; + public static final String FETCH_FROM_LEADER = "fetchFromLeader"; + public static final String STATUS = "status"; public static final String COMMAND = "command"; diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java index d7759cadde0..2e17af6df23 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CollectionsHandler.java @@ -115,6 +115,7 @@ import static org.apache.solr.common.cloud.DocCollection.STATE_FORMAT; import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; +import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_PROP; import static org.apache.solr.common.cloud.ZkStateReader.PROPERTY_VALUE_PROP; import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; @@ -404,7 +405,8 @@ public class CollectionsHandler extends RequestHandlerBase implements Permission STATE_FORMAT, AUTO_ADD_REPLICAS, RULE, - SNITCH); + SNITCH, + REALTIME_REPLICAS); if (props.get(STATE_FORMAT) == null) { props.put(STATE_FORMAT, "2"); diff --git a/solr/core/src/java/org/apache/solr/update/CommitTracker.java b/solr/core/src/java/org/apache/solr/update/CommitTracker.java index 61f0c352630..9c09ebeb3fd 100644 --- a/solr/core/src/java/org/apache/solr/update/CommitTracker.java +++ b/solr/core/src/java/org/apache/solr/update/CommitTracker.java @@ -207,6 +207,11 @@ public final class CommitTracker implements Runnable { command.openSearcher = openSearcher; command.waitSearcher = waitSearcher; command.softCommit = softCommit; + if (core.getCoreDescriptor().getCloudDescriptor() != null + && core.getCoreDescriptor().getCloudDescriptor().isLeader() + && !softCommit) { + command.version = core.getUpdateHandler().getUpdateLog().getVersionInfo().getNewClock(); + } // no need for command.maxOptimizeSegments = 1; since it is not optimizing // we increment this *before* calling commit because it was causing a race diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java index 4592bcf980b..abb5512c841 100644 --- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java +++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java @@ -45,7 +45,9 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.TermQuery; import org.apache.lucene.util.BytesRefHash; +import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.SimpleOrderedMap; @@ -123,6 +125,14 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState commitWithinSoftCommit = updateHandlerInfo.commitWithinSoftCommit; indexWriterCloseWaitsForMerges = updateHandlerInfo.indexWriterCloseWaitsForMerges; + ZkController zkController = core.getCoreDescriptor().getCoreContainer().getZkController(); + if (zkController != null) { + DocCollection dc = zkController.getClusterState().getCollection(core.getCoreDescriptor().getCollectionName()); + if (dc.getRealtimeReplicas() == 1) { + commitWithinSoftCommit = false; + commitTracker.setOpenSearcher(true); + } + } } @@ -233,6 +243,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState cmd.overwrite = false; } try { + if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) { + if (ulog != null) ulog.add(cmd); + return 1; + } + if (cmd.overwrite) { // Check for delete by query commands newer (i.e. reordered). This // should always be null on a leader @@ -404,6 +419,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState deleteByIdCommands.increment(); deleteByIdCommandsCumulative.mark(); + if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0 ) { + if (ulog != null) ulog.delete(cmd); + return; + } + Term deleteTerm = new Term(idField.getName(), cmd.getIndexedId()); // SolrCore.verbose("deleteDocuments",deleteTerm,writer); RefCounted iw = solrCoreState.getIndexWriter(core); @@ -463,6 +483,11 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState deleteByQueryCommandsCumulative.mark(); boolean madeIt=false; try { + if ( (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) != 0) { + if (ulog != null) ulog.deleteByQuery(cmd); + madeIt = true; + return; + } Query q = getQuery(cmd); boolean delAll = MatchAllDocsQuery.class == q.getClass(); @@ -563,7 +588,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState log.info("start "+cmd); RefCounted iw = solrCoreState.getIndexWriter(core); try { - SolrIndexWriter.setCommitData(iw.get()); + SolrIndexWriter.setCommitData(iw.get(), cmd.getVersion()); iw.get().prepareCommit(); } finally { iw.decref(); @@ -647,7 +672,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState // SolrCore.verbose("writer.commit() start writer=",writer); if (writer.hasUncommittedChanges()) { - SolrIndexWriter.setCommitData(writer); + SolrIndexWriter.setCommitData(writer, cmd.getVersion()); writer.commit(); } else { log.info("No uncommitted changes. Skipping IW.commit."); @@ -838,7 +863,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState } // todo: refactor this shared code (or figure out why a real CommitUpdateCommand can't be used) - SolrIndexWriter.setCommitData(writer); + SolrIndexWriter.setCommitData(writer, cmd.getVersion()); writer.commit(); synchronized (solrCoreState.getUpdateLock()) { diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java index 90f6856b1cb..c478935d1ec 100644 --- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java +++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java @@ -20,8 +20,10 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.TreeMap; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -368,6 +370,10 @@ public class HdfsTransactionLog extends TransactionLog { return new HDFSLogReader(startingPos); } + public LogReader getSortedReader(long startingPos) { + return new HDFSSortedLogReader(startingPos); + } + /** Returns a single threaded reverse reader */ @Override public ReverseReader getReverseReader() throws IOException { @@ -477,6 +483,50 @@ public class HdfsTransactionLog extends TransactionLog { } + public class HDFSSortedLogReader extends HDFSLogReader{ + private long startingPos; + private boolean inOrder = true; + private TreeMap versionToPos; + Iterator iterator; + + public HDFSSortedLogReader(long startingPos) { + super(startingPos); + this.startingPos = startingPos; + } + + @Override + public Object next() throws IOException, InterruptedException { + if (versionToPos == null) { + versionToPos = new TreeMap<>(); + Object o; + long pos = startingPos; + + long lastVersion = Long.MIN_VALUE; + while ( (o = super.next()) != null) { + List entry = (List) o; + long version = (Long) entry.get(UpdateLog.VERSION_IDX); + version = Math.abs(version); + versionToPos.put(version, pos); + pos = currentPos(); + + if (version < lastVersion) inOrder = false; + lastVersion = version; + } + fis.seek(startingPos); + } + + if (inOrder) { + return super.next(); + } else { + if (iterator == null) iterator = versionToPos.values().iterator(); + if (!iterator.hasNext()) return null; + long pos = iterator.next(); + if (pos != currentPos()) fis.seek(pos); + return super.next(); + } + } + } + public class HDFSReverseReader extends ReverseReader { FSDataFastInputStream fis; private LogCodec codec = new LogCodec(resolver) { diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java index a147b0fc357..e9950f2b42c 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java +++ b/solr/core/src/java/org/apache/solr/update/SolrIndexSplitter.java @@ -137,7 +137,8 @@ public class SolrIndexSplitter { // we commit explicitly instead of sending a CommitUpdateCommand through the processor chain // because the sub-shard cores will just ignore such a commit because the update log is not // in active state at this time. - SolrIndexWriter.setCommitData(iw); + //TODO no commitUpdateCommand + SolrIndexWriter.setCommitData(iw, -1); iw.commit(); success = true; } finally { diff --git a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java index 3c0c1a5a0cc..6a264f851dd 100644 --- a/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java +++ b/solr/core/src/java/org/apache/solr/update/SolrIndexWriter.java @@ -61,6 +61,7 @@ public class SolrIndexWriter extends IndexWriter { /** Stored into each Lucene commit to record the * System.currentTimeMillis() when commit was called. */ public static final String COMMIT_TIME_MSEC_KEY = "commitTimeMSec"; + public static final String COMMIT_COMMAND_VERSION = "commitCommandVer"; private final Object CLOSE_LOCK = new Object(); @@ -183,10 +184,11 @@ public class SolrIndexWriter extends IndexWriter { @SuppressForbidden(reason = "Need currentTimeMillis, commit time should be used only for debugging purposes, " + " but currently suspiciously used for replication as well") - public static void setCommitData(IndexWriter iw) { - log.info("Calling setCommitData with IW:" + iw.toString()); + public static void setCommitData(IndexWriter iw, long commitCommandVersion) { + log.info("Calling setCommitData with IW:" + iw.toString() + " commitCommandVersion:"+commitCommandVersion); final Map commitData = new HashMap<>(); commitData.put(COMMIT_TIME_MSEC_KEY, String.valueOf(System.currentTimeMillis())); + commitData.put(COMMIT_COMMAND_VERSION, String.valueOf(commitCommandVersion)); iw.setLiveCommitData(commitData.entrySet()); } diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java index 5037b45c5ef..73328cf8536 100644 --- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java +++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java @@ -29,9 +29,11 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.util.BytesRef; @@ -632,6 +634,10 @@ public class TransactionLog implements Closeable { return new LogReader(startingPos); } + public LogReader getSortedReader(long startingPos) { + return new SortedLogReader(startingPos); + } + /** Returns a single threaded reverse reader */ public ReverseReader getReverseReader() throws IOException { return new FSReverseReader(); @@ -715,6 +721,50 @@ public class TransactionLog implements Closeable { } + public class SortedLogReader extends LogReader { + private long startingPos; + private boolean inOrder = true; + private TreeMap versionToPos; + Iterator iterator; + + public SortedLogReader(long startingPos) { + super(startingPos); + this.startingPos = startingPos; + } + + @Override + public Object next() throws IOException, InterruptedException { + if (versionToPos == null) { + versionToPos = new TreeMap<>(); + Object o; + long pos = startingPos; + + long lastVersion = Long.MIN_VALUE; + while ( (o = super.next()) != null) { + List entry = (List) o; + long version = (Long) entry.get(UpdateLog.VERSION_IDX); + version = Math.abs(version); + versionToPos.put(version, pos); + pos = currentPos(); + + if (version < lastVersion) inOrder = false; + lastVersion = version; + } + fis.seek(startingPos); + } + + if (inOrder) { + return super.next(); + } else { + if (iterator == null) iterator = versionToPos.values().iterator(); + if (!iterator.hasNext()) return null; + long pos = iterator.next(); + if (pos != currentPos()) fis.seek(pos); + return super.next(); + } + } + } + public abstract class ReverseReader { /** Returns the next object from the log, or null if none available. diff --git a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java index 9f015717027..b124271d977 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateCommand.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateCommand.java @@ -34,6 +34,7 @@ public abstract class UpdateCommand implements Cloneable { public static int PEER_SYNC = 0x00000004; // update command is a missing update being provided by a peer. public static int IGNORE_AUTOCOMMIT = 0x00000008; // this update should not count toward triggering of autocommits. public static int CLEAR_CACHES = 0x00000010; // clear caches associated with the update log. used when applying reordered DBQ updates when doing an add. + public static int IGNORE_INDEXWRITER = 0x00000020; public UpdateCommand(SolrQueryRequest req) { this.req = req; diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index 16eff9ceece..6a5f407daa1 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -27,6 +27,7 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Deque; import java.util.HashMap; import java.util.LinkedHashMap; @@ -618,7 +619,7 @@ public static final int VERSION_IDX = 1; } // only change our caches if we are not buffering - if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0) { + if ((cmd.getFlags() & UpdateCommand.BUFFERING) == 0 && (cmd.getFlags() & UpdateCommand.IGNORE_INDEXWRITER) == 0) { // given that we just did a delete-by-query, we don't know what documents were // affected and hence we must purge our caches. openRealtimeSearcher(); @@ -1095,6 +1096,162 @@ public static final int VERSION_IDX = 1; return cs.submit(replayer, recoveryInfo); } + /** + * Replay current tlog, so all updates will be written to index. + * This is must do task for a append replica become a new leader. + * @return future of this task + */ + public Future recoverFromCurrentLog() { + if (tlog == null) { + return null; + } + map.clear(); + recoveryInfo = new RecoveryInfo(); + tlog.incref(); + + ExecutorCompletionService cs = new ExecutorCompletionService<>(recoveryExecutor); + LogReplayer replayer = new LogReplayer(Collections.singletonList(tlog), false, true); + + versionInfo.blockUpdates(); + try { + state = State.REPLAYING; + } finally { + versionInfo.unblockUpdates(); + } + + return cs.submit(replayer, recoveryInfo); + } + + /** + * Block updates, append a commit at current tlog, + * then copy over buffer updates to new tlog and bring back ulog to active state. + * So any updates which hasn't made it to the index is preserved in the current tlog, + * this also make RTG work + * @param cuc any updates that have version larger than the version of cuc will be copied over + */ + public void copyOverBufferingUpdates(CommitUpdateCommand cuc) { + versionInfo.blockUpdates(); + try { + operationFlags &= ~FLAG_GAP; + state = State.ACTIVE; + copyAndSwitchToNewTlog(cuc); + } finally { + versionInfo.unblockUpdates(); + } + } + + /** + * Block updates, append a commit at current tlog, then copy over updates to a new tlog. + * So any updates which hasn't made it to the index is preserved in the current tlog + * @param cuc any updates that have version larger than the version of cuc will be copied over + */ + public void copyOverOldUpdates(CommitUpdateCommand cuc) { + versionInfo.blockUpdates(); + try { + copyAndSwitchToNewTlog(cuc); + } finally { + versionInfo.unblockUpdates(); + } + } + + protected void copyAndSwitchToNewTlog(CommitUpdateCommand cuc) { + synchronized (this) { + if (tlog == null) return; + preCommit(cuc); + try { + copyOverOldUpdates(cuc.getVersion()); + } finally { + postCommit(cuc); + } + } + } + + /** + * Copy over updates from prevTlog or last tlog (in tlog folder) to a new tlog + * @param commitVersion any updates that have version larger than the commitVersion will be copied over + */ + public void copyOverOldUpdates(long commitVersion) { + TransactionLog oldTlog = prevTlog; + if (oldTlog == null && !logs.isEmpty()) { + oldTlog = logs.getFirst(); + } + if (oldTlog == null || oldTlog.refcount.get() == 0) { + return; + } + + try { + if (oldTlog.endsWithCommit()) { + return; + } + } catch (IOException e) { + log.warn("Exception reading log", e); + return; + } + + SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, + new ModifiableSolrParams()); + TransactionLog.LogReader logReader = oldTlog.getReader(0); + Object o = null; + try { + while ( (o = logReader.next()) != null ) { + try { + List entry = (List)o; + int operationAndFlags = (Integer) entry.get(0); + int oper = operationAndFlags & OPERATION_MASK; + long version = (Long) entry.get(1); + if (Math.abs(version) > commitVersion) { + switch (oper) { + case UpdateLog.UPDATE_INPLACE: + case UpdateLog.ADD: { + SolrInputDocument sdoc = (SolrInputDocument) entry.get(entry.size() - 1); + AddUpdateCommand cmd = new AddUpdateCommand(req); + cmd.solrDoc = sdoc; + cmd.setVersion(version); + cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT); + add(cmd); + break; + } + case UpdateLog.DELETE: { + byte[] idBytes = (byte[]) entry.get(2); + DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); + cmd.setIndexedId(new BytesRef(idBytes)); + cmd.setVersion(version); + cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT); + delete(cmd); + break; + } + + case UpdateLog.DELETE_BY_QUERY: { + String query = (String) entry.get(2); + DeleteUpdateCommand cmd = new DeleteUpdateCommand(req); + cmd.query = query; + cmd.setVersion(version); + cmd.setFlags(UpdateCommand.IGNORE_AUTOCOMMIT); + deleteByQuery(cmd); + break; + } + + default: + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper); + } + } + } catch (ClassCastException e) { + log.warn("Unexpected log entry or corrupt log. Entry=" + o, e); + } + } + // Prev tlog will be closed, so nullify prevMap + if (prevTlog == oldTlog) { + prevMap = null; + } + } catch (IOException e) { + log.error("Exception reading versions from log",e); + } catch (InterruptedException e) { + log.warn("Exception reading log", e); + } finally { + if (logReader != null) logReader.close(); + } + } + protected void ensureLog() { if (tlog == null) { @@ -1482,6 +1639,7 @@ public static final int VERSION_IDX = 1; boolean activeLog; boolean finishing = false; // state where we lock out other updates and finish those updates that snuck in before we locked boolean debug = loglog.isDebugEnabled(); + boolean inSortedOrder; public LogReplayer(List translogs, boolean activeLog) { this.translogs = new LinkedList<>(); @@ -1489,6 +1647,11 @@ public static final int VERSION_IDX = 1; this.activeLog = activeLog; } + public LogReplayer(List translogs, boolean activeLog, boolean inSortedOrder) { + this(translogs, activeLog); + this.inSortedOrder = inSortedOrder; + } + private SolrQueryRequest req; @@ -1554,7 +1717,11 @@ public static final int VERSION_IDX = 1; try { loglog.warn("Starting log replay " + translog + " active=" + activeLog + " starting pos=" + recoveryInfo.positionOfStart); long lastStatusTime = System.nanoTime(); - tlogReader = translog.getReader(recoveryInfo.positionOfStart); + if (inSortedOrder) { + tlogReader = translog.getSortedReader(recoveryInfo.positionOfStart); + } else { + tlogReader = translog.getReader(recoveryInfo.positionOfStart); + } // NOTE: we don't currently handle a core reload during recovery. This would cause the core // to change underneath us. diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index ec093cf5dfe..08ede724269 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -279,6 +279,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // this is set to true in the constructor if the next processors in the chain // are custom and may modify the SolrInputDocument racing with its serialization for replication private final boolean cloneRequiredOnLeader; + private final boolean onlyLeaderIndexes; public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { this(req, rsp, new AtomicUpdateDocumentMerger(req), next); @@ -324,8 +325,12 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { if (cloudDesc != null) { collection = cloudDesc.getCollectionName(); + ClusterState cstate = zkController.getClusterState(); + DocCollection coll = cstate.getCollection(collection); + onlyLeaderIndexes = coll.getRealtimeReplicas() == 1; } else { collection = null; + onlyLeaderIndexes = false; } boolean shouldClone = false; @@ -1186,6 +1191,9 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { checkDeleteByQueries = true; } } + if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } } } @@ -1692,6 +1700,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return; } + if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } + doLocalDelete(cmd); } } @@ -1845,6 +1857,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return true; } } + + if (onlyLeaderIndexes && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } } } @@ -1876,7 +1892,27 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) { - doLocalCommit(cmd); + if (onlyLeaderIndexes) { + try { + Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( + collection, cloudDesc.getShardId()); + isLeader = leaderReplica.getName().equals( + req.getCore().getCoreDescriptor().getCloudDescriptor() + .getCoreNodeName()); + if (isLeader) { + long commitVersion = vinfo.getNewClock(); + cmd.setVersion(commitVersion); + doLocalCommit(cmd); + } else { + assert TestInjection.waitForInSyncWithLeader(req.getCore(), + zkController, collection, cloudDesc.getShardId()); + } + } catch (InterruptedException e) { + throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "Exception finding leader for shard " + cloudDesc.getShardId(), e); + } + } else { + doLocalCommit(cmd); + } } else { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); if (!req.getParams().getBool(COMMIT_END_POINT, false)) { diff --git a/solr/core/src/java/org/apache/solr/util/TestInjection.java b/solr/core/src/java/org/apache/solr/util/TestInjection.java index 5e4dc75b11c..97291a1f6a7 100644 --- a/solr/core/src/java/org/apache/solr/util/TestInjection.java +++ b/solr/core/src/java/org/apache/solr/util/TestInjection.java @@ -28,14 +28,28 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.cloud.ZkController; import org.apache.solr.common.NonExistentCoreException; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.Pair; +import org.apache.solr.common.util.SuppressForbidden; import org.apache.solr.core.CoreContainer; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.ReplicationHandler; +import org.apache.solr.update.SolrIndexWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS; +import static org.apache.solr.handler.ReplicationHandler.COMMAND; + /** * Allows random faults to be injected in running code during test runs. @@ -118,6 +132,8 @@ public class TestInjection { public static int randomDelayMaxInCoreCreationInSec = 10; public static String splitFailureBeforeReplicaCreation = null; + + public static String waitForReplicasInSync = "true:60"; private static Set timers = Collections.synchronizedSet(new HashSet()); @@ -343,6 +359,44 @@ public class TestInjection { return true; } + + @SuppressForbidden(reason = "Need currentTimeMillis, because COMMIT_TIME_MSEC_KEY use currentTimeMillis as value") + public static boolean waitForInSyncWithLeader(SolrCore core, ZkController zkController, String collection, String shardId) throws InterruptedException { + if (waitForReplicasInSync == null) return true; + + Pair pair = parseValue(waitForReplicasInSync); + boolean enabled = pair.first(); + if (!enabled) return true; + long t = System.currentTimeMillis() - 100; + try { + for (int i = 0; i < pair.second(); i++) { + if (core.isClosed()) return true; + Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry( + collection, shardId); + try (HttpSolrClient leaderClient = new HttpSolrClient.Builder(leaderReplica.getCoreUrl()).build()) { + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set(CommonParams.QT, ReplicationHandler.PATH); + params.set(COMMAND, CMD_DETAILS); + + NamedList response = leaderClient.request(new QueryRequest(params)); + long leaderVersion = (long) ((NamedList)response.get("details")).get("indexVersion"); + + String localVersion = core.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY); + if (localVersion == null && leaderVersion == 0 && !core.getUpdateHandler().getUpdateLog().hasUncommittedChanges()) return true; + if (localVersion != null && Long.parseLong(localVersion) == leaderVersion && (leaderVersion >= t || i >= 6)) { + return true; + } else { + Thread.sleep(500); + } + } + } + + } catch (Exception e) { + log.error("Exception when wait for replicas in sync with master"); + } + + return false; + } private static Pair parseValue(String raw) { Matcher m = ENABLED_PERCENT.matcher(raw); diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml new file mode 100644 index 00000000000..31802f91bb2 --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/schema.xml @@ -0,0 +1,31 @@ + + + + + + + + + + + id + + + + + diff --git a/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml new file mode 100644 index 00000000000..8da7d2847e9 --- /dev/null +++ b/solr/core/src/test-files/solr/configsets/cloud-minimal-inplace-updates/conf/solrconfig.xml @@ -0,0 +1,48 @@ + + + + + + + + + ${solr.data.dir:} + + + + + ${tests.luceneMatchVersion:LATEST} + + + + ${solr.commitwithin.softcommit:true} + + + + + + + explicit + true + text + + + + + diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java index 582c8b402ea..5eb4b3b35d9 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZk2Test.java @@ -54,11 +54,17 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase { private static final String SHARD2 = "shard2"; private static final String SHARD1 = "shard1"; private static final String ONE_NODE_COLLECTION = "onenodecollection"; + private final boolean onlyLeaderIndexes = random().nextBoolean(); public BasicDistributedZk2Test() { super(); sliceCount = 2; } + + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1 : -1; + } @Test @ShardsFixed(num = 4) diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java index 25c483bb9e8..d1dbe9c4319 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java @@ -87,6 +87,8 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final String DEFAULT_COLLECTION = "collection1"; + + private final boolean onlyLeaderIndexes = random().nextBoolean(); String t1="a_t"; String i1="a_i1"; String tlong = "other_tl1"; @@ -114,7 +116,12 @@ public class BasicDistributedZkTest extends AbstractFullDistribZkTestBase { pending = new HashSet<>(); } - + + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1 : -1; + } + @Override protected void setDistributedParams(ModifiableSolrParams params) { diff --git a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java index 4e6122e8fe0..628884c3cda 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ChaosMonkeyNothingIsSafeTest.java @@ -55,6 +55,8 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase private static final Integer RUN_LENGTH = Integer.parseInt(System.getProperty("solr.tests.cloud.cm.runlength", "-1")); + private final boolean onlyLeaderIndexes = random().nextBoolean(); + @BeforeClass public static void beforeSuperClass() { schemaString = "schema15.xml"; // we need a string id @@ -109,6 +111,11 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase clientSoTimeout = 5000; } + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1 : -1; + } + @Test public void test() throws Exception { cloudClient.setSoTimeout(clientSoTimeout); diff --git a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java index e9e8907affa..8904ea827be 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ForceLeaderTest.java @@ -55,6 +55,12 @@ import static org.apache.solr.common.cloud.ZkStateReader.CORE_NAME_PROP; public class ForceLeaderTest extends HttpPartitionTest { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final boolean onlyLeaderIndexes = random().nextBoolean(); + + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1 : -1; + } @Test @Override diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java index 5ae4c17c9de..01002cfee44 100644 --- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java @@ -76,12 +76,19 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase { // give plenty of time for replicas to recover when running in slow Jenkins test envs protected static final int maxWaitSecsToSeeAllActive = 90; + private final boolean onlyLeaderIndexes = random().nextBoolean(); + public HttpPartitionTest() { super(); sliceCount = 2; fixShardCount(3); } + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1 : -1; + } + /** * We need to turn off directUpdatesToLeadersOnly due to SOLR-9512 */ diff --git a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java index fd122ad6a08..457b9d9ef85 100644 --- a/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/LeaderInitiatedRecoveryOnCommitTest.java @@ -37,12 +37,19 @@ public class LeaderInitiatedRecoveryOnCommitTest extends BasicDistributedZkTest private static final long sleepMsBeforeHealPartition = 2000L; + private final boolean onlyLeaderIndexes = random().nextBoolean(); + public LeaderInitiatedRecoveryOnCommitTest() { super(); sliceCount = 1; fixShardCount(4); } + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1 : -1; + } + @Override @Test public void test() throws Exception { diff --git a/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java b/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java new file mode 100644 index 00000000000..a4e8d6f2bc4 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/OnlyLeaderIndexesTest.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Semaphore; + +import org.apache.lucene.index.IndexWriter; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.apache.solr.common.cloud.ZkNodeProps; +import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.core.SolrCore; +import org.apache.solr.update.DirectUpdateHandler2; +import org.apache.solr.update.SolrIndexWriter; +import org.apache.solr.update.UpdateHandler; +import org.apache.solr.update.UpdateLog; +import org.apache.solr.util.RefCounted; +import org.junit.BeforeClass; +import org.junit.Test; + +public class OnlyLeaderIndexesTest extends SolrCloudTestCase { + private static final String COLLECTION = "collection1"; + + @BeforeClass + public static void setupCluster() throws Exception { + System.setProperty("solr.directoryFactory", "solr.StandardDirectoryFactory"); + System.setProperty("solr.ulog.numRecordsToKeep", "1000"); + + configureCluster(3) + .addConfig("config", TEST_PATH().resolve("configsets") + .resolve("cloud-minimal-inplace-updates").resolve("conf")) + .configure(); + + CollectionAdminRequest + .createCollection(COLLECTION, "config", 1, 3) + .setRealtimeReplicas(1) + .setMaxShardsPerNode(1) + .process(cluster.getSolrClient()); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + } + + @Test + public void test() throws Exception { + basicTest(); + recoveryTest(); + dbiTest(); + basicLeaderElectionTest(); + outOfOrderDBQWithInPlaceUpdatesTest(); + } + + public void basicTest() throws Exception { + CloudSolrClient cloudClient = cluster.getSolrClient(); + new UpdateRequest() + .add(sdoc("id", "1")) + .add(sdoc("id", "2")) + .add(sdoc("id", "3")) + .add(sdoc("id", "4")) + .process(cloudClient, COLLECTION); + + { + UpdateHandler updateHandler = getSolrCore(true).get(0).getUpdateHandler(); + RefCounted iwRef = updateHandler.getSolrCoreState().getIndexWriter(null); + assertTrue("IndexWriter at leader must see updates ", iwRef.get().hasUncommittedChanges()); + iwRef.decref(); + } + + for (SolrCore solrCore : getSolrCore(false)) { + RefCounted iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null); + assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges()); + iwRef.decref(); + } + + checkRTG(1, 4, cluster.getJettySolrRunners()); + + new UpdateRequest() + .deleteById("1") + .deleteByQuery("id:2") + .process(cloudClient, COLLECTION); + + // The DBQ is not processed at replicas, so we still can get doc2 and other docs by RTG + checkRTG(2,4, getSolrRunner(false)); + + new UpdateRequest() + .commit(cloudClient, COLLECTION); + + checkShardConsistency(2, 1); + + // Update log roll over + for (SolrCore solrCore : getSolrCore(false)) { + UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog(); + assertFalse(updateLog.hasUncommittedChanges()); + } + + // UpdateLog copy over old updates + for (int i = 15; i <= 150; i++) { + cloudClient.add(COLLECTION, sdoc("id",String.valueOf(i))); + if (random().nextInt(100) < 15 & i != 150) { + cloudClient.commit(COLLECTION); + } + } + checkRTG(120,150, cluster.getJettySolrRunners()); + waitForReplicasCatchUp(20); + } + + public void recoveryTest() throws Exception { + CloudSolrClient cloudClient = cluster.getSolrClient(); + new UpdateRequest() + .deleteByQuery("*:*") + .commit(cluster.getSolrClient(), COLLECTION); + new UpdateRequest() + .add(sdoc("id", "3")) + .add(sdoc("id", "4")) + .commit(cloudClient, COLLECTION); + // Replica recovery + new UpdateRequest() + .add(sdoc("id", "5")) + .process(cloudClient, COLLECTION); + JettySolrRunner solrRunner = getSolrRunner(false).get(0); + ChaosMonkey.stop(solrRunner); + new UpdateRequest() + .add(sdoc("id", "6")) + .process(cloudClient, COLLECTION); + ChaosMonkey.start(solrRunner); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + // We skip peerSync, so replica will always trigger commit on leader + checkShardConsistency(4, 20); + + // LTR can be kicked off, so waiting for replicas recovery + new UpdateRequest() + .add(sdoc("id", "7")) + .commit(cloudClient, COLLECTION); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + checkShardConsistency(5, 20); + + // More Replica recovery testing + new UpdateRequest() + .add(sdoc("id", "8")) + .process(cloudClient, COLLECTION); + checkRTG(3,8, cluster.getJettySolrRunners()); + DirectUpdateHandler2.commitOnClose = false; + ChaosMonkey.stop(solrRunner); + DirectUpdateHandler2.commitOnClose = true; + ChaosMonkey.start(solrRunner); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + checkRTG(3,8, cluster.getJettySolrRunners()); + checkShardConsistency(6, 20); + + // Test replica recovery apply buffer updates + Semaphore waitingForBufferUpdates = new Semaphore(0); + Semaphore waitingForReplay = new Semaphore(0); + RecoveryStrategy.testing_beforeReplayBufferingUpdates = () -> { + try { + waitingForReplay.release(); + waitingForBufferUpdates.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }; + ChaosMonkey.stop(solrRunner); + ChaosMonkey.start(solrRunner); + waitingForReplay.acquire(); + new UpdateRequest() + .add(sdoc("id", "9")) + .add(sdoc("id", "10")) + .process(cloudClient, COLLECTION); + waitingForBufferUpdates.release(); + RecoveryStrategy.testing_beforeReplayBufferingUpdates = null; + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + checkRTG(3,10, cluster.getJettySolrRunners()); + checkShardConsistency(6, 20); + for (SolrCore solrCore : getSolrCore(false)) { + RefCounted iwRef = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null); + assertFalse("IndexWriter at replicas must not see updates ", iwRef.get().hasUncommittedChanges()); + iwRef.decref(); + } + } + + public void dbiTest() throws Exception{ + CloudSolrClient cloudClient = cluster.getSolrClient(); + new UpdateRequest() + .deleteByQuery("*:*") + .commit(cluster.getSolrClient(), COLLECTION); + new UpdateRequest() + .add(sdoc("id", "1")) + .commit(cloudClient, COLLECTION); + checkShardConsistency(1, 1); + new UpdateRequest() + .deleteById("1") + .process(cloudClient, COLLECTION); + try { + checkRTG(1, 1, cluster.getJettySolrRunners()); + } catch (AssertionError e) { + return; + } + fail("Doc1 is deleted but it's still exist"); + } + + public void basicLeaderElectionTest() throws Exception { + CloudSolrClient cloudClient = cluster.getSolrClient(); + new UpdateRequest() + .deleteByQuery("*:*") + .commit(cluster.getSolrClient(), COLLECTION); + new UpdateRequest() + .add(sdoc("id", "1")) + .add(sdoc("id", "2")) + .process(cloudClient, COLLECTION); + String oldLeader = getLeader(); + JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0); + ChaosMonkey.kill(oldLeaderJetty); + for (int i = 0; i < 60; i++) { // wait till leader is changed + if (!oldLeader.equals(getLeader())) { + break; + } + Thread.sleep(100); + } + new UpdateRequest() + .add(sdoc("id", "3")) + .add(sdoc("id", "4")) + .process(cloudClient, COLLECTION); + ChaosMonkey.start(oldLeaderJetty); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 60); + checkRTG(1,4, cluster.getJettySolrRunners()); + new UpdateRequest() + .commit(cloudClient, COLLECTION); + checkShardConsistency(4,1); + } + + private String getLeader() throws InterruptedException { + ZkNodeProps props = cluster.getSolrClient().getZkStateReader().getLeaderRetry("collection1", "shard1", 30000); + return props.getStr(ZkStateReader.NODE_NAME_PROP); + } + + public void outOfOrderDBQWithInPlaceUpdatesTest() throws Exception { + new UpdateRequest() + .deleteByQuery("*:*") + .commit(cluster.getSolrClient(), COLLECTION); + List updates = new ArrayList<>(); + updates.add(simulatedUpdateRequest(null, "id", 1, "title_s", "title0_new", "inplace_updatable_int", 5, "_version_", Long.MAX_VALUE-100)); // full update + updates.add(simulatedDBQ("inplace_updatable_int:5", Long.MAX_VALUE-98)); + updates.add(simulatedUpdateRequest(Long.MAX_VALUE-100, "id", 1, "inplace_updatable_int", 6, "_version_", Long.MAX_VALUE-99)); + for (JettySolrRunner solrRunner: getSolrRunner(false)) { + try (SolrClient client = solrRunner.newClient()) { + for (UpdateRequest up : updates) { + up.process(client, COLLECTION); + } + } + } + JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0); + ChaosMonkey.kill(oldLeaderJetty); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + ChaosMonkey.start(oldLeaderJetty); + AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), + false, true, 30); + new UpdateRequest() + .add(sdoc("id", "2")) + .commit(cluster.getSolrClient(), COLLECTION); + checkShardConsistency(2,20); + SolrDocument doc = cluster.getSolrClient().getById(COLLECTION,"1"); + assertNotNull(doc.get("title_s")); + } + + private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException { + SolrInputDocument doc = sdoc(fields); + + // get baseUrl of the leader + String baseUrl = getBaseUrl(); + + UpdateRequest ur = new UpdateRequest(); + ur.add(doc); + ur.setParam("update.distrib", "FROMLEADER"); + if (prevVersion != null) { + ur.setParam("distrib.inplace.prevversion", String.valueOf(prevVersion)); + ur.setParam("distrib.inplace.update", "true"); + } + ur.setParam("distrib.from", baseUrl); + return ur; + } + + private UpdateRequest simulatedDBQ(String query, long version) throws SolrServerException, IOException { + String baseUrl = getBaseUrl(); + + UpdateRequest ur = new UpdateRequest(); + ur.deleteByQuery(query); + ur.setParam("_version_", ""+version); + ur.setParam("update.distrib", "FROMLEADER"); + ur.setParam("distrib.from", baseUrl); + return ur; + } + + private String getBaseUrl() { + DocCollection collection = cluster.getSolrClient().getZkStateReader().getClusterState().getCollection(COLLECTION); + Slice slice = collection.getSlice("shard1"); + return slice.getLeader().getCoreUrl(); + } + + private void checkRTG(int from, int to, List solrRunners) throws Exception{ + + for (JettySolrRunner solrRunner: solrRunners) { + try (SolrClient client = solrRunner.newClient()) { + for (int i = from; i <= to; i++) { + SolrQuery query = new SolrQuery("*:*"); + query.set("distrib", false); + query.setRequestHandler("/get"); + query.set("id",i); + QueryResponse res = client.query(COLLECTION, query); + assertNotNull("Can not find doc "+ i + " in " + solrRunner.getBaseUrl(),res.getResponse().get("doc")); + } + } + } + + } + + private void checkShardConsistency(int expected, int numTry) throws Exception{ + + for (int i = 0; i < numTry; i++) { + boolean inSync = true; + for (JettySolrRunner solrRunner: cluster.getJettySolrRunners()) { + try (SolrClient client = solrRunner.newClient()) { + SolrQuery query = new SolrQuery("*:*"); + query.set("distrib", false); + long results = client.query(COLLECTION, query).getResults().getNumFound(); + if (expected != results) { + inSync = false; + Thread.sleep(500); + break; + } + } + } + if (inSync) return; + } + + fail("Some replicas are not in sync with leader"); + } + + private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException { + String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY); + if (leaderTimeCommit == null) return; + for (int i = 0; i < numTry; i++) { + boolean inSync = true; + for (SolrCore solrCore : getSolrCore(false)) { + String replicateTimeCommit = solrCore.getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY); + if (!leaderTimeCommit.equals(replicateTimeCommit)) { + inSync = false; + Thread.sleep(500); + break; + } + } + if (inSync) return; + } + + fail("Some replicas are not in sync with leader"); + + } + + private List getSolrCore(boolean isLeader) { + List rs = new ArrayList<>(); + + CloudSolrClient cloudClient = cluster.getSolrClient(); + DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION); + + for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) { + if (solrRunner.getCoreContainer() == null) continue; + for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) { + CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor(); + Slice slice = docCollection.getSlice(cloudDescriptor.getShardId()); + Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName()); + if (slice.getLeader() == replica && isLeader) { + rs.add(solrCore); + } else if (slice.getLeader() != replica && !isLeader) { + rs.add(solrCore); + } + } + } + return rs; + } + + private List getSolrRunner(boolean isLeader) { + List rs = new ArrayList<>(); + + CloudSolrClient cloudClient = cluster.getSolrClient(); + DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(COLLECTION); + + for (JettySolrRunner solrRunner : cluster.getJettySolrRunners()) { + if (solrRunner.getCoreContainer() == null) continue; + for (SolrCore solrCore : solrRunner.getCoreContainer().getCores()) { + CloudDescriptor cloudDescriptor = solrCore.getCoreDescriptor().getCloudDescriptor(); + Slice slice = docCollection.getSlice(cloudDescriptor.getShardId()); + Replica replica = docCollection.getReplica(cloudDescriptor.getCoreNodeName()); + if (slice.getLeader() == replica && isLeader) { + rs.add(solrRunner); + } else if (slice.getLeader() != replica && !isLeader) { + rs.add(solrRunner); + } + } + } + return rs; + } + +} \ No newline at end of file diff --git a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java index c987c90571e..a8e14bf5465 100644 --- a/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/RecoveryAfterSoftCommitTest.java @@ -33,12 +33,17 @@ import org.junit.Test; @SolrTestCaseJ4.SuppressSSL public class RecoveryAfterSoftCommitTest extends AbstractFullDistribZkTestBase { private static final int MAX_BUFFERED_DOCS = 2, ULOG_NUM_RECORDS_TO_KEEP = 2; - + private final boolean onlyLeaderIndexes = random().nextBoolean(); public RecoveryAfterSoftCommitTest() { sliceCount = 1; fixShardCount(2); } + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1: -1; + } + @BeforeClass public static void beforeTests() { System.setProperty("solr.tests.maxBufferedDocs", String.valueOf(MAX_BUFFERED_DOCS)); diff --git a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java index 72f06943cf7..bf9b5e014c8 100644 --- a/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/ShardSplitTest.java @@ -86,6 +86,12 @@ public class ShardSplitTest extends BasicDistributedZkTest { useFactory(null); } + //TODO for now, onlyLeaderIndexes do not work with ShardSplitTest + @Override + protected int getRealtimeReplicas() { + return -1; + } + @Test public void test() throws Exception { diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java index 1af09f4e817..b592861fb30 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestCloudRecovery.java @@ -52,6 +52,7 @@ import org.junit.Test; public class TestCloudRecovery extends SolrCloudTestCase { private static final String COLLECTION = "collection1"; + private static boolean onlyLeaderIndexes; @BeforeClass public static void setupCluster() throws Exception { @@ -63,8 +64,10 @@ public class TestCloudRecovery extends SolrCloudTestCase { .addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) .configure(); + onlyLeaderIndexes = random().nextBoolean(); CollectionAdminRequest .createCollection(COLLECTION, "config", 2, 2) + .setRealtimeReplicas(onlyLeaderIndexes? 1: -1) .setMaxShardsPerNode(2) .process(cluster.getSolrClient()); AbstractDistribZkTestBase.waitForRecoveriesToFinish(COLLECTION, cluster.getSolrClient().getZkStateReader(), @@ -107,7 +110,12 @@ public class TestCloudRecovery extends SolrCloudTestCase { resp = cloudClient.query(COLLECTION, params); assertEquals(4, resp.getResults().getNumFound()); // Make sure all nodes is recover from tlog - assertEquals(4, countReplayLog.get()); + if (onlyLeaderIndexes) { + // Leader election can be kicked off, so 2 append replicas will replay its tlog before becoming new leader + assertTrue( countReplayLog.get() >=2); + } else { + assertEquals(4, countReplayLog.get()); + } // check metrics int replicationCount = 0; @@ -127,7 +135,11 @@ public class TestCloudRecovery extends SolrCloudTestCase { skippedCount += skipped.getCount(); } } - assertEquals(2, replicationCount); + if (onlyLeaderIndexes) { + assertTrue(replicationCount >= 2); + } else { + assertEquals(2, replicationCount); + } } @Test diff --git a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java index 8905077e4c8..8fbfee391e0 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestCollectionAPI.java @@ -60,7 +60,10 @@ public class TestCollectionAPI extends ReplicaPropertiesBase { @ShardsFixed(num = 2) public void test() throws Exception { try (CloudSolrClient client = createCloudClient(null)) { - createCollection(null, COLLECTION_NAME, 2, 2, 2, client, null, "conf1"); + CollectionAdminRequest.Create req = CollectionAdminRequest.createCollection(COLLECTION_NAME, "conf1",2,2); + req.setRealtimeReplicas(1); + req.setMaxShardsPerNode(2); + client.request(req); createCollection(null, COLLECTION_NAME1, 1, 1, 1, client, null, "conf1"); } @@ -170,6 +173,7 @@ public class TestCollectionAPI extends ReplicaPropertiesBase { Map collection = (Map) collections.get(COLLECTION_NAME); assertNotNull(collection); assertEquals("conf1", collection.get("configName")); + assertEquals("1", collection.get("realtimeReplicas")); } } diff --git a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsBasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsBasicDistributedZkTest.java index 97be8233324..1bba523c794 100644 --- a/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsBasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/hdfs/HdfsBasicDistributedZkTest.java @@ -42,7 +42,12 @@ public class HdfsBasicDistributedZkTest extends BasicDistributedZkTest { System.setProperty("tests.hdfs.numdatanodes", "1"); dfsCluster = HdfsTestUtil.setupClass(createTempDir().toFile().getAbsolutePath()); } - + + @Override + protected int getRealtimeReplicas() { + return -1; + } + @AfterClass public static void teardownClass() throws Exception { HdfsTestUtil.teardownClass(dfsCluster); diff --git a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java index bb0ab9adeea..7a4fa860e76 100644 --- a/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java +++ b/solr/core/src/test/org/apache/solr/update/TestInPlaceUpdatesDistrib.java @@ -71,6 +71,7 @@ import org.slf4j.LoggerFactory; @Slow public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private final boolean onlyLeaderIndexes = random().nextBoolean(); @BeforeClass public static void beforeSuperClass() throws Exception { @@ -108,7 +109,12 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { iw.decref(); } } - + + @Override + protected int getRealtimeReplicas() { + return onlyLeaderIndexes? 1 : -1; + } + @After public void after() { System.clearProperty("solr.tests.intClassName"); @@ -265,6 +271,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { } private void reorderedDBQIndividualReplicaTest() throws Exception { + if (onlyLeaderIndexes) { + log.info("RTG with DBQs are not working in append replicas"); + return; + } clearIndex(); commit(); @@ -595,7 +605,6 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { } private void outOfOrderUpdatesIndividualReplicaTest() throws Exception { - clearIndex(); commit(); @@ -741,6 +750,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { DV(id=x, val=5, ver=3) */ private void reorderedDBQsResurrectionTest() throws Exception { + if (onlyLeaderIndexes) { + log.info("RTG with DBQs are not working in append replicas"); + return; + } clearIndex(); commit(); @@ -1016,7 +1029,7 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { String baseUrl = getBaseUrl(""+id); UpdateRequest ur = new UpdateRequest(); - if (random().nextBoolean()) { + if (random().nextBoolean() || onlyLeaderIndexes) { ur.deleteById(""+id); } else { ur.deleteByQuery("id:"+id); @@ -1138,6 +1151,10 @@ public class TestInPlaceUpdatesDistrib extends AbstractFullDistribZkTestBase { * dbq("inp:14",version=4) */ private void reorderedDBQsUsingUpdatedValueFromADroppedUpdate() throws Exception { + if (onlyLeaderIndexes) { + log.info("RTG with DBQs are not working in append replicas"); + return; + } clearIndex(); commit(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java index 94750c0acae..8beb6ed3c12 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/request/CollectionAdminRequest.java @@ -366,6 +366,7 @@ public abstract class CollectionAdminRequest private Properties properties; protected Boolean autoAddReplicas; + protected Integer realtimeReplicas; protected Integer stateFormat; private String[] rule , snitch; @@ -407,6 +408,7 @@ public abstract class CollectionAdminRequest public Create setNumShards(Integer numShards) {this.numShards = numShards; return this; } public Create setMaxShardsPerNode(Integer numShards) { this.maxShardsPerNode = numShards; return this; } public Create setAutoAddReplicas(boolean autoAddReplicas) { this.autoAddReplicas = autoAddReplicas; return this; } + public Create setRealtimeReplicas(Integer realtimeReplicas) { this.realtimeReplicas = realtimeReplicas; return this;} @Deprecated public Create setReplicationFactor(Integer repl) { this.replicationFactor = repl; return this; } public Create setStateFormat(Integer stateFormat) { this.stateFormat = stateFormat; return this; } @@ -421,6 +423,7 @@ public abstract class CollectionAdminRequest public Integer getMaxShardsPerNode() { return maxShardsPerNode; } public Integer getReplicationFactor() { return replicationFactor; } public Boolean getAutoAddReplicas() { return autoAddReplicas; } + public Integer getRealtimeReplicas() { return realtimeReplicas; } public Integer getStateFormat() { return stateFormat; } /** @@ -507,6 +510,9 @@ public abstract class CollectionAdminRequest if (autoAddReplicas != null) { params.set(ZkStateReader.AUTO_ADD_REPLICAS, autoAddReplicas); } + if (realtimeReplicas != null) { + params.set(ZkStateReader.REALTIME_REPLICAS, realtimeReplicas); + } if(properties != null) { addProperties(params, properties); } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java index 179b9d54121..bf0f04ff579 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java @@ -33,6 +33,7 @@ import org.noggit.JSONWriter; import static org.apache.solr.common.cloud.ZkStateReader.AUTO_ADD_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE; +import static org.apache.solr.common.cloud.ZkStateReader.REALTIME_REPLICAS; import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR; /** @@ -59,6 +60,7 @@ public class DocCollection extends ZkNodeProps implements Iterable { private final Integer replicationFactor; private final Integer maxShardsPerNode; private final Boolean autoAddReplicas; + private final Integer realtimeReplicas; public DocCollection(String name, Map slices, Map props, DocRouter router) { @@ -84,6 +86,11 @@ public class DocCollection extends ZkNodeProps implements Iterable { this.maxShardsPerNode = (Integer) verifyProp(props, MAX_SHARDS_PER_NODE); Boolean autoAddReplicas = (Boolean) verifyProp(props, AUTO_ADD_REPLICAS); this.autoAddReplicas = autoAddReplicas == null ? false : autoAddReplicas; + Integer realtimeReplicas = (Integer) verifyProp(props, REALTIME_REPLICAS); + this.realtimeReplicas = realtimeReplicas == null ? -1 : realtimeReplicas; + if (this.realtimeReplicas != -1 && this.realtimeReplicas != 1) { + throw new SolrException(ErrorCode.SERVER_ERROR, "Invalid realtimeReplicas must be 1 or -1, found:" + this.realtimeReplicas); + } verifyProp(props, RULE); verifyProp(props, SNITCH); Iterator> iter = slices.entrySet().iterator(); @@ -126,6 +133,7 @@ public class DocCollection extends ZkNodeProps implements Iterable { switch (propName) { case MAX_SHARDS_PER_NODE: case REPLICATION_FACTOR: + case REALTIME_REPLICAS: return Integer.parseInt(o.toString()); case AUTO_ADD_REPLICAS: return Boolean.parseBoolean(o.toString()); @@ -226,6 +234,10 @@ public class DocCollection extends ZkNodeProps implements Iterable { return maxShardsPerNode; } + public int getRealtimeReplicas() { + return realtimeReplicas; + } + public String getZNode(){ return znode; } diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java index fea59780f8c..51b4b59d806 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java @@ -96,6 +96,7 @@ public class ZkStateReader implements Closeable { public static final String MAX_SHARDS_PER_NODE = "maxShardsPerNode"; public static final String AUTO_ADD_REPLICAS = "autoAddReplicas"; public static final String MAX_CORES_PER_NODE = "maxCoresPerNode"; + public static final String REALTIME_REPLICAS = "realtimeReplicas"; public static final String ROLES = "/roles.json"; diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java index 04eb722d945..ade1c699af0 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java @@ -272,6 +272,10 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes chaosMonkey = new ChaosMonkey(zkServer, zkStateReader, DEFAULT_COLLECTION, shardToJetty, shardToLeaderJetty); } + + protected int getRealtimeReplicas() { + return -1; + } protected CloudSolrClient createCloudClient(String defaultCollection) { CloudSolrClient client = getCloudSolrClient(zkServer.getZkAddress(), random().nextBoolean()); @@ -383,7 +387,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes Utils.toJSON(Utils.makeMap(Overseer.QUEUE_OPERATION, CollectionParams.CollectionAction.CREATE.toLower(), "name", DEFAULT_COLLECTION, "numShards", String.valueOf(sliceCount), - DocCollection.STATE_FORMAT, getStateFormat()))); + DocCollection.STATE_FORMAT, getStateFormat(), + ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas()))); zkClient.close(); } @@ -1619,7 +1624,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes NUM_SLICES, numShards, ZkStateReader.REPLICATION_FACTOR, replicationFactor, CREATE_NODE_SET, createNodeSetStr, - ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode), + ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode, + ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas()), client); } @@ -1631,7 +1637,8 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes NUM_SLICES, numShards, ZkStateReader.REPLICATION_FACTOR, replicationFactor, CREATE_NODE_SET, createNodeSetStr, - ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode), + ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode, + ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas()), client, configName); } @@ -1814,6 +1821,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes Map props = makeMap( ZkStateReader.REPLICATION_FACTOR, replicationFactor, ZkStateReader.MAX_SHARDS_PER_NODE, maxShardsPerNode, + ZkStateReader.REALTIME_REPLICAS, getRealtimeReplicas(), NUM_SLICES, numShards); Map> collectionInfos = new HashMap<>(); createCollection(collectionInfos, collName, props, client);