SOLR-11654 TimePartitionedUpdateProcessor.lookupShardLeaderOfCollection should route to the ideal shard

Gus Heck 2018-06-21 11:45:59 -04:00 committed by David Smiley
parent 8eb006e4ed
commit bc9ac994a4
4 changed files with 439 additions and 50 deletions

solr/CHANGES.txt

@@ -109,6 +109,9 @@ Optimizations
* SOLR-12455: Refactor JSON serialization code into SolrJ package (noble)
* SOLR-11654: Time Routed Alias will now route documents to the ideal shard of a collection, thus avoiding a hop.
Usually documents were already routed well but not always. (Gus Heck, David Smiley)
Other Changes
----------------------

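For orientation, here is a condensed sketch of what the change amounts to, distilled from the TimeRoutedAliasUpdateProcessor diff below; clusterState, collection, idValue, doc and params stand in for values the processor already has in scope, so treat this as illustrative rather than a verbatim excerpt:

    // Before (lookupShardLeaderOfCollection): an arbitrary active slice's leader was chosen, so a
    // document could land on the wrong shard leader and be forwarded once more to the owning shard.
    //   Slice slice = clusterState.getCollection(collection).getActiveSlices().iterator().next();

    // After (routeDocToSlice): the collection's DocRouter picks the slice that owns the document,
    // so the update goes straight to the correct shard leader and the extra hop is avoided.
    DocCollection coll = clusterState.getCollection(collection);
    Slice slice = coll.getRouter().getTargetSlice(idValue, doc, null, params, coll);
    SolrCmdDistributor.Node leaderNode = getLeaderNode(collection, slice);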
org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessor.java

@@ -30,11 +30,14 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.api.collections.MaintainRoutedAliasCmd;
import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Aliases;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
@@ -46,6 +49,7 @@ import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.admin.CollectionsHandler;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
@@ -90,9 +94,11 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
private final SolrCmdDistributor cmdDistrib;
private final CollectionsHandler collHandler;
private final SolrParams outParamsToLeader;
private final CloudDescriptor cloudDesc;
private List<Map.Entry<Instant, String>> parsedCollectionsDesc; // k=timestamp (start), v=collection. Sorted descending
private Aliases parsedCollectionsAliases; // a cached reference to the source of what we parse into parsedCollectionsDesc
private SolrQueryRequest req;
public static UpdateRequestProcessor wrap(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
//TODO get from "Collection property"
@@ -118,7 +124,9 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
super(next);
assert aliasDistribPhase == DistribPhase.NONE;
final SolrCore core = req.getCore();
this.thisCollection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
cloudDesc = core.getCoreDescriptor().getCloudDescriptor();
this.thisCollection = cloudDesc.getCollectionName();
this.req = req;
CoreContainer cc = core.getCoreContainer();
zkController = cc.getZkController();
cmdDistrib = new SolrCmdDistributor(cc.getUpdateShardHandler());
@@ -157,7 +165,8 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
final Object routeValue = cmd.getSolrInputDocument().getFieldValue(timeRoutedAlias.getRouteField());
SolrInputDocument solrInputDocument = cmd.getSolrInputDocument();
final Object routeValue = solrInputDocument.getFieldValue(timeRoutedAlias.getRouteField());
final Instant routeTimestamp = parseRouteKey(routeValue);
updateParsedCollectionAliases();
@@ -216,7 +225,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
super.processAdd(cmd);
} else {
// send to the right collection
SolrCmdDistributor.Node targetLeaderNode = lookupShardLeaderOfCollection(targetCollection);
SolrCmdDistributor.Node targetLeaderNode = routeDocToSlice(targetCollection, solrInputDocument);
cmdDistrib.distribAdd(cmd, Collections.singletonList(targetLeaderNode), new ModifiableSolrParams(outParamsToLeader));
}
}
@@ -357,6 +366,16 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
}
}
private SolrCmdDistributor.Node routeDocToSlice(String collection, SolrInputDocument doc) {
SchemaField uniqueKeyField = req.getSchema().getUniqueKeyField();
// schema might not have key field...
String idFieldName = uniqueKeyField == null ? null : uniqueKeyField.getName();
String idValue = uniqueKeyField == null ? null : doc.getFieldValue(idFieldName).toString();
DocCollection coll = zkController.getClusterState().getCollection(collection);
Slice slice = coll.getRouter().getTargetSlice(idValue, doc, null, req.getParams(), coll);
return getLeaderNode(collection, slice);
}
private List<SolrCmdDistributor.Node> lookupShardLeadersOfCollections() {
final Aliases aliases = zkController.getZkStateReader().getAliases();
List<String> collections = aliases.getCollectionAliasListMap().get(getAliasName());
@@ -367,12 +386,15 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
}
private SolrCmdDistributor.Node lookupShardLeaderOfCollection(String collection) {
//TODO consider router to get the right slice. Refactor common code in CloudSolrClient & DistributedUrp
final Collection<Slice> activeSlices = zkController.getClusterState().getCollection(collection).getActiveSlices();
if (activeSlices.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Cannot route to collection " + collection);
}
final Slice slice = activeSlices.iterator().next();
return getLeaderNode(collection, slice);
}
private SolrCmdDistributor.Node getLeaderNode(String collection, Slice slice) {
//TODO when should we do StdNode vs RetryNode?
final Replica leader = slice.getLeader();
if (leader == null) {
@@ -380,7 +402,7 @@ public class TimeRoutedAliasUpdateProcessor extends UpdateRequestProcessor {
"No 'leader' replica available for shard " + slice.getName() + " of collection " + collection);
}
return new SolrCmdDistributor.RetryNode(new ZkCoreNodeProps(leader), zkController.getZkStateReader(),
collection, null);
collection, slice.getName());
}
}

org/apache/solr/update/processor/TimeRoutedAliasUpdateProcessorTest.java

@@ -22,9 +22,12 @@ import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -33,9 +36,12 @@ import org.apache.lucene.util.IOUtils;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.ConfigSetAdminRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.request.V2Request;
import org.apache.solr.client.solrj.response.FieldStatsInfo;
import org.apache.solr.client.solrj.response.QueryResponse;
@@ -45,12 +51,21 @@ import org.apache.solr.cloud.api.collections.TimeRoutedAlias;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -68,62 +83,35 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(2).configure();
configureCluster(4).configure();
}
@Before
public void doBefore() throws Exception {
solrClient = getCloudSolrClient(cluster);
//log this to help debug potential causes of problems
System.out.println("SolrClient: " + solrClient);
if (solrClient instanceof CloudSolrClient) {
System.out.println(((CloudSolrClient)solrClient).getClusterStateProvider());
System.out.println(((CloudSolrClient) solrClient).getClusterStateProvider());
}
}
@After
public void doAfter() throws Exception {
cluster.deleteAllCollections(); // deletes aliases too
solrClient.close();
}
@AfterClass
public static void finish() throws Exception {
IOUtils.close(solrClient);
}
@Slow
@Test
public void test() throws Exception {
// First create a configSet
// Then we create a collection with the name of the eventual config.
// We configure it, and ultimately delete the collection, leaving a modified config-set behind.
// Then when we create the "real" collections referencing this modified config-set.
assertEquals(0, new ConfigSetAdminRequest.Create()
.setConfigSetName(configName)
.setBaseConfigSetName("_default")
.process(solrClient).getStatus());
CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
// manipulate the config...
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
.withMethod(SolrRequest.METHOD.POST)
.withPayload("{" +
" 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
" 'add-updateprocessor' : {" +
" 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
" }," +
" 'add-updateprocessor' : {" + // for testing
" 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
" 'fieldName':'" + intField + "'" +
" }," +
"}").build()));
// only sometimes test with "tolerant" URP:
final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
.withMethod(SolrRequest.METHOD.POST)
.withPayload("{" +
" 'set' : {" +
" '_UPDATE' : {'processor':'" + urpNames + "'}" +
" }" +
"}").build()));
CollectionAdminRequest.deleteCollection(configName).process(solrClient);
assertTrue(
new ConfigSetAdminRequest.List().process(solrClient).getConfigSets()
.contains(configName)
);
String configName = TimeRoutedAliasUpdateProcessorTest.configName + getTestName();
createConfigSet(configName);
// Start with one collection manually created (and use higher numShards & replicas than we'll use for others)
// This tests that the collection may be pre-created and that doing so is acceptable.
@@ -135,9 +123,11 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
List<String> retrievedConfigSetNames = new ConfigSetAdminRequest.List().process(solrClient).getConfigSets();
List<String> expectedConfigSetNames = Arrays.asList("_default", configName);
assertTrue("We only expect 2 configSets",
expectedConfigSetNames.size() == retrievedConfigSetNames.size());
assertTrue("ConfigNames should be :" + expectedConfigSetNames, expectedConfigSetNames.containsAll(retrievedConfigSetNames) && retrievedConfigSetNames.containsAll(expectedConfigSetNames));
// config sets leak between tests so we can't be any more specific than this on the next 2 asserts
assertTrue("We expect at least 2 configSets",
retrievedConfigSetNames.size() >= expectedConfigSetNames.size());
assertTrue("ConfigNames should include :" + expectedConfigSetNames, retrievedConfigSetNames.containsAll(expectedConfigSetNames));
CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
CollectionAdminRequest.createCollection("_unused_", configName, 1, 1)
@@ -219,6 +209,144 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
assertInvariants(alias + "_2017-10-27", alias + "_2017-10-26");
}
private void createConfigSet(String configName) throws SolrServerException, IOException {
// First create a configSet
// Then we create a collection with the name of the eventual config.
// We configure it, and ultimately delete the collection, leaving a modified config-set behind.
// Later we create the "real" collections referencing this modified config-set.
assertEquals(0, new ConfigSetAdminRequest.Create()
.setConfigSetName(configName)
.setBaseConfigSetName("_default")
.process(solrClient).getStatus());
CollectionAdminRequest.createCollection(configName, configName, 1, 1).process(solrClient);
// manipulate the config...
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config")
.withMethod(SolrRequest.METHOD.POST)
.withPayload("{" +
" 'set-user-property' : {'update.autoCreateFields':false}," + // no data driven
" 'add-updateprocessor' : {" +
" 'name':'tolerant', 'class':'solr.TolerantUpdateProcessorFactory'" +
" }," +
// if other tracking tests are written, add another TUPF with a unique group name, don't re-use this one!
// See TrackingUpdateProcessorFactory javadocs for details...
" 'add-updateprocessor' : {" +
" 'name':'tracking-testSliceRouting', 'class':'solr.TrackingUpdateProcessorFactory', 'group':'testSliceRouting'" +
" }," +
" 'add-updateprocessor' : {" + // for testing
" 'name':'inc', 'class':'" + IncrementURPFactory.class.getName() + "'," +
" 'fieldName':'" + intField + "'" +
" }," +
"}").build()));
// only sometimes test with "tolerant" URP:
final String urpNames = "inc" + (random().nextBoolean() ? ",tolerant" : "");
checkNoError(solrClient.request(new V2Request.Builder("/collections/" + configName + "/config/params")
.withMethod(SolrRequest.METHOD.POST)
.withPayload("{" +
" 'set' : {" +
" '_UPDATE' : {'processor':'" + urpNames + "'}" +
" }" +
"}").build()));
CollectionAdminRequest.deleteCollection(configName).process(solrClient);
assertTrue(
new ConfigSetAdminRequest.List().process(solrClient).getConfigSets()
.contains(configName)
);
}
/**
* Test that documents are routed directly to leader shards (verified via the Tracking Update Processor
* Factory), thus avoiding the possibility of an extra hop to find the leader.
*
* @throws Exception when it blows up unexpectedly :)
*/
@Slow
@Nightly
@Test
public void testSliceRouting() throws Exception {
String configName = TimeRoutedAliasUpdateProcessorTest.configName + getTestName();
createConfigSet(configName);
// Each collection gets up to 4 shards with up to 3 replicas (randomized below); in the largest case
// that is 12 possible destinations per collection, only 4 of which (the leaders) may legitimately
// receive these updates; any other destination fails this test.
final int numShards = 1 + random().nextInt(4);
final int numReplicas = 1 + random().nextInt(3);
CollectionAdminRequest.createTimeRoutedAlias(alias, "2017-10-23T00:00:00Z", "+1DAY", timeField,
CollectionAdminRequest.createCollection("_unused_", configName, numShards, numReplicas)
.setMaxShardsPerNode(numReplicas))
.process(solrClient);
// cause some collections to be created
assertUpdateResponse(solrClient.add(alias, new SolrInputDocument("id","1","timestamp_dt", "2017-10-25T00:00:00Z")));
assertUpdateResponse(solrClient.commit(alias));
// wait for all the collections to exist...
waitCol("2017-10-23", numShards);
waitCol("2017-10-24", numShards);
waitCol("2017-10-25", numShards);
// At this point we have 3 collections; in the largest randomized case (4 shards x 3 replicas) that is
// 36 replicas, 1/3 of which are leaders. Each of the 3 docs added below would still hit a leader by
// chance 1/3 of the time if routing were broken, so the test's false-positive rate in that case is only
// about 3.6% (0.33^3). If that's not good enough, add more docs or more replicas per shard :).
try {
TrackingUpdateProcessorFactory.startRecording(getTestName());
// send docs to the alias, routing this request through the tracking processor so we can observe where they land
ModifiableSolrParams params = params("post-processor", "tracking-" + getTestName());
assertUpdateResponse(add(alias, Arrays.asList(
sdoc("id", "2", "timestamp_dt", "2017-10-24T00:00:00Z"),
sdoc("id", "3", "timestamp_dt", "2017-10-25T00:00:00Z"),
sdoc("id", "4", "timestamp_dt", "2017-10-23T00:00:00Z")),
params));
} finally {
TrackingUpdateProcessorFactory.stopRecording(getTestName());
}
try (CloudSolrClient cloudSolrClient = getCloudSolrClient(cluster)) {
ClusterStateProvider clusterStateProvider = cloudSolrClient.getClusterStateProvider();
clusterStateProvider.connect();
Set<String> leaders = getLeaderCoreNames(clusterStateProvider.getClusterState());
assertEquals("should have " + 3 * numShards + " leaders, " + numShards + " per collection", 3 * numShards, leaders.size());
List<UpdateCommand> updateCommands = TrackingUpdateProcessorFactory.commandsForGroup(getTestName());
assertEquals(3, updateCommands.size());
for (UpdateCommand updateCommand : updateCommands) {
String node = (String) updateCommand.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
assertTrue("Update was not routed to a leader (" + node + " not in list of leaders" + leaders, leaders.contains(node));
}
}
}
private Set<String> getLeaderCoreNames(ClusterState clusterState) {
Set<String> leaders = new TreeSet<>(); // sorted just to make it easier to read when debugging...
List<JettySolrRunner> jettySolrRunners = cluster.getJettySolrRunners();
for (JettySolrRunner jettySolrRunner : jettySolrRunners) {
List<CoreDescriptor> coreDescriptors = jettySolrRunner.getCoreContainer().getCoreDescriptors();
for (CoreDescriptor core : coreDescriptors) {
String nodeName = jettySolrRunner.getNodeName();
String collectionName = core.getCollectionName();
DocCollection collectionOrNull = clusterState.getCollectionOrNull(collectionName);
List<Replica> leaderReplicas = collectionOrNull.getLeaderReplicas(nodeName);
if (leaderReplicas != null) {
for (Replica leaderReplica : leaderReplicas) {
leaders.add(leaderReplica.getCoreName());
}
}
}
}
return leaders;
}
private void waitCol(final String datePart, int slices) {
waitForState("waiting for collections to be created",alias + "_" + datePart,
(liveNodes, collectionState) -> collectionState.getActiveSlices().size() == slices);
}
private void testFailedDocument(Instant timestamp, String errorMsg) throws SolrServerException, IOException {
try {
final UpdateResponse resp = solrClient.add(alias, newDoc(timestamp));
@@ -353,6 +481,16 @@ public class TimeRoutedAliasUpdateProcessorTest extends SolrCloudTestCase {
intField, "0"); // always 0
}
/** Adds the docs to Solr via {@link #solrClient} with the params */
private static UpdateResponse add(String collection, Collection<SolrInputDocument> docs, SolrParams params) throws SolrServerException, IOException {
UpdateRequest req = new UpdateRequest();
if (params != null) {
req.setParams(new ModifiableSolrParams(params));// copy because will be modified
}
req.add(docs);
return req.process(solrClient, collection);
}
@Test
public void testParse() {
assertEquals(Instant.parse("2017-10-02T03:04:05Z"),

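As an aside, the false-positive figure quoted in the testSliceRouting comment above can be sanity-checked directly. This is illustrative arithmetic only, assuming the largest randomized case of 3 replicas per shard and the 3 tracked documents the test sends:

    // A mis-routed document still hits a leader by chance 1 time in 3 when each shard has 3 replicas,
    // so all 3 tracked docs landing on leaders by accident has probability (1/3)^3.
    double perDocLeaderChance = 1.0 / 3;
    double falsePassProbability = Math.pow(perDocLeaderChance, 3); // ~= 0.037, i.e. roughly the 3.6% quoted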
org/apache/solr/update/processor/TrackingUpdateProcessorFactory.java

@@ -0,0 +1,226 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.mina.util.ConcurrentHashSet;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import org.apache.solr.update.UpdateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This Factory is similar to {@link RecordingUpdateProcessorFactory}, but with the goal of
* tracking requests across multiple collections/shards/replicas in a SolrCloudTestCase.
* It can optionally save references to the commands it receives in a single global
* Map&lt;String,BlockingQueue&gt;. Keys in the map are arbitrary, but the intention is that tests
* generate a key that is unique to that test, and configure the factory with the key as "group name"
* to avoid cross talk between tests. Tests can poll for requests from a group to observe that the expected
* commands are executed. By default, this factory does nothing except return the "next"
* processor from the chain unless it's told to {@link #startRecording()} in which case all factories
* with the same group will begin recording. It is critical that tests utilizing this
* processor call {@link #close()} on at least one group member after the test finishes. The requests associated with
* the commands are also provided with a per-group serial number ({@link #REQUEST_COUNT}) and the name of the
* core that handled them ({@link #REQUEST_NODE}) via the request context.
*
* This class is only for unit test purposes and should not be used in any production capacity. It presumes all nodes
* exist within the same JVM (i.e. MiniSolrCloudCluster).
*/
public final class TrackingUpdateProcessorFactory
extends UpdateRequestProcessorFactory implements Closeable {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String REQUEST_COUNT = "TrackingUpdateProcessorRequestCount";
public static final String REQUEST_NODE = "TrackingUpdateProcessorRequestNode";
private final static Map<String,Set<TrackingUpdateProcessorFactory>> groupMembership = new ConcurrentHashMap<>();
private final static Map<String,AtomicInteger> groupSerialNums = new ConcurrentHashMap<>();
/**
* The map of group queues containing commands that were recorded
* @see #startRecording
*/
private final static Map<String, List<UpdateCommand>> commandQueueMap = new ConcurrentHashMap<>();
private static final Object memoryConsistency = new Object();
private volatile boolean recording = false;
private String group = "default";
/**
* Get a copy of the queue for the group.
*
* @param group the name of the group to fetch
* @return A cloned queue containing the same elements as the queue held in commandQueueMap
*/
public static ArrayList<UpdateCommand> commandsForGroup(String group) {
synchronized (memoryConsistency) {
return new ArrayList<>(commandQueueMap.get(group));
}
}
public static void startRecording(String group) {
synchronized (memoryConsistency) {
Set<TrackingUpdateProcessorFactory> trackingUpdateProcessorFactories = groupMembership.get(group);
if (trackingUpdateProcessorFactories == null || trackingUpdateProcessorFactories.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There are no trackingUpdateProcessors for group " + group);
}
for (TrackingUpdateProcessorFactory trackingUpdateProcessorFactory : trackingUpdateProcessorFactories) {
trackingUpdateProcessorFactory.startRecording();
}
}
}
public static void stopRecording(String group) {
synchronized (memoryConsistency) {
Set<TrackingUpdateProcessorFactory> trackingUpdateProcessorFactories = groupMembership.get(group);
if (trackingUpdateProcessorFactories == null || trackingUpdateProcessorFactories.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "There are no trackingUpdateProcessors for group "
+ group + " available groups are:" + groupMembership.keySet());
}
for (TrackingUpdateProcessorFactory trackingUpdateProcessorFactory : trackingUpdateProcessorFactories) {
trackingUpdateProcessorFactory.stopRecording();
}
}
}
@Override
public void init(NamedList args) {
if (args != null && args.indexOf("group",0) >= 0) {
group = (String) args.get("group");
} else {
log.warn("TrackingUpdateProcessorFactory initialized without group configuration, using 'default' but this group is shared" +
"across the entire VM and guaranteed to have unpredictable behavior if used by more than one test");
}
// compute if absent to avoid replacing in the case of multiple "default"
commandQueueMap.computeIfAbsent(group, s -> new ArrayList<>());
groupMembership.computeIfAbsent(group,s-> new ConcurrentHashSet<>());
groupSerialNums.computeIfAbsent(group,s-> new AtomicInteger(0));
groupMembership.get(group).add(this);
}
/**
* @see #stopRecording
* @see #commandQueueMap
*/
public synchronized void startRecording() {
Set<TrackingUpdateProcessorFactory> facts = groupMembership.get(group);
// facts being null is a bug, all instances should have a group.
for (TrackingUpdateProcessorFactory fact : facts) {
fact.recording = true;
}
}
/** @see #startRecording */
public synchronized void stopRecording() {
Set<TrackingUpdateProcessorFactory> factories = groupMembership.get(group);
// factories being null is a bug, all instances should have a group.
for (TrackingUpdateProcessorFactory fact : factories) {
fact.recording = false;
}
}
@Override
@SuppressWarnings("resource")
public synchronized UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
return recording ? new RecordingUpdateRequestProcessor(group, next) : next;
}
@Override
public void close() {
commandQueueMap.remove(group);
groupMembership.get(group).clear();
}
private static final class RecordingUpdateRequestProcessor
extends UpdateRequestProcessor {
private String group;
public RecordingUpdateRequestProcessor(String group,
UpdateRequestProcessor next) {
super(next);
this.group = group;
}
private void record(UpdateCommand cmd) {
synchronized (memoryConsistency) {
String coreName = cmd.getReq().getCore().getName();
Map<Object, Object> context = cmd.getReq().getContext();
context.put(REQUEST_COUNT, groupSerialNums.get(group).incrementAndGet());
context.put(REQUEST_NODE, coreName);
List<UpdateCommand> commands = commandQueueMap.get(group);
commands.add(cmd.clone()); // important because cmd.clear() will be called
}
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
record(cmd);
super.processAdd(cmd);
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
record(cmd);
super.processDelete(cmd);
}
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
record(cmd);
super.processMergeIndexes(cmd);
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
record(cmd);
super.processCommit(cmd);
}
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
record(cmd);
super.processRollback(cmd);
}
@Override
protected void doClose() {
super.doClose();
groupMembership.get(group).remove(this);
}
}
}
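For quick reference, a minimal usage sketch of this factory, condensed from testSliceRouting above; the group name "mygroup" is a placeholder, and the add(...), sdoc(...), params(...) and assertUpdateResponse(...) helpers are assumed from the test class:

    // Assumes the configSet already defines an update processor named "tracking-mygroup" with
    // group "mygroup" (see the add-updateprocessor call in createConfigSet(...) above).
    TrackingUpdateProcessorFactory.startRecording("mygroup");
    try {
      // route this request through the tracking processor by name
      ModifiableSolrParams params = params("post-processor", "tracking-mygroup");
      assertUpdateResponse(add(alias, Arrays.asList(
          sdoc("id", "1", "timestamp_dt", "2017-10-25T00:00:00Z")), params));
    } finally {
      TrackingUpdateProcessorFactory.stopRecording("mygroup");
    }
    // Inspect what was recorded; REQUEST_NODE holds the name of the core that handled the command.
    for (UpdateCommand cmd : TrackingUpdateProcessorFactory.commandsForGroup("mygroup")) {
      String coreName = (String) cmd.getReq().getContext().get(TrackingUpdateProcessorFactory.REQUEST_NODE);
      // assertions about coreName / cmd go here
    }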