SOLR-11336: DocBasedVersionConstraints URP is now more extensible; versionField is a list

This commit is contained in:
David Smiley 2018-04-12 14:04:28 -04:00
parent 4f694d5c72
commit 3d5f2f24c3
6 changed files with 698 additions and 402 deletions

View File

@ -93,6 +93,9 @@ New Features
* SOLR-11982: Add possibility to define replica order with the shards.preference parameter to e.g. prefer PULL replicas
for distributed queries. (Ere Maijala, Tomás Fernández Löbbe)
* SOLR-11336: DocBasedVersionConstraintsProcessorFactory is more extensible and now supports a list of versioned fields.
(versionField config may now be a comma-delimited list). (Michael Braun via David Smiley)
Bug Fixes
----------------------

View File

@ -0,0 +1,512 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor {
// Shared zero-length array handed to List.toArray(...) in the constructor.
private static final String[] EMPTY_STR_ARR = new String[0];
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Names of the user-managed version fields, copied from the constructor's versionFields list.
private final String[] versionFieldNames;
// Schema fields looked up from versionFieldNames; element i corresponds to versionFieldNames[i].
private final SchemaField[] userVersionFields;
// Schema field looked up under CommonParams.VERSION_FIELD.
private final SchemaField solrVersionField;
// When true, an update whose user versions are not high enough is dropped (versionInUpdateIsAcceptable
// returns false) instead of failing with a CONFLICT SolrException.
private final boolean ignoreOldUpdates;
// Names of the request parameters read for delete-by-id commands; when empty, processDelete
// forwards deletes without any version check.
private final String[] deleteVersionParamNames;
// Core obtained from the request passed to the constructor.
private final SolrCore core;
private final DistributedUpdateProcessor distribProc; // the distributed update processor following us
// Distrib phase parsed from the request's DISTRIB_UPDATE_PARAM value.
private final DistributedUpdateProcessor.DistribPhase phase;
// Selects the lookup used by isVersionNewEnough: getOldUserVersionsFromFieldCache when true,
// getOldUserVersionsFromStored otherwise.
private final boolean useFieldCache;
// Mutable per-instance state: written by isVersionNewEnough, then read by processAdd/processDelete
// to set the command version.
private long oldSolrVersion; // current _version_ of the doc in the index/update log
/**
 * Creates the processor and resolves everything it needs from the request up front.
 *
 * @param versionFields names of the user version fields; each one is looked up in the core's latest schema
 * @param ignoreOldUpdates whether updates with insufficient versions are dropped rather than rejected
 * @param deleteVersionParamNames names of the request parameters consulted for delete-by-id commands
 * @param useFieldCache whether existing versions are read through value sources instead of stored fields
 * @param req the request; supplies the core and the distrib-phase parameter
 * @param rsp the response (not used by this constructor)
 * @param next the next processor in the chain; a {@link DistributedUpdateProcessor} must be reachable from it
 */
public DocBasedVersionConstraintsProcessor(List<String> versionFields,
                                           boolean ignoreOldUpdates,
                                           List<String> deleteVersionParamNames,
                                           boolean useFieldCache,
                                           SolrQueryRequest req,
                                           SolrQueryResponse rsp,
                                           UpdateRequestProcessor next) {
  super(next);

  // plain configuration flags
  this.ignoreOldUpdates = ignoreOldUpdates;
  this.useFieldCache = useFieldCache;

  this.deleteVersionParamNames = deleteVersionParamNames.toArray(EMPTY_STR_ARR);
  this.core = req.getCore();
  this.versionFieldNames = versionFields.toArray(EMPTY_STR_ARR);

  // resolve every configured version field (plus _version_) against the latest schema
  final IndexSchema latestSchema = this.core.getLatestSchema();
  final SchemaField[] resolved = new SchemaField[this.versionFieldNames.length];
  int slot = 0;
  for (String fieldName : this.versionFieldNames) {
    resolved[slot++] = latestSchema.getField(fieldName);
  }
  this.userVersionFields = resolved;
  this.solrVersionField = latestSchema.getField(CommonParams.VERSION_FIELD);

  this.distribProc = getDistributedUpdateProcessor(next);
  this.phase = DistributedUpdateProcessor.DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
}
/**
 * Walks the processor chain starting at {@code next} and returns the first
 * {@link DistributedUpdateProcessor} encountered.
 *
 * @param next first processor to inspect; may be null
 * @return the first DistributedUpdateProcessor in the chain
 * @throws SolrException (SERVER_ERROR) if the chain contains no DistributedUpdateProcessor
 */
private static DistributedUpdateProcessor getDistributedUpdateProcessor(UpdateRequestProcessor next) {
  UpdateRequestProcessor candidate = next;
  while (candidate != null) {
    if (candidate instanceof DistributedUpdateProcessor) {
      return (DistributedUpdateProcessor) candidate;
    }
    candidate = candidate.next;
  }
  throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow DocBasedVersionConstraintsProcessor");
}
/**
 * Normalizes a raw version value so it can be compared. The value may come
 * from an indexed doc, from an UpdateLog doc that still carries String values,
 * or from a request parameter. Any {@link CharSequence} is run through the
 * field's FieldType (readable form to indexed form to Object); every other
 * value, including null, is returned untouched.
 *
 * @param rawValue the value to normalize; may be null
 * @param field the schema field whose type performs the conversion
 * @return the converted object, or {@code rawValue} itself when it is not a CharSequence
 */
private static Object convertFieldValueUsingType(final Object rawValue, SchemaField field) {
  if (!(rawValue instanceof CharSequence)) {
    // nothing to convert
    return rawValue;
  }
  // in theory, the FieldType might still be CharSequence based,
  // but in that case trust it to do an identity conversion...
  final FieldType type = field.getType();
  final BytesRefBuilder indexedForm = new BytesRefBuilder();
  type.readableToIndexed((CharSequence) rawValue, indexedForm);
  return type.toObject(field, indexedForm.get());
}
/**
 * Applies {@link #convertFieldValueUsingType(Object, SchemaField)} position by
 * position, pairing {@code rawValues[i]} with {@code fields[i]}.
 *
 * @param rawValues values to convert
 * @param fields schema fields, indexed in parallel with {@code rawValues}
 * @return a new array of the same length as {@code rawValues} holding the converted values
 */
private static Object[] convertFieldValuesUsingType(Object[] rawValues, SchemaField[] fields) {
  final int count = rawValues.length;
  final Object[] converted = new Object[count];
  int pos = 0;
  while (pos < count) {
    converted[pos] = convertFieldValueUsingType(rawValues[pos], fields[pos]);
    pos++;
  }
  return converted;
}
/**
 * Decides whether an update carrying the given user versions may proceed.
 * <ul>
 *   <li>Returns true when no existing document is found, or when the new
 *       versions are accepted by {@link #versionInUpdateIsAcceptable}.</li>
 *   <li>Returns false when the new versions are not high enough and the
 *       processor was configured with ignoreOldUpdates=true.</li>
 *   <li>Throws a SolrException when the versions are not high enough and
 *       ignoreOldUpdates=false, or when the existing document has a null
 *       value in one of the version fields.</li>
 * </ul>
 * As a side effect, the {@code oldSolrVersion} field is set to the version
 * reported by the lookup (on every call, found or not).
 *
 * @param indexedDocId indexed form of the document id; must not be null
 * @param newUserVersions raw user versions of the incoming update; must not be null
 */
private boolean isVersionNewEnough(BytesRef indexedDocId,
                                   Object[] newUserVersions) throws IOException {
  assert null != indexedDocId;
  assert null != newUserVersions;

  final Object[] convertedNewVersions = convertFieldValuesUsingType(newUserVersions, userVersionFields);

  final DocFoundAndOldUserAndSolrVersions existing = useFieldCache
      ? getOldUserVersionsFromFieldCache(indexedDocId)
      : getOldUserVersionsFromStored(indexedDocId);
  oldSolrVersion = existing.oldSolrVersion;

  if (existing.found) {
    validateUserVersions(existing.oldUserVersions, versionFieldNames, "Doc exists in index, but has null versionField: ");
    return versionInUpdateIsAcceptable(convertedNewVersions, existing.oldUserVersions);
  }
  // no existing doc: any version is acceptable
  return true;
}
/**
 * Ensures none of the given versions is null.
 *
 * @param userVersions versions to check
 * @param fieldNames field names, indexed in parallel with {@code userVersions}
 * @param errorMessage message prefix; the name of the offending field is appended
 * @throws SolrException (SERVER_ERROR) for the first null version encountered
 */
private static void validateUserVersions(Object[] userVersions, String[] fieldNames, String errorMessage) {
  assert userVersions.length == fieldNames.length;
  int idx = 0;
  for (Object version : userVersions) {
    if (version == null) {
      // could happen if they turn this feature on after building an index
      // w/o the versionField, or if validating a new doc, not present.
      throw new SolrException(SERVER_ERROR, errorMessage + fieldNames[idx]);
    }
    idx++;
  }
}
/**
 * Looks up the existing document's versions, consulting the update log first
 * and falling back to value-source lookups against the realtime searcher.
 *
 * @param indexedDocId indexed form of the document id
 * @return the versions found, or {@code NOT_FOUND} when the update log marks the
 *         doc as deleted or the id is in neither the update log nor the index
 * @throws SolrException (SERVER_ERROR) if reading from the index fails with an IOException
 */
private DocFoundAndOldUserAndSolrVersions getOldUserVersionsFromFieldCache(BytesRef indexedDocId) {
  final SolrInputDocument tlogDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId, null, null, true);
  if (tlogDoc == RealTimeGetComponent.DELETED) {
    return DocFoundAndOldUserAndSolrVersions.NOT_FOUND;
  }
  if (tlogDoc != null) {
    return getUserVersionAndSolrVersionFromDocument(tlogDoc);
  }

  // need to look up in index now...
  final RefCounted<SolrIndexSearcher> searcherRef = core.getRealtimeSearcher();
  try {
    final SolrIndexSearcher searcher = searcherRef.get();
    final long packedId = searcher.lookupId(indexedDocId);
    if (packedId < 0) {
      // doc not in index either...
      return DocFoundAndOldUserAndSolrVersions.NOT_FOUND;
    }
    // high 32 bits select the leaf, low 32 bits are the doc id within that leaf
    final int leafIndex = (int) (packedId >> 32);
    final int docIdInSegment = (int) packedId;
    final LeafReaderContext leaf = searcher.getTopReaderContext().leaves().get(leafIndex);

    final long indexedSolrVersion = getFunctionValues(leaf, solrVersionField, searcher).longVal(docIdInSegment);
    final Object[] indexedUserVersions = getObjectValues(leaf, userVersionFields, searcher, docIdInSegment);
    return new DocFoundAndOldUserAndSolrVersions(indexedUserVersions, indexedSolrVersion);
  } catch (IOException e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
  } finally {
    if (searcherRef != null) { //TODO can this ever be null?
      searcherRef.decref();
    }
  }
}
/**
 * Looks up the existing document's versions through
 * {@code RealTimeGetComponent.getInputDocument} (stored fields only).
 *
 * @param indexedDocId indexed form of the document id
 * @return the versions found, or {@code NOT_FOUND} when no document is returned
 */
private DocFoundAndOldUserAndSolrVersions getOldUserVersionsFromStored(BytesRef indexedDocId) throws IOException {
  final SolrInputDocument existingDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId);
  return existingDoc == null
      ? DocFoundAndOldUserAndSolrVersions.NOT_FOUND
      : getUserVersionAndSolrVersionFromDocument(existingDoc);
}
/**
 * Immutable result of looking up an existing document: whether it was found
 * and, if so, its user versions and its Solr version.
 */
private static final class DocFoundAndOldUserAndSolrVersions {
  /** Shared "no such document" result: found=false, null user versions, Solr version -1. */
  private static final DocFoundAndOldUserAndSolrVersions NOT_FOUND = new DocFoundAndOldUserAndSolrVersions();

  private final boolean found;
  private final Object[] oldUserVersions;
  private final long oldSolrVersion;

  /** Builds the not-found result. */
  private DocFoundAndOldUserAndSolrVersions() {
    this(false, null, -1L);
  }

  /** Builds a found result carrying the given versions. */
  private DocFoundAndOldUserAndSolrVersions(Object[] oldUserVersions, long oldSolrVersion) {
    this(true, oldUserVersions, oldSolrVersion);
  }

  /** Single place where all three fields are assigned. */
  private DocFoundAndOldUserAndSolrVersions(boolean found, Object[] oldUserVersions, long oldSolrVersion) {
    this.found = found;
    this.oldUserVersions = oldUserVersions;
    this.oldSolrVersion = oldSolrVersion;
  }
}
/**
 * Extracts the user versions and the Solr version from an existing document.
 * The Solr version is taken as a long directly when the value is a Number,
 * and parsed from its string form otherwise.
 *
 * @param oldDoc the existing document
 * @return a "found" result holding both kinds of versions
 * @throws SolrException (SERVER_ERROR) if the document has no value in the Solr version field
 */
private DocFoundAndOldUserAndSolrVersions getUserVersionAndSolrVersionFromDocument(SolrInputDocument oldDoc) {
  final Object[] userVersions = getUserVersionsFromDocument(oldDoc);

  final Object rawSolrVersion = oldDoc.getFieldValue(solrVersionField.getName());
  if (rawSolrVersion == null) {
    throw new SolrException(SERVER_ERROR, "No _version_ for document " + oldDoc);
  }

  final long solrVersion;
  if (rawSolrVersion instanceof Number) {
    solrVersion = ((Number) rawSolrVersion).longValue();
  } else {
    solrVersion = Long.parseLong(rawSolrVersion.toString());
  }
  return new DocFoundAndOldUserAndSolrVersions(userVersions, solrVersion);
}
/**
 * Reads the value of every configured version field from the document and
 * lets the field's type convert it into a comparable object.
 *
 * @param doc the document to read from
 * @return one entry per configured version field, in configuration order;
 *         an entry is null when the document has no value for that field
 */
private Object[] getUserVersionsFromDocument(SolrInputDocument doc) {
  final int fieldCount = versionFieldNames.length;
  final Object[] result = new Object[fieldCount];
  for (int f = 0; f < fieldCount; f++) {
    // Make the FieldType resolve any conversion we need.
    result[f] = convertFieldValueUsingType(doc.getFieldValue(versionFieldNames[f]), userVersionFields[f]);
  }
  return result;
}
/**
 * Returns whether or not the versions in the command are acceptable to be indexed.
 * The update is accepted as soon as {@link #newUpdateComparePasses} passes for any
 * one of the version fields. If the instance is set to ignoreOldUpdates==false, it
 * will throw a SolrException with CONFLICT in the event the version is not
 * acceptable rather than return false.
 *
 * @param newUserVersions New versions in update request
 * @param oldUserVersions Old versions currently in solr index
 * @return True if acceptable, false if not (or will throw exception)
 * @throws SolrException BAD_REQUEST if a pair of versions cannot be compared (including
 *         when either one is null); CONFLICT if no field passes and ignoreOldUpdates is false
 */
protected boolean versionInUpdateIsAcceptable(Object[] newUserVersions,
                                              Object[] oldUserVersions) {
  for (int i = 0; i < oldUserVersions.length; i++) {
    Object oldUserVersion = oldUserVersions[i];
    Object newUserVersion = newUserVersions[i];

    // "null instanceof Comparable" is false, so a null version lands here too; the
    // message must therefore be built without dereferencing the values.
    if (!(oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable)) {
      throw new SolrException(BAD_REQUEST,
          "old version and new version are not comparable: " +
              describeVersionClass(oldUserVersion) + " vs " + describeVersionClass(newUserVersion));
    }
    try {
      if (newUpdateComparePasses((Comparable) newUserVersion, (Comparable) oldUserVersion, versionFieldNames[i])) {
        return true;
      }
    } catch (ClassCastException e) {
      // both values are Comparable, but not with each other
      throw new SolrException(BAD_REQUEST,
          "old version and new version are not comparable: " +
              describeVersionClass(oldUserVersion) + " vs " + describeVersionClass(newUserVersion) +
              ": " + e.getMessage(), e);
    }
  }

  // no version field passed the comparison
  if (ignoreOldUpdates) {
    if (log.isDebugEnabled()) {
      log.debug("Dropping update since user version is not high enough: {}; old user version={}",
          Arrays.toString(newUserVersions), Arrays.toString(oldUserVersions));
    }
    return false;
  } else {
    throw new SolrException(CONFLICT,
        "user version is not high enough: " + Arrays.toString(newUserVersions));
  }
}

/** Null-safe rendering of a value's class for error messages ("null" when the value is null). */
private static String describeVersionClass(Object value) {
  return value == null ? "null" : String.valueOf(value.getClass());
}
/**
 * Compares one pair of user versions and reports whether the new one may
 * replace the old one. This implementation accepts the new version only when
 * it compares strictly greater than the old version.
 *
 * @param newUserVersion User-specified version on the new version of the document
 * @param oldUserVersion User-specified version on the old version of the document
 * @param userVersionFieldName Field name of the user versions being compared (not consulted here)
 * @return True if acceptable, false if not.
 */
protected boolean newUpdateComparePasses(Comparable newUserVersion, Comparable oldUserVersion, String userVersionFieldName) {
  final int ordering = newUserVersion.compareTo(oldUserVersion);
  return 0 < ordering;
}
/**
 * Reads the object value of each given field for one document of a segment.
 *
 * @param segmentContext the segment holding the document
 * @param fields the fields to read
 * @param searcher searcher used to create the value-source context
 * @param docIdInSegment document id within the segment
 * @return one value per field, in the order of {@code fields}
 */
private static Object[] getObjectValues(LeafReaderContext segmentContext,
                                        SchemaField[] fields,
                                        SolrIndexSearcher searcher,
                                        int docIdInSegment) throws IOException {
  final FunctionValues[] perField = getManyFunctionValues(segmentContext, fields, searcher);
  final Object[] result = new Object[perField.length];
  int pos = 0;
  for (FunctionValues values : perField) {
    result[pos++] = values.objectVal(docIdInSegment);
  }
  return result;
}
/**
 * Obtains a {@link FunctionValues} for each given field by calling
 * {@code getFunctionValues} once per field.
 *
 * @return one FunctionValues per field, in the order of {@code fields}
 */
private static FunctionValues[] getManyFunctionValues(LeafReaderContext segmentContext,
                                                      SchemaField[] fields,
                                                      SolrIndexSearcher searcher) throws IOException {
  final int count = fields.length;
  final FunctionValues[] result = new FunctionValues[count];
  int pos = 0;
  while (pos < count) {
    result[pos] = getFunctionValues(segmentContext, fields[pos], searcher);
    pos++;
  }
  return result;
}
/**
 * Obtains the field type's value source for {@code field}, creates its weight
 * in a fresh context for {@code searcher}, and returns its values for the
 * given segment.
 */
private static FunctionValues getFunctionValues(LeafReaderContext segmentContext,
                                                SchemaField field,
                                                SolrIndexSearcher searcher) throws IOException {
  final ValueSource source = field.getType().getValueSource(field, null);
  final Map weightContext = ValueSource.newContext(searcher);
  source.createWeight(weightContext, searcher);
  return source.getValues(weightContext, segmentContext);
}
/**
 * Reports whether this node should treat the command as a non-leader would.
 * That is the case when the command carries the REPLAY or PEER_SYNC flag,
 * when the request phase is FROMLEADER, or when the distributed update
 * processor says it is not the leader for the command. The checks
 * short-circuit in that order.
 */
private boolean isNotLeader(UpdateCommand cmd) {
  final int replayOrPeerSync = UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC;
  final boolean replayedOrSynced = (cmd.getFlags() & replayOrPeerSync) != 0;
  final boolean sentByLeader = phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER;
  // if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
  return replayedOrSynced || sentByLeader || !distribProc.isLeader(cmd);
}
/**
 * Applies the version constraint to an add. When {@code isNotLeader} is true
 * the command is forwarded untouched. Otherwise the document must carry a
 * value for every version field; the add is then forwarded with the existing
 * doc's Solr version set on the command, and the whole check is retried
 * whenever forwarding fails with a 409 SolrException.
 */
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
  if (isNotLeader(cmd)) {
    super.processAdd(cmd);
    return;
  }

  final SolrInputDocument incoming = cmd.getSolrInputDocument();
  final Object[] incomingVersions = getUserVersionsFromDocument(incoming);
  validateUserVersions(incomingVersions, versionFieldNames, "Doc does not have versionField: ");

  for (int attempt = 0; ; attempt++) {
    logOverlyFailedRetries(attempt, cmd);

    if (!isVersionNewEnough(cmd.getIndexedId(), incomingVersions)) {
      // drop older update
      return;
    }

    try {
      cmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
      super.processAdd(cmd);
      return;
    } catch (SolrException e) {
      if (e.code() != 409) {
        throw e; // rethrow
      }
      // version conflict: fall through and retry
    }
  }
}
/**
 * Emits a warning when the retry counter's low eight bits are all set
 * (i = 255, 511, ...), i.e. once per 256 retries.
 *
 * @param i zero-based retry counter of the calling loop
 * @param cmd the command being retried; included in the log message
 */
private static void logOverlyFailedRetries(int i, UpdateCommand cmd) {
  // Log a warning every 256 retries.... even a few retries should normally be very unusual.
  if ((i & 0xff) == 0xff) {
    // parameterized form: the message is only assembled if WARN is enabled
    log.warn("Unusual number of optimistic concurrency retries: retries={} cmd={}", i, cmd);
  }
}
/**
 * Applies the version constraint to a delete. Deletes are forwarded untouched
 * when no delete version params are configured or when the command is not a
 * delete-by-id. Otherwise every configured delete version param must be
 * present on the request, and the delete is replaced by an add of a stub
 * document (unique key plus version fields). When {@code isNotLeader} is true
 * the stub is forwarded without a version check; otherwise the check and the
 * forward are retried whenever forwarding fails with a 409 SolrException.
 */
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
  if (deleteVersionParamNames.length == 0) {
    // not supposed to look at deletes at all
    super.processDelete(cmd);
    return;
  }
  if ( ! cmd.isDeleteById() ) {
    // nothing to do
    super.processDelete(cmd);
    return;
  }

  String[] deleteParamValues = getDeleteParamValuesFromRequest(cmd);
  validateDeleteParamValues(deleteParamValues);

  if (isNotLeader(cmd)) {
    // transform delete to add earlier rather than later
    super.processAdd(newDeleteStubCommand(cmd, deleteParamValues));
    return;
  }

  for (int i=0; ;i++) {
    logOverlyFailedRetries(i, cmd);

    if (!isVersionNewEnough(cmd.getIndexedId(), deleteParamValues)) {
      // drop this older update
      return;
    }

    // :TODO: should this logic be split and driven by two params?
    // - deleteVersionParam to do a version check
    // - some new boolean param to determine if a stub document gets added in place?
    try {
      // drop the delete, and instead propagate an AddDoc that
      // replaces the doc with a new "empty" one that records the deleted version
      // (a fresh command is built on every attempt, as before)
      AddUpdateCommand newCmd = newDeleteStubCommand(cmd, deleteParamValues);
      newCmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
      super.processAdd(newCmd);
      return;
    } catch (SolrException e) {
      if (e.code() == 409) {
        continue; // if a version conflict, retry
      }
      throw e; // rethrow
    }
  }
}

/**
 * Builds the add command that stands in for a delete-by-id: a document holding
 * only the unique key and the version fields, with the delete's request and
 * commitWithin carried over.
 */
private AddUpdateCommand newDeleteStubCommand(DeleteUpdateCommand cmd, String[] deleteParamValues) {
  SolrInputDocument stubDoc = new SolrInputDocument();
  stubDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
      cmd.getId());
  setDeleteParamValues(stubDoc, deleteParamValues);

  AddUpdateCommand addCmd = new AddUpdateCommand(cmd.getReq());
  addCmd.solrDoc = stubDoc;
  addCmd.commitWithin = cmd.commitWithin;
  return addCmd;
}
/**
 * Reads the value of every configured delete version param from the
 * command's request.
 *
 * @param cmd the delete command whose request params are consulted
 * @return one entry per configured param name, in configuration order, holding
 *         whatever {@code params.get(name)} returned for it
 */
private String[] getDeleteParamValuesFromRequest(DeleteUpdateCommand cmd) {
  final SolrParams requestParams = cmd.getReq().getParams();
  final int count = deleteVersionParamNames.length;
  final String[] values = new String[count];
  for (int p = 0; p < count; p++) {
    values[p] = requestParams.get(deleteVersionParamNames[p]);
  }
  return values;
}
/**
 * Ensures every delete version param value is present.
 *
 * @param values param values, indexed in parallel with the configured param names
 * @throws SolrException (BAD_REQUEST) naming the first param whose value is null
 */
private void validateDeleteParamValues(String[] values) {
  int idx = 0;
  for (String value : values) {
    if (value == null) {
      throw new SolrException(BAD_REQUEST,
          "Delete by ID must specify doc version param: " +
              deleteVersionParamNames[idx]);
    }
    idx++;
  }
}
/**
 * Stores each delete param value on the document under the version field at
 * the same position ({@code values[i]} goes into {@code versionFieldNames[i]}).
 *
 * @param doc the document to populate
 * @param values delete param values, indexed in parallel with the version field names
 */
private void setDeleteParamValues(SolrInputDocument doc, String[] values) {
  int idx = 0;
  for (String value : values) {
    doc.setField(versionFieldNames[idx], value);
    idx++;
  }
}
}

View File

@ -16,52 +16,39 @@
*/
package org.apache.solr.update.processor;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.List;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/**
* <p>
* This Factory generates an UpdateProcessor that helps to enforce Version
* constraints on documents based on per-document version numbers using a configured
* name of a <code>versionField</code>. It should be configured on the "default"
* <code>versionField</code>, a comma-delimited list of fields to check for version
* numbers. It should be configured on the "default"
* update processor somewhere before the DistributedUpdateProcessorFactory.
* As an example, see the solrconfig.xml that the tests use:
* solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml
* </p>
* <p>
* When documents are added through this processor, if a document with the same
* unique key already exists in the collection, then the value of the
* <code>versionField</code> in the <i>existing</i> document is not less then the
* field value in the <i>new</i> document then the new document is rejected with a
* unique key already exists in the collection, then the values within the fields
* as specified by the comma-delimited <code>versionField</code> property are checked,
* and if in the <i>existing</i> document the values for all fields are not less than the
* field values in the <i>new</i> document, then the new document is rejected with a
* 409 Version Conflict error.
* </p>
* <p>
@ -71,15 +58,20 @@ import static org.apache.solr.update.processor.DistributingUpdateProcessorFactor
* <ul>
* <li><code>deleteVersionParam</code> - This string parameter controls whether this
* processor will intercept and inspect Delete By Id commands in addition to adding
* documents. If specified, then the value will specify the name of a request
* paramater which becomes mandatory for all Delete By Id commands. This param
* must then be used to specify the document version associated with the delete.
* If the version specified using this param is not greater then the value in the
* <code>versionField</code> for any existing document, then the delete will fail
* with a 409 Version Conflict error. When using this param, Any Delete By Id
* command with a high enough document version number to succeed will be internally
* converted into an Add Document command that replaces the existing document with
* a new one which is empty except for the Unique Key and <code>versionField</code>
* documents. If specified, then the value will specify the name(s) of the request
* parameter(s) which becomes mandatory for all Delete By Id commands. Like
* <code>versionField</code>, <code>deleteVersionParam</code> is comma-delimited.
* For each of the params given, it specifies the document version associated with
* the delete, where the index matches <code>versionField</code>. For example, if
* <code>versionField</code> was set to 'a,b' and <code>deleteVersionParam</code>
* was set to 'p1,p2', p1 should give the version for field 'a' and p2 should give
* the version for field 'b'. If the versions specified using these params are not
* greater than the value in the <code>versionField</code> for any existing document,
* then the delete will fail with a 409 Version Conflict error. When using this
* param, any Delete By Id command with a high enough document version number to
* succeed will be internally converted into an Add Document command that replaces
* the existing document with a new one which is empty except for the Unique Key
* and fields corresponding to the fields listed in <code>versionField</code>
* to keep a record of the deleted version so future Add Document commands will
* fail if their "new" version is not high enough.</li>
*
@ -95,8 +87,8 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private boolean ignoreOldUpdates = false;
private String versionField = null;
private String deleteVersionParamName = null;
private List<String> versionFields = null;
private List<String> deleteVersionParamNames = Collections.emptyList();
private boolean useFieldCache;
@Override
@ -111,7 +103,7 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
throw new SolrException(SERVER_ERROR,
"'versionField' must be configured as a <str>");
}
versionField = tmp.toString();
versionFields = StrUtils.splitSmart((String)tmp, ',');
// optional
tmp = args.remove("deleteVersionParam");
@ -120,7 +112,12 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
throw new SolrException(SERVER_ERROR,
"'deleteVersionParam' must be configured as a <str>");
}
deleteVersionParamName = tmp.toString();
deleteVersionParamNames = StrUtils.splitSmart((String)tmp, ',');
}
if (deleteVersionParamNames.size() > 0 && deleteVersionParamNames.size() != versionFields.size()) {
throw new SolrException(SERVER_ERROR, "The number of 'deleteVersionParam' params " +
"must either be 0 or equal to the number of 'versionField' fields");
}
// optional - defaults to false
@ -130,18 +127,18 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
throw new SolrException(SERVER_ERROR,
"'ignoreOldUpdates' must be configured as a <bool>");
}
ignoreOldUpdates = ((Boolean)tmp).booleanValue();
ignoreOldUpdates = (Boolean) tmp;
}
super.init(args);
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
return new DocBasedVersionConstraintsProcessor(versionField,
return new DocBasedVersionConstraintsProcessor(versionFields,
ignoreOldUpdates,
deleteVersionParamName,
deleteVersionParamNames,
useFieldCache,
req, rsp, next);
}
@ -159,341 +156,23 @@ public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestPro
"schema must have uniqueKey defined.");
}
useFieldCache = true;
for (String versionField : versionFields) {
SchemaField userVersionField = core.getLatestSchema().getField(versionField);
if (userVersionField == null || !userVersionField.stored() || userVersionField.multiValued()) {
throw new SolrException(SERVER_ERROR,
"field " + versionField + " must be defined in schema, be stored, and be single valued.");
}
if (useFieldCache) {
try {
ValueSource vs = userVersionField.getType().getValueSource(userVersionField, null);
useFieldCache = true;
userVersionField.getType().getValueSource(userVersionField, null);
} catch (Exception e) {
useFieldCache = false;
log.warn("Can't use fieldcache/valuesource: " + e.getMessage());
}
}
private static class DocBasedVersionConstraintsProcessor
extends UpdateRequestProcessor {
private final String versionFieldName;
private final SchemaField userVersionField;
private final SchemaField solrVersionField;
private final boolean ignoreOldUpdates;
private final String deleteVersionParamName;
private final SolrCore core;
private long oldSolrVersion; // current _version_ of the doc in the index/update log
private DistributedUpdateProcessor distribProc; // the distributed update processor following us
private DistributedUpdateProcessor.DistribPhase phase;
private boolean useFieldCache;
public DocBasedVersionConstraintsProcessor(String versionField,
boolean ignoreOldUpdates,
String deleteVersionParamName,
boolean useFieldCache,
SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
super(next);
this.ignoreOldUpdates = ignoreOldUpdates;
this.deleteVersionParamName = deleteVersionParamName;
this.core = req.getCore();
this.versionFieldName = versionField;
this.userVersionField = core.getLatestSchema().getField(versionField);
this.solrVersionField = core.getLatestSchema().getField(CommonParams.VERSION_FIELD);
this.useFieldCache = useFieldCache;
for (UpdateRequestProcessor proc = next ;proc != null; proc = proc.next) {
if (proc instanceof DistributedUpdateProcessor) {
distribProc = (DistributedUpdateProcessor)proc;
break;
}
}
if (distribProc == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow DocBasedVersionConstraintsProcessor");
}
phase = DistributedUpdateProcessor.DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
}
/**
* Inspects a raw field value (which may come from a doc in the index, or a
* doc in the UpdateLog that still has String values, or a String sent by
* the user as a param) and if it is a String, asks the versionField FieldType
* to convert it to an Object suitable for comparison.
*/
private Object convertFieldValueUsingType(SchemaField sf, final Object rawValue) {
if (rawValue instanceof CharSequence) {
// in theory, the FieldType might still be CharSequence based,
// but in that case trust it to do an identity conversion...
FieldType fieldType = userVersionField.getType();
BytesRefBuilder term = new BytesRefBuilder();
fieldType.readableToIndexed((CharSequence)rawValue, term);
return fieldType.toObject(userVersionField, term.get());
}
// else...
return rawValue;
}
/**
* Returns true if the specified new version value is greater the the one
* already known to exist for the document, or the document does not already
* exist.
* Returns false if the specified new version is not high enough but the
* processor has been configured with ignoreOldUpdates=true
* Throws a SolrException if the version is not high enough and
* ignoreOldUpdates=false
*/
/**
 * Decides whether {@code newUserVersion} is high enough to replace whatever is currently
 * recorded for {@code indexedDocId}.
 * <p>
 * Side effect: {@code oldSolrVersion} is reset to -1 on entry and, when an existing document
 * is found, overwritten with that document's Solr version value so that callers can hand it
 * to {@code setVersion(...)} on the command they forward.
 *
 * @param indexedDocId   indexed form of the document id; must not be null
 * @param newUserVersion user supplied version value; must not be null. It is passed through
 *                       {@code convertFieldValueUsingType} before being compared.
 * @return {@code true} when no existing document is found (or the tlog lookup yields the
 *         {@code RealTimeGetComponent.DELETED} marker), or when the new version compares
 *         strictly greater than the existing one; {@code false} when it does not and
 *         {@code ignoreOldUpdates} is set
 * @throws SolrException CONFLICT when the new version is not greater and
 *         {@code ignoreOldUpdates} is not set; BAD_REQUEST when the two versions cannot be
 *         compared; SERVER_ERROR when the existing document has no Solr version or no user
 *         version, or when reading the versions from the index fails
 */
private boolean isVersionNewEnough(BytesRef indexedDocId,
                                   Object newUserVersion) throws IOException {
  assert null != indexedDocId;
  assert null != newUserVersion;

  // Start from "no existing document" until a lookup below proves otherwise.
  oldSolrVersion = -1;

  final Object incomingVersion = convertFieldValueUsingType(userVersionField, newUserVersion);

  Object existingVersion = null;
  SolrInputDocument existingDoc;

  if (!useFieldCache) {
    // Stored fields only: a single lookup either yields the document or nothing.
    existingDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId);
    if (existingDoc == null) {
      return true;
    }
  } else {
    existingDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId, null, null, true);
    if (existingDoc == RealTimeGetComponent.DELETED) {
      return true;
    }
    if (existingDoc == null) {
      // The tlog lookup gave nothing, so consult the realtime searcher instead.
      final RefCounted<SolrIndexSearcher> searcherRef = core.getRealtimeSearcher();
      try {
        final SolrIndexSearcher searcher = searcherRef.get();
        final long docLocator = searcher.lookupId(indexedDocId);
        if (docLocator < 0) {
          // Not in the index either: nothing to compare against.
          return true;
        }
        // The upper 32 bits select the leaf, the lower 32 bits the doc id passed to that leaf's values.
        final int leafIndex = (int) (docLocator >> 32);
        final int leafDocId = (int) docLocator;

        ValueSource source = solrVersionField.getType().getValueSource(solrVersionField, null);
        Map sourceContext = ValueSource.newContext(searcher);
        source.createWeight(sourceContext, searcher);
        FunctionValues values = source.getValues(sourceContext, searcher.getTopReaderContext().leaves().get(leafIndex));
        oldSolrVersion = values.longVal(leafDocId);

        source = userVersionField.getType().getValueSource(userVersionField, null);
        sourceContext = ValueSource.newContext(searcher);
        source.createWeight(sourceContext, searcher);
        values = source.getValues(sourceContext, searcher.getTopReaderContext().leaves().get(leafIndex));
        existingVersion = values.objectVal(leafDocId);
      } catch (IOException e) {
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
      } finally {
        if (searcherRef != null) {
          searcherRef.decref();
        }
      }
    }
  }

  if (existingDoc != null) {
    // Let the FieldType apply whatever conversion the stored value needs.
    existingVersion = convertFieldValueUsingType(userVersionField, existingDoc.getFieldValue(versionFieldName));

    final Object solrVersionValue = existingDoc.getFieldValue(solrVersionField.getName());
    if (solrVersionValue == null) {
      throw new SolrException(SERVER_ERROR, "No _version_ for document "+ existingDoc);
    }
    if (solrVersionValue instanceof Number) {
      oldSolrVersion = ((Number) solrVersionValue).longValue();
    } else {
      oldSolrVersion = Long.parseLong(solrVersionValue.toString());
    }
  }

  if (existingVersion == null) {
    // could happen if they turn this feature on after building an index
    // w/o the versionField
    throw new SolrException(SERVER_ERROR,
        "Doc exists in index, but has null versionField: "
            + versionFieldName);
  }

  if (!(existingVersion instanceof Comparable) || !(incomingVersion instanceof Comparable)) {
    throw new SolrException(BAD_REQUEST,
        "old version and new version are not comparable: " +
            existingVersion.getClass()+" vs "+incomingVersion.getClass());
  }

  try {
    final int comparison = ((Comparable) incomingVersion).compareTo((Comparable) existingVersion);
    if (comparison > 0) {
      // Strictly newer: proceed with the update.
      return true;
    }
    if (!ignoreOldUpdates) {
      throw new SolrException(CONFLICT,
          "user version is not high enough: " + incomingVersion);
    }
    if (log.isDebugEnabled()) {
      log.debug("Dropping update since user version is not high enough: " + incomingVersion + "; old user version=" + existingVersion);
    }
    return false;
  } catch (ClassCastException e) {
    throw new SolrException(BAD_REQUEST,
        "old version and new version are not comparable: " +
            existingVersion.getClass()+" vs "+incomingVersion.getClass() +
            ": " + e.getMessage(), e);
  }
}
/**
 * Tells whether this processor should treat itself as the leader for {@code cmd}.
 *
 * @param cmd the update command being processed
 * @return {@code false} when the command carries the REPLAY or PEER_SYNC flag, or when the
 *         current phase is FROMLEADER; otherwise whatever {@code distribProc.isLeader(cmd)}
 *         reports
 */
public boolean isLeader(UpdateCommand cmd) {
  final int replayOrPeerSync = UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC;
  final boolean flaggedAsReplayOrSync = (cmd.getFlags() & replayOrPeerSync) != 0;
  if (flaggedAsReplayOrSync || phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) {
    return false;
  }
  // if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
  return distribProc.isLeader(cmd);
}
/**
 * Handles an add. When {@link #isLeader} is false the command is forwarded untouched.
 * Otherwise the document's user version is checked with {@code isVersionNewEnough} and the
 * add is forwarded with {@code oldSolrVersion} set as its version; a 409 from the rest of
 * the chain triggers a fresh check and another attempt.
 *
 * @param cmd the add command
 * @throws SolrException BAD_REQUEST when the document has no value for the version field;
 *         any non-409 SolrException raised downstream is rethrown
 * @throws IOException propagated from the version check or the downstream processors
 */
public void processAdd(AddUpdateCommand cmd) throws IOException {
  if (!isLeader(cmd)) {
    super.processAdd(cmd);
    return;
  }

  final SolrInputDocument incomingDoc = cmd.getSolrInputDocument();
  final Object incomingVersion = incomingDoc.getFieldValue(versionFieldName);
  if (incomingVersion == null) {
    throw new SolrException(BAD_REQUEST, "Doc does not have versionField: " + versionFieldName);
  }

  int attempt = 0;
  while (true) {
    // Log a warning every 256 retries.... even a few retries should normally be very unusual.
    if ((attempt & 0xff) == 0xff) {
      log.warn("Unusual number of optimistic concurrency retries: retries=" + attempt + " cmd=" + cmd);
    }

    if (!isVersionNewEnough(cmd.getIndexedId(), incomingVersion)) {
      // drop older update
      return;
    }

    try {
      // use optimistic concurrency to ensure that the doc has not changed in the meantime
      cmd.setVersion(oldSolrVersion);
      super.processAdd(cmd);
      return;
    } catch (SolrException e) {
      if (e.code() != 409) {
        throw e; // anything other than a version conflict is not ours to handle
      }
      // version conflict: fall through and retry
    }
    attempt++;
  }
}
/**
 * Handles a delete. Deletes are forwarded untouched when no delete version parameter is
 * configured or when the command is not a delete-by-id. Otherwise the request must carry
 * the configured parameter, and the delete is turned into an add of a stub document that
 * holds only the unique key and the version field.
 * <p>
 * When {@link #isLeader} is false the stub add is forwarded directly. Otherwise the version
 * is checked with {@code isVersionNewEnough} first and the stub add is forwarded with
 * {@code oldSolrVersion} set as its version; a 409 from downstream triggers a retry.
 *
 * @param cmd the delete command
 * @throws SolrException BAD_REQUEST when the delete version parameter is missing from the
 *         request; any non-409 SolrException raised downstream is rethrown
 * @throws IOException propagated from the version check or the downstream processors
 */
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
  // Not configured to look at deletes at all, or nothing to do for this kind of delete.
  if (deleteVersionParamName == null || !cmd.isDeleteById()) {
    super.processDelete(cmd);
    return;
  }

  final String deleteVersion = cmd.getReq().getParams().get(deleteVersionParamName);
  if (deleteVersion == null) {
    throw new SolrException(BAD_REQUEST,
        "Delete by ID must specify doc version param: " +
            deleteVersionParamName);
  }

  if (!isLeader(cmd)) {
    // transform delete to add earlier rather than later
    super.processAdd(newDeletedVersionStubAdd(cmd, deleteVersion));
    return;
  }

  int attempt = 0;
  while (true) {
    // Log a warning every 256 retries.... even a few retries should normally be very unusual.
    if ((attempt & 0xff) == 0xff) {
      log.warn("Unusual number of optimistic concurrency retries: retries=" + attempt + " cmd=" + cmd);
    }

    if (!isVersionNewEnough(cmd.getIndexedId(), deleteVersion)) {
      // drop this older update
      return;
    }

    // :TODO: should this logic be split and driven by two params?
    //   - deleteVersionParam to do a version check
    //   - some new boolean param to determine if a stub document gets added in place?
    try {
      // drop the delete, and instead propagate an AddDoc that
      // replaces the doc with a new "empty" one that records the deleted version
      final AddUpdateCommand stubAdd = newDeletedVersionStubAdd(cmd, deleteVersion);
      // use optimistic concurrency to ensure that the doc has not changed in the meantime
      stubAdd.setVersion(oldSolrVersion);
      super.processAdd(stubAdd);
      return;
    } catch (SolrException e) {
      if (e.code() != 409) {
        throw e; // anything other than a version conflict is not ours to handle
      }
      // version conflict: fall through and retry
    }
    attempt++;
  }
}

/**
 * Builds the add command that stands in for a delete: a document holding only the unique
 * key and the version field, sharing the request and commitWithin of the delete command.
 */
private AddUpdateCommand newDeletedVersionStubAdd(DeleteUpdateCommand cmd, String deleteVersion) {
  final SolrInputDocument stubDoc = new SolrInputDocument();
  stubDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
      cmd.getId());
  stubDoc.setField(versionFieldName, deleteVersion);

  final AddUpdateCommand stubAdd = new AddUpdateCommand(cmd.getReq());
  stubAdd.solrDoc = stubDoc;
  stubAdd.commitWithin = cmd.commitWithin;
  return stubAdd;
}
} // end inner class
}

View File

@ -109,6 +109,20 @@
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="external-version-failhard-multiple">
<!-- Uses the default behavior of failing with a 409 version conflict
when the external version is too low.
Two version fields are configured: if my_version_l is equal to the
previous value, then my_version_f is checked.
Because versionField is a list, deleteVersionParam is a comma-delimited
list of the same size, so that the request parameters correspond with
the version fields.
-->
<processor class="solr.DocBasedVersionConstraintsProcessorFactory">
<str name="versionField">my_version_l,my_version_f</str>
<str name="deleteVersionParam">del_version,del_version_2</str>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<requestHandler name="/select" class="solr.SearchHandler">
</requestHandler>

View File

@ -16,6 +16,12 @@
*/
package org.apache.solr.update;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
@ -24,12 +30,6 @@ import org.apache.solr.util.DefaultSolrThreadFactory;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
@BeforeClass
@ -288,6 +288,88 @@ public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
, "=={'doc':{'my_version_l':1010}}");
}
// Exercises adds through the "external-version-failhard-multiple" chain, which is configured
// with two version fields (my_version_l, my_version_f): an add is rejected with a 409 unless
// my_version_l is greater, or my_version_l is equal and my_version_f is greater.
public void testMultipleVersions() throws Exception {
  final String chain = "external-version-failhard-multiple";

  updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_l", "1001", "my_version_f", "1.0")),
      params("update.chain", chain));
  assertU(commit());

  // { name, my_version_l, my_version_f } for adds that must be rejected with a 409:
  // a lower my_version_l, a lower my_version_f, and finally the exact same versions.
  final String[][] rejectedAdds = {
      {"X1", "1000", "1.0"},
      {"X2", "1001", "0.9"},
      {"X3", "1001", "1.0"},
  };
  for (String[] candidate : rejectedAdds) {
    try {
      updateJ(jsonAdd(sdoc("id", "aaa", "name", candidate[0],
              "my_version_l", candidate[1], "my_version_f", candidate[2])),
          params("update.chain", chain));
      fail("no 409");
    } catch (SolrException ex) {
      assertEquals(409, ex.code());
    }
  }

  // Verify we are still unchanged
  assertU(commit());
  assertJQ(req("q", "+id:aaa +name:a1"), "/response/numFound==1");

  // { name, my_version_l, my_version_f } for adds that must go through:
  // first only my_version_l is raised, then only my_version_f.
  final String[][] acceptedAdds = {
      {"Y1", "2001", "1.0"},
      {"Y2", "2001", "2.0"},
  };
  for (String[] candidate : acceptedAdds) {
    updateJ(jsonAdd(sdoc("id", "aaa", "name", candidate[0],
            "my_version_l", candidate[1], "my_version_f", candidate[2])),
        params("update.chain", chain));
    assertU(commit());
    assertJQ(req("q", "+id:aaa +name:" + candidate[0]), "/response/numFound==1");
  }
}
// Exercises delete-by-id through the "external-version-failhard-multiple" chain: version
// values that are too low are rejected with a 409, a request missing the second delete
// version parameter is rejected with a 400, and a request with a higher version succeeds.
public void testMultipleVersionDeletes() throws Exception {
  final String chain = "external-version-failhard-multiple";

  updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_l", "1001", "my_version_f", "1.0")),
      params("update.chain", chain));
  assertU(commit());

  // { del_version, del_version_2 } pairs that must be rejected with a 409.
  final String[][] rejectedDeletes = {
      {"1000", "1.0"},
      {"1001", "0.9"},
  };
  for (String[] versions : rejectedDeletes) {
    try {
      deleteAndGetVersion("aaa", params("del_version", versions[0], "del_version_2", versions[1],
          "update.chain", chain));
      fail("no 409");
    } catch (SolrException ex) {
      assertEquals(409, ex.code());
    }
  }

  // And just verify if we pass version 1, we still error if version 2 isn't found.
  try {
    deleteAndGetVersion("aaa", params("del_version", "1001",
        "update.chain", chain));
    fail("no 400");
  } catch (SolrException ex) {
    assertEquals(400, ex.code());
  }

  // Verify we are still unchanged
  assertU(commit());
  assertJQ(req("q", "+id:aaa +name:a1"), "/response/numFound==1");

  // And let's verify the actual case.
  deleteAndGetVersion("aaa", params("del_version", "1001", "del_version_2", "2.0",
      "update.chain", chain));
  assertU(commit());
  assertJQ(req("q", "+id:aaa +name:a1"), "/response/numFound==0"); // Delete allowed
}
/**
* Proof of concept test demonstrating how to manage and periodically cleanup

View File

@ -266,7 +266,9 @@ For more information, please also see Yonik Seeley's presentation on https://www
== Document Centric Versioning Constraints
Optimistic Concurrency is extremely powerful, and works very efficiently because it uses an internally assigned, globally unique values for the `\_version_` field. However, in some situations users may want to configure their own document specific version field, where the version values are assigned on a per-document basis by an external system, and have Solr reject updates that attempt to replace a document with an "older" version. In situations like this the {solr-javadocs}/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.html[`DocBasedVersionConstraintsProcessorFactory`] can be useful.
Optimistic Concurrency is extremely powerful, and works very efficiently because it uses internally assigned, globally unique values for the `\_version_` field.
However, in some situations users may want to configure their own document specific version field, where the version values are assigned on a per-document basis by an external system, and have Solr reject updates that attempt to replace a document with an "older" version.
In situations like this the {solr-javadocs}/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.html[`DocBasedVersionConstraintsProcessorFactory`] can be useful.
The basic usage of `DocBasedVersionConstraintsProcessorFactory` is to configure it in `solrconfig.xml` as part of the <<update-request-processors.adoc#update-request-processor-configuration,UpdateRequestProcessorChain>> and specify the name of your custom `versionField` in your schema that should be checked when validating updates:
@ -277,6 +279,7 @@ The basic usage of `DocBasedVersionConstraintsProcessorFactory` is to configure
</processor>
----
Note that `versionField` is a comma delimited list of fields to check for version numbers.
Once configured, this update processor will reject (HTTP error code 409) any attempt to update an existing document where the value of the `my_version_l` field in the "new" document is not greater than the value of that field in the existing document.
.versionField vs `\_version_`
@ -288,6 +291,9 @@ The `\_version_` field used by Solr for its normal optimistic concurrency also h
`DocBasedVersionConstraintsProcessorFactory` supports two additional configuration params which are optional:
* `ignoreOldUpdates` - A boolean option which defaults to `false`. If set to `true` then instead of rejecting updates where the `versionField` is too low, the update will be silently ignored (and return a status 200 to the client).
* `deleteVersionParam` - A String parameter that can be specified to indicate that this processor should also inspect Delete By Id commands. The value of this configuration option should be the name of a request parameter that the processor will now consider mandatory for all attempts to Delete By Id, and must be be used by clients to specify a value for the `versionField` which is greater then the existing value of the document to be deleted. When using this request param, any Delete By Id command with a high enough document version number to succeed will be internally converted into an Add Document command that replaces the existing document with a new one which is empty except for the Unique Key and `versionField` to keeping a record of the deleted version so future Add Document commands will fail if their "new" version is not high enough.
* `deleteVersionParam` - A String parameter that can be specified to indicate that this processor should also inspect Delete By Id commands.
The value of this configuration option should be the name of a request parameter that the processor will now consider mandatory for all attempts to Delete By Id, and must be used by clients to specify a value for the `versionField` which is greater than the existing value of the document to be deleted.
When using this request param, any Delete By Id command with a high enough document version number to succeed will be internally converted into an Add Document command that replaces the existing document with a new one which is empty except for the Unique Key and `versionField` to keep a record of the deleted version so future Add Document commands will fail if their "new" version is not high enough.
If `versionField` is specified as a list, then this parameter too must be specified as a comma delimited list of the same size so that the parameters correspond with the fields.
Please consult the {solr-javadocs}/solr-core/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.html[DocBasedVersionConstraintsProcessorFactory javadocs] and https://git1-us-west.apache.org/repos/asf?p=lucene-solr.git;a=blob;f=solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml;hb=HEAD[test solrconfig.xml file] for additional information and example usages.