SOLR-12638: Partial/Atomic updates of nested docs.

and [child] now works in RTG.
This commit is contained in:
Moshe 2019-04-10 03:02:59 -04:00 committed by David Smiley
parent 4a93199803
commit 8527ec11af
15 changed files with 1376 additions and 105 deletions

View File

@ -106,6 +106,12 @@ New Features
* SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression (Joel Bernstein)
* SOLR-12638: Partial/Atomic Updates for nested documents. This enables atomic updates for nested documents, without
the need to supply the whole nested hierarchy (which would be overwritten if absent). This is done by fetching the
whole document hierarchy, updating the specific doc in the path that is to be updated, removing the old document
hierarchy and indexing the new one with the atomic update merged into it. Also, [child] Doc Transformer now works
with RealTimeGet. (Moshe Bla, David Smiley)
Bug Fixes
----------------------

View File

@ -19,6 +19,7 @@ package org.apache.solr.handler.component;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -30,6 +31,8 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.DocValuesType;
@ -46,10 +49,12 @@ import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentBase;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.StringUtils;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
@ -62,6 +67,7 @@ import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.ResultContext;
import org.apache.solr.response.SolrQueryResponse;
@ -95,6 +101,7 @@ import static org.apache.solr.common.params.CommonParams.VERSION_FIELD;
public class RealTimeGetComponent extends SearchComponent
{
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Set<String> NESTED_META_FIELDS = Sets.newHashSet(IndexSchema.NEST_PATH_FIELD_NAME, IndexSchema.NEST_PARENT_FIELD_NAME);
public static final String COMPONENT_NAME = "get";
@Override
@ -350,7 +357,7 @@ public class RealTimeGetComponent extends SearchComponent
String idStr = params.get("getInputDocument", null);
if (idStr == null) return;
AtomicLong version = new AtomicLong();
SolrInputDocument doc = getInputDocument(req.getCore(), new BytesRef(idStr), version, false, null, true);
SolrInputDocument doc = getInputDocument(req.getCore(), new BytesRef(idStr), version, null, Resolution.DOC);
log.info("getInputDocument called for id="+idStr+", returning: "+doc);
rb.rsp.add("inputDocument", doc);
rb.rsp.add("version", version.get());
@ -595,37 +602,30 @@ public class RealTimeGetComponent extends SearchComponent
/**
* Obtains the latest document for a given id from the tlog or index (if not found in the tlog).
*
* NOTE: This method uses the effective value for avoidRetrievingStoredFields param as false and
* for nonStoredDVs as null in the call to @see {@link RealTimeGetComponent#getInputDocument(SolrCore, BytesRef, AtomicLong, boolean, Set, boolean)},
* so as to retrieve all stored and non-stored DV fields from all documents. Also, it uses the effective value of
* resolveFullDocument param as true, i.e. it resolves any partial documents (in-place updates), in case the
* document is fetched from the tlog, to a full document.
* NOTE: This method uses the effective value for nonStoredDVs as null in the call to @see {@link RealTimeGetComponent#getInputDocument(SolrCore, BytesRef, AtomicLong, Set, Resolution)},
* so as to retrieve all stored and non-stored DV fields from all documents.
*/
public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException {
return getInputDocument (core, idBytes, null, false, null, true);
public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, Resolution lookupStrategy) throws IOException {
return getInputDocument (core, idBytes, null, null, lookupStrategy);
}
/**
* Obtains the latest document for a given id from the tlog or through the realtime searcher (if not found in the tlog).
* @param versionReturned If a non-null AtomicLong is passed in, it is set to the version of the update returned from the TLog.
* @param avoidRetrievingStoredFields Setting this to true avoids fetching stored fields through the realtime searcher,
* however has no effect on documents obtained from the tlog.
* Non-stored docValues fields are populated anyway, and are not affected by this parameter. Note that if
* the id field is a stored field, it will not be populated if this parameter is true and the document is
* obtained from the index.
* @param onlyTheseNonStoredDVs If not-null, populate only these DV fields in the document fetched through the realtime searcher.
* If this is null, decorate all non-stored DVs (that are not targets of copy fields) from the searcher.
* @param resolveFullDocument In case the document is fetched from the tlog, it could only be a partial document if the last update
* was an in-place update. In that case, should this partial document be resolved to a full document (by following
* back prevPointer/prevVersion)?
* When non-null, stored fields are not fetched.
* @param resolveStrategy The strategy to resolve the the document.
* @see Resolution
*/
public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, AtomicLong versionReturned, boolean avoidRetrievingStoredFields,
Set<String> onlyTheseNonStoredDVs, boolean resolveFullDocument) throws IOException {
public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, AtomicLong versionReturned,
Set<String> onlyTheseNonStoredDVs, Resolution resolveStrategy) throws IOException {
SolrInputDocument sid = null;
RefCounted<SolrIndexSearcher> searcherHolder = null;
try {
SolrIndexSearcher searcher = null;
sid = getInputDocumentFromTlog(core, idBytes, versionReturned, onlyTheseNonStoredDVs, resolveFullDocument);
sid = getInputDocumentFromTlog(core, idBytes, versionReturned, onlyTheseNonStoredDVs, true);
if (sid == DELETED) {
return null;
}
@ -638,22 +638,44 @@ public class RealTimeGetComponent extends SearchComponent
}
// SolrCore.verbose("RealTimeGet using searcher ", searcher);
SchemaField idField = core.getLatestSchema().getUniqueKeyField();
final IndexSchema schema = core.getLatestSchema();
SchemaField idField = schema.getUniqueKeyField();
int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes));
if (docid < 0) return null;
SolrDocumentFetcher docFetcher = searcher.getDocFetcher();
if (avoidRetrievingStoredFields) {
if (onlyTheseNonStoredDVs != null) {
sid = new SolrInputDocument();
} else {
Document luceneDocument = docFetcher.doc(docid);
sid = toSolrInputDocument(luceneDocument, core.getLatestSchema());
sid = toSolrInputDocument(luceneDocument, schema);
}
if (onlyTheseNonStoredDVs != null) {
docFetcher.decorateDocValueFields(sid, docid, onlyTheseNonStoredDVs);
} else {
docFetcher.decorateDocValueFields(sid, docid, docFetcher.getNonStoredDVsWithoutCopyTargets());
final boolean isNestedRequest = resolveStrategy == Resolution.DOC_WITH_CHILDREN || resolveStrategy == Resolution.ROOT_WITH_CHILDREN;
decorateDocValueFields(docFetcher, sid, docid, onlyTheseNonStoredDVs, isNestedRequest || schema.hasExplicitField(IndexSchema.NEST_PATH_FIELD_NAME));
SolrInputField rootField = sid.getField(IndexSchema.ROOT_FIELD_NAME);
if((isNestedRequest) && schema.isUsableForChildDocs() && schema.hasExplicitField(IndexSchema.NEST_PATH_FIELD_NAME) && rootField!=null) {
// doc is part of a nested structure
final boolean resolveRootDoc = resolveStrategy == Resolution.ROOT_WITH_CHILDREN;
String id = resolveRootDoc? (String) rootField.getFirstValue(): (String) sid.getField(idField.getName()).getFirstValue();
ModifiableSolrParams params = new ModifiableSolrParams()
.set("fl", "*, _nest_path_, [child]")
.set("limit", "-1");
SolrQueryRequest nestedReq = new LocalSolrQueryRequest(core, params);
final BytesRef rootIdBytes = new BytesRef(id);
final int rootDocId = searcher.getFirstMatch(new Term(idField.getName(), rootIdBytes));
final DocTransformer childDocTransformer = core.getTransformerFactory("child").create("child", params, nestedReq);
final ResultContext resultContext = new RTGResultContext(new SolrReturnFields(nestedReq), searcher, nestedReq);
childDocTransformer.setContext(resultContext);
final SolrDocument nestedDoc;
if(resolveRootDoc && rootIdBytes.equals(idBytes)) {
nestedDoc = toSolrDoc(sid, schema);
} else {
nestedDoc = toSolrDoc(docFetcher.doc(rootDocId), schema);
decorateDocValueFields(docFetcher, nestedDoc, rootDocId, onlyTheseNonStoredDVs, true);
}
childDocTransformer.transform(nestedDoc, rootDocId);
sid = toSolrInputDocument(nestedDoc, schema);
}
}
} finally {
@ -670,6 +692,17 @@ public class RealTimeGetComponent extends SearchComponent
return sid;
}
private static void decorateDocValueFields(SolrDocumentFetcher docFetcher, SolrDocumentBase doc, int docid, Set<String> onlyTheseNonStoredDVs, boolean resolveNestedFields) throws IOException {
if (onlyTheseNonStoredDVs != null) {
docFetcher.decorateDocValueFields(doc, docid, onlyTheseNonStoredDVs);
} else {
docFetcher.decorateDocValueFields(doc, docid, docFetcher.getNonStoredDVsWithoutCopyTargets());
}
if(resolveNestedFields) {
docFetcher.decorateDocValueFields(doc, docid, NESTED_META_FIELDS);
}
}
private static SolrInputDocument toSolrInputDocument(Document doc, IndexSchema schema) {
SolrInputDocument out = new SolrInputDocument();
for( IndexableField f : doc.getFields() ) {
@ -695,6 +728,7 @@ public class RealTimeGetComponent extends SearchComponent
private static SolrInputDocument toSolrInputDocument(SolrDocument doc, IndexSchema schema) {
SolrInputDocument out = new SolrInputDocument();
for( String fname : doc.getFieldNames() ) {
boolean fieldArrayListCreated = false;
SchemaField sf = schema.getFieldOrNull(fname);
if (sf != null) {
if ((!sf.hasDocValues() && !sf.stored()) || schema.isCopyFieldTarget(sf)) continue;
@ -710,6 +744,14 @@ public class RealTimeGetComponent extends SearchComponent
if (val == null) val = f.binaryValue();
if (val == null) val = f;
}
} else if(val instanceof SolrDocument) {
val = toSolrInputDocument((SolrDocument) val, schema);
if(!fieldArrayListCreated && doc.getFieldValue(fname) instanceof Collection) {
// previous value was array so we must return as an array even if was a single value array
out.setField(fname, Lists.newArrayList(val));
fieldArrayListCreated = true;
continue;
}
}
out.addField(fname, val);
}
@ -813,7 +855,26 @@ public class RealTimeGetComponent extends SearchComponent
}
}
return toSolrDoc(out, schema);
SolrDocument solrDoc = toSolrDoc(out, schema);
// add child docs
for(SolrInputField solrInputField: sdoc) {
if(solrInputField.getFirstValue() instanceof SolrInputDocument) {
// is child doc
Object val = solrInputField.getValue();
Iterator<SolrDocument> childDocs = solrInputField.getValues().stream()
.map(x -> toSolrDoc((SolrInputDocument) x, schema)).iterator();
if(val instanceof Collection) {
// add as collection even if single element collection
solrDoc.setField(solrInputField.getName(), Lists.newArrayList(childDocs));
} else {
// single child doc
solrDoc.setField(solrInputField.getName(), childDocs.next());
}
}
}
return solrDoc;
}
@Override
@ -850,7 +911,7 @@ public class RealTimeGetComponent extends SearchComponent
Map<String, List<String>> sliceToId = new HashMap<>();
for (String id : reqIds.allIds) {
Slice slice = coll.getRouter().getTargetSlice(id, null, null, params, coll);
Slice slice = coll.getRouter().getTargetSlice(params.get(ShardParams._ROUTE_, id), null, null, params, coll);
List<String> idsForShard = sliceToId.get(slice.getName());
if (idsForShard == null) {
@ -1185,6 +1246,31 @@ public class RealTimeGetComponent extends SearchComponent
return new ArrayList<>(versionsToRet);
}
/**
* <p>
* Lookup strategy for {@link #getInputDocument(SolrCore, BytesRef, AtomicLong, Set, Resolution)}.
* </p>
* <ul>
* <li>{@link #DOC}</li>
* <li>{@link #DOC_WITH_CHILDREN}</li>
* <li>{@link #ROOT_WITH_CHILDREN}</li>
* </ul>
*/
public static enum Resolution {
/**
* Resolve this partial document to a full document (by following back prevPointer/prevVersion)?
*/
DOC,
/**
* Check whether the document has child documents. If so, return the document including its children.
*/
DOC_WITH_CHILDREN,
/**
* Check whether the document is part of a nested hierarchy. If so, return the whole hierarchy(look up root doc).
*/
ROOT_WITH_CHILDREN
}
/**
* Simple struct for tracking what ids were requested and what response format is expected
* acording to the request params

View File

@ -1959,6 +1959,21 @@ public class IndexSchema {
rootType.getTypeName().equals(uniqueKeyFieldType.getTypeName()));
}
/**
* Helper method that returns <code>true</code> if the {@link #ROOT_FIELD_NAME} uses the exact
* same 'type' as the {@link #getUniqueKeyField()} and has {@link #NEST_PATH_FIELD_NAME}
* defined as a {@link NestPathField}
* @lucene.internal
*/
public boolean savesChildDocRelations() {
//TODO make this boolean a field so it needn't be looked up each time?
if (!isUsableForChildDocs()) {
return false;
}
FieldType nestPathType = getFieldTypeNoEx(NEST_PATH_FIELD_NAME);
return nestPathType instanceof NestPathField;
}
public PayloadDecoder getPayloadDecoder(String field) {
FieldType ft = getFieldType(field);
if (ft == null)

View File

@ -704,6 +704,11 @@ public class DocTermOrds implements Accountable {
}
}
@Override
public boolean seekExact(BytesRef text) throws IOException {
return seekCeil(text) == SeekStatus.FOUND;
}
@Override
public void seekExact(long targetOrd) throws IOException {
int delta = (int) (targetOrd - ordBase - ord);

View File

@ -28,6 +28,7 @@ import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
@ -63,6 +64,9 @@ public class AddUpdateCommand extends UpdateCommand {
public boolean isLastDocInBatch = false;
/** Is this a nested update, null means not yet calculated. */
public Boolean isNested = null;
// optional id in "internal" indexed form... if it is needed and not supplied,
// it will be obtained from the doc.
private BytesRef indexedId;
@ -100,7 +104,7 @@ public class AddUpdateCommand extends UpdateCommand {
final boolean ignoreNestedDocs = false; // throw an exception if found
SolrInputDocument solrInputDocument = getSolrInputDocument();
if (!isInPlaceUpdate() && getReq().getSchema().isUsableForChildDocs()) {
addRootField(solrInputDocument, getHashableId());
addRootField(solrInputDocument, getRootIdUsingRouteParam());
}
return DocumentBuilder.toDocument(solrInputDocument, req.getSchema(), isInPlaceUpdate(), ignoreNestedDocs);
}
@ -150,6 +154,14 @@ public class AddUpdateCommand extends UpdateCommand {
return "(null)";
}
/**
*
* @return value of _route_ param({@link ShardParams#_ROUTE_}), otherwise doc id.
*/
public String getRootIdUsingRouteParam() {
return req.getParams().get(ShardParams._ROUTE_, getHashableId());
}
/**
* @return String id to hash
*/
@ -197,7 +209,7 @@ public class AddUpdateCommand extends UpdateCommand {
return null; // caller should call getLuceneDocument() instead
}
final String rootId = getHashableId();
final String rootId = getRootIdUsingRouteParam();
final SolrInputField versionSif = solrDoc.get(CommonParams.VERSION_FIELD);
for (SolrInputDocument sdoc : all) {

View File

@ -959,7 +959,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
Iterable<Document> nestedDocs = cmd.getLuceneDocsIfNested();
boolean isNested = nestedDocs != null; // AKA nested child docs
Term idTerm = getIdTerm(cmd.getIndexedId(), isNested);
Term idTerm = getIdTerm(isNested? new BytesRef(cmd.getRootIdUsingRouteParam()): cmd.getIndexedId(), isNested);
Term updateTerm = hasUpdateTerm ? cmd.updateTerm : idTerm;
if (isNested) {
log.debug("updateDocuments({})", cmd);
@ -981,9 +981,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState
}
}
private Term getIdTerm(BytesRef indexedId, boolean isNested) {
private Term getIdTerm(BytesRef termVal, boolean isNested) {
boolean useRootId = isNested || core.getLatestSchema().isUsableForChildDocs();
return new Term(useRootId ? IndexSchema.ROOT_FIELD_NAME : idField.getName(), indexedId);
return new Term(useRootId ? IndexSchema.ROOT_FIELD_NAME : idField.getName(), termVal);
}
/////////////////////////////////////////////////////////////////////

View File

@ -23,12 +23,15 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
@ -38,6 +41,7 @@ import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.util.StrUtils;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
@ -193,6 +197,10 @@ public class AtomicUpdateDocumentMerger {
// not a supported in-place update op
return Collections.emptySet();
}
// fail fast if child doc
if(isChildDoc(((Map<String, Object>) fieldValue).get(op))) {
return Collections.emptySet();
}
}
candidateFields.add(fieldName);
}
@ -240,6 +248,52 @@ public class AtomicUpdateDocumentMerger {
return candidateFields;
}
/**
*
* @param fullDoc the full doc to be compared against
* @param partialDoc the sub document to be tested
* @return whether partialDoc is derived from fullDoc
*/
public static boolean isDerivedFromDoc(SolrInputDocument fullDoc, SolrInputDocument partialDoc) {
for(SolrInputField subSif: partialDoc) {
Collection<Object> fieldValues = fullDoc.getFieldValues(subSif.getName());
if (fieldValues == null) return false;
if (fieldValues.size() < subSif.getValueCount()) return false;
Collection<Object> partialFieldValues = subSif.getValues();
// filter all derived child docs from partial field values since they fail List#containsAll check (uses SolrInputDocument#equals which fails).
// If a child doc exists in partialDoc but not in full doc, it will not be filtered, and therefore List#containsAll will return false
Stream<Object> nonChildDocElements = partialFieldValues.stream().filter(x -> !(isChildDoc(x) &&
(fieldValues.stream().anyMatch(y ->
(isChildDoc(x) &&
isDerivedFromDoc((SolrInputDocument) y, (SolrInputDocument) x)
)
)
)));
if (!nonChildDocElements.allMatch(fieldValues::contains)) return false;
}
return true;
}
/**
*
* @param completeHierarchy SolrInputDocument that represents the nested document hierarchy from its root
* @param fieldPath the path to fetch, seperated by a '/' e.g. /children/grandChildren
* @return the SolrInputField of fieldPath
*/
public static SolrInputField getFieldFromHierarchy(SolrInputDocument completeHierarchy, String fieldPath) {
// substr to remove first '/'
final List<String> docPaths = StrUtils.splitSmart(fieldPath.substring(1), '/');
Pair<String, Integer> subPath;
SolrInputField sifToReplace = null;
SolrInputDocument currDoc = completeHierarchy;
for (String subPathString: docPaths) {
subPath = getPathAndIndexFromNestPath(subPathString);
sifToReplace = currDoc.getField(subPath.getLeft());
currDoc = (SolrInputDocument) ((List)sifToReplace.getValues()).get(subPath.getRight());
}
return sifToReplace;
}
/**
* Given an AddUpdateCommand containing update operations (e.g. set, inc), merge and resolve the operations into
@ -259,9 +313,8 @@ public class AtomicUpdateDocumentMerger {
SolrInputDocument oldDocument = RealTimeGetComponent.getInputDocument
(cmd.getReq().getCore(), idBytes,
null, // don't want the version to be returned
true, // avoid stored fields from index
updatedFields,
true); // resolve the full document
RealTimeGetComponent.Resolution.DOC);
if (oldDocument == RealTimeGetComponent.DELETED || oldDocument == null) {
// This doc was deleted recently. In-place update cannot work, hence a full atomic update should be tried.
@ -314,14 +367,57 @@ public class AtomicUpdateDocumentMerger {
return true;
}
/**
*
* Merges an Atomic Update inside a document hierarchy
* @param sdoc the doc containing update instructions
* @param oldDocWithChildren the doc (children included) before the update
* @param sdocWithChildren the updated doc prior to the update (children included)
* @return root doc (children included) after update
*/
public SolrInputDocument mergeChildDoc(SolrInputDocument sdoc, SolrInputDocument oldDocWithChildren,
SolrInputDocument sdocWithChildren) {
// get path of document to be updated
String updatedDocPath = (String) sdocWithChildren.getFieldValue(IndexSchema.NEST_PATH_FIELD_NAME);
// get the SolrInputField containing the document which the AddUpdateCommand updates
SolrInputField sifToReplace = getFieldFromHierarchy(oldDocWithChildren, updatedDocPath);
// update SolrInputField, either appending or replacing the updated document
updateDocInSif(sifToReplace, sdocWithChildren, sdoc);
return oldDocWithChildren;
}
/**
*
* @param updateSif the SolrInputField to update its values
* @param cmdDocWChildren the doc to insert/set inside updateSif
* @param updateDoc the document that was sent as part of the Add Update Command
* @return updated SolrInputDocument
*/
public SolrInputDocument updateDocInSif(SolrInputField updateSif, SolrInputDocument cmdDocWChildren, SolrInputDocument updateDoc) {
List sifToReplaceValues = (List) updateSif.getValues();
final boolean wasList = updateSif.getRawValue() instanceof Collection;
int index = getDocIndexFromCollection(cmdDocWChildren, sifToReplaceValues);
SolrInputDocument updatedDoc = merge(updateDoc, cmdDocWChildren);
if(index == -1) {
sifToReplaceValues.add(updatedDoc);
} else {
sifToReplaceValues.set(index, updatedDoc);
}
// in the case where value was a List prior to the update and post update there is no more then one value
// it should be kept as a List.
final boolean singleVal = !wasList && sifToReplaceValues.size() <= 1;
updateSif.setValue(singleVal? sifToReplaceValues.get(0): sifToReplaceValues);
return cmdDocWChildren;
}
protected void doSet(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
SchemaField sf = schema.getField(sif.getName());
toDoc.setField(sif.getName(), sf.getType().toNativeType(fieldVal));
String name = sif.getName();
toDoc.setField(name, getNativeFieldValue(name, fieldVal));
}
protected void doAdd(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
SchemaField sf = schema.getField(sif.getName());
toDoc.addField(sif.getName(), sf.getType().toNativeType(fieldVal));
String name = sif.getName();
toDoc.addField(name, getNativeFieldValue(name, fieldVal));
}
protected void doAddDistinct(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
@ -393,21 +489,16 @@ public class AtomicUpdateDocumentMerger {
final String name = sif.getName();
SolrInputField existingField = toDoc.get(name);
if (existingField == null) return;
SchemaField sf = schema.getField(name);
if (sf != null) {
final Collection<Object> original = existingField.getValues();
if (fieldVal instanceof Collection) {
for (Object object : (Collection) fieldVal) {
Object o = sf.getType().toNativeType(object);
original.remove(o);
}
} else {
original.remove(sf.getType().toNativeType(fieldVal));
final Collection original = existingField.getValues();
if (fieldVal instanceof Collection) {
for (Object object : (Collection) fieldVal) {
removeObj(original, object, name);
}
toDoc.setField(name, original);
} else {
removeObj(original, fieldVal, name);
}
toDoc.setField(name, original);
}
protected void doRemoveRegex(SolrInputDocument toDoc, SolrInputField sif, Object valuePatterns) {
@ -442,5 +533,64 @@ public class AtomicUpdateDocumentMerger {
}
return patterns;
}
private Object getNativeFieldValue(String fieldName, Object val) {
if(isChildDoc(val)) {
return val;
}
SchemaField sf = schema.getField(fieldName);
return sf.getType().toNativeType(val);
}
private static boolean isChildDoc(Object obj) {
if(!(obj instanceof Collection)) {
return obj instanceof SolrDocumentBase;
}
Collection objValues = (Collection) obj;
if(objValues.size() == 0) {
return false;
}
return objValues.iterator().next() instanceof SolrDocumentBase;
}
private void removeObj(Collection original, Object toRemove, String fieldName) {
if(isChildDoc(toRemove)) {
removeChildDoc(original, (SolrInputDocument) toRemove);
} else {
original.remove(getNativeFieldValue(fieldName, toRemove));
}
}
private static void removeChildDoc(Collection original, SolrInputDocument docToRemove) {
for(SolrInputDocument doc: (Collection<SolrInputDocument>) original) {
if(isDerivedFromDoc(doc, docToRemove)) {
original.remove(doc);
return;
}
}
}
/**
*
* @param doc document to search for
* @param col collection of solrInputDocument
* @return index of doc in col, returns -1 if not found.
*/
private static int getDocIndexFromCollection(SolrInputDocument doc, List<SolrInputDocument> col) {
for(int i = 0; i < col.size(); ++i) {
if(isDerivedFromDoc(col.get(i), doc)) {
return i;
}
}
return -1;
}
private static Pair<String, Integer> getPathAndIndexFromNestPath(String nestPath) {
List<String> splitPath = StrUtils.splitSmart(nestPath, '#');
if(splitPath.size() == 1) {
return Pair.of(splitPath.get(0), 0);
}
return Pair.of(splitPath.get(0), Integer.parseInt(splitPath.get(1)));
}
}

View File

@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.ShardParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.Hash;
@ -46,6 +47,7 @@ import org.apache.solr.common.util.TimeSource;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
@ -183,7 +185,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// this should always be used - see filterParams
DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist
(this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, CommonParams.VERSION_FIELD,
UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS);
UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS, ShardParams._ROUTE_);
//this.rsp = reqInfo != null ? reqInfo.getRsp() : null;
}
@ -469,13 +471,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// TODO: possibly set checkDeleteByQueries as a flag on the command?
doLocalAdd(cmd);
// if the update updates a doc that is part of a nested structure,
// force open a realTimeSearcher to trigger a ulog cache refresh.
// This refresh makes RTG handler aware of this update.q
if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) {
ulog.openRealtimeSearcher();
}
if (clonedDoc != null) {
cmd.solrDoc = clonedDoc;
}
} finally {
bucket.unlock();
vinfo.unlockForUpdate();
}
return false;
@ -647,22 +654,41 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
// full (non-inplace) atomic update
SolrInputDocument sdoc = cmd.getSolrInputDocument();
BytesRef id = cmd.getIndexedId();
SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), id);
SolrInputDocument oldRootDocWithChildren = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), id, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN);
if (oldDoc == null) {
// create a new doc by default if an old one wasn't found
if (versionOnUpdate <= 0) {
oldDoc = new SolrInputDocument();
} else {
if (oldRootDocWithChildren == null) {
if (versionOnUpdate > 0) {
// could just let the optimistic locking throw the error
throw new SolrException(ErrorCode.CONFLICT, "Document not found for update. id=" + cmd.getPrintableId());
} else if (req.getParams().get(ShardParams._ROUTE_) != null) {
// the specified document could not be found in this shard
// and was explicitly routed using _route_
throw new SolrException(ErrorCode.BAD_REQUEST,
"Could not find document " + idField.getName() + ":" + id +
", perhaps the wrong \"_route_\" param was supplied");
}
} else {
oldDoc.remove(CommonParams.VERSION_FIELD);
oldRootDocWithChildren.remove(CommonParams.VERSION_FIELD);
}
cmd.solrDoc = docMerger.merge(sdoc, oldDoc);
SolrInputDocument mergedDoc;
if(idField == null || oldRootDocWithChildren == null) {
// create a new doc by default if an old one wasn't found
mergedDoc = docMerger.merge(sdoc, new SolrInputDocument());
} else {
if(req.getSchema().savesChildDocRelations() &&
!sdoc.getField(idField.getName()).getFirstValue().toString()
.equals((String) oldRootDocWithChildren.getFieldValue(IndexSchema.ROOT_FIELD_NAME))) {
// this is an update where the updated doc is not the root document
SolrInputDocument sdocWithChildren = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(),
id, RealTimeGetComponent.Resolution.DOC_WITH_CHILDREN);
mergedDoc = docMerger.mergeChildDoc(sdoc, oldRootDocWithChildren, sdocWithChildren);
} else {
mergedDoc = docMerger.merge(sdoc, oldRootDocWithChildren);
}
}
cmd.solrDoc = mergedDoc;
return true;
}
@ -670,7 +696,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
updateCommand = cmd;
if (!cmd.isDeleteById()) {
@ -798,12 +824,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException {
// Find the version
long versionOnUpdate = cmd.getVersion();
if (versionOnUpdate == 0) {
String versionOnUpdateS = req.getParams().get(CommonParams.VERSION_FIELD);
versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
}
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
long versionOnUpdate = findVersionOnUpdate(cmd);
boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0;
boolean leaderLogic = isLeader && !isReplayOrPeersync;
@ -815,31 +836,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
vinfo.blockUpdates();
try {
if (versionsStored) {
if (leaderLogic) {
long version = vinfo.getNewClock();
cmd.setVersion(-version);
// TODO update versions in all buckets
doLocalDelete(cmd);
} else {
cmd.setVersion(-versionOnUpdate);
if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.deleteByQuery(cmd);
return;
}
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
// TLOG replica not leader, don't write the DBQ to IW
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
doLocalDelete(cmd);
}
}
doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync);
// since we don't know which documents were deleted, the easiest thing to do is to invalidate
// all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader
@ -850,6 +847,45 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
}
private long findVersionOnUpdate(UpdateCommand cmd) {
long versionOnUpdate = cmd.getVersion();
if (versionOnUpdate == 0) {
String versionOnUpdateS = req.getParams().get(CommonParams.VERSION_FIELD);
versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS);
}
versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version
return versionOnUpdate;
}
private void doLocalDeleteByQuery(DeleteUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync) throws IOException {
if (versionsStored) {
final boolean leaderLogic = isLeader & !isReplayOrPeersync;
if (leaderLogic) {
long version = vinfo.getNewClock();
cmd.setVersion(-version);
// TODO update versions in all buckets
doLocalDelete(cmd);
} else {
cmd.setVersion(-versionOnUpdate);
if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) {
// we're not in an active state, and this update isn't from a replay, so buffer it.
cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING);
ulog.deleteByQuery(cmd);
return;
}
if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) {
// TLOG replica not leader, don't write the DBQ to IW
cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER);
}
doLocalDelete(cmd);
}
}
}
// internal helper method to setup request by processors who use this class.
// NOTE: not called by this class!
void setupRequest(UpdateCommand cmd) {
@ -990,7 +1026,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
public void processCommit(CommitUpdateCommand cmd) throws IOException {
assert TestInjection.injectFailUpdateRequests();
updateCommand = cmd;
// replica type can only be NRT in standalone mode
@ -1032,6 +1068,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
finished = true;
}
/**
*
* {@link AddUpdateCommand#isNested} is set in {@link org.apache.solr.update.processor.NestedUpdateProcessorFactory},
* which runs on leader and replicas just before run time processor
* @return whether this update changes a value of a nested document
*/
private static boolean shouldRefreshUlogCaches(AddUpdateCommand cmd) {
// should be set since this method should only be called after DistributedUpdateProcessor#doLocalAdd,
// which runs post-processor in the URP chain, having NestedURP set cmd#isNested.
assert !cmd.getReq().getSchema().savesChildDocRelations() || cmd.isNested != null;
// true if update adds children
return Boolean.TRUE.equals(cmd.isNested);
}
/**
* Returns a boolean indicating whether or not the caller should behave as
* if this is the "leader" even when ZooKeeper is not enabled.

View File

@ -236,7 +236,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
if (isLeader && !isSubShardLeader) {
DocCollection coll = zkController.getClusterState().getCollection(collection);
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument());
List<SolrCmdDistributor.Node> subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
// the list<node> will actually have only one element for an add request
if (subShardLeaders != null && !subShardLeaders.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
@ -246,7 +246,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId());
cmdDistrib.distribAdd(cmd, subShardLeaders, params, true);
}
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument());
final List<SolrCmdDistributor.Node> nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument());
if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) {
ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString());
@ -566,7 +566,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor {
zkCheck();
if (cmd instanceof AddUpdateCommand) {
AddUpdateCommand acmd = (AddUpdateCommand)cmd;
nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument());
nodes = setupRequest(acmd.getRootIdUsingRouteParam(), acmd.getSolrInputDocument());
} else if (cmd instanceof DeleteUpdateCommand) {
DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd;
nodes = setupRequest(dcmd.getId(), null);

View File

@ -218,7 +218,7 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor
private DocFoundAndOldUserAndSolrVersions getOldUserVersionsFromStored(BytesRef indexedDocId) throws IOException {
// stored fields only...
SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId);
SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId, RealTimeGetComponent.Resolution.DOC);
if (null == oldDoc) {
return DocFoundAndOldUserAndSolrVersions.NOT_FOUND;
} else {

View File

@ -75,31 +75,35 @@ public class NestedUpdateProcessorFactory extends UpdateRequestProcessorFactory
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
SolrInputDocument doc = cmd.getSolrInputDocument();
processDocChildren(doc, null);
cmd.isNested = processDocChildren(doc, null);
super.processAdd(cmd);
}
private void processDocChildren(SolrInputDocument doc, String fullPath) {
private boolean processDocChildren(SolrInputDocument doc, String fullPath) {
boolean isNested = false;
for(SolrInputField field: doc.values()) {
int childNum = 0;
boolean isSingleVal = !(field.getValue() instanceof Collection);
for(Object val: field) {
if(!(val instanceof SolrInputDocument)) {
if (!(val instanceof SolrInputDocument)) {
// either all collection items are child docs or none are.
break;
}
final String fieldName = field.getName();
if(fieldName.contains(PATH_SEP_CHAR)) {
if (fieldName.contains(PATH_SEP_CHAR)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Field name: '" + fieldName
+ "' contains: '" + PATH_SEP_CHAR + "' , which is reserved for the nested URP");
}
final String sChildNum = isSingleVal ? SINGULAR_VALUE_CHAR : String.valueOf(childNum);
SolrInputDocument cDoc = (SolrInputDocument) val;
if(!cDoc.containsKey(uniqueKeyFieldName)) {
if (!cDoc.containsKey(uniqueKeyFieldName)) {
String parentDocId = doc.getField(uniqueKeyFieldName).getFirstValue().toString();
cDoc.setField(uniqueKeyFieldName, generateChildUniqueId(parentDocId, fieldName, sChildNum));
}
if (!isNested) {
isNested = true;
}
final String lastKeyPath = PATH_SEP_CHAR + fieldName + NUM_SEP_CHAR + sChildNum;
// concat of all paths children.grandChild => /children#1/grandChild#
final String childDocPath = fullPath == null ? lastKeyPath : fullPath + lastKeyPath;
@ -107,6 +111,7 @@ public class NestedUpdateProcessorFactory extends UpdateRequestProcessorFactory
++childNum;
}
}
return isNested;
}
private void processChildDoc(SolrInputDocument sdoc, SolrInputDocument parent, String fullPath) {

View File

@ -36,6 +36,8 @@
<dynamicField name="*_s" type="string" indexed="true" stored="true"/>
<dynamicField name="*_ss" type="string" indexed="true" stored="true" multiValued="true"/>
<!-- required to test updates nested inplaceUpdates -->
<field name="inplace_updatable_int" type="int" indexed="false" stored="false" docValues="true"/>
<fieldType name="string" class="solr.StrField" sortMissingLast="true"/>
@ -48,6 +50,10 @@
<fieldType name="float" class="solr.FloatPointField" docValues="true"/>
<fieldType name="date" class="solr.DatePointField" docValues="true"/>
<!-- Fields required for SignatureUpdateProcessor -->
<field name="signatureField" type="string" indexed="true" stored="false"/>
<dynamicField name="*_sS" type="string" indexed="true" stored="true" multiValued="true"/>
<uniqueKey>id</uniqueKey>
</schema>

View File

@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.io.IOException;
import java.util.List;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.params.SolrParams;
import org.junit.Test;
public class NestedShardedAtomicUpdateTest extends AbstractFullDistribZkTestBase {
public NestedShardedAtomicUpdateTest() {
stress = 0;
sliceCount = 4;
schemaString = "schema-nest.xml";
}
@Override
protected String getCloudSolrConfig() {
return "solrconfig-tlog.xml";
}
@Override
protected String getCloudSchemaFile() {
return "schema-nest.xml";
}
@Test
@ShardsFixed(num = 4)
public void test() throws Exception {
boolean testFinished = false;
try {
sendWrongRouteParam();
doNestedInplaceUpdateTest();
doRootShardRoutingTest();
testFinished = true;
} finally {
if (!testFinished) {
printLayoutOnTearDown = true;
}
}
}
public void doRootShardRoutingTest() throws Exception {
assertEquals(4, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size());
final String[] ids = {"3", "4", "5", "6"};
assertEquals("size of ids to index should be the same as the number of clients", clients.size(), ids.length);
// for now, we know how ranges will be distributed to shards.
// may have to look it up in clusterstate if that assumption changes.
SolrInputDocument doc = sdoc("id", "1", "level_s", "root");
final SolrParams params = params("wt", "json", "_route_", "1");
int which = (params.get("_route_").hashCode() & 0x7fffffff) % clients.size();
SolrClient aClient = clients.get(which);
indexDoc(aClient, params, doc);
doc = sdoc("id", "1", "children", map("add", sdocs(sdoc("id", "2", "level_s", "child"))));
indexDoc(aClient, params, doc);
for(int idIndex = 0; idIndex < ids.length; ++idIndex) {
doc = sdoc("id", "2", "grandChildren", map("add", sdocs(sdoc("id", ids[idIndex], "level_s", "grand_child"))));
indexDocAndRandomlyCommit(getRandomSolrClient(), params, doc);
doc = sdoc("id", "3", "inplace_updatable_int", map("inc", "1"));
indexDocAndRandomlyCommit(getRandomSolrClient(), params, doc);
// assert RTG request respects _route_ param
QueryResponse routeRsp = getRandomSolrClient().query(params("qt","/get", "id","2", "_route_", "1"));
SolrDocument results = (SolrDocument) routeRsp.getResponse().get("doc");
assertNotNull("RTG should find doc because _route_ was set to the root documents' ID", results);
assertEquals("2", results.getFieldValue("id"));
// assert all docs are indexed under the same root
getRandomSolrClient().commit();
assertEquals(0, getRandomSolrClient().query(params("q", "-_root_:1")).getResults().size());
// assert all docs are indexed inside the same block
QueryResponse rsp = getRandomSolrClient().query(params("qt","/get", "id","1", "fl", "*, [child]"));
SolrDocument val = (SolrDocument) rsp.getResponse().get("doc");
assertEquals("1", val.getFieldValue("id"));
List<SolrDocument> children = (List) val.getFieldValues("children");
assertEquals(1, children.size());
SolrDocument childDoc = children.get(0);
assertEquals("2", childDoc.getFieldValue("id"));
List<SolrDocument> grandChildren = (List) childDoc.getFieldValues("grandChildren");
assertEquals(idIndex + 1, grandChildren.size());
SolrDocument grandChild = grandChildren.get(0);
assertEquals(idIndex + 1, grandChild.getFirstValue("inplace_updatable_int"));
assertEquals("3", grandChild.getFieldValue("id"));
}
}
public void doNestedInplaceUpdateTest() throws Exception {
assertEquals(4, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size());
final String[] ids = {"3", "4", "5", "6"};
assertEquals("size of ids to index should be the same as the number of clients", clients.size(), ids.length);
// for now, we know how ranges will be distributed to shards.
// may have to look it up in clusterstate if that assumption changes.
SolrInputDocument doc = sdoc("id", "1", "level_s", "root");
final SolrParams params = params("wt", "json", "_route_", "1");
int which = (params.get("_route_").hashCode() & 0x7fffffff) % clients.size();
SolrClient aClient = clients.get(which);
indexDocAndRandomlyCommit(aClient, params, doc);
doc = sdoc("id", "1", "children", map("add", sdocs(sdoc("id", "2", "level_s", "child"))));
indexDocAndRandomlyCommit(aClient, params, doc);
doc = sdoc("id", "2", "grandChildren", map("add", sdocs(sdoc("id", ids[0], "level_s", "grand_child"))));
indexDocAndRandomlyCommit(aClient, params, doc);
for (int fieldValue = 1; fieldValue < 5; ++fieldValue) {
doc = sdoc("id", "3", "inplace_updatable_int", map("inc", "1"));
indexDocAndRandomlyCommit(getRandomSolrClient(), params, doc);
// assert RTG request respects _route_ param
QueryResponse routeRsp = getRandomSolrClient().query(params("qt","/get", "id","2", "_route_", "1"));
SolrDocument results = (SolrDocument) routeRsp.getResponse().get("doc");
assertNotNull("RTG should find doc because _route_ was set to the root documents' ID", results);
assertEquals("2", results.getFieldValue("id"));
// assert all docs are indexed under the same root
getRandomSolrClient().commit();
assertEquals(0, getRandomSolrClient().query(params("q", "-_root_:1")).getResults().size());
// assert all docs are indexed inside the same block
QueryResponse rsp = getRandomSolrClient().query(params("qt","/get", "id","1", "fl", "*, [child]"));
SolrDocument val = (SolrDocument) rsp.getResponse().get("doc");
assertEquals("1", val.getFieldValue("id"));
List<SolrDocument> children = (List) val.getFieldValues("children");
assertEquals(1, children.size());
SolrDocument childDoc = children.get(0);
assertEquals("2", childDoc.getFieldValue("id"));
List<SolrDocument> grandChildren = (List) childDoc.getFieldValues("grandChildren");
assertEquals(1, grandChildren.size());
SolrDocument grandChild = grandChildren.get(0);
assertEquals(fieldValue, grandChild.getFirstValue("inplace_updatable_int"));
assertEquals("3", grandChild.getFieldValue("id"));
}
}
public void sendWrongRouteParam() throws Exception {
assertEquals(4, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size());
final String rootId = "1";
SolrInputDocument doc = sdoc("id", rootId, "level_s", "root");
final SolrParams wrongRootParams = params("wt", "json", "_route_", "c");
final SolrParams rightParams = params("wt", "json", "_route_", rootId);
int which = (rootId.hashCode() & 0x7fffffff) % clients.size();
SolrClient aClient = clients.get(which);
indexDocAndRandomlyCommit(aClient, params("wt", "json", "_route_", rootId), doc, false);
final SolrInputDocument childDoc = sdoc("id", rootId, "children", map("add", sdocs(sdoc("id", "2", "level_s", "child"))));
indexDocAndRandomlyCommit(aClient, rightParams, childDoc, false);
final SolrInputDocument grandChildDoc = sdoc("id", "2", "grandChildren",
map("add", sdocs(
sdoc("id", "3", "level_s", "grandChild")
)
)
);
SolrException e = expectThrows(SolrException.class,
"wrong \"_route_\" param should throw an exception",
() -> indexDocAndRandomlyCommit(aClient, wrongRootParams, grandChildDoc)
);
assertTrue("message should suggest the wrong \"_route_\" param was supplied",
e.getMessage().contains("perhaps the wrong \"_route_\" param was supplied"));
}
private void indexDocAndRandomlyCommit(SolrClient client, SolrParams params, SolrInputDocument sdoc) throws IOException, SolrServerException {
indexDocAndRandomlyCommit(client, params, sdoc, true);
}
private void indexDocAndRandomlyCommit(SolrClient client, SolrParams params, SolrInputDocument sdoc, boolean compareToControlCollection) throws IOException, SolrServerException {
if (compareToControlCollection) {
indexDoc(client, params, sdoc);
} else {
add(client, params, sdoc);
}
// randomly commit docs
if (random().nextBoolean()) {
client.commit();
}
}
private SolrClient getRandomSolrClient() {
return clients.get(random().nextInt(clients.size()));
}
}

View File

@ -0,0 +1,657 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.lucene.util.BytesRef;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.component.RealTimeGetComponent;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class NestedAtomicUpdateTest extends SolrTestCaseJ4 {
private final static String VERSION = "_version_";
@BeforeClass
public static void beforeTests() throws Exception {
initCore("solrconfig-tlog.xml", "schema-nest.xml"); // use "nest" schema
}
@Before
public void before() {
clearIndex();
assertU(commit());
}
@Test
public void testMergeChildDoc() throws Exception {
SolrInputDocument newChildDoc = sdoc("id", "3", "cat_ss", "child");
SolrInputDocument addedDoc = sdoc("id", "1",
"cat_ss", Collections.singletonMap("add", "bbb"),
"child", Collections.singletonMap("add", sdocs(newChildDoc)));
SolrInputDocument dummyBlock = sdoc("id", "1",
"cat_ss", new ArrayList<>(Arrays.asList("aaa", "ccc")),
"_root_", "1", "child", new ArrayList<>(sdocs(addedDoc)));
dummyBlock.removeField(VERSION);
SolrInputDocument preMergeDoc = new SolrInputDocument(dummyBlock);
AtomicUpdateDocumentMerger docMerger = new AtomicUpdateDocumentMerger(req());
docMerger.merge(addedDoc, dummyBlock);
assertEquals("merged document should have the same id", preMergeDoc.getFieldValue("id"), dummyBlock.getFieldValue("id"));
assertDocContainsSubset(preMergeDoc, dummyBlock);
assertDocContainsSubset(addedDoc, dummyBlock);
assertDocContainsSubset(newChildDoc, (SolrInputDocument) ((List) dummyBlock.getFieldValues("child")).get(1));
assertEquals(dummyBlock.getFieldValue("id"), dummyBlock.getFieldValue("id"));
}
@Test
public void testBlockAtomicInplaceUpdates() throws Exception {
SolrInputDocument doc = sdoc("id", "1", "string_s", "root");
assertU(adoc(doc));
assertU(commit());
assertQ(req("q", "id:1", "fl", "*"),
"//*[@numFound='1']",
"//doc[1]/str[@name='id']=1"
);
List<SolrInputDocument> docs = IntStream.range(10, 20).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s",
"child", "inplace_updatable_int", "0")).collect(Collectors.toList());
doc = sdoc("id", "1", "children", Collections.singletonMap("add", docs));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertU(commit());
assertQ(req("q", "_root_:1", "fl", "*", "rows", "11"),
"//*[@numFound='11']"
);
assertQ(req("q", "string_s:child", "fl", "*"),
"//*[@numFound='10']",
"*[count(//str[@name='string_s'][.='child'])=10]"
);
for(int i = 10; i < 20; ++i) {
doc = sdoc("id", String.valueOf(i), "inplace_updatable_int", Collections.singletonMap("inc", "1"));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertU(commit());
}
for(int i = 10; i < 20; ++i) {
doc = sdoc("id", String.valueOf(i), "inplace_updatable_int", Collections.singletonMap("inc", "1"));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertU(commit());
}
// ensure updates work when block has more than 10 children
for(int i = 10; i < 20; ++i) {
docs = IntStream.range(i * 10, (i * 10) + 5).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s", "grandChild")).collect(Collectors.toList());
doc = sdoc("id", String.valueOf(i), "grandChildren", Collections.singletonMap("add", docs));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertU(commit());
}
for(int i =10; i < 20; ++i) {
doc = sdoc("id", String.valueOf(i), "inplace_updatable_int", Collections.singletonMap("inc", "1"));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertU(commit());
}
assertQ(req("q", "-_root_:*", "fl", "*"),
"//*[@numFound='0']"
);
assertQ(req("q", "string_s:grandChild", "fl", "*", "rows", "50"),
"//*[@numFound='50']"
);
assertQ(req("q", "string_s:child", "fl", "*"),
"//*[@numFound='10']",
"*[count(//str[@name='string_s'][.='child'])=10]");
assertJQ(req("q", "id:1", "fl", "*,[child limit=-1]"),
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/children/[0]/id=='10'",
"/response/docs/[0]/children/[0]/inplace_updatable_int==3",
"/response/docs/[0]/children/[0]/grandChildren/[0]/id=='100'",
"/response/docs/[0]/children/[0]/grandChildren/[4]/id=='104'",
"/response/docs/[0]/children/[9]/id=='19'"
);
}
@Test
public void testBlockAtomicQuantities() throws Exception {
SolrInputDocument doc = sdoc("id", "1", "string_s", "root");
assertU(adoc(doc));
assertU(commit());
assertQ(req("q", "id:1", "fl", "*"),
"//*[@numFound='1']",
"//doc[1]/str[@name='id']=1"
);
List<SolrInputDocument> docs = IntStream.range(10, 20).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s", "child")).collect(Collectors.toList());
doc = sdoc("id", "1", "children", Collections.singletonMap("add", docs));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
assertQ(req("q", "_root_:1", "fl", "*", "rows", "11"),
"//*[@numFound='11']",
"*[count(//str[@name='_root_'][.='1'])=11]"
);
assertQ(req("q", "string_s:child", "fl", "*"),
"//*[@numFound='10']",
"*[count(//str[@name='string_s'][.='child'])=10]"
);
// ensure updates work when block has more than 10 children
for(int i = 10; i < 20; ++i) {
docs = IntStream.range(i * 10, (i * 10) + 5).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s", "grandChild")).collect(Collectors.toList());
doc = sdoc("id", String.valueOf(i), "grandChildren", Collections.singletonMap("add", docs));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
}
assertQ(req("q", "string_s:grandChild", "fl", "*", "rows", "50"),
"//*[@numFound='50']",
"*[count(//str[@name='string_s'][.='grandChild'])=50]");
assertQ(req("q", "string_s:child", "fl", "*"),
"//*[@numFound='10']",
"*[count(//str[@name='string_s'][.='child'])=10]");
}
@Test
public void testBlockAtomicStack() throws Exception {
SolrInputDocument doc = sdoc("id", "1", "child1", sdocs(sdoc("id", "2", "child_s", "child")));
assertU(adoc(doc));
assertU(commit());
assertJQ(req("q","id:1", "fl", "*, [child]"),
"/response/numFound==1",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/child_s=='child'"
);
doc = sdoc("id", "1", "child1", Collections.singletonMap("add", sdocs(sdoc("id", "3", "child_s", "child"))));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
assertJQ(req("q","id:1", "fl", "*, [child]"),
"/response/numFound==1",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/child_s=='child'",
"/response/docs/[0]/child1/[1]/id=='3'",
"/response/docs/[0]/child1/[0]/child_s=='child'"
);
doc = sdoc("id", "2",
"grandChild", Collections.singletonMap("add", sdocs(sdoc("id", "4", "child_s", "grandChild"), sdoc("id", "5", "child_s", "grandChild"))));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
assertJQ(req("q","id:1", "fl", "*, [child]", "sort", "id asc"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/child_s=='child'",
"/response/docs/[0]/child1/[1]/id=='3'",
"/response/docs/[0]/child1/[1]/child_s=='child'",
"/response/docs/[0]/child1/[0]/grandChild/[0]/id=='4'",
"/response/docs/[0]/child1/[0]/grandChild/[0]/child_s=='grandChild'"
);
doc = sdoc("id", "1",
"child2", Collections.singletonMap("add", sdocs(sdoc("id", "8", "child_s", "child"))));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
assertJQ(req("q","id:1", "fl", "*, [child]", "sort", "id asc"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/child_s=='child'",
"/response/docs/[0]/child1/[1]/id=='3'",
"/response/docs/[0]/child1/[1]/child_s=='child'",
"/response/docs/[0]/child1/[0]/grandChild/[0]/id=='4'",
"/response/docs/[0]/child1/[0]/grandChild/[0]/child_s=='grandChild'",
"/response/docs/[0]/child2/[0]/id=='8'",
"/response/docs/[0]/child2/[0]/child_s=='child'"
);
doc = sdoc("id", "1",
"new_s", Collections.singletonMap("add", "new string"));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
// ensure the whole block has been committed correctly to the index.
assertJQ(req("q","id:1", "fl", "*, [child]"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/child_s=='child'",
"/response/docs/[0]/child1/[1]/id=='3'",
"/response/docs/[0]/child1/[1]/child_s=='child'",
"/response/docs/[0]/child1/[0]/grandChild/[0]/id=='4'",
"/response/docs/[0]/child1/[0]/grandChild/[0]/child_s=='grandChild'",
"/response/docs/[0]/child1/[0]/grandChild/[1]/id=='5'",
"/response/docs/[0]/child1/[0]/grandChild/[1]/child_s=='grandChild'",
"/response/docs/[0]/new_s=='new string'",
"/response/docs/[0]/child2/[0]/id=='8'",
"/response/docs/[0]/child2/[0]/child_s=='child'"
);
}
@Test
public void testBlockAtomicAdd() throws Exception {
SolrInputDocument doc = sdoc("id", "1",
"cat_ss", new String[] {"aaa", "ccc"},
"child1", sdoc("id", "2", "cat_ss", "child")
);
assertU(adoc(doc));
BytesRef rootDocId = new BytesRef("1");
SolrCore core = h.getCore();
SolrInputDocument block = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN);
// assert block doc has child docs
assertTrue(block.containsKey("child1"));
assertJQ(req("q","id:1")
,"/response/numFound==0"
);
// commit the changes
assertU(commit());
SolrInputDocument committedBlock = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN);
BytesRef childDocId = new BytesRef("2");
// ensure the whole block is returned when resolveBlock is true and id of a child doc is provided
assertEquals(committedBlock.toString(), RealTimeGetComponent.getInputDocument(core, childDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN).toString());
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
doc = sdoc("id", "1",
"cat_ss", Collections.singletonMap("add", "bbb"),
"child2", Collections.singletonMap("add", sdoc("id", "3", "cat_ss", "child")));
addAndGetVersion(doc, params("wt", "json"));
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}," +
"child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
assertU(commit());
// a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the transaction log
// this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]")
, "=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}," +
"child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
doc = sdoc("id", "2",
"child3", Collections.singletonMap("add", sdoc("id", "4", "cat_ss", "grandChild")));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, child3, [child]")
,"=={'doc':{'id':'1'" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}}]," +
"child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}" +
" }}"
);
assertJQ(req("qt","/get", "id","2", "fl","id, cat_ss, child, child3, [child]")
,"=={'doc':{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}}" +
" }}"
);
assertU(commit());
//add greatGrandChild
doc = sdoc("id", "4",
"child4", Collections.singletonMap("add", sdoc("id", "5", "cat_ss", "greatGrandChild")));
addAndGetVersion(doc, params("wt", "json"));
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, child3, child4, [child]")
,"=={'doc':{'id':'1'" +
", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]," +
" child4:{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}}}], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}" +
" }}"
);
assertJQ(req("qt","/get", "id","4", "fl","id, cat_ss, child4, [child]")
,"=={'doc':{\"id\":\"4\",\"cat_ss\":[\"grandChild\"], child4:{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}}" +
" }}"
);
assertU(commit());
//add another greatGrandChild
doc = sdoc("id", "4",
"child4", Collections.singletonMap("add", sdoc("id", "6", "cat_ss", "greatGrandChild")));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
assertJQ(req("qt","/get", "id","4", "fl","id, cat_ss, child4, [child]")
,"=={'doc':{\"id\":\"4\",\"cat_ss\":[\"grandChild\"], child4:[{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}," +
"{\"id\":\"6\", \"cat_ss\":[\"greatGrandChild\"]}]}" +
" }}"
);
//add another child field name
doc = sdoc("id", "1",
"child5", Collections.singletonMap("add", sdocs(sdoc("id", "7", "cat_ss", "child"),
sdoc("id", "8", "cat_ss", "child")
))
);
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
doc = sdoc("id", "1",
"new_s", Collections.singletonMap("add", "new string"));
addAndGetVersion(doc, params("wt", "json"));
assertU(commit());
// ensure the whole block has been committed correctly to the index.
assertJQ(req("q","id:1", "fl", "*, [child]"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/cat_ss/[0]==\"aaa\"",
"/response/docs/[0]/cat_ss/[1]==\"ccc\"",
"/response/docs/[0]/cat_ss/[2]==\"bbb\"",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/cat_ss/[0]=='child'",
"/response/docs/[0]/child1/[0]/child3/id=='4'",
"/response/docs/[0]/child1/[0]/child3/cat_ss/[0]=='grandChild'",
"/response/docs/[0]/child1/[0]/child3/child4/[0]/id=='5'",
"/response/docs/[0]/child1/[0]/child3/child4/[0]/cat_ss/[0]=='greatGrandChild'",
"/response/docs/[0]/child1/[0]/child3/child4/[1]/id=='6'",
"/response/docs/[0]/child1/[0]/child3/child4/[1]/cat_ss/[0]=='greatGrandChild'",
"/response/docs/[0]/child2/id=='3'",
"/response/docs/[0]/child2/cat_ss/[0]=='child'",
"/response/docs/[0]/child5/[0]/id=='7'",
"/response/docs/[0]/child5/[0]/cat_ss/[0]=='child'",
"/response/docs/[0]/child5/[1]/id=='8'",
"/response/docs/[0]/child5/[1]/cat_ss/[0]=='child'",
"/response/docs/[0]/new_s=='new string'"
);
}
@Test
public void testBlockAtomicSet() throws Exception {
SolrInputDocument doc = sdoc("id", "1",
"cat_ss", new String[] {"aaa", "ccc"},
"child1", Collections.singleton(sdoc("id", "2", "cat_ss", "child"))
);
assertU(adoc(doc));
BytesRef rootDocId = new BytesRef("1");
SolrCore core = h.getCore();
SolrInputDocument block = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN);
// assert block doc has child docs
assertTrue(block.containsKey("child1"));
assertJQ(req("q","id:1")
,"/response/numFound==0"
);
// commit the changes
assertU(commit());
SolrInputDocument committedBlock = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN);
BytesRef childDocId = new BytesRef("2");
// ensure the whole block is returned when resolveBlock is true and id of a child doc is provided
assertEquals(committedBlock.toString(), RealTimeGetComponent.getInputDocument(core, childDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN).toString());
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
assertU(commit());
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
doc = sdoc("id", "1",
"cat_ss", Collections.singletonMap("set", Arrays.asList("aaa", "bbb")),
"child1", Collections.singletonMap("set", sdoc("id", "3", "cat_ss", "child")));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"bbb\"], child1:{\"id\":\"3\",\"cat_ss\":[\"child\"]}" +
" }}"
);
assertU(commit());
// a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the transaction log
// this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={'doc':{'id':'1'" +
", cat_ss:[\"aaa\",\"bbb\"], child1:{\"id\":\"3\",\"cat_ss\":[\"child\"]}" +
" }}"
);
doc = sdoc("id", "3",
"child2", Collections.singletonMap("set", sdoc("id", "4", "cat_ss", "child")));
addAndGetVersion(doc, params("wt", "json", "_route_", "1"));
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]")
,"=={'doc':{'id':'1'" +
", cat_ss:[\"aaa\",\"bbb\"], child1:{\"id\":\"3\",\"cat_ss\":[\"child\"], child2:{\"id\":\"4\",\"cat_ss\":[\"child\"]}}" +
" }}"
);
assertJQ(req("qt","/get", "id","3", "fl","id, cat_ss, child, child2, [child]")
,"=={'doc':{\"id\":\"3\",\"cat_ss\":[\"child\"], child2:{\"id\":\"4\",\"cat_ss\":[\"child\"]}}" +
" }}"
);
assertU(commit());
// ensure the whole block has been committed correctly to the index.
assertJQ(req("q","id:1", "fl", "*, [child]"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/cat_ss/[0]==\"aaa\"",
"/response/docs/[0]/cat_ss/[1]==\"bbb\"",
"/response/docs/[0]/child1/id=='3'",
"/response/docs/[0]/child1/cat_ss/[0]=='child'",
"/response/docs/[0]/child1/child2/id=='4'",
"/response/docs/[0]/child1/child2/cat_ss/[0]=='child'"
);
}
@Test
public void testAtomicUpdateDeleteNoRootField() throws Exception {
SolrInputDocument doc = sdoc("id", "1",
"cat_ss", new String[]{"aaa", "bbb"});
assertU(adoc(doc));
assertJQ(req("q", "id:1")
, "/response/numFound==0"
);
// commit the changes
assertU(commit());
assertJQ(req("q", "id:1"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/cat_ss/[0]==\"aaa\"",
"/response/docs/[0]/cat_ss/[1]==\"bbb\""
);
doc = sdoc("id", "1",
"child1", Collections.singletonMap("add", sdoc("id", "2", "cat_ss", "child")));
addAndGetVersion(doc, params("wt", "json"));
// commit the changes
assertU(commit());
// assert that doc with id:1 was removed even though it did not have _root_:1 since it was not indexed with child documents.
assertJQ(req("q", "id:1", "fl", "*, [child]"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/cat_ss/[0]==\"aaa\"",
"/response/docs/[0]/cat_ss/[1]==\"bbb\"",
"/response/docs/[0]/child1/id==\"2\"",
"/response/docs/[0]/child1/cat_ss/[0]==\"child\""
);
}
@Test
public void testBlockAtomicRemove() throws Exception {
SolrInputDocument doc = sdoc("id", "1",
"cat_ss", new String[] {"aaa", "ccc"},
"child1", sdocs(sdoc("id", "2", "cat_ss", "child"), sdoc("id", "3", "cat_ss", "child"))
);
assertU(adoc(doc));
BytesRef rootDocId = new BytesRef("1");
SolrCore core = h.getCore();
SolrInputDocument block = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN);
// assert block doc has child docs
assertTrue(block.containsKey("child1"));
assertJQ(req("q","id:1")
,"/response/numFound==0"
);
// commit the changes
assertU(commit());
SolrInputDocument committedBlock = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN);
BytesRef childDocId = new BytesRef("2");
// ensure the whole block is returned when resolveBlock is true and id of a child doc is provided
assertEquals(committedBlock.toString(), RealTimeGetComponent.getInputDocument(core, childDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN).toString());
assertJQ(req("q","id:1")
,"/response/numFound==1"
);
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
assertU(commit());
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
doc = sdoc("id", "1",
"child1", Collections.singletonMap("remove", sdoc("id", "3", "cat_ss", "child")));
addAndGetVersion(doc, params("wt", "json"));
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={\"doc\":{'id':\"1\"" +
", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
assertU(commit());
// a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the transaction log
// this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup
assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]")
,"=={'doc':{'id':'1'" +
", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" +
" }}"
);
// ensure the whole block has been committed correctly to the index.
assertJQ(req("q","id:1", "fl", "*, [child]"),
"/response/numFound==1",
"/response/docs/[0]/id=='1'",
"/response/docs/[0]/cat_ss/[0]==\"aaa\"",
"/response/docs/[0]/cat_ss/[1]==\"ccc\"",
"/response/docs/[0]/child1/[0]/id=='2'",
"/response/docs/[0]/child1/[0]/cat_ss/[0]=='child'"
);
}
private static void assertDocContainsSubset(SolrInputDocument subsetDoc, SolrInputDocument fullDoc) {
for(SolrInputField field: subsetDoc) {
String fieldName = field.getName();
assertTrue("doc should contain field: " + fieldName, fullDoc.containsKey(fieldName));
Object fullValue = fullDoc.getField(fieldName).getValue();
if(fullValue instanceof Collection) {
((Collection) fullValue).containsAll(field.getValues());
} else {
assertEquals("docs should have the same value for field: " + fieldName, field.getValue(), fullValue);
}
}
}
}

View File

@ -63,6 +63,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import com.carrotsearch.randomizedtesting.RandomizedTest;
@ -1374,9 +1375,35 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
for (Object val : sfield) {
if (firstVal) firstVal=false;
else out.append(',');
if(val instanceof SolrInputDocument) {
json((SolrInputDocument) val, out);
}
out.append(JSONUtil.toJSON(val));
}
out.append(']');
} else if(sfield.getValue() instanceof SolrInputDocument) {
json((SolrInputDocument) sfield.getValue(), out);
} else if (sfield.getValue() instanceof Map) {
Map<String, Object> valMap = (Map<String, Object>) sfield.getValue();
Set<String> childDocsKeys = valMap.entrySet().stream().filter(record -> isChildDoc(record.getValue()))
.map(Entry::getKey).collect(Collectors.toSet());
if(childDocsKeys.size() > 0) {
Map<String, Object> newMap = new HashMap<>();
for(Entry<String, Object> entry: valMap.entrySet()) {
String keyName = entry.getKey();
Object val = entry.getValue();
if(childDocsKeys.contains(keyName)) {
if(val instanceof Collection) {
val = ((Collection) val).stream().map(e -> toSolrDoc((SolrInputDocument) e)).collect(Collectors.toList());
} else {
val = toSolrDoc((SolrInputDocument) val);
}
}
newMap.put(keyName, val);
}
valMap = newMap;
}
out.append(JSONUtil.toJSON(valMap));
} else {
out.append(JSONUtil.toJSON(sfield.getValue()));
}
@ -2920,6 +2947,25 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase {
private_RANDOMIZED_NUMERIC_FIELDTYPES.clear();
}
private static SolrDocument toSolrDoc(SolrInputDocument sid) {
SolrDocument doc = new SolrDocument();
for(SolrInputField field: sid) {
doc.setField(field.getName(), field.getValue());
}
return doc;
}
private static boolean isChildDoc(Object o) {
if(o instanceof Collection) {
Collection col = (Collection) o;
if(col.size() == 0) {
return false;
}
return col.iterator().next() instanceof SolrInputDocument;
}
return o instanceof SolrInputDocument;
}
private static final Map<Class,String> private_RANDOMIZED_NUMERIC_FIELDTYPES = new HashMap<>();
/**