SOLR-12373: Let DocBasedVersionConstraintsProcessor define fields to use in tombstones

A new config option, "tombstoneConfig" allows the DocBasedVersionConstraintsProcessor
to add extra fields to the tombstone generated when a document is deleted. This can
be useful when the schema has required fields.
This commit is contained in:
Tomas Fernandez Lobbe 2019-01-25 13:38:18 -08:00
parent e2b8b0e5b1
commit 0bd1911db6
5 changed files with 271 additions and 21 deletions

View File

@ -250,6 +250,11 @@ New Features
* SOLR-12770: Make it possible to configure a host whitelist for distributed search
(Christine Poerschke, janhoy, Erick Erickson, Tomás Fernández Löbbe)
* SOLR-12373: Add a "tombstoneConfig" option to DocBasedVersionConstraintsProcessor that allows
users to configure which fields/values to add to tombstone documents. This can be useful to
make sure tombstone documents include fields that are marked as required in the schema
(Tomás Fernández Löbbe)
Bug Fixes
----------------------

View File

@ -17,12 +17,16 @@
package org.apache.solr.update.processor;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
@ -32,6 +36,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
@ -46,11 +51,6 @@ import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor {
private static final String[] EMPTY_STR_ARR = new String[0];
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -62,6 +62,7 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
private final boolean supportMissingVersionOnOldDocs;
private final String[] deleteVersionParamNames;
private final SolrCore core;
private final NamedList<Object> tombstoneConfig;
private final DistributedUpdateProcessor distribProc; // the distributed update processor following us
private final DistributedUpdateProcessor.DistribPhase phase;
@ -69,11 +70,33 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
private long oldSolrVersion; // current _version_ of the doc in the index/update log
/**
* @deprecated Use {@link #DocBasedVersionConstraintsProcessor(List, boolean, List, boolean, boolean, NamedList, SolrQueryRequest, UpdateRequestProcessor)}
*/
@Deprecated
public DocBasedVersionConstraintsProcessor(List<String> versionFields,
boolean ignoreOldUpdates,
List<String> deleteVersionParamNames,
boolean supportMissingVersionOnOldDocs,
boolean useFieldCache,
SolrQueryRequest req,
UpdateRequestProcessor next ) {
this(versionFields,
ignoreOldUpdates,
deleteVersionParamNames,
supportMissingVersionOnOldDocs,
useFieldCache,
null,
req,
next);
}
public DocBasedVersionConstraintsProcessor(List<String> versionFields,
boolean ignoreOldUpdates,
List<String> deleteVersionParamNames,
boolean supportMissingVersionOnOldDocs,
boolean useFieldCache,
NamedList<Object> tombstoneConfig,
SolrQueryRequest req,
UpdateRequestProcessor next ) {
super(next);
@ -93,6 +116,7 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
this.distribProc = getDistributedUpdateProcessor(next);
this.phase = DistributedUpdateProcessor.DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
this.tombstoneConfig = tombstoneConfig;
}
private static DistributedUpdateProcessor getDistributedUpdateProcessor(UpdateRequestProcessor next) {
@ -429,10 +453,7 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
if (isNotLeader(cmd)) {
// transform delete to add earlier rather than later
SolrInputDocument newDoc = new SolrInputDocument();
newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
cmd.getId());
setDeleteParamValues(newDoc, deleteParamValues);
SolrInputDocument newDoc = createTombstoneDocument(core.getLatestSchema(), cmd.getId(), versionFieldNames, deleteParamValues, this.tombstoneConfig);
AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
newCmd.solrDoc = newDoc;
@ -458,10 +479,7 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
// drop the delete, and instead propagate an AddDoc that
// replaces the doc with a new "empty" one that records the deleted version
SolrInputDocument newDoc = new SolrInputDocument();
newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
cmd.getId());
setDeleteParamValues(newDoc, deleteParamValues);
SolrInputDocument newDoc = createTombstoneDocument(core.getLatestSchema(), cmd.getId(), versionFieldNames, deleteParamValues, this.tombstoneConfig);
AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
newCmd.solrDoc = newDoc;
@ -480,6 +498,29 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
}
}
/**
* Creates a tombstone document, that will be used to update a document that's being deleted by ID. The returned document will contain:
* <ul>
* <li>uniqueKey</li>
* <li>versions (All the fields configured as in the {@code versionField} parameter)</li>
* <li>All the fields set in the {@code tombstoneConfig} parameter</li>
* </ul>
*
* Note that if the schema contains required fields (other than version fields or uniqueKey), they should be populated by the {@code tombstoneConfig},
* otherwise this tombstone will fail when indexing (and consequently the delete will fail). Alternatively, required fields must be populated by some
* other means (like {@code DocBasedVersionConstraintsProcessorFactory} or similar)
*/
protected SolrInputDocument createTombstoneDocument(IndexSchema schema, String id, String[] versionFieldNames, String[] deleteParamValues, NamedList<Object> tombstoneConfig) {
final SolrInputDocument newDoc = new SolrInputDocument();
if (tombstoneConfig != null) {
tombstoneConfig.forEach((k,v) -> newDoc.addField(k, v));
}
newDoc.setField(schema.getUniqueKeyField().getName(), id);
setDeleteParamValues(newDoc, versionFieldNames, deleteParamValues);
return newDoc;
}
private String[] getDeleteParamValuesFromRequest(DeleteUpdateCommand cmd) {
SolrParams params = cmd.getReq().getParams();
String[] returnArr = new String[deleteVersionParamNames.length];
@ -503,7 +544,7 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
}
}
private void setDeleteParamValues(SolrInputDocument doc, String[] values) {
private static void setDeleteParamValues(SolrInputDocument doc, String[] versionFieldNames, String[] values) {
for (int i = 0; i < values.length; i++) {
String versionFieldName = versionFieldNames[i];
String value = values[i];

View File

@ -16,23 +16,25 @@
*/
package org.apache.solr.update.processor;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
/**
* <p>
* This Factory generates an UpdateProcessor that helps to enforce Version
@ -85,6 +87,9 @@ import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
* <code>false</code>, but if set to <code>true</code> allows any documents written *before*
* this feature is enabled and which are missing the versionField to be overwritten.
* </li>
* <li><code>tombstoneConfig</code> - a list of field names to values to add to the
* created tombstone document. In general is not a good idea to populate tombsone documents
* with anything other than the minimum required fields so that it doean't match queries</li>
* </ul>
* @since 4.6.0
*/
@ -96,7 +101,9 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
private List<String> deleteVersionParamNames = Collections.emptyList();
private boolean useFieldCache;
private boolean supportMissingVersionOnOldDocs = false;
private NamedList<Object> tombstoneConfig;
@SuppressWarnings("unchecked")
@Override
public void init( NamedList args ) {
@ -146,6 +153,15 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
supportMissingVersionOnOldDocs = ((Boolean)tmp).booleanValue();
}
tmp = args.remove("tombstoneConfig");
if (null != tmp) {
if (! (tmp instanceof NamedList) ) {
throw new SolrException(SERVER_ERROR,
"'tombstoneConfig' must be configured as a <lst>.");
}
tombstoneConfig = (NamedList<Object>)tmp;
}
super.init(args);
}
@ -158,6 +174,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
deleteVersionParamNames,
supportMissingVersionOnOldDocs,
useFieldCache,
tombstoneConfig,
req, next);
}
@ -190,7 +207,34 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
}
}
}
canCreateTombstoneDocument(core.getLatestSchema());
}
/**
* Validates that the schema would allow tombstones to be created by DocBasedVersionConstraintsProcessor by
* checking if the required fields are of known types
*/
protected boolean canCreateTombstoneDocument(IndexSchema schema) {
Set<String> requiredFieldNames = schema.getRequiredFields().stream()
.filter(field -> field.getDefaultValue() == null)
.map(field -> field.getName())
.collect(Collectors.toSet());
if (tombstoneConfig != null) {
tombstoneConfig.forEach((k,v) -> requiredFieldNames.remove(k));
}
requiredFieldNames.remove(schema.getUniqueKeyField().getName());
if (versionFields != null) {
versionFields.forEach(field -> requiredFieldNames.remove(field));
}
if (!requiredFieldNames.isEmpty()) {
log.warn("The schema \"{}\" has required fields that aren't added in the tombstone."
+ " This can cause deletes to fail if those aren't being added in some other way. Required Fields={}",
schema.getSchemaName(),
requiredFieldNames);
return false;
}
return true;
}
}

View File

@ -152,4 +152,23 @@
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="tombstone-config">
<!-- process the external version constraint, ignoring any updates that
don't satisfy the constraint -->
<processor class="solr.DocBasedVersionConstraintsProcessorFactory">
<str name="versionField">my_version_l</str>
<str name="deleteVersionParam">del_version</str>
<lst name="tombstoneConfig">
<bool name="foo_b">true</bool>
<int name="foo_i">1</int>
<long name="foo_l">1</long>
<float name="foo_f">1.5</float>
<str name="foo_s">bar</str>
<str name="foo_ss">bar1</str>
<str name="foo_ss">bar2</str>
</lst>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
</config>

View File

@ -14,18 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
package org.apache.solr.update.processor;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.processor.DocBasedVersionConstraintsProcessorFactory;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
@ -353,11 +363,13 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
}
// And just verify if we pass version 1, we still error if version 2 isn't found.
try {
ignoreException("Delete by ID must specify doc version param");
deleteAndGetVersion("aaa", params("del_version", "1001",
"update.chain","external-version-failhard-multiple"));
fail("no 400");
} catch (SolrException ex) {
assertEquals(400, ex.code());
unIgnoreException("Delete by ID must specify doc version param");
}
//Verify we are still unchanged
assertU(commit());
@ -497,12 +509,14 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
updateJ(json("[{\"id\": \"a\", \"name\": \"a1\", \"my_version_l\": " + version + "}]"),
params("update.chain", "external-version-constraint"));
try {
ignoreException("Doc exists in index, but has null versionField: my_version_l");
updateJ(json("[{\"id\": \"b\", \"name\": \"b1\", \"my_version_l\": " + version + "}]"),
params("update.chain", "external-version-constraint"));
fail("Update to id=b should have failed because existing doc is missing version field");
} catch (final SolrException ex) {
// expected
assertEquals("Doc exists in index, but has null versionField: my_version_l", ex.getMessage());
unIgnoreException("Doc exists in index, but has null versionField: my_version_l");
}
assertU(commit());
assertJQ(req("q","*:*"), "/response/numFound==2");
@ -523,6 +537,133 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
assertJQ(req("qt","/get", "id", "b", "fl", "id,my_version_l"), "=={'doc':{'id':'b', 'my_version_l':1}}");
}
public void testTombstoneConfig() throws Exception {
assertJQ(req("q","*:*"),"/response/numFound==0");
updateWithChain("tombstone-config",
"id", "b!doc1",
"my_version_l", "1");
assertU(commit());
assertJQ(req("q","*:*"),"/response/numFound==1");
assertJQ(req("q","foo_b:true"),"/response/numFound==0");
assertJQ(req("q","foo_i:1"),"/response/numFound==0");
assertJQ(req("q","foo_l:1"),"/response/numFound==0");
assertJQ(req("q","foo_f:1.5"),"/response/numFound==0");
assertJQ(req("q","foo_s:bar"),"/response/numFound==0");
assertJQ(req("q","foo_ss:bar1"),"/response/numFound==0");
assertJQ(req("q","foo_ss:bar2"),"/response/numFound==0");
deleteAndGetVersion("b!doc1",
params("del_version", "2", "update.chain",
"tombstone-config"));
assertU(commit());
assertJQ(req("q","foo_b:true"),"/response/numFound==1");
assertJQ(req("q","foo_i:1"),"/response/numFound==1");
assertJQ(req("q","foo_l:1"),"/response/numFound==1");
assertJQ(req("q","foo_f:1.5"),"/response/numFound==1");
assertJQ(req("q","foo_s:bar"),"/response/numFound==1");
assertJQ(req("q","foo_ss:bar1"),"/response/numFound==1");
assertJQ(req("q","foo_ss:bar2"),"/response/numFound==1");
}
public void testCanCreateTombstonesBasic() {
DocBasedVersionConstraintsProcessorFactory factory = new DocBasedVersionConstraintsProcessorFactory();
NamedList<Object> config = new NamedList<>();
config.add("versionField", "_version_");
factory.init(config);
IndexSchema schema = h.getCore().getLatestSchema();
assertThat(factory.canCreateTombstoneDocument(schema), is(true));
}
public void testCanCreateTombstonesMissingRequiredField() {
DocBasedVersionConstraintsProcessorFactory factory = new DocBasedVersionConstraintsProcessorFactory();
NamedList<Object> config = new NamedList<>();
config.add("versionField", "_version_");
factory.init(config);
IndexSchema schema = h.getCore().getLatestSchema();
SchemaField sf = schema.getField("sku1");
assertThat(sf, is(not(nullValue())));
assertThat(schema.getRequiredFields(), not(hasItem(sf)));
try {
schema.getRequiredFields().add(sf);
assertThat(factory.canCreateTombstoneDocument(schema), is(false));
} finally {
schema.getRequiredFields().remove(sf);
}
}
public void testCanCreateTombstonesRequiredFieldWithDefault() {
DocBasedVersionConstraintsProcessorFactory factory = new DocBasedVersionConstraintsProcessorFactory();
NamedList<Object> config = new NamedList<>();
config.add("versionField", "_version_");
factory.init(config);
IndexSchema schema = h.getCore().getLatestSchema();
SchemaField sf = schema.getField("sku1");
SchemaField sf2 = new SchemaField("sku1_with_default", sf.getType(), sf.getProperties(), "foo");
try {
schema.getRequiredFields().add(sf2);
assertThat(factory.canCreateTombstoneDocument(schema), is(true));
} finally {
schema.getRequiredFields().remove(sf2);
}
}
public void testCanCreateTombstonesRequiredFieldInTombstoneConfig() {
DocBasedVersionConstraintsProcessorFactory factory = new DocBasedVersionConstraintsProcessorFactory();
NamedList<Object> config = new NamedList<>();
config.add("versionField", "_version_");
NamedList<Object> tombstoneConfig = new NamedList<>();
config.add("tombstoneConfig", tombstoneConfig);
tombstoneConfig.add("sku1", "foo");
factory.init(config);
IndexSchema schema = h.getCore().getLatestSchema();
SchemaField sf = schema.getField("sku1");
assertThat(sf, is(not(nullValue())));
assertThat(schema.getRequiredFields(), not(hasItem(sf)));
try {
schema.getRequiredFields().add(sf);
assertThat(factory.canCreateTombstoneDocument(schema), is(true));
} finally {
schema.getRequiredFields().remove(sf);
}
}
public void testCanCreateTombstonesVersionFieldRequired() {
DocBasedVersionConstraintsProcessorFactory factory = new DocBasedVersionConstraintsProcessorFactory();
NamedList<Object> config = new NamedList<>();
config.add("versionField", "_version_");
factory.init(config);
IndexSchema schema = h.getCore().getLatestSchema();
SchemaField versionField = schema.getField("_version_");
assertThat(versionField, is(not(nullValue())));
assertThat(schema.getRequiredFields(), not(hasItem(versionField)));
try {
schema.getRequiredFields().add(versionField);
assertThat(factory.canCreateTombstoneDocument(schema), is(true));
} finally {
schema.getRequiredFields().remove(versionField);
}
}
public void testCanCreateTombstonesUniqueKeyFieldRequired() {
DocBasedVersionConstraintsProcessorFactory factory = new DocBasedVersionConstraintsProcessorFactory();
NamedList<Object> config = new NamedList<>();
config.add("versionField", "_version_");
factory.init(config);
IndexSchema schema = h.getCore().getLatestSchema();
SchemaField uniqueKeyField = schema.getField("id");
assertThat(uniqueKeyField, is(not(nullValue())));
assertThat(uniqueKeyField, equalTo(schema.getUniqueKeyField()));
assertThat(schema.getRequiredFields(), hasItem(schema.getUniqueKeyField()));
assertThat(factory.canCreateTombstoneDocument(schema), is(true));
}
private void updateWithChain(String chain, String...fields) throws Exception {
assert fields.length % 2 == 0;
SolrInputDocument doc = new SolrInputDocument(fields);
updateJ(jsonAdd(doc), params("update.chain", chain));
}
private Callable<Object> delayedAdd(final String... fields) {
return Executors.callable(() -> {
// log.info("ADDING DOC: " + adoc(fields));