diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 5c596887417..2f5281309e4 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -128,6 +128,7 @@ New Features SOLR-3080: Remove shard info from zookeeper when SolrCore is explicitly unloaded. (yonik, Mark Miller, siren) SOLR-3437: Recovery issues a spurious commit to the cluster. (Trym R. Møller via Mark Miller) + SOLR-2822: Skip update processors already run on other nodes (hossman) * SOLR-1566: Transforming documents in the ResponseWriters. This will allow for more complex results in responses and open the door for function queries diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index 6f119880600..abff7ba9344 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -55,6 +55,9 @@ import org.apache.solr.update.processor.UpdateRequestProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; +import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER; + /** @lucene.experimental */ public class PeerSync { public static Logger log = LoggerFactory.getLogger(PeerSync.class); @@ -431,7 +434,7 @@ public class PeerSync { } ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(DistributedUpdateProcessor.SEEN_LEADER, true); + params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString()); // params.set("peersync",true); // debugging SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params); SolrQueryResponse rsp = new SolrQueryResponse(); @@ -563,4 +566,4 @@ public class PeerSync { } -} \ No newline at end of file +} diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index c4838b614a6..452201ba952 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -47,6 +47,10 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.*; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; +import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER; + + /** @lucene.experimental */ public class UpdateLog implements PluginInfoInitialized { public static String LOG_FILENAME_PATTERN = "%s.%019d"; @@ -982,7 +986,7 @@ public class UpdateLog implements PluginInfoInitialized { @Override public void run() { ModifiableSolrParams params = new ModifiableSolrParams(); - params.set(DistributedUpdateProcessor.SEEN_LEADER, true); + params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString()); req = new LocalSolrQueryRequest(uhandler.core, params); rsp = new SolrQueryResponse(); SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 9a4ca0ab29c..c10be7adae7 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -65,14 +65,37 @@ import org.apache.solr.update.VersionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + // NOT mt-safe... create a new processor for each add thread // TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for public class DistributedUpdateProcessor extends UpdateRequestProcessor { public final static Logger log = LoggerFactory.getLogger(DistributedUpdateProcessor.class); - public static final String SEEN_LEADER = "leader"; + /** + * Values this processor supports for the DISTRIB_UPDATE_PARAM. + * This is an implementation detail exposed solely for tests. + * + * @see DistributingUpdateProcessorFactory#DISTRIB_UPDATE_PARAM + */ + public static enum DistribPhase { + NONE, TOLEADER, FROMLEADER; + + public static DistribPhase parseParam(final String param) { + if (param == null || param.trim().isEmpty()) { + return NONE; + } + try { + return valueOf(param); + } catch (IllegalArgumentException e) { + throw new SolrException + (SolrException.ErrorCode.BAD_REQUEST, "Illegal value for " + + DISTRIB_UPDATE_PARAM + ": " + param, e); + } + } + } + public static final String COMMIT_END_POINT = "commit_end_point"; - public static final String DELETE_BY_QUERY_LEVEL = "dbq_level"; private final SolrQueryRequest req; private final SolrQueryResponse rsp; @@ -166,7 +189,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { String coreNodeName = zkController.getNodeName() + "_" + coreName; isLeader = coreNodeName.equals(leaderNodeName); - if (req.getParams().getBool(SEEN_LEADER, false)) { + DistribPhase phase = + DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); + + if (DistribPhase.FROMLEADER == phase) { // we are coming from the leader, just go local - add no urls forwardToLeader = false; } else if (isLeader) { @@ -254,8 +280,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { hash = hash(cmd); nodes = setupRequest(hash); } else { - // even in non zk mode, tests simulate updates from a leader - isLeader = !req.getParams().getBool(SEEN_LEADER, false); + isLeader = getNonZkLeaderAssumption(req); } boolean dropCmd = false; @@ -271,9 +296,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { ModifiableSolrParams params = null; if (nodes != null) { params = new ModifiableSolrParams(req.getParams()); - if (isLeader) { - params.set(SEEN_LEADER, true); - } + params.set(DISTRIB_UPDATE_PARAM, + (isLeader ? + DistribPhase.FROMLEADER.toString() : + DistribPhase.TOLEADER.toString())); params.remove("commit"); // this will be distributed from the local commit cmdDistrib.distribAdd(cmd, nodes, params); } @@ -573,8 +599,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { hash = hash(cmd); nodes = setupRequest(hash); } else { - // even in non zk mode, tests simulate updates from a leader - isLeader = !req.getParams().getBool(SEEN_LEADER, false); + isLeader = getNonZkLeaderAssumption(req); } boolean dropCmd = false; @@ -590,9 +615,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { ModifiableSolrParams params = null; if (nodes != null) { params = new ModifiableSolrParams(req.getParams()); - if (isLeader) { - params.set(SEEN_LEADER, true); - } + params.set(DISTRIB_UPDATE_PARAM, + (isLeader ? + DistribPhase.FROMLEADER.toString() : + DistribPhase.TOLEADER.toString())); params.remove("commit"); // we already will have forwarded this from our local commit cmdDistrib.distribDelete(cmd, nodes, params); } @@ -613,23 +639,24 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { // even in non zk mode, tests simulate updates from a leader if(!zkEnabled) { - isLeader = !req.getParams().getBool(SEEN_LEADER, false); + isLeader = getNonZkLeaderAssumption(req); } else { zkCheck(); } - // Lev1: we are the first to receive this deleteByQuery, it must be forwarded to the leader of every shard - // Lev2: we are a leader receiving a forwarded deleteByQuery... we must: + // NONE: we are the first to receive this deleteByQuery + // - it must be forwarded to the leader of every shard + // TO: we are a leader receiving a forwarded deleteByQuery... we must: // - block all updates (use VersionInfo) // - flush *all* updates going to our replicas // - forward the DBQ to our replicas and wait for the response // - log + execute the local DBQ - // Lev3: we are a replica receiving a DBQ from our leader + // FROM: we are a replica receiving a DBQ from our leader // - log + execute the local DBQ + DistribPhase phase = + DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); - int dbqlevel = req.getParams().getInt(DELETE_BY_QUERY_LEVEL, 1); - - if (zkEnabled && dbqlevel == 1) { + if (zkEnabled && DistribPhase.NONE == phase) { boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard Map slices = zkController.getCloudState().getSlices(collection); @@ -640,7 +667,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); - params.set(DELETE_BY_QUERY_LEVEL, 2); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString()); List leaders = new ArrayList(slices.size()); for (Map.Entry sliceEntry : slices.entrySet()) { @@ -677,13 +704,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return; } - // change the level to 2 so we look up and forward to our own replicas (if any) - dbqlevel = 2; + // change the phase to TOLEADER so we look up and forward to our own replicas (if any) + phase = DistribPhase.TOLEADER; } List replicas = null; - if (zkEnabled && dbqlevel == 2) { + if (zkEnabled && DistribPhase.TOLEADER == phase) { // This core should be a leader replicas = setupRequest(); } @@ -750,9 +777,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // forward to all replicas if (leaderLogic && replicas != null) { ModifiableSolrParams params = new ModifiableSolrParams(req.getParams()); - params.set(DELETE_BY_QUERY_LEVEL, 3); params.set(VERSION_FIELD, Long.toString(cmd.getVersion())); - params.set(SEEN_LEADER, "true"); + params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); cmdDistrib.distribDelete(cmd, replicas, params); cmdDistrib.finish(); } @@ -1029,5 +1055,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { return true; } } - + + /** + * Returns a boolean indicating wether or not the caller should behave as + * if this is the "leader" even when ZooKeeper is not enabled. + * (Even in non zk mode, tests may simulate updates to/from a leader) + */ + public static boolean getNonZkLeaderAssumption(SolrQueryRequest req) { + DistribPhase phase = + DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM)); + + // if we have been told we are coming from a leader, then we are + // definitely not the leader. Otherwise assume we are. + return DistribPhase.FROMLEADER != phase; + } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java index de76825ddb1..8a03e55554b 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessorFactory.java @@ -21,9 +21,9 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; -public class DistributedUpdateProcessorFactory extends - UpdateRequestProcessorFactory { - +public class DistributedUpdateProcessorFactory + extends UpdateRequestProcessorFactory + implements DistributingUpdateProcessorFactory { @Override public void init(NamedList args) { diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributingUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/DistributingUpdateProcessorFactory.java new file mode 100644 index 00000000000..f0072e26e29 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributingUpdateProcessorFactory.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.update.processor; + +import org.apache.solr.common.SolrException; + +/** + * A marker interface for denoting that a factory is responsible for handling + * distributed communication of updates across a SolrCloud cluster. + * + * @see UpdateRequestProcessorChain + */ +public interface DistributingUpdateProcessorFactory { + + /** + * Internal param used to specify the current phase of a distributed update, + * not intended for use by clients. Any non-blank value can be used to + * indicate to the UpdateRequestProcessorChain that factories + * prior to the DistributingUpdateProcessorFactory can be skipped. + * Implementations of this interface may use the non-blank values any way + * they wish. + */ + public static final String DISTRIB_UPDATE_PARAM = "update.distrib"; +} diff --git a/solr/core/src/java/org/apache/solr/update/processor/NoOpDistributingUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/NoOpDistributingUpdateProcessorFactory.java new file mode 100644 index 00000000000..cc03d5f2a68 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/update/processor/NoOpDistributingUpdateProcessorFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.update.processor; + +import org.apache.solr.common.util.NamedList; +import org.apache.solr.core.SolrCore; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; + +/** + * A No-Op implementation of DistributingUpdateProcessorFactory that + * allways returns null. + *

+ * This implementation may be useful for Solr installations in which neither + * the {@link DistributedUpdateProcessorFactory} nor any custom + * implementation of {@link DistributingUpdateProcessorFactory} + * is desired (ie: shards are managed externally from Solr) + *

+ */ +public class NoOpDistributingUpdateProcessorFactory + extends UpdateRequestProcessorFactory + implements DistributingUpdateProcessorFactory { + + /** Returns null + */ + public UpdateRequestProcessor getInstance(SolrQueryRequest req, + SolrQueryResponse rsp, + UpdateRequestProcessor next ) { + return null; + } +} diff --git a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java index 1e635e89105..729e4a1bf01 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java +++ b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java @@ -20,10 +20,16 @@ package org.apache.solr.update.processor; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.util.plugin.PluginInfoInitialized; +import org.apache.solr.common.util.NamedList; +import org.apache.solr.common.SolrException; import org.apache.solr.core.PluginInfo; import org.apache.solr.core.SolrCore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.List; +import java.util.ArrayList; /** * Manages a chain of UpdateRequestProcessorFactories. @@ -42,10 +48,13 @@ import java.util.List; * * * @see UpdateRequestProcessorFactory + * @see #init * @since solr 1.3 */ public final class UpdateRequestProcessorChain implements PluginInfoInitialized { + public final static Logger log = LoggerFactory.getLogger(UpdateRequestProcessorChain.class); + private UpdateRequestProcessorFactory[] chain; private final SolrCore solrCore; @@ -53,31 +62,115 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized this.solrCore = solrCore; } + /** + * Initializes the chain using the factories specified by the PluginInfo. + * if the chain includes the RunUpdateProcessorFactory, but + * does not include an implementation of the + * DistributingUpdateProcessorFactory interface, then an + * instance of DistributedUpdateProcessorFactory will be + * injected immediately prior to the RunUpdateProcessorFactory. + * + * @see DistributingUpdateProcessorFactory + * @see RunUpdateProcessorFactory + * @see DistributedUpdateProcessorFactory + */ public void init(PluginInfo info) { - List list = solrCore.initPlugins(info.getChildren("processor"),UpdateRequestProcessorFactory.class,null); + final String infomsg = "updateRequestProcessorChain \"" + + (null != info.name ? info.name : "") + "\"" + + (info.isDefault() ? " (default)" : ""); + + // wrap in an ArrayList so we know we know we can do fast index lookups + // and that add(int,Object) is supported + List list = new ArrayList + (solrCore.initPlugins(info.getChildren("processor"),UpdateRequestProcessorFactory.class,null)); + if(list.isEmpty()){ - throw new RuntimeException( "updateRequestProcessorChain require at least one processor"); + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + infomsg + " require at least one processor"); } + + int numDistrib = 0; + int runIndex = -1; + // hi->lo incase multiple run instances, add before first one + // (no idea why someone might use multiple run instances, but just in case) + for (int i = list.size()-1; 0 <= i; i--) { + UpdateRequestProcessorFactory factory = list.get(i); + if (factory instanceof DistributingUpdateProcessorFactory) { + numDistrib++; + } + if (factory instanceof RunUpdateProcessorFactory) { + runIndex = i; + } + } + if (1 < numDistrib) { + throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, + infomsg + " may not contain more then one " + + "instance of DistributingUpdateProcessorFactory"); + } + if (0 <= runIndex && 0 == numDistrib) { + // by default, add distrib processor immediately before run + DistributedUpdateProcessorFactory distrib + = new DistributedUpdateProcessorFactory(); + distrib.init(new NamedList()); + list.add(runIndex, distrib); + + log.info("inserting DistributedUpdateProcessorFactory into " + infomsg); + } + chain = list.toArray(new UpdateRequestProcessorFactory[list.size()]); } - public UpdateRequestProcessorChain( UpdateRequestProcessorFactory[] chain , SolrCore solrCore) { + /** + * Creates a chain backed directly by the specified array. Modifications to + * the array will affect future calls to createProcessor + */ + public UpdateRequestProcessorChain( UpdateRequestProcessorFactory[] chain, + SolrCore solrCore) { this.chain = chain; this.solrCore = solrCore; } - public UpdateRequestProcessor createProcessor(SolrQueryRequest req, SolrQueryResponse rsp) + + /** + * Uses the factories in this chain to creates a new + * UpdateRequestProcessor instance specific for this request. + * If the DISTRIB_UPDATE_PARAM is present in the request and is + * non-blank, then any factory in this chain prior to the instance of + * {@link DistributingUpdateProcessorFactory} will be skipped, + * and the UpdateRequestProcessor returned will be from that + * DistributingUpdateProcessorFactory + * + * @see UpdateRequestProcessorFactory#getInstance + * @see DistributingUpdateProcessorFactory#DISTRIB_UPDATE_PARAM + */ + public UpdateRequestProcessor createProcessor(SolrQueryRequest req, + SolrQueryResponse rsp) { UpdateRequestProcessor processor = null; UpdateRequestProcessor last = null; + + final String distribPhase = req.getParams().get + (DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, ""); + final boolean skipToDistrib = ! distribPhase.trim().isEmpty(); + for (int i = chain.length-1; i>=0; i--) { processor = chain[i].getInstance(req, rsp, last); last = processor == null ? last : processor; + if (skipToDistrib + && chain[i] instanceof DistributingUpdateProcessorFactory) { + break; + } } return last; } + /** + * Returns the underlying array of factories used in this chain. + * Modifications to the array will affect future calls to + * createProcessor + */ public UpdateRequestProcessorFactory[] getFactories() { return chain; } + } diff --git a/solr/core/src/test-files/solr/conf/solrconfig-transformers.xml b/solr/core/src/test-files/solr/conf/solrconfig-transformers.xml index fc44036944e..776d49aa831 100644 --- a/solr/core/src/test-files/solr/conf/solrconfig-transformers.xml +++ b/solr/core/src/test-files/solr/conf/solrconfig-transformers.xml @@ -52,4 +52,26 @@ + + + + + + + + + + + + + + + + + + + + + + diff --git a/solr/core/src/test-files/solr/conf/solrconfig.xml b/solr/core/src/test-files/solr/conf/solrconfig.xml index 8029801b701..653357c82c1 100644 --- a/solr/core/src/test-files/solr/conf/solrconfig.xml +++ b/solr/core/src/test-files/solr/conf/solrconfig.xml @@ -507,4 +507,35 @@ + + + + regex_dup_A_s + x + x_x + + + + regex_dup_B_s + x + x_x + + + + + + + + regex_dup_A_s + x + x_x + + + regex_dup_B_s + x + x_x + + + + diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java index 1f7f4564b7d..bbfd0fd3200 100644 --- a/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java +++ b/solr/core/src/test/org/apache/solr/cloud/BasicDistributedZkTest.java @@ -48,12 +48,17 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest; import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest; import org.apache.solr.client.solrj.request.CoreAdminRequest.Create; import org.apache.solr.client.solrj.request.QueryRequest; +import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrDocument; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.UpdateParams; +import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.update.SolrCmdDistributor.Request; @@ -126,6 +131,7 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase { // setLoggingLevel(null); del("*:*"); + indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men" ,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d); indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country." @@ -289,6 +295,9 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase { testSearchByCollectionName(); testANewCollectionInOneInstanceWithManualShardAssignement(); testNumberOfCommitsWithCommitAfterAdd(); + + testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit"); + testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit"); // Thread.sleep(10000000000L); if (DEBUG) { @@ -296,6 +305,54 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase { } } + /** + * Expects a RegexReplaceProcessorFactories in the chain which will + * "double up" the values in two (stored) string fields. + *

+ * If the values are "double-doubled" or "not-doubled" then we know + * the processor was not run the appropriate number of times + *

+ */ + private void testUpdateProcessorsRunOnlyOnce(final String chain) throws Exception { + + final String fieldA = "regex_dup_A_s"; + final String fieldB = "regex_dup_B_s"; + final String val = "x"; + final String expected = "x_x"; + final ModifiableSolrParams updateParams = new ModifiableSolrParams(); + updateParams.add(UpdateParams.UPDATE_CHAIN, chain); + + final int numLoops = atLeast(50); + + for (int i = 1; i < numLoops; i++) { + // add doc to random client + SolrServer updateClient = clients.get(random().nextInt(clients.size())); + SolrInputDocument doc = new SolrInputDocument(); + addFields(doc, id, i, fieldA, val, fieldB, val); + UpdateResponse ures = add(updateClient, updateParams, doc); + assertEquals(chain + ": update failed", 0, ures.getStatus()); + ures = updateClient.commit(); + assertEquals(chain + ": commit failed", 0, ures.getStatus()); + } + + // query for each doc, and check both fields to ensure the value is correct + for (int i = 1; i < numLoops; i++) { + final String query = id + ":" + i; + QueryResponse qres = queryServer(new SolrQuery(query)); + assertEquals(chain + ": query failed: " + query, + 0, qres.getStatus()); + assertEquals(chain + ": didn't find correct # docs with query: " + query, + 1, qres.getResults().getNumFound()); + SolrDocument doc = qres.getResults().get(0); + + for (String field : new String[] {fieldA, fieldB}) { + assertEquals(chain + ": doc#" + i+ " has wrong value for " + field, + expected, doc.getFirstValue(field)); + } + } + + } + // cloud level test mainly needed just to make sure that versions and errors are propagated correctly private void doOptimisticLockingAndUpdating() throws Exception { SolrInputDocument sd = sdoc("id", 1000, "_version_", -1); @@ -334,7 +391,6 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase { } } - private void testNumberOfCommitsWithCommitAfterAdd() throws MalformedURLException, SolrServerException, IOException { long startCommits = getNumCommits((HttpSolrServer) clients.get(0)); diff --git a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java index c9f59b3c786..c8e39e46cd1 100644 --- a/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java +++ b/solr/core/src/test/org/apache/solr/search/TestRealTimeGet.java @@ -48,11 +48,13 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static org.apache.solr.core.SolrCore.verbose; -import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; +import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; public class TestRealTimeGet extends SolrTestCaseJ4 { - private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica) + // means we've seen the leader and have version info (i.e. we are a non-leader replica) + private static String FROM_LEADER = DistribPhase.FROMLEADER.toString(); @BeforeClass public static void beforeClass() throws Exception { @@ -149,7 +151,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { // simulate an update from the leader version += 10; - updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // test version is there from rtg assertJQ(req("qt","/get","id","1") @@ -157,7 +159,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { ); // simulate reordering: test that a version less than that does not take affect - updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // test that version hasn't changed assertJQ(req("qt","/get","id","1") @@ -166,7 +168,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { // simulate reordering: test that a delete w/ version less than that does not take affect // TODO: also allow passing version on delete instead of on URL? - updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1))); + updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version - 1))); // test that version hasn't changed assertJQ(req("qt","/get","id","1") @@ -177,7 +179,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { assertU(commit()); // simulate reordering: test that a version less than that does not take affect - updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // test that version hasn't changed assertJQ(req("qt","/get","id","1") @@ -185,7 +187,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { ); // simulate reordering: test that a delete w/ version less than that does not take affect - updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1))); + updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version - 1))); // test that version hasn't changed assertJQ(req("qt","/get","id","1") @@ -194,10 +196,10 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { // now simulate a normal delete from the leader version += 5; - updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version))); + updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version))); // make sure a reordered add doesn't take affect. - updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // test that it's still deleted assertJQ(req("qt","/get","id","1") @@ -208,7 +210,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { assertU(commit()); // make sure a reordered add doesn't take affect. - updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // test that it's still deleted assertJQ(req("qt","/get","id","1") @@ -1071,7 +1073,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { if (oper < commitPercent + deletePercent) { verbose("deleting id",id,"val=",nextVal,"version",version); - Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL)); + Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER)); // TODO: returning versions for these types of updates is redundant // but if we do return, they had better be equal @@ -1093,7 +1095,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { } else { verbose("adding id", id, "val=", nextVal,"version",version); - Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL)); + Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); if (returnedVersion != null) { assertEquals(version, returnedVersion.longValue()); } @@ -1341,7 +1343,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { if (oper < commitPercent + deletePercent) { verbose("deleting id",id,"val=",nextVal,"version",version); - Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL)); + Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER)); // TODO: returning versions for these types of updates is redundant // but if we do return, they had better be equal @@ -1363,7 +1365,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 { } else { verbose("adding id", id, "val=", nextVal,"version",version); - Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL)); + Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); if (returnedVersion != null) { assertEquals(version, returnedVersion.longValue()); } diff --git a/solr/core/src/test/org/apache/solr/search/TestRecovery.java b/solr/core/src/test/org/apache/solr/search/TestRecovery.java index 2ad9a83b8af..0b54fbf6a7c 100644 --- a/solr/core/src/test/org/apache/solr/search/TestRecovery.java +++ b/solr/core/src/test/org/apache/solr/search/TestRecovery.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -39,10 +39,14 @@ import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; +import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; public class TestRecovery extends SolrTestCaseJ4 { - private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica) + + // means that we've seen the leader and have version info (i.e. we are a non-leader replica) + private static String FROM_LEADER = DistribPhase.FROMLEADER.toString(); + private static int timeout=60; // acquire timeout in seconds. change this to a huge number when debugging to prevent threads from advancing. // TODO: fix this test to not require FSDirectory @@ -214,12 +218,12 @@ public class TestRecovery extends SolrTestCaseJ4 { assertEquals(UpdateLog.State.BUFFERING, ulog.getState()); // simulate updates from a leader - updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","B11", "_version_","1015")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonDelQ("id:B1 id:B11 id:B2 id:B3"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-1017")); - updateJ(jsonAdd(sdoc("id","B2", "_version_","1020")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","B3", "_version_","1030")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - deleteAndGetVersion("B1", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-2010")); + updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","B11", "_version_","1015")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonDelQ("id:B1 id:B11 id:B2 id:B3"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-1017")); + updateJ(jsonAdd(sdoc("id","B2", "_version_","1020")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","B3", "_version_","1030")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + deleteAndGetVersion("B1", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2010")); assertJQ(req("qt","/get", "getVersions","6") ,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}" @@ -272,18 +276,18 @@ public class TestRecovery extends SolrTestCaseJ4 { assertEquals(1030L, ver.longValue()); // add a reordered doc that shouldn't overwrite one in the index - updateJ(jsonAdd(sdoc("id","B3", "_version_","3")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","B3", "_version_","3")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // reorder two buffered updates - updateJ(jsonAdd(sdoc("id","B4", "_version_","1040")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - deleteAndGetVersion("B4", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-940")); // this update should not take affect - updateJ(jsonAdd(sdoc("id","B6", "_version_","1060")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","B5", "_version_","1050")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","B8", "_version_","1080")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","B4", "_version_","1040")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + deleteAndGetVersion("B4", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-940")); // this update should not take affect + updateJ(jsonAdd(sdoc("id","B6", "_version_","1060")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","B5", "_version_","1050")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","B8", "_version_","1080")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // test that delete by query is at least buffered along with everything else so it will delete the // currently buffered id:8 (even if it doesn't currently support versioning) - updateJ("{\"delete\": { \"query\":\"id:B2 OR id:B8\" }}", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-3000")); + updateJ("{\"delete\": { \"query\":\"id:B2 OR id:B8\" }}", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-3000")); assertJQ(req("qt","/get", "getVersions","13") ,"=={'versions':[-3000,1080,1050,1060,-940,1040,3,-2010,1030,1020,-1017,1015,1010]}" // the "3" appears because versions aren't checked while buffering @@ -298,12 +302,12 @@ public class TestRecovery extends SolrTestCaseJ4 { logReplay.release(1); // now add another update - updateJ(jsonAdd(sdoc("id","B7", "_version_","1070")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","B7", "_version_","1070")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // a reordered update that should be dropped - deleteAndGetVersion("B5", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-950")); + deleteAndGetVersion("B5", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-950")); - deleteAndGetVersion("B6", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-2060")); + deleteAndGetVersion("B6", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2060")); logReplay.release(1000); UpdateLog.RecoveryInfo recInfo = rinfoFuture.get(); @@ -376,14 +380,14 @@ public class TestRecovery extends SolrTestCaseJ4 { assertEquals(UpdateLog.State.BUFFERING, ulog.getState()); // simulate updates from a leader - updateJ(jsonAdd(sdoc("id","C1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","C2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","C3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","C1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","C2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","C3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); assertTrue(ulog.dropBufferedUpdates()); ulog.bufferUpdates(); - updateJ(jsonAdd(sdoc("id", "C4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id", "C5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id", "C4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id", "C5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); logReplay.release(1000); rinfoFuture = ulog.applyBufferedUpdates(); @@ -395,17 +399,17 @@ public class TestRecovery extends SolrTestCaseJ4 { ); // this time add some docs first before buffering starts (so tlog won't be at pos 0) - updateJ(jsonAdd(sdoc("id","C100", "_version_","200")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","C101", "_version_","201")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","C100", "_version_","200")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","C101", "_version_","201")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); ulog.bufferUpdates(); - updateJ(jsonAdd(sdoc("id","C103", "_version_","203")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","C104", "_version_","204")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","C103", "_version_","203")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","C104", "_version_","204")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); assertTrue(ulog.dropBufferedUpdates()); ulog.bufferUpdates(); - updateJ(jsonAdd(sdoc("id","C105", "_version_","205")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","C106", "_version_","206")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","C105", "_version_","205")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","C106", "_version_","206")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); rinfoFuture = ulog.applyBufferedUpdates(); rinfo = rinfoFuture.get(); @@ -428,14 +432,14 @@ public class TestRecovery extends SolrTestCaseJ4 { ulog.bufferUpdates(); assertEquals(UpdateLog.State.BUFFERING, ulog.getState()); - updateJ(jsonAdd(sdoc("id","C301", "_version_","998")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","C302", "_version_","999")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","C301", "_version_","998")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","C302", "_version_","999")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); assertTrue(ulog.dropBufferedUpdates()); // make sure we can overwrite with a lower version // TODO: is this functionality needed? - updateJ(jsonAdd(sdoc("id","C301", "_version_","301")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","C302", "_version_","302")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","C301", "_version_","301")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","C302", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); assertU(commit()); @@ -451,7 +455,7 @@ public class TestRecovery extends SolrTestCaseJ4 { ); - updateJ(jsonAdd(sdoc("id","C2", "_version_","302")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","C2", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); @@ -494,9 +498,9 @@ public class TestRecovery extends SolrTestCaseJ4 { ulog.bufferUpdates(); // simulate updates from a leader - updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); assertEquals(UpdateLog.State.BUFFERING, ulog.getState()); req.close(); @@ -526,9 +530,9 @@ public class TestRecovery extends SolrTestCaseJ4 { assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0); // now do some normal non-buffered adds - updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); assertU(commit()); req.close(); @@ -816,9 +820,9 @@ public class TestRecovery extends SolrTestCaseJ4 { // Now test that the bad log file doesn't mess up retrieving latest versions // - updateJ(jsonAdd(sdoc("id","F4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","F5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL)); - updateJ(jsonAdd(sdoc("id","F6", "_version_","106")), params(SEEN_LEADER,SEEN_LEADER_VAL)); + updateJ(jsonAdd(sdoc("id","F4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","F5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); + updateJ(jsonAdd(sdoc("id","F6", "_version_","106")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER)); // This currently skips the bad log file and also returns the version of the clearIndex (del *:*) // assertJQ(req("qt","/get", "getVersions","6"), "/versions==[106,105,104]"); diff --git a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java index 25a418c372f..cb3a571438b 100644 --- a/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java +++ b/solr/core/src/test/org/apache/solr/update/PeerSyncTest.java @@ -28,9 +28,15 @@ import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; +import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase; + public class PeerSyncTest extends BaseDistributedSearchTestCase { private static int numVersions = 100; // number of versions to use when syncing - private ModifiableSolrParams seenLeader = params("leader","true"); + private final String FROM_LEADER = DistribPhase.FROMLEADER.toString(); + + private ModifiableSolrParams seenLeader = + params(DISTRIB_UPDATE_PARAM, FROM_LEADER); public PeerSyncTest() { fixShardCount = true; @@ -112,9 +118,9 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase { v=1000; add(client0, seenLeader, sdoc("id","1000","_version_",++v)); add(client0, seenLeader, sdoc("id","1001","_version_",++v)); - delQ(client0, params("leader","true","_version_",Long.toString(-++v)), "id:1001 OR id:1002"); + delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:1001 OR id:1002"); add(client0, seenLeader, sdoc("id","1002","_version_",++v)); - del(client0, params("leader","true","_version_",Long.toString(-++v)), "1000"); + del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "1000"); assertSync(client1, numVersions, true, shardsArr[0]); client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1); @@ -124,18 +130,18 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase { SolrServer client = client0; add(client, seenLeader, sdoc("id","2000","_version_",++v)); add(client, seenLeader, sdoc("id","2001","_version_",++v)); - delQ(client, params("leader","true","_version_",Long.toString(-++v)), "id:2001 OR id:2002"); + delQ(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:2001 OR id:2002"); add(client, seenLeader, sdoc("id","2002","_version_",++v)); - del(client, params("leader","true","_version_",Long.toString(-++v)), "2000"); + del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000"); v=2000; client = client1; add(client, seenLeader, sdoc("id","2000","_version_",++v)); ++v; // pretend we missed the add of 2001. peersync should retrieve it, but should also retrieve any deleteByQuery objects after it // add(client, seenLeader, sdoc("id","2001","_version_",++v)); - delQ(client, params("leader","true","_version_",Long.toString(-++v)), "id:2001 OR id:2002"); + delQ(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:2001 OR id:2002"); add(client, seenLeader, sdoc("id","2002","_version_",++v)); - del(client, params("leader","true","_version_",Long.toString(-++v)), "2000"); + del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000"); assertSync(client1, numVersions, true, shardsArr[0]); client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1); @@ -149,12 +155,12 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase { // the reorder in application shouldn't affect anything add(client0, seenLeader, sdoc("id","3000","_version_",3001)); add(client1, seenLeader, sdoc("id","3000","_version_",3001)); - del(client0, params("leader","true","_version_","3000"), "3000"); + del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3000"), "3000"); // this should cause us to retrieve an add tha was previously deleted add(client0, seenLeader, sdoc("id","3001","_version_",3003)); - del(client0, params("leader","true","_version_","3001"), "3004"); - del(client1, params("leader","true","_version_","3001"), "3004"); + del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3001"), "3004"); + del(client1, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3001"), "3004"); // this should cause us to retrieve an older add that was overwritten add(client0, seenLeader, sdoc("id","3002","_version_",3004)); diff --git a/solr/core/src/test/org/apache/solr/update/TestUpdate.java b/solr/core/src/test/org/apache/solr/update/TestUpdate.java index 4665b756807..f178e65c1b5 100644 --- a/solr/core/src/test/org/apache/solr/update/TestUpdate.java +++ b/solr/core/src/test/org/apache/solr/update/TestUpdate.java @@ -44,7 +44,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static org.apache.solr.core.SolrCore.verbose; -import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER; public class TestUpdate extends SolrTestCaseJ4 { @BeforeClass diff --git a/solr/core/src/test/org/apache/solr/update/processor/CustomUpdateRequestProcessor.java b/solr/core/src/test/org/apache/solr/update/processor/CustomUpdateRequestProcessor.java new file mode 100644 index 00000000000..41282f5870f --- /dev/null +++ b/solr/core/src/test/org/apache/solr/update/processor/CustomUpdateRequestProcessor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.update.processor; + +import org.apache.solr.common.util.NamedList; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.update.processor.UpdateRequestProcessor; +import org.apache.solr.update.processor.UpdateRequestProcessorFactory; + + +/** + * A passthrough processor that does nothing. + */ +public class CustomUpdateRequestProcessor extends UpdateRequestProcessor { + public CustomUpdateRequestProcessor( UpdateRequestProcessor next) { + super(next); + } +} + diff --git a/solr/core/src/test/org/apache/solr/update/processor/CustomUpdateRequestProcessorFactory.java b/solr/core/src/test/org/apache/solr/update/processor/CustomUpdateRequestProcessorFactory.java index e853f282c8e..351d23a69f5 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/CustomUpdateRequestProcessorFactory.java +++ b/solr/core/src/test/org/apache/solr/update/processor/CustomUpdateRequestProcessorFactory.java @@ -39,8 +39,7 @@ public class CustomUpdateRequestProcessorFactory extends UpdateRequestProcessorF @Override public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { - // TODO Auto-generated method stub - return null; + return new CustomUpdateRequestProcessor(next); } } diff --git a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java index 965259e5378..e3ed6e54cdc 100644 --- a/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java +++ b/solr/core/src/test/org/apache/solr/update/processor/UpdateRequestProcessorFactoryTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -18,9 +18,15 @@ package org.apache.solr.update.processor; import org.apache.solr.core.SolrCore; +import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.update.processor.UpdateRequestProcessorChain; +import org.apache.solr.update.processor.CustomUpdateRequestProcessor; import org.apache.solr.util.AbstractSolrTestCase; +import java.util.Arrays; + +import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM; + /** * */ @@ -52,4 +58,37 @@ public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase { // Make sure the NamedListArgs got through ok assertEquals( "{name={n8=88,n9=99}}", link.args.toString() ); } + + public void testUpdateDistribChainSkipping() throws Exception { + SolrCore core = h.getCore(); + for (final String name : Arrays.asList("distrib-chain-explicit", + "distrib-chain-implicit", + "distrib-chain-noop")) { + + UpdateRequestProcessor proc; + UpdateRequestProcessorChain chain = core.getUpdateProcessingChain(name); + assertNotNull(name, chain); + + // either explicitly, or because of injection + assertEquals(name + " chain length", 4, + chain.getFactories().length); + + // Custom comes first in all three of our chains + proc = chain.createProcessor(req(), new SolrQueryResponse()); + assertTrue(name + " first processor isn't a CustomUpdateRequestProcessor: " + + proc.getClass().getName(), + proc instanceof CustomUpdateRequestProcessor); + + // varies depending on chain, but definitely shouldn't be Custom + proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "non_blank_value"), + new SolrQueryResponse()); + assertFalse(name + " post distrib proc should not be a CustomUpdateRequestProcessor: " + + proc.getClass().getName(), + proc instanceof CustomUpdateRequestProcessor); + + + } + + } + }