SOLR-2822: Skip update processors already run on other nodes (solr cloud)

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1342743 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris M. Hostetter 2012-05-25 17:24:58 +00:00
parent a72fb4fb77
commit f4819005cf
18 changed files with 528 additions and 110 deletions

View File

@ -128,6 +128,7 @@ New Features
SOLR-3080: Remove shard info from zookeeper when SolrCore is explicitly unloaded.
(yonik, Mark Miller, siren)
SOLR-3437: Recovery issues a spurious commit to the cluster. (Trym R. Møller via Mark Miller)
SOLR-2822: Skip update processors already run on other nodes (hossman)
* SOLR-1566: Transforming documents in the ResponseWriters. This will allow
for more complex results in responses and open the door for function queries

View File

@ -55,6 +55,9 @@ import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
/** @lucene.experimental */
public class PeerSync {
public static Logger log = LoggerFactory.getLogger(PeerSync.class);
@ -431,7 +434,7 @@ public class PeerSync {
}
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());
// params.set("peersync",true); // debugging
SolrQueryRequest req = new LocalSolrQueryRequest(uhandler.core, params);
SolrQueryResponse rsp = new SolrQueryResponse();
@ -563,4 +566,4 @@ public class PeerSync {
}
}
}

View File

@ -47,6 +47,10 @@ import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase.FROMLEADER;
/** @lucene.experimental */
public class UpdateLog implements PluginInfoInitialized {
public static String LOG_FILENAME_PATTERN = "%s.%019d";
@ -982,7 +986,7 @@ public class UpdateLog implements PluginInfoInitialized {
@Override
public void run() {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(DistributedUpdateProcessor.SEEN_LEADER, true);
params.set(DISTRIB_UPDATE_PARAM, FROMLEADER.toString());
req = new LocalSolrQueryRequest(uhandler.core, params);
rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp)); // setting request info will help logging

View File

@ -65,14 +65,37 @@ import org.apache.solr.update.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
// NOT mt-safe... create a new processor for each add thread
// TODO: we really should not wait for distrib after local? unless a certain replication factor is asked for
public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public final static Logger log = LoggerFactory.getLogger(DistributedUpdateProcessor.class);
public static final String SEEN_LEADER = "leader";
/**
* Values this processor supports for the <code>DISTRIB_UPDATE_PARAM</code>.
* This is an implementation detail exposed solely for tests.
*
* @see DistributingUpdateProcessorFactory#DISTRIB_UPDATE_PARAM
*/
public static enum DistribPhase {
NONE, TOLEADER, FROMLEADER;
public static DistribPhase parseParam(final String param) {
if (param == null || param.trim().isEmpty()) {
return NONE;
}
try {
return valueOf(param);
} catch (IllegalArgumentException e) {
throw new SolrException
(SolrException.ErrorCode.BAD_REQUEST, "Illegal value for " +
DISTRIB_UPDATE_PARAM + ": " + param, e);
}
}
}
public static final String COMMIT_END_POINT = "commit_end_point";
public static final String DELETE_BY_QUERY_LEVEL = "dbq_level";
private final SolrQueryRequest req;
private final SolrQueryResponse rsp;
@ -166,7 +189,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
String coreNodeName = zkController.getNodeName() + "_" + coreName;
isLeader = coreNodeName.equals(leaderNodeName);
if (req.getParams().getBool(SEEN_LEADER, false)) {
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
if (DistribPhase.FROMLEADER == phase) {
// we are coming from the leader, just go local - add no urls
forwardToLeader = false;
} else if (isLeader) {
@ -254,8 +280,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
hash = hash(cmd);
nodes = setupRequest(hash);
} else {
// even in non zk mode, tests simulate updates from a leader
isLeader = !req.getParams().getBool(SEEN_LEADER, false);
isLeader = getNonZkLeaderAssumption(req);
}
boolean dropCmd = false;
@ -271,9 +296,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
ModifiableSolrParams params = null;
if (nodes != null) {
params = new ModifiableSolrParams(req.getParams());
if (isLeader) {
params.set(SEEN_LEADER, true);
}
params.set(DISTRIB_UPDATE_PARAM,
(isLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
params.remove("commit"); // this will be distributed from the local commit
cmdDistrib.distribAdd(cmd, nodes, params);
}
@ -573,8 +599,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
hash = hash(cmd);
nodes = setupRequest(hash);
} else {
// even in non zk mode, tests simulate updates from a leader
isLeader = !req.getParams().getBool(SEEN_LEADER, false);
isLeader = getNonZkLeaderAssumption(req);
}
boolean dropCmd = false;
@ -590,9 +615,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
ModifiableSolrParams params = null;
if (nodes != null) {
params = new ModifiableSolrParams(req.getParams());
if (isLeader) {
params.set(SEEN_LEADER, true);
}
params.set(DISTRIB_UPDATE_PARAM,
(isLeader ?
DistribPhase.FROMLEADER.toString() :
DistribPhase.TOLEADER.toString()));
params.remove("commit"); // we already will have forwarded this from our local commit
cmdDistrib.distribDelete(cmd, nodes, params);
}
@ -613,23 +639,24 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public void doDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
// even in non zk mode, tests simulate updates from a leader
if(!zkEnabled) {
isLeader = !req.getParams().getBool(SEEN_LEADER, false);
isLeader = getNonZkLeaderAssumption(req);
} else {
zkCheck();
}
// Lev1: we are the first to receive this deleteByQuery, it must be forwarded to the leader of every shard
// Lev2: we are a leader receiving a forwarded deleteByQuery... we must:
// NONE: we are the first to receive this deleteByQuery
// - it must be forwarded to the leader of every shard
// TO: we are a leader receiving a forwarded deleteByQuery... we must:
// - block all updates (use VersionInfo)
// - flush *all* updates going to our replicas
// - forward the DBQ to our replicas and wait for the response
// - log + execute the local DBQ
// Lev3: we are a replica receiving a DBQ from our leader
// FROM: we are a replica receiving a DBQ from our leader
// - log + execute the local DBQ
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
int dbqlevel = req.getParams().getInt(DELETE_BY_QUERY_LEVEL, 1);
if (zkEnabled && dbqlevel == 1) {
if (zkEnabled && DistribPhase.NONE == phase) {
boolean leaderForAnyShard = false; // start off by assuming we are not a leader for any shard
Map<String,Slice> slices = zkController.getCloudState().getSlices(collection);
@ -640,7 +667,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
params.set(DELETE_BY_QUERY_LEVEL, 2);
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.TOLEADER.toString());
List<Node> leaders = new ArrayList<Node>(slices.size());
for (Map.Entry<String,Slice> sliceEntry : slices.entrySet()) {
@ -677,13 +704,13 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return;
}
// change the level to 2 so we look up and forward to our own replicas (if any)
dbqlevel = 2;
// change the phase to TOLEADER so we look up and forward to our own replicas (if any)
phase = DistribPhase.TOLEADER;
}
List<Node> replicas = null;
if (zkEnabled && dbqlevel == 2) {
if (zkEnabled && DistribPhase.TOLEADER == phase) {
// This core should be a leader
replicas = setupRequest();
}
@ -750,9 +777,8 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// forward to all replicas
if (leaderLogic && replicas != null) {
ModifiableSolrParams params = new ModifiableSolrParams(req.getParams());
params.set(DELETE_BY_QUERY_LEVEL, 3);
params.set(VERSION_FIELD, Long.toString(cmd.getVersion()));
params.set(SEEN_LEADER, "true");
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
cmdDistrib.distribDelete(cmd, replicas, params);
cmdDistrib.finish();
}
@ -1029,5 +1055,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return true;
}
}
/**
* Returns a boolean indicating wether or not the caller should behave as
* if this is the "leader" even when ZooKeeper is not enabled.
* (Even in non zk mode, tests may simulate updates to/from a leader)
*/
public static boolean getNonZkLeaderAssumption(SolrQueryRequest req) {
DistribPhase phase =
DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
// if we have been told we are coming from a leader, then we are
// definitely not the leader. Otherwise assume we are.
return DistribPhase.FROMLEADER != phase;
}
}

View File

@ -21,9 +21,9 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
public class DistributedUpdateProcessorFactory extends
UpdateRequestProcessorFactory {
public class DistributedUpdateProcessorFactory
extends UpdateRequestProcessorFactory
implements DistributingUpdateProcessorFactory {
@Override
public void init(NamedList args) {

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.solr.common.SolrException;
/**
* A marker interface for denoting that a factory is responsible for handling
* distributed communication of updates across a SolrCloud cluster.
*
* @see UpdateRequestProcessorChain
*/
public interface DistributingUpdateProcessorFactory {
/**
* Internal param used to specify the current phase of a distributed update,
* not intended for use by clients. Any non-blank value can be used to
* indicate to the <code>UpdateRequestProcessorChain</code> that factories
* prior to the <code>DistributingUpdateProcessorFactory</code> can be skipped.
* Implementations of this interface may use the non-blank values any way
* they wish.
*/
public static final String DISTRIB_UPDATE_PARAM = "update.distrib";
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
/**
* A No-Op implementation of DistributingUpdateProcessorFactory that
* allways returns null.
* <p>
* This implementation may be useful for Solr installations in which neither
* the <code>{@link DistributedUpdateProcessorFactory}</code> nor any custom
* implementation of <code>{@link DistributingUpdateProcessorFactory}</code>
* is desired (ie: shards are managed externally from Solr)
* </p>
*/
public class NoOpDistributingUpdateProcessorFactory
extends UpdateRequestProcessorFactory
implements DistributingUpdateProcessorFactory {
/** Returns null
*/
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
return null;
}
}

View File

@ -20,10 +20,16 @@ package org.apache.solr.update.processor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.util.plugin.PluginInfoInitialized;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.SolrException;
import org.apache.solr.core.PluginInfo;
import org.apache.solr.core.SolrCore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.ArrayList;
/**
* Manages a chain of UpdateRequestProcessorFactories.
@ -42,10 +48,13 @@ import java.util.List;
* </pre>
*
* @see UpdateRequestProcessorFactory
* @see #init
* @since solr 1.3
*/
public final class UpdateRequestProcessorChain implements PluginInfoInitialized
{
public final static Logger log = LoggerFactory.getLogger(UpdateRequestProcessorChain.class);
private UpdateRequestProcessorFactory[] chain;
private final SolrCore solrCore;
@ -53,31 +62,115 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized
this.solrCore = solrCore;
}
/**
* Initializes the chain using the factories specified by the <code>PluginInfo</code>.
* if the chain includes the <code>RunUpdateProcessorFactory</code>, but
* does not include an implementation of the
* <code>DistributingUpdateProcessorFactory</code> interface, then an
* instance of <code>DistributedUpdateProcessorFactory</code> will be
* injected immediately prior to the <code>RunUpdateProcessorFactory</code>.
*
* @see DistributingUpdateProcessorFactory
* @see RunUpdateProcessorFactory
* @see DistributedUpdateProcessorFactory
*/
public void init(PluginInfo info) {
List<UpdateRequestProcessorFactory> list = solrCore.initPlugins(info.getChildren("processor"),UpdateRequestProcessorFactory.class,null);
final String infomsg = "updateRequestProcessorChain \"" +
(null != info.name ? info.name : "") + "\"" +
(info.isDefault() ? " (default)" : "");
// wrap in an ArrayList so we know we know we can do fast index lookups
// and that add(int,Object) is supported
List<UpdateRequestProcessorFactory> list = new ArrayList
(solrCore.initPlugins(info.getChildren("processor"),UpdateRequestProcessorFactory.class,null));
if(list.isEmpty()){
throw new RuntimeException( "updateRequestProcessorChain require at least one processor");
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
infomsg + " require at least one processor");
}
int numDistrib = 0;
int runIndex = -1;
// hi->lo incase multiple run instances, add before first one
// (no idea why someone might use multiple run instances, but just in case)
for (int i = list.size()-1; 0 <= i; i--) {
UpdateRequestProcessorFactory factory = list.get(i);
if (factory instanceof DistributingUpdateProcessorFactory) {
numDistrib++;
}
if (factory instanceof RunUpdateProcessorFactory) {
runIndex = i;
}
}
if (1 < numDistrib) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
infomsg + " may not contain more then one " +
"instance of DistributingUpdateProcessorFactory");
}
if (0 <= runIndex && 0 == numDistrib) {
// by default, add distrib processor immediately before run
DistributedUpdateProcessorFactory distrib
= new DistributedUpdateProcessorFactory();
distrib.init(new NamedList());
list.add(runIndex, distrib);
log.info("inserting DistributedUpdateProcessorFactory into " + infomsg);
}
chain = list.toArray(new UpdateRequestProcessorFactory[list.size()]);
}
public UpdateRequestProcessorChain( UpdateRequestProcessorFactory[] chain , SolrCore solrCore) {
/**
* Creates a chain backed directly by the specified array. Modifications to
* the array will affect future calls to <code>createProcessor</code>
*/
public UpdateRequestProcessorChain( UpdateRequestProcessorFactory[] chain,
SolrCore solrCore) {
this.chain = chain;
this.solrCore = solrCore;
}
public UpdateRequestProcessor createProcessor(SolrQueryRequest req, SolrQueryResponse rsp)
/**
* Uses the factories in this chain to creates a new
* <code>UpdateRequestProcessor</code> instance specific for this request.
* If the <code>DISTRIB_UPDATE_PARAM</code> is present in the request and is
* non-blank, then any factory in this chain prior to the instance of
* <code>{@link DistributingUpdateProcessorFactory}</code> will be skipped,
* and the <code>UpdateRequestProcessor</code> returned will be from that
* <code>DistributingUpdateProcessorFactory</code>
*
* @see UpdateRequestProcessorFactory#getInstance
* @see DistributingUpdateProcessorFactory#DISTRIB_UPDATE_PARAM
*/
public UpdateRequestProcessor createProcessor(SolrQueryRequest req,
SolrQueryResponse rsp)
{
UpdateRequestProcessor processor = null;
UpdateRequestProcessor last = null;
final String distribPhase = req.getParams().get
(DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM, "");
final boolean skipToDistrib = ! distribPhase.trim().isEmpty();
for (int i = chain.length-1; i>=0; i--) {
processor = chain[i].getInstance(req, rsp, last);
last = processor == null ? last : processor;
if (skipToDistrib
&& chain[i] instanceof DistributingUpdateProcessorFactory) {
break;
}
}
return last;
}
/**
* Returns the underlying array of factories used in this chain.
* Modifications to the array will affect future calls to
* <code>createProcessor</code>
*/
public UpdateRequestProcessorFactory[] getFactories() {
return chain;
}
}

View File

@ -52,4 +52,26 @@
</processor>
</updateRequestProcessorChain>
<updateRequestProcessorChain name="distrib-chain-explicit">
<!-- explicit test using processors before and after distrib -->
<processor class="solr.CustomUpdateRequestProcessorFactory" />
<processor class="solr.DistributedUpdateProcessorFactory" />
<processor class="solr.RemoveBlankFieldUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="distrib-chain-implicit">
<!-- implicit test w/o distrib declared -->
<processor class="solr.CustomUpdateRequestProcessorFactory" />
<processor class="solr.RemoveBlankFieldUpdateProcessorFactory" />
<!-- distrib should be injected here -->
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="distrib-chain-noop">
<!-- explicit test using noop distrib -->
<processor class="solr.CustomUpdateRequestProcessorFactory" />
<processor class="solr.NoOpDistributingUpdateProcessorFactory" />
<processor class="solr.RemoveBlankFieldUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
</config>

View File

@ -507,4 +507,35 @@
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="distrib-dup-test-chain-explicit">
<!-- explicit test using processors before and after distrib -->
<processor class="solr.RegexReplaceProcessorFactory">
<str name="fieldName">regex_dup_A_s</str>
<str name="pattern">x</str>
<str name="replacement">x_x</str>
</processor>
<processor class="solr.DistributedUpdateProcessorFactory" />
<processor class="solr.RegexReplaceProcessorFactory">
<str name="fieldName">regex_dup_B_s</str>
<str name="pattern">x</str>
<str name="replacement">x_x</str>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="distrib-dup-test-chain-implicit">
<!-- implicit test w/o distrib declared-->
<processor class="solr.RegexReplaceProcessorFactory">
<str name="fieldName">regex_dup_A_s</str>
<str name="pattern">x</str>
<str name="replacement">x_x</str>
</processor>
<processor class="solr.RegexReplaceProcessorFactory">
<str name="fieldName">regex_dup_B_s</str>
<str name="pattern">x</str>
<str name="replacement">x_x</str>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
</config>

View File

@ -48,12 +48,17 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.ContentStreamUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.Create;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.update.SolrCmdDistributor.Request;
@ -126,6 +131,7 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
// setLoggingLevel(null);
del("*:*");
indexr(id,1, i1, 100, tlong, 100,t1,"now is the time for all good men"
,"foo_f", 1.414f, "foo_b", "true", "foo_d", 1.414d);
indexr(id,2, i1, 50 , tlong, 50,t1,"to come to the aid of their country."
@ -289,6 +295,9 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
testSearchByCollectionName();
testANewCollectionInOneInstanceWithManualShardAssignement();
testNumberOfCommitsWithCommitAfterAdd();
testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-explicit");
testUpdateProcessorsRunOnlyOnce("distrib-dup-test-chain-implicit");
// Thread.sleep(10000000000L);
if (DEBUG) {
@ -296,6 +305,54 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
}
}
/**
* Expects a RegexReplaceProcessorFactories in the chain which will
* "double up" the values in two (stored) string fields.
* <p>
* If the values are "double-doubled" or "not-doubled" then we know
* the processor was not run the appropriate number of times
* </p>
*/
private void testUpdateProcessorsRunOnlyOnce(final String chain) throws Exception {
final String fieldA = "regex_dup_A_s";
final String fieldB = "regex_dup_B_s";
final String val = "x";
final String expected = "x_x";
final ModifiableSolrParams updateParams = new ModifiableSolrParams();
updateParams.add(UpdateParams.UPDATE_CHAIN, chain);
final int numLoops = atLeast(50);
for (int i = 1; i < numLoops; i++) {
// add doc to random client
SolrServer updateClient = clients.get(random().nextInt(clients.size()));
SolrInputDocument doc = new SolrInputDocument();
addFields(doc, id, i, fieldA, val, fieldB, val);
UpdateResponse ures = add(updateClient, updateParams, doc);
assertEquals(chain + ": update failed", 0, ures.getStatus());
ures = updateClient.commit();
assertEquals(chain + ": commit failed", 0, ures.getStatus());
}
// query for each doc, and check both fields to ensure the value is correct
for (int i = 1; i < numLoops; i++) {
final String query = id + ":" + i;
QueryResponse qres = queryServer(new SolrQuery(query));
assertEquals(chain + ": query failed: " + query,
0, qres.getStatus());
assertEquals(chain + ": didn't find correct # docs with query: " + query,
1, qres.getResults().getNumFound());
SolrDocument doc = qres.getResults().get(0);
for (String field : new String[] {fieldA, fieldB}) {
assertEquals(chain + ": doc#" + i+ " has wrong value for " + field,
expected, doc.getFirstValue(field));
}
}
}
// cloud level test mainly needed just to make sure that versions and errors are propagated correctly
private void doOptimisticLockingAndUpdating() throws Exception {
SolrInputDocument sd = sdoc("id", 1000, "_version_", -1);
@ -334,7 +391,6 @@ public class BasicDistributedZkTest extends AbstractDistributedZkTestCase {
}
}
private void testNumberOfCommitsWithCommitAfterAdd()
throws MalformedURLException, SolrServerException, IOException {
long startCommits = getNumCommits((HttpSolrServer) clients.get(0));

View File

@ -48,11 +48,13 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.solr.core.SolrCore.verbose;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
public class TestRealTimeGet extends SolrTestCaseJ4 {
private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica)
// means we've seen the leader and have version info (i.e. we are a non-leader replica)
private static String FROM_LEADER = DistribPhase.FROMLEADER.toString();
@BeforeClass
public static void beforeClass() throws Exception {
@ -149,7 +151,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
// simulate an update from the leader
version += 10;
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// test version is there from rtg
assertJQ(req("qt","/get","id","1")
@ -157,7 +159,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
);
// simulate reordering: test that a version less than that does not take affect
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// test that version hasn't changed
assertJQ(req("qt","/get","id","1")
@ -166,7 +168,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
// simulate reordering: test that a delete w/ version less than that does not take affect
// TODO: also allow passing version on delete instead of on URL?
updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version - 1)));
// test that version hasn't changed
assertJQ(req("qt","/get","id","1")
@ -177,7 +179,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
assertU(commit());
// simulate reordering: test that a version less than that does not take affect
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// test that version hasn't changed
assertJQ(req("qt","/get","id","1")
@ -185,7 +187,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
);
// simulate reordering: test that a delete w/ version less than that does not take affect
updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version - 1)));
updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version - 1)));
// test that version hasn't changed
assertJQ(req("qt","/get","id","1")
@ -194,10 +196,10 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
// now simulate a normal delete from the leader
version += 5;
updateJ(jsonDelId("1"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_",Long.toString(version)));
updateJ(jsonDelId("1"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_",Long.toString(version)));
// make sure a reordered add doesn't take affect.
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// test that it's still deleted
assertJQ(req("qt","/get","id","1")
@ -208,7 +210,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
assertU(commit());
// make sure a reordered add doesn't take affect.
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","1", "_version_",Long.toString(version - 1))), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// test that it's still deleted
assertJQ(req("qt","/get","id","1")
@ -1071,7 +1073,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
if (oper < commitPercent + deletePercent) {
verbose("deleting id",id,"val=",nextVal,"version",version);
Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
// TODO: returning versions for these types of updates is redundant
// but if we do return, they had better be equal
@ -1093,7 +1095,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
} else {
verbose("adding id", id, "val=", nextVal,"version",version);
Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
if (returnedVersion != null) {
assertEquals(version, returnedVersion.longValue());
}
@ -1341,7 +1343,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
if (oper < commitPercent + deletePercent) {
verbose("deleting id",id,"val=",nextVal,"version",version);
Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), SEEN_LEADER,SEEN_LEADER_VAL));
Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params("_version_",Long.toString(-version), DISTRIB_UPDATE_PARAM,FROM_LEADER));
// TODO: returning versions for these types of updates is redundant
// but if we do return, they had better be equal
@ -1363,7 +1365,7 @@ public class TestRealTimeGet extends SolrTestCaseJ4 {
} else {
verbose("adding id", id, "val=", nextVal,"version",version);
Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(SEEN_LEADER,SEEN_LEADER_VAL));
Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), "_version_",Long.toString(version)), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
if (returnedVersion != null) {
assertEquals(version, returnedVersion.longValue());
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -39,10 +39,14 @@ import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
public class TestRecovery extends SolrTestCaseJ4 {
private static String SEEN_LEADER_VAL="true"; // value that means we've seen the leader and have version info (i.e. we are a non-leader replica)
// means that we've seen the leader and have version info (i.e. we are a non-leader replica)
private static String FROM_LEADER = DistribPhase.FROMLEADER.toString();
private static int timeout=60; // acquire timeout in seconds. change this to a huge number when debugging to prevent threads from advancing.
// TODO: fix this test to not require FSDirectory
@ -214,12 +218,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","B11", "_version_","1015")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonDelQ("id:B1 id:B11 id:B2 id:B3"), params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-1017"));
updateJ(jsonAdd(sdoc("id","B2", "_version_","1020")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","B3", "_version_","1030")), params(SEEN_LEADER,SEEN_LEADER_VAL));
deleteAndGetVersion("B1", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-2010"));
updateJ(jsonAdd(sdoc("id","B1", "_version_","1010")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B11", "_version_","1015")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonDelQ("id:B1 id:B11 id:B2 id:B3"), params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-1017"));
updateJ(jsonAdd(sdoc("id","B2", "_version_","1020")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B3", "_version_","1030")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
deleteAndGetVersion("B1", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2010"));
assertJQ(req("qt","/get", "getVersions","6")
,"=={'versions':[-2010,1030,1020,-1017,1015,1010]}"
@ -272,18 +276,18 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(1030L, ver.longValue());
// add a reordered doc that shouldn't overwrite one in the index
updateJ(jsonAdd(sdoc("id","B3", "_version_","3")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","B3", "_version_","3")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// reorder two buffered updates
updateJ(jsonAdd(sdoc("id","B4", "_version_","1040")), params(SEEN_LEADER,SEEN_LEADER_VAL));
deleteAndGetVersion("B4", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-940")); // this update should not take affect
updateJ(jsonAdd(sdoc("id","B6", "_version_","1060")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","B5", "_version_","1050")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","B8", "_version_","1080")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","B4", "_version_","1040")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
deleteAndGetVersion("B4", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-940")); // this update should not take affect
updateJ(jsonAdd(sdoc("id","B6", "_version_","1060")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B5", "_version_","1050")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","B8", "_version_","1080")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// test that delete by query is at least buffered along with everything else so it will delete the
// currently buffered id:8 (even if it doesn't currently support versioning)
updateJ("{\"delete\": { \"query\":\"id:B2 OR id:B8\" }}", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-3000"));
updateJ("{\"delete\": { \"query\":\"id:B2 OR id:B8\" }}", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-3000"));
assertJQ(req("qt","/get", "getVersions","13")
,"=={'versions':[-3000,1080,1050,1060,-940,1040,3,-2010,1030,1020,-1017,1015,1010]}" // the "3" appears because versions aren't checked while buffering
@ -298,12 +302,12 @@ public class TestRecovery extends SolrTestCaseJ4 {
logReplay.release(1);
// now add another update
updateJ(jsonAdd(sdoc("id","B7", "_version_","1070")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","B7", "_version_","1070")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// a reordered update that should be dropped
deleteAndGetVersion("B5", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-950"));
deleteAndGetVersion("B5", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-950"));
deleteAndGetVersion("B6", params(SEEN_LEADER,SEEN_LEADER_VAL, "_version_","-2060"));
deleteAndGetVersion("B6", params(DISTRIB_UPDATE_PARAM,FROM_LEADER, "_version_","-2060"));
logReplay.release(1000);
UpdateLog.RecoveryInfo recInfo = rinfoFuture.get();
@ -376,14 +380,14 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","C1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertTrue(ulog.dropBufferedUpdates());
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id", "C4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id", "C5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id", "C4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id", "C5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
logReplay.release(1000);
rinfoFuture = ulog.applyBufferedUpdates();
@ -395,17 +399,17 @@ public class TestRecovery extends SolrTestCaseJ4 {
);
// this time add some docs first before buffering starts (so tlog won't be at pos 0)
updateJ(jsonAdd(sdoc("id","C100", "_version_","200")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C101", "_version_","201")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C100", "_version_","200")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C101", "_version_","201")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id","C103", "_version_","203")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C104", "_version_","204")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C103", "_version_","203")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C104", "_version_","204")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertTrue(ulog.dropBufferedUpdates());
ulog.bufferUpdates();
updateJ(jsonAdd(sdoc("id","C105", "_version_","205")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C106", "_version_","206")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C105", "_version_","205")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C106", "_version_","206")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
rinfoFuture = ulog.applyBufferedUpdates();
rinfo = rinfoFuture.get();
@ -428,14 +432,14 @@ public class TestRecovery extends SolrTestCaseJ4 {
ulog.bufferUpdates();
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
updateJ(jsonAdd(sdoc("id","C301", "_version_","998")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C302", "_version_","999")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C301", "_version_","998")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C302", "_version_","999")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertTrue(ulog.dropBufferedUpdates());
// make sure we can overwrite with a lower version
// TODO: is this functionality needed?
updateJ(jsonAdd(sdoc("id","C301", "_version_","301")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C302", "_version_","302")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C301", "_version_","301")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","C302", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertU(commit());
@ -451,7 +455,7 @@ public class TestRecovery extends SolrTestCaseJ4 {
);
updateJ(jsonAdd(sdoc("id","C2", "_version_","302")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","C2", "_version_","302")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
@ -494,9 +498,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
ulog.bufferUpdates();
// simulate updates from a leader
updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q1", "_version_","101")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q2", "_version_","102")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q3", "_version_","103")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertEquals(UpdateLog.State.BUFFERING, ulog.getState());
req.close();
@ -526,9 +530,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
assertTrue((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0);
// now do some normal non-buffered adds
updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","Q4", "_version_","114")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q5", "_version_","115")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","Q6", "_version_","116")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
assertU(commit());
req.close();
@ -816,9 +820,9 @@ public class TestRecovery extends SolrTestCaseJ4 {
// Now test that the bad log file doesn't mess up retrieving latest versions
//
updateJ(jsonAdd(sdoc("id","F4", "_version_","104")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","F5", "_version_","105")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","F6", "_version_","106")), params(SEEN_LEADER,SEEN_LEADER_VAL));
updateJ(jsonAdd(sdoc("id","F4", "_version_","104")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","F5", "_version_","105")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
updateJ(jsonAdd(sdoc("id","F6", "_version_","106")), params(DISTRIB_UPDATE_PARAM,FROM_LEADER));
// This currently skips the bad log file and also returns the version of the clearIndex (del *:*)
// assertJQ(req("qt","/get", "getVersions","6"), "/versions==[106,105,104]");

View File

@ -28,9 +28,15 @@ import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.DistribPhase;
public class PeerSyncTest extends BaseDistributedSearchTestCase {
private static int numVersions = 100; // number of versions to use when syncing
private ModifiableSolrParams seenLeader = params("leader","true");
private final String FROM_LEADER = DistribPhase.FROMLEADER.toString();
private ModifiableSolrParams seenLeader =
params(DISTRIB_UPDATE_PARAM, FROM_LEADER);
public PeerSyncTest() {
fixShardCount = true;
@ -112,9 +118,9 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
v=1000;
add(client0, seenLeader, sdoc("id","1000","_version_",++v));
add(client0, seenLeader, sdoc("id","1001","_version_",++v));
delQ(client0, params("leader","true","_version_",Long.toString(-++v)), "id:1001 OR id:1002");
delQ(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:1001 OR id:1002");
add(client0, seenLeader, sdoc("id","1002","_version_",++v));
del(client0, params("leader","true","_version_",Long.toString(-++v)), "1000");
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "1000");
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
@ -124,18 +130,18 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
SolrServer client = client0;
add(client, seenLeader, sdoc("id","2000","_version_",++v));
add(client, seenLeader, sdoc("id","2001","_version_",++v));
delQ(client, params("leader","true","_version_",Long.toString(-++v)), "id:2001 OR id:2002");
delQ(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:2001 OR id:2002");
add(client, seenLeader, sdoc("id","2002","_version_",++v));
del(client, params("leader","true","_version_",Long.toString(-++v)), "2000");
del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000");
v=2000;
client = client1;
add(client, seenLeader, sdoc("id","2000","_version_",++v));
++v; // pretend we missed the add of 2001. peersync should retrieve it, but should also retrieve any deleteByQuery objects after it
// add(client, seenLeader, sdoc("id","2001","_version_",++v));
delQ(client, params("leader","true","_version_",Long.toString(-++v)), "id:2001 OR id:2002");
delQ(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "id:2001 OR id:2002");
add(client, seenLeader, sdoc("id","2002","_version_",++v));
del(client, params("leader","true","_version_",Long.toString(-++v)), "2000");
del(client, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_",Long.toString(-++v)), "2000");
assertSync(client1, numVersions, true, shardsArr[0]);
client0.commit(); client1.commit(); queryAndCompare(params("q", "*:*", "sort","_version_ desc"), client0, client1);
@ -149,12 +155,12 @@ public class PeerSyncTest extends BaseDistributedSearchTestCase {
// the reorder in application shouldn't affect anything
add(client0, seenLeader, sdoc("id","3000","_version_",3001));
add(client1, seenLeader, sdoc("id","3000","_version_",3001));
del(client0, params("leader","true","_version_","3000"), "3000");
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3000"), "3000");
// this should cause us to retrieve an add tha was previously deleted
add(client0, seenLeader, sdoc("id","3001","_version_",3003));
del(client0, params("leader","true","_version_","3001"), "3004");
del(client1, params("leader","true","_version_","3001"), "3004");
del(client0, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3001"), "3004");
del(client1, params(DISTRIB_UPDATE_PARAM,FROM_LEADER,"_version_","3001"), "3004");
// this should cause us to retrieve an older add that was overwritten
add(client0, seenLeader, sdoc("id","3002","_version_",3004));

View File

@ -44,7 +44,6 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.solr.core.SolrCore.verbose;
import static org.apache.solr.update.processor.DistributedUpdateProcessor.SEEN_LEADER;
public class TestUpdate extends SolrTestCaseJ4 {
@BeforeClass

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
/**
* A passthrough processor that does nothing.
*/
public class CustomUpdateRequestProcessor extends UpdateRequestProcessor {
public CustomUpdateRequestProcessor( UpdateRequestProcessor next) {
super(next);
}
}

View File

@ -39,8 +39,7 @@ public class CustomUpdateRequestProcessorFactory extends UpdateRequestProcessorF
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
// TODO Auto-generated method stub
return null;
return new CustomUpdateRequestProcessor(next);
}
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@ -18,9 +18,15 @@
package org.apache.solr.update.processor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.CustomUpdateRequestProcessor;
import org.apache.solr.util.AbstractSolrTestCase;
import java.util.Arrays;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/**
*
*/
@ -52,4 +58,37 @@ public class UpdateRequestProcessorFactoryTest extends AbstractSolrTestCase {
// Make sure the NamedListArgs got through ok
assertEquals( "{name={n8=88,n9=99}}", link.args.toString() );
}
public void testUpdateDistribChainSkipping() throws Exception {
SolrCore core = h.getCore();
for (final String name : Arrays.asList("distrib-chain-explicit",
"distrib-chain-implicit",
"distrib-chain-noop")) {
UpdateRequestProcessor proc;
UpdateRequestProcessorChain chain = core.getUpdateProcessingChain(name);
assertNotNull(name, chain);
// either explicitly, or because of injection
assertEquals(name + " chain length", 4,
chain.getFactories().length);
// Custom comes first in all three of our chains
proc = chain.createProcessor(req(), new SolrQueryResponse());
assertTrue(name + " first processor isn't a CustomUpdateRequestProcessor: "
+ proc.getClass().getName(),
proc instanceof CustomUpdateRequestProcessor);
// varies depending on chain, but definitely shouldn't be Custom
proc = chain.createProcessor(req(DISTRIB_UPDATE_PARAM, "non_blank_value"),
new SolrQueryResponse());
assertFalse(name + " post distrib proc should not be a CustomUpdateRequestProcessor: "
+ proc.getClass().getName(),
proc instanceof CustomUpdateRequestProcessor);
}
}
}