SOLR-5795: New DocExpirationUpdateProcessorFactory supports computing an expiration date for documents from the TTL expression, as well as automatically deleting expired documents on a periodic basis

git-svn-id: https://svn.apache.org/repos/asf/lucene/dev/trunk@1583734 13f79535-47bb-0310-9956-ffa450edef68
Chris M. Hostetter 2014-04-01 16:55:42 +00:00
parent ca71f2f8d0
commit fe148ceb1a
7 changed files with 1151 additions and 1 deletion


@@ -163,6 +163,10 @@ New Features
to read collection and shard information instead of reading data directly from ZooKeeper.
(Dave Seltzer, Varun Thacker, Vitaliy Zhovtyuk, Erick Erickson, shalin)
* SOLR-5795: New DocExpirationUpdateProcessorFactory supports computing an expiration
date for documents from the "TTL" expression, as well as automatically deleting expired
documents on a periodic basis. (hossman)
Bug Fixes
----------------------


@@ -0,0 +1,500 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import org.apache.solr.common.SolrException;
import static org.apache.solr.common.SolrException.ErrorCode.*;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.schema.DateField;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.util.DateMathParser;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.plugin.SolrCoreAware;
import java.text.ParseException;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.List;
import java.util.Collections;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Update Processor Factory for managing automatic "expiration" of documents.
* </p>
*
* <p>
* The <code>DocExpirationUpdateProcessorFactory</code> provides two features related
* to the "expiration" of documents which can be used individually, or in combination:
* </p>
* <ol>
* <li>Computing expiration field values for documents from a "time to live" (TTL)</li>
* <li>Periodically delete documents from the index based on an expiration field</li>
* </ol>
*
* <p>
* Documents with expiration field values computed from a TTL can be excluded from
* search results using simple date based filters relative to <code>NOW</code>, or completely
* removed from the index using the periodic delete function of this factory. Alternatively,
* the periodic delete function of this factory can be used to remove any document with an
* expiration value - even if that expiration was explicitly set without leveraging the TTL
* feature of this factory.
* </p>
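*
* <p>
* For example, a minimal query-time filter (a sketch, assuming an expiration
* field named <code>_expire_at_</code>) that hides expired documents while still
* matching documents that have no expiration value at all:
* </p>
* <pre class="prettyprint">
* fq=-_expire_at_:[* TO NOW]</pre>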
*
* <p>
* The following configuration options are supported:
* </p>
*
* <ul>
* <li><code>expirationFieldName</code> - The name of the expiration field to use
* in any operations (mandatory).
* </li>
* <li><code>ttlFieldName</code> - Name of a field this processor should look
* for in each document processed, defaulting to <code>_ttl_</code>.
* If the specified field name exists in a document, the document field value
* will be parsed as a {@linkplain DateMathParser Date Math Expression} relative to
* <code>NOW</code> and the result will be added to the document using the
* <code>expirationFieldName</code>. Use <code>&lt;null name="ttlFieldName"/&gt;</code>
* to disable this feature.
* </li>
* <li><code>ttlParamName</code> - Name of an update request param this processor should
* look for in each request when processing document additions, defaulting to
* <code>_ttl_</code>. If the specified param name exists in an update request,
* the param value will be parsed as a {@linkplain DateMathParser Date Math Expression}
* relative to <code>NOW</code> and the result will be used as a default for any
* document included in that request that does not already have a value in the
* field specified by <code>ttlFieldName</code>. Use
* <code>&lt;null name="ttlParamName"/&gt;</code> to disable this feature.
* </li>
* <li><code>autoDeletePeriodSeconds</code> - Optional numeric value indicating how
* often this factory should trigger a delete to remove documents. If this option is
* used, and specifies a positive numeric value, a background thread will be
* created that will execute recurring <code>deleteByQuery</code> commands using the
* specified period. The delete query will remove all documents with an
* <code>expirationFieldName</code> value up to <code>NOW</code>.
* </li>
* <li><code>autoDeleteChainName</code> - Optional name of an
* <code>updateRequestProcessorChain</code> to use when executing automatic deletes.
* If not specified, or <code>&lt;null/&gt;</code>, the default
* <code>updateRequestProcessorChain</code> for this collection is used.
* This option is ignored unless <code>autoDeletePeriodSeconds</code> is configured
* and is positive.
* </li>
* </ul>
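*
* <p>
* For reference, the recurring delete issued by the background thread is a simple
* <code>deleteByQuery</code> built from the configured field and the formatted value
* of <code>NOW</code> for that request; with
* <code>expirationFieldName=_expire_at_</code> it has this shape (timestamp
* illustrative only):
* </p>
* <pre class="prettyprint">
* {!cache=false}_expire_at_:[* TO 2014-04-01T16:55:42Z]</pre>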
*
* <p>
* For example: The configuration below will cause any document with a field named
* <code>_ttl_</code> to have a Date field named <code>_expire_at_</code> computed
* for it when added -- but no automatic deletion will happen.
* </p>
*
* <pre class="prettyprint">
* &lt;processor class="solr.processor.DocExpirationUpdateProcessorFactory"&gt;
* &lt;str name="expirationFieldName"&gt;_expire_at_&lt;/str&gt;
* &lt;/processor&gt;</pre>
*
* <p>
* Alternatively, in this configuration deletes will occur automatically against the
* <code>_expire_at_</code> field every 5 minutes - but this processor will not
* automatically populate the <code>_expire_at_</code> using any sort of TTL expression.
* Only documents that were added with an explicit <code>_expire_at_</code> field value
* will ever be deleted.
* </p>
*
* <pre class="prettyprint">
* &lt;processor class="solr.processor.DocExpirationUpdateProcessorFactory"&gt;
* &lt;null name="ttlFieldName"/&gt;
* &lt;null name="ttlParamName"/&gt;
* &lt;int name="autoDeletePeriodSeconds"&gt;300&lt;/int&gt;
* &lt;str name="expirationFieldName"&gt;_expire_at_&lt;/str&gt;
* &lt;/processor&gt;</pre>
*
* <p>
* This last example shows the combination of both features using a custom
* <code>ttlFieldName</code>: Documents with a <code>my_ttl</code> field will
* have an <code>_expire_at_</code> field computed, and deletes will be triggered
* every 5 minutes to remove documents whose
* <code>_expire_at_</code> field value is in the past.
* </p>
*
* <pre class="prettyprint">
* &lt;processor class="solr.processor.DocExpirationUpdateProcessorFactory"&gt;
* &lt;int name="autoDeletePeriodSeconds"&gt;300&lt;/int&gt;
* &lt;str name="ttlFieldName"&gt;my_ttl&lt;/str&gt;
* &lt;null name="ttlParamName"/&gt;
* &lt;str name="expirationFieldName"&gt;_expire_at_&lt;/str&gt;
* &lt;/processor&gt;</pre>
*/
public final class DocExpirationUpdateProcessorFactory
extends UpdateRequestProcessorFactory
implements SolrCoreAware {
public final static Logger log = LoggerFactory.getLogger(DocExpirationUpdateProcessorFactory.class);
private static final String DEF_TTL_KEY = "_ttl_";
private static final String EXP_FIELD_NAME_CONF = "expirationFieldName";
private static final String TTL_FIELD_NAME_CONF = "ttlFieldName";
private static final String TTL_PARAM_NAME_CONF = "ttlParamName";
private static final String DEL_CHAIN_NAME_CONF = "autoDeleteChainName";
private static final String DEL_PERIOD_SEC_CONF = "autoDeletePeriodSeconds";
private SolrCore core;
private ScheduledThreadPoolExecutor executor;
private String expireField = null;
private String ttlField = null;
private String ttlParam = null;
private String deleteChainName = null;
private long deletePeriodSeconds = -1L;
private SolrException confErr(final String msg) {
return confErr(msg, null);
}
private SolrException confErr(final String msg, SolrException root) {
return new SolrException(SERVER_ERROR, this.getClass().getSimpleName()+": "+msg, root);
}
private String removeArgStr(final NamedList args, final String arg, final String def,
final String errMsg) {
if (args.indexOf(arg,0) < 0) return def;
Object tmp = args.remove(arg);
if (null == tmp) return null;
if (tmp instanceof String) return tmp.toString();
throw confErr(arg + " " + errMsg);
}
@SuppressWarnings("unchecked")
@Override
public void init(NamedList args) {
deleteChainName = removeArgStr(args, DEL_CHAIN_NAME_CONF, null,
"must be a <str> or <null/> for default chain");
ttlField = removeArgStr(args, TTL_FIELD_NAME_CONF, DEF_TTL_KEY,
"must be a <str> or <null/> to disable");
ttlParam = removeArgStr(args, TTL_PARAM_NAME_CONF, DEF_TTL_KEY,
"must be a <str> or <null/> to disable");
expireField = removeArgStr(args, EXP_FIELD_NAME_CONF, null, "must be a <str>");
if (null == expireField) {
throw confErr(EXP_FIELD_NAME_CONF + " must be configured");
}
Object tmp = args.remove(DEL_PERIOD_SEC_CONF);
if (null != tmp) {
if (! (tmp instanceof Number)) {
throw confErr(DEL_PERIOD_SEC_CONF + " must be an <int> or <long>");
}
deletePeriodSeconds = ((Number)tmp).longValue();
}
super.init(args);
}
@Override
public void inform(SolrCore core) {
this.core = core;
if (null == core.getLatestSchema().getFieldTypeNoEx(expireField)) {
// TODO: check for managed schema and auto-add as a date field?
throw confErr(EXP_FIELD_NAME_CONF + " does not exist in schema: " + expireField);
}
if (0 < deletePeriodSeconds) {
// validate that we have a chain we can work with
try {
Object ignored = core.getUpdateProcessingChain(deleteChainName);
} catch (SolrException e) {
throw confErr(DEL_CHAIN_NAME_CONF + " does not exist: " + deleteChainName, e);
}
// schedule recurring deletion
initDeleteExpiredDocsScheduler(core);
}
}
private void initDeleteExpiredDocsScheduler(SolrCore core) {
executor = new ScheduledThreadPoolExecutor
(1, new DefaultSolrThreadFactory("autoExpireDocs"),
new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
log.warn("Skipping execution of '{}' using '{}'", r, e);
}
});
core.addCloseHook(new CloseHook() {
public void postClose(SolrCore core) {
// update handler is gone, hard terminate anything that's left.
if (executor.isTerminating()) {
log.info("Triggering hard shutdown of DocExpiration Executor");
executor.shutdownNow();
}
}
public void preClose(SolrCore core) {
log.info("Triggering Graceful shutdown of DocExpiration Executor");
executor.shutdown();
}
});
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
// we don't want this firing right away, since the core may not be ready
final long initialDelay = deletePeriodSeconds;
// TODO: should we make initialDelay configurable
// TODO: should we make initialDelay some fraction of the period?
executor.scheduleAtFixedRate(new DeleteExpiredDocsRunnable(this),
initialDelay,
deletePeriodSeconds,
TimeUnit.SECONDS);
}
@Override
public UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
String defaultTtl = (null == ttlParam) ? null : req.getParams().get(ttlParam);
if (null == ttlField && null == defaultTtl) {
// nothing to do, short-circuit ourselves out of the chain.
return next;
} else {
return new TTLUpdateProcessor(defaultTtl, expireField, ttlField, next);
}
}
private static final class TTLUpdateProcessor extends UpdateRequestProcessor {
final String defaultTtl;
final String expireField;
final String ttlField;
public TTLUpdateProcessor(final String defaultTtl,
final String expireField,
final String ttlField,
final UpdateRequestProcessor next) {
super(next);
this.defaultTtl = defaultTtl;
this.expireField = expireField;
this.ttlField = ttlField;
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
final SolrInputDocument doc = cmd.getSolrInputDocument();
final String math = doc.containsKey(ttlField)
? doc.getFieldValue(ttlField).toString() : defaultTtl;
if (null != math) {
try {
final DateMathParser dmp = new DateMathParser();
// TODO: should we try to accept things like "1DAY" as well as "+1DAY" ?
// How?
// 'startsWith("+")' is a bad idea because it would cause problems with
// things like "/DAY+1YEAR"
// Maybe catch ParseException and retry with "+" prepended?
doc.addField(expireField, dmp.parseMath(math));
} catch (ParseException pe) {
throw new SolrException(BAD_REQUEST, "Can't parse ttl as date math: " + math, pe);
}
}
super.processAdd(cmd);
}
}
/**
* <p>
* Runnable that uses the <code>deleteChainName</code> configured for
* this factory to execute a delete by query (using the configured
* <code>expireField</code>) followed by a soft commit to re-open searchers (if needed)
* </p>
* <p>
* This logic is all wrapped up in a new SolrRequestInfo context with
* some logging to help make it obvious this background activity is happening.
* </p>
* <p>
* In cloud mode, this runner only triggers deletes if
* {@link #iAmInChargeOfPeriodicDeletes} is true.
* (logging is minimal in this situation)
* </p>
*
* @see #iAmInChargeOfPeriodicDeletes
*/
private static final class DeleteExpiredDocsRunnable implements Runnable {
final DocExpirationUpdateProcessorFactory factory;
final SolrCore core;
final String deleteChainName;
final String expireField;
public DeleteExpiredDocsRunnable(final DocExpirationUpdateProcessorFactory factory) {
this.factory = factory;
this.core = factory.core;
this.deleteChainName = factory.deleteChainName;
this.expireField = factory.expireField;
}
public void run() {
// setup the request context early so the logging (including any from
// iAmInChargeOfPeriodicDeletes() ) includes the core context info
final SolrQueryRequest req = new LocalSolrQueryRequest
(factory.core, Collections.<String,String[]>emptyMap());
try {
final SolrQueryResponse rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
try {
if (! factory.iAmInChargeOfPeriodicDeletes() ) {
// No-Op
return;
}
log.info("Beginning periodic deletion of expired docs");
UpdateRequestProcessorChain chain = core.getUpdateProcessingChain(deleteChainName);
UpdateRequestProcessor proc = chain.createProcessor(req, rsp);
if (null == proc) {
log.warn("No active processors, skipping automatic deletion " +
"of expired docs using chain: {}", deleteChainName);
return;
}
try {
DeleteUpdateCommand del = new DeleteUpdateCommand(req);
del.setQuery("{!cache=false}" + expireField + ":[* TO " +
DateField.formatExternal(SolrRequestInfo.getRequestInfo().getNOW())
+ "]");
proc.processDelete(del);
// TODO: should this be more configurable?
// TODO: in particular: should hard commit be optional?
CommitUpdateCommand commit = new CommitUpdateCommand(req, false);
commit.softCommit = true;
commit.openSearcher = true;
proc.processCommit(commit);
} finally {
proc.finish();
}
log.info("Finished periodic deletion of expired docs");
} catch (IOException ioe) {
log.error("IOException in periodic deletion of expired docs: " +
ioe.getMessage(), ioe);
// DO NOT RETHROW: ScheduledExecutor will suppress subsequent executions
} catch (RuntimeException re) {
log.error("Runtime error in periodic deletion of expired docs: " +
re.getMessage(), re);
// DO NOT RETHROW: ScheduledExecutor will suppress subsequent executions
} finally {
SolrRequestInfo.clearRequestInfo();
}
} finally {
req.close();
}
}
}
/**
* <p>
* Helper method that returns true if the Runnable managed by this factory
* should be responsible for doing periodic deletes.
* </p>
* <p>
* In simple standalone installations this method always returns true,
* but in cloud mode it will be true if and only if we are currently the leader
* of the (active) slice with the first name (lexicographically).
* </p>
* <p>
* If this method returns false, it may have also logged a message letting the user
* know why we aren't attempting periodic deletion (but it will attempt to not log
* this excessively)
* </p>
*/
private boolean iAmInChargeOfPeriodicDeletes() {
ZkController zk = core.getCoreDescriptor().getCoreContainer().getZkController();
if (null == zk) return true;
// This is a lot simpler than doing our own "leader" election across all replicas
// of all shards since:
// a) we already have a per shard leader
// b) shard names must be unique
// c) ClusterState is already being "watched" by ZkController, no additional zk hits
// d) there might be multiple instances of this factory (in multiple chains) per
// collection, so picking an ephemeral node name for our election would be tricky
CloudDescriptor desc = core.getCoreDescriptor().getCloudDescriptor();
String col = desc.getCollectionName();
List<Slice> slices = new ArrayList<Slice>(zk.getClusterState().getActiveSlices(col));
Collections.sort(slices, COMPARE_SLICES_BY_NAME);
String leaderInCharge = slices.get(0).getLeader().getName();
String myCoreNodeName = desc.getCoreNodeName();
boolean inChargeOfDeletesRightNow = leaderInCharge.equals(myCoreNodeName);
if (previouslyInChargeOfDeletes && ! inChargeOfDeletesRightNow) {
// don't spam the logs constantly, just log when we know that we're not the guy
// (the first time -- or anytime we were, but no longer are)
log.info("Not currently in charge of periodic deletes for this collection, " +
"will not trigger delete or log again until this changes");
}
previouslyInChargeOfDeletes = inChargeOfDeletesRightNow;
return inChargeOfDeletesRightNow;
}
/** @see #iAmInChargeOfPeriodicDeletes */
private volatile boolean previouslyInChargeOfDeletes = true;
private static final Comparator<Slice> COMPARE_SLICES_BY_NAME = new Comparator<Slice>() {
public int compare(Slice a, Slice b) {
return a.getName().compareTo(b.getName());
}
};
}
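
For context, a minimal SolrJ sketch (not part of this commit) of how a document picks up an expiration via the default _ttl_ field; it assumes a Solr 4.x core at a hypothetical URL whose default update chain includes this factory configured with expirationFieldName=_expire_at_:

import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class TtlIndexingSketch {
  public static void main(String[] args) throws Exception {
    // hypothetical core URL; the default chain is assumed to contain
    // DocExpirationUpdateProcessorFactory with expirationFieldName=_expire_at_
    HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr/collection1");
    try {
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "ttl-example-1");
      // date math relative to NOW; the processor computes NOW+10MINUTES
      // and stores the result in _expire_at_
      doc.addField("_ttl_", "+10MINUTES");
      server.add(doc);
      server.commit();
    } finally {
      server.shutdown();
    }
  }
}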


@@ -0,0 +1,103 @@
<?xml version="1.0" ?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
See DocExpirationUpdateProcessorFactoryTest
and DistribDocExpirationUpdateProcessorTest
-->
<config>
<dataDir>${solr.data.dir:}</dataDir>
<directoryFactory name="DirectoryFactory"
class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>
<luceneMatchVersion>${tests.luceneMatchVersion:LUCENE_CURRENT}</luceneMatchVersion>
<xi:include href="solrconfig.snippet.randomindexconfig.xml" xmlns:xi="http://www.w3.org/2001/XInclude"/>
<updateHandler class="solr.DirectUpdateHandler2">
<updateLog>
<str name="dir">${solr.ulog.dir:}</str>
</updateLog>
</updateHandler>
<requestHandler name="/get" class="solr.RealTimeGetHandler">
<lst name="defaults">
<str name="omitHeader">true</str>
</lst>
</requestHandler>
<requestHandler name="/replication" class="solr.ReplicationHandler" startup="lazy" />
<requestHandler name="/select" class="solr.SearchHandler" default="true" />
<requestHandler name="/update" class="solr.UpdateRequestHandler" />
<requestHandler name="/admin/" class="solr.admin.AdminHandlers" />
<updateRequestProcessorChain name="convert-ttl-defaults">
<processor class="solr.processor.DocExpirationUpdateProcessorFactory">
<str name="expirationFieldName">_expire_at_tdt</str>
</processor>
</updateRequestProcessorChain>
<updateRequestProcessorChain name="convert-ttl-field">
<processor class="solr.processor.DocExpirationUpdateProcessorFactory">
<str name="ttlFieldName">_ttl_field_</str>
<null name="ttlParamName"/>
<str name="expirationFieldName">_expire_at_tdt</str>
</processor>
<processor class="solr.IgnoreFieldUpdateProcessorFactory">
<str name="fieldName">_ttl_field_</str>
</processor>
</updateRequestProcessorChain>
<updateRequestProcessorChain name="convert-ttl-param">
<processor class="solr.processor.DocExpirationUpdateProcessorFactory">
<str name="ttlParamName">_ttl_param_</str>
<null name="ttlFieldName"/>
<str name="expirationFieldName">_expire_at_tdt</str>
</processor>
</updateRequestProcessorChain>
<updateRequestProcessorChain name="convert-ttl-field-with-param-default">
<processor class="solr.processor.DocExpirationUpdateProcessorFactory">
<str name="ttlFieldName">_ttl_field_</str>
<str name="ttlParamName">_ttl_param_</str>
<str name="expirationFieldName">_expire_at_tdt</str>
</processor>
<processor class="solr.IgnoreFieldUpdateProcessorFactory">
<str name="fieldName">_ttl_field_</str>
</processor>
</updateRequestProcessorChain>
<updateRequestProcessorChain name="scheduled-delete" default="true">
<!-- NOTE: this chain is default so we can see that
autoDeleteChainName defaults to the default chain for the SolrCore
-->
<processor class="solr.processor.DocExpirationUpdateProcessorFactory">
<!-- str name="autoDeleteChainName">scheduled-delete</str -->
<int name="autoDeletePeriodSeconds">3</int>
<str name="expirationFieldName">eXpField_tdt</str>
<str name="ttlFieldName">tTl_s</str>
<null name="ttlParamName"/>
</processor>
<processor class="solr.RecordingUpdateProcessorFactory" />
<processor class="solr.LogUpdateProcessorFactory" />
<processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>
</config>
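
To illustrate the convert-ttl-param chain above, a hedged SolrJ sketch (not part of this commit; assumes a core loaded with this test config at a hypothetical URL) that supplies the TTL as a request param instead of a document field:

import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrInputDocument;

public class TtlParamSketch {
  public static void main(String[] args) throws Exception {
    // hypothetical core URL running the test config above
    HttpSolrServer server = new HttpSolrServer("http://localhost:8983/solr/collection1");
    try {
      UpdateRequest update = new UpdateRequest();
      update.setParam("update.chain", "convert-ttl-param");
      // every doc in this request defaults to a 5 minute TTL
      update.setParam("_ttl_param_", "+5MINUTES");
      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", "ttl-param-example-1");
      update.add(doc);
      update.process(server);
      server.commit();
    } finally {
      server.shutdown();
    }
  }
}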


@@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactory; // jdoc
import org.apache.solr.update.processor.DocExpirationUpdateProcessorFactoryTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.HashMap;
/** Test of {@link DocExpirationUpdateProcessorFactory} in a cloud setup */
@Slow // Has to do some sleeping to wait for a future expiration
public class DistribDocExpirationUpdateProcessorTest extends AbstractFullDistribZkTestBase {
public static Logger log = LoggerFactory.getLogger(DistribDocExpirationUpdateProcessorTest.class);
public DistribDocExpirationUpdateProcessorTest() {
configString = DocExpirationUpdateProcessorFactoryTest.CONFIG_XML;
schemaString = DocExpirationUpdateProcessorFactoryTest.SCHEMA_XML;
}
@Override
protected String getCloudSolrConfig() {
return configString;
}
@Override
public void doTest() throws Exception {
assertTrue("only one shard?!?!?!", 1 < shardToJetty.keySet().size());
log.info("number of shards: {}", shardToJetty.keySet().size());
handle.clear();
handle.put("maxScore", SKIPVAL);
handle.put("QTime", SKIPVAL);
handle.put("timestamp", SKIPVAL);
// some docs with no expiration
for (int i = 1; i <= 100; i++) {
indexDoc(sdoc("id", i));
}
commit();
waitForThingsToLevelOut(30);
// this doc better not already exist
waitForNoResults(0, params("q","id:999","rows","0","_trace","sanity_check"));
// record the indexversion for each server so we can check later
// that it only changes for one shard
final Map<String,Long> initIndexVersions = getIndexVersionOfAllReplicas();
assertTrue("WTF? no versions?", 0 < initIndexVersions.size());
// add a doc with a short TTL
indexDoc(sdoc("id", "999", "tTl_s","+30SECONDS"));
commit();
// wait for one doc to be deleted
waitForNoResults(180, params("q","id:999","rows","0","_trace","did_it_expire_yet"));
// verify only one shard changed
waitForThingsToLevelOut(30);
final Map<String,Long> finalIndexVersions = getIndexVersionOfAllReplicas();
assertEquals("WTF? not same num versions?",
initIndexVersions.size(),
finalIndexVersions.size());
final Set<String> nodesThatChange = new HashSet<String>();
final Set<String> shardsThatChange = new HashSet<String>();
int coresCompared = 0;
for (String shard : shardToJetty.keySet()) {
for (CloudJettyRunner replicaRunner : shardToJetty.get(shard)) {
coresCompared++;
String core = replicaRunner.coreNodeName;
Long initVersion = initIndexVersions.get(core);
Long finalVersion = finalIndexVersions.get(core);
assertNotNull(shard + ": no init version for core: " + core, initVersion);
assertNotNull(shard + ": no final version for core: " + core, finalVersion);
if (!initVersion.equals(finalVersion)) {
nodesThatChange.add(core + "("+shard+")");
shardsThatChange.add(shard);
}
}
}
assertEquals("Exactly one shard should have changed, instead: " + shardsThatChange
+ " nodes=(" + nodesThatChange + ")",
1, shardsThatChange.size());
assertEquals("somehow we missed some cores?",
initIndexVersions.size(), coresCompared);
// TODO: above logic verifies that deleteByQuery happens on all nodes, and ...
// doesn't affect searcher re-open on shards w/o expired docs ... can we also verify
// that *only* one node is sending the deletes ?
// (ie: no flood of redundant deletes?)
}
/**
* Returns a map whose key is the coreNodeName and whose value is what the replication
* handler returns for the indexversion
*/
private Map<String,Long> getIndexVersionOfAllReplicas() throws IOException, SolrServerException {
Map<String,Long> results = new HashMap<String,Long>();
for (List<CloudJettyRunner> listOfReplicas : shardToJetty.values()) {
for (CloudJettyRunner replicaRunner : listOfReplicas) {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set("command","indexversion");
params.set("_trace","getIndexVersion");
params.set("qt","/replication");
QueryRequest req = new QueryRequest(params);
NamedList<Object> res = replicaRunner.client.solrClient.request(req);
assertNotNull("null response from server: " + replicaRunner.coreNodeName, res);
Object version = res.get("indexversion");
assertNotNull("null version from server: " + replicaRunner.coreNodeName, version);
assertTrue("version isn't a long: "+replicaRunner.coreNodeName,
version instanceof Long);
results.put(replicaRunner.coreNodeName, (Long)version);
long numDocs = replicaRunner.client.solrClient.query
(params("q","*:*","distrib","false","rows","0","_trace","counting_docs"))
.getResults().getNumFound();
log.info("core=" + replicaRunner.coreNodeName + "; ver=" + version +
"; numDocs=" + numDocs);
}
}
return results;
}
/**
* Executes a query over and over against the cloudClient every 5 seconds
* until the numFound is 0 or the maxTimeLimitSeconds is exceeded.
* Query is guaranteed to be executed at least once.
*/
private void waitForNoResults(int maxTimeLimitSeconds,
SolrParams params)
throws SolrServerException, InterruptedException {
final long giveUpAfter = System.currentTimeMillis() + (1000L * maxTimeLimitSeconds);
long numFound = cloudClient.query(params).getResults().getNumFound();
while (0L < numFound && System.currentTimeMillis() < giveUpAfter) {
Thread.sleep(Math.min(5000, giveUpAfter - System.currentTimeMillis()));
numFound = cloudClient.query(params).getResults().getNumFound();
}
assertEquals("Give up waiting for no results: " + params,
0L, numFound);
}
}


@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.update.processor.UpdateRequestProcessor;
import org.apache.solr.update.processor.UpdateRequestProcessorChain;
import org.apache.solr.update.processor.UpdateRequestProcessorFactory;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.junit.BeforeClass;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* Tests various configurations of DocExpirationUpdateProcessorFactory
*/
public class DocExpirationUpdateProcessorFactoryTest extends UpdateProcessorTestBase {
public static final String CONFIG_XML = "solrconfig-doc-expire-update-processor.xml";
public static final String SCHEMA_XML = "schema15.xml";
@BeforeClass
public static void beforeClass() throws Exception {
initCore(CONFIG_XML, SCHEMA_XML);
}
public void testTTLDefaultsConversion() throws Exception {
SolrInputDocument d = null;
d = processAdd("convert-ttl-defaults",
params("NOW","1394059630042"),
doc(f("id", "1111"),
f("_ttl_","+5MINUTES")));
assertNotNull(d);
assertEquals(new Date(1394059930042L), d.getFieldValue("_expire_at_tdt"));
d = processAdd("convert-ttl-defaults",
params("NOW","1394059630042",
"_ttl_","+5MINUTES"),
doc(f("id", "1111")));
assertNotNull(d);
assertEquals(new Date(1394059930042L), d.getFieldValue("_expire_at_tdt"));
}
public void testTTLFieldConversion() throws Exception {
final String chain = "convert-ttl-field";
SolrInputDocument d = null;
d = processAdd(chain,
params("NOW","1394059630042"),
doc(f("id", "1111"),
f("_ttl_field_","+5MINUTES")));
assertNotNull(d);
assertEquals(new Date(1394059930042L), d.getFieldValue("_expire_at_tdt"));
d = processAdd(chain,
params("NOW","1394059630042"),
doc(f("id", "2222"),
f("_ttl_field_","+27MINUTES")));
assertNotNull(d);
assertEquals(new Date(1394061250042L), d.getFieldValue("_expire_at_tdt"));
d = processAdd(chain,
params("NOW","1394059630042"),
doc(f("id", "3333"),
f("_ttl_field_","+1YEAR")));
assertNotNull(d);
assertEquals(new Date(1425595630042L), d.getFieldValue("_expire_at_tdt"));
d = processAdd(chain,
params("NOW","1394059630042"),
doc(f("id", "1111"),
f("_ttl_field_","/DAY+1YEAR")));
assertNotNull(d);
assertEquals(new Date(1425513600000L), d.getFieldValue("_expire_at_tdt"));
// default ttlParamName is disabled, this should not convert...
d = processAdd(chain,
params("NOW","1394059630042",
"_ttl_","+5MINUTES"),
doc(f("id", "1111")));
assertNotNull(d);
assertNull(d.getFieldValue("_expire_at_tdt"));
}
public void testTTLParamConversion() throws Exception {
final String chain = "convert-ttl-param";
SolrInputDocument d = null;
d = processAdd(chain,
params("NOW","1394059630042",
"_ttl_param_","+5MINUTES"),
doc(f("id", "1111")));
assertNotNull(d);
assertEquals(new Date(1394059930042L), d.getFieldValue("_expire_at_tdt"));
d = processAdd(chain,
params("NOW","1394059630042",
"_ttl_param_","+27MINUTES"),
doc(f("id", "2222")));
assertNotNull(d);
assertEquals(new Date(1394061250042L), d.getFieldValue("_expire_at_tdt"));
// default ttlFieldName is disabled, param should be used...
d = processAdd(chain,
params("NOW","1394059630042",
"_ttl_param_","+5MINUTES"),
doc(f("id", "1111"),
f("_ttl_field_","+999MINUTES")));
assertNotNull(d);
assertEquals(new Date(1394059930042L), d.getFieldValue("_expire_at_tdt"));
// default ttlFieldName is disabled, this should not convert...
d = processAdd(chain,
params("NOW","1394059630042"),
doc(f("id", "1111"),
f("_ttl_","/DAY+1YEAR")));
assertNotNull(d);
assertNull(d.getFieldValue("_expire_at_tdt"));
}
public void testTTLFieldConversionWithDefaultParam() throws Exception {
final String chain = "convert-ttl-field-with-param-default";
SolrInputDocument d = null;
d = processAdd(chain,
params("NOW","1394059630042",
"_ttl_param_","+999MINUTES"),
doc(f("id", "1111"),
f("_ttl_field_","+5MINUTES")));
assertNotNull(d);
assertEquals(new Date(1394059930042L), d.getFieldValue("_expire_at_tdt"));
d = processAdd(chain,
params("NOW","1394059630042",
"_ttl_param_","+27MINUTES"),
doc(f("id", "2222")));
assertNotNull(d);
assertEquals(new Date(1394061250042L), d.getFieldValue("_expire_at_tdt"));
}
public void testAutomaticDeletes() throws Exception {
// get a handle on our recorder
UpdateRequestProcessorChain chain =
h.getCore().getUpdateProcessingChain("scheduled-delete");
assertNotNull(chain);
UpdateRequestProcessorFactory[] factories = chain.getFactories();
assertEquals("did number of processors configured in chain get changed?",
5, factories.length);
assertTrue("Expected [1] RecordingUpdateProcessorFactory: " + factories[1].getClass(),
factories[1] instanceof RecordingUpdateProcessorFactory);
RecordingUpdateProcessorFactory recorder =
(RecordingUpdateProcessorFactory) factories[1];
// now start recording, and monitor for the expected commands
try {
recorder.startRecording();
// more than one iteration to verify it's recurring
final int numItersToCheck = 1 + RANDOM_MULTIPLIER;
for (int i = 0; i < numItersToCheck; i++) {
UpdateCommand tmp;
// be generous in how long we wait, some jenkins machines are slooooow
tmp = recorder.commandQueue.poll(30, TimeUnit.SECONDS);
// we can be confident in the order because DocExpirationUpdateProcessorFactory
// uses the same request for both the delete & the commit -- and both
// RecordingUpdateProcessorFactory's getInstance & startRecording methods are
// synchronized. So it should not be possible to start recording in the
// middle of the two commands
assertTrue("expected DeleteUpdateCommand: " + tmp.getClass(),
tmp instanceof DeleteUpdateCommand);
DeleteUpdateCommand delete = (DeleteUpdateCommand) tmp;
assertFalse(delete.isDeleteById());
assertNotNull(delete.getQuery());
assertTrue(delete.getQuery(),
delete.getQuery().startsWith("{!cache=false}eXpField_tdt:[* TO "));
// commit should be immediately after the delete
tmp = recorder.commandQueue.poll(5, TimeUnit.SECONDS);
assertTrue("expected CommitUpdateCommand: " + tmp.getClass(),
tmp instanceof CommitUpdateCommand);
CommitUpdateCommand commit = (CommitUpdateCommand) tmp;
assertTrue(commit.softCommit);
assertTrue(commit.openSearcher);
}
} finally {
recorder.stopRecording();
}
}
}


@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update.processor;
import java.io.IOException;
import org.apache.solr.common.SolrException;
import static org.apache.solr.common.SolrException.ErrorCode.*;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.UpdateCommand;
import org.apache.solr.update.AddUpdateCommand;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.DeleteUpdateCommand;
import org.apache.solr.update.MergeIndexesCommand;
import org.apache.solr.update.RollbackUpdateCommand;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* This Factory can optionally save references to the commands it receives in
* BlockingQueues that tests can poll from to observe that the expected commands
* are executed. By default, this factory does nothing except return the "next"
* processor from the chain unless it's told to {@link #startRecording()}
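*
* <p>
* Typical test usage (a sketch; mirrors
* <code>DocExpirationUpdateProcessorFactoryTest.testAutomaticDeletes</code>):
* </p>
* <pre class="prettyprint">
* recorder.startRecording();
* try {
*   UpdateCommand cmd = recorder.commandQueue.poll(30, TimeUnit.SECONDS);
*   // inspect the recorded command ...
* } finally {
*   recorder.stopRecording();
* }</pre>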
*/
public final class RecordingUpdateProcessorFactory
extends UpdateRequestProcessorFactory {
private boolean recording = false;
/** The queue containing commands that were recorded
* @see #startRecording
*/
public final BlockingQueue<UpdateCommand> commandQueue
= new LinkedBlockingQueue<UpdateCommand>();
/**
* @see #stopRecording
* @see #commandQueue
*/
public synchronized void startRecording() {
recording = true;
}
/** @see #startRecording */
public synchronized void stopRecording() {
recording = false;
}
@Override
public synchronized UpdateRequestProcessor getInstance(SolrQueryRequest req,
SolrQueryResponse rsp,
UpdateRequestProcessor next ) {
return recording ? new RecordingUpdateRequestProcessor(commandQueue, next) : next;
}
private static final class RecordingUpdateRequestProcessor
extends UpdateRequestProcessor {
private final BlockingQueue<UpdateCommand> commandQueue;
public RecordingUpdateRequestProcessor(BlockingQueue<UpdateCommand> commandQueue,
UpdateRequestProcessor next) {
super(next);
this.commandQueue = commandQueue;
}
private void record(UpdateCommand cmd) {
if (! commandQueue.offer(cmd) ) {
throw new RuntimeException
("WTF: commandQueue should be unbounded but offer failed: " + cmd.toString());
}
}
@Override
public void processAdd(AddUpdateCommand cmd) throws IOException {
record(cmd);
super.processAdd(cmd);
}
@Override
public void processDelete(DeleteUpdateCommand cmd) throws IOException {
record(cmd);
super.processDelete(cmd);
}
@Override
public void processMergeIndexes(MergeIndexesCommand cmd) throws IOException {
record(cmd);
super.processMergeIndexes(cmd);
}
@Override
public void processCommit(CommitUpdateCommand cmd) throws IOException {
record(cmd);
super.processCommit(cmd);
}
@Override
public void processRollback(RollbackUpdateCommand cmd) throws IOException {
record(cmd);
super.processRollback(cmd);
}
}
}


@@ -24,6 +24,7 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -65,14 +66,19 @@ public class UpdateProcessorTestBase extends SolrTestCaseJ4 {
SolrQueryRequest req = new LocalSolrQueryRequest(core, requestParams);
try {
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
AddUpdateCommand cmd = new AddUpdateCommand(req);
cmd.solrDoc = docIn;
UpdateRequestProcessor processor = pc.createProcessor(req, rsp);
if (null != processor) {
// test chain might be empty or short circuited.
processor.processAdd(cmd);
}
return cmd.solrDoc;
} finally {
SolrRequestInfo.clearRequestInfo();
req.close();
}
}