mirror of https://github.com/apache/lucene.git
SOLR-6892: Improve the way update processors are used and make it simpler
git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1666436 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c9d344f09d
commit
40a1d7e22c
|
@ -234,6 +234,8 @@ Bug Fixes
|
||||||
* SOLR-6682: Fix response when using EnumField with StatsComponent
|
* SOLR-6682: Fix response when using EnumField with StatsComponent
|
||||||
(Xu Zhang via hossman)
|
(Xu Zhang via hossman)
|
||||||
|
|
||||||
|
* SOLR-6892: Improve the way update processors are used and make it simpler (Noble Paul)
|
||||||
|
|
||||||
Optimizations
|
Optimizations
|
||||||
----------------------
|
----------------------
|
||||||
|
|
||||||
|
|
|
@ -170,7 +170,7 @@ public class DataImportHandler extends RequestHandlerBase implements
|
||||||
IMPORT_CMD.equals(command)) {
|
IMPORT_CMD.equals(command)) {
|
||||||
importer.maybeReloadConfiguration(requestParams, defaultParams);
|
importer.maybeReloadConfiguration(requestParams, defaultParams);
|
||||||
UpdateRequestProcessorChain processorChain =
|
UpdateRequestProcessorChain processorChain =
|
||||||
req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
|
req.getCore().getUpdateProcessorChain(params);
|
||||||
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
|
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
|
||||||
SolrResourceLoader loader = req.getCore().getResourceLoader();
|
SolrResourceLoader loader = req.getCore().getResourceLoader();
|
||||||
DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);
|
DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);
|
||||||
|
|
|
@ -117,7 +117,7 @@ public class CloudUtil {
|
||||||
"/" + key, null, null, true));
|
"/" + key, null, null, true));
|
||||||
}
|
}
|
||||||
} catch (KeeperException.NoNodeException e) {
|
} catch (KeeperException.NoNodeException e) {
|
||||||
log.warn("Error fetching key names");
|
log.info("Error fetching key names");
|
||||||
return Collections.EMPTY_MAP;
|
return Collections.EMPTY_MAP;
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
|
|
@ -259,6 +259,12 @@ public class PluginBag<T> implements AutoCloseable {
|
||||||
if (inst != null && inst instanceof AutoCloseable) ((AutoCloseable) inst).close();
|
if (inst != null && inst instanceof AutoCloseable) ((AutoCloseable) inst).close();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getClassName() {
|
||||||
|
if (isLoaded()) return inst.getClass().getName();
|
||||||
|
if (pluginInfo != null) return pluginInfo.className;
|
||||||
|
return null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -65,6 +65,7 @@ public class PluginInfo implements MapSerializable{
|
||||||
LinkedHashMap m = new LinkedHashMap<>(map);
|
LinkedHashMap m = new LinkedHashMap<>(map);
|
||||||
initArgs = new NamedList();
|
initArgs = new NamedList();
|
||||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||||
|
if (NAME.equals(entry.getKey()) || CLASS_NAME.equals(entry.getKey())) continue;
|
||||||
Object value = entry.getValue();
|
Object value = entry.getValue();
|
||||||
if (value instanceof Map) value = new NamedList((Map) value);
|
if (value instanceof Map) value = new NamedList((Map) value);
|
||||||
initArgs.add(entry.getKey(), value);
|
initArgs.add(entry.getKey(), value);
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.solr.spelling.QueryConverter;
|
||||||
import org.apache.solr.update.SolrIndexConfig;
|
import org.apache.solr.update.SolrIndexConfig;
|
||||||
import org.apache.solr.update.UpdateLog;
|
import org.apache.solr.update.UpdateLog;
|
||||||
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
||||||
|
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
|
||||||
import org.apache.solr.util.DOMUtil;
|
import org.apache.solr.util.DOMUtil;
|
||||||
import org.apache.solr.util.FileUtils;
|
import org.apache.solr.util.FileUtils;
|
||||||
import org.apache.solr.util.RegexFileFilter;
|
import org.apache.solr.util.RegexFileFilter;
|
||||||
|
@ -302,6 +303,7 @@ public class SolrConfig extends Config implements MapSerializable{
|
||||||
.add(new SolrPluginInfo(ValueSourceParser.class, "valueSourceParser", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
|
.add(new SolrPluginInfo(ValueSourceParser.class, "valueSourceParser", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
|
||||||
.add(new SolrPluginInfo(TransformerFactory.class, "transformer", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
|
.add(new SolrPluginInfo(TransformerFactory.class, "transformer", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
|
||||||
.add(new SolrPluginInfo(SearchComponent.class, "searchComponent", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
|
.add(new SolrPluginInfo(SearchComponent.class, "searchComponent", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
|
||||||
|
.add(new SolrPluginInfo(UpdateRequestProcessorFactory.class, "updateProcessor", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK))
|
||||||
// TODO: WTF is up with queryConverter???
|
// TODO: WTF is up with queryConverter???
|
||||||
// it aparently *only* works as a singleton? - SOLR-4304
|
// it aparently *only* works as a singleton? - SOLR-4304
|
||||||
// and even then -- only if there is a single SpellCheckComponent
|
// and even then -- only if there is a single SpellCheckComponent
|
||||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.solr.common.cloud.SolrZkClient;
|
||||||
import org.apache.solr.common.params.CommonParams;
|
import org.apache.solr.common.params.CommonParams;
|
||||||
import org.apache.solr.common.params.CommonParams.EchoParamStyle;
|
import org.apache.solr.common.params.CommonParams.EchoParamStyle;
|
||||||
import org.apache.solr.common.params.SolrParams;
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.params.UpdateParams;
|
||||||
import org.apache.solr.common.util.ExecutorUtil;
|
import org.apache.solr.common.util.ExecutorUtil;
|
||||||
import org.apache.solr.common.util.IOUtils;
|
import org.apache.solr.common.util.IOUtils;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
|
@ -129,7 +130,9 @@ import org.apache.solr.update.processor.DistributedUpdateProcessorFactory;
|
||||||
import org.apache.solr.update.processor.LogUpdateProcessorFactory;
|
import org.apache.solr.update.processor.LogUpdateProcessorFactory;
|
||||||
import org.apache.solr.update.processor.RunUpdateProcessorFactory;
|
import org.apache.solr.update.processor.RunUpdateProcessorFactory;
|
||||||
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
|
||||||
|
import org.apache.solr.update.processor.UpdateRequestProcessorChain.ProcessorInfo;
|
||||||
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
|
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
|
||||||
|
import org.apache.solr.util.ConcurrentLRUCache;
|
||||||
import org.apache.solr.util.DefaultSolrThreadFactory;
|
import org.apache.solr.util.DefaultSolrThreadFactory;
|
||||||
import org.apache.solr.util.PropertiesInputStream;
|
import org.apache.solr.util.PropertiesInputStream;
|
||||||
import org.apache.solr.util.RefCounted;
|
import org.apache.solr.util.RefCounted;
|
||||||
|
@ -176,6 +179,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
private final long startTime;
|
private final long startTime;
|
||||||
private final RequestHandlers reqHandlers;
|
private final RequestHandlers reqHandlers;
|
||||||
private final PluginBag<SearchComponent> searchComponents = new PluginBag<>(SearchComponent.class, this);
|
private final PluginBag<SearchComponent> searchComponents = new PluginBag<>(SearchComponent.class, this);
|
||||||
|
private final PluginBag<UpdateRequestProcessorFactory> updateProcessors = new PluginBag<>(UpdateRequestProcessorFactory.class, this);
|
||||||
private final Map<String,UpdateRequestProcessorChain> updateProcessorChains;
|
private final Map<String,UpdateRequestProcessorChain> updateProcessorChains;
|
||||||
private final Map<String, SolrInfoMBean> infoRegistry;
|
private final Map<String, SolrInfoMBean> infoRegistry;
|
||||||
private IndexDeletionPolicyWrapper solrDelPolicy;
|
private IndexDeletionPolicyWrapper solrDelPolicy;
|
||||||
|
@ -796,6 +800,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
valueSourceParsers.init(ValueSourceParser.standardValueSourceParsers, this);
|
valueSourceParsers.init(ValueSourceParser.standardValueSourceParsers, this);
|
||||||
transformerFactories.init(TransformerFactory.defaultFactories, this);
|
transformerFactories.init(TransformerFactory.defaultFactories, this);
|
||||||
loadSearchComponents();
|
loadSearchComponents();
|
||||||
|
updateProcessors.init(Collections.EMPTY_MAP, this);
|
||||||
|
|
||||||
// Processors initialized before the handlers
|
// Processors initialized before the handlers
|
||||||
updateProcessorChains = loadUpdateProcessorChains();
|
updateProcessorChains = loadUpdateProcessorChains();
|
||||||
|
@ -993,7 +998,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
new DistributedUpdateProcessorFactory(),
|
new DistributedUpdateProcessorFactory(),
|
||||||
new RunUpdateProcessorFactory()
|
new RunUpdateProcessorFactory()
|
||||||
};
|
};
|
||||||
def = new UpdateRequestProcessorChain(factories, this);
|
def = new UpdateRequestProcessorChain(Arrays.asList(factories), this);
|
||||||
}
|
}
|
||||||
map.put(null, def);
|
map.put(null, def);
|
||||||
map.put("", def);
|
map.put("", def);
|
||||||
|
@ -1017,6 +1022,18 @@ public final class SolrCore implements SolrInfoMBean, Closeable {
|
||||||
return chain;
|
return chain;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public UpdateRequestProcessorChain getUpdateProcessorChain(SolrParams params) {
|
||||||
|
String chainName = params.get(UpdateParams.UPDATE_CHAIN);
|
||||||
|
UpdateRequestProcessorChain defaultUrp = getUpdateProcessingChain(chainName);
|
||||||
|
ProcessorInfo processorInfo = new ProcessorInfo(params);
|
||||||
|
if (processorInfo.isEmpty()) return defaultUrp;
|
||||||
|
return UpdateRequestProcessorChain.constructChain(defaultUrp, processorInfo, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PluginBag<UpdateRequestProcessorFactory> getUpdateProcessors() {
|
||||||
|
return updateProcessors;
|
||||||
|
}
|
||||||
|
|
||||||
// this core current usage count
|
// this core current usage count
|
||||||
private final AtomicInteger refCount = new AtomicInteger(1);
|
private final AtomicInteger refCount = new AtomicInteger(1);
|
||||||
|
|
||||||
|
|
|
@ -229,7 +229,7 @@ public class BlobHandler extends RequestHandlerBase implements PluginInfoInitia
|
||||||
public static void indexMap(SolrQueryRequest req, SolrQueryResponse rsp, Map<String, Object> doc) throws IOException {
|
public static void indexMap(SolrQueryRequest req, SolrQueryResponse rsp, Map<String, Object> doc) throws IOException {
|
||||||
SolrInputDocument solrDoc = new SolrInputDocument();
|
SolrInputDocument solrDoc = new SolrInputDocument();
|
||||||
for (Map.Entry<String, Object> e : doc.entrySet()) solrDoc.addField(e.getKey(),e.getValue());
|
for (Map.Entry<String, Object> e : doc.entrySet()) solrDoc.addField(e.getKey(),e.getValue());
|
||||||
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(req.getParams().get(UpdateParams.UPDATE_CHAIN));
|
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessorChain(req.getParams());
|
||||||
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
|
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
|
||||||
AddUpdateCommand cmd = new AddUpdateCommand(req);
|
AddUpdateCommand cmd = new AddUpdateCommand(req);
|
||||||
cmd.solrDoc = solrDoc;
|
cmd.solrDoc = solrDoc;
|
||||||
|
|
|
@ -55,7 +55,7 @@ public abstract class ContentStreamHandlerBase extends RequestHandlerBase {
|
||||||
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
|
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
|
||||||
SolrParams params = req.getParams();
|
SolrParams params = req.getParams();
|
||||||
UpdateRequestProcessorChain processorChain =
|
UpdateRequestProcessorChain processorChain =
|
||||||
req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN));
|
req.getCore().getUpdateProcessorChain(params);
|
||||||
|
|
||||||
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
|
UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
package org.apache.solr.update.processor;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
|
import org.apache.solr.response.SolrQueryResponse;
|
||||||
|
import org.apache.solr.update.AddUpdateCommand;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A base class for writing a very simple UpdateProcessor without worrying too much about the API.
|
||||||
|
* This is deliberately made to support only the add operation
|
||||||
|
*/
|
||||||
|
public abstract class SimpleUpdateProcessorFactory extends UpdateRequestProcessorFactory {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) {
|
||||||
|
return new UpdateRequestProcessor(next) {
|
||||||
|
@Override
|
||||||
|
public void processAdd(AddUpdateCommand cmd) throws IOException {
|
||||||
|
process(cmd, req, rsp);
|
||||||
|
super.processAdd(cmd);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This object is reused across requests. So,this method should not store anything in the instance variable.
|
||||||
|
*/
|
||||||
|
protected abstract void process(AddUpdateCommand cmd, SolrQueryRequest req, SolrQueryResponse rsp);
|
||||||
|
}
|
|
@ -17,6 +17,9 @@
|
||||||
|
|
||||||
package org.apache.solr.update.processor;
|
package org.apache.solr.update.processor;
|
||||||
|
|
||||||
|
import org.apache.solr.common.params.MapSolrParams;
|
||||||
|
import org.apache.solr.common.params.SolrParams;
|
||||||
|
import org.apache.solr.common.util.StrUtils;
|
||||||
import org.apache.solr.request.SolrQueryRequest;
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
import org.apache.solr.response.SolrQueryResponse;
|
import org.apache.solr.response.SolrQueryResponse;
|
||||||
import org.apache.solr.util.plugin.PluginInfoInitialized;
|
import org.apache.solr.util.plugin.PluginInfoInitialized;
|
||||||
|
@ -28,8 +31,12 @@ import org.apache.solr.core.SolrCore;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Objects;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Manages a chain of UpdateRequestProcessorFactories.
|
* Manages a chain of UpdateRequestProcessorFactories.
|
||||||
|
@ -87,7 +94,7 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized
|
||||||
{
|
{
|
||||||
public final static Logger log = LoggerFactory.getLogger(UpdateRequestProcessorChain.class);
|
public final static Logger log = LoggerFactory.getLogger(UpdateRequestProcessorChain.class);
|
||||||
|
|
||||||
private UpdateRequestProcessorFactory[] chain;
|
private List<UpdateRequestProcessorFactory> chain;
|
||||||
private final SolrCore solrCore;
|
private final SolrCore solrCore;
|
||||||
|
|
||||||
public UpdateRequestProcessorChain(SolrCore solrCore) {
|
public UpdateRequestProcessorChain(SolrCore solrCore) {
|
||||||
|
@ -152,14 +159,19 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized
|
||||||
log.info("inserting DistributedUpdateProcessorFactory into " + infomsg);
|
log.info("inserting DistributedUpdateProcessorFactory into " + infomsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
chain = list.toArray(new UpdateRequestProcessorFactory[list.size()]);
|
chain = list;
|
||||||
|
ProcessorInfo processorInfo = new ProcessorInfo(new MapSolrParams(info.attributes));
|
||||||
|
if (processorInfo.isEmpty()) return;
|
||||||
|
UpdateRequestProcessorChain newChain = constructChain(this, processorInfo, solrCore);
|
||||||
|
chain = newChain.chain;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a chain backed directly by the specified array. Modifications to
|
* Creates a chain backed directly by the specified list. Modifications to
|
||||||
* the array will affect future calls to <code>createProcessor</code>
|
* the array will affect future calls to <code>createProcessor</code>
|
||||||
*/
|
*/
|
||||||
public UpdateRequestProcessorChain( UpdateRequestProcessorFactory[] chain,
|
public UpdateRequestProcessorChain(List<UpdateRequestProcessorFactory> chain,
|
||||||
SolrCore solrCore) {
|
SolrCore solrCore) {
|
||||||
this.chain = chain;
|
this.chain = chain;
|
||||||
this.solrCore = solrCore;
|
this.solrCore = solrCore;
|
||||||
|
@ -187,8 +199,8 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized
|
||||||
final boolean skipToDistrib = distribPhase != null;
|
final boolean skipToDistrib = distribPhase != null;
|
||||||
boolean afterDistrib = true; // we iterate backwards, so true to start
|
boolean afterDistrib = true; // we iterate backwards, so true to start
|
||||||
|
|
||||||
for (int i = chain.length-1; i>=0; i--) {
|
for (int i = chain.size() - 1; i >= 0; i--) {
|
||||||
UpdateRequestProcessorFactory factory = chain[i];
|
UpdateRequestProcessorFactory factory = chain.get(i);
|
||||||
|
|
||||||
if (skipToDistrib) {
|
if (skipToDistrib) {
|
||||||
if (afterDistrib) {
|
if (afterDistrib) {
|
||||||
|
@ -208,13 +220,101 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized
|
||||||
return last;
|
return last;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
|
public UpdateRequestProcessorFactory[] getFactories() {
|
||||||
|
return chain.toArray(new UpdateRequestProcessorFactory[0]);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the underlying array of factories used in this chain.
|
* Returns the underlying array of factories used in this chain.
|
||||||
* Modifications to the array will affect future calls to
|
* Modifications to the array will affect future calls to
|
||||||
* <code>createProcessor</code>
|
* <code>createProcessor</code>
|
||||||
*/
|
*/
|
||||||
public UpdateRequestProcessorFactory[] getFactories() {
|
public List<UpdateRequestProcessorFactory> getProcessors() {
|
||||||
return chain;
|
return chain;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static UpdateRequestProcessorChain constructChain(UpdateRequestProcessorChain defaultUrp,
|
||||||
|
ProcessorInfo processorInfo, SolrCore core) {
|
||||||
|
LinkedList<UpdateRequestProcessorFactory> urps = new LinkedList(defaultUrp.chain);
|
||||||
|
List<UpdateRequestProcessorFactory> p = getReqProcessors(processorInfo.processor, core);
|
||||||
|
List<UpdateRequestProcessorFactory> post = getReqProcessors(processorInfo.postProcessor, core);
|
||||||
|
//processor are tried to be inserted before LogUpdateprocessor+DistributedUpdateProcessor
|
||||||
|
insertBefore(urps, p, DistributedUpdateProcessorFactory.class, 0);
|
||||||
|
//port-processor is tried to be inserted before RunUpdateProcessor
|
||||||
|
insertBefore(urps, post, RunUpdateProcessorFactory.class, urps.size() - 1);
|
||||||
|
UpdateRequestProcessorChain result = new UpdateRequestProcessorChain(urps, core);
|
||||||
|
if (log.isInfoEnabled()) {
|
||||||
|
ArrayList<String> names = new ArrayList<>(urps.size());
|
||||||
|
for (UpdateRequestProcessorFactory urp : urps) names.add(urp.getClass().getSimpleName());
|
||||||
|
log.info("New dynamic chain constructed : " + StrUtils.join(names, '>'));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void insertBefore(LinkedList<UpdateRequestProcessorFactory> urps, List<UpdateRequestProcessorFactory> newFactories, Class klas, int idx) {
|
||||||
|
if (newFactories.isEmpty()) return;
|
||||||
|
for (int i = 0; i < urps.size(); i++) {
|
||||||
|
if (klas.isInstance(urps.get(i))) {
|
||||||
|
idx = i;
|
||||||
|
if (klas == DistributedUpdateProcessorFactory.class) {
|
||||||
|
if (i > 0 && urps.get(i - 1) instanceof LogUpdateProcessorFactory) {
|
||||||
|
idx = i - 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int i = newFactories.size() - 1; 0 <= i; i--) urps.add(idx, newFactories.get(i));
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<UpdateRequestProcessorFactory> getReqProcessors(String processor, SolrCore core) {
|
||||||
|
if (processor == null) return Collections.EMPTY_LIST;
|
||||||
|
List<UpdateRequestProcessorFactory> result = new ArrayList<>();
|
||||||
|
if (processor != null) {
|
||||||
|
List<String> names = StrUtils.splitSmart(processor, ',');
|
||||||
|
List<UpdateRequestProcessorFactory> l = new ArrayList<>(names.size());
|
||||||
|
for (String s : names) {
|
||||||
|
s = s.trim();
|
||||||
|
if (s.isEmpty()) continue;
|
||||||
|
UpdateRequestProcessorFactory p = core.getUpdateProcessors().get(s);
|
||||||
|
if (p == null)
|
||||||
|
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such processor " + s);
|
||||||
|
result.add(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class ProcessorInfo {
|
||||||
|
public final String processor, postProcessor;
|
||||||
|
|
||||||
|
public ProcessorInfo(SolrParams params) {
|
||||||
|
processor = params.get("processor");
|
||||||
|
postProcessor = params.get("post-processor");
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return processor == null && postProcessor == null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
int hash = 0;
|
||||||
|
if (processor != null) hash += processor.hashCode();
|
||||||
|
if (postProcessor != null) hash += postProcessor.hashCode();
|
||||||
|
return hash;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (!(obj instanceof ProcessorInfo)) return false;
|
||||||
|
ProcessorInfo that = (ProcessorInfo) obj;
|
||||||
|
|
||||||
|
return Objects.equals(this.processor, that.processor) &&
|
||||||
|
Objects.equals(this.postProcessor, that.postProcessor);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,7 +186,7 @@ public class TestBlobHandler extends AbstractFullDistribZkTestBase {
|
||||||
Map m = (Map) ObjectBuilder.getVal(new JSONParser(new StringReader(response)));
|
Map m = (Map) ObjectBuilder.getVal(new JSONParser(new StringReader(response)));
|
||||||
assertFalse("Error in posting blob "+ getAsString(m),m.containsKey("error"));
|
assertFalse("Error in posting blob "+ getAsString(m),m.containsKey("error"));
|
||||||
} catch (JSONParser.ParseException e) {
|
} catch (JSONParser.ParseException e) {
|
||||||
log.error(response);
|
log.error("$ERROR$", response, e);
|
||||||
fail();
|
fail();
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
package org.apache.solr.update.processor;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.solr.common.util.StrUtils;
|
||||||
|
import org.apache.solr.request.SolrQueryRequest;
|
||||||
|
import org.apache.solr.response.SolrQueryResponse;
|
||||||
|
import org.apache.solr.update.AddUpdateCommand;
|
||||||
|
|
||||||
|
public class RuntimeUrp extends SimpleUpdateProcessorFactory {
|
||||||
|
@Override
|
||||||
|
protected void process(AddUpdateCommand cmd, SolrQueryRequest req, SolrQueryResponse rsp) {
|
||||||
|
UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessorChain(req.getParams());
|
||||||
|
List<String> names = new ArrayList<>();
|
||||||
|
for (UpdateRequestProcessorFactory p : processorChain.getProcessors()) {
|
||||||
|
names.add(p.getClass().getSimpleName());
|
||||||
|
}
|
||||||
|
cmd.solrDoc.addField("processors_s", StrUtils.join(names,'>'));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,188 @@
|
||||||
|
package org.apache.solr.update.processor;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.FileInputStream;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.zip.ZipEntry;
|
||||||
|
import java.util.zip.ZipOutputStream;
|
||||||
|
|
||||||
|
import org.apache.solr.client.solrj.SolrClient;
|
||||||
|
import org.apache.solr.client.solrj.SolrQuery;
|
||||||
|
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||||
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
|
import org.apache.solr.cloud.AbstractFullDistribZkTestBase;
|
||||||
|
import org.apache.solr.common.SolrDocument;
|
||||||
|
import org.apache.solr.common.SolrInputDocument;
|
||||||
|
import org.apache.solr.common.util.StrUtils;
|
||||||
|
import org.apache.solr.core.TestDynamicLoading;
|
||||||
|
import org.apache.solr.core.TestSolrConfigHandler;
|
||||||
|
import org.apache.solr.handler.TestBlobHandler;
|
||||||
|
import org.apache.solr.util.RESTfulServerProvider;
|
||||||
|
import org.apache.solr.util.RestTestHarness;
|
||||||
|
import org.apache.solr.util.SimplePostTool;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
public class TestNamedUpdateProcessors extends AbstractFullDistribZkTestBase {
|
||||||
|
static final Logger log = LoggerFactory.getLogger(TestNamedUpdateProcessors.class);
|
||||||
|
private List<RestTestHarness> restTestHarnesses = new ArrayList<>();
|
||||||
|
|
||||||
|
private void setupHarnesses() {
|
||||||
|
for (final SolrClient client : clients) {
|
||||||
|
RestTestHarness harness = new RestTestHarness(new RESTfulServerProvider() {
|
||||||
|
@Override
|
||||||
|
public String getBaseURL() {
|
||||||
|
return ((HttpSolrClient) client).getBaseURL();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
restTestHarnesses.add(harness);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void distribTearDown() throws Exception {
|
||||||
|
super.distribTearDown();
|
||||||
|
for (RestTestHarness r : restTestHarnesses) {
|
||||||
|
r.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
System.setProperty("enable.runtime.lib", "true");
|
||||||
|
setupHarnesses();
|
||||||
|
|
||||||
|
String blobName = "colltest";
|
||||||
|
|
||||||
|
HttpSolrClient randomClient = (HttpSolrClient) clients.get(random().nextInt(clients.size()));
|
||||||
|
String baseURL = randomClient.getBaseURL();
|
||||||
|
|
||||||
|
TestBlobHandler.createSystemCollection(new HttpSolrClient(baseURL.substring(0, baseURL.lastIndexOf('/')), randomClient.getHttpClient()));
|
||||||
|
waitForRecoveriesToFinish(".system", true);
|
||||||
|
|
||||||
|
TestBlobHandler.postAndCheck(cloudClient, baseURL.substring(0, baseURL.lastIndexOf('/')), blobName, TestDynamicLoading.generateZip(RuntimeUrp.class), 1);
|
||||||
|
|
||||||
|
String payload = "{\n" +
|
||||||
|
"'add-runtimelib' : { 'name' : 'colltest' ,'version':1}\n" +
|
||||||
|
"}";
|
||||||
|
RestTestHarness client = restTestHarnesses.get(random().nextInt(restTestHarnesses.size()));
|
||||||
|
TestSolrConfigHandler.runConfigCommand(client, "/config?wt=json", payload);
|
||||||
|
TestSolrConfigHandler.testForResponseElement(client,
|
||||||
|
null,
|
||||||
|
"/config/overlay?wt=json",
|
||||||
|
null,
|
||||||
|
Arrays.asList("overlay", "runtimeLib", blobName, "version"),
|
||||||
|
1l, 10);
|
||||||
|
|
||||||
|
payload = "{\n" +
|
||||||
|
"'create-updateprocessor' : { 'name' : 'firstFld', 'class': 'solr.FirstFieldValueUpdateProcessorFactory', 'fieldName':'test_s'}, \n" +
|
||||||
|
"'create-updateprocessor' : { 'name' : 'test', 'class': 'org.apache.solr.update.processor.RuntimeUrp', 'runtimeLib':true }, \n" +
|
||||||
|
"'create-updateprocessor' : { 'name' : 'maxFld', 'class': 'solr.MaxFieldValueUpdateProcessorFactory', 'fieldName':'mul_s'} \n" +
|
||||||
|
"}";
|
||||||
|
|
||||||
|
client = restTestHarnesses.get(random().nextInt(restTestHarnesses.size()));
|
||||||
|
TestSolrConfigHandler.runConfigCommand(client, "/config?wt=json", payload);
|
||||||
|
for (RestTestHarness restTestHarness : restTestHarnesses) {
|
||||||
|
TestSolrConfigHandler.testForResponseElement(restTestHarness,
|
||||||
|
null,
|
||||||
|
"/config/overlay?wt=json",
|
||||||
|
null,
|
||||||
|
Arrays.asList("overlay", "updateProcessor", "firstFld", "fieldName"),
|
||||||
|
"test_s", 10);
|
||||||
|
}
|
||||||
|
|
||||||
|
SolrInputDocument doc = new SolrInputDocument();
|
||||||
|
doc.addField("id", "123");
|
||||||
|
doc.addField("test_s", Arrays.asList("one", "two"));
|
||||||
|
doc.addField("mul_s", Arrays.asList("aaa", "bbb"));
|
||||||
|
randomClient.add(doc);
|
||||||
|
randomClient.commit(true, true);
|
||||||
|
QueryResponse result = randomClient.query(new SolrQuery("id:123"));
|
||||||
|
assertEquals(2, ((Collection) result.getResults().get(0).getFieldValues("test_s")).size());
|
||||||
|
assertEquals(2, ((Collection) result.getResults().get(0).getFieldValues("mul_s")).size());
|
||||||
|
doc = new SolrInputDocument();
|
||||||
|
doc.addField("id", "456");
|
||||||
|
doc.addField("test_s", Arrays.asList("three", "four"));
|
||||||
|
doc.addField("mul_s", Arrays.asList("aaa", "bbb"));
|
||||||
|
UpdateRequest ur = new UpdateRequest();
|
||||||
|
ur.add(doc).setParam("processor", "firstFld,maxFld,test");
|
||||||
|
randomClient.request(ur);
|
||||||
|
randomClient.commit(true, true);
|
||||||
|
result = randomClient.query(new SolrQuery("id:456"));
|
||||||
|
SolrDocument d = result.getResults().get(0);
|
||||||
|
assertEquals(1, d.getFieldValues("test_s").size());
|
||||||
|
assertEquals(1, d.getFieldValues("mul_s").size());
|
||||||
|
assertEquals("three", d.getFieldValues("test_s").iterator().next());
|
||||||
|
assertEquals("bbb", d.getFieldValues("mul_s").iterator().next());
|
||||||
|
String processors = (String) d.getFirstValue("processors_s");
|
||||||
|
assertNotNull(processors);
|
||||||
|
assertEquals(StrUtils.splitSmart(processors, '>'),
|
||||||
|
Arrays.asList("FirstFieldValueUpdateProcessorFactory", "MaxFieldValueUpdateProcessorFactory", "RuntimeUrp", "LogUpdateProcessorFactory", "DistributedUpdateProcessorFactory", "RunUpdateProcessorFactory"));
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ByteBuffer getFileContent(String f) throws IOException {
|
||||||
|
ByteBuffer jar;
|
||||||
|
try (FileInputStream fis = new FileInputStream(getFile(f))) {
|
||||||
|
byte[] buf = new byte[fis.available()];
|
||||||
|
fis.read(buf);
|
||||||
|
jar = ByteBuffer.wrap(buf);
|
||||||
|
}
|
||||||
|
return jar;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ByteBuffer persistZip(String loc, Class... classes) throws IOException {
|
||||||
|
ByteBuffer jar = generateZip(classes);
|
||||||
|
try (FileOutputStream fos = new FileOutputStream(loc)) {
|
||||||
|
fos.write(jar.array(), 0, jar.limit());
|
||||||
|
fos.flush();
|
||||||
|
}
|
||||||
|
return jar;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public static ByteBuffer generateZip(Class... classes) throws IOException {
|
||||||
|
ZipOutputStream zipOut = null;
|
||||||
|
SimplePostTool.BAOS bos = new SimplePostTool.BAOS();
|
||||||
|
zipOut = new ZipOutputStream(bos);
|
||||||
|
zipOut.setLevel(ZipOutputStream.DEFLATED);
|
||||||
|
for (Class c : classes) {
|
||||||
|
String path = c.getName().replace('.', '/').concat(".class");
|
||||||
|
ZipEntry entry = new ZipEntry(path);
|
||||||
|
ByteBuffer b = SimplePostTool.inputStreamToByteArray(c.getClassLoader().getResourceAsStream(path));
|
||||||
|
zipOut.putNextEntry(entry);
|
||||||
|
zipOut.write(b.array(), 0, b.limit());
|
||||||
|
zipOut.closeEntry();
|
||||||
|
}
|
||||||
|
zipOut.close();
|
||||||
|
return bos.getByteBuffer();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue