From 40a1d7e22c46f4a48372579a9fd1c7bf95b21d2d Mon Sep 17 00:00:00 2001 From: Noble Paul Date: Fri, 13 Mar 2015 13:13:15 +0000 Subject: [PATCH] SOLR-6892: Improve the way update processors are used and make it simpler git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1666436 13f79535-47bb-0310-9956-ffa450edef68 --- solr/CHANGES.txt | 2 + .../handler/dataimport/DataImportHandler.java | 2 +- .../java/org/apache/solr/cloud/CloudUtil.java | 2 +- .../java/org/apache/solr/core/PluginBag.java | 6 + .../java/org/apache/solr/core/PluginInfo.java | 1 + .../java/org/apache/solr/core/SolrConfig.java | 2 + .../java/org/apache/solr/core/SolrCore.java | 19 +- .../org/apache/solr/handler/BlobHandler.java | 2 +- .../handler/ContentStreamHandlerBase.java | 2 +- .../SimpleUpdateProcessorFactory.java | 48 +++++ .../UpdateRequestProcessorChain.java | 120 ++++++++++- .../apache/solr/handler/TestBlobHandler.java | 2 +- .../solr/update/processor/RuntimeUrp.java | 38 ++++ .../processor/TestNamedUpdateProcessors.java | 188 ++++++++++++++++++ 14 files changed, 418 insertions(+), 16 deletions(-) create mode 100644 solr/core/src/java/org/apache/solr/update/processor/SimpleUpdateProcessorFactory.java create mode 100644 solr/core/src/test/org/apache/solr/update/processor/RuntimeUrp.java create mode 100644 solr/core/src/test/org/apache/solr/update/processor/TestNamedUpdateProcessors.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 883b675bd1d..e90d572eec6 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -234,6 +234,8 @@ Bug Fixes * SOLR-6682: Fix response when using EnumField with StatsComponent (Xu Zhang via hossman) +* SOLR-6892: Improve the way update processors are used and make it simpler (Noble Paul) + Optimizations ---------------------- diff --git a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java index 5e93c86e01d..2b65894b2d3 100644 --- a/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java +++ b/solr/contrib/dataimporthandler/src/java/org/apache/solr/handler/dataimport/DataImportHandler.java @@ -170,7 +170,7 @@ public class DataImportHandler extends RequestHandlerBase implements IMPORT_CMD.equals(command)) { importer.maybeReloadConfiguration(requestParams, defaultParams); UpdateRequestProcessorChain processorChain = - req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN)); + req.getCore().getUpdateProcessorChain(params); UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp); SolrResourceLoader loader = req.getCore().getResourceLoader(); DIHWriter sw = getSolrWriter(processor, loader, requestParams, req); diff --git a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java index 14f7e699c4d..b9f16c7b0ee 100644 --- a/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java +++ b/solr/core/src/java/org/apache/solr/cloud/CloudUtil.java @@ -117,7 +117,7 @@ public class CloudUtil { "/" + key, null, null, true)); } } catch (KeeperException.NoNodeException e) { - log.warn("Error fetching key names"); + log.info("Error fetching key names"); return Collections.EMPTY_MAP; } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/solr/core/src/java/org/apache/solr/core/PluginBag.java b/solr/core/src/java/org/apache/solr/core/PluginBag.java index a73d376e752..054490906ec 100644 --- a/solr/core/src/java/org/apache/solr/core/PluginBag.java +++ b/solr/core/src/java/org/apache/solr/core/PluginBag.java @@ -259,6 +259,12 @@ public class PluginBag implements AutoCloseable { if (inst != null && inst instanceof AutoCloseable) ((AutoCloseable) inst).close(); } + + public String getClassName() { + if (isLoaded()) return inst.getClass().getName(); + if (pluginInfo != null) return pluginInfo.className; + return null; + } } /** diff --git a/solr/core/src/java/org/apache/solr/core/PluginInfo.java b/solr/core/src/java/org/apache/solr/core/PluginInfo.java index 82b0b239521..c6fe42c9115 100644 --- a/solr/core/src/java/org/apache/solr/core/PluginInfo.java +++ b/solr/core/src/java/org/apache/solr/core/PluginInfo.java @@ -65,6 +65,7 @@ public class PluginInfo implements MapSerializable{ LinkedHashMap m = new LinkedHashMap<>(map); initArgs = new NamedList(); for (Map.Entry entry : map.entrySet()) { + if (NAME.equals(entry.getKey()) || CLASS_NAME.equals(entry.getKey())) continue; Object value = entry.getValue(); if (value instanceof Map) value = new NamedList((Map) value); initArgs.add(entry.getKey(), value); diff --git a/solr/core/src/java/org/apache/solr/core/SolrConfig.java b/solr/core/src/java/org/apache/solr/core/SolrConfig.java index 04586358024..8d8295ed785 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrConfig.java +++ b/solr/core/src/java/org/apache/solr/core/SolrConfig.java @@ -42,6 +42,7 @@ import org.apache.solr.spelling.QueryConverter; import org.apache.solr.update.SolrIndexConfig; import org.apache.solr.update.UpdateLog; import org.apache.solr.update.processor.UpdateRequestProcessorChain; +import org.apache.solr.update.processor.UpdateRequestProcessorFactory; import org.apache.solr.util.DOMUtil; import org.apache.solr.util.FileUtils; import org.apache.solr.util.RegexFileFilter; @@ -302,6 +303,7 @@ public class SolrConfig extends Config implements MapSerializable{ .add(new SolrPluginInfo(ValueSourceParser.class, "valueSourceParser", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK)) .add(new SolrPluginInfo(TransformerFactory.class, "transformer", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK)) .add(new SolrPluginInfo(SearchComponent.class, "searchComponent", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK)) + .add(new SolrPluginInfo(UpdateRequestProcessorFactory.class, "updateProcessor", REQUIRE_NAME, REQUIRE_CLASS, MULTI_OK)) // TODO: WTF is up with queryConverter??? // it aparently *only* works as a singleton? - SOLR-4304 // and even then -- only if there is a single SpellCheckComponent diff --git a/solr/core/src/java/org/apache/solr/core/SolrCore.java b/solr/core/src/java/org/apache/solr/core/SolrCore.java index e1ae0ba87ae..4afaf8a65a7 100644 --- a/solr/core/src/java/org/apache/solr/core/SolrCore.java +++ b/solr/core/src/java/org/apache/solr/core/SolrCore.java @@ -77,6 +77,7 @@ import org.apache.solr.common.cloud.SolrZkClient; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.CommonParams.EchoParamStyle; import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.params.UpdateParams; import org.apache.solr.common.util.ExecutorUtil; import org.apache.solr.common.util.IOUtils; import org.apache.solr.common.util.NamedList; @@ -129,7 +130,9 @@ import org.apache.solr.update.processor.DistributedUpdateProcessorFactory; import org.apache.solr.update.processor.LogUpdateProcessorFactory; import org.apache.solr.update.processor.RunUpdateProcessorFactory; import org.apache.solr.update.processor.UpdateRequestProcessorChain; +import org.apache.solr.update.processor.UpdateRequestProcessorChain.ProcessorInfo; import org.apache.solr.update.processor.UpdateRequestProcessorFactory; +import org.apache.solr.util.ConcurrentLRUCache; import org.apache.solr.util.DefaultSolrThreadFactory; import org.apache.solr.util.PropertiesInputStream; import org.apache.solr.util.RefCounted; @@ -176,6 +179,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { private final long startTime; private final RequestHandlers reqHandlers; private final PluginBag searchComponents = new PluginBag<>(SearchComponent.class, this); + private final PluginBag updateProcessors = new PluginBag<>(UpdateRequestProcessorFactory.class, this); private final Map updateProcessorChains; private final Map infoRegistry; private IndexDeletionPolicyWrapper solrDelPolicy; @@ -796,6 +800,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { valueSourceParsers.init(ValueSourceParser.standardValueSourceParsers, this); transformerFactories.init(TransformerFactory.defaultFactories, this); loadSearchComponents(); + updateProcessors.init(Collections.EMPTY_MAP, this); // Processors initialized before the handlers updateProcessorChains = loadUpdateProcessorChains(); @@ -993,7 +998,7 @@ public final class SolrCore implements SolrInfoMBean, Closeable { new DistributedUpdateProcessorFactory(), new RunUpdateProcessorFactory() }; - def = new UpdateRequestProcessorChain(factories, this); + def = new UpdateRequestProcessorChain(Arrays.asList(factories), this); } map.put(null, def); map.put("", def); @@ -1017,6 +1022,18 @@ public final class SolrCore implements SolrInfoMBean, Closeable { return chain; } + public UpdateRequestProcessorChain getUpdateProcessorChain(SolrParams params) { + String chainName = params.get(UpdateParams.UPDATE_CHAIN); + UpdateRequestProcessorChain defaultUrp = getUpdateProcessingChain(chainName); + ProcessorInfo processorInfo = new ProcessorInfo(params); + if (processorInfo.isEmpty()) return defaultUrp; + return UpdateRequestProcessorChain.constructChain(defaultUrp, processorInfo, this); + } + + public PluginBag getUpdateProcessors() { + return updateProcessors; + } + // this core current usage count private final AtomicInteger refCount = new AtomicInteger(1); diff --git a/solr/core/src/java/org/apache/solr/handler/BlobHandler.java b/solr/core/src/java/org/apache/solr/handler/BlobHandler.java index 7399e785a57..acc93ee016d 100644 --- a/solr/core/src/java/org/apache/solr/handler/BlobHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/BlobHandler.java @@ -229,7 +229,7 @@ public class BlobHandler extends RequestHandlerBase implements PluginInfoInitia public static void indexMap(SolrQueryRequest req, SolrQueryResponse rsp, Map doc) throws IOException { SolrInputDocument solrDoc = new SolrInputDocument(); for (Map.Entry e : doc.entrySet()) solrDoc.addField(e.getKey(),e.getValue()); - UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessingChain(req.getParams().get(UpdateParams.UPDATE_CHAIN)); + UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessorChain(req.getParams()); UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp); AddUpdateCommand cmd = new AddUpdateCommand(req); cmd.solrDoc = solrDoc; diff --git a/solr/core/src/java/org/apache/solr/handler/ContentStreamHandlerBase.java b/solr/core/src/java/org/apache/solr/handler/ContentStreamHandlerBase.java index 125c61364d1..72839bf5303 100644 --- a/solr/core/src/java/org/apache/solr/handler/ContentStreamHandlerBase.java +++ b/solr/core/src/java/org/apache/solr/handler/ContentStreamHandlerBase.java @@ -55,7 +55,7 @@ public abstract class ContentStreamHandlerBase extends RequestHandlerBase { public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception { SolrParams params = req.getParams(); UpdateRequestProcessorChain processorChain = - req.getCore().getUpdateProcessingChain(params.get(UpdateParams.UPDATE_CHAIN)); + req.getCore().getUpdateProcessorChain(params); UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp); diff --git a/solr/core/src/java/org/apache/solr/update/processor/SimpleUpdateProcessorFactory.java b/solr/core/src/java/org/apache/solr/update/processor/SimpleUpdateProcessorFactory.java new file mode 100644 index 00000000000..af4619cea66 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/update/processor/SimpleUpdateProcessorFactory.java @@ -0,0 +1,48 @@ +package org.apache.solr.update.processor; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.IOException; + +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.update.AddUpdateCommand; + + +/** + * A base class for writing a very simple UpdateProcessor without worrying too much about the API. + * This is deliberately made to support only the add operation + */ +public abstract class SimpleUpdateProcessorFactory extends UpdateRequestProcessorFactory { + + @Override + public UpdateRequestProcessor getInstance(SolrQueryRequest req, SolrQueryResponse rsp, UpdateRequestProcessor next) { + return new UpdateRequestProcessor(next) { + @Override + public void processAdd(AddUpdateCommand cmd) throws IOException { + process(cmd, req, rsp); + super.processAdd(cmd); + } + }; + } + + /** + * This object is reused across requests. So,this method should not store anything in the instance variable. + */ + protected abstract void process(AddUpdateCommand cmd, SolrQueryRequest req, SolrQueryResponse rsp); +} diff --git a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java index 506e4558b7f..f4ea70a7d2f 100644 --- a/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java +++ b/solr/core/src/java/org/apache/solr/update/processor/UpdateRequestProcessorChain.java @@ -17,6 +17,9 @@ package org.apache.solr.update.processor; +import org.apache.solr.common.params.MapSolrParams; +import org.apache.solr.common.params.SolrParams; +import org.apache.solr.common.util.StrUtils; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; import org.apache.solr.util.plugin.PluginInfoInitialized; @@ -28,8 +31,12 @@ import org.apache.solr.core.SolrCore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.ArrayList; +import java.util.Objects; /** * Manages a chain of UpdateRequestProcessorFactories. @@ -87,7 +94,7 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized { public final static Logger log = LoggerFactory.getLogger(UpdateRequestProcessorChain.class); - private UpdateRequestProcessorFactory[] chain; + private List chain; private final SolrCore solrCore; public UpdateRequestProcessorChain(SolrCore solrCore) { @@ -144,7 +151,7 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized } if (0 <= runIndex && 0 == numDistrib) { // by default, add distrib processor immediately before run - DistributedUpdateProcessorFactory distrib + DistributedUpdateProcessorFactory distrib = new DistributedUpdateProcessorFactory(); distrib.init(new NamedList()); list.add(runIndex, distrib); @@ -152,14 +159,19 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized log.info("inserting DistributedUpdateProcessorFactory into " + infomsg); } - chain = list.toArray(new UpdateRequestProcessorFactory[list.size()]); + chain = list; + ProcessorInfo processorInfo = new ProcessorInfo(new MapSolrParams(info.attributes)); + if (processorInfo.isEmpty()) return; + UpdateRequestProcessorChain newChain = constructChain(this, processorInfo, solrCore); + chain = newChain.chain; + } /** - * Creates a chain backed directly by the specified array. Modifications to + * Creates a chain backed directly by the specified list. Modifications to * the array will affect future calls to createProcessor */ - public UpdateRequestProcessorChain( UpdateRequestProcessorFactory[] chain, + public UpdateRequestProcessorChain(List chain, SolrCore solrCore) { this.chain = chain; this.solrCore = solrCore; @@ -187,8 +199,8 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized final boolean skipToDistrib = distribPhase != null; boolean afterDistrib = true; // we iterate backwards, so true to start - for (int i = chain.length-1; i>=0; i--) { - UpdateRequestProcessorFactory factory = chain[i]; + for (int i = chain.size() - 1; i >= 0; i--) { + UpdateRequestProcessorFactory factory = chain.get(i); if (skipToDistrib) { if (afterDistrib) { @@ -208,13 +220,101 @@ public final class UpdateRequestProcessorChain implements PluginInfoInitialized return last; } + + @Deprecated + public UpdateRequestProcessorFactory[] getFactories() { + return chain.toArray(new UpdateRequestProcessorFactory[0]); + } + /** - * Returns the underlying array of factories used in this chain. - * Modifications to the array will affect future calls to + * Returns the underlying array of factories used in this chain. + * Modifications to the array will affect future calls to * createProcessor */ - public UpdateRequestProcessorFactory[] getFactories() { + public List getProcessors() { return chain; } + public static UpdateRequestProcessorChain constructChain(UpdateRequestProcessorChain defaultUrp, + ProcessorInfo processorInfo, SolrCore core) { + LinkedList urps = new LinkedList(defaultUrp.chain); + List p = getReqProcessors(processorInfo.processor, core); + List post = getReqProcessors(processorInfo.postProcessor, core); + //processor are tried to be inserted before LogUpdateprocessor+DistributedUpdateProcessor + insertBefore(urps, p, DistributedUpdateProcessorFactory.class, 0); + //port-processor is tried to be inserted before RunUpdateProcessor + insertBefore(urps, post, RunUpdateProcessorFactory.class, urps.size() - 1); + UpdateRequestProcessorChain result = new UpdateRequestProcessorChain(urps, core); + if (log.isInfoEnabled()) { + ArrayList names = new ArrayList<>(urps.size()); + for (UpdateRequestProcessorFactory urp : urps) names.add(urp.getClass().getSimpleName()); + log.info("New dynamic chain constructed : " + StrUtils.join(names, '>')); + } + return result; + } + + private static void insertBefore(LinkedList urps, List newFactories, Class klas, int idx) { + if (newFactories.isEmpty()) return; + for (int i = 0; i < urps.size(); i++) { + if (klas.isInstance(urps.get(i))) { + idx = i; + if (klas == DistributedUpdateProcessorFactory.class) { + if (i > 0 && urps.get(i - 1) instanceof LogUpdateProcessorFactory) { + idx = i - 1; + } + } + break; + } + } + for (int i = newFactories.size() - 1; 0 <= i; i--) urps.add(idx, newFactories.get(i)); + } + + static List getReqProcessors(String processor, SolrCore core) { + if (processor == null) return Collections.EMPTY_LIST; + List result = new ArrayList<>(); + if (processor != null) { + List names = StrUtils.splitSmart(processor, ','); + List l = new ArrayList<>(names.size()); + for (String s : names) { + s = s.trim(); + if (s.isEmpty()) continue; + UpdateRequestProcessorFactory p = core.getUpdateProcessors().get(s); + if (p == null) + throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such processor " + s); + result.add(p); + } + } + return result; + } + + public static class ProcessorInfo { + public final String processor, postProcessor; + + public ProcessorInfo(SolrParams params) { + processor = params.get("processor"); + postProcessor = params.get("post-processor"); + } + + public boolean isEmpty() { + return processor == null && postProcessor == null; + } + + @Override + public int hashCode() { + int hash = 0; + if (processor != null) hash += processor.hashCode(); + if (postProcessor != null) hash += postProcessor.hashCode(); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof ProcessorInfo)) return false; + ProcessorInfo that = (ProcessorInfo) obj; + + return Objects.equals(this.processor, that.processor) && + Objects.equals(this.postProcessor, that.postProcessor); + } + } + } diff --git a/solr/core/src/test/org/apache/solr/handler/TestBlobHandler.java b/solr/core/src/test/org/apache/solr/handler/TestBlobHandler.java index 9994b9e557d..5e57adf5c7c 100644 --- a/solr/core/src/test/org/apache/solr/handler/TestBlobHandler.java +++ b/solr/core/src/test/org/apache/solr/handler/TestBlobHandler.java @@ -186,7 +186,7 @@ public class TestBlobHandler extends AbstractFullDistribZkTestBase { Map m = (Map) ObjectBuilder.getVal(new JSONParser(new StringReader(response))); assertFalse("Error in posting blob "+ getAsString(m),m.containsKey("error")); } catch (JSONParser.ParseException e) { - log.error(response); + log.error("$ERROR$", response, e); fail(); } } finally { diff --git a/solr/core/src/test/org/apache/solr/update/processor/RuntimeUrp.java b/solr/core/src/test/org/apache/solr/update/processor/RuntimeUrp.java new file mode 100644 index 00000000000..a80f0975dbb --- /dev/null +++ b/solr/core/src/test/org/apache/solr/update/processor/RuntimeUrp.java @@ -0,0 +1,38 @@ +package org.apache.solr.update.processor; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.ArrayList; +import java.util.List; + +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.request.SolrQueryRequest; +import org.apache.solr.response.SolrQueryResponse; +import org.apache.solr.update.AddUpdateCommand; + +public class RuntimeUrp extends SimpleUpdateProcessorFactory { + @Override + protected void process(AddUpdateCommand cmd, SolrQueryRequest req, SolrQueryResponse rsp) { + UpdateRequestProcessorChain processorChain = req.getCore().getUpdateProcessorChain(req.getParams()); + List names = new ArrayList<>(); + for (UpdateRequestProcessorFactory p : processorChain.getProcessors()) { + names.add(p.getClass().getSimpleName()); + } + cmd.solrDoc.addField("processors_s", StrUtils.join(names,'>')); + } +} diff --git a/solr/core/src/test/org/apache/solr/update/processor/TestNamedUpdateProcessors.java b/solr/core/src/test/org/apache/solr/update/processor/TestNamedUpdateProcessors.java new file mode 100644 index 00000000000..7af7730f1c4 --- /dev/null +++ b/solr/core/src/test/org/apache/solr/update/processor/TestNamedUpdateProcessors.java @@ -0,0 +1,188 @@ +package org.apache.solr.update.processor; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.cloud.AbstractFullDistribZkTestBase; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.util.StrUtils; +import org.apache.solr.core.TestDynamicLoading; +import org.apache.solr.core.TestSolrConfigHandler; +import org.apache.solr.handler.TestBlobHandler; +import org.apache.solr.util.RESTfulServerProvider; +import org.apache.solr.util.RestTestHarness; +import org.apache.solr.util.SimplePostTool; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestNamedUpdateProcessors extends AbstractFullDistribZkTestBase { + static final Logger log = LoggerFactory.getLogger(TestNamedUpdateProcessors.class); + private List restTestHarnesses = new ArrayList<>(); + + private void setupHarnesses() { + for (final SolrClient client : clients) { + RestTestHarness harness = new RestTestHarness(new RESTfulServerProvider() { + @Override + public String getBaseURL() { + return ((HttpSolrClient) client).getBaseURL(); + } + }); + restTestHarnesses.add(harness); + } + } + + + @Override + public void distribTearDown() throws Exception { + super.distribTearDown(); + for (RestTestHarness r : restTestHarnesses) { + r.close(); + } + } + + @Test + public void test() throws Exception { + System.setProperty("enable.runtime.lib", "true"); + setupHarnesses(); + + String blobName = "colltest"; + + HttpSolrClient randomClient = (HttpSolrClient) clients.get(random().nextInt(clients.size())); + String baseURL = randomClient.getBaseURL(); + + TestBlobHandler.createSystemCollection(new HttpSolrClient(baseURL.substring(0, baseURL.lastIndexOf('/')), randomClient.getHttpClient())); + waitForRecoveriesToFinish(".system", true); + + TestBlobHandler.postAndCheck(cloudClient, baseURL.substring(0, baseURL.lastIndexOf('/')), blobName, TestDynamicLoading.generateZip(RuntimeUrp.class), 1); + + String payload = "{\n" + + "'add-runtimelib' : { 'name' : 'colltest' ,'version':1}\n" + + "}"; + RestTestHarness client = restTestHarnesses.get(random().nextInt(restTestHarnesses.size())); + TestSolrConfigHandler.runConfigCommand(client, "/config?wt=json", payload); + TestSolrConfigHandler.testForResponseElement(client, + null, + "/config/overlay?wt=json", + null, + Arrays.asList("overlay", "runtimeLib", blobName, "version"), + 1l, 10); + + payload = "{\n" + + "'create-updateprocessor' : { 'name' : 'firstFld', 'class': 'solr.FirstFieldValueUpdateProcessorFactory', 'fieldName':'test_s'}, \n" + + "'create-updateprocessor' : { 'name' : 'test', 'class': 'org.apache.solr.update.processor.RuntimeUrp', 'runtimeLib':true }, \n" + + "'create-updateprocessor' : { 'name' : 'maxFld', 'class': 'solr.MaxFieldValueUpdateProcessorFactory', 'fieldName':'mul_s'} \n" + + "}"; + + client = restTestHarnesses.get(random().nextInt(restTestHarnesses.size())); + TestSolrConfigHandler.runConfigCommand(client, "/config?wt=json", payload); + for (RestTestHarness restTestHarness : restTestHarnesses) { + TestSolrConfigHandler.testForResponseElement(restTestHarness, + null, + "/config/overlay?wt=json", + null, + Arrays.asList("overlay", "updateProcessor", "firstFld", "fieldName"), + "test_s", 10); + } + + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", "123"); + doc.addField("test_s", Arrays.asList("one", "two")); + doc.addField("mul_s", Arrays.asList("aaa", "bbb")); + randomClient.add(doc); + randomClient.commit(true, true); + QueryResponse result = randomClient.query(new SolrQuery("id:123")); + assertEquals(2, ((Collection) result.getResults().get(0).getFieldValues("test_s")).size()); + assertEquals(2, ((Collection) result.getResults().get(0).getFieldValues("mul_s")).size()); + doc = new SolrInputDocument(); + doc.addField("id", "456"); + doc.addField("test_s", Arrays.asList("three", "four")); + doc.addField("mul_s", Arrays.asList("aaa", "bbb")); + UpdateRequest ur = new UpdateRequest(); + ur.add(doc).setParam("processor", "firstFld,maxFld,test"); + randomClient.request(ur); + randomClient.commit(true, true); + result = randomClient.query(new SolrQuery("id:456")); + SolrDocument d = result.getResults().get(0); + assertEquals(1, d.getFieldValues("test_s").size()); + assertEquals(1, d.getFieldValues("mul_s").size()); + assertEquals("three", d.getFieldValues("test_s").iterator().next()); + assertEquals("bbb", d.getFieldValues("mul_s").iterator().next()); + String processors = (String) d.getFirstValue("processors_s"); + assertNotNull(processors); + assertEquals(StrUtils.splitSmart(processors, '>'), + Arrays.asList("FirstFieldValueUpdateProcessorFactory", "MaxFieldValueUpdateProcessorFactory", "RuntimeUrp", "LogUpdateProcessorFactory", "DistributedUpdateProcessorFactory", "RunUpdateProcessorFactory")); + + + } + + public static ByteBuffer getFileContent(String f) throws IOException { + ByteBuffer jar; + try (FileInputStream fis = new FileInputStream(getFile(f))) { + byte[] buf = new byte[fis.available()]; + fis.read(buf); + jar = ByteBuffer.wrap(buf); + } + return jar; + } + + public static ByteBuffer persistZip(String loc, Class... classes) throws IOException { + ByteBuffer jar = generateZip(classes); + try (FileOutputStream fos = new FileOutputStream(loc)) { + fos.write(jar.array(), 0, jar.limit()); + fos.flush(); + } + return jar; + } + + + public static ByteBuffer generateZip(Class... classes) throws IOException { + ZipOutputStream zipOut = null; + SimplePostTool.BAOS bos = new SimplePostTool.BAOS(); + zipOut = new ZipOutputStream(bos); + zipOut.setLevel(ZipOutputStream.DEFLATED); + for (Class c : classes) { + String path = c.getName().replace('.', '/').concat(".class"); + ZipEntry entry = new ZipEntry(path); + ByteBuffer b = SimplePostTool.inputStreamToByteArray(c.getClassLoader().getResourceAsStream(path)); + zipOut.putNextEntry(entry); + zipOut.write(b.array(), 0, b.limit()); + zipOut.closeEntry(); + } + zipOut.close(); + return bos.getByteBuffer(); + } + +}