mirror of https://github.com/apache/lucene.git
SOLR-13081: Let in-place update work with route.field
This commit is contained in:
parent
f08ddbc713
commit
13be0b1af2
|
@ -184,6 +184,8 @@ Bug Fixes
|
||||||
|
|
||||||
* SOLR-13281: Fixed NPE in DocExpirationUpdateProcessorFactory (Munendra S N, Tomás Fernández Löbbe)
|
* SOLR-13281: Fixed NPE in DocExpirationUpdateProcessorFactory (Munendra S N, Tomás Fernández Löbbe)
|
||||||
|
|
||||||
|
* SOLR-13081: In-Place Update doesn't work with route.field (Dr Oleg Savrasov via Mikhail Khludnev)
|
||||||
|
|
||||||
Improvements
|
Improvements
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.solr.update.processor;
|
package org.apache.solr.update.processor;
|
||||||
|
|
||||||
|
import static org.apache.solr.common.params.CommonParams.ID;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.invoke.MethodHandles;
|
import java.lang.invoke.MethodHandles;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -35,11 +37,14 @@ import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.util.BytesRef;
|
import org.apache.lucene.util.BytesRef;
|
||||||
import org.apache.lucene.util.BytesRefBuilder;
|
import org.apache.lucene.util.BytesRefBuilder;
|
||||||
|
import org.apache.solr.cloud.CloudDescriptor;
|
||||||
|
import org.apache.solr.cloud.ZkController;
|
||||||
import org.apache.solr.common.SolrDocumentBase;
|
import org.apache.solr.common.SolrDocumentBase;
|
||||||
import org.apache.solr.common.SolrException;
|
import org.apache.solr.common.SolrException;
|
||||||
import org.apache.solr.common.SolrException.ErrorCode;
|
import org.apache.solr.common.SolrException.ErrorCode;
|
||||||
import org.apache.solr.common.SolrInputDocument;
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
import org.apache.solr.common.SolrInputField;
|
import org.apache.solr.common.SolrInputField;
|
||||||
|
import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.util.StrUtils;
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
|
@ -54,8 +59,6 @@ import org.apache.solr.util.RefCounted;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.solr.common.params.CommonParams.ID;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @lucene.experimental
|
* @lucene.experimental
|
||||||
*/
|
*/
|
||||||
|
@ -176,11 +179,13 @@ public class AtomicUpdateDocumentMerger {
|
||||||
return Collections.emptySet();
|
return Collections.emptySet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String routeFieldOrNull = getRouteField(cmd);
|
||||||
// first pass, check the things that are virtually free,
|
// first pass, check the things that are virtually free,
|
||||||
// and bail out early if anything is obviously not a valid in-place update
|
// and bail out early if anything is obviously not a valid in-place update
|
||||||
for (String fieldName : sdoc.getFieldNames()) {
|
for (String fieldName : sdoc.getFieldNames()) {
|
||||||
if (fieldName.equals(uniqueKeyFieldName)
|
if (fieldName.equals(uniqueKeyFieldName)
|
||||||
|| fieldName.equals(CommonParams.VERSION_FIELD)) {
|
|| fieldName.equals(CommonParams.VERSION_FIELD)
|
||||||
|
|| fieldName.equals(routeFieldOrNull)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Object fieldValue = sdoc.getField(fieldName).getValue();
|
Object fieldValue = sdoc.getField(fieldName).getValue();
|
||||||
|
@ -246,6 +251,19 @@ public class AtomicUpdateDocumentMerger {
|
||||||
return candidateFields;
|
return candidateFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String getRouteField(AddUpdateCommand cmd) {
|
||||||
|
String result = null;
|
||||||
|
SolrCore core = cmd.getReq().getCore();
|
||||||
|
CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
|
||||||
|
if (cloudDescriptor != null) {
|
||||||
|
String collectionName = cloudDescriptor.getCollectionName();
|
||||||
|
ZkController zkController = core.getCoreContainer().getZkController();
|
||||||
|
DocCollection collection = zkController.getClusterState().getCollection(collectionName);
|
||||||
|
result = collection.getRouter().getRouteField(collection);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param fullDoc the full doc to be compared against
|
* @param fullDoc the full doc to be compared against
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
<uniqueKey>id</uniqueKey>
|
<uniqueKey>id</uniqueKey>
|
||||||
<field name="id" type="string" indexed="true" stored="true" docValues="true"/>
|
<field name="id" type="string" indexed="true" stored="true" docValues="true"/>
|
||||||
<field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
|
<field name="_version_" type="long" indexed="false" stored="false" docValues="true" />
|
||||||
|
<field name="shardName" type="string" multiValued="false" indexed="false" required="false" stored="true"/>
|
||||||
|
|
||||||
<!-- specific schema fields for dv in-place updates -->
|
<!-- specific schema fields for dv in-place updates -->
|
||||||
<field name="inplace_updatable_float" type="float" indexed="false" stored="false" docValues="true" />
|
<field name="inplace_updatable_float" type="float" indexed="false" stored="false" docValues="true" />
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.solr.update;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.TestUtil;
|
||||||
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
|
import org.apache.solr.client.solrj.SolrServerException;
|
||||||
|
import org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrException;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest.Create;
|
||||||
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
|
import org.apache.solr.cloud.SolrCloudTestCase;
|
||||||
|
import org.apache.solr.common.SolrDocument;
|
||||||
|
import org.apache.solr.common.SolrDocumentList;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestInPlaceUpdateWithRouteField extends SolrCloudTestCase {
|
||||||
|
|
||||||
|
private static final int NUMBER_OF_DOCS = 100;
|
||||||
|
|
||||||
|
private static final String COLLECTION = "collection1";
|
||||||
|
private static final String[] shards = new String[]{"shard1","shard2","shard3"};
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
final Path configDir = Paths.get(TEST_HOME(), "collection1", "conf");
|
||||||
|
|
||||||
|
String configName = "solrCloudCollectionConfig";
|
||||||
|
int nodeCount = TestUtil.nextInt(random(), 1, 3);
|
||||||
|
configureCluster(nodeCount)
|
||||||
|
|
||||||
|
.addConfig(configName, configDir)
|
||||||
|
.configure();
|
||||||
|
|
||||||
|
Map<String, String> collectionProperties = new HashMap<>();
|
||||||
|
collectionProperties.put("config", "solrconfig-tlog.xml" );
|
||||||
|
collectionProperties.put("schema", "schema-inplace-updates.xml");
|
||||||
|
|
||||||
|
int replicas = 2;
|
||||||
|
// router field can be defined either for ImplicitDocRouter or CompositeIdRouter
|
||||||
|
boolean implicit = random().nextBoolean();
|
||||||
|
String routerName = implicit ? "implicit":"compositeId";
|
||||||
|
Create createCmd = CollectionAdminRequest.createCollection(COLLECTION, configName, shards.length, replicas)
|
||||||
|
.setMaxShardsPerNode(shards.length * replicas)
|
||||||
|
.setProperties(collectionProperties)
|
||||||
|
.setRouterName(routerName)
|
||||||
|
.setRouterField("shardName");
|
||||||
|
if (implicit) {
|
||||||
|
createCmd.setShards(Arrays.stream(shards).collect(Collectors.joining(",")));
|
||||||
|
}
|
||||||
|
createCmd.process(cluster.getSolrClient());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdatingDocValuesWithRouteField() throws Exception {
|
||||||
|
|
||||||
|
new UpdateRequest()
|
||||||
|
.deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
|
||||||
|
new UpdateRequest().add(createDocs(NUMBER_OF_DOCS)).commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
|
||||||
|
int id = TestUtil.nextInt(random(), 1, NUMBER_OF_DOCS - 1);
|
||||||
|
SolrDocument solrDocument = queryDoc(id);
|
||||||
|
Long initialVersion = (Long) solrDocument.get("_version_");
|
||||||
|
Integer luceneDocId = (Integer) solrDocument.get("[docid]");
|
||||||
|
String shardName = (String) solrDocument.get("shardName");
|
||||||
|
Assert.assertThat(solrDocument.get("inplace_updatable_int"), is(id));
|
||||||
|
|
||||||
|
int newDocValue = TestUtil.nextInt(random(), 1, 2 * NUMBER_OF_DOCS - 1);
|
||||||
|
SolrInputDocument sdoc = sdoc("id", ""+id,
|
||||||
|
// use route field in update command
|
||||||
|
"shardName", shardName,
|
||||||
|
"inplace_updatable_int", map("set", newDocValue));
|
||||||
|
|
||||||
|
UpdateRequest updateRequest = new UpdateRequest()
|
||||||
|
.add(sdoc);
|
||||||
|
updateRequest.commit(cluster.getSolrClient(), COLLECTION);
|
||||||
|
solrDocument = queryDoc(id);
|
||||||
|
Long newVersion = (Long) solrDocument.get("_version_");
|
||||||
|
Assert.assertTrue("Version of updated document must be greater than original one",
|
||||||
|
newVersion > initialVersion);
|
||||||
|
Assert.assertThat( "Doc value must be updated", solrDocument.get("inplace_updatable_int"), is(newDocValue));
|
||||||
|
Assert.assertThat("Lucene doc id should not be changed for In-Place Updates.", solrDocument.get("[docid]"), is(luceneDocId));
|
||||||
|
|
||||||
|
try {
|
||||||
|
sdoc.remove("shardName");
|
||||||
|
new UpdateRequest()
|
||||||
|
.add(sdoc).process(cluster.getSolrClient(), COLLECTION);
|
||||||
|
fail("should be exception w/o route field");
|
||||||
|
}catch(RemoteSolrException ex) {
|
||||||
|
assertThat("expecting 400 in "+ex.getMessage(), ex.code(), is(400));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Collection<SolrInputDocument> createDocs(int number) {
|
||||||
|
List<SolrInputDocument> result = new ArrayList<>();
|
||||||
|
for (int i = 0; i < number; i++) {
|
||||||
|
String randomShard = shards[random().nextInt(shards.length)];
|
||||||
|
result.add(sdoc("id", String.valueOf(i),
|
||||||
|
"shardName", randomShard,
|
||||||
|
"inplace_updatable_int", i));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private SolrDocument queryDoc(int id) throws SolrServerException, IOException {
|
||||||
|
SolrQuery query = new SolrQuery(
|
||||||
|
"q", "id:" + id,
|
||||||
|
"fl", "_version_,inplace_updatable_int,[docid],shardName",
|
||||||
|
"targetCollection", COLLECTION);
|
||||||
|
QueryResponse response = cluster.getSolrClient().query(COLLECTION, query);
|
||||||
|
SolrDocumentList result = (SolrDocumentList) response.getResponse().get("response");
|
||||||
|
Assert.assertThat(result.getNumFound(), is(1L));
|
||||||
|
return result.get(0);
|
||||||
|
}
|
||||||
|
}
|
|
@ -48,7 +48,7 @@ public abstract class DocRouter {
|
||||||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerName + "'");
|
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown document router '"+ routerName + "'");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getRouteField(DocCollection coll) {
|
public String getRouteField(DocCollection coll) {
|
||||||
if (coll == null) return null;
|
if (coll == null) return null;
|
||||||
Map m = (Map) coll.get(DOC_ROUTER);
|
Map m = (Map) coll.get(DOC_ROUTER);
|
||||||
if (m == null) return null;
|
if (m == null) return null;
|
||||||
|
|
Loading…
Reference in New Issue