SOLR-5374: user version update processor

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1537587 13f79535-47bb-0310-9956-ffa450edef68
Yonik Seeley 2013-10-31 19:13:35 +00:00
parent bebc5c76d5
commit d18a6f1e03
8 changed files with 1791 additions and 20 deletions

solr/CHANGES.txt

@@ -115,6 +115,11 @@ New Features

* SOLR-5406: CloudSolrServer failed to propagate request parameters
  along with delete updates. (yonik)

+ * SOLR-5374: Support user configured doc-centric versioning rules
+   via the optional DocBasedVersionConstraintsProcessorFactory
+   update processor (Hossman, yonik)

Bug Fixes
----------------------

solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java

@@ -203,13 +203,16 @@ public class RealTimeGetComponent extends SearchComponent
}

- public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException {
-   SolrInputDocument sid = null;
-   RefCounted<SolrIndexSearcher> searcherHolder = null;
-   try {
-     SolrIndexSearcher searcher = null;
-     UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
+ public static SolrInputDocument DELETED = new SolrInputDocument();
+
+ /** returns the SolrInputDocument from the current tlog, or DELETED if it has been deleted, or
+  * null if there is no record of it in the current update log. If null is returned, it could
+  * still be in the latest index.
+  */
+ public static SolrInputDocument getInputDocumentFromTlog(SolrCore core, BytesRef idBytes) {
+   UpdateLog ulog = core.getUpdateHandler().getUpdateLog();

if (ulog != null) {
Object o = ulog.lookup(idBytes);

@@ -220,16 +223,28 @@ public class RealTimeGetComponent extends SearchComponent
int oper = (Integer)entry.get(0) & UpdateLog.OPERATION_MASK;
switch (oper) {
case UpdateLog.ADD:
-   sid = (SolrInputDocument)entry.get(entry.size()-1);
-   break;
+   return (SolrInputDocument)entry.get(entry.size()-1);
case UpdateLog.DELETE:
-   return null;
+   return DELETED;
default:
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unknown Operation! " + oper);
}
}
}
+
+   return null;
+ }
+
+ public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException {
+   SolrInputDocument sid = null;
+   RefCounted<SolrIndexSearcher> searcherHolder = null;
+   try {
+     SolrIndexSearcher searcher = null;
+     sid = getInputDocumentFromTlog(core, idBytes);
+     if (sid == DELETED) {
+       return null;
+     }
+
if (sid == null) {
// didn't find it in the update log, so it should be in the newest searcher opened
if (searcher == null) {

solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java

@@ -1111,6 +1111,25 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}

+ // internal helper method to tell if we are the leader for an add or deleteById update
+ boolean isLeader(UpdateCommand cmd) {
+   updateCommand = cmd;
+   if (zkEnabled) {
+     zkCheck();
+     if (cmd instanceof AddUpdateCommand) {
+       AddUpdateCommand acmd = (AddUpdateCommand)cmd;
+       nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument());
+     } else if (cmd instanceof DeleteUpdateCommand) {
+       DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
+       nodes = setupRequest(dcmd.getId(), null);
+     }
+   } else {
+     isLeader = getNonZkLeaderAssumption(req);
+   }
+   return isLeader;
+ }
+
private void zkCheck() {
if ((updateCommand.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {

solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessorFactory.java

@@ -0,0 +1,560 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.lucene.queries.function.FunctionValues;
import org.apache.lucene.queries.function.ValueSource;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.FieldType;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
import static org.apache.solr.common.SolrException.ErrorCode.BAD_REQUEST;
import static org.apache.solr.common.SolrException.ErrorCode.CONFLICT;
import static org.apache.solr.common.SolrException.ErrorCode.SERVER_ERROR;
import static org.apache.solr.update.processor.DistributingUpdateProcessorFactory.DISTRIB_UPDATE_PARAM;
/**
* <p>
* This Factory generates an UpdateProcessor that helps to enforce Version
* constraints on documents based on per-document version numbers using a configured
* name of a <code>versionField</code>. It should be configured on the "default"
* update processor chain, somewhere before the DistributedUpdateProcessorFactory.
* As an example, see the solrconfig.xml that the tests use:
* solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml
* </p>
* <p>
* When documents are added through this processor, if a document with the same
* unique key already exists in the collection, and the value of the
* <code>versionField</code> in the <i>existing</i> document is not less than the
* field value in the <i>new</i> document, then the new document is rejected with a
* 409 Version Conflict error.
* </p>
* <p>
* In addition to the mandatory <code>versionField</code> init param, two additional
* optional init params affect the behavior of this factory:
* </p>
* <ul>
* <li><code>deleteVersionParam</code> - This string parameter controls whether this
* processor will intercept and inspect Delete By Id commands in addition to adding
* documents. If specified, then the value will specify the name of a request
* parameter which becomes mandatory for all Delete By Id commands. This param
* must then be used to specify the document version associated with the delete.
* If the version specified using this param is not greater than the value in the
* <code>versionField</code> for any existing document, then the delete will fail
* with a 409 Version Conflict error. When using this param, any Delete By Id
* command with a high enough document version number to succeed will be internally
* converted into an Add Document command that replaces the existing document with
* a new one which is empty except for the Unique Key and <code>versionField</code>,
* keeping a record of the deleted version so future Add Document commands will
* fail if their "new" version is not high enough.</li>
*
* <li><code>ignoreOldUpdates</code> - This boolean parameter defaults to
* <code>false</code>, but if set to <code>true</code> causes any update with a
* document version that is not high enough to be silently ignored (returning
* a status 200 to the client) instead of generating a 409 Version Conflict error.
* </li>
* </ul>
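* <p>
* A minimal configuration sketch (hypothetical chain name; the field and param
* names mirror the test config and are illustrative only):
* </p>
* <pre>
* &lt;updateRequestProcessorChain name="external-version"&gt;
*   &lt;processor class="solr.DocBasedVersionConstraintsProcessorFactory"&gt;
*     &lt;str name="versionField"&gt;my_version_l&lt;/str&gt;
*     &lt;str name="deleteVersionParam"&gt;del_version&lt;/str&gt;
*     &lt;bool name="ignoreOldUpdates"&gt;false&lt;/bool&gt;
*   &lt;/processor&gt;
*   &lt;processor class="solr.RunUpdateProcessorFactory" /&gt;
* &lt;/updateRequestProcessorChain&gt;
* </pre>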
*/
public class DocBasedVersionConstraintsProcessorFactory extends UpdateRequestProcessorFactory implements SolrCoreAware, UpdateRequestProcessorFactory.RunAlways {
public final static Logger log = LoggerFactory.getLogger(DocBasedVersionConstraintsProcessorFactory.class);
private boolean ignoreOldUpdates = false;
private String versionField = null;
private String deleteVersionParamName = null;
private boolean useFieldCache;
@Override
public void init( NamedList args ) {
Object tmp = args.remove("versionField");
if (null == tmp) {
throw new SolrException(SERVER_ERROR,
"'versionField' must be configured");
}
if (! (tmp instanceof String) ) {
throw new SolrException(SERVER_ERROR,
"'versionField' must be configured as a <str>");
}
versionField = tmp.toString();
// optional
tmp = args.remove("deleteVersionParam");
if (null != tmp) {
if (! (tmp instanceof String) ) {
throw new SolrException(SERVER_ERROR,
"'deleteVersionParam' must be configured as a <str>");
}
deleteVersionParamName = tmp.toString();
}
// optional - defaults to false
tmp = args.remove("ignoreOldUpdates");
if (null != tmp) {
if (! (tmp instanceof Boolean) ) {
throw new SolrException(SERVER_ERROR,
"'ignoreOldUpdates' must be configured as a <bool>");
}
ignoreOldUpdates = ((Boolean)tmp).booleanValue();
}
super.init(args);
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
return new DocBasedVersionConstraintsProcessor(versionField,
ignoreOldUpdates,
deleteVersionParamName,
useFieldCache,
req, rsp, next);
}
@Override
public void inform(SolrCore core) {
if (core.getUpdateHandler().getUpdateLog() == null) {
throw new SolrException(SERVER_ERROR,
"updateLog must be enabled.");
}
if (core.getLatestSchema().getUniqueKeyField() == null) {
throw new SolrException(SERVER_ERROR,
"schema must have uniqueKey defined.");
}
SchemaField userVersionField = core.getLatestSchema().getField(versionField);
if (userVersionField == null || !userVersionField.stored() || userVersionField.multiValued()) {
throw new SolrException(SERVER_ERROR,
"field " + versionField + " must be defined in schema, be stored, and be single valued.");
}
try {
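// probe whether the versionField's type can supply a ValueSource; if it can,
// isVersionNewEnough reads versions via the fieldcache/docvalues path instead
// of doing a full stored-document realtime-get for every update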
ValueSource vs = userVersionField.getType().getValueSource(userVersionField, null);
useFieldCache = true;
} catch (Exception e) {
log.warn("Can't use fieldcache/valuesource: " + e.getMessage());
}
}
private static class DocBasedVersionConstraintsProcessor
extends UpdateRequestProcessor {
private final String versionFieldName;
private final SchemaField userVersionField;
private final SchemaField solrVersionField;
private final boolean ignoreOldUpdates;
private final String deleteVersionParamName;
private final SolrCore core;
private long oldSolrVersion; // current _version_ of the doc in the index/update log
private DistributedUpdateProcessor distribProc; // the distributed update processor following us
private DistributedUpdateProcessor.DistribPhase phase;
private boolean useFieldCache;
public DocBasedVersionConstraintsProcessor(String versionField,
boolean ignoreOldUpdates,
String deleteVersionParamName,
boolean useFieldCache,
SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
super(next);
this.ignoreOldUpdates = ignoreOldUpdates;
this.deleteVersionParamName = deleteVersionParamName;
this.core = req.getCore();
this.versionFieldName = versionField;
this.userVersionField = core.getLatestSchema().getField(versionField);
this.solrVersionField = core.getLatestSchema().getField(VersionInfo.VERSION_FIELD);
this.useFieldCache = useFieldCache;
for (UpdateRequestProcessor proc = next ;proc != null; proc = proc.next) {
if (proc instanceof DistributedUpdateProcessor) {
distribProc = (DistributedUpdateProcessor)proc;
break;
}
}
if (distribProc == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "DistributedUpdateProcessor must follow DocBasedVersionConstraintsProcessor");
}
phase = DistributedUpdateProcessor.DistribPhase.parseParam(req.getParams().get(DISTRIB_UPDATE_PARAM));
}
/**
* Inspects a raw field value (which may come from a doc in the index, or a
* doc in the UpdateLog that still has String values, or a String sent by
* the user as a param) and if it is a String, asks the versionField FieldType
* to convert it to an Object suitable for comparison.
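* For example (illustrative): a String "1005" for a long-based versionField
* is converted to the Long 1005, so old/new values compare numerically.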
*/
private Object convertFieldValueUsingType(SchemaField sf, final Object rawValue) {
if (rawValue instanceof CharSequence) {
// in theory, the FieldType might still be CharSequence based,
// but in that case trust it to do an identity conversion...
FieldType fieldType = userVersionField.getType();
BytesRef term = new BytesRef();
fieldType.readableToIndexed((CharSequence)rawValue, term);
return fieldType.toObject(userVersionField, term);
}
// else...
return rawValue;
}
/**
* Returns true if the specified new version value is greater than the one
* already known to exist for the document, or if the document does not already
* exist.
* Returns false if the specified new version is not high enough but the
* processor has been configured with ignoreOldUpdates=true.
* Throws a SolrException if the version is not high enough and
* ignoreOldUpdates=false.
*/
private boolean isVersionNewEnough(BytesRef indexedDocId,
Object newUserVersion) throws IOException {
assert null != indexedDocId;
assert null != newUserVersion;
oldSolrVersion = -1;
newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
Object oldUserVersion = null;
SolrInputDocument oldDoc = null;
if (useFieldCache) {
oldDoc = RealTimeGetComponent.getInputDocumentFromTlog(core, indexedDocId);
if (oldDoc == RealTimeGetComponent.DELETED) {
return true;
}
if (oldDoc == null) {
// need to look up in index now...
RefCounted<SolrIndexSearcher> newestSearcher = core.getRealtimeSearcher();
try {
SolrIndexSearcher searcher = newestSearcher.get();
long lookup = searcher.lookupId(indexedDocId);
if (lookup < 0) {
// doc not in index either...
return true;
}
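// lookupId packs its result as (segment ordinal << 32) | segment-local docid;
// unpack both halves below to read values from the matching leaf reader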
ValueSource vs = solrVersionField.getType().getValueSource(solrVersionField, null);
Map context = ValueSource.newContext(searcher);
vs.createWeight(context, searcher);
FunctionValues fv = vs.getValues(context, searcher.getTopReaderContext().leaves().get((int)(lookup>>32)));
oldSolrVersion = fv.longVal((int)lookup);
vs = userVersionField.getType().getValueSource(userVersionField, null);
context = ValueSource.newContext(searcher);
vs.createWeight(context, searcher);
fv = vs.getValues(context, searcher.getTopReaderContext().leaves().get((int)(lookup>>32)));
oldUserVersion = fv.objectVal((int)lookup);
} catch (IOException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Error reading version from index", e);
} finally {
if (newestSearcher != null) {
newestSearcher.decref();
}
}
}
} else {
// stored fields only...
oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId);
if (null == oldDoc) {
return true;
}
}
if (oldDoc != null) {
oldUserVersion = oldDoc.getFieldValue(versionFieldName);
// Make the FieldType resolve any conversion we need.
oldUserVersion = convertFieldValueUsingType(userVersionField, oldUserVersion);
Object o = oldDoc.getFieldValue(solrVersionField.getName());
if (o == null) {
throw new SolrException(SERVER_ERROR, "No _version_ for document "+ oldDoc);
}
oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
}
if ( null == oldUserVersion) {
// could happen if they turn this feature on after building an index
// w/o the versionField
throw new SolrException(SERVER_ERROR,
"Doc exists in index, but has null versionField: "
+ versionFieldName);
}
if (! (oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable) ) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass()+" vs "+newUserVersion.getClass());
}
try {
if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
return true;
}
if (ignoreOldUpdates) {
return false;
} else {
throw new SolrException(CONFLICT,
"user version is not high enough: " + newUserVersion);
}
} catch (ClassCastException e) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass()+" vs "+newUserVersion.getClass() +
": " + e.getMessage(), e);
}
}
private boolean isVersionNewEnoughStoredOnly(BytesRef indexedDocId,
Object newUserVersion) throws IOException {
assert null != indexedDocId;
assert null != newUserVersion;
oldSolrVersion = -1;
// :TODO: would be nice if a full RTG was not always needed here, ideas...
// - first check fieldCache/docVals - if a versionField exists
// in index that is already greater then this cmd, fail fast
// (no need to check updateLog, new version already too low)
// - first check if docId is in the updateLog w/o doing the full get, if
// it's not then check fieldCache/docVals
// - track versionField externally from updateLog (or as a special case
// that can be looked up by itself - similar to how _version_ is dealt with)
//
// Depending on if/when/how this is changed, what we assert about
// versionField on init will need updated.
newUserVersion = convertFieldValueUsingType(userVersionField, newUserVersion);
Object oldUserVersion = null;
SolrInputDocument oldDoc =
RealTimeGetComponent.getInputDocument(core, indexedDocId);
if (null == oldDoc) {
return true;
}
oldUserVersion = oldDoc.getFieldValue(versionFieldName);
if ( null == oldUserVersion) {
// could happen if they turn this feature on after building an index
// w/o the versionField
throw new SolrException(SERVER_ERROR,
"Doc exists in index, but has null versionField: "
+ versionFieldName);
}
// Make the FieldType resolve any conversion we need.
oldUserVersion = convertFieldValueUsingType(userVersionField, oldUserVersion);
if (! (oldUserVersion instanceof Comparable && newUserVersion instanceof Comparable) ) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass()+" vs "+newUserVersion.getClass());
}
try {
if (0 < ((Comparable)newUserVersion).compareTo((Comparable) oldUserVersion)) {
// since we're going to proceed with this update, we need to find the _version_
// so we can use optimistic concurrency.
Object o = oldDoc.getFieldValue(VersionInfo.VERSION_FIELD);
if (o == null) {
throw new SolrException(SERVER_ERROR, "No _version_ for document "+ oldDoc);
}
oldSolrVersion = o instanceof Number ? ((Number) o).longValue() : Long.parseLong(o.toString());
return true;
}
if (ignoreOldUpdates) {
return false;
} else {
throw new SolrException(CONFLICT,
"user version is not high enough: " + newUserVersion);
}
} catch (ClassCastException e) {
throw new SolrException(BAD_REQUEST,
"old version and new version are not comparable: " +
oldUserVersion.getClass()+" vs "+newUserVersion.getClass() +
": " + e.getMessage(), e);
}
}
public boolean isLeader(UpdateCommand cmd) {
if ((cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0) {
return false;
}
if (phase == DistributedUpdateProcessor.DistribPhase.FROMLEADER) {
return false;
}
// if phase==TOLEADER, we can't just assume we are the leader... let the normal logic check.
return distribProc.isLeader(cmd);
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (!isLeader(cmd)) {
super.processAdd(cmd);
return;
}
final SolrInputDocument newDoc = cmd.getSolrInputDocument();
Object newVersion = newDoc.getFieldValue(versionFieldName);
if ( null == newVersion ) {
throw new SolrException(BAD_REQUEST, "Doc does not have versionField: " + versionFieldName);
}
for (int i=0; ;i++) {
// Log a warning every 256 retries.... even a few retries should normally be very unusual.
if ((i&0xff) == 0xff) {
log.warn("Unusual number of optimistic concurrency retries: retries=" + i + " cmd=" + cmd);
}
if (!isVersionNewEnough(cmd.getIndexedId(), newVersion)) {
// drop older update
return;
}
try {
cmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
super.processAdd(cmd);
return;
} catch (SolrException e) {
if (e.code() == 409) {
continue; // if a version conflict, retry
}
throw e; // rethrow
}
}
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
if (null == deleteVersionParamName) {
// not supposed to look at deletes at all
super.processDelete(cmd);
return;
}
if ( ! cmd.isDeleteById() ) {
// nothing to do
super.processDelete(cmd);
return;
}
String deleteParamValue = cmd.getReq().getParams().get(deleteVersionParamName);
if (null == deleteParamValue) {
throw new SolrException(BAD_REQUEST,
"Delete by ID must specify doc version param: " +
deleteVersionParamName);
}
if (!isLeader(cmd)) {
// transform delete to add earlier rather than later
SolrInputDocument newDoc = new SolrInputDocument();
newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
cmd.getId());
newDoc.setField(versionFieldName, deleteParamValue);
AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
newCmd.solrDoc = newDoc;
newCmd.commitWithin = cmd.commitWithin;
super.processAdd(newCmd);
return;
}
for (int i=0; ;i++) {
// Log a warning every 256 retries.... even a few retries should normally be very unusual.
if ((i&0xff) == 0xff) {
log.warn("Unusual number of optimistic concurrency retries: retries=" + i + " cmd=" + cmd);
}
if (!isVersionNewEnough(cmd.getIndexedId(), deleteParamValue)) {
// drop this older update
return;
}
// :TODO: should this logic be split and driven by two params?
// - deleteVersionParam to do a version check
// - some new boolean param to determine if a stub document gets added in place?
try {
// drop the delete, and instead propagate an AddDoc that
// replaces the doc with a new "empty" one that records the deleted version
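// e.g. a delete of id "x" with del_version=30 becomes an add of the stub
// doc {id:"x", <versionField>:30}, guarded below by the old _version_
// (illustrative values; the actual field names come from schema/config)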
SolrInputDocument newDoc = new SolrInputDocument();
newDoc.setField(core.getLatestSchema().getUniqueKeyField().getName(),
cmd.getId());
newDoc.setField(versionFieldName, deleteParamValue);
AddUpdateCommand newCmd = new AddUpdateCommand(cmd.getReq());
newCmd.solrDoc = newDoc;
newCmd.commitWithin = cmd.commitWithin;
newCmd.setVersion(oldSolrVersion); // use optimistic concurrency to ensure that the doc has not changed in the meantime
super.processAdd(newCmd);
return;
} catch (SolrException e) {
if (e.code() == 409) {
continue; // if a version conflict, retry
}
throw e; // rethrow
}
}
}
} // end inner class
}

solr/core/src/test-files/solr/collection1/conf/solrconfig-externalversionconstraint.xml

@@ -0,0 +1,125 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<config>
<jmx />
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
<directoryFactory name="DirectoryFactory" class="${solr.directoryFactory:solr.RAMDirectoryFactory}">
<!-- used to keep RAM reqs down for HdfsDirectoryFactory -->
<bool name="solr.hdfs.blockcache.enabled">${solr.hdfs.blockcache.enabled:true}</bool>
<int name="solr.hdfs.blockcache.blocksperbank">${solr.hdfs.blockcache.blocksperbank:1024}</int>
<str name="solr.hdfs.home">${solr.hdfs.home:}</str>
<str name="solr.hdfs.confdir">${solr.hdfs.confdir:}</str>
</directoryFactory>
<dataDir>${solr.data.dir:}</dataDir>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" />
<requestHandler name="/update" class="solr.UpdateRequestHandler" />
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog>
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<updateRequestProcessorChain name="external-version-constraint" default="true">
<!-- this chain uses the processor with the "deleteVersionParam" option
so that deleteById requests are translated into updates that preserve the
(logically) deleted document in the index with a record of its deleted
version.
It also demonstrates how to mix in TimestampUpdateProcessorFactory and
DefaultValueUpdateProcessorFactory to ensure these logically deleted
documents are kept out of searches, but can be cleaned up periodically
after some amount of time has elapsed.
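For example (illustrative, mirroring TestDocBasedVersionConstraints), a
periodic cleanup request could purge tombstones older than five minutes:
<delete><query>+live_b:false +timestamp_tdt:[* TO NOW/MINUTE-5MINUTE]</query></delete>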
-->
<processor class="solr.DefaultValueUpdateProcessorFactory">
<!-- give all docs a true value to denote that they are alive -->
<str name="fieldName">live_b</str>
<bool name="value">true</bool>
</processor>
<!-- process the external version constraint, ignoring any updates that
don't satisfy the constraint -->
<processor class="solr.DocBasedVersionConstraintsProcessorFactory">
<bool name="ignoreOldUpdates">true</bool>
<str name="versionField">my_version_l</str>
<str name="deleteVersionParam">del_version</str>
</processor>
<processor class="solr.DefaultValueUpdateProcessorFactory">
<!-- any doc that makes it here w/o a live value is a logically
deleted doc generated by the previous processor in place of deleteById
-->
<str name="fieldName">live_b</str>
<bool name="value">false</bool>
</processor>
<processor class="solr.TimestampUpdateProcessorFactory">
<!-- give every doc, including the logically deleted ones, a timestamp -->
<str name="fieldName">timestamp_tdt</str>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="external-version-float">
<!-- this chain uses the processor with float based versionField just
to sanity check that there are no hardcoded assumptions about the
field type used that could bite us in the ass.
-->
<processor class="solr.DocBasedVersionConstraintsProcessorFactory">
<!-- process the external version constraint, ignoring any updates that
don't satisfy the constraint
-->
<bool name="ignoreOldUpdates">true</bool>
<str name="versionField">my_version_f</str>
<str name="deleteVersionParam">del_version</str>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<updateRequestProcessorChain name="external-version-failhard">
<!-- Uses the default behavior of failing with a 409 version conflict
when the external version is too low.
-->
<processor class="solr.DocBasedVersionConstraintsProcessorFactory">
<str name="versionField">my_version_l</str>
<str name="deleteVersionParam">del_version</str>
</processor>
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
<requestHandler name="standard" class="solr.StandardRequestHandler">
</requestHandler>
<requestHandler name="/get" class="solr.RealTimeGetHandler">
<lst name="defaults">
<str name="omitHeader">true</str>
</lst>
</requestHandler>
</config>

solr/core/src/test/org/apache/solr/cloud/TestDistribDocBasedVersion.java

@@ -0,0 +1,305 @@
package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.util.StrUtils;
import org.junit.BeforeClass;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TestDistribDocBasedVersion extends AbstractFullDistribZkTestBase {
String bucket1 = "shard1"; // shard1: top bits:10 80000000:ffffffff
String bucket2 = "shard2"; // shard2: top bits:00 00000000:7fffffff
private static String vfield = "my_version_l";
@BeforeClass
public static void beforeShardHashingTest() throws Exception {
useFactory(null);
}
@Override
protected String getCloudSolrConfig() {
return "solrconfig-externalversionconstraint.xml";
}
public TestDistribDocBasedVersion() {
schemaString = "schema15.xml"; // we need a string id
super.sliceCount = 2;
super.shardCount = 4;
super.fixShardCount = true; // we only want to test with exactly 2 slices.
/***
hash of a is 3c2569b2 high bits=0 shard=shard3
hash of b is 95de7e03 high bits=2 shard=shard1
hash of c is e132d65f high bits=3 shard=shard2
hash of d is 27191473 high bits=0 shard=shard3
hash of e is 656c4367 high bits=1 shard=shard4
hash of f is 2b64883b high bits=0 shard=shard3
hash of g is f18ae416 high bits=3 shard=shard2
hash of h is d482b2d3 high bits=3 shard=shard2
hash of i is 811a702b high bits=2 shard=shard1
hash of j is ca745a39 high bits=3 shard=shard2
hash of k is cfbda5d1 high bits=3 shard=shard2
hash of l is 1d5d6a2c high bits=0 shard=shard3
hash of m is 5ae4385c high bits=1 shard=shard4
hash of n is c651d8ac high bits=3 shard=shard2
hash of o is 68348473 high bits=1 shard=shard4
hash of p is 986fdf9a high bits=2 shard=shard1
hash of q is ff8209e8 high bits=3 shard=shard2
hash of r is 5c9373f1 high bits=1 shard=shard4
hash of s is ff4acaf1 high bits=3 shard=shard2
hash of t is ca87df4d high bits=3 shard=shard2
hash of u is 62203ae0 high bits=1 shard=shard4
hash of v is bdafcc55 high bits=2 shard=shard1
hash of w is ff439d1f high bits=3 shard=shard2
hash of x is 3e9a9b1b high bits=0 shard=shard3
hash of y is 477d9216 high bits=1 shard=shard4
hash of z is c1f69a17 high bits=3 shard=shard2
***/
}
@Override
public void doTest() throws Exception {
boolean testFinished = false;
try {
handle.clear();
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
// todo: do I have to do this here?
waitForRecoveriesToFinish(false);
doTestDocVersions();
testFinished = true;
} finally {
if (!testFinished) {
printLayoutOnTearDown = true;
}
}
}
private void doTestDocVersions() throws Exception {
log.info("### STARTING doTestDocVersions");
assertEquals(2, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size());
ss = cloudClient;
vadd("b!doc1", 10);
vadd("c!doc2", 11);
vadd("d!doc3", 10);
vadd("e!doc4", 11);
doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "10,11,10,11");
vadd("b!doc1", 5);
vadd("c!doc2", 10);
vadd("d!doc3", 9);
vadd("e!doc4", 8);
doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "10,11,10,11");
vadd("b!doc1", 24);
vadd("c!doc2", 23);
vadd("d!doc3", 22);
vadd("e!doc4", 21);
doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "24,23,22,21");
vdelete("b!doc1", 20);
doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "24,23,22,21");
vdelete("b!doc1", 30);
doRTG("b!doc1,c!doc2,d!doc3,e!doc4", "30,23,22,21");
// try delete before add
vdelete("b!doc123", 100);
vadd("b!doc123", 99);
doRTG("b!doc123", "100");
// now add greater
vadd("b!doc123", 101);
doRTG("b!doc123", "101");
//
// now test with a non-smart client
//
List<CloudJettyRunner> runners = shardToJetty.get(bucket2);
CloudJettyRunner leader = shardToLeaderJetty.get(bucket2);
CloudJettyRunner replica = null;
for (CloudJettyRunner r : runners) {
if (r != leader) replica = r;
}
ss = replica.client.solrClient;
vadd("b!doc5", 10);
vadd("c!doc6", 11);
vadd("d!doc7", 10);
vadd("e!doc8", 11);
doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "10,11,10,11");
vadd("b!doc5", 5);
vadd("c!doc6", 10);
vadd("d!doc7", 9);
vadd("e!doc8", 8);
doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "10,11,10,11");
vadd("b!doc5", 24);
vadd("c!doc6", 23);
vadd("d!doc7", 22);
vadd("e!doc8", 21);
doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "24,23,22,21");
vdelete("b!doc5", 20);
doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "24,23,22,21");
vdelete("b!doc5", 30);
doRTG("b!doc5,c!doc6,d!doc7,e!doc8", "30,23,22,21");
// try delete before add
vdelete("b!doc1234", 100);
vadd("b!doc1234", 99);
doRTG("b!doc1234", "100");
// now add greater
vadd("b!doc1234", 101);
doRTG("b!doc1234", "101");
commit();
// check liveness for all docs
doQuery("b!doc123,101,c!doc2,23,d!doc3,22,e!doc4,21,b!doc1234,101,c!doc6,23,d!doc7,22,e!doc8,21", "q","live_b:true");
doQuery("b!doc1,30,b!doc5,30", "q","live_b:false");
// delete by query should just work like normal
doDBQ("id:b!doc1 OR id:e*");
commit();
doQuery("b!doc123,101,c!doc2,23,d!doc3,22,b!doc1234,101,c!doc6,23,d!doc7,22", "q","live_b:true");
doQuery("b!doc5,30", "q","live_b:false");
}
SolrServer ss;
void vdelete(String id, long version) throws Exception {
UpdateRequest req = new UpdateRequest();
req.deleteById(id);
req.setParam("del_version", Long.toString(version));
ss.request(req);
// req.process(cloudClient);
}
void vadd(String id, long version) throws Exception {
index("id", id, vfield, version);
}
void doQuery(String expectedDocs, String... queryParams) throws Exception {
List<String> strs = StrUtils.splitSmart(expectedDocs, ",", true);
Map<String, Object> expectedIds = new HashMap<String,Object>();
for (int i=0; i<strs.size(); i+=2) {
String id = strs.get(i);
String vS = strs.get(i+1);
Long v = Long.valueOf(vS);
expectedIds.put(id,v);
}
QueryResponse rsp = cloudClient.query(params(queryParams));
Map<String, Object> obtainedIds = new HashMap<String,Object>();
for (SolrDocument doc : rsp.getResults()) {
obtainedIds.put((String) doc.get("id"), doc.get(vfield));
}
assertEquals(expectedIds, obtainedIds);
}
void doRTG(String ids, String versions) throws Exception {
Map<String, Object> expectedIds = new HashMap<String,Object>();
List<String> strs = StrUtils.splitSmart(ids, ",", true);
List<String> verS = StrUtils.splitSmart(versions, ",", true);
for (int i=0; i<strs.size(); i++) {
expectedIds.put(strs.get(i), Long.valueOf(verS.get(i)));
}
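// query through ss first (ss may point at a non-leader replica in the
// "non-smart client" section above), then verify the results via cloudClient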
ss.query(params("qt","/get", "ids",ids));
QueryResponse rsp = cloudClient.query(params("qt","/get", "ids",ids));
Map<String, Object> obtainedIds = new HashMap<String,Object>();
for (SolrDocument doc : rsp.getResults()) {
obtainedIds.put((String) doc.get("id"), doc.get(vfield));
}
assertEquals(expectedIds, obtainedIds);
}
void doRTG(String ids) throws Exception {
ss.query(params("qt","/get", "ids",ids));
Set<String> expectedIds = new HashSet<String>( StrUtils.splitSmart(ids, ",", true) );
QueryResponse rsp = cloudClient.query(params("qt","/get", "ids",ids));
Set<String> obtainedIds = new HashSet<String>();
for (SolrDocument doc : rsp.getResults()) {
obtainedIds.add((String) doc.get("id"));
}
assertEquals(expectedIds, obtainedIds);
}
// TODO: refactor some of this stuff into the SolrJ client... it should be easier to use
void doDBQ(String q, String... reqParams) throws Exception {
UpdateRequest req = new UpdateRequest();
req.deleteByQuery(q);
req.setParams(params(reqParams));
req.process(cloudClient);
}
@Override
public void tearDown() throws Exception {
super.tearDown();
}
}

solr/core/src/test/org/apache/solr/search/TestStressUserVersions.java

@@ -0,0 +1,321 @@
package org.apache.solr.search;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.util.TestHarness;
import org.junit.BeforeClass;
import org.junit.Test;
import org.noggit.ObjectBuilder;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class TestStressUserVersions extends TestRTGBase {
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig-externalversionconstraint.xml","schema15.xml");
}
private static String vfield = "my_version_l";
private static String lfield = "live_b";
private static String dversion = "del_version";
public static void verbose(Object... args) {
// if (!log.isDebugEnabled()) return;
StringBuilder sb = new StringBuilder("VERBOSE:");
for (Object o : args) {
sb.append(' ');
sb.append(o==null ? "(null)" : o.toString());
}
log.info(sb.toString());
}
// This version simulates user versions sometimes being reordered.
// It should fail (and currently does) if optimistic concurrency is disabled (cmd.setVersion(currVersion))
// in DocBasedVersionConstraintsProcessor.
@Test
public void testStressReorderVersions() throws Exception {
clearIndex();
assertU(commit());
final int commitPercent = 5 + random().nextInt(20);
final int softCommitPercent = 30+random().nextInt(75); // what percent of the commits are soft
final int deletePercent = 4+random().nextInt(25);
final int deleteByQueryPercent = random().nextInt(8);
final int ndocs = 5 + (random().nextBoolean() ? random().nextInt(25) : random().nextInt(200));
int nWriteThreads = 5 + random().nextInt(25);
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
// query variables
final int percentRealtimeQuery = 75;
final AtomicLong operations = new AtomicLong(10000); // number of query operations to perform in total - ramp up for a longer test
int nReadThreads = 5 + random().nextInt(25);
/** // testing
final int commitPercent = 5;
final int softCommitPercent = 100; // what percent of the commits are soft
final int deletePercent = 0;
final int deleteByQueryPercent = 50;
final int ndocs = 1;
int nWriteThreads = 2;
final int maxConcurrentCommits = nWriteThreads; // number of committers at a time... it should be <= maxWarmingSearchers
// query variables
final int percentRealtimeQuery = 101;
final AtomicLong operations = new AtomicLong(50000); // number of query operations to perform in total
int nReadThreads = 1;
**/
verbose("commitPercent",commitPercent, "softCommitPercent",softCommitPercent, "deletePercent",deletePercent, "deleteByQueryPercent",deleteByQueryPercent
, "ndocs",ndocs,"nWriteThreads",nWriteThreads,"percentRealtimeQuery",percentRealtimeQuery,"operations",operations, "nReadThreads",nReadThreads);
initModel(ndocs);
final AtomicInteger numCommitting = new AtomicInteger();
List<Thread> threads = new ArrayList<Thread>();
final AtomicLong testVersion = new AtomicLong(0);
for (int i=0; i<nWriteThreads; i++) {
Thread thread = new Thread("WRITER"+i) {
Random rand = new Random(random().nextInt());
@Override
public void run() {
try {
while (operations.get() > 0) {
int oper = rand.nextInt(100);
if (oper < commitPercent) {
if (numCommitting.incrementAndGet() <= maxConcurrentCommits) {
Map<Integer,DocInfo> newCommittedModel;
long version;
synchronized(TestStressUserVersions.this) {
newCommittedModel = new HashMap<Integer,DocInfo>(model); // take a snapshot
version = snapshotCount++;
}
if (rand.nextInt(100) < softCommitPercent) {
verbose("softCommit start");
assertU(TestHarness.commit("softCommit","true"));
verbose("softCommit end");
} else {
verbose("hardCommit start");
assertU(commit());
verbose("hardCommit end");
}
synchronized(TestStressUserVersions.this) {
// install this model snapshot only if it's newer than the current one
if (version >= committedModelClock) {
if (VERBOSE) {
verbose("installing new committedModel version="+committedModelClock);
}
committedModel = newCommittedModel;
committedModelClock = version;
}
}
}
numCommitting.decrementAndGet();
continue;
}
int id;
if (rand.nextBoolean()) {
id = rand.nextInt(ndocs);
} else {
id = lastId; // reuse the last ID half of the time to force more race conditions
}
// set the lastId before we actually change it sometimes to try and
// uncover more race conditions between writing and reading
boolean before = rand.nextBoolean();
if (before) {
lastId = id;
}
DocInfo info = model.get(id);
long val = info.val;
long nextVal = Math.abs(val)+1;
// the version we set on the update should determine who wins
// These versions are not derived from the actual leader update handler, and hence this
// test may need to change depending on how we handle version numbers.
long version = testVersion.incrementAndGet();
// yield after getting the next version to increase the odds of updates happening out of order
if (rand.nextBoolean()) Thread.yield();
if (oper < commitPercent + deletePercent) {
verbose("deleting id",id,"val=",nextVal,"version",version);
Long returnedVersion = deleteAndGetVersion(Integer.toString(id), params(dversion, Long.toString(version)));
// only update model if the version is newer
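// (a negative val in the model marks a tombstone; readers compare against
// Math.abs(info.version) for deleted entries)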
synchronized (model) {
DocInfo currInfo = model.get(id);
if (Math.abs(version) > Math.abs(currInfo.version)) {
model.put(id, new DocInfo(version, -nextVal));
}
}
verbose("deleting id", id, "val=",nextVal,"version",version,"DONE");
} else {
verbose("adding id", id, "val=", nextVal,"version",version);
Long returnedVersion = addAndGetVersion(sdoc("id", Integer.toString(id), field, Long.toString(nextVal), vfield, Long.toString(version)), null);
// only update model if the version is newer
synchronized (model) {
DocInfo currInfo = model.get(id);
if (version > currInfo.version) {
model.put(id, new DocInfo(version, nextVal));
}
}
if (VERBOSE) {
verbose("adding id", id, "val=", nextVal,"version",version,"DONE");
}
}
// } // end sync
if (!before) {
lastId = id;
}
}
} catch (Throwable e) {
operations.set(-1L);
log.error("",e);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
for (int i=0; i<nReadThreads; i++) {
Thread thread = new Thread("READER"+i) {
Random rand = new Random(random().nextInt());
@Override
public void run() {
try {
while (operations.decrementAndGet() >= 0) {
// bias toward a recently changed doc
int id = rand.nextInt(100) < 25 ? lastId : rand.nextInt(ndocs);
// when indexing, we update the index, then the model
// so when querying, we should first check the model, and then the index
boolean realTime = rand.nextInt(100) < percentRealtimeQuery;
DocInfo info;
if (realTime) {
info = model.get(id);
} else {
synchronized(TestStressUserVersions.this) {
info = committedModel.get(id);
}
}
if (VERBOSE) {
verbose("querying id", id);
}
SolrQueryRequest sreq;
if (realTime) {
sreq = req("wt","json", "qt","/get", "ids",Integer.toString(id));
} else {
sreq = req("wt","json", "q","id:"+Integer.toString(id), "omitHeader","true");
}
String response = h.query(sreq);
Map rsp = (Map)ObjectBuilder.fromJSON(response);
List doclist = (List)(((Map)rsp.get("response")).get("docs"));
if (doclist.size() == 0) {
// there's no info we can get back with a delete, so not much we can check without further synchronization
} else {
assertEquals(1, doclist.size());
boolean isLive = (Boolean)(((Map)doclist.get(0)).get(lfield));
long foundVer = (Long)(((Map)doclist.get(0)).get(vfield));
if (isLive) {
long foundVal = (Long)(((Map)doclist.get(0)).get(field));
if (foundVer < Math.abs(info.version)
|| (foundVer == info.version && foundVal != info.val) ) { // if the version matches, the val must match
log.error("ERROR, id=" + id + " found=" + response + " model" + info);
assertTrue(false);
}
} else {
// if the doc is deleted (via tombstone), it shouldn't have a value on it.
assertNull( ((Map)doclist.get(0)).get(field) );
if (foundVer < Math.abs(info.version)) {
log.error("ERROR, id=" + id + " found=" + response + " model" + info);
assertTrue(false);
}
}
}
}
} catch (Throwable e) {
operations.set(-1L);
log.error("",e);
throw new RuntimeException(e);
}
}
};
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
}
}

solr/core/src/test/org/apache/solr/update/TestDocBasedVersionConstraints.java

@@ -0,0 +1,421 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import org.apache.lucene.util._TestUtil;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrException;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestDocBasedVersionConstraints extends SolrTestCaseJ4 {
@BeforeClass
public static void beforeClass() throws Exception {
initCore("solrconfig-externalversionconstraint.xml", "schema15.xml");
}
@Before
public void before() throws Exception {
assertU(delQ("*:*"));
assertU(commit());
}
public void testSimpleUpdates() throws Exception {
// skip low version against committed data
assertU(adoc("id", "aaa", "name", "a1", "my_version_l", "1001"));
assertU(commit());
assertU(adoc("id", "aaa", "name", "a2", "my_version_l", "1002"));
assertU(commit());
assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "1"));
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
// skip low version against uncommitted data from updateLog
assertU(adoc("id", "aaa", "name", "a3", "my_version_l", "1003"));
assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "7"));
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a3'}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("q","+id:aaa +name:a3"), "/response/numFound==1");
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a3'}}");
// interleave updates to multiple docs using same versions
for (long ver = 1010; ver < 1020; ver++) {
for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
assertU(adoc("id", id, "my_version_l", ""+ver));
}
}
for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
assertU(adoc("id", id, "name", "XX", "my_version_l", "10"));
assertJQ(req("qt","/get", "id",id, "fl","my_version_l")
, "=={'doc':{'my_version_l':"+1019+"}}");
}
assertU(commit());
assertJQ(req("q","name:XX"), "/response/numFound==0");
for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
assertJQ(req("q","+id:"+id), "/response/numFound==1");
assertJQ(req("q","+name:XX +id:"+id), "/response/numFound==0");
assertJQ(req("q","+id:"+id + " +my_version_l:1019"), "/response/numFound==1");
assertJQ(req("qt","/get", "id",id, "fl","my_version_l")
, "=={'doc':{'my_version_l':"+1019+"}}");
}
}
public void testSimpleDeletes() throws Exception {
// skip low version delete against committed doc
assertU(adoc("id", "aaa", "name", "a1", "my_version_l", "1001"));
assertU(commit());
assertU(adoc("id", "aaa", "name", "a2", "my_version_l", "1002"));
assertU(commit());
deleteAndGetVersion("aaa",
params("del_version", "7"));
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
// skip low version delete against uncommitted doc from updateLog
assertU(adoc("id", "aaa", "name", "a3", "my_version_l", "1003"));
deleteAndGetVersion("aaa",
params("del_version", "8"));
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a3'}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:a3"), "/response/numFound==1");
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a3'}}");
// skip low version add against uncommitted "delete" from updateLog
deleteAndGetVersion("aaa", params("del_version", "1010"));
assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "22"));
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}");
// skip low version add against committed "delete"
// (delete was already done & committed above)
assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "23"));
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}");
}
/**
* Sanity check that there are no hardcoded assumptions about the
* field type used that could bite us in the ass.
*/
public void testFloatVersionField() throws Exception {
// skip low version add & low version delete against committed doc
updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_f", "10.01")),
params("update.chain","external-version-float"));
assertU(commit());
updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "4.2")),
params("update.chain","external-version-float"));
assertU(commit());
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a1'}}");
deleteAndGetVersion("aaa", params("del_version", "7",
"update.chain","external-version-float"));
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a1'}}");
assertU(commit());
// skip low version delete against uncommitted doc from updateLog
updateJ(jsonAdd(sdoc("id", "aaa", "name", "a2", "my_version_f", "10.02")),
params("update.chain","external-version-float"));
deleteAndGetVersion("aaa", params("del_version", "8",
"update.chain","external-version-float"));
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
// skip low version add against uncommitted "delete" from updateLog
deleteAndGetVersion("aaa", params("del_version", "10.10",
"update.chain","external-version-float"));
updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "10.05")),
params("update.chain","external-version-float"));
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
, "=={'doc':{'my_version_f':10.10}}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
, "=={'doc':{'my_version_f':10.10}}");
// skip low version add against committed "delete"
// (delete was already done & committed above)
updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_f", "10.09")),
params("update.chain","external-version-float"));
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
, "=={'doc':{'my_version_f':10.10}}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_f")
, "=={'doc':{'my_version_f':10.10}}");
}
public void testFailOnOldVersion() throws Exception {
// fail low version add & low version delete against committed doc
updateJ(jsonAdd(sdoc("id", "aaa", "name", "a1", "my_version_l", "1001")),
params("update.chain","external-version-failhard"));
assertU(commit());
try {
updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "42")),
params("update.chain","external-version-failhard"));
fail("no 409");
} catch (SolrException ex) {
assertEquals(409, ex.code());
}
assertU(commit());
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a1'}}");
try {
deleteAndGetVersion("aaa", params("del_version", "7",
"update.chain","external-version-failhard"));
fail("no 409");
} catch (SolrException ex) {
assertEquals(409, ex.code());
}
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a1'}}");
assertU(commit());
// fail low version delete against uncommitted doc from updateLog
updateJ(jsonAdd(sdoc("id", "aaa", "name", "a2", "my_version_l", "1002")),
params("update.chain","external-version-failhard"));
try {
deleteAndGetVersion("aaa", params("del_version", "8",
"update.chain","external-version-failhard"));
fail("no 409");
} catch (SolrException ex) {
assertEquals(409, ex.code());
}
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:a2"), "/response/numFound==1");
assertJQ(req("qt","/get", "id","aaa", "fl","name")
, "=={'doc':{'name':'a2'}}");
// fail low version add against uncommitted "delete" from updateLog
deleteAndGetVersion("aaa", params("del_version", "1010",
"update.chain","external-version-failhard"));
try {
updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "1005")),
params("update.chain","external-version-failhard"));
fail("no 409");
} catch (SolrException ex) {
assertEquals(409, ex.code());
}
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}");
// fail low version add against committed "delete"
// (delete was already done & committed above)
try {
updateJ(jsonAdd(sdoc("id", "aaa", "name", "XX", "my_version_l", "1009")),
params("update.chain","external-version-failhard"));
fail("no 409");
} catch (SolrException ex) {
assertEquals(409, ex.code());
}
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}}");
assertU(commit());
assertJQ(req("q","+id:aaa"), "/response/numFound==1");
assertJQ(req("q","+id:aaa +name:XX"), "/response/numFound==0");
assertJQ(req("qt","/get", "id","aaa", "fl","my_version_l")
, "=={'doc':{'my_version_l':1010}}");
}
/**
* Proof of concept test demonstrating how to manage and periodically cleanup
* the "logically" deleted documents
*/
public void testManagingDeletes() throws Exception {
// add some docs
for (long ver = 1010; ver < 1020; ver++) {
for (String id : new String[] {"aaa", "bbb", "ccc", "ddd"}) {
assertU(adoc("id", id, "name", "name_"+id, "my_version_l", ""+ver));
}
}
assertU(adoc("id", "aaa", "name", "name_aaa", "my_version_l", "1030"));
assertU(commit());
// sample queries
assertJQ(req("q","*:*",
"fq","live_b:true")
,"/response/numFound==4");
assertJQ(req("q","id:aaa",
"fq","live_b:true",
"fl","id,my_version_l")
,"/response/numFound==1"
,"/response/docs==[{'id':'aaa','my_version_l':1030}]}");
// logically delete
deleteAndGetVersion("aaa",
params("del_version", "1031"));
assertU(commit());
// sample queries
assertJQ(req("q","*:*",
"fq","live_b:true")
,"/response/numFound==3");
assertJQ(req("q","id:aaa",
"fq","live_b:true")
,"/response/numFound==0");
// placeholder doc is still in the index though
assertJQ(req("q","id:aaa",
"fq","live_b:false",
"fq", "timestamp_tdt:[* TO *]",
"fl","id,live_b,my_version_l")
,"/response/numFound==1"
,"/response/docs==[{'id':'aaa','my_version_l':1031,'live_b':false}]}");
// doc can't be re-added with a low version
assertU(adoc("id", "aaa", "name", "XX", "my_version_l", "1025"));
assertU(commit());
assertJQ(req("q","id:aaa",
"fq","live_b:true")
,"/response/numFound==0");
// "dead" placeholder docs can be periodically cleaned up
// ie: assertU(delQ("+live_b:false +timestamp_tdt:[* TO NOW/MINUTE-5MINUTE]"));
// but to prevent the test from being time sensitive we'll just purge them all
assertU(delQ("+live_b:false"));
assertU(commit());
// now doc can be re-added w/any version, no matter how low
assertU(adoc("id", "aaa", "name", "aaa", "my_version_l", "7"));
assertU(commit());
assertJQ(req("q","id:aaa",
"fq","live_b:true",
"fl","id,live_b,my_version_l")
,"/response/numFound==1"
,"/response/docs==[{'id':'aaa','my_version_l':7,'live_b':true}]}");
}
/**
* Constantly hammer the same doc with multiple concurrent threads and diff versions,
* confirm that the highest version wins.
*/
public void testConcurrentAdds() throws Exception {
final int NUM_DOCS = atLeast(50);
final int MAX_CONCURRENT = atLeast(10);
ExecutorService runner = Executors.newFixedThreadPool(MAX_CONCURRENT);
// runner = Executors.newFixedThreadPool(1); // to test single threaded
try {
for (int id = 0; id < NUM_DOCS; id++) {
final int numAdds = _TestUtil.nextInt(random(),3,MAX_CONCURRENT);
final int winner = _TestUtil.nextInt(random(),0,numAdds-1);
final int winnerVersion = atLeast(100);
final boolean winnerIsDeleted = (0 == _TestUtil.nextInt(random(),0,4));
List<Callable<Object>> tasks = new ArrayList<Callable<Object>>(numAdds);
for (int variant = 0; variant < numAdds; variant++) {
final boolean iShouldWin = (variant==winner);
final long version = (iShouldWin ? winnerVersion
: _TestUtil.nextInt(random(),1,winnerVersion-1));
if ((iShouldWin && winnerIsDeleted)
|| (!iShouldWin && 0 == _TestUtil.nextInt(random(),0,4))) {
tasks.add(delayedDelete(""+id, ""+version));
} else {
tasks.add(delayedAdd("id",""+id,"name","name"+id+"_"+variant,
"my_version_l", ""+ version));
}
}
runner.invokeAll(tasks);
final String expectedDoc = "{'id':'"+id+"','my_version_l':"+winnerVersion +
( ! winnerIsDeleted ? ",'name':'name"+id+"_"+winner+"'}" : "}");
assertJQ(req("qt","/get", "id",""+id, "fl","id,name,my_version_l")
, "=={'doc':" + expectedDoc + "}");
assertU(commit());
assertJQ(req("q","id:"+id,
"fl","id,name,my_version_l")
,"/response/numFound==1"
,"/response/docs==["+expectedDoc+"]");
}
} finally {
runner.shutdownNow();
}
}
private Callable<Object> delayedAdd(final String... fields) {
return Executors.callable(new Runnable() {
public void run() {
// log.info("ADDING DOC: " + adoc(fields));
assertU(adoc(fields));
}
});
}
private Callable<Object> delayedDelete(final String id, final String externalVersion) {
return Executors.callable(new Runnable() {
public void run() {
try {
// Why does this throw "Exception" ???
// log.info("DELETING DOC: " + id + " v="+externalVersion);
deleteAndGetVersion(id, params("del_version", externalVersion));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
}