SOLR-13081: Let in-place update work with route.field

This commit is contained in:
Mikhail Khludnev 2019-04-25 09:59:07 +03:00
parent 9c77889217
commit 6d94631538
5 changed files with 173 additions and 4 deletions

View File

@ -218,6 +218,8 @@ Bug Fixes
* SOLR-13281: Fixed NPE in DocExpirationUpdateProcessorFactory (Munendra S N, Tomás Fernández Löbbe)
* SOLR-13081: In-Place Update doesn't work with route.field (Dr Oleg Savrasov via Mikhail Khludnev)
Improvements
----------------------

View File

@ -16,6 +16,8 @@
*/
package org.apache.solr.update.processor;
import static org.apache.solr.common.params.CommonParams.ID;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
@ -35,11 +37,14 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrDocumentBase;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
@ -54,8 +59,6 @@ import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.ID;
/**
* @lucene.experimental
*/
@ -176,11 +179,13 @@ public class AtomicUpdateDocumentMerger {
return Collections.emptySet();
}
String routeFieldOrNull = getRouteField(cmd);
// first pass, check the things that are virtually free,
// and bail out early if anything is obviously not a valid in-place update
for (String fieldName : sdoc.getFieldNames()) {
if (fieldName.equals(uniqueKeyFieldName)
|| fieldName.equals(CommonParams.VERSION_FIELD)) {
|| fieldName.equals(CommonParams.VERSION_FIELD)
|| fieldName.equals(routeFieldOrNull)) {
continue;
}
Object fieldValue = sdoc.getField(fieldName).getValue();
@ -246,6 +251,19 @@ public class AtomicUpdateDocumentMerger {
return candidateFields;
}
private static String getRouteField(AddUpdateCommand cmd) {
String result = null;
SolrCore core = cmd.getReq().getCore();
CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
if (cloudDescriptor != null) {
String collectionName = cloudDescriptor.getCollectionName();
ZkController zkController = core.getCoreContainer().getZkController();
DocCollection collection = zkController.getClusterState().getCollection(collectionName);
result = collection.getRouter().getRouteField(collection);
}
return result;
}
/**
*
* @param fullDoc the full doc to be compared against

View File

@ -20,6 +20,7 @@
<uniqueKey>id</uniqueKey>
<field name="id" type="string" indexed="true" stored="true" docValues="true"/>
<field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
<field name="shardName" type="string" multiValued="false" indexed="false" required="false" stored="true"/>
<!-- specific schema fields for dv in-place updates -->
<field name="inplace_updatable_float" type="float" indexed="false" stored="false" docValues="true" />

View File

@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import static org.hamcrest.CoreMatchers.is;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.SolrCloudTestCase;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestInPlaceUpdateWithRouteField extends SolrCloudTestCase {
private static final int NUMBER_OF_DOCS = 100;
private static final String COLLECTION = "collection1";
private static final String[] shards = new String[]{"shard1","shard2","shard3"};
@BeforeClass
public static void setupCluster() throws Exception {
final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
String configName = "solrCloudCollectionConfig";
int nodeCount = TestUtil.nextInt(random(), 1, 3);
configureCluster(nodeCount)
.addConfig(configName, configDir)
.configure();
Map<String, String> collectionProperties = new HashMap<>();
collectionProperties.put("config", "solrconfig-tlog.xml" );
collectionProperties.put("schema", "schema-inplace-updates.xml");
int replicas = 2;
// router field can be defined either for ImplicitDocRouter or CompositeIdRouter
boolean implicit = random().nextBoolean();
String routerName = implicit ? "implicit":"compositeId";
Create createCmd = CollectionAdminRequest.createCollection(COLLECTION, configName, shards.length, replicas)
.setMaxShardsPerNode(shards.length * replicas)
.setProperties(collectionProperties)
.setRouterName(routerName)
.setRouterField("shardName");
if (implicit) {
createCmd.setShards(Arrays.stream(shards).collect(Collectors.joining(",")));
}
createCmd.process(cluster.getSolrClient());
}
@Test
public void testUpdatingDocValuesWithRouteField() throws Exception {
new UpdateRequest()
.deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTION);
new UpdateRequest().add(createDocs(NUMBER_OF_DOCS)).commit(cluster.getSolrClient(), COLLECTION);
int id = TestUtil.nextInt(random(), 1, NUMBER_OF_DOCS - 1);
SolrDocument solrDocument = queryDoc(id);
Long initialVersion = (Long) solrDocument.get("_version_");
Integer luceneDocId = (Integer) solrDocument.get("[docid]");
String shardName = (String) solrDocument.get("shardName");
Assert.assertThat(solrDocument.get("inplace_updatable_int"), is(id));
int newDocValue = TestUtil.nextInt(random(), 1, 2 * NUMBER_OF_DOCS - 1);
SolrInputDocument sdoc = sdoc("id", ""+id,
// use route field in update command
"shardName", shardName,
"inplace_updatable_int", map("set", newDocValue));
UpdateRequest updateRequest = new UpdateRequest()
.add(sdoc);
updateRequest.commit(cluster.getSolrClient(), COLLECTION);
solrDocument = queryDoc(id);
Long newVersion = (Long) solrDocument.get("_version_");
Assert.assertTrue("Version of updated document must be greater than original one",
newVersion > initialVersion);
Assert.assertThat( "Doc value must be updated", solrDocument.get("inplace_updatable_int"), is(newDocValue));
Assert.assertThat("Lucene doc id should not be changed for In-Place Updates.", solrDocument.get("[docid]"), is(luceneDocId));
try {
sdoc.remove("shardName");
new UpdateRequest()
.add(sdoc).process(cluster.getSolrClient(), COLLECTION);
fail("expect an exception w/o route field");
}catch(SolrException ex) {
assertThat("expecting 400 in "+ex.getMessage(), ex.code(), is(400));
}
}
private Collection<SolrInputDocument> createDocs(int number) {
List<SolrInputDocument> result = new ArrayList<>();
for (int i = 0; i < number; i++) {
String randomShard = shards[random().nextInt(shards.length)];
result.add(sdoc("id", String.valueOf(i),
"shardName", randomShard,
"inplace_updatable_int", i));
}
return result;
}
private SolrDocument queryDoc(int id) throws SolrServerException, IOException {
SolrQuery query = new SolrQuery(
"q", "id:" + id,
"fl", "_version_,inplace_updatable_int,[docid],shardName",
"targetCollection", COLLECTION);
QueryResponse response = cluster.getSolrClient().query(COLLECTION, query);
SolrDocumentList result = (SolrDocumentList) response.getResponse().get("response");
Assert.assertThat(result.getNumFound(), is(1L));
return result.get(0);
}
}

View File

@ -48,7 +48,7 @@ public abstract class DocRouter {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerName + "'");
}
protected String getRouteField(DocCollection coll) {
public String getRouteField(DocCollection coll) {
if (coll == null) return null;
Map m = (Map) coll.get(DOC_ROUTER);
if (m == null) return null;