diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 3bf3b6bd7b9..80815e62dbd 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -106,6 +106,12 @@ New Features * SOLR-13374: Add fetchSize parameter to the jdbc Streaming Expression (Joel Bernstein) +* SOLR-12638: Partial/Atomic Updates for nested documents. This enables atomic updates for nested documents, without + the need to supply the whole nested hierarchy (which would be overwritten if absent). This is done by fetching the + whole document hierarchy, updating the specific doc in the path that is to be updated, removing the old document + hierarchy and indexing the new one with the atomic update merged into it. Also, [child] Doc Transformer now works + with RealTimeGet. (Moshe Bla, David Smiley) + Bug Fixes ---------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index f49a555fe75..9908913d82e 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -19,6 +19,7 @@ package org.apache.solr.handler.component; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -30,6 +31,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.index.DocValuesType; @@ -46,10 +49,12 @@ import org.apache.solr.client.solrj.SolrResponse; import org.apache.solr.cloud.CloudDescriptor; import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrDocument; +import 
org.apache.solr.common.SolrDocumentBase; import org.apache.solr.common.SolrDocumentList; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; import org.apache.solr.common.StringUtils; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.DocCollection; @@ -62,6 +67,7 @@ import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.SolrCore; +import org.apache.solr.request.LocalSolrQueryRequest; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.ResultContext; import org.apache.solr.response.SolrQueryResponse; @@ -95,6 +101,7 @@ import static org.apache.solr.common.params.CommonParams.VERSION_FIELD; public class RealTimeGetComponent extends SearchComponent { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final Set NESTED_META_FIELDS = Sets.newHashSet(IndexSchema.NEST_PATH_FIELD_NAME, IndexSchema.NEST_PARENT_FIELD_NAME); public static final String COMPONENT_NAME = "get"; @Override @@ -350,7 +357,7 @@ public class RealTimeGetComponent extends SearchComponent String idStr = params.get("getInputDocument", null); if (idStr == null) return; AtomicLong version = new AtomicLong(); - SolrInputDocument doc = getInputDocument(req.getCore(), new BytesRef(idStr), version, false, null, true); + SolrInputDocument doc = getInputDocument(req.getCore(), new BytesRef(idStr), version, null, Resolution.DOC); log.info("getInputDocument called for id="+idStr+", returning: "+doc); rb.rsp.add("inputDocument", doc); rb.rsp.add("version", version.get()); @@ -595,37 +602,30 @@ public class RealTimeGetComponent extends SearchComponent /** * Obtains the latest document for a given id from the tlog or index (if not found in 
the tlog). * - * NOTE: This method uses the effective value for avoidRetrievingStoredFields param as false and - * for nonStoredDVs as null in the call to @see {@link RealTimeGetComponent#getInputDocument(SolrCore, BytesRef, AtomicLong, boolean, Set, boolean)}, - * so as to retrieve all stored and non-stored DV fields from all documents. Also, it uses the effective value of - * resolveFullDocument param as true, i.e. it resolves any partial documents (in-place updates), in case the - * document is fetched from the tlog, to a full document. + * NOTE: This method uses the effective value for nonStoredDVs as null in the call to @see {@link RealTimeGetComponent#getInputDocument(SolrCore, BytesRef, AtomicLong, Set, Resolution)}, + * so as to retrieve all stored and non-stored DV fields from all documents. */ - public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes) throws IOException { - return getInputDocument (core, idBytes, null, false, null, true); + + public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, Resolution lookupStrategy) throws IOException { + return getInputDocument (core, idBytes, null, null, lookupStrategy); } /** * Obtains the latest document for a given id from the tlog or through the realtime searcher (if not found in the tlog). * @param versionReturned If a non-null AtomicLong is passed in, it is set to the version of the update returned from the TLog. - * @param avoidRetrievingStoredFields Setting this to true avoids fetching stored fields through the realtime searcher, - * however has no effect on documents obtained from the tlog. - * Non-stored docValues fields are populated anyway, and are not affected by this parameter. Note that if - * the id field is a stored field, it will not be populated if this parameter is true and the document is - * obtained from the index. 
* @param onlyTheseNonStoredDVs If not-null, populate only these DV fields in the document fetched through the realtime searcher. * If this is null, decorate all non-stored DVs (that are not targets of copy fields) from the searcher. - * @param resolveFullDocument In case the document is fetched from the tlog, it could only be a partial document if the last update - * was an in-place update. In that case, should this partial document be resolved to a full document (by following - * back prevPointer/prevVersion)? + * When non-null, stored fields are not fetched. + * @param resolveStrategy The strategy to resolve the document. + * @see Resolution */ - public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, AtomicLong versionReturned, boolean avoidRetrievingStoredFields, - Set onlyTheseNonStoredDVs, boolean resolveFullDocument) throws IOException { + public static SolrInputDocument getInputDocument(SolrCore core, BytesRef idBytes, AtomicLong versionReturned, + Set onlyTheseNonStoredDVs, Resolution resolveStrategy) throws IOException { SolrInputDocument sid = null; RefCounted searcherHolder = null; try { SolrIndexSearcher searcher = null; - sid = getInputDocumentFromTlog(core, idBytes, versionReturned, onlyTheseNonStoredDVs, resolveFullDocument); + sid = getInputDocumentFromTlog(core, idBytes, versionReturned, onlyTheseNonStoredDVs, true); if (sid == DELETED) { return null; } @@ -638,22 +638,44 @@ } // SolrCore.verbose("RealTimeGet using searcher ", searcher); - SchemaField idField = core.getLatestSchema().getUniqueKeyField(); + final IndexSchema schema = core.getLatestSchema(); + SchemaField idField = schema.getUniqueKeyField(); int docid = searcher.getFirstMatch(new Term(idField.getName(), idBytes)); if (docid < 0) return null; SolrDocumentFetcher docFetcher = searcher.getDocFetcher(); - if (avoidRetrievingStoredFields) { + if (onlyTheseNonStoredDVs != null) { sid = new 
SolrInputDocument(); } else { Document luceneDocument = docFetcher.doc(docid); - sid = toSolrInputDocument(luceneDocument, core.getLatestSchema()); + sid = toSolrInputDocument(luceneDocument, schema); } - if (onlyTheseNonStoredDVs != null) { - docFetcher.decorateDocValueFields(sid, docid, onlyTheseNonStoredDVs); - } else { - docFetcher.decorateDocValueFields(sid, docid, docFetcher.getNonStoredDVsWithoutCopyTargets()); + final boolean isNestedRequest = resolveStrategy == Resolution.DOC_WITH_CHILDREN || resolveStrategy == Resolution.ROOT_WITH_CHILDREN; + decorateDocValueFields(docFetcher, sid, docid, onlyTheseNonStoredDVs, isNestedRequest || schema.hasExplicitField(IndexSchema.NEST_PATH_FIELD_NAME)); + SolrInputField rootField = sid.getField(IndexSchema.ROOT_FIELD_NAME); + if((isNestedRequest) && schema.isUsableForChildDocs() && schema.hasExplicitField(IndexSchema.NEST_PATH_FIELD_NAME) && rootField!=null) { + // doc is part of a nested structure + final boolean resolveRootDoc = resolveStrategy == Resolution.ROOT_WITH_CHILDREN; + String id = resolveRootDoc? 
(String) rootField.getFirstValue(): (String) sid.getField(idField.getName()).getFirstValue(); + ModifiableSolrParams params = new ModifiableSolrParams() + .set("fl", "*, _nest_path_, [child]") + .set("limit", "-1"); + SolrQueryRequest nestedReq = new LocalSolrQueryRequest(core, params); + final BytesRef rootIdBytes = new BytesRef(id); + final int rootDocId = searcher.getFirstMatch(new Term(idField.getName(), rootIdBytes)); + final DocTransformer childDocTransformer = core.getTransformerFactory("child").create("child", params, nestedReq); + final ResultContext resultContext = new RTGResultContext(new SolrReturnFields(nestedReq), searcher, nestedReq); + childDocTransformer.setContext(resultContext); + final SolrDocument nestedDoc; + if(resolveRootDoc && rootIdBytes.equals(idBytes)) { + nestedDoc = toSolrDoc(sid, schema); + } else { + nestedDoc = toSolrDoc(docFetcher.doc(rootDocId), schema); + decorateDocValueFields(docFetcher, nestedDoc, rootDocId, onlyTheseNonStoredDVs, true); + } + childDocTransformer.transform(nestedDoc, rootDocId); + sid = toSolrInputDocument(nestedDoc, schema); } } } finally { @@ -670,6 +692,17 @@ public class RealTimeGetComponent extends SearchComponent return sid; } + private static void decorateDocValueFields(SolrDocumentFetcher docFetcher, SolrDocumentBase doc, int docid, Set onlyTheseNonStoredDVs, boolean resolveNestedFields) throws IOException { + if (onlyTheseNonStoredDVs != null) { + docFetcher.decorateDocValueFields(doc, docid, onlyTheseNonStoredDVs); + } else { + docFetcher.decorateDocValueFields(doc, docid, docFetcher.getNonStoredDVsWithoutCopyTargets()); + } + if(resolveNestedFields) { + docFetcher.decorateDocValueFields(doc, docid, NESTED_META_FIELDS); + } + } + private static SolrInputDocument toSolrInputDocument(Document doc, IndexSchema schema) { SolrInputDocument out = new SolrInputDocument(); for( IndexableField f : doc.getFields() ) { @@ -695,6 +728,7 @@ public class RealTimeGetComponent extends SearchComponent private static 
SolrInputDocument toSolrInputDocument(SolrDocument doc, IndexSchema schema) { SolrInputDocument out = new SolrInputDocument(); for( String fname : doc.getFieldNames() ) { + boolean fieldArrayListCreated = false; SchemaField sf = schema.getFieldOrNull(fname); if (sf != null) { if ((!sf.hasDocValues() && !sf.stored()) || schema.isCopyFieldTarget(sf)) continue; @@ -710,6 +744,14 @@ public class RealTimeGetComponent extends SearchComponent if (val == null) val = f.binaryValue(); if (val == null) val = f; } + } else if(val instanceof SolrDocument) { + val = toSolrInputDocument((SolrDocument) val, schema); + if(!fieldArrayListCreated && doc.getFieldValue(fname) instanceof Collection) { + // previous value was array so we must return as an array even if was a single value array + out.setField(fname, Lists.newArrayList(val)); + fieldArrayListCreated = true; + continue; + } } out.addField(fname, val); } @@ -813,7 +855,26 @@ public class RealTimeGetComponent extends SearchComponent } } - return toSolrDoc(out, schema); + SolrDocument solrDoc = toSolrDoc(out, schema); + + // add child docs + for(SolrInputField solrInputField: sdoc) { + if(solrInputField.getFirstValue() instanceof SolrInputDocument) { + // is child doc + Object val = solrInputField.getValue(); + Iterator childDocs = solrInputField.getValues().stream() + .map(x -> toSolrDoc((SolrInputDocument) x, schema)).iterator(); + if(val instanceof Collection) { + // add as collection even if single element collection + solrDoc.setField(solrInputField.getName(), Lists.newArrayList(childDocs)); + } else { + // single child doc + solrDoc.setField(solrInputField.getName(), childDocs.next()); + } + } + } + + return solrDoc; } @Override @@ -850,7 +911,7 @@ public class RealTimeGetComponent extends SearchComponent Map> sliceToId = new HashMap<>(); for (String id : reqIds.allIds) { - Slice slice = coll.getRouter().getTargetSlice(id, null, null, params, coll); + Slice slice = 
coll.getRouter().getTargetSlice(params.get(ShardParams._ROUTE_, id), null, null, params, coll); List idsForShard = sliceToId.get(slice.getName()); if (idsForShard == null) { @@ -1185,6 +1246,31 @@ public class RealTimeGetComponent extends SearchComponent return new ArrayList<>(versionsToRet); } + /** + *

+ * Lookup strategy for {@link #getInputDocument(SolrCore, BytesRef, AtomicLong, Set, Resolution)}.
+ * <p>
+ * <ul>
+ * <li>{@link #DOC}</li>
+ * <li>{@link #DOC_WITH_CHILDREN}</li>
+ * <li>{@link #ROOT_WITH_CHILDREN}</li>
+ * </ul>
+ */ + public static enum Resolution { + /** + * Resolve this partial document to a full document (by following back prevPointer/prevVersion)? + */ + DOC, + /** + * Check whether the document has child documents. If so, return the document including its children. + */ + DOC_WITH_CHILDREN, + /** + * Check whether the document is part of a nested hierarchy. If so, return the whole hierarchy(look up root doc). + */ + ROOT_WITH_CHILDREN + } + /** * Simple struct for tracking what ids were requested and what response format is expected * acording to the request params diff --git a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java index 8042e80f71c..da2b71acfb2 100644 --- a/solr/core/src/java/org/apache/solr/schema/IndexSchema.java +++ b/solr/core/src/java/org/apache/solr/schema/IndexSchema.java @@ -1959,6 +1959,21 @@ public class IndexSchema { rootType.getTypeName().equals(uniqueKeyFieldType.getTypeName())); } + /** + * Helper method that returns true if the {@link #ROOT_FIELD_NAME} uses the exact + * same 'type' as the {@link #getUniqueKeyField()} and has {@link #NEST_PATH_FIELD_NAME} + * defined as a {@link NestPathField} + * @lucene.internal + */ + public boolean savesChildDocRelations() { + //TODO make this boolean a field so it needn't be looked up each time? 
+ if (!isUsableForChildDocs()) { + return false; + } + FieldType nestPathType = getFieldTypeNoEx(NEST_PATH_FIELD_NAME); + return nestPathType instanceof NestPathField; + } + public PayloadDecoder getPayloadDecoder(String field) { FieldType ft = getFieldType(field); if (ft == null) diff --git a/solr/core/src/java/org/apache/solr/uninverting/DocTermOrds.java b/solr/core/src/java/org/apache/solr/uninverting/DocTermOrds.java index 8b5cd5cc577..a137567b1b1 100644 --- a/solr/core/src/java/org/apache/solr/uninverting/DocTermOrds.java +++ b/solr/core/src/java/org/apache/solr/uninverting/DocTermOrds.java @@ -704,6 +704,11 @@ public class DocTermOrds implements Accountable { } } + @Override + public boolean seekExact(BytesRef text) throws IOException { + return seekCeil(text) == SeekStatus.FOUND; + } + @Override public void seekExact(long targetOrd) throws IOException { int delta = (int) (targetOrd - ordBase - ord); diff --git a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java index b644f73f548..a56427ded5c 100644 --- a/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java +++ b/solr/core/src/java/org/apache/solr/update/AddUpdateCommand.java @@ -28,6 +28,7 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.params.ShardParams; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.SchemaField; @@ -63,6 +64,9 @@ public class AddUpdateCommand extends UpdateCommand { public boolean isLastDocInBatch = false; + /** Is this a nested update, null means not yet calculated. */ + public Boolean isNested = null; + // optional id in "internal" indexed form... if it is needed and not supplied, // it will be obtained from the doc. 
private BytesRef indexedId; @@ -100,7 +104,7 @@ public class AddUpdateCommand extends UpdateCommand { final boolean ignoreNestedDocs = false; // throw an exception if found SolrInputDocument solrInputDocument = getSolrInputDocument(); if (!isInPlaceUpdate() && getReq().getSchema().isUsableForChildDocs()) { - addRootField(solrInputDocument, getHashableId()); + addRootField(solrInputDocument, getRootIdUsingRouteParam()); } return DocumentBuilder.toDocument(solrInputDocument, req.getSchema(), isInPlaceUpdate(), ignoreNestedDocs); } @@ -150,6 +154,14 @@ public class AddUpdateCommand extends UpdateCommand { return "(null)"; } + /** + * + * @return value of _route_ param({@link ShardParams#_ROUTE_}), otherwise doc id. + */ + public String getRootIdUsingRouteParam() { + return req.getParams().get(ShardParams._ROUTE_, getHashableId()); + } + /** * @return String id to hash */ @@ -197,7 +209,7 @@ public class AddUpdateCommand extends UpdateCommand { return null; // caller should call getLuceneDocument() instead } - final String rootId = getHashableId(); + final String rootId = getRootIdUsingRouteParam(); final SolrInputField versionSif = solrDoc.get(CommonParams.VERSION_FIELD); for (SolrInputDocument sdoc : all) { diff --git a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java index 4dc5b3bd5a0..6c5c227149a 100644 --- a/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java +++ b/solr/core/src/java/org/apache/solr/update/DirectUpdateHandler2.java @@ -959,7 +959,7 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState Iterable nestedDocs = cmd.getLuceneDocsIfNested(); boolean isNested = nestedDocs != null; // AKA nested child docs - Term idTerm = getIdTerm(cmd.getIndexedId(), isNested); + Term idTerm = getIdTerm(isNested? new BytesRef(cmd.getRootIdUsingRouteParam()): cmd.getIndexedId(), isNested); Term updateTerm = hasUpdateTerm ? 
cmd.updateTerm : idTerm; if (isNested) { log.debug("updateDocuments({})", cmd); @@ -981,9 +981,9 @@ public class DirectUpdateHandler2 extends UpdateHandler implements SolrCoreState } } - private Term getIdTerm(BytesRef indexedId, boolean isNested) { + private Term getIdTerm(BytesRef termVal, boolean isNested) { boolean useRootId = isNested || core.getLatestSchema().isUsableForChildDocs(); - return new Term(useRootId ? IndexSchema.ROOT_FIELD_NAME : idField.getName(), indexedId); + return new Term(useRootId ? IndexSchema.ROOT_FIELD_NAME : idField.getName(), termVal); } ///////////////////////////////////////////////////////////////////// diff --git a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java index 3b142b37d82..08b249c7ac5 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java +++ b/solr/core/src/java/org/apache/solr/update/processor/AtomicUpdateDocumentMerger.java @@ -23,12 +23,15 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.Pair; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; @@ -38,6 +41,7 @@ import org.apache.solr.common.SolrException.ErrorCode; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; import org.apache.solr.common.params.CommonParams; +import org.apache.solr.common.util.StrUtils; import org.apache.solr.core.SolrCore; import org.apache.solr.handler.component.RealTimeGetComponent; import org.apache.solr.request.SolrQueryRequest; @@ -193,6 +197,10 @@ public class 
AtomicUpdateDocumentMerger { // not a supported in-place update op return Collections.emptySet(); } + // fail fast if child doc + if(isChildDoc(((Map) fieldValue).get(op))) { + return Collections.emptySet(); + } } candidateFields.add(fieldName); } @@ -240,6 +248,52 @@ public class AtomicUpdateDocumentMerger { return candidateFields; } + + /** + * + * @param fullDoc the full doc to be compared against + * @param partialDoc the sub document to be tested + * @return whether partialDoc is derived from fullDoc + */ + public static boolean isDerivedFromDoc(SolrInputDocument fullDoc, SolrInputDocument partialDoc) { + for(SolrInputField subSif: partialDoc) { + Collection fieldValues = fullDoc.getFieldValues(subSif.getName()); + if (fieldValues == null) return false; + if (fieldValues.size() < subSif.getValueCount()) return false; + Collection partialFieldValues = subSif.getValues(); + // filter all derived child docs from partial field values since they fail List#containsAll check (uses SolrInputDocument#equals which fails). + // If a child doc exists in partialDoc but not in full doc, it will not be filtered, and therefore List#containsAll will return false + Stream nonChildDocElements = partialFieldValues.stream().filter(x -> !(isChildDoc(x) && + (fieldValues.stream().anyMatch(y -> + (isChildDoc(x) && + isDerivedFromDoc((SolrInputDocument) y, (SolrInputDocument) x) + ) + ) + ))); + if (!nonChildDocElements.allMatch(fieldValues::contains)) return false; + } + return true; + } + + /** + * + * @param completeHierarchy SolrInputDocument that represents the nested document hierarchy from its root + * @param fieldPath the path to fetch, seperated by a '/' e.g. 
/children/grandChildren + * @return the SolrInputField of fieldPath + */ + public static SolrInputField getFieldFromHierarchy(SolrInputDocument completeHierarchy, String fieldPath) { + // substr to remove first '/' + final List docPaths = StrUtils.splitSmart(fieldPath.substring(1), '/'); + Pair subPath; + SolrInputField sifToReplace = null; + SolrInputDocument currDoc = completeHierarchy; + for (String subPathString: docPaths) { + subPath = getPathAndIndexFromNestPath(subPathString); + sifToReplace = currDoc.getField(subPath.getLeft()); + currDoc = (SolrInputDocument) ((List)sifToReplace.getValues()).get(subPath.getRight()); + } + return sifToReplace; + } /** * Given an AddUpdateCommand containing update operations (e.g. set, inc), merge and resolve the operations into @@ -259,9 +313,8 @@ public class AtomicUpdateDocumentMerger { SolrInputDocument oldDocument = RealTimeGetComponent.getInputDocument (cmd.getReq().getCore(), idBytes, null, // don't want the version to be returned - true, // avoid stored fields from index updatedFields, - true); // resolve the full document + RealTimeGetComponent.Resolution.DOC); if (oldDocument == RealTimeGetComponent.DELETED || oldDocument == null) { // This doc was deleted recently. In-place update cannot work, hence a full atomic update should be tried. 
@@ -314,14 +367,57 @@ public class AtomicUpdateDocumentMerger { return true; } + /** + * + * Merges an Atomic Update inside a document hierarchy + * @param sdoc the doc containing update instructions + * @param oldDocWithChildren the doc (children included) before the update + * @param sdocWithChildren the updated doc prior to the update (children included) + * @return root doc (children included) after update + */ + public SolrInputDocument mergeChildDoc(SolrInputDocument sdoc, SolrInputDocument oldDocWithChildren, + SolrInputDocument sdocWithChildren) { + // get path of document to be updated + String updatedDocPath = (String) sdocWithChildren.getFieldValue(IndexSchema.NEST_PATH_FIELD_NAME); + // get the SolrInputField containing the document which the AddUpdateCommand updates + SolrInputField sifToReplace = getFieldFromHierarchy(oldDocWithChildren, updatedDocPath); + // update SolrInputField, either appending or replacing the updated document + updateDocInSif(sifToReplace, sdocWithChildren, sdoc); + return oldDocWithChildren; + } + + /** + * + * @param updateSif the SolrInputField to update its values + * @param cmdDocWChildren the doc to insert/set inside updateSif + * @param updateDoc the document that was sent as part of the Add Update Command + * @return updated SolrInputDocument + */ + public SolrInputDocument updateDocInSif(SolrInputField updateSif, SolrInputDocument cmdDocWChildren, SolrInputDocument updateDoc) { + List sifToReplaceValues = (List) updateSif.getValues(); + final boolean wasList = updateSif.getRawValue() instanceof Collection; + int index = getDocIndexFromCollection(cmdDocWChildren, sifToReplaceValues); + SolrInputDocument updatedDoc = merge(updateDoc, cmdDocWChildren); + if(index == -1) { + sifToReplaceValues.add(updatedDoc); + } else { + sifToReplaceValues.set(index, updatedDoc); + } + // in the case where value was a List prior to the update and post update there is no more then one value + // it should be kept as a List. 
+ final boolean singleVal = !wasList && sifToReplaceValues.size() <= 1; + updateSif.setValue(singleVal? sifToReplaceValues.get(0): sifToReplaceValues); + return cmdDocWChildren; + } + protected void doSet(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) { - SchemaField sf = schema.getField(sif.getName()); - toDoc.setField(sif.getName(), sf.getType().toNativeType(fieldVal)); + String name = sif.getName(); + toDoc.setField(name, getNativeFieldValue(name, fieldVal)); } protected void doAdd(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) { - SchemaField sf = schema.getField(sif.getName()); - toDoc.addField(sif.getName(), sf.getType().toNativeType(fieldVal)); + String name = sif.getName(); + toDoc.addField(name, getNativeFieldValue(name, fieldVal)); } protected void doAddDistinct(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) { @@ -393,21 +489,16 @@ public class AtomicUpdateDocumentMerger { final String name = sif.getName(); SolrInputField existingField = toDoc.get(name); if (existingField == null) return; - SchemaField sf = schema.getField(name); - - if (sf != null) { - final Collection original = existingField.getValues(); - if (fieldVal instanceof Collection) { - for (Object object : (Collection) fieldVal) { - Object o = sf.getType().toNativeType(object); - original.remove(o); - } - } else { - original.remove(sf.getType().toNativeType(fieldVal)); + final Collection original = existingField.getValues(); + if (fieldVal instanceof Collection) { + for (Object object : (Collection) fieldVal) { + removeObj(original, object, name); } - - toDoc.setField(name, original); + } else { + removeObj(original, fieldVal, name); } + + toDoc.setField(name, original); } protected void doRemoveRegex(SolrInputDocument toDoc, SolrInputField sif, Object valuePatterns) { @@ -442,5 +533,64 @@ public class AtomicUpdateDocumentMerger { } return patterns; } + + private Object getNativeFieldValue(String fieldName, Object val) { + if(isChildDoc(val)) { + 
return val; + } + SchemaField sf = schema.getField(fieldName); + return sf.getType().toNativeType(val); + } + + private static boolean isChildDoc(Object obj) { + if(!(obj instanceof Collection)) { + return obj instanceof SolrDocumentBase; + } + Collection objValues = (Collection) obj; + if(objValues.size() == 0) { + return false; + } + return objValues.iterator().next() instanceof SolrDocumentBase; + } + + private void removeObj(Collection original, Object toRemove, String fieldName) { + if(isChildDoc(toRemove)) { + removeChildDoc(original, (SolrInputDocument) toRemove); + } else { + original.remove(getNativeFieldValue(fieldName, toRemove)); + } + } + + private static void removeChildDoc(Collection original, SolrInputDocument docToRemove) { + for(SolrInputDocument doc: (Collection) original) { + if(isDerivedFromDoc(doc, docToRemove)) { + original.remove(doc); + return; + } + } + } + + /** + * + * @param doc document to search for + * @param col collection of solrInputDocument + * @return index of doc in col, returns -1 if not found. 
+ */ + private static int getDocIndexFromCollection(SolrInputDocument doc, List col) { + for(int i = 0; i < col.size(); ++i) { + if(isDerivedFromDoc(col.get(i), doc)) { + return i; + } + } + return -1; + } + + private static Pair getPathAndIndexFromNestPath(String nestPath) { + List splitPath = StrUtils.splitSmart(nestPath, '#'); + if(splitPath.size() == 1) { + return Pair.of(splitPath.get(0), 0); + } + return Pair.of(splitPath.get(0), Integer.parseInt(splitPath.get(1))); + } } diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java index 50660cba684..bf1255a3514 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java @@ -38,6 +38,7 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.Hash; @@ -46,6 +47,7 @@ import org.apache.solr.common.util.TimeSource; import org.apache.solr.handler.component.RealTimeGetComponent; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.schema.IndexSchema; import org.apache.solr.schema.SchemaField; import org.apache.solr.update.AddUpdateCommand; import org.apache.solr.update.CommitUpdateCommand; @@ -183,7 +185,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // this should always be used - see filterParams DistributedUpdateProcessorFactory.addParamToDistributedRequestWhitelist (this.req, UpdateParams.UPDATE_CHAIN, TEST_DISTRIB_SKIP_SERVERS, 
CommonParams.VERSION_FIELD, - UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS); + UpdateParams.EXPUNGE_DELETES, UpdateParams.OPTIMIZE, UpdateParams.MAX_OPTIMIZE_SEGMENTS, ShardParams._ROUTE_); //this.rsp = reqInfo != null ? reqInfo.getRsp() : null; } @@ -469,13 +471,18 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // TODO: possibly set checkDeleteByQueries as a flag on the command? doLocalAdd(cmd); + // if the update updates a doc that is part of a nested structure, + // force open a realTimeSearcher to trigger a ulog cache refresh. + // This refresh makes RTG handler aware of this update.q + if(req.getSchema().isUsableForChildDocs() && shouldRefreshUlogCaches(cmd)) { + ulog.openRealtimeSearcher(); + } + if (clonedDoc != null) { cmd.solrDoc = clonedDoc; } } finally { - bucket.unlock(); - vinfo.unlockForUpdate(); } return false; @@ -647,22 +654,41 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { // full (non-inplace) atomic update SolrInputDocument sdoc = cmd.getSolrInputDocument(); BytesRef id = cmd.getIndexedId(); - SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), id); + SolrInputDocument oldRootDocWithChildren = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), id, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN); - if (oldDoc == null) { - // create a new doc by default if an old one wasn't found - if (versionOnUpdate <= 0) { - oldDoc = new SolrInputDocument(); - } else { + if (oldRootDocWithChildren == null) { + if (versionOnUpdate > 0) { // could just let the optimistic locking throw the error throw new SolrException(ErrorCode.CONFLICT, "Document not found for update. 
id=" + cmd.getPrintableId()); + } else if (req.getParams().get(ShardParams._ROUTE_) != null) { + // the specified document could not be found in this shard + // and was explicitly routed using _route_ + throw new SolrException(ErrorCode.BAD_REQUEST, + "Could not find document " + idField.getName() + ":" + id + + ", perhaps the wrong \"_route_\" param was supplied"); } } else { - oldDoc.remove(CommonParams.VERSION_FIELD); + oldRootDocWithChildren.remove(CommonParams.VERSION_FIELD); } - cmd.solrDoc = docMerger.merge(sdoc, oldDoc); + SolrInputDocument mergedDoc; + if(idField == null || oldRootDocWithChildren == null) { + // create a new doc by default if an old one wasn't found + mergedDoc = docMerger.merge(sdoc, new SolrInputDocument()); + } else { + if(req.getSchema().savesChildDocRelations() && + !sdoc.getField(idField.getName()).getFirstValue().toString() + .equals((String) oldRootDocWithChildren.getFieldValue(IndexSchema.ROOT_FIELD_NAME))) { + // this is an update where the updated doc is not the root document + SolrInputDocument sdocWithChildren = RealTimeGetComponent.getInputDocument(cmd.getReq().getCore(), + id, RealTimeGetComponent.Resolution.DOC_WITH_CHILDREN); + mergedDoc = docMerger.mergeChildDoc(sdoc, oldRootDocWithChildren, sdocWithChildren); + } else { + mergedDoc = docMerger.merge(sdoc, oldRootDocWithChildren); + } + } + cmd.solrDoc = mergedDoc; return true; } @@ -670,7 +696,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { public void processDelete(DeleteUpdateCommand cmd) throws IOException { assert TestInjection.injectFailUpdateRequests(); - + updateCommand = cmd; if (!cmd.isDeleteById()) { @@ -798,12 +824,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { protected void versionDeleteByQuery(DeleteUpdateCommand cmd) throws IOException { // Find the version - long versionOnUpdate = cmd.getVersion(); - if (versionOnUpdate == 0) { - String versionOnUpdateS = 
req.getParams().get(CommonParams.VERSION_FIELD); - versionOnUpdate = versionOnUpdateS == null ? 0 : Long.parseLong(versionOnUpdateS); - } - versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version + long versionOnUpdate = findVersionOnUpdate(cmd); boolean isReplayOrPeersync = (cmd.getFlags() & (UpdateCommand.REPLAY | UpdateCommand.PEER_SYNC)) != 0; boolean leaderLogic = isLeader && !isReplayOrPeersync; @@ -815,31 +836,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { vinfo.blockUpdates(); try { - if (versionsStored) { - if (leaderLogic) { - long version = vinfo.getNewClock(); - cmd.setVersion(-version); - // TODO update versions in all buckets - - doLocalDelete(cmd); - - } else { - cmd.setVersion(-versionOnUpdate); - - if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) { - // we're not in an active state, and this update isn't from a replay, so buffer it. - cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); - ulog.deleteByQuery(cmd); - return; - } - - if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { - // TLOG replica not leader, don't write the DBQ to IW - cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); - } - doLocalDelete(cmd); - } - } + doLocalDeleteByQuery(cmd, versionOnUpdate, isReplayOrPeersync); // since we don't know which documents were deleted, the easiest thing to do is to invalidate // all real-time caches (i.e. UpdateLog) which involves also getting a new version of the IndexReader @@ -850,6 +847,45 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { } } + private long findVersionOnUpdate(UpdateCommand cmd) { + long versionOnUpdate = cmd.getVersion(); + if (versionOnUpdate == 0) { + String versionOnUpdateS = req.getParams().get(CommonParams.VERSION_FIELD); + versionOnUpdate = versionOnUpdateS == null ? 
0 : Long.parseLong(versionOnUpdateS); + } + versionOnUpdate = Math.abs(versionOnUpdate); // normalize to positive version + return versionOnUpdate; + } + + private void doLocalDeleteByQuery(DeleteUpdateCommand cmd, long versionOnUpdate, boolean isReplayOrPeersync) throws IOException { + if (versionsStored) { + final boolean leaderLogic = isLeader & !isReplayOrPeersync; + if (leaderLogic) { + long version = vinfo.getNewClock(); + cmd.setVersion(-version); + // TODO update versions in all buckets + + doLocalDelete(cmd); + + } else { + cmd.setVersion(-versionOnUpdate); + + if (ulog.getState() != UpdateLog.State.ACTIVE && isReplayOrPeersync == false) { + // we're not in an active state, and this update isn't from a replay, so buffer it. + cmd.setFlags(cmd.getFlags() | UpdateCommand.BUFFERING); + ulog.deleteByQuery(cmd); + return; + } + + if (!isSubShardLeader && replicaType == Replica.Type.TLOG && (cmd.getFlags() & UpdateCommand.REPLAY) == 0) { + // TLOG replica not leader, don't write the DBQ to IW + cmd.setFlags(cmd.getFlags() | UpdateCommand.IGNORE_INDEXWRITER); + } + doLocalDelete(cmd); + } + } + } + // internal helper method to setup request by processors who use this class. // NOTE: not called by this class! 
void setupRequest(UpdateCommand cmd) { @@ -990,7 +1026,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { public void processCommit(CommitUpdateCommand cmd) throws IOException { assert TestInjection.injectFailUpdateRequests(); - + updateCommand = cmd; // replica type can only be NRT in standalone mode @@ -1032,6 +1068,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor { finished = true; } + /** + * + * {@link AddUpdateCommand#isNested} is set in {@link org.apache.solr.update.processor.NestedUpdateProcessorFactory}, + * which runs on leader and replicas just before run time processor + * @return whether this update changes a value of a nested document + */ + private static boolean shouldRefreshUlogCaches(AddUpdateCommand cmd) { + // should be set since this method should only be called after DistributedUpdateProcessor#doLocalAdd, + // which runs post-processor in the URP chain, having NestedURP set cmd#isNested. + assert !cmd.getReq().getSchema().savesChildDocRelations() || cmd.isNested != null; + // true if update adds children + return Boolean.TRUE.equals(cmd.isNested); + } + /** * Returns a boolean indicating whether or not the caller should behave as * if this is the "leader" even when ZooKeeper is not enabled. 
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java index abe4754e0d9..22e6956f15d 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedZkUpdateProcessor.java @@ -236,7 +236,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { if (isLeader && !isSubShardLeader) { DocCollection coll = zkController.getClusterState().getCollection(collection); - List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getHashableId(), cmd.getSolrInputDocument()); + List subShardLeaders = getSubShardLeaders(coll, cloudDesc.getShardId(), cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument()); // the list will actually have only one element for an add request if (subShardLeaders != null && !subShardLeaders.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); @@ -246,7 +246,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { params.set(DISTRIB_FROM_PARENT, cloudDesc.getShardId()); cmdDistrib.distribAdd(cmd, subShardLeaders, params, true); } - final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getHashableId(), cmd.getSolrInputDocument()); + final List nodesByRoutingRules = getNodesByRoutingRules(zkController.getClusterState(), coll, cmd.getRootIdUsingRouteParam(), cmd.getSolrInputDocument()); if (nodesByRoutingRules != null && !nodesByRoutingRules.isEmpty()) { ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams())); params.set(DISTRIB_UPDATE_PARAM, DistribPhase.FROMLEADER.toString()); @@ -566,7 +566,7 @@ public class DistributedZkUpdateProcessor extends DistributedUpdateProcessor { zkCheck(); if (cmd instanceof AddUpdateCommand) { 
AddUpdateCommand acmd = (AddUpdateCommand)cmd; - nodes = setupRequest(acmd.getHashableId(), acmd.getSolrInputDocument()); + nodes = setupRequest(acmd.getRootIdUsingRouteParam(), acmd.getSolrInputDocument()); } else if (cmd instanceof DeleteUpdateCommand) { DeleteUpdateCommand dcmd = (DeleteUpdateCommand)cmd; nodes = setupRequest(dcmd.getId(), null); diff --git a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java index 125724b9e35..f822e10d707 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java +++ b/solr/core/src/java/org/apache/solr/update/processor/DocBasedVersionConstraintsProcessor.java @@ -218,7 +218,7 @@ public class DocBasedVersionConstraintsProcessor extends UpdateRequestProcessor private DocFoundAndOldUserAndSolrVersions getOldUserVersionsFromStored(BytesRef indexedDocId) throws IOException { // stored fields only... 
- SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId); + SolrInputDocument oldDoc = RealTimeGetComponent.getInputDocument(core, indexedDocId, RealTimeGetComponent.Resolution.DOC); if (null == oldDoc) { return DocFoundAndOldUserAndSolrVersions.NOT_FOUND; } else { diff --git a/solr/core/src/java/org/apache/solr/update/processor/NestedUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/NestedUpdateProcessorFactory.java index af109f77a0b..a6bb5d28fac 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/NestedUpdateProcessorFactory.java +++ b/solr/core/src/java/org/apache/solr/update/processor/NestedUpdateProcessorFactory.java @@ -75,31 +75,35 @@ public class NestedUpdateProcessorFactory extends UpdateRequestProcessorFactory @Override public void processAdd(AddUpdateCommand cmd) throws IOException { SolrInputDocument doc = cmd.getSolrInputDocument(); - processDocChildren(doc, null); + cmd.isNested = processDocChildren(doc, null); super.processAdd(cmd); } - private void processDocChildren(SolrInputDocument doc, String fullPath) { + private boolean processDocChildren(SolrInputDocument doc, String fullPath) { + boolean isNested = false; for(SolrInputField field: doc.values()) { int childNum = 0; boolean isSingleVal = !(field.getValue() instanceof Collection); for(Object val: field) { - if(!(val instanceof SolrInputDocument)) { + if (!(val instanceof SolrInputDocument)) { // either all collection items are child docs or none are. break; } final String fieldName = field.getName(); - if(fieldName.contains(PATH_SEP_CHAR)) { + if (fieldName.contains(PATH_SEP_CHAR)) { throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Field name: '" + fieldName + "' contains: '" + PATH_SEP_CHAR + "' , which is reserved for the nested URP"); } final String sChildNum = isSingleVal ? 
SINGULAR_VALUE_CHAR : String.valueOf(childNum); SolrInputDocument cDoc = (SolrInputDocument) val; - if(!cDoc.containsKey(uniqueKeyFieldName)) { + if (!cDoc.containsKey(uniqueKeyFieldName)) { String parentDocId = doc.getField(uniqueKeyFieldName).getFirstValue().toString(); cDoc.setField(uniqueKeyFieldName, generateChildUniqueId(parentDocId, fieldName, sChildNum)); } + if (!isNested) { + isNested = true; + } final String lastKeyPath = PATH_SEP_CHAR + fieldName + NUM_SEP_CHAR + sChildNum; // concat of all paths children.grandChild => /children#1/grandChild# final String childDocPath = fullPath == null ? lastKeyPath : fullPath + lastKeyPath; @@ -107,6 +111,7 @@ public class NestedUpdateProcessorFactory extends UpdateRequestProcessorFactory ++childNum; } } + return isNested; } private void processChildDoc(SolrInputDocument sdoc, SolrInputDocument parent, String fullPath) { diff --git a/solr/core/src/test-files/solr/collection1/conf/schema-nest.xml b/solr/core/src/test-files/solr/collection1/conf/schema-nest.xml index d20d734c1f5..93300579586 100644 --- a/solr/core/src/test-files/solr/collection1/conf/schema-nest.xml +++ b/solr/core/src/test-files/solr/collection1/conf/schema-nest.xml @@ -36,6 +36,8 @@ + + @@ -48,6 +50,10 @@ + + + + id diff --git a/solr/core/src/test/org/apache/solr/cloud/NestedShardedAtomicUpdateTest.java b/solr/core/src/test/org/apache/solr/cloud/NestedShardedAtomicUpdateTest.java new file mode 100644 index 00000000000..b640fe80cd7 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/NestedShardedAtomicUpdateTest.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.util.List; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrException; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.params.SolrParams; +import org.junit.Test; + +public class NestedShardedAtomicUpdateTest extends AbstractFullDistribZkTestBase { + + public NestedShardedAtomicUpdateTest() { + stress = 0; + sliceCount = 4; + schemaString = "schema-nest.xml"; + } + + @Override + protected String getCloudSolrConfig() { + return "solrconfig-tlog.xml"; + } + + @Override + protected String getCloudSchemaFile() { + return "schema-nest.xml"; + } + + @Test + @ShardsFixed(num = 4) + public void test() throws Exception { + boolean testFinished = false; + try { + sendWrongRouteParam(); + doNestedInplaceUpdateTest(); + doRootShardRoutingTest(); + testFinished = true; + } finally { + if (!testFinished) { + printLayoutOnTearDown = true; + } + } + } + + public void doRootShardRoutingTest() throws Exception { + assertEquals(4, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size()); + final String[] ids = {"3", "4", "5", "6"}; + + assertEquals("size of ids to index should be the same as the number of clients", clients.size(), ids.length); + // for now, we know how ranges will be distributed to shards. 
+ // may have to look it up in clusterstate if that assumption changes. + + SolrInputDocument doc = sdoc("id", "1", "level_s", "root"); + + final SolrParams params = params("wt", "json", "_route_", "1"); + + int which = (params.get("_route_").hashCode() & 0x7fffffff) % clients.size(); + SolrClient aClient = clients.get(which); + + indexDoc(aClient, params, doc); + + doc = sdoc("id", "1", "children", map("add", sdocs(sdoc("id", "2", "level_s", "child")))); + + indexDoc(aClient, params, doc); + + for(int idIndex = 0; idIndex < ids.length; ++idIndex) { + + doc = sdoc("id", "2", "grandChildren", map("add", sdocs(sdoc("id", ids[idIndex], "level_s", "grand_child")))); + + indexDocAndRandomlyCommit(getRandomSolrClient(), params, doc); + + doc = sdoc("id", "3", "inplace_updatable_int", map("inc", "1")); + + indexDocAndRandomlyCommit(getRandomSolrClient(), params, doc); + + // assert RTG request respects _route_ param + QueryResponse routeRsp = getRandomSolrClient().query(params("qt","/get", "id","2", "_route_", "1")); + SolrDocument results = (SolrDocument) routeRsp.getResponse().get("doc"); + assertNotNull("RTG should find doc because _route_ was set to the root documents' ID", results); + assertEquals("2", results.getFieldValue("id")); + + // assert all docs are indexed under the same root + getRandomSolrClient().commit(); + assertEquals(0, getRandomSolrClient().query(params("q", "-_root_:1")).getResults().size()); + + // assert all docs are indexed inside the same block + QueryResponse rsp = getRandomSolrClient().query(params("qt","/get", "id","1", "fl", "*, [child]")); + SolrDocument val = (SolrDocument) rsp.getResponse().get("doc"); + assertEquals("1", val.getFieldValue("id")); + List children = (List) val.getFieldValues("children"); + assertEquals(1, children.size()); + SolrDocument childDoc = children.get(0); + assertEquals("2", childDoc.getFieldValue("id")); + List grandChildren = (List) childDoc.getFieldValues("grandChildren"); + assertEquals(idIndex + 1, 
grandChildren.size()); + SolrDocument grandChild = grandChildren.get(0); + assertEquals(idIndex + 1, grandChild.getFirstValue("inplace_updatable_int")); + assertEquals("3", grandChild.getFieldValue("id")); + } + } + + public void doNestedInplaceUpdateTest() throws Exception { + assertEquals(4, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size()); + final String[] ids = {"3", "4", "5", "6"}; + + assertEquals("size of ids to index should be the same as the number of clients", clients.size(), ids.length); + // for now, we know how ranges will be distributed to shards. + // may have to look it up in clusterstate if that assumption changes. + + SolrInputDocument doc = sdoc("id", "1", "level_s", "root"); + + final SolrParams params = params("wt", "json", "_route_", "1"); + + int which = (params.get("_route_").hashCode() & 0x7fffffff) % clients.size(); + SolrClient aClient = clients.get(which); + + indexDocAndRandomlyCommit(aClient, params, doc); + + doc = sdoc("id", "1", "children", map("add", sdocs(sdoc("id", "2", "level_s", "child")))); + + indexDocAndRandomlyCommit(aClient, params, doc); + + doc = sdoc("id", "2", "grandChildren", map("add", sdocs(sdoc("id", ids[0], "level_s", "grand_child")))); + + indexDocAndRandomlyCommit(aClient, params, doc); + + for (int fieldValue = 1; fieldValue < 5; ++fieldValue) { + doc = sdoc("id", "3", "inplace_updatable_int", map("inc", "1")); + + indexDocAndRandomlyCommit(getRandomSolrClient(), params, doc); + + // assert RTG request respects _route_ param + QueryResponse routeRsp = getRandomSolrClient().query(params("qt","/get", "id","2", "_route_", "1")); + SolrDocument results = (SolrDocument) routeRsp.getResponse().get("doc"); + assertNotNull("RTG should find doc because _route_ was set to the root documents' ID", results); + assertEquals("2", results.getFieldValue("id")); + + // assert all docs are indexed under the same root + getRandomSolrClient().commit(); + assertEquals(0, 
getRandomSolrClient().query(params("q", "-_root_:1")).getResults().size()); + + // assert all docs are indexed inside the same block + QueryResponse rsp = getRandomSolrClient().query(params("qt","/get", "id","1", "fl", "*, [child]")); + SolrDocument val = (SolrDocument) rsp.getResponse().get("doc"); + assertEquals("1", val.getFieldValue("id")); + List children = (List) val.getFieldValues("children"); + assertEquals(1, children.size()); + SolrDocument childDoc = children.get(0); + assertEquals("2", childDoc.getFieldValue("id")); + List grandChildren = (List) childDoc.getFieldValues("grandChildren"); + assertEquals(1, grandChildren.size()); + SolrDocument grandChild = grandChildren.get(0); + assertEquals(fieldValue, grandChild.getFirstValue("inplace_updatable_int")); + assertEquals("3", grandChild.getFieldValue("id")); + } + } + + public void sendWrongRouteParam() throws Exception { + assertEquals(4, cloudClient.getZkStateReader().getClusterState().getCollection(DEFAULT_COLLECTION).getSlices().size()); + final String rootId = "1"; + + SolrInputDocument doc = sdoc("id", rootId, "level_s", "root"); + + final SolrParams wrongRootParams = params("wt", "json", "_route_", "c"); + final SolrParams rightParams = params("wt", "json", "_route_", rootId); + + int which = (rootId.hashCode() & 0x7fffffff) % clients.size(); + SolrClient aClient = clients.get(which); + + indexDocAndRandomlyCommit(aClient, params("wt", "json", "_route_", rootId), doc, false); + + final SolrInputDocument childDoc = sdoc("id", rootId, "children", map("add", sdocs(sdoc("id", "2", "level_s", "child")))); + + indexDocAndRandomlyCommit(aClient, rightParams, childDoc, false); + + final SolrInputDocument grandChildDoc = sdoc("id", "2", "grandChildren", + map("add", sdocs( + sdoc("id", "3", "level_s", "grandChild") + ) + ) + ); + + SolrException e = expectThrows(SolrException.class, + "wrong \"_route_\" param should throw an exception", + () -> indexDocAndRandomlyCommit(aClient, wrongRootParams, 
grandChildDoc) + ); + + assertTrue("message should suggest the wrong \"_route_\" param was supplied", + e.getMessage().contains("perhaps the wrong \"_route_\" param was supplied")); + } + + private void indexDocAndRandomlyCommit(SolrClient client, SolrParams params, SolrInputDocument sdoc) throws IOException, SolrServerException { + indexDocAndRandomlyCommit(client, params, sdoc, true); + } + + private void indexDocAndRandomlyCommit(SolrClient client, SolrParams params, SolrInputDocument sdoc, boolean compareToControlCollection) throws IOException, SolrServerException { + if (compareToControlCollection) { + indexDoc(client, params, sdoc); + } else { + add(client, params, sdoc); + } + // randomly commit docs + if (random().nextBoolean()) { + client.commit(); + } + } + + private SolrClient getRandomSolrClient() { + return clients.get(random().nextInt(clients.size())); + } + +} diff --git a/solr/core/src/test/org/apache/solr/update/processor/NestedAtomicUpdateTest.java b/solr/core/src/test/org/apache/solr/update/processor/NestedAtomicUpdateTest.java new file mode 100644 index 00000000000..e52176f74a5 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/update/processor/NestedAtomicUpdateTest.java @@ -0,0 +1,657 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.update.processor; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.lucene.util.BytesRef; +import org.apache.solr.SolrTestCaseJ4; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; +import org.apache.solr.core.SolrCore; +import org.apache.solr.handler.component.RealTimeGetComponent; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +public class NestedAtomicUpdateTest extends SolrTestCaseJ4 { + + private final static String VERSION = "_version_"; + + @BeforeClass + public static void beforeTests() throws Exception { + initCore("solrconfig-tlog.xml", "schema-nest.xml"); // use "nest" schema + } + + @Before + public void before() { + clearIndex(); + assertU(commit()); + } + + @Test + public void testMergeChildDoc() throws Exception { + SolrInputDocument newChildDoc = sdoc("id", "3", "cat_ss", "child"); + SolrInputDocument addedDoc = sdoc("id", "1", + "cat_ss", Collections.singletonMap("add", "bbb"), + "child", Collections.singletonMap("add", sdocs(newChildDoc))); + + SolrInputDocument dummyBlock = sdoc("id", "1", + "cat_ss", new ArrayList<>(Arrays.asList("aaa", "ccc")), + "_root_", "1", "child", new ArrayList<>(sdocs(addedDoc))); + dummyBlock.removeField(VERSION); + + SolrInputDocument preMergeDoc = new SolrInputDocument(dummyBlock); + AtomicUpdateDocumentMerger docMerger = new AtomicUpdateDocumentMerger(req()); + docMerger.merge(addedDoc, dummyBlock); + assertEquals("merged document should have the same id", preMergeDoc.getFieldValue("id"), dummyBlock.getFieldValue("id")); + assertDocContainsSubset(preMergeDoc, dummyBlock); + assertDocContainsSubset(addedDoc, 
dummyBlock); + assertDocContainsSubset(newChildDoc, (SolrInputDocument) ((List) dummyBlock.getFieldValues("child")).get(1)); + assertEquals(dummyBlock.getFieldValue("id"), dummyBlock.getFieldValue("id")); + } + + @Test + public void testBlockAtomicInplaceUpdates() throws Exception { + SolrInputDocument doc = sdoc("id", "1", "string_s", "root"); + assertU(adoc(doc)); + + assertU(commit()); + + assertQ(req("q", "id:1", "fl", "*"), + "//*[@numFound='1']", + "//doc[1]/str[@name='id']=1" + ); + + List docs = IntStream.range(10, 20).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s", + "child", "inplace_updatable_int", "0")).collect(Collectors.toList()); + doc = sdoc("id", "1", "children", Collections.singletonMap("add", docs)); + addAndGetVersion(doc, params("wt", "json", "_route_", "1")); + + assertU(commit()); + + + assertQ(req("q", "_root_:1", "fl", "*", "rows", "11"), + "//*[@numFound='11']" + ); + + assertQ(req("q", "string_s:child", "fl", "*"), + "//*[@numFound='10']", + "*[count(//str[@name='string_s'][.='child'])=10]" + ); + + for(int i = 10; i < 20; ++i) { + doc = sdoc("id", String.valueOf(i), "inplace_updatable_int", Collections.singletonMap("inc", "1")); + addAndGetVersion(doc, params("wt", "json", "_route_", "1")); + assertU(commit()); + } + + for(int i = 10; i < 20; ++i) { + doc = sdoc("id", String.valueOf(i), "inplace_updatable_int", Collections.singletonMap("inc", "1")); + addAndGetVersion(doc, params("wt", "json", "_route_", "1")); + assertU(commit()); + } + + // ensure updates work when block has more than 10 children + for(int i = 10; i < 20; ++i) { + docs = IntStream.range(i * 10, (i * 10) + 5).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s", "grandChild")).collect(Collectors.toList()); + doc = sdoc("id", String.valueOf(i), "grandChildren", Collections.singletonMap("add", docs)); + addAndGetVersion(doc, params("wt", "json", "_route_", "1")); + assertU(commit()); + } + + for(int i =10; i < 20; ++i) { + doc = sdoc("id", String.valueOf(i), 
"inplace_updatable_int", Collections.singletonMap("inc", "1")); + addAndGetVersion(doc, params("wt", "json", "_route_", "1")); + assertU(commit()); + } + + assertQ(req("q", "-_root_:*", "fl", "*"), + "//*[@numFound='0']" + ); + + assertQ(req("q", "string_s:grandChild", "fl", "*", "rows", "50"), + "//*[@numFound='50']" + ); + + assertQ(req("q", "string_s:child", "fl", "*"), + "//*[@numFound='10']", + "*[count(//str[@name='string_s'][.='child'])=10]"); + + assertJQ(req("q", "id:1", "fl", "*,[child limit=-1]"), + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/children/[0]/id=='10'", + "/response/docs/[0]/children/[0]/inplace_updatable_int==3", + "/response/docs/[0]/children/[0]/grandChildren/[0]/id=='100'", + "/response/docs/[0]/children/[0]/grandChildren/[4]/id=='104'", + "/response/docs/[0]/children/[9]/id=='19'" + ); + + } + + @Test + public void testBlockAtomicQuantities() throws Exception { + SolrInputDocument doc = sdoc("id", "1", "string_s", "root"); + assertU(adoc(doc)); + + assertU(commit()); + + assertQ(req("q", "id:1", "fl", "*"), + "//*[@numFound='1']", + "//doc[1]/str[@name='id']=1" + ); + + List docs = IntStream.range(10, 20).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s", "child")).collect(Collectors.toList()); + doc = sdoc("id", "1", "children", Collections.singletonMap("add", docs)); + addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + + assertQ(req("q", "_root_:1", "fl", "*", "rows", "11"), + "//*[@numFound='11']", + "*[count(//str[@name='_root_'][.='1'])=11]" + ); + + assertQ(req("q", "string_s:child", "fl", "*"), + "//*[@numFound='10']", + "*[count(//str[@name='string_s'][.='child'])=10]" + ); + + // ensure updates work when block has more than 10 children + for(int i = 10; i < 20; ++i) { + docs = IntStream.range(i * 10, (i * 10) + 5).mapToObj(x -> sdoc("id", String.valueOf(x), "string_s", "grandChild")).collect(Collectors.toList()); + doc = sdoc("id", String.valueOf(i), "grandChildren", 
Collections.singletonMap("add", docs)); + addAndGetVersion(doc, params("wt", "json")); + assertU(commit()); + } + + assertQ(req("q", "string_s:grandChild", "fl", "*", "rows", "50"), + "//*[@numFound='50']", + "*[count(//str[@name='string_s'][.='grandChild'])=50]"); + + assertQ(req("q", "string_s:child", "fl", "*"), + "//*[@numFound='10']", + "*[count(//str[@name='string_s'][.='child'])=10]"); + } + + @Test + public void testBlockAtomicStack() throws Exception { + SolrInputDocument doc = sdoc("id", "1", "child1", sdocs(sdoc("id", "2", "child_s", "child"))); + assertU(adoc(doc)); + + assertU(commit()); + + assertJQ(req("q","id:1", "fl", "*, [child]"), + "/response/numFound==1", + "/response/docs/[0]/child1/[0]/id=='2'", + "/response/docs/[0]/child1/[0]/child_s=='child'" + ); + + doc = sdoc("id", "1", "child1", Collections.singletonMap("add", sdocs(sdoc("id", "3", "child_s", "child")))); + addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + assertJQ(req("q","id:1", "fl", "*, [child]"), + "/response/numFound==1", + "/response/docs/[0]/child1/[0]/id=='2'", + "/response/docs/[0]/child1/[0]/child_s=='child'", + "/response/docs/[0]/child1/[1]/id=='3'", + "/response/docs/[0]/child1/[0]/child_s=='child'" + ); + + doc = sdoc("id", "2", + "grandChild", Collections.singletonMap("add", sdocs(sdoc("id", "4", "child_s", "grandChild"), sdoc("id", "5", "child_s", "grandChild")))); + addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + assertJQ(req("q","id:1", "fl", "*, [child]", "sort", "id asc"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/child1/[0]/id=='2'", + "/response/docs/[0]/child1/[0]/child_s=='child'", + "/response/docs/[0]/child1/[1]/id=='3'", + "/response/docs/[0]/child1/[1]/child_s=='child'", + "/response/docs/[0]/child1/[0]/grandChild/[0]/id=='4'", + "/response/docs/[0]/child1/[0]/grandChild/[0]/child_s=='grandChild'" + ); + + doc = sdoc("id", "1", + "child2", Collections.singletonMap("add", 
sdocs(sdoc("id", "8", "child_s", "child")))); + addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + assertJQ(req("q","id:1", "fl", "*, [child]", "sort", "id asc"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/child1/[0]/id=='2'", + "/response/docs/[0]/child1/[0]/child_s=='child'", + "/response/docs/[0]/child1/[1]/id=='3'", + "/response/docs/[0]/child1/[1]/child_s=='child'", + "/response/docs/[0]/child1/[0]/grandChild/[0]/id=='4'", + "/response/docs/[0]/child1/[0]/grandChild/[0]/child_s=='grandChild'", + "/response/docs/[0]/child2/[0]/id=='8'", + "/response/docs/[0]/child2/[0]/child_s=='child'" + ); + + doc = sdoc("id", "1", + "new_s", Collections.singletonMap("add", "new string")); + addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + // ensure the whole block has been committed correctly to the index. + assertJQ(req("q","id:1", "fl", "*, [child]"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/child1/[0]/id=='2'", + "/response/docs/[0]/child1/[0]/child_s=='child'", + "/response/docs/[0]/child1/[1]/id=='3'", + "/response/docs/[0]/child1/[1]/child_s=='child'", + "/response/docs/[0]/child1/[0]/grandChild/[0]/id=='4'", + "/response/docs/[0]/child1/[0]/grandChild/[0]/child_s=='grandChild'", + "/response/docs/[0]/child1/[0]/grandChild/[1]/id=='5'", + "/response/docs/[0]/child1/[0]/grandChild/[1]/child_s=='grandChild'", + "/response/docs/[0]/new_s=='new string'", + "/response/docs/[0]/child2/[0]/id=='8'", + "/response/docs/[0]/child2/[0]/child_s=='child'" + ); + + } + + @Test + public void testBlockAtomicAdd() throws Exception { + + SolrInputDocument doc = sdoc("id", "1", + "cat_ss", new String[] {"aaa", "ccc"}, + "child1", sdoc("id", "2", "cat_ss", "child") + ); + assertU(adoc(doc)); + + BytesRef rootDocId = new BytesRef("1"); + SolrCore core = h.getCore(); + SolrInputDocument block = RealTimeGetComponent.getInputDocument(core, rootDocId, 
RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN); + // assert block doc has child docs + assertTrue(block.containsKey("child1")); + + assertJQ(req("q","id:1") + ,"/response/numFound==0" + ); + + // commit the changes + assertU(commit()); + + SolrInputDocument committedBlock = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN); + BytesRef childDocId = new BytesRef("2"); + // ensure the whole block is returned when resolveBlock is true and id of a child doc is provided + assertEquals(committedBlock.toString(), RealTimeGetComponent.getInputDocument(core, childDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN).toString()); + + assertJQ(req("q","id:1") + ,"/response/numFound==1" + ); + + doc = sdoc("id", "1", + "cat_ss", Collections.singletonMap("add", "bbb"), + "child2", Collections.singletonMap("add", sdoc("id", "3", "cat_ss", "child"))); + addAndGetVersion(doc, params("wt", "json")); + + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]") + ,"=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}," + + "child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + assertU(commit()); + + // a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the transaction log + // this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]") + , "=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}," + + "child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + doc = sdoc("id", "2", + "child3", Collections.singletonMap("add", sdoc("id", "4", "cat_ss", "grandChild"))); + addAndGetVersion(doc, params("wt", "json", "_route_", "1")); + + assertJQ(req("qt","/get", "id","1", "fl","id, 
cat_ss, child1, child2, child3, [child]") + ,"=={'doc':{'id':'1'" + + ", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}}]," + + "child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}" + + " }}" + ); + + assertJQ(req("qt","/get", "id","2", "fl","id, cat_ss, child, child3, [child]") + ,"=={'doc':{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]}}" + + " }}" + ); + + assertU(commit()); + + //add greatGrandChild + doc = sdoc("id", "4", + "child4", Collections.singletonMap("add", sdoc("id", "5", "cat_ss", "greatGrandChild"))); + addAndGetVersion(doc, params("wt", "json")); + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, child3, child4, [child]") + ,"=={'doc':{'id':'1'" + + ", cat_ss:[\"aaa\",\"ccc\",\"bbb\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"], child3:{\"id\":\"4\",\"cat_ss\":[\"grandChild\"]," + + " child4:{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}}}], child2:{\"id\":\"3\", \"cat_ss\": [\"child\"]}" + + " }}" + ); + + assertJQ(req("qt","/get", "id","4", "fl","id, cat_ss, child4, [child]") + ,"=={'doc':{\"id\":\"4\",\"cat_ss\":[\"grandChild\"], child4:{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}}" + + " }}" + ); + + assertU(commit()); + + //add another greatGrandChild + doc = sdoc("id", "4", + "child4", Collections.singletonMap("add", sdoc("id", "6", "cat_ss", "greatGrandChild"))); + addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + assertJQ(req("qt","/get", "id","4", "fl","id, cat_ss, child4, [child]") + ,"=={'doc':{\"id\":\"4\",\"cat_ss\":[\"grandChild\"], child4:[{\"id\":\"5\",\"cat_ss\":[\"greatGrandChild\"]}," + + "{\"id\":\"6\", \"cat_ss\":[\"greatGrandChild\"]}]}" + + " }}" + ); + + //add another child field name + doc = sdoc("id", "1", + "child5", Collections.singletonMap("add", sdocs(sdoc("id", "7", "cat_ss", "child"), + sdoc("id", "8", "cat_ss", "child") + )) + ); + 
addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + doc = sdoc("id", "1", + "new_s", Collections.singletonMap("add", "new string")); + addAndGetVersion(doc, params("wt", "json")); + + assertU(commit()); + + + // ensure the whole block has been committed correctly to the index. + assertJQ(req("q","id:1", "fl", "*, [child]"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/cat_ss/[0]==\"aaa\"", + "/response/docs/[0]/cat_ss/[1]==\"ccc\"", + "/response/docs/[0]/cat_ss/[2]==\"bbb\"", + "/response/docs/[0]/child1/[0]/id=='2'", + "/response/docs/[0]/child1/[0]/cat_ss/[0]=='child'", + "/response/docs/[0]/child1/[0]/child3/id=='4'", + "/response/docs/[0]/child1/[0]/child3/cat_ss/[0]=='grandChild'", + "/response/docs/[0]/child1/[0]/child3/child4/[0]/id=='5'", + "/response/docs/[0]/child1/[0]/child3/child4/[0]/cat_ss/[0]=='greatGrandChild'", + "/response/docs/[0]/child1/[0]/child3/child4/[1]/id=='6'", + "/response/docs/[0]/child1/[0]/child3/child4/[1]/cat_ss/[0]=='greatGrandChild'", + "/response/docs/[0]/child2/id=='3'", + "/response/docs/[0]/child2/cat_ss/[0]=='child'", + "/response/docs/[0]/child5/[0]/id=='7'", + "/response/docs/[0]/child5/[0]/cat_ss/[0]=='child'", + "/response/docs/[0]/child5/[1]/id=='8'", + "/response/docs/[0]/child5/[1]/cat_ss/[0]=='child'", + "/response/docs/[0]/new_s=='new string'" + ); + } + + @Test + public void testBlockAtomicSet() throws Exception { + SolrInputDocument doc = sdoc("id", "1", + "cat_ss", new String[] {"aaa", "ccc"}, + "child1", Collections.singleton(sdoc("id", "2", "cat_ss", "child")) + ); + assertU(adoc(doc)); + + BytesRef rootDocId = new BytesRef("1"); + SolrCore core = h.getCore(); + SolrInputDocument block = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN); + // assert block doc has child docs + assertTrue(block.containsKey("child1")); + + assertJQ(req("q","id:1") + ,"/response/numFound==0" + ); + + // commit the changes 
+ assertU(commit()); + + SolrInputDocument committedBlock = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN); + BytesRef childDocId = new BytesRef("2"); + // ensure the whole block is returned when resolveBlock is true and id of a child doc is provided + assertEquals(committedBlock.toString(), RealTimeGetComponent.getInputDocument(core, childDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN).toString()); + + assertJQ(req("q","id:1") + ,"/response/numFound==1" + ); + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + assertU(commit()); + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + doc = sdoc("id", "1", + "cat_ss", Collections.singletonMap("set", Arrays.asList("aaa", "bbb")), + "child1", Collections.singletonMap("set", sdoc("id", "3", "cat_ss", "child"))); + addAndGetVersion(doc, params("wt", "json", "_route_", "1")); + + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"bbb\"], child1:{\"id\":\"3\",\"cat_ss\":[\"child\"]}" + + " }}" + ); + + assertU(commit()); + + // a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the transaction log + // this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={'doc':{'id':'1'" + + ", cat_ss:[\"aaa\",\"bbb\"], child1:{\"id\":\"3\",\"cat_ss\":[\"child\"]}" + + " }}" + ); + + doc = sdoc("id", "3", + "child2", Collections.singletonMap("set", sdoc("id", "4", "cat_ss", "child"))); + addAndGetVersion(doc, 
params("wt", "json", "_route_", "1")); + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, child2, [child]") + ,"=={'doc':{'id':'1'" + + ", cat_ss:[\"aaa\",\"bbb\"], child1:{\"id\":\"3\",\"cat_ss\":[\"child\"], child2:{\"id\":\"4\",\"cat_ss\":[\"child\"]}}" + + " }}" + ); + + assertJQ(req("qt","/get", "id","3", "fl","id, cat_ss, child, child2, [child]") + ,"=={'doc':{\"id\":\"3\",\"cat_ss\":[\"child\"], child2:{\"id\":\"4\",\"cat_ss\":[\"child\"]}}" + + " }}" + ); + + assertU(commit()); + + // ensure the whole block has been committed correctly to the index. + assertJQ(req("q","id:1", "fl", "*, [child]"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/cat_ss/[0]==\"aaa\"", + "/response/docs/[0]/cat_ss/[1]==\"bbb\"", + "/response/docs/[0]/child1/id=='3'", + "/response/docs/[0]/child1/cat_ss/[0]=='child'", + "/response/docs/[0]/child1/child2/id=='4'", + "/response/docs/[0]/child1/child2/cat_ss/[0]=='child'" + ); + } + + @Test + public void testAtomicUpdateDeleteNoRootField() throws Exception { + SolrInputDocument doc = sdoc("id", "1", + "cat_ss", new String[]{"aaa", "bbb"}); + assertU(adoc(doc)); + + assertJQ(req("q", "id:1") + , "/response/numFound==0" + ); + + // commit the changes + assertU(commit()); + + assertJQ(req("q", "id:1"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/cat_ss/[0]==\"aaa\"", + "/response/docs/[0]/cat_ss/[1]==\"bbb\"" + ); + + doc = sdoc("id", "1", + "child1", Collections.singletonMap("add", sdoc("id", "2", "cat_ss", "child"))); + addAndGetVersion(doc, params("wt", "json")); + + // commit the changes + assertU(commit()); + + // assert that doc with id:1 was removed even though it did not have _root_:1 since it was not indexed with child documents. 
+ assertJQ(req("q", "id:1", "fl", "*, [child]"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/cat_ss/[0]==\"aaa\"", + "/response/docs/[0]/cat_ss/[1]==\"bbb\"", + "/response/docs/[0]/child1/id==\"2\"", + "/response/docs/[0]/child1/cat_ss/[0]==\"child\"" + ); + + + } + + @Test + public void testBlockAtomicRemove() throws Exception { + SolrInputDocument doc = sdoc("id", "1", + "cat_ss", new String[] {"aaa", "ccc"}, + "child1", sdocs(sdoc("id", "2", "cat_ss", "child"), sdoc("id", "3", "cat_ss", "child")) + ); + assertU(adoc(doc)); + + BytesRef rootDocId = new BytesRef("1"); + SolrCore core = h.getCore(); + SolrInputDocument block = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN); + // assert block doc has child docs + assertTrue(block.containsKey("child1")); + + assertJQ(req("q","id:1") + ,"/response/numFound==0" + ); + + // commit the changes + assertU(commit()); + + SolrInputDocument committedBlock = RealTimeGetComponent.getInputDocument(core, rootDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN); + BytesRef childDocId = new BytesRef("2"); + // ensure the whole block is returned when resolveBlock is true and id of a child doc is provided + assertEquals(committedBlock.toString(), RealTimeGetComponent.getInputDocument(core, childDocId, RealTimeGetComponent.Resolution.ROOT_WITH_CHILDREN).toString()); + + assertJQ(req("q","id:1") + ,"/response/numFound==1" + ); + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + assertU(commit()); + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}, {\"id\":\"3\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + doc = 
sdoc("id", "1", + "child1", Collections.singletonMap("remove", sdoc("id", "3", "cat_ss", "child"))); + addAndGetVersion(doc, params("wt", "json")); + + + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={\"doc\":{'id':\"1\"" + + ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + assertU(commit()); + + // a cut-n-paste of the first big query, but this time it will be retrieved from the index rather than the transaction log + // this requires ChildDocTransformer to get the whole block, since the document is retrieved using an index lookup + assertJQ(req("qt","/get", "id","1", "fl","id, cat_ss, child1, [child]") + ,"=={'doc':{'id':'1'" + + ", cat_ss:[\"aaa\",\"ccc\"], child1:[{\"id\":\"2\",\"cat_ss\":[\"child\"]}]" + + " }}" + ); + + // ensure the whole block has been committed correctly to the index. + assertJQ(req("q","id:1", "fl", "*, [child]"), + "/response/numFound==1", + "/response/docs/[0]/id=='1'", + "/response/docs/[0]/cat_ss/[0]==\"aaa\"", + "/response/docs/[0]/cat_ss/[1]==\"ccc\"", + "/response/docs/[0]/child1/[0]/id=='2'", + "/response/docs/[0]/child1/[0]/cat_ss/[0]=='child'" + ); + } + + private static void assertDocContainsSubset(SolrInputDocument subsetDoc, SolrInputDocument fullDoc) { + for(SolrInputField field: subsetDoc) { + String fieldName = field.getName(); + assertTrue("doc should contain field: " + fieldName, fullDoc.containsKey(fieldName)); + Object fullValue = fullDoc.getField(fieldName).getValue(); + if(fullValue instanceof Collection) { + ((Collection) fullValue).containsAll(field.getValues()); + } else { + assertEquals("docs should have the same value for field: " + fieldName, field.getValue(), fullValue); + } + } + } +} diff --git a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java index 7e1ee0ae2c7..bff7a4e7ce3 100644 --- 
a/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java +++ b/solr/test-framework/src/java/org/apache/solr/SolrTestCaseJ4.java @@ -63,6 +63,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import com.carrotsearch.randomizedtesting.RandomizedContext; import com.carrotsearch.randomizedtesting.RandomizedTest; @@ -1374,9 +1375,35 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase { for (Object val : sfield) { if (firstVal) firstVal=false; else out.append(','); + if(val instanceof SolrInputDocument) { + json((SolrInputDocument) val, out); + } out.append(JSONUtil.toJSON(val)); } out.append(']'); + } else if(sfield.getValue() instanceof SolrInputDocument) { + json((SolrInputDocument) sfield.getValue(), out); + } else if (sfield.getValue() instanceof Map) { + Map valMap = (Map) sfield.getValue(); + Set childDocsKeys = valMap.entrySet().stream().filter(record -> isChildDoc(record.getValue())) + .map(Entry::getKey).collect(Collectors.toSet()); + if(childDocsKeys.size() > 0) { + Map newMap = new HashMap<>(); + for(Entry entry: valMap.entrySet()) { + String keyName = entry.getKey(); + Object val = entry.getValue(); + if(childDocsKeys.contains(keyName)) { + if(val instanceof Collection) { + val = ((Collection) val).stream().map(e -> toSolrDoc((SolrInputDocument) e)).collect(Collectors.toList()); + } else { + val = toSolrDoc((SolrInputDocument) val); + } + } + newMap.put(keyName, val); + } + valMap = newMap; + } + out.append(JSONUtil.toJSON(valMap)); } else { out.append(JSONUtil.toJSON(sfield.getValue())); } @@ -2920,6 +2947,25 @@ public abstract class SolrTestCaseJ4 extends SolrTestCase { private_RANDOMIZED_NUMERIC_FIELDTYPES.clear(); } + private static SolrDocument toSolrDoc(SolrInputDocument sid) { + SolrDocument doc = new SolrDocument(); + for(SolrInputField field: sid) { + 
doc.setField(field.getName(), field.getValue()); + } + return doc; + } + + private static boolean isChildDoc(Object o) { + if(o instanceof Collection) { + Collection col = (Collection) o; + if(col.size() == 0) { + return false; + } + return col.iterator().next() instanceof SolrInputDocument; + } + return o instanceof SolrInputDocument; + } + private static final Map private_RANDOMIZED_NUMERIC_FIELDTYPES = new HashMap<>(); /**