SOLR-6909: Extract atomic update handling logic into AtomicUpdateDocumentMerger

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1652660 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Yonik Seeley 2015-01-17 20:04:27 +00:00
parent df42c143bb
commit 5fa8807541
5 changed files with 243 additions and 160 deletions

View File

@ -55,7 +55,16 @@ Other Changes
Erik Hatcher)
================== 5.1.0 ==================
(No Changes)
Detailed Change List
----------------------
New Features
----------------------
* SOLR-6909: Extract atomic update handling logic into AtomicUpdateDocumentMerger class
and enable subclassing. (Steve Davids, yonik)
================== 5.0.0 ==================

View File

@ -0,0 +1,216 @@
package org.apache.solr.update.processor;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @lucene.experimental
*/
public class AtomicUpdateDocumentMerger {
private final static Logger log = LoggerFactory.getLogger(AtomicUpdateDocumentMerger.class);
protected final IndexSchema schema;
protected final SchemaField idField;
public AtomicUpdateDocumentMerger(SolrQueryRequest queryReq) {
schema = queryReq.getSchema();
idField = schema.getUniqueKeyField();
}
/**
* Utility method that examines the SolrInputDocument in an AddUpdateCommand
* and returns true if the documents contains atomic update instructions.
*/
public static boolean isAtomicUpdate(final AddUpdateCommand cmd) {
SolrInputDocument sdoc = cmd.getSolrInputDocument();
for (SolrInputField sif : sdoc.values()) {
if (sif.getValue() instanceof Map) {
return true;
}
}
return false;
}
/**
* Merges the fromDoc into the toDoc using the atomic update syntax.
*
* @param fromDoc SolrInputDocument which will merged into the toDoc
* @param toDoc the final SolrInputDocument that will be mutated with the values from the fromDoc atomic commands
* @return toDoc with mutated values
*/
public SolrInputDocument merge(final SolrInputDocument fromDoc, SolrInputDocument toDoc) {
for (SolrInputField sif : fromDoc.values()) {
Object val = sif.getValue();
if (val instanceof Map) {
for (Entry<String,Object> entry : ((Map<String,Object>) val).entrySet()) {
String key = entry.getKey();
Object fieldVal = entry.getValue();
boolean updateField = false;
switch (key) {
case "add":
updateField = true;
doAdd(toDoc, sif, fieldVal);
break;
case "set":
updateField = true;
doSet(toDoc, sif, fieldVal);
break;
case "remove":
updateField = true;
doRemove(toDoc, sif, fieldVal);
break;
case "removeregex":
updateField = true;
doRemoveRegex(toDoc, sif, fieldVal);
break;
case "inc":
updateField = true;
doInc(toDoc, sif, fieldVal);
break;
default:
//Perhaps throw an error here instead?
log.warn("Unknown operation for the an atomic update, operation ignored: " + key);
break;
}
// validate that the field being modified is not the id field.
if (updateField && idField.getName().equals(sif.getName())) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid update of id field: " + sif);
}
}
} else {
// normal fields are treated as a "set"
toDoc.put(sif.getName(), sif);
}
}
return toDoc;
}
protected void doSet(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
toDoc.setField(sif.getName(), fieldVal, sif.getBoost());
}
private void doAdd(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
toDoc.addField(sif.getName(), fieldVal, sif.getBoost());
}
protected void doInc(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
SolrInputField numericField = toDoc.get(sif.getName());
if (numericField == null) {
toDoc.setField(sif.getName(), fieldVal, sif.getBoost());
} else {
// TODO: fieldtype needs externalToObject?
String oldValS = numericField.getFirstValue().toString();
SchemaField sf = schema.getField(sif.getName());
BytesRefBuilder term = new BytesRefBuilder();
sf.getType().readableToIndexed(oldValS, term);
Object oldVal = sf.getType().toObject(sf, term.get());
String fieldValS = fieldVal.toString();
Number result;
if (oldVal instanceof Long) {
result = ((Long) oldVal).longValue() + Long.parseLong(fieldValS);
} else if (oldVal instanceof Float) {
result = ((Float) oldVal).floatValue() + Float.parseFloat(fieldValS);
} else if (oldVal instanceof Double) {
result = ((Double) oldVal).doubleValue() + Double.parseDouble(fieldValS);
} else {
// int, short, byte
result = ((Integer) oldVal).intValue() + Integer.parseInt(fieldValS);
}
toDoc.setField(sif.getName(), result, sif.getBoost());
}
}
protected void doRemove(SolrInputDocument toDoc, SolrInputField sif, Object fieldVal) {
final String name = sif.getName();
SolrInputField existingField = toDoc.get(name);
if(existingField == null) return;
SchemaField sf = schema.getField(name);
if (sf != null) {
final Collection<Object> original = existingField.getValues();
if (fieldVal instanceof Collection) {
for (Object object : (Collection)fieldVal){
original.remove(sf.getType().toNativeType(object));
}
} else {
original.remove(sf.getType().toNativeType(fieldVal));
}
toDoc.setField(name, original);
}
}
protected void doRemoveRegex(SolrInputDocument toDoc, SolrInputField sif, Object valuePatterns) {
final String name = sif.getName();
final SolrInputField existingField = toDoc.get(name);
if (existingField != null) {
final Collection<Object> valueToRemove = new HashSet<>();
final Collection<Object> original = existingField.getValues();
final Collection<Pattern> patterns = preparePatterns(valuePatterns);
for (Object value : original) {
for(Pattern pattern : patterns) {
final Matcher m = pattern.matcher(value.toString());
if (m.matches()) {
valueToRemove.add(value);
}
}
}
original.removeAll(valueToRemove);
toDoc.setField(name, original);
}
}
private Collection<Pattern> preparePatterns(Object fieldVal) {
final Collection<Pattern> patterns = new LinkedHashSet<>(1);
if (fieldVal instanceof Collection) {
Collection<String> patternVals = (Collection<String>) fieldVal;
for (String patternVal : patternVals) {
patterns.add(Pattern.compile(patternVal));
}
} else {
patterns.add(Pattern.compile(fieldVal.toString()));
}
return patterns;
}
}

View File

@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -35,11 +34,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.CharsRefBuilder;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.CloudDescriptor;
@ -76,7 +72,6 @@ import org.apache.solr.handler.component.RealTimeGetComponent;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.IndexSchema;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
@ -231,6 +226,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
private final SolrQueryRequest req;
private final SolrQueryResponse rsp;
private final UpdateRequestProcessor next;
private final AtomicUpdateDocumentMerger docMerger;
public static final String VERSION_FIELD = "_version_";
@ -266,12 +262,20 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
//used for keeping track of replicas that have processed an add/update from the leader
private RequestReplicationTracker replicationTracker = null;
public DistributedUpdateProcessor(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
this(req, rsp, new AtomicUpdateDocumentMerger(req), next);
}
/** Specification of AtomicUpdateDocumentMerger is currently experimental.
* @lucene.experimental
*/
public DistributedUpdateProcessor(SolrQueryRequest req,
SolrQueryResponse rsp, UpdateRequestProcessor next) {
SolrQueryResponse rsp, AtomicUpdateDocumentMerger docMerger, UpdateRequestProcessor next) {
super(next);
this.rsp = rsp;
this.next = next;
this.docMerger = docMerger;
this.idField = req.getSchema().getUniqueKeyField();
// version init
@ -949,7 +953,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
}
if (vinfo == null) {
if (isAtomicUpdate(cmd)) {
if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) {
throw new SolrException
(SolrException.ErrorCode.BAD_REQUEST,
"Atomic document updates are not supported unless <updateLog/> is configured");
@ -1095,24 +1099,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
return false;
}
/**
* Utility method that examines the SolrInputDocument in an AddUpdateCommand
* and returns true if the documents contains atomic update instructions.
*/
public static boolean isAtomicUpdate(final AddUpdateCommand cmd) {
SolrInputDocument sdoc = cmd.getSolrInputDocument();
for (SolrInputField sif : sdoc.values()) {
if (sif.getValue() instanceof Map) {
return true;
}
}
return false;
}
// TODO: may want to switch to using optimistic locking in the future for better concurrency
// that's why this code is here... need to retry in a loop closely around/in versionAdd
boolean getUpdatedDocument(AddUpdateCommand cmd, long versionOnUpdate) throws IOException {
if (!isAtomicUpdate(cmd)) return false;
if (!AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) return false;
SolrInputDocument sdoc = cmd.getSolrInputDocument();
BytesRef id = cmd.getIndexedId();
@ -1129,142 +1119,10 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
} else {
oldDoc.remove(VERSION_FIELD);
}
IndexSchema schema = cmd.getReq().getSchema();
for (SolrInputField sif : sdoc.values()) {
Object val = sif.getValue();
if (val instanceof Map) {
for (Entry<String,Object> entry : ((Map<String,Object>) val).entrySet()) {
String key = entry.getKey();
Object fieldVal = entry.getValue();
boolean updateField = false;
switch (key) {
case "add":
updateField = true;
oldDoc.addField(sif.getName(), fieldVal, sif.getBoost());
break;
case "set":
updateField = true;
oldDoc.setField(sif.getName(), fieldVal, sif.getBoost());
break;
case "remove":
updateField = true;
doRemove(oldDoc, sif, fieldVal, schema);
break;
case "removeregex":
updateField = true;
doRemoveRegex(oldDoc, sif, fieldVal);
break;
case "inc":
updateField = true;
doInc(oldDoc, schema, sif, fieldVal);
break;
default:
//Perhaps throw an error here instead?
log.warn("Unknown operation for the an atomic update, operation ignored: " + key);
break;
}
// validate that the field being modified is not the id field.
if (updateField && idField.getName().equals(sif.getName())) {
throw new SolrException(ErrorCode.BAD_REQUEST, "Invalid update of id field: " + sif);
}
}
} else {
// normal fields are treated as a "set"
oldDoc.put(sif.getName(), sif);
}
}
cmd.solrDoc = oldDoc;
return true;
}
private void doInc(SolrInputDocument oldDoc, IndexSchema schema, SolrInputField sif, Object fieldVal) {
SolrInputField numericField = oldDoc.get(sif.getName());
if (numericField == null) {
oldDoc.setField(sif.getName(), fieldVal, sif.getBoost());
} else {
// TODO: fieldtype needs externalToObject?
String oldValS = numericField.getFirstValue().toString();
SchemaField sf = schema.getField(sif.getName());
BytesRefBuilder term = new BytesRefBuilder();
sf.getType().readableToIndexed(oldValS, term);
Object oldVal = sf.getType().toObject(sf, term.get());
String fieldValS = fieldVal.toString();
Number result;
if (oldVal instanceof Long) {
result = ((Long) oldVal).longValue() + Long.parseLong(fieldValS);
} else if (oldVal instanceof Float) {
result = ((Float) oldVal).floatValue() + Float.parseFloat(fieldValS);
} else if (oldVal instanceof Double) {
result = ((Double) oldVal).doubleValue() + Double.parseDouble(fieldValS);
} else {
// int, short, byte
result = ((Integer) oldVal).intValue() + Integer.parseInt(fieldValS);
}
oldDoc.setField(sif.getName(), result, sif.getBoost());
}
}
private boolean doRemove(SolrInputDocument oldDoc, SolrInputField sif, Object fieldVal, IndexSchema schema) {
final String name = sif.getName();
SolrInputField existingField = oldDoc.get(name);
if(existingField == null) return false;
SchemaField sf = schema.getField(name);
int oldSize = existingField.getValueCount();
if (sf != null) {
final Collection<Object> original = existingField.getValues();
if (fieldVal instanceof Collection) {
for (Object object : (Collection)fieldVal){
original.remove(sf.getType().toNativeType(object));
}
} else {
original.remove(sf.getType().toNativeType(fieldVal));
}
oldDoc.setField(name, original);
}
return oldSize > existingField.getValueCount();
}
private void doRemoveRegex(SolrInputDocument oldDoc, SolrInputField sif, Object valuePatterns) {
final String name = sif.getName();
final SolrInputField existingField = oldDoc.get(name);
if (existingField != null) {
final Collection<Object> valueToRemove = new HashSet<>();
final Collection<Object> original = existingField.getValues();
final Collection<Pattern> patterns = preparePatterns(valuePatterns);
for (Object value : original) {
for(Pattern pattern : patterns) {
final Matcher m = pattern.matcher(value.toString());
if (m.matches()) {
valueToRemove.add(value);
}
}
}
original.removeAll(valueToRemove);
oldDoc.setField(name, original);
}
}
private Collection<Pattern> preparePatterns(Object fieldVal) {
final Collection<Pattern> patterns = new LinkedHashSet<>(1);
if (fieldVal instanceof Collection) {
Collection<String> patternVals = (Collection<String>) fieldVal;
for (String patternVal : patternVals) {
patterns.add(Pattern.compile(patternVal));
}
} else {
patterns.add(Pattern.compile(fieldVal.toString()));
}
return patterns;
cmd.solrDoc = docMerger.merge(sdoc, oldDoc);
return true;
}
@Override

View File

@ -60,7 +60,7 @@ class RunUpdateProcessor extends UpdateRequestProcessor
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
if (DistributedUpdateProcessor.isAtomicUpdate(cmd)) {
if (AtomicUpdateDocumentMerger.isAtomicUpdate(cmd)) {
throw new SolrException
(SolrException.ErrorCode.BAD_REQUEST,
"RunUpdateProcessor has received an AddUpdateCommand containing a document that appears to still contain Atomic document update operations, most likely because DistributedUpdateProcessorFactory was explicitly disabled from this updateRequestProcessorChain");

View File

@ -134,7 +134,7 @@ public class SignatureUpdateProcessorFactory
if (enabled) {
SolrInputDocument doc = cmd.getSolrInputDocument();
List<String> currDocSigFields = null;
boolean isPartialUpdate = DistributedUpdateProcessor.isAtomicUpdate(cmd);
boolean isPartialUpdate = AtomicUpdateDocumentMerger.isAtomicUpdate(cmd);
if (sigFields == null || sigFields.size() == 0) {
if (isPartialUpdate) {
throw new SolrException