add support for "keep alive" time for a search request, and timeout context that have not been accessed

This commit is contained in:
kimchy 2010-02-18 23:21:08 +02:00
parent f7d152821d
commit 82e69691b1
33 changed files with 2555 additions and 35 deletions
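For context, a minimal client-side sketch of what this commit enables (the SearchRequest constructor used here is an assumption; the scroll(...) and timeout(...) calls appear in the diffs below). A client attaches a keep-alive to its scroll, and the server frees the search context once it has gone unaccessed for that long:

    // Hypothetical usage sketch, not part of the commit itself.
    SearchRequest searchRequest = new SearchRequest("test");             // assumed constructor
    searchRequest.scroll(new Scroll(TimeValue.timeValueMinutes(5)));     // keep the context alive across 5m of inactivity
    searchRequest.timeout(TimeValue.timeValueSeconds(30));               // fluent setter added in this commit
    // Server side (SearchService below): when no scroll keep-alive is given,
    // the "defaultKeepAlive" component setting (2 minutes) applies instead.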

View File

@ -183,8 +183,9 @@ public class SearchRequest implements ActionRequest {
return timeout;
}
public void timeout(TimeValue timeout) {
public SearchRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
/**

View File

@ -50,7 +50,10 @@ import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.EnvironmentModule;
import org.elasticsearch.server.internal.InternalSettingsPerparer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.timer.TimerModule;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.Tuple;
@ -123,6 +126,7 @@ public class TransportClient implements Client {
modules.add(new EnvironmentModule(environment));
modules.add(new SettingsModule(settings));
modules.add(new ClusterNameModule(settings));
modules.add(new TimerModule());
modules.add(new ThreadPoolModule(settings));
modules.add(new TransportModule(settings));
modules.add(new ClientTransportActionModule());
@ -196,6 +200,9 @@ public class TransportClient implements Client {
}
injector.getInstance(TransportClientNodesService.class).close();
injector.getInstance(TransportService.class).close();
injector.getInstance(TimerService.class).close();
injector.getInstance(ThreadPool.class).shutdown();
}
@Override public AdminClient admin() {

View File

@ -22,6 +22,8 @@ package org.elasticsearch.http.netty;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
@ -31,6 +33,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.util.SizeValue.*;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (Shay Banon)
*/
@ -131,6 +136,14 @@ public class NettyHttpRequest implements HttpRequest {
return sValue.equals("true") || sValue.equals("1");
}
@Override public TimeValue paramAsTime(String key, TimeValue defaultValue) {
return parseTimeValue(param(key), defaultValue);
}
@Override public SizeValue paramAsSize(String key, SizeValue defaultValue) {
return parseSizeValue(param(key), defaultValue);
}
@Override public String param(String key) {
List<String> keyParams = params(key);
if (keyParams == null || keyParams.isEmpty()) {

View File

@ -19,6 +19,8 @@
package org.elasticsearch.rest;
import org.elasticsearch.util.SizeValue;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.ToJson;
import java.util.List;
@ -58,6 +60,10 @@ public interface RestRequest extends ToJson.Params {
boolean paramAsBoolean(String key, boolean defaultValue);
TimeValue paramAsTime(String key, TimeValue defaultValue);
SizeValue paramAsSize(String key, SizeValue defaultValue);
List<String> params(String key);
Map<String, List<String>> params();

View File

@ -29,7 +29,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -50,7 +49,7 @@ public class RestReplicationPingAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
ReplicationPingRequest replicationPingRequest = new ReplicationPingRequest(RestActions.splitIndices(request.param("index")));
replicationPingRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), ShardReplicationPingRequest.DEFAULT_TIMEOUT));
replicationPingRequest.timeout(request.paramAsTime("timeout", ShardReplicationPingRequest.DEFAULT_TIMEOUT));
replicationPingRequest.listenerThreaded(false);
client.admin().cluster().execPing(replicationPingRequest, new ActionListener<ReplicationPingResponse>() {
@Override public void onResponse(ReplicationPingResponse result) {

View File

@ -29,7 +29,6 @@ import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.Strings;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings;
@ -67,7 +66,7 @@ public class RestCreateIndexAction extends BaseRestHandler {
}
}
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request.param("index"), indexSettings);
createIndexRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), timeValueSeconds(10)));
createIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
client.admin().indices().execCreate(createIndexRequest, new ActionListener<CreateIndexResponse>() {
@Override public void onResponse(CreateIndexResponse result) {
try {

View File

@ -28,7 +28,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -50,7 +49,7 @@ public class RestDeleteIndexAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(request.param("index"));
deleteIndexRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), timeValueSeconds(10)));
deleteIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
client.admin().indices().execDelete(deleteIndexRequest, new ActionListener<DeleteIndexResponse>() {
@Override public void onResponse(DeleteIndexResponse result) {
try {

View File

@ -28,7 +28,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -51,7 +50,7 @@ public class RestGatewaySnapshotAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
GatewaySnapshotRequest gatewaySnapshotRequest = new GatewaySnapshotRequest(RestActions.splitIndices(request.param("index")));
gatewaySnapshotRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_TIMEOUT));
gatewaySnapshotRequest.timeout(request.paramAsTime("timeout", DEFAULT_TIMEOUT));
gatewaySnapshotRequest.listenerThreaded(false);
client.admin().indices().execGatewaySnapshot(gatewaySnapshotRequest, new ActionListener<GatewaySnapshotResponse>() {
@Override public void onResponse(GatewaySnapshotResponse result) {

View File

@ -28,7 +28,6 @@ import org.elasticsearch.index.mapper.InvalidTypeNameException;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -56,7 +55,7 @@ public class RestCreateMappingAction extends BaseRestHandler {
CreateMappingRequest createMappingRequest = createMappingRequest(splitIndices(request.param("index")));
createMappingRequest.type(request.param("type"));
createMappingRequest.mappingSource(request.contentAsString());
createMappingRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), timeValueSeconds(10)));
createMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10)));
client.admin().indices().execCreateMapping(createMappingRequest, new ActionListener<CreateMappingResponse>() {
@Override public void onResponse(CreateMappingResponse result) {
try {

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -47,7 +46,7 @@ public class RestDeleteAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id"));
deleteRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), DeleteRequest.DEFAULT_TIMEOUT));
deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT));
// we just send a response, no need to fork
deleteRequest.listenerThreaded(false);
// we don't spawn, then fork if local

View File

@ -29,7 +29,6 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -61,7 +60,7 @@ public class RestDeleteByQueryAction extends BaseRestHandler {
if (typesParam != null) {
deleteByQueryRequest.types(RestActions.splitTypes(typesParam));
}
deleteByQueryRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), ShardDeleteByQueryRequest.DEFAULT_TIMEOUT));
deleteByQueryRequest.timeout(request.paramAsTime("timeout", ShardDeleteByQueryRequest.DEFAULT_TIMEOUT));
} catch (Exception e) {
try {
channel.sendResponse(new JsonRestResponse(request, PRECONDITION_FAILED, JsonBuilder.jsonBuilder().startObject().field("error", e.getMessage()).endObject()));

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -48,7 +47,7 @@ public class RestIndexAction extends BaseRestHandler {
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"), request.contentAsString());
indexRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), IndexRequest.DEFAULT_TIMEOUT));
indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
String sOpType = request.param("opType");
if (sOpType != null) {
if ("index".equals(sOpType)) {

View File

@ -34,7 +34,6 @@ import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.rest.action.support.RestJsonBuilder;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.json.JsonBuilder;
import org.elasticsearch.util.settings.Settings;
@ -44,6 +43,7 @@ import java.util.regex.Pattern;
import static org.elasticsearch.rest.RestRequest.Method.*;
import static org.elasticsearch.rest.RestResponse.Status.*;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (Shay Banon)
@ -162,12 +162,12 @@ public class RestSearchAction extends BaseRestHandler {
String scroll = request.param("scroll");
if (scroll != null) {
searchRequest.scroll(new Scroll(TimeValue.parseTimeValue(scroll, null)));
searchRequest.scroll(new Scroll(parseTimeValue(scroll, null)));
}
String timeout = request.param("timeout");
if (timeout != null) {
searchRequest.timeout(TimeValue.parseTimeValue(timeout, null));
searchRequest.timeout(parseTimeValue(timeout, null));
}
String typesParam = request.param("type");

View File

@ -33,18 +33,18 @@ import static org.elasticsearch.util.TimeValue.*;
*/
public class Scroll implements Streamable {
private TimeValue timeout;
private TimeValue keepAlive;
private Scroll() {
}
public Scroll(TimeValue timeout) {
this.timeout = timeout;
public Scroll(TimeValue keepAlive) {
this.keepAlive = keepAlive;
}
public TimeValue timeout() {
return timeout;
public TimeValue keepAlive() {
return keepAlive;
}
public static Scroll readScroll(DataInput in) throws IOException, ClassNotFoundException {
@ -54,10 +54,17 @@ public class Scroll implements Streamable {
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
timeout = readTimeValue(in);
if (in.readBoolean()) {
keepAlive = readTimeValue(in);
}
}
@Override public void writeTo(DataOutput out) throws IOException {
timeout.writeTo(out);
if (keepAlive == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
keepAlive.writeTo(out);
}
}
}
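A brief sketch of the renamed accessor (assuming the static TimeValue helpers already imported in this file). The keep-alive may legitimately be absent on the wire, which is why writeTo/readFrom guard it with a boolean flag:

    Scroll scroll = new Scroll(timeValueMinutes(5));
    TimeValue keepAlive = scroll.keepAlive();   // 5m; SearchService falls back to its default when this is null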

View File

@ -29,7 +29,7 @@ public class SearchContextMissingException extends ElasticSearchException {
private final long id;
public SearchContextMissingException(long id) {
super("No search context found for id [" + id + "]");
super("No search context found for id [" + id + "], timed out");
this.id = id;
}

View File

@ -44,6 +44,8 @@ import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.query.QueryPhase;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.component.Lifecycle;
import org.elasticsearch.util.component.LifecycleComponent;
@ -51,13 +53,18 @@ import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong;
import org.elasticsearch.util.io.FastStringReader;
import org.elasticsearch.util.json.Jackson;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.timer.Timeout;
import org.elasticsearch.util.timer.TimerTask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.util.TimeValue.*;
/**
* @author kimchy (Shay Banon)
*/
@ -71,27 +78,36 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
private final IndicesService indicesService;
private final TimerService timerService;
private final DfsPhase dfsPhase;
private final QueryPhase queryPhase;
private final FetchPhase fetchPhase;
private final TimeValue defaultKeepAlive;
private final AtomicLong idGenerator = new AtomicLong();
private final NonBlockingHashMapLong<SearchContext> activeContexts = new NonBlockingHashMapLong<SearchContext>();
private final ImmutableMap<String, SearchParseElement> elementParsers;
@Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService,
@Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, TimerService timerService,
DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) {
super(settings);
this.clusterService = clusterService;
this.indicesService = indicesService;
this.timerService = timerService;
this.dfsPhase = dfsPhase;
this.queryPhase = queryPhase;
this.fetchPhase = fetchPhase;
this.defaultKeepAlive = componentSettings.getAsTime("defaultKeepAlive", timeValueMinutes(2));
Map<String, SearchParseElement> elementParsers = new HashMap<String, SearchParseElement>();
elementParsers.putAll(dfsPhase.parseElements());
elementParsers.putAll(queryPhase.parseElements());
@ -156,7 +172,7 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
try {
context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity()));
} catch (IOException e) {
throw new SearchException("Failed to set aggreagted df", e);
throw new SearchException("Failed to set aggregated df", e);
}
queryPhase.execute(context);
return context.queryResult();
@ -216,6 +232,8 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
if (context == null) {
throw new SearchContextMissingException(id);
}
// update the last access time of the context
context.accessed(timerService.estimatedTimeInMillis());
return context;
}
@ -245,6 +263,15 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
context.size(10);
}
// compute the context keep alive
TimeValue keepAlive = defaultKeepAlive;
if (request.scroll() != null && request.scroll().keepAlive() != null) {
keepAlive = request.scroll().keepAlive();
}
context.keepAlive(keepAlive);
context.accessed(timerService.estimatedTimeInMillis());
context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), keepAlive));
return context;
}
@ -310,5 +337,33 @@ public class SearchService extends AbstractComponent implements LifecycleCompone
// process scroll
context.from(context.from() + context.size());
context.scroll(request.scroll());
// update the context keep alive based on the new scroll value
if (request.scroll() != null && request.scroll().keepAlive() != null) {
context.keepAlive(request.scroll().keepAlive());
}
}
private class KeepAliveTimerTask implements TimerTask {
private final SearchContext context;
private KeepAliveTimerTask(SearchContext context) {
this.context = context;
}
@Override public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
long currentTime = timerService.estimatedTimeInMillis();
long nextDelay = context.keepAlive().millis() - (currentTime - context.lastAccessTime());
if (nextDelay <= 0) {
// Time out, free the context (and remove it from the active context)
freeContext(context.id());
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
context.keepAliveTimeout(timerService.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS));
}
}
}
}
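The timer task above implements a sliding expiry: each time it fires it either frees the context or re-arms itself for whatever is left of the keep-alive window. A worked example with the 2-minute default keep-alive:

    long keepAliveMillis = 120000;                                        // defaultKeepAlive of 2 minutes
    long lastAccessTime = 0;                                              // context last touched at t = 0
    long currentTime = 30000;                                             // the timeout fires 30s later
    long nextDelay = keepAliveMillis - (currentTime - lastAccessTime);    // 90000 ms
    // nextDelay > 0, so a new timeout is scheduled 90s out instead of freeing the context;
    // only if the context stays untouched for the full keep-alive does nextDelay drop to <= 0.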

View File

@ -119,8 +119,9 @@ public class InternalSearchRequest implements Streamable {
return timeout;
}
public void timeout(TimeValue timeout) {
public InternalSearchRequest timeout(TimeValue timeout) {
this.timeout = timeout;
return this;
}
/**

View File

@ -37,6 +37,7 @@ import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.lease.Releasable;
import org.elasticsearch.util.timer.Timeout;
import java.io.IOException;
@ -91,6 +92,12 @@ public class SearchContext implements Releasable {
private boolean queryRewritten;
private volatile TimeValue keepAlive;
private volatile long lastAccessTime;
private volatile Timeout keepAliveTimeout;
public SearchContext(long id, SearchShardTarget shardTarget, TimeValue timeout, float queryBoost, String source,
String[] types, Engine.Searcher engineSearcher, IndexService indexService) {
this.id = id;
@ -114,6 +121,9 @@ public class SearchContext implements Releasable {
// ignore this exception
}
engineSearcher.release();
if (!keepAliveTimeout.isCancelled()) {
keepAliveTimeout.cancel();
}
return true;
}
@ -275,6 +285,26 @@ public class SearchContext implements Releasable {
return this;
}
public void accessed(long accessTime) {
this.lastAccessTime = accessTime;
}
public long lastAccessTime() {
return this.lastAccessTime;
}
public TimeValue keepAlive() {
return this.keepAlive;
}
public void keepAlive(TimeValue keepAlive) {
this.keepAlive = keepAlive;
}
public void keepAliveTimeout(Timeout keepAliveTimeout) {
this.keepAliveTimeout = keepAliveTimeout;
}
public DfsSearchResult dfsResult() {
return dfsResult;
}

View File

@ -53,6 +53,8 @@ import org.elasticsearch.search.SearchService;
import org.elasticsearch.server.Server;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.timer.TimerModule;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.Tuple;
@ -103,6 +105,7 @@ public final class InternalServer implements Server {
modules.add(new ClusterNameModule(settings));
modules.add(new SettingsModule(settings));
modules.add(new ThreadPoolModule(settings));
modules.add(new TimerModule());
modules.add(new DiscoveryModule(settings));
modules.add(new ClusterModule(settings));
modules.add(new RestModule(settings));
@ -221,6 +224,7 @@ public final class InternalServer implements Server {
injector.getInstance(RestController.class).close();
injector.getInstance(TransportService.class).close();
injector.getInstance(TimerService.class).close();
injector.getInstance(ThreadPool.class).shutdown();
try {
injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS);

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.timer;
import com.google.inject.AbstractModule;
/**
* @author kimchy (Shay Banon)
*/
public class TimerModule extends AbstractModule {
@Override protected void configure() {
bind(TimerService.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,94 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.timer;
import com.google.inject.Inject;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.timer.HashedWheelTimer;
import org.elasticsearch.util.timer.Timeout;
import org.elasticsearch.util.timer.Timer;
import org.elasticsearch.util.timer.TimerTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.concurrent.DynamicExecutors.*;
/**
* @author kimchy (Shay Banon)
*/
public class TimerService extends AbstractComponent {
private final ThreadPool threadPool;
private final TimeEstimator timeEstimator;
private final ScheduledFuture timeEstimatorFuture;
private final Timer timer;
private final TimeValue tickDuration;
@Inject public TimerService(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
this.timeEstimator = new TimeEstimator();
this.timeEstimatorFuture = threadPool.scheduleWithFixedDelay(timeEstimator, 50, 50, TimeUnit.MILLISECONDS);
this.tickDuration = componentSettings.getAsTime("tickDuration", timeValueMillis(100));
this.timer = new HashedWheelTimer(logger, daemonThreadFactory(settings, "timer"), tickDuration.millis(), TimeUnit.MILLISECONDS);
}
public void close() {
timeEstimatorFuture.cancel(true);
timer.stop();
}
public long estimatedTimeInMillis() {
return timeEstimator.time();
}
public Timeout newTimeout(TimerTask task, TimeValue delay) {
return newTimeout(task, delay.nanos(), TimeUnit.NANOSECONDS);
}
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
return timer.newTimeout(task, delay, unit);
}
private static class TimeEstimator implements Runnable {
private long time = System.currentTimeMillis();
@Override public void run() {
this.time = System.currentTimeMillis();
}
public long time() {
return this.time;
}
}
}
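A hypothetical caller-side sketch, mirroring how SearchService uses the service above (timeValueSeconds is the static TimeValue helper): schedule a task against the cheap estimated clock, then cancel it when the guarded resource is touched again.

    Timeout timeout = timerService.newTimeout(new TimerTask() {
        @Override public void run(Timeout t) throws Exception {
            if (t.isCancelled()) {
                return;
            }
            // expire the idle resource here
        }
    }, timeValueSeconds(30));
    long now = timerService.estimatedTimeInMillis();   // ~50ms granularity, avoids a System.currentTimeMillis() call per access
    timeout.cancel();                                  // cancel or re-arm when the resource is accessed again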

View File

@ -0,0 +1,29 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util;
import java.util.Iterator;
/**
* @author kimchy (Shay Banon)
*/
public interface ReusableIterator<E> extends Iterator<E> {
void rewind();
}

View File

@ -96,7 +96,7 @@ public class SizeValue implements Serializable, Streamable {
return Strings.format1Decimals(value, suffix);
}
public static SizeValue parse(String sValue, SizeValue defaultValue) throws ElasticSearchParseException {
public static SizeValue parseSizeValue(String sValue, SizeValue defaultValue) throws ElasticSearchParseException {
if (sValue == null) {
return defaultValue;
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util;
/**
* Overrides the thread name proposed by {@link ThreadRenamingRunnable}.
*
* @author kimchy (shay.banon)
*/
public interface ThreadNameDeterminer {
/**
* {@link ThreadNameDeterminer} that accepts the proposed thread name
* as is.
*/
ThreadNameDeterminer PROPOSED = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return proposedThreadName;
}
};
/**
* {@link ThreadNameDeterminer} that rejects the proposed thread name and
* retains the current one.
*/
ThreadNameDeterminer CURRENT = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return null;
}
};
/**
* Overrides the thread name proposed by {@link ThreadRenamingRunnable}.
*
* @param currentThreadName the current thread name
* @param proposedThreadName the proposed new thread name
* @return the actual new thread name.
* If {@code null} is returned, the proposed thread name is
* discarded (i.e. no rename).
*/
String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception;
}
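For illustration, a hypothetical determiner that prefixes every proposed name; returning null from determineThreadName would keep the current name instead:

    ThreadNameDeterminer prefixed = new ThreadNameDeterminer() {
        public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
            return "elasticsearch[" + proposedThreadName + "]";   // hypothetical prefix
        }
    };
    ThreadRenamingRunnable.setThreadNameDeterminer(prefixed);     // affects only runnables created afterwards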

View File

@ -0,0 +1,122 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util;
import org.elasticsearch.util.logging.Loggers;
import org.slf4j.Logger;
/**
* A {@link Runnable} that changes the current thread name and reverts it back
* when its execution ends. To change the default thread names set by Netty,
* use {@link #setThreadNameDeterminer(ThreadNameDeterminer)}.
*
* @author kimchy (shay.banon)
*/
public class ThreadRenamingRunnable implements Runnable {
private static final Logger logger = Loggers.getLogger(ThreadRenamingRunnable.class);
private static volatile ThreadNameDeterminer threadNameDeterminer =
ThreadNameDeterminer.PROPOSED;
/**
* Returns the {@link ThreadNameDeterminer} which overrides the proposed
* new thread name.
*/
public static ThreadNameDeterminer getThreadNameDeterminer() {
return threadNameDeterminer;
}
/**
* Sets the {@link ThreadNameDeterminer} which overrides the proposed new
* thread name. Please note that the specified {@link ThreadNameDeterminer}
* affects only new {@link ThreadRenamingRunnable}s; the existing instances
* are not affected at all. Therefore, you should make sure to call this
* method at the earliest possible point (i.e. before any Netty worker
* thread starts) for consistent thread naming. Otherwise, you might see
* the default thread names and the new names appear at the same time in
* the full thread dump.
*/
public static void setThreadNameDeterminer(ThreadNameDeterminer threadNameDeterminer) {
if (threadNameDeterminer == null) {
throw new NullPointerException("threadNameDeterminer");
}
ThreadRenamingRunnable.threadNameDeterminer = threadNameDeterminer;
}
private final Runnable runnable;
private final String proposedThreadName;
/**
* Creates a new instance which wraps the specified {@code runnable}
* and changes the thread name to the specified thread name when the
* specified {@code runnable} is running.
*/
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) {
if (runnable == null) {
throw new NullPointerException("runnable");
}
if (proposedThreadName == null) {
throw new NullPointerException("proposedThreadName");
}
this.runnable = runnable;
this.proposedThreadName = proposedThreadName;
}
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldThreadName = currentThread.getName();
final String newThreadName = getNewThreadName(oldThreadName);
// Change the thread name before starting the actual runnable.
boolean renamed = false;
if (!oldThreadName.equals(newThreadName)) {
try {
currentThread.setName(newThreadName);
renamed = true;
} catch (SecurityException e) {
logger.debug("Failed to rename a thread due to security restriction.", e);
}
}
// Run the actual runnable and revert the name back when it ends.
try {
runnable.run();
} finally {
if (renamed) {
// Revert the name back if the current thread was renamed.
// We do not check the exception here because we know it works.
currentThread.setName(oldThreadName);
}
}
}
private String getNewThreadName(String currentThreadName) {
String newThreadName = null;
try {
newThreadName = getThreadNameDeterminer().determineThreadName(currentThreadName, proposedThreadName);
} catch (Throwable t) {
logger.warn("Failed to determine the thread name", t);
}
return newThreadName == null ? currentThreadName : newThreadName;
}
}

View File

@ -32,6 +32,8 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import static com.google.common.collect.Lists.*;
import static org.elasticsearch.util.SizeValue.*;
import static org.elasticsearch.util.TimeValue.*;
/**
* An immutable implementation of {@link Settings}.
@ -162,11 +164,11 @@ public class ImmutableSettings implements Settings {
}
@Override public TimeValue getAsTime(String setting, TimeValue defaultValue) {
return TimeValue.parseTimeValue(get(setting), defaultValue);
return parseTimeValue(get(setting), defaultValue);
}
@Override public SizeValue getAsSize(String setting, SizeValue defaultValue) throws SettingsException {
return SizeValue.parse(get(setting), defaultValue);
return parseSizeValue(get(setting), defaultValue);
}
@SuppressWarnings({"unchecked"})

View File

@ -162,7 +162,7 @@ public interface Settings {
* @param defaultValue The value to return if no value is associated with the setting
* @return The (size) value, or the default value if no value exists.
* @throws SettingsException Failure to parse the setting
* @see SizeValue#parse(String, SizeValue)
* @see SizeValue#parseSizeValue(String, SizeValue)
*/
SizeValue getAsSize(String setting, SizeValue defaultValue) throws SettingsException;

View File

@ -0,0 +1,489 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.timer;
import org.elasticsearch.util.MapBackedSet;
import org.elasticsearch.util.ReusableIterator;
import org.elasticsearch.util.ThreadRenamingRunnable;
import org.elasticsearch.util.concurrent.ConcurrentIdentityHashMap;
import org.slf4j.Logger;
import java.util.*;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A {@link Timer} optimized for approximated I/O timeout scheduling.
*
* <h3>Tick Duration</h3>
*
* As described with 'approximated', this timer does not execute the scheduled
* {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will
* check if there are any {@link TimerTask}s behind the schedule and execute
* them.
* <p>
* You can increase or decrease the accuracy of the execution timing by
* specifying smaller or larger tick duration in the constructor. In most
* network applications, I/O timeout does not need to be accurate. Therefore,
* the default tick duration is 100 milliseconds and you will not need to try
* different configurations in most cases.
*
* <h3>Ticks per Wheel (Wheel Size)</h3>
*
* {@link HashedWheelTimer} maintains a data structure called 'wheel'.
* To put simply, a wheel is a hash table of {@link TimerTask}s whose hash
* function is 'dead line of the task'. The default number of ticks per wheel
* (i.e. the size of the wheel) is 512. You could specify a larger value
* if you are going to schedule a lot of timeouts.
*
* <h3>Implementation Details</h3>
*
* {@link HashedWheelTimer} is based on
* <a href="http://cseweb.ucsd.edu/users/varghese/>George Varghese</a> and
* Tony Lauck's paper,
* <a href="http://www-cse.ucsd.edu/users/varghese/PAPERS/twheel.ps.Z">'Hashed
* and Hierarchical Timing Wheels: data structures to efficiently implement a
* timer facility'</a>. More comprehensive slides are located
* <a href="http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt">here</a>.
*
* @author kimchy (shay.banon)
*/
public class HashedWheelTimer implements Timer {
private final Logger logger;
private static final AtomicInteger id = new AtomicInteger();
// I'd say 64 active timer threads are obvious misuse.
private static final int MISUSE_WARNING_THRESHOLD = 64;
private static final AtomicInteger activeInstances = new AtomicInteger();
private static final AtomicBoolean loggedMisuseWarning = new AtomicBoolean();
private final Worker worker = new Worker();
final Thread workerThread;
final AtomicBoolean shutdown = new AtomicBoolean();
private final long roundDuration;
final long tickDuration;
final Set<HashedWheelTimeout>[] wheel;
final ReusableIterator<HashedWheelTimeout>[] iterators;
final int mask;
final ReadWriteLock lock = new ReentrantReadWriteLock();
volatile int wheelCursor;
/**
* Creates a new timer with the default number of ticks per wheel.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
*/
public HashedWheelTimer(Logger logger, ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(logger, threadFactory, tickDuration, unit, 512);
}
/**
* Creates a new timer.
*
* @param threadFactory a {@link ThreadFactory} that creates a
* background {@link Thread} which is dedicated to
* {@link TimerTask} execution.
* @param tickDuration the duration between tick
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel
*/
public HashedWheelTimer(Logger logger, ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) {
this.logger = logger;
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
if (unit == null) {
throw new NullPointerException("unit");
}
if (tickDuration <= 0) {
throw new IllegalArgumentException(
"tickDuration must be greater than 0: " + tickDuration);
}
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
wheel = createWheel(ticksPerWheel);
iterators = createIterators(wheel);
mask = wheel.length - 1;
// Convert tickDuration to milliseconds.
this.tickDuration = tickDuration = unit.toMillis(tickDuration);
// Prevent overflow.
if (tickDuration == Long.MAX_VALUE ||
tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(
"tickDuration is too long: " +
tickDuration + ' ' + unit);
}
roundDuration = tickDuration * wheel.length;
workerThread = threadFactory.newThread(new ThreadRenamingRunnable(
worker, "Hashed wheel timer #" + id.incrementAndGet()));
// Misuse check
int activeInstances = HashedWheelTimer.activeInstances.incrementAndGet();
if (activeInstances >= MISUSE_WARNING_THRESHOLD &&
loggedMisuseWarning.compareAndSet(false, true)) {
logger.debug(
"There are too many active " +
HashedWheelTimer.class.getSimpleName() + " instances (" +
activeInstances + ") - you should share the small number " +
"of instances to avoid excessive resource consumption.");
}
}
@SuppressWarnings("unchecked")
private static Set<HashedWheelTimeout>[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
Set<HashedWheelTimeout>[] wheel = new Set[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new MapBackedSet<HashedWheelTimeout>(
new ConcurrentIdentityHashMap<HashedWheelTimeout, Boolean>(16, 0.95f, 4));
}
return wheel;
}
@SuppressWarnings("unchecked")
private static ReusableIterator<HashedWheelTimeout>[] createIterators(Set<HashedWheelTimeout>[] wheel) {
ReusableIterator<HashedWheelTimeout>[] iterators = new ReusableIterator[wheel.length];
for (int i = 0; i < wheel.length; i++) {
iterators[i] = (ReusableIterator<HashedWheelTimeout>) wheel[i].iterator();
}
return iterators;
}
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;
while (normalizedTicksPerWheel < ticksPerWheel) {
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}
/**
* Starts the background thread explicitly. The background thread will
* start automatically on demand even if you did not call this method.
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public synchronized void start() {
if (shutdown.get()) {
throw new IllegalStateException("cannot be started once stopped");
}
if (!workerThread.isAlive()) {
workerThread.start();
}
}
public synchronized Set<Timeout> stop() {
if (!shutdown.compareAndSet(false, true)) {
return Collections.emptySet();
}
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException e) {
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
activeInstances.decrementAndGet();
Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
for (Set<HashedWheelTimeout> bucket : wheel) {
unprocessedTimeouts.addAll(bucket);
bucket.clear();
}
return Collections.unmodifiableSet(unprocessedTimeouts);
}
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
final long currentTime = System.currentTimeMillis();
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
delay = unit.toMillis(delay);
if (delay < tickDuration) {
delay = tickDuration;
}
if (!workerThread.isAlive()) {
start();
}
// Prepare the required parameters to create the timeout object.
HashedWheelTimeout timeout;
final long lastRoundDelay = delay % roundDuration;
final long lastTickDelay = delay % tickDuration;
final long relativeIndex =
lastRoundDelay / tickDuration + (lastTickDelay != 0 ? 1 : 0);
final long deadline = currentTime + delay;
final long remainingRounds =
delay / roundDuration - (delay % roundDuration == 0 ? 1 : 0);
// Add the timeout to the wheel.
lock.readLock().lock();
try {
timeout =
new HashedWheelTimeout(
task, deadline,
(int) (wheelCursor + relativeIndex & mask),
remainingRounds);
wheel[timeout.stopIndex].add(timeout);
} finally {
lock.readLock().unlock();
}
return timeout;
}
private final class Worker implements Runnable {
private long startTime;
private long tick;
Worker() {
super();
}
public void run() {
List<HashedWheelTimeout> expiredTimeouts =
new ArrayList<HashedWheelTimeout>();
startTime = System.currentTimeMillis();
tick = 1;
while (!shutdown.get()) {
waitForNextTick();
fetchExpiredTimeouts(expiredTimeouts);
notifyExpiredTimeouts(expiredTimeouts);
}
}
private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts) {
// Find the expired timeouts and decrease the round counter
// if necessary. Note that we don't send the notification
// immediately to make sure the listeners are called without
// an exclusive lock.
lock.writeLock().lock();
try {
int oldBucketHead = wheelCursor;
int newBucketHead = oldBucketHead + 1 & mask;
wheelCursor = newBucketHead;
ReusableIterator<HashedWheelTimeout> i = iterators[oldBucketHead];
fetchExpiredTimeouts(expiredTimeouts, i);
} finally {
lock.writeLock().unlock();
}
}
private void fetchExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts,
ReusableIterator<HashedWheelTimeout> i) {
long currentDeadline = System.currentTimeMillis() + tickDuration;
i.rewind();
while (i.hasNext()) {
HashedWheelTimeout timeout = i.next();
if (timeout.remainingRounds <= 0) {
if (timeout.deadline < currentDeadline) {
i.remove();
expiredTimeouts.add(timeout);
} else {
// A rare case where a timeout is put for the next
// round: just wait for the next round.
}
} else {
timeout.remainingRounds--;
}
}
}
private void notifyExpiredTimeouts(
List<HashedWheelTimeout> expiredTimeouts) {
// Notify the expired timeouts.
for (int i = expiredTimeouts.size() - 1; i >= 0; i--) {
expiredTimeouts.get(i).expire();
}
// Clean up the temporary list.
expiredTimeouts.clear();
}
private void waitForNextTick() {
for (;;) {
final long currentTime = System.currentTimeMillis();
final long sleepTime = tickDuration * tick - (currentTime - startTime);
if (sleepTime <= 0) {
break;
}
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
if (shutdown.get()) {
return;
}
}
}
// Reset the tick if overflow is expected.
if (tickDuration * tick > Long.MAX_VALUE - tickDuration) {
startTime = System.currentTimeMillis();
tick = 1;
} else {
// Increase the tick if overflow is not likely to happen.
tick++;
}
}
}
private final class HashedWheelTimeout implements Timeout {
private final TimerTask task;
final int stopIndex;
final long deadline;
volatile long remainingRounds;
private volatile boolean cancelled;
HashedWheelTimeout(
TimerTask task, long deadline, int stopIndex, long remainingRounds) {
this.task = task;
this.deadline = deadline;
this.stopIndex = stopIndex;
this.remainingRounds = remainingRounds;
}
public Timer getTimer() {
return HashedWheelTimer.this;
}
public TimerTask getTask() {
return task;
}
public void cancel() {
if (isExpired()) {
return;
}
cancelled = true;
// Might be called more than once, but doesn't matter.
wheel[stopIndex].remove(this);
}
public boolean isCancelled() {
return cancelled;
}
public boolean isExpired() {
return cancelled || System.currentTimeMillis() > deadline;
}
public void expire() {
if (cancelled) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
logger.warn(
"An exception was thrown by " +
TimerTask.class.getSimpleName() + ".", t);
}
}
@Override
public String toString() {
long currentTime = System.currentTimeMillis();
long remaining = deadline - currentTime;
StringBuilder buf = new StringBuilder(192);
buf.append(getClass().getSimpleName());
buf.append('(');
buf.append("deadline: ");
if (remaining > 0) {
buf.append(remaining);
buf.append(" ms later, ");
} else if (remaining < 0) {
buf.append(-remaining);
buf.append(" ms ago, ");
} else {
buf.append("now, ");
}
if (isCancelled()) {
buf.append(", cancelled");
}
return buf.append(')').toString();
}
}
}
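A minimal stand-alone usage sketch of the wheel (assuming an slf4j Logger instance and java.util.concurrent.Executors for the thread factory; within this codebase, TimerService above is the intended entry point):

    Timer timer = new HashedWheelTimer(logger, Executors.defaultThreadFactory(), 100, TimeUnit.MILLISECONDS);
    Timeout handle = timer.newTimeout(new TimerTask() {
        @Override public void run(Timeout timeout) throws Exception {
            // fires roughly 5 seconds later, subject to the 100ms tick granularity
        }
    }, 5, TimeUnit.SECONDS);
    handle.cancel();                            // or let it expire
    Set<Timeout> unprocessed = timer.stop();    // cancels whatever is still scheduled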

View File

@ -0,0 +1,58 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.timer;
/**
* A handle associated with a {@link TimerTask} that is returned by a
* {@link Timer}.
*
* @author kimchy (Shay Banon)
*/
public interface Timeout {
/**
* Returns the {@link Timer} that created this handle.
*/
Timer getTimer();
/**
* Returns the {@link TimerTask} which is associated with this handle.
*/
TimerTask getTask();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been expired.
*/
boolean isExpired();
/**
* Returns {@code true} if and only if the {@link TimerTask} associated
* with this handle has been cancelled.
*/
boolean isCancelled();
/**
* Cancels the {@link TimerTask} associated with this handle. If the
* task has been executed or cancelled already, it will return with no
* side effect.
*/
void cancel();
}

View File

@ -0,0 +1,51 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.timer;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Schedules {@link TimerTask}s for one-time future execution in a background
* thread.
*
* @author kimchy (Shay Banon)
*/
public interface Timer {
/**
* Schedules the specified {@link TimerTask} for one-time execution after
* the specified delay.
*
* @return a handle which is associated with the specified task
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
/**
* Releases all resources acquired by this {@link Timer} and cancels all
* tasks which were scheduled but not executed yet.
*
* @return the handles associated with the tasks which were canceled by
* this method
*/
Set<Timeout> stop();
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.timer;
/**
* A task which is executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)}.
*
* @author kimchy (Shay Banon)
*/
public interface TimerTask {
/**
* Executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)}.
*
* @param timeout a handle which is associated with this task
*/
void run(Timeout timeout) throws Exception;
}

View File

@ -20,6 +20,8 @@
package org.elasticsearch.test.integration.search;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchContextMissingException;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
@ -35,6 +37,7 @@ import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.server.internal.InternalServer;
import org.elasticsearch.test.integration.AbstractServersTests;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.trove.ExtTIntArrayList;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@ -169,6 +172,27 @@ public class SingleInstanceEmbeddedSearchTests extends AbstractServersTests {
assertThat(queryResult.facets().countFacet("age1").count(), equalTo(1l));
}
@Test public void testQueryFetchKeepAliveTimeout() throws Exception {
QuerySearchResult queryResult = searchService.executeQueryPhase(searchRequest(searchSource().query(termQuery("name", "test1"))).scroll(new Scroll(TimeValue.timeValueMillis(10))));
assertThat(queryResult.topDocs().totalHits, equalTo(1));
ShardDoc[] sortedShardList = searchPhaseController.sortDocs(newArrayList(queryResult));
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
assertThat(docIdsToLoad.size(), equalTo(1));
assertThat(docIdsToLoad.values().iterator().next().size(), equalTo(1));
// sleep for more than the 100ms tick the timeout wheel is set to
Thread.sleep(300);
try {
searchService.executeFetchPhase(new FetchSearchRequest(queryResult.id(), docIdsToLoad.values().iterator().next()));
assert false : "context should be missing since it timed out";
} catch (SearchContextMissingException e) {
// all is well
}
}
private InternalSearchRequest searchRequest(SearchSourceBuilder builder) {
return new InternalSearchRequest("test", 0, builder.build());
}