SOLR-12803: Ensure CUSC routes docs to right cores

ConcurrentUpdateSolrClient can batch together many documents when making
an indexing request to Solr.  When adding an update request to the
current batch being made, it checks that the query-parameters of the
docs being added match those already in the batch.  But prior to this
commit it never checked that the collections/cores were the same.

This could result in documents being sent to the wrong collection if the
same client is used to index documents to two different
cores/collections simultaneously.

This commit addresses this problem, ensuring that documents aren't added
to a batch directed at a different core/collection.
This commit is contained in:
Jason Gerlowski 2018-10-07 09:51:43 -04:00
parent e2b8beccb0
commit 367bdf7f74
4 changed files with 110 additions and 2 deletions

View File

@ -148,6 +148,8 @@ Other Changes
Bug Fixes
----------------------
* SOLR-12803: Ensure ConcurrentUpdateSolrClient sends documents to correct collection (Jason Gerlowski)
* SOLR-11836: FacetStream works with bucketSizeLimit of -1 which will fetch all the buckets.
(Alfonso Muñoz-Pomer Fuentes, Amrit Sarkar via Varun Thacker)

View File

@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.request.RequestWriter;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.util.ClientUtils;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
@ -243,6 +244,7 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
final boolean isXml = ClientUtils.TEXT_XML.equals(contentType);
final ModifiableSolrParams origParams = new ModifiableSolrParams(update.getRequest().getParams());
final String origTargetCollection = update.getCollection();
EntityTemplate template = new EntityTemplate(new ContentProducer() {
@ -256,8 +258,8 @@ public class ConcurrentUpdateSolrClient extends SolrClient {
while (upd != null) {
UpdateRequest req = upd.getRequest();
SolrParams currentParams = new ModifiableSolrParams(req.getParams());
if (!origParams.toNamedList().equals(currentParams.toNamedList())) {
queue.add(upd); // params are different, push back to queue
if (!origParams.toNamedList().equals(currentParams.toNamedList()) || !StringUtils.equals(origTargetCollection, upd.getCollection())) {
queue.add(upd); // Request has different params or destination core/collection, return to queue
break;
}

View File

@ -22,4 +22,14 @@ public class StringUtils {
return (s == null) || s.isEmpty();
}
/**
* A "null-safe" equals method. Returns true if the two provided references are both null, or if they are string-equal.
*/
public static boolean equals(String first, String second) {
if (first == null) {
return second == null;
}
return first.equals(second);
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.impl;
import java.io.File;
import java.io.IOException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.util.ExternalPaths;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* {@link ConcurrentUpdateSolrClient} reuses the same HTTP connection to send multiple requests. These tests ensure
* that this connection-reuse never results in documents being sent to the wrong collection. See SOLR-12803
*/
public class ConcurrentUpdateSolrClientMultiCollectionTest extends SolrCloudTestCase {
private static final String COLLECTION_ONE_NAME = "collection1";
private static final String COLLECTION_TWO_NAME = "collection2";
private String solrUrl;
@BeforeClass
public static void setupCluster() throws Exception {
configureCluster(1)
.addConfig("conf", new File(ExternalPaths.TECHPRODUCTS_CONFIGSET).toPath())
.configure();
}
@Before
public void createCollections() throws Exception {
solrUrl = cluster.getJettySolrRunner(0).getBaseUrl().toString();
CollectionAdminRequest.createCollection(COLLECTION_ONE_NAME, "conf", 1, 1).process(cluster.getSolrClient());
CollectionAdminRequest.createCollection(COLLECTION_TWO_NAME, "conf", 1, 1).process(cluster.getSolrClient());
}
@After
public void deleteCollections() throws Exception {
cluster.deleteAllCollections();
}
@Test
public void testEnsureDocumentsSentToCorrectCollection() throws Exception {
int numTotalDocs = 1000;
int numExpectedPerCollection = numTotalDocs / 2;
try (SolrClient client = new ConcurrentUpdateSolrClient.Builder(solrUrl)
.withQueueSize(numTotalDocs).build()) {
splitDocumentsAcrossCollections(client, numTotalDocs);
assertEquals(numExpectedPerCollection, client.query(COLLECTION_ONE_NAME, new SolrQuery("*:*")).getResults().getNumFound());
assertEquals(numExpectedPerCollection, client.query(COLLECTION_TWO_NAME, new SolrQuery("*:*")).getResults().getNumFound());
}
}
private void splitDocumentsAcrossCollections(SolrClient client, int numTotalDocs) throws IOException, SolrServerException {
for (int docNum = 0; docNum < numTotalDocs; docNum++) {
final SolrInputDocument doc = new SolrInputDocument();
doc.setField("id", "value" + docNum);
if (docNum %2 == 0) {
client.add(COLLECTION_ONE_NAME, doc);
} else {
client.add(COLLECTION_TWO_NAME, doc);
}
}
client.commit(COLLECTION_ONE_NAME);
client.commit(COLLECTION_TWO_NAME);
}
}