From 82e69691b1a22f33a5f7ec19f23580d9a5e4b4dd Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 18 Feb 2010 23:21:08 +0200 Subject: [PATCH] add support for "keep alive" time for a search request, and timeout context that have not been accessed --- .../action/search/SearchRequest.java | 3 +- .../client/transport/TransportClient.java | 7 + .../http/netty/NettyHttpRequest.java | 13 + .../org/elasticsearch/rest/RestRequest.java | 6 + .../RestReplicationPingAction.java | 3 +- .../indices/create/RestCreateIndexAction.java | 3 +- .../indices/delete/RestDeleteIndexAction.java | 3 +- .../snapshot/RestGatewaySnapshotAction.java | 3 +- .../create/RestCreateMappingAction.java | 3 +- .../rest/action/delete/RestDeleteAction.java | 3 +- .../RestDeleteByQueryAction.java | 3 +- .../rest/action/index/RestIndexAction.java | 3 +- .../rest/action/search/RestSearchAction.java | 6 +- .../java/org/elasticsearch/search/Scroll.java | 21 +- .../search/SearchContextMissingException.java | 2 +- .../elasticsearch/search/SearchService.java | 59 +- .../internal/InternalSearchRequest.java | 3 +- .../search/internal/SearchContext.java | 30 + .../server/internal/InternalServer.java | 4 + .../org/elasticsearch/timer/TimerModule.java | 32 + .../org/elasticsearch/timer/TimerService.java | 94 ++ .../elasticsearch/util/ReusableIterator.java | 29 + .../org/elasticsearch/util/SizeValue.java | 2 +- .../util/ThreadNameDeterminer.java | 61 + .../util/ThreadRenamingRunnable.java | 122 ++ .../concurrent/ConcurrentIdentityHashMap.java | 1405 +++++++++++++++++ .../util/settings/ImmutableSettings.java | 6 +- .../elasticsearch/util/settings/Settings.java | 2 +- .../util/timer/HashedWheelTimer.java | 489 ++++++ .../org/elasticsearch/util/timer/Timeout.java | 58 + .../org/elasticsearch/util/timer/Timer.java | 51 + .../elasticsearch/util/timer/TimerTask.java | 37 + .../SingleInstanceEmbeddedSearchTests.java | 24 + 33 files changed, 2555 insertions(+), 35 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerModule.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/ReusableIterator.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadNameDeterminer.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadRenamingRunnable.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentIdentityHashMap.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/HashedWheelTimer.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timeout.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timer.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/TimerTask.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java index ac30479d1ca..d2e5a970076 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -183,8 +183,9 @@ public class SearchRequest implements ActionRequest { return timeout; } - public void timeout(TimeValue timeout) { + public SearchRequest timeout(TimeValue timeout) { this.timeout = timeout; + return this; } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 27679470f4f..f2658127c51 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -50,7 +50,10 @@ import org.elasticsearch.cluster.node.Node; import org.elasticsearch.env.Environment; import org.elasticsearch.env.EnvironmentModule; import org.elasticsearch.server.internal.InternalSettingsPerparer; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; +import org.elasticsearch.timer.TimerModule; +import org.elasticsearch.timer.TimerService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; import org.elasticsearch.util.Tuple; @@ -123,6 +126,7 @@ public class TransportClient implements Client { modules.add(new EnvironmentModule(environment)); modules.add(new SettingsModule(settings)); modules.add(new ClusterNameModule(settings)); + modules.add(new TimerModule()); modules.add(new ThreadPoolModule(settings)); modules.add(new TransportModule(settings)); modules.add(new ClientTransportActionModule()); @@ -196,6 +200,9 @@ public class TransportClient implements Client { } injector.getInstance(TransportClientNodesService.class).close(); injector.getInstance(TransportService.class).close(); + + injector.getInstance(TimerService.class).close(); + injector.getInstance(ThreadPool.class).shutdown(); } @Override public AdminClient admin() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java index 6aa3dd9152e..e993ff646b6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/http/netty/NettyHttpRequest.java @@ -22,6 +22,8 @@ package org.elasticsearch.http.netty; import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.http.HttpRequest; +import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.TimeValue; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -31,6 +33,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.elasticsearch.util.SizeValue.*; +import static org.elasticsearch.util.TimeValue.*; + /** * @author kimchy (Shay Banon) */ @@ -131,6 +136,14 @@ public class NettyHttpRequest implements HttpRequest { return sValue.equals("true") || sValue.equals("1"); } + @Override public TimeValue paramAsTime(String key, TimeValue defaultValue) { + return parseTimeValue(param(key), defaultValue); + } + + @Override public SizeValue paramAsSize(String key, SizeValue defaultValue) { + return parseSizeValue(param(key), defaultValue); + } + @Override public String param(String key) { List keyParams = params(key); if (keyParams == null || keyParams.isEmpty()) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/RestRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/RestRequest.java index faf9faa60e3..95403b53c65 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/RestRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/RestRequest.java @@ -19,6 +19,8 @@ package org.elasticsearch.rest; +import org.elasticsearch.util.SizeValue; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.ToJson; import java.util.List; @@ -58,6 +60,10 @@ public interface RestRequest extends ToJson.Params { boolean paramAsBoolean(String key, boolean defaultValue); + TimeValue paramAsTime(String key, TimeValue defaultValue); + + SizeValue paramAsSize(String key, SizeValue defaultValue); + List params(String key); Map> params(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java index 86cbc4ecf5d..1d97a2286e5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/ping/replication/RestReplicationPingAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -50,7 +49,7 @@ public class RestReplicationPingAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel) { ReplicationPingRequest replicationPingRequest = new ReplicationPingRequest(RestActions.splitIndices(request.param("index"))); - replicationPingRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), ShardReplicationPingRequest.DEFAULT_TIMEOUT)); + replicationPingRequest.timeout(request.paramAsTime("timeout", ShardReplicationPingRequest.DEFAULT_TIMEOUT)); replicationPingRequest.listenerThreaded(false); client.admin().cluster().execPing(replicationPingRequest, new ActionListener() { @Override public void onResponse(ReplicationPingResponse result) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java index 1b8aecc599c..bd655731c8a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/create/RestCreateIndexAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestJsonBuilder; import org.elasticsearch.util.Strings; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.ImmutableSettings; import org.elasticsearch.util.settings.Settings; @@ -67,7 +66,7 @@ public class RestCreateIndexAction extends BaseRestHandler { } } CreateIndexRequest createIndexRequest = new CreateIndexRequest(request.param("index"), indexSettings); - createIndexRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), timeValueSeconds(10))); + createIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); client.admin().indices().execCreate(createIndexRequest, new ActionListener() { @Override public void onResponse(CreateIndexResponse result) { try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java index 2d7741d6740..cb1049345c3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/delete/RestDeleteIndexAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -50,7 +49,7 @@ public class RestDeleteIndexAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel) { DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(request.param("index")); - deleteIndexRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), timeValueSeconds(10))); + deleteIndexRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); client.admin().indices().execDelete(deleteIndexRequest, new ActionListener() { @Override public void onResponse(DeleteIndexResponse result) { try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/gateway/snapshot/RestGatewaySnapshotAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/gateway/snapshot/RestGatewaySnapshotAction.java index 346e9039965..25e4992dba7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/gateway/snapshot/RestGatewaySnapshotAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/gateway/snapshot/RestGatewaySnapshotAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -51,7 +50,7 @@ public class RestGatewaySnapshotAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel) { GatewaySnapshotRequest gatewaySnapshotRequest = new GatewaySnapshotRequest(RestActions.splitIndices(request.param("index"))); - gatewaySnapshotRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), DEFAULT_TIMEOUT)); + gatewaySnapshotRequest.timeout(request.paramAsTime("timeout", DEFAULT_TIMEOUT)); gatewaySnapshotRequest.listenerThreaded(false); client.admin().indices().execGatewaySnapshot(gatewaySnapshotRequest, new ActionListener() { @Override public void onResponse(GatewaySnapshotResponse result) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java index 67ac604960c..5e15b38a0fb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/mapping/create/RestCreateMappingAction.java @@ -28,7 +28,6 @@ import org.elasticsearch.index.mapper.InvalidTypeNameException; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -56,7 +55,7 @@ public class RestCreateMappingAction extends BaseRestHandler { CreateMappingRequest createMappingRequest = createMappingRequest(splitIndices(request.param("index"))); createMappingRequest.type(request.param("type")); createMappingRequest.mappingSource(request.contentAsString()); - createMappingRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), timeValueSeconds(10))); + createMappingRequest.timeout(request.paramAsTime("timeout", timeValueSeconds(10))); client.admin().indices().execCreateMapping(createMappingRequest, new ActionListener() { @Override public void onResponse(CreateMappingResponse result) { try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java index 5827e92c271..a90d5b37a88 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/delete/RestDeleteAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.client.Client; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -47,7 +46,7 @@ public class RestDeleteAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel) { DeleteRequest deleteRequest = new DeleteRequest(request.param("index"), request.param("type"), request.param("id")); - deleteRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), DeleteRequest.DEFAULT_TIMEOUT)); + deleteRequest.timeout(request.paramAsTime("timeout", DeleteRequest.DEFAULT_TIMEOUT)); // we just send a response, no need to fork deleteRequest.listenerThreaded(false); // we don't spawn, then fork if local diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java index 88b67d85122..7bfbb20ce0b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/deletebyquery/RestDeleteByQueryAction.java @@ -29,7 +29,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -61,7 +60,7 @@ public class RestDeleteByQueryAction extends BaseRestHandler { if (typesParam != null) { deleteByQueryRequest.types(RestActions.splitTypes(typesParam)); } - deleteByQueryRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), ShardDeleteByQueryRequest.DEFAULT_TIMEOUT)); + deleteByQueryRequest.timeout(request.paramAsTime("timeout", ShardDeleteByQueryRequest.DEFAULT_TIMEOUT)); } catch (Exception e) { try { channel.sendResponse(new JsonRestResponse(request, PRECONDITION_FAILED, JsonBuilder.jsonBuilder().startObject().field("error", e.getMessage()).endObject())); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java index 5b6112f73b6..d2644aabef5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/index/RestIndexAction.java @@ -26,7 +26,6 @@ import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.client.Client; import org.elasticsearch.rest.*; import org.elasticsearch.rest.action.support.RestJsonBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -48,7 +47,7 @@ public class RestIndexAction extends BaseRestHandler { @Override public void handleRequest(final RestRequest request, final RestChannel channel) { IndexRequest indexRequest = new IndexRequest(request.param("index"), request.param("type"), request.param("id"), request.contentAsString()); - indexRequest.timeout(TimeValue.parseTimeValue(request.param("timeout"), IndexRequest.DEFAULT_TIMEOUT)); + indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT)); String sOpType = request.param("opType"); if (sOpType != null) { if ("index".equals(sOpType)) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index a9210efe0a3..34b7669880d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -34,7 +34,6 @@ import org.elasticsearch.rest.action.support.RestActions; import org.elasticsearch.rest.action.support.RestJsonBuilder; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.json.JsonBuilder; import org.elasticsearch.util.settings.Settings; @@ -44,6 +43,7 @@ import java.util.regex.Pattern; import static org.elasticsearch.rest.RestRequest.Method.*; import static org.elasticsearch.rest.RestResponse.Status.*; +import static org.elasticsearch.util.TimeValue.*; /** * @author kimchy (Shay Banon) @@ -162,12 +162,12 @@ public class RestSearchAction extends BaseRestHandler { String scroll = request.param("scroll"); if (scroll != null) { - searchRequest.scroll(new Scroll(TimeValue.parseTimeValue(scroll, null))); + searchRequest.scroll(new Scroll(parseTimeValue(scroll, null))); } String timeout = request.param("timeout"); if (timeout != null) { - searchRequest.timeout(TimeValue.parseTimeValue(timeout, null)); + searchRequest.timeout(parseTimeValue(timeout, null)); } String typesParam = request.param("type"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/Scroll.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/Scroll.java index 1ec30e8182c..9548d5ba72b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/Scroll.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/Scroll.java @@ -33,18 +33,18 @@ import static org.elasticsearch.util.TimeValue.*; */ public class Scroll implements Streamable { - private TimeValue timeout; + private TimeValue keepAlive; private Scroll() { } - public Scroll(TimeValue timeout) { - this.timeout = timeout; + public Scroll(TimeValue keepAlive) { + this.keepAlive = keepAlive; } - public TimeValue timeout() { - return timeout; + public TimeValue keepAlive() { + return keepAlive; } public static Scroll readScroll(DataInput in) throws IOException, ClassNotFoundException { @@ -54,10 +54,17 @@ public class Scroll implements Streamable { } @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { - timeout = readTimeValue(in); + if (in.readBoolean()) { + keepAlive = readTimeValue(in); + } } @Override public void writeTo(DataOutput out) throws IOException { - timeout.writeTo(out); + if (keepAlive == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + keepAlive.writeTo(out); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchContextMissingException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchContextMissingException.java index 21c4fa0a7af..65607c04737 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchContextMissingException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchContextMissingException.java @@ -29,7 +29,7 @@ public class SearchContextMissingException extends ElasticSearchException { private final long id; public SearchContextMissingException(long id) { - super("No search context found for id [" + id + "]"); + super("No search context found for id [" + id + "], timed out"); this.id = id; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index f516bb64416..2a6a2f5665d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -44,6 +44,8 @@ import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.query.QueryPhase; import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; +import org.elasticsearch.timer.TimerService; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.component.AbstractComponent; import org.elasticsearch.util.component.Lifecycle; import org.elasticsearch.util.component.LifecycleComponent; @@ -51,13 +53,18 @@ import org.elasticsearch.util.concurrent.highscalelib.NonBlockingHashMapLong; import org.elasticsearch.util.io.FastStringReader; import org.elasticsearch.util.json.Jackson; import org.elasticsearch.util.settings.Settings; +import org.elasticsearch.util.timer.Timeout; +import org.elasticsearch.util.timer.TimerTask; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import static org.elasticsearch.util.TimeValue.*; + /** * @author kimchy (Shay Banon) */ @@ -71,27 +78,36 @@ public class SearchService extends AbstractComponent implements LifecycleCompone private final IndicesService indicesService; + private final TimerService timerService; + private final DfsPhase dfsPhase; private final QueryPhase queryPhase; private final FetchPhase fetchPhase; + + private final TimeValue defaultKeepAlive; + + private final AtomicLong idGenerator = new AtomicLong(); private final NonBlockingHashMapLong activeContexts = new NonBlockingHashMapLong(); private final ImmutableMap elementParsers; - @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, + @Inject public SearchService(Settings settings, ClusterService clusterService, IndicesService indicesService, TimerService timerService, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase) { super(settings); this.clusterService = clusterService; this.indicesService = indicesService; + this.timerService = timerService; this.dfsPhase = dfsPhase; this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; + this.defaultKeepAlive = componentSettings.getAsTime("defaultKeepAlive", timeValueMinutes(2)); + Map elementParsers = new HashMap(); elementParsers.putAll(dfsPhase.parseElements()); elementParsers.putAll(queryPhase.parseElements()); @@ -156,7 +172,7 @@ public class SearchService extends AbstractComponent implements LifecycleCompone try { context.searcher().dfSource(new CachedDfSource(request.dfs(), context.similarityService().defaultSearchSimilarity())); } catch (IOException e) { - throw new SearchException("Failed to set aggreagted df", e); + throw new SearchException("Failed to set aggregated df", e); } queryPhase.execute(context); return context.queryResult(); @@ -216,6 +232,8 @@ public class SearchService extends AbstractComponent implements LifecycleCompone if (context == null) { throw new SearchContextMissingException(id); } + // update the last access time of the context + context.accessed(timerService.estimatedTimeInMillis()); return context; } @@ -245,6 +263,15 @@ public class SearchService extends AbstractComponent implements LifecycleCompone context.size(10); } + // compute the context keep alive + TimeValue keepAlive = defaultKeepAlive; + if (request.scroll() != null && request.scroll().keepAlive() != null) { + keepAlive = request.scroll().keepAlive(); + } + context.keepAlive(keepAlive); + context.accessed(timerService.estimatedTimeInMillis()); + context.keepAliveTimeout(timerService.newTimeout(new KeepAliveTimerTask(context), keepAlive)); + return context; } @@ -310,5 +337,33 @@ public class SearchService extends AbstractComponent implements LifecycleCompone // process scroll context.from(context.from() + context.size()); context.scroll(request.scroll()); + // update the context keep alive based on the new scroll value + if (request.scroll() != null && request.scroll().keepAlive() != null) { + context.keepAlive(request.scroll().keepAlive()); + } + } + + private class KeepAliveTimerTask implements TimerTask { + + private final SearchContext context; + + private KeepAliveTimerTask(SearchContext context) { + this.context = context; + } + + @Override public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled()) { + return; + } + long currentTime = timerService.estimatedTimeInMillis(); + long nextDelay = context.keepAlive().millis() - (currentTime - context.lastAccessTime()); + if (nextDelay <= 0) { + // Time out, free the context (and remove it from the active context) + freeContext(context.id()); + } else { + // Read occurred before the timeout - set a new timeout with shorter delay. + context.keepAliveTimeout(timerService.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS)); + } + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java index 5321e7ca0af..b2f076996fd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchRequest.java @@ -119,8 +119,9 @@ public class InternalSearchRequest implements Streamable { return timeout; } - public void timeout(TimeValue timeout) { + public InternalSearchRequest timeout(TimeValue timeout) { this.timeout = timeout; + return this; } /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 45b53c93451..f8e64cab494 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -37,6 +37,7 @@ import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.lease.Releasable; +import org.elasticsearch.util.timer.Timeout; import java.io.IOException; @@ -91,6 +92,12 @@ public class SearchContext implements Releasable { private boolean queryRewritten; + private volatile TimeValue keepAlive; + + private volatile long lastAccessTime; + + private volatile Timeout keepAliveTimeout; + public SearchContext(long id, SearchShardTarget shardTarget, TimeValue timeout, float queryBoost, String source, String[] types, Engine.Searcher engineSearcher, IndexService indexService) { this.id = id; @@ -114,6 +121,9 @@ public class SearchContext implements Releasable { // ignore this exception } engineSearcher.release(); + if (!keepAliveTimeout.isCancelled()) { + keepAliveTimeout.cancel(); + } return true; } @@ -275,6 +285,26 @@ public class SearchContext implements Releasable { return this; } + public void accessed(long accessTime) { + this.lastAccessTime = accessTime; + } + + public long lastAccessTime() { + return this.lastAccessTime; + } + + public TimeValue keepAlive() { + return this.keepAlive; + } + + public void keepAlive(TimeValue keepAlive) { + this.keepAlive = keepAlive; + } + + public void keepAliveTimeout(Timeout keepAliveTimeout) { + this.keepAliveTimeout = keepAliveTimeout; + } + public DfsSearchResult dfsResult() { return dfsResult; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java index b4c792ef7df..63237308588 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/server/internal/InternalServer.java @@ -53,6 +53,8 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.server.Server; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; +import org.elasticsearch.timer.TimerModule; +import org.elasticsearch.timer.TimerService; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; import org.elasticsearch.util.Tuple; @@ -103,6 +105,7 @@ public final class InternalServer implements Server { modules.add(new ClusterNameModule(settings)); modules.add(new SettingsModule(settings)); modules.add(new ThreadPoolModule(settings)); + modules.add(new TimerModule()); modules.add(new DiscoveryModule(settings)); modules.add(new ClusterModule(settings)); modules.add(new RestModule(settings)); @@ -221,6 +224,7 @@ public final class InternalServer implements Server { injector.getInstance(RestController.class).close(); injector.getInstance(TransportService.class).close(); + injector.getInstance(TimerService.class).close(); injector.getInstance(ThreadPool.class).shutdown(); try { injector.getInstance(ThreadPool.class).awaitTermination(10, TimeUnit.SECONDS); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerModule.java new file mode 100644 index 00000000000..a332fe80bfd --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerModule.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.timer; + +import com.google.inject.AbstractModule; + +/** + * @author kimchy (Shay Banon) + */ +public class TimerModule extends AbstractModule { + + @Override protected void configure() { + bind(TimerService.class).asEagerSingleton(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java new file mode 100644 index 00000000000..fc3214c49b9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/timer/TimerService.java @@ -0,0 +1,94 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.timer; + +import com.google.inject.Inject; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.util.TimeValue; +import org.elasticsearch.util.component.AbstractComponent; +import org.elasticsearch.util.settings.Settings; +import org.elasticsearch.util.timer.HashedWheelTimer; +import org.elasticsearch.util.timer.Timeout; +import org.elasticsearch.util.timer.Timer; +import org.elasticsearch.util.timer.TimerTask; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.elasticsearch.util.TimeValue.*; +import static org.elasticsearch.util.concurrent.DynamicExecutors.*; + +/** + * @author kimchy (Shay Banon) + */ +public class TimerService extends AbstractComponent { + + private final ThreadPool threadPool; + + private final TimeEstimator timeEstimator; + + private final ScheduledFuture timeEstimatorFuture; + + private final Timer timer; + + private final TimeValue tickDuration; + + @Inject public TimerService(Settings settings, ThreadPool threadPool) { + super(settings); + this.threadPool = threadPool; + + this.timeEstimator = new TimeEstimator(); + this.timeEstimatorFuture = threadPool.scheduleWithFixedDelay(timeEstimator, 50, 50, TimeUnit.MILLISECONDS); + + this.tickDuration = componentSettings.getAsTime("tickDuration", timeValueMillis(100)); + + this.timer = new HashedWheelTimer(logger, daemonThreadFactory(settings, "timer"), tickDuration.millis(), TimeUnit.MILLISECONDS); + } + + public void close() { + timeEstimatorFuture.cancel(true); + timer.stop(); + } + + public long estimatedTimeInMillis() { + return timeEstimator.time(); + } + + public Timeout newTimeout(TimerTask task, TimeValue delay) { + return newTimeout(task, delay.nanos(), TimeUnit.NANOSECONDS); + } + + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + return timer.newTimeout(task, delay, unit); + } + + private static class TimeEstimator implements Runnable { + + private long time = System.currentTimeMillis(); + + @Override public void run() { + this.time = System.currentTimeMillis(); + } + + public long time() { + return this.time; + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/ReusableIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ReusableIterator.java new file mode 100644 index 00000000000..ad5a7a60e76 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ReusableIterator.java @@ -0,0 +1,29 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util; + +import java.util.Iterator; + +/** + * @author kimchy (Shay Banon) + */ +public interface ReusableIterator extends Iterator { + void rewind(); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/SizeValue.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/SizeValue.java index 08a7e7f52cb..870665ea85c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/SizeValue.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/SizeValue.java @@ -96,7 +96,7 @@ public class SizeValue implements Serializable, Streamable { return Strings.format1Decimals(value, suffix); } - public static SizeValue parse(String sValue, SizeValue defaultValue) throws ElasticSearchParseException { + public static SizeValue parseSizeValue(String sValue, SizeValue defaultValue) throws ElasticSearchParseException { if (sValue == null) { return defaultValue; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadNameDeterminer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadNameDeterminer.java new file mode 100644 index 00000000000..78a8c340d75 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadNameDeterminer.java @@ -0,0 +1,61 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util; + +/** + * Overrides the thread name proposed by {@link ThreadRenamingRunnable}. + * + * @author kimchy (shay.banon) + */ +public interface ThreadNameDeterminer { + + /** + * {@link ThreadNameDeterminer} that accepts the proposed thread name + * as is. + */ + ThreadNameDeterminer PROPOSED = new ThreadNameDeterminer() { + public String determineThreadName(String currentThreadName, + String proposedThreadName) throws Exception { + return proposedThreadName; + } + }; + + /** + * {@link ThreadNameDeterminer} that rejects the proposed thread name and + * retains the current one. + */ + ThreadNameDeterminer CURRENT = new ThreadNameDeterminer() { + public String determineThreadName(String currentThreadName, + String proposedThreadName) throws Exception { + return null; + } + }; + + /** + * Overrides the thread name proposed by {@link ThreadRenamingRunnable}. + * + * @param currentThreadName the current thread name + * @param proposedThreadName the proposed new thread name + * @return the actual new thread name. + * If {@code null} is returned, the proposed thread name is + * discarded (i.e. no rename). + */ + String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception; +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadRenamingRunnable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadRenamingRunnable.java new file mode 100644 index 00000000000..80cede9441e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/ThreadRenamingRunnable.java @@ -0,0 +1,122 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util; + +import org.elasticsearch.util.logging.Loggers; +import org.slf4j.Logger; + +/** + * A {@link Runnable} that changes the current thread name and reverts it back + * when its execution ends. To change the default thread names set by Netty, + * use {@link #setThreadNameDeterminer(ThreadNameDeterminer)}. + * + * @author kimchy (shay.banon) + */ +public class ThreadRenamingRunnable implements Runnable { + + private static final Logger logger = Loggers.getLogger(ThreadRenamingRunnable.class); + + private static volatile ThreadNameDeterminer threadNameDeterminer = + ThreadNameDeterminer.PROPOSED; + + /** + * Returns the {@link ThreadNameDeterminer} which overrides the proposed + * new thread name. + */ + public static ThreadNameDeterminer getThreadNameDeterminer() { + return threadNameDeterminer; + } + + /** + * Sets the {@link ThreadNameDeterminer} which overrides the proposed new + * thread name. Please note that the specified {@link ThreadNameDeterminer} + * affects only new {@link ThreadRenamingRunnable}s; the existing instances + * are not affected at all. Therefore, you should make sure to call this + * method at the earliest possible point (i.e. before any Netty worker + * thread starts) for consistent thread naming. Otherwise, you might see + * the default thread names and the new names appear at the same time in + * the full thread dump. + */ + public static void setThreadNameDeterminer(ThreadNameDeterminer threadNameDeterminer) { + if (threadNameDeterminer == null) { + throw new NullPointerException("threadNameDeterminer"); + } + ThreadRenamingRunnable.threadNameDeterminer = threadNameDeterminer; + } + + private final Runnable runnable; + private final String proposedThreadName; + + /** + * Creates a new instance which wraps the specified {@code runnable} + * and changes the thread name to the specified thread name when the + * specified {@code runnable} is running. + */ + public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) { + if (runnable == null) { + throw new NullPointerException("runnable"); + } + if (proposedThreadName == null) { + throw new NullPointerException("proposedThreadName"); + } + this.runnable = runnable; + this.proposedThreadName = proposedThreadName; + } + + public void run() { + final Thread currentThread = Thread.currentThread(); + final String oldThreadName = currentThread.getName(); + final String newThreadName = getNewThreadName(oldThreadName); + + // Change the thread name before starting the actual runnable. + boolean renamed = false; + if (!oldThreadName.equals(newThreadName)) { + try { + currentThread.setName(newThreadName); + renamed = true; + } catch (SecurityException e) { + logger.debug("Failed to rename a thread due to security restriction.", e); + } + } + + // Run the actual runnable and revert the name back when it ends. + try { + runnable.run(); + } finally { + if (renamed) { + // Revert the name back if the current thread was renamed. + // We do not check the exception here because we know it works. + currentThread.setName(oldThreadName); + } + } + } + + private String getNewThreadName(String currentThreadName) { + String newThreadName = null; + + try { + newThreadName = getThreadNameDeterminer().determineThreadName(currentThreadName, proposedThreadName); + } catch (Throwable t) { + logger.warn("Failed to determine the thread name", t); + } + + return newThreadName == null ? currentThreadName : newThreadName; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentIdentityHashMap.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentIdentityHashMap.java new file mode 100644 index 00000000000..55001e86a2d --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/concurrent/ConcurrentIdentityHashMap.java @@ -0,0 +1,1405 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.concurrent; + +import org.elasticsearch.util.ReusableIterator; + +import java.util.*; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; + +/** + * An alternative identity-comparing {@link ConcurrentMap} which is similar to + * {@link java.util.concurrent.ConcurrentHashMap}. + * + * @author Doug Lea + * @author kimchy (shay.banon) + */ +public final class ConcurrentIdentityHashMap extends AbstractMap + implements ConcurrentMap { + + /** + * The default initial capacity for this table, used when not otherwise + * specified in a constructor. + */ + static final int DEFAULT_INITIAL_CAPACITY = 16; + + /** + * The default load factor for this table, used when not otherwise specified + * in a constructor. + */ + static final float DEFAULT_LOAD_FACTOR = 0.75f; + + /** + * The default concurrency level for this table, used when not otherwise + * specified in a constructor. + */ + static final int DEFAULT_CONCURRENCY_LEVEL = 16; + + /** + * The maximum capacity, used if a higher value is implicitly specified by + * either of the constructors with arguments. MUST be a power of two + * <= 1<<30 to ensure that entries are indexable using integers. + */ + static final int MAXIMUM_CAPACITY = 1 << 30; + + /** + * The maximum number of segments to allow; used to bound constructor + * arguments. + */ + static final int MAX_SEGMENTS = 1 << 16; // slightly conservative + + /** + * Number of unsynchronized retries in size and containsValue methods before + * resorting to locking. This is used to avoid unbounded retries if tables + * undergo continuous modification which would make it impossible to obtain + * an accurate result. + */ + static final int RETRIES_BEFORE_LOCK = 2; + + /* ---------------- Fields -------------- */ + + /** + * Mask value for indexing into segments. The upper bits of a key's hash + * code are used to choose the segment. + */ + final int segmentMask; + + /** + * Shift value for indexing within segments. + */ + final int segmentShift; + + /** + * The segments, each of which is a specialized hash table + */ + final Segment[] segments; + + Set keySet; + Set> entrySet; + Collection values; + + /* ---------------- Small Utilities -------------- */ + + /** + * Applies a supplemental hash function to a given hashCode, which defends + * against poor quality hash functions. This is critical because + * ConcurrentReferenceHashMap uses power-of-two length hash tables, that + * otherwise encounter collisions for hashCodes that do not differ in lower + * or upper bits. + */ + private static int hash(int h) { + // Spread bits to regularize both segment and index locations, + // using variant of single-word Wang/Jenkins hash. + h += h << 15 ^ 0xffffcd7d; + h ^= h >>> 10; + h += h << 3; + h ^= h >>> 6; + h += (h << 2) + (h << 14); + return h ^ h >>> 16; + } + + /** + * Returns the segment that should be used for key with given hash. + * + * @param hash the hash code for the key + * @return the segment + */ + final Segment segmentFor(int hash) { + return segments[hash >>> segmentShift & segmentMask]; + } + + private int hashOf(Object key) { + return hash(System.identityHashCode(key)); + } + + /** + * ConcurrentReferenceHashMap list entry. Note that this is never exported + * out as a user-visible Map.Entry. + * + * Because the value field is volatile, not final, it is legal wrt + * the Java Memory Model for an unsynchronized reader to see null + * instead of initial value when read via a data race. Although a + * reordering leading to this is not likely to ever actually + * occur, the Segment.readValueUnderLock method is used as a + * backup in case a null (pre-initialized) value is ever seen in + * an unsynchronized access method. + */ + static final class HashEntry { + final Object key; + final int hash; + volatile Object value; + final HashEntry next; + + HashEntry( + K key, int hash, HashEntry next, V value) { + this.hash = hash; + this.next = next; + this.key = key; + this.value = value; + } + + @SuppressWarnings("unchecked") + final K key() { + return (K) key; + } + + @SuppressWarnings("unchecked") + final V value() { + return (V) value; + } + + final void setValue(V value) { + this.value = value; + } + + @SuppressWarnings("unchecked") + static final HashEntry[] newArray(int i) { + return new HashEntry[i]; + } + } + + /** + * Segments are specialized versions of hash tables. This subclasses from + * ReentrantLock opportunistically, just to simplify some locking and avoid + * separate construction. + */ + static final class Segment extends ReentrantLock { + /* + * Segments maintain a table of entry lists that are ALWAYS kept in a + * consistent state, so can be read without locking. Next fields of + * nodes are immutable (final). All list additions are performed at the + * front of each bin. This makes it easy to check changes, and also fast + * to traverse. When nodes would otherwise be changed, new nodes are + * created to replace them. This works well for hash tables since the + * bin lists tend to be short. (The average length is less than two for + * the default load factor threshold.) + * + * Read operations can thus proceed without locking, but rely on + * selected uses of volatiles to ensure that completed write operations + * performed by other threads are noticed. For most purposes, the + * "count" field, tracking the number of elements, serves as that + * volatile variable ensuring visibility. This is convenient because + * this field needs to be read in many read operations anyway: + * + * - All (unsynchronized) read operations must first read the + * "count" field, and should not look at table entries if + * it is 0. + * + * - All (synchronized) write operations should write to + * the "count" field after structurally changing any bin. + * The operations must not take any action that could even + * momentarily cause a concurrent read operation to see + * inconsistent data. This is made easier by the nature of + * the read operations in Map. For example, no operation + * can reveal that the table has grown but the threshold + * has not yet been updated, so there are no atomicity + * requirements for this with respect to reads. + * + * As a guide, all critical volatile reads and writes to the count field + * are marked in code comments. + */ + + private static final long serialVersionUID = 5207829234977119743L; + + /** + * The number of elements in this segment's region. + */ + transient volatile int count; + + /** + * Number of updates that alter the size of the table. This is used + * during bulk-read methods to make sure they see a consistent snapshot: + * If modCounts change during a traversal of segments computing size or + * checking containsValue, then we might have an inconsistent view of + * state so (usually) must retry. + */ + int modCount; + + /** + * The table is rehashed when its size exceeds this threshold. + * (The value of this field is always (capacity * loadFactor).) + */ + int threshold; + + /** + * The per-segment table. + */ + transient volatile HashEntry[] table; + + /** + * The load factor for the hash table. Even though this value is same + * for all segments, it is replicated to avoid needing links to outer + * object. + * + * @serial + */ + final float loadFactor; + + Segment(int initialCapacity, float lf) { + loadFactor = lf; + setTable(HashEntry.newArray(initialCapacity)); + } + + @SuppressWarnings("unchecked") + static final Segment[] newArray(int i) { + return new Segment[i]; + } + + private boolean keyEq(Object src, Object dest) { + return src == dest; + } + + /** + * Sets table to new HashEntry array. Call only while holding lock or in + * constructor. + */ + void setTable(HashEntry[] newTable) { + threshold = (int) (newTable.length * loadFactor); + table = newTable; + } + + /** + * Returns properly casted first entry of bin for given hash. + */ + HashEntry getFirst(int hash) { + HashEntry[] tab = table; + return tab[hash & tab.length - 1]; + } + + HashEntry newHashEntry( + K key, int hash, HashEntry next, V value) { + return new HashEntry(key, hash, next, value); + } + + /** + * Reads value field of an entry under lock. Called if value field ever + * appears to be null. This is possible only if a compiler happens to + * reorder a HashEntry initialization with its table assignment, which + * is legal under memory model but is not known to ever occur. + */ + V readValueUnderLock(HashEntry e) { + lock(); + try { + return e.value(); + } finally { + unlock(); + } + } + + /* Specialized implementations of map methods */ + + V get(Object key, int hash) { + if (count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && keyEq(key, e.key())) { + V opaque = e.value(); + if (opaque != null) { + return opaque; + } + + return readValueUnderLock(e); // recheck + } + e = e.next; + } + } + return null; + } + + boolean containsKey(Object key, int hash) { + if (count != 0) { // read-volatile + HashEntry e = getFirst(hash); + while (e != null) { + if (e.hash == hash && keyEq(key, e.key())) { + return true; + } + e = e.next; + } + } + return false; + } + + boolean containsValue(Object value) { + if (count != 0) { // read-volatile + HashEntry[] tab = table; + int len = tab.length; + for (int i = 0; i < len; i++) { + for (HashEntry e = tab[i]; e != null; e = e.next) { + V opaque = e.value(); + V v; + + if (opaque == null) { + v = readValueUnderLock(e); // recheck + } else { + v = opaque; + } + + if (value.equals(v)) { + return true; + } + } + } + } + return false; + } + + boolean replace(K key, int hash, V oldValue, V newValue) { + lock(); + try { + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + boolean replaced = false; + if (e != null && oldValue.equals(e.value())) { + replaced = true; + e.setValue(newValue); + } + return replaced; + } finally { + unlock(); + } + } + + V replace(K key, int hash, V newValue) { + lock(); + try { + HashEntry e = getFirst(hash); + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + oldValue = e.value(); + e.setValue(newValue); + } + return oldValue; + } finally { + unlock(); + } + } + + V put(K key, int hash, V value, boolean onlyIfAbsent) { + lock(); + try { + int c = count; + if (c++ > threshold) { // ensure capacity + int reduced = rehash(); + if (reduced > 0) { + count = (c -= reduced) - 1; // write-volatile + } + } + + HashEntry[] tab = table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + while (e != null && (e.hash != hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue; + if (e != null) { + oldValue = e.value(); + if (!onlyIfAbsent) { + e.setValue(value); + } + } else { + oldValue = null; + ++modCount; + tab[index] = newHashEntry(key, hash, first, value); + count = c; // write-volatile + } + return oldValue; + } finally { + unlock(); + } + } + + int rehash() { + HashEntry[] oldTable = table; + int oldCapacity = oldTable.length; + if (oldCapacity >= MAXIMUM_CAPACITY) { + return 0; + } + + /* + * Reclassify nodes in each list to new Map. Because we are using + * power-of-two expansion, the elements from each bin must either + * stay at same index, or move with a power of two offset. We + * eliminate unnecessary node creation by catching cases where old + * nodes can be reused because their next fields won't change. + * Statistically, at the default threshold, only about one-sixth of + * them need cloning when a table doubles. The nodes they replace + * will be garbage collectable as soon as they are no longer + * referenced by any reader thread that may be in the midst of + * traversing table right now. + */ + + HashEntry[] newTable = HashEntry.newArray(oldCapacity << 1); + threshold = (int) (newTable.length * loadFactor); + int sizeMask = newTable.length - 1; + int reduce = 0; + for (int i = 0; i < oldCapacity; i++) { + // We need to guarantee that any existing reads of old Map can + // proceed. So we cannot yet null out each bin. + HashEntry e = oldTable[i]; + + if (e != null) { + HashEntry next = e.next; + int idx = e.hash & sizeMask; + + // Single node on list + if (next == null) { + newTable[idx] = e; + } else { + // Reuse trailing consecutive sequence at same slot + HashEntry lastRun = e; + int lastIdx = idx; + for (HashEntry last = next; last != null; last = last.next) { + int k = last.hash & sizeMask; + if (k != lastIdx) { + lastIdx = k; + lastRun = last; + } + } + newTable[lastIdx] = lastRun; + // Clone all remaining nodes + for (HashEntry p = e; p != lastRun; p = p.next) { + // Skip GC'd weak references + K key = p.key(); + if (key == null) { + reduce++; + continue; + } + int k = p.hash & sizeMask; + HashEntry n = newTable[k]; + newTable[k] = newHashEntry(key, p.hash, n, p.value()); + } + } + } + } + table = newTable; + return reduce; + } + + /** + * Remove; match on key only if value null, else match both. + */ + V remove(Object key, int hash, Object value, boolean refRemove) { + lock(); + try { + int c = count - 1; + HashEntry[] tab = table; + int index = hash & tab.length - 1; + HashEntry first = tab[index]; + HashEntry e = first; + // a reference remove operation compares the Reference instance + while (e != null && key != e.key && + (refRemove || hash != e.hash || !keyEq(key, e.key()))) { + e = e.next; + } + + V oldValue = null; + if (e != null) { + V v = e.value(); + if (value == null || value.equals(v)) { + oldValue = v; + // All entries following removed node can stay in list, + // but all preceding ones need to be cloned. + ++modCount; + HashEntry newFirst = e.next; + for (HashEntry p = first; p != e; p = p.next) { + K pKey = p.key(); + if (pKey == null) { // Skip GC'd keys + c--; + continue; + } + + newFirst = newHashEntry( + pKey, p.hash, newFirst, p.value()); + } + tab[index] = newFirst; + count = c; // write-volatile + } + } + return oldValue; + } finally { + unlock(); + } + } + + void clear() { + if (count != 0) { + lock(); + try { + HashEntry[] tab = table; + for (int i = 0; i < tab.length; i++) { + tab[i] = null; + } + ++modCount; + count = 0; // write-volatile + } finally { + unlock(); + } + } + } + } + + /* ---------------- Public operations -------------- */ + + /** + * Creates a new, empty map with the specified initial capacity, load factor + * and concurrency level. + * + * @param initialCapacity the initial capacity. The implementation performs + * internal sizing to accommodate this many elements. + * @param loadFactor the load factor threshold, used to control resizing. + * Resizing may be performed when the average number of + * elements per bin exceeds this threshold. + * @param concurrencyLevel the estimated number of concurrently updating + * threads. The implementation performs internal + * sizing to try to accommodate this many threads. + * @throws IllegalArgumentException if the initial capacity is negative or + * the load factor or concurrencyLevel are + * nonpositive. + */ + public ConcurrentIdentityHashMap( + int initialCapacity, float loadFactor, + int concurrencyLevel) { + if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) { + throw new IllegalArgumentException(); + } + + if (concurrencyLevel > MAX_SEGMENTS) { + concurrencyLevel = MAX_SEGMENTS; + } + + // Find power-of-two sizes best matching arguments + int sshift = 0; + int ssize = 1; + while (ssize < concurrencyLevel) { + ++sshift; + ssize <<= 1; + } + segmentShift = 32 - sshift; + segmentMask = ssize - 1; + this.segments = Segment.newArray(ssize); + + if (initialCapacity > MAXIMUM_CAPACITY) { + initialCapacity = MAXIMUM_CAPACITY; + } + int c = initialCapacity / ssize; + if (c * ssize < initialCapacity) { + ++c; + } + int cap = 1; + while (cap < c) { + cap <<= 1; + } + + for (int i = 0; i < this.segments.length; ++i) { + this.segments[i] = new Segment(cap, loadFactor); + } + } + + + /** + * Creates a new, empty map with the specified initial capacity and load + * factor and with the default reference types (weak keys, strong values), + * and concurrencyLevel (16). + * + * @param initialCapacity The implementation performs internal sizing to + * accommodate this many elements. + * @param loadFactor the load factor threshold, used to control resizing. + * Resizing may be performed when the average number of + * elements per bin exceeds this threshold. + * @throws IllegalArgumentException if the initial capacity of elements is + * negative or the load factor is + * nonpositive + */ + public ConcurrentIdentityHashMap(int initialCapacity, float loadFactor) { + this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new, empty map with the specified initial capacity, and with + * default reference types (weak keys, strong values), load factor (0.75) + * and concurrencyLevel (16). + * + * @param initialCapacity the initial capacity. The implementation performs + * internal sizing to accommodate this many elements. + * @throws IllegalArgumentException if the initial capacity of elements is + * negative. + */ + public ConcurrentIdentityHashMap(int initialCapacity) { + this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new, empty map with a default initial capacity (16), reference + * types (weak keys, strong values), default load factor (0.75) and + * concurrencyLevel (16). + */ + public ConcurrentIdentityHashMap() { + this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + } + + /** + * Creates a new map with the same mappings as the given map. The map is + * created with a capacity of 1.5 times the number of mappings in the given + * map or 16 (whichever is greater), and a default load factor (0.75) and + * concurrencyLevel (16). + * + * @param m the map + */ + public ConcurrentIdentityHashMap(Map m) { + this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, + DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, + DEFAULT_CONCURRENCY_LEVEL); + putAll(m); + } + + /** + * Returns true if this map contains no key-value mappings. + * + * @return true if this map contains no key-value mappings + */ + @Override + public boolean isEmpty() { + final Segment[] segments = this.segments; + /* + * We keep track of per-segment modCounts to avoid ABA problems in which + * an element in one segment was added and in another removed during + * traversal, in which case the table was never actually empty at any + * point. Note the similar use of modCounts in the size() and + * containsValue() methods, which are the only other methods also + * susceptible to ABA problems. + */ + int[] mc = new int[segments.length]; + int mcsum = 0; + for (int i = 0; i < segments.length; ++i) { + if (segments[i].count != 0) { + return false; + } else { + mcsum += mc[i] = segments[i].modCount; + } + } + // If mcsum happens to be zero, then we know we got a snapshot before + // any modifications at all were made. This is probably common enough + // to bother tracking. + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++i) { + if (segments[i].count != 0 || mc[i] != segments[i].modCount) { + return false; + } + } + } + return true; + } + + /** + * Returns the number of key-value mappings in this map. If the map contains + * more than Integer.MAX_VALUE elements, returns + * Integer.MAX_VALUE. + * + * @return the number of key-value mappings in this map + */ + @Override + public int size() { + final Segment[] segments = this.segments; + long sum = 0; + long check = 0; + int[] mc = new int[segments.length]; + // Try a few times to get accurate count. On failure due to continuous + // async changes in table, resort to locking. + for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { + check = 0; + sum = 0; + int mcsum = 0; + for (int i = 0; i < segments.length; ++i) { + sum += segments[i].count; + mcsum += mc[i] = segments[i].modCount; + } + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++i) { + check += segments[i].count; + if (mc[i] != segments[i].modCount) { + check = -1; // force retry + break; + } + } + } + if (check == sum) { + break; + } + } + if (check != sum) { // Resort to locking all segments + sum = 0; + for (int i = 0; i < segments.length; ++i) { + segments[i].lock(); + } + for (int i = 0; i < segments.length; ++i) { + sum += segments[i].count; + } + for (int i = 0; i < segments.length; ++i) { + segments[i].unlock(); + } + } + if (sum > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int) sum; + } + } + + /** + * Returns the value to which the specified key is mapped, or {@code null} + * if this map contains no mapping for the key. + * + *

More formally, if this map contains a mapping from a key {@code k} to + * a value {@code v} such that {@code key.equals(k)}, then this method + * returns {@code v}; otherwise it returns {@code null}. (There can be at + * most one such mapping.) + * + * @throws NullPointerException if the specified key is null + */ + @Override + public V get(Object key) { + int hash = hashOf(key); + return segmentFor(hash).get(key, hash); + } + + /** + * Tests if the specified object is a key in this table. + * + * @param key possible key + * @return true if and only if the specified object is a key in + * this table, as determined by the equals method; + * false otherwise. + * @throws NullPointerException if the specified key is null + */ + @Override + public boolean containsKey(Object key) { + int hash = hashOf(key); + return segmentFor(hash).containsKey(key, hash); + } + + /** + * Returns true if this map maps one or more keys to the specified + * value. Note: This method requires a full internal traversal of the hash + * table, and so is much slower than method containsKey. + * + * @param value value whose presence in this map is to be tested + * @return true if this map maps one or more keys to the specified + * value + * @throws NullPointerException if the specified value is null + */ + + @Override + public boolean containsValue(Object value) { + if (value == null) { + throw new NullPointerException(); + } + + // See explanation of modCount use above + + final Segment[] segments = this.segments; + int[] mc = new int[segments.length]; + + // Try a few times without locking + for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { + int mcsum = 0; + for (int i = 0; i < segments.length; ++i) { + mcsum += mc[i] = segments[i].modCount; + if (segments[i].containsValue(value)) { + return true; + } + } + boolean cleanSweep = true; + if (mcsum != 0) { + for (int i = 0; i < segments.length; ++i) { + if (mc[i] != segments[i].modCount) { + cleanSweep = false; + break; + } + } + } + if (cleanSweep) { + return false; + } + } + // Resort to locking all segments + for (int i = 0; i < segments.length; ++i) { + segments[i].lock(); + } + boolean found = false; + try { + for (int i = 0; i < segments.length; ++i) { + if (segments[i].containsValue(value)) { + found = true; + break; + } + } + } finally { + for (int i = 0; i < segments.length; ++i) { + segments[i].unlock(); + } + } + return found; + } + + /** + * Legacy method testing if some key maps into the specified value in this + * table. This method is identical in functionality to + * {@link #containsValue}, and exists solely to ensure full compatibility + * with class {@link Hashtable}, which supported this method prior to + * introduction of the Java Collections framework. + * + * @param value a value to search for + * @return true if and only if some key maps to the value + * argument in this table as determined by the equals + * method; false otherwise + * @throws NullPointerException if the specified value is null + */ + public boolean contains(Object value) { + return containsValue(value); + } + + /** + * Maps the specified key to the specified value in this table. Neither the + * key nor the value can be null. + * + *

The value can be retrieved by calling the get method with a + * key that is equal to the original key. + * + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + * @return the previous value associated with key, or null + * if there was no mapping for key + * @throws NullPointerException if the specified key or value is null + */ + @Override + public V put(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).put(key, hash, value, false); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, or + * null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + public V putIfAbsent(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).put(key, hash, value, true); + } + + /** + * Copies all of the mappings from the specified map to this one. These + * mappings replace any mappings that this map had for any of the keys + * currently in the specified map. + * + * @param m mappings to be stored in this map + */ + @Override + public void putAll(Map m) { + for (Map.Entry e : m.entrySet()) { + put(e.getKey(), e.getValue()); + } + } + + /** + * Removes the key (and its corresponding value) from this map. This method + * does nothing if the key is not in the map. + * + * @param key the key that needs to be removed + * @return the previous value associated with key, or null + * if there was no mapping for key + * @throws NullPointerException if the specified key is null + */ + @Override + public V remove(Object key) { + int hash = hashOf(key); + return segmentFor(hash).remove(key, hash, null, false); + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if the specified key is null + */ + public boolean remove(Object key, Object value) { + int hash = hashOf(key); + if (value == null) { + return false; + } + return segmentFor(hash).remove(key, hash, value, false) != null; + } + + /** + * {@inheritDoc} + * + * @throws NullPointerException if any of the arguments are null + */ + public boolean replace(K key, V oldValue, V newValue) { + if (oldValue == null || newValue == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).replace(key, hash, oldValue, newValue); + } + + /** + * {@inheritDoc} + * + * @return the previous value associated with the specified key, or + * null if there was no mapping for the key + * @throws NullPointerException if the specified key or value is null + */ + public V replace(K key, V value) { + if (value == null) { + throw new NullPointerException(); + } + int hash = hashOf(key); + return segmentFor(hash).replace(key, hash, value); + } + + /** + * Removes all of the mappings from this map. + */ + @Override + public void clear() { + for (int i = 0; i < segments.length; ++i) { + segments[i].clear(); + } + } + + /** + * Returns a {@link Set} view of the keys contained in this map. The set is + * backed by the map, so changes to the map are reflected in the set, and + * vice-versa. The set supports element removal, which removes the + * corresponding mapping from this map, via the Iterator.remove, + * Set.remove, removeAll, retainAll, and + * clear operations. It does not support the add or + * addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator that + * will never throw {@link ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. + */ + @Override + public Set keySet() { + Set ks = keySet; + return ks != null ? ks : (keySet = new KeySet()); + } + + /** + * Returns a {@link Collection} view of the values contained in this map. + * The collection is backed by the map, so changes to the map are reflected + * in the collection, and vice-versa. The collection supports element + * removal, which removes the corresponding mapping from this map, via the + * Iterator.remove, Collection.remove, removeAll, + * retainAll, and clear operations. It does not support + * the add or addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator that + * will never throw {@link ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. + */ + @Override + public Collection values() { + Collection vs = values; + return vs != null ? vs : (values = new Values()); + } + + /** + * Returns a {@link Set} view of the mappings contained in this map. + * The set is backed by the map, so changes to the map are reflected in the + * set, and vice-versa. The set supports element removal, which removes the + * corresponding mapping from the map, via the Iterator.remove, + * Set.remove, removeAll, retainAll, and + * clear operations. It does not support the add or + * addAll operations. + * + *

The view's iterator is a "weakly consistent" iterator that + * will never throw {@link ConcurrentModificationException}, and guarantees + * to traverse elements as they existed upon construction of the iterator, + * and may (but is not guaranteed to) reflect any modifications subsequent + * to construction. + */ + @Override + public Set> entrySet() { + Set> es = entrySet; + return es != null ? es : (entrySet = new EntrySet()); + } + + /** + * Returns an enumeration of the keys in this table. + * + * @return an enumeration of the keys in this table + * @see #keySet() + */ + public Enumeration keys() { + return new KeyIterator(); + } + + /** + * Returns an enumeration of the values in this table. + * + * @return an enumeration of the values in this table + * @see #values() + */ + public Enumeration elements() { + return new ValueIterator(); + } + + /* ---------------- Iterator Support -------------- */ + + abstract class HashIterator { + int nextSegmentIndex; + int nextTableIndex; + HashEntry[] currentTable; + HashEntry nextEntry; + HashEntry lastReturned; + K currentKey; // Strong reference to weak key (prevents gc) + + HashIterator() { + nextSegmentIndex = segments.length - 1; + nextTableIndex = -1; + advance(); + } + + public void rewind() { + nextSegmentIndex = segments.length - 1; + nextTableIndex = -1; + currentTable = null; + nextEntry = null; + lastReturned = null; + currentKey = null; + advance(); + } + + public boolean hasMoreElements() { + return hasNext(); + } + + final void advance() { + if (nextEntry != null && (nextEntry = nextEntry.next) != null) { + return; + } + + while (nextTableIndex >= 0) { + if ((nextEntry = currentTable[nextTableIndex--]) != null) { + return; + } + } + + while (nextSegmentIndex >= 0) { + Segment seg = segments[nextSegmentIndex--]; + if (seg.count != 0) { + currentTable = seg.table; + for (int j = currentTable.length - 1; j >= 0; --j) { + if ((nextEntry = currentTable[j]) != null) { + nextTableIndex = j - 1; + return; + } + } + } + } + } + + public boolean hasNext() { + while (nextEntry != null) { + if (nextEntry.key() != null) { + return true; + } + advance(); + } + + return false; + } + + HashEntry nextEntry() { + do { + if (nextEntry == null) { + throw new NoSuchElementException(); + } + + lastReturned = nextEntry; + currentKey = lastReturned.key(); + advance(); + } while (currentKey == null); // Skip GC'd keys + + return lastReturned; + } + + public void remove() { + if (lastReturned == null) { + throw new IllegalStateException(); + } + ConcurrentIdentityHashMap.this.remove(currentKey); + lastReturned = null; + } + } + + final class KeyIterator + extends HashIterator implements ReusableIterator, Enumeration { + + public K next() { + return super.nextEntry().key(); + } + + public K nextElement() { + return super.nextEntry().key(); + } + } + + final class ValueIterator + extends HashIterator implements ReusableIterator, Enumeration { + + public V next() { + return super.nextEntry().value(); + } + + public V nextElement() { + return super.nextEntry().value(); + } + } + + /* + * This class is needed for JDK5 compatibility. + */ + + static class SimpleEntry implements Entry { + + private static final long serialVersionUID = -8144765946475398746L; + + private final K key; + + private V value; + + public SimpleEntry(K key, V value) { + this.key = key; + this.value = value; + + } + + public SimpleEntry(Entry entry) { + this.key = entry.getKey(); + this.value = entry.getValue(); + + } + + public K getKey() { + return key; + } + + public V getValue() { + return value; + } + + public V setValue(V value) { + V oldValue = this.value; + this.value = value; + return oldValue; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + @SuppressWarnings("unchecked") + Map.Entry e = (Map.Entry) o; + return eq(key, e.getKey()) && eq(value, e.getValue()); + } + + @Override + public int hashCode() { + return (key == null ? 0 : key.hashCode()) ^ (value == null ? 0 : value.hashCode()); + } + + @Override + public String toString() { + return key + "=" + value; + } + + private static boolean eq(Object o1, Object o2) { + return o1 == null ? o2 == null : o1.equals(o2); + } + } + + /** + * Custom Entry class used by EntryIterator.next(), that relays setValue + * changes to the underlying map. + */ + final class WriteThroughEntry extends SimpleEntry { + + WriteThroughEntry(K k, V v) { + super(k, v); + } + + /** + * Set our entry's value and write through to the map. The value to + * return is somewhat arbitrary here. Since a WriteThroughEntry does not + * necessarily track asynchronous changes, the most recent "previous" + * value could be different from what we return (or could even have been + * removed in which case the put will re-establish). We do not and can + * not guarantee more. + */ + @Override + public V setValue(V value) { + + if (value == null) { + throw new NullPointerException(); + } + V v = super.setValue(value); + ConcurrentIdentityHashMap.this.put(getKey(), value); + return v; + } + + } + + final class EntryIterator extends HashIterator implements + ReusableIterator> { + public Map.Entry next() { + HashEntry e = super.nextEntry(); + return new WriteThroughEntry(e.key(), e.value()); + } + } + + final class KeySet extends AbstractSet { + @Override + public Iterator iterator() { + + return new KeyIterator(); + } + + @Override + public int size() { + return ConcurrentIdentityHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentIdentityHashMap.this.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return ConcurrentIdentityHashMap.this.containsKey(o); + } + + @Override + public boolean remove(Object o) { + return ConcurrentIdentityHashMap.this.remove(o) != null; + + } + + @Override + public void clear() { + ConcurrentIdentityHashMap.this.clear(); + } + } + + final class Values extends AbstractCollection { + @Override + public Iterator iterator() { + return new ValueIterator(); + } + + @Override + public int size() { + return ConcurrentIdentityHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentIdentityHashMap.this.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return ConcurrentIdentityHashMap.this.containsValue(o); + } + + @Override + public void clear() { + ConcurrentIdentityHashMap.this.clear(); + } + } + + final class EntrySet extends AbstractSet> { + @Override + public Iterator> iterator() { + return new EntryIterator(); + } + + @Override + public boolean contains(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + Map.Entry e = (Map.Entry) o; + V v = ConcurrentIdentityHashMap.this.get(e.getKey()); + return v != null && v.equals(e.getValue()); + } + + @Override + public boolean remove(Object o) { + if (!(o instanceof Map.Entry)) { + return false; + } + Map.Entry e = (Map.Entry) o; + return ConcurrentIdentityHashMap.this.remove(e.getKey(), e.getValue()); + } + + @Override + public int size() { + return ConcurrentIdentityHashMap.this.size(); + } + + @Override + public boolean isEmpty() { + return ConcurrentIdentityHashMap.this.isEmpty(); + } + + @Override + public void clear() { + ConcurrentIdentityHashMap.this.clear(); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java index c4ba384fef5..d3328760ba2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/ImmutableSettings.java @@ -32,6 +32,8 @@ import java.util.*; import java.util.concurrent.TimeUnit; import static com.google.common.collect.Lists.*; +import static org.elasticsearch.util.SizeValue.*; +import static org.elasticsearch.util.TimeValue.*; /** * An immutable implementation of {@link Settings}. @@ -162,11 +164,11 @@ public class ImmutableSettings implements Settings { } @Override public TimeValue getAsTime(String setting, TimeValue defaultValue) { - return TimeValue.parseTimeValue(get(setting), defaultValue); + return parseTimeValue(get(setting), defaultValue); } @Override public SizeValue getAsSize(String setting, SizeValue defaultValue) throws SettingsException { - return SizeValue.parse(get(setting), defaultValue); + return parseSizeValue(get(setting), defaultValue); } @SuppressWarnings({"unchecked"}) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java index aed73146c42..a126cc83e0b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/settings/Settings.java @@ -162,7 +162,7 @@ public interface Settings { * @param defaultValue The value to return if no value is associated with the setting * @return The (size) value, or the default value if no value exists. * @throws SettingsException Failure to parse the setting - * @see SizeValue#parse(String, SizeValue) + * @see SizeValue#parseSizeValue(String, SizeValue) */ SizeValue getAsSize(String setting, SizeValue defaultValue) throws SettingsException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/HashedWheelTimer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/HashedWheelTimer.java new file mode 100644 index 00000000000..832a5347410 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/HashedWheelTimer.java @@ -0,0 +1,489 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.timer; + +import org.elasticsearch.util.MapBackedSet; +import org.elasticsearch.util.ReusableIterator; +import org.elasticsearch.util.ThreadRenamingRunnable; +import org.elasticsearch.util.concurrent.ConcurrentIdentityHashMap; +import org.slf4j.Logger; + +import java.util.*; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A {@link Timer} optimized for approximated I/O timeout scheduling. + * + *

Tick Duration

+ * + * As described with 'approximated', this timer does not execute the scheduled + * {@link TimerTask} on time. {@link HashedWheelTimer}, on every tick, will + * check if there are any {@link TimerTask}s behind the schedule and execute + * them. + *

+ * You can increase or decrease the accuracy of the execution timing by + * specifying smaller or larger tick duration in the constructor. In most + * network applications, I/O timeout does not need to be accurate. Therefore, + * the default tick duration is 100 milliseconds and you will not need to try + * different configurations in most cases. + * + *

Ticks per Wheel (Wheel Size)

+ * + * {@link HashedWheelTimer} maintains a data structure called 'wheel'. + * To put simply, a wheel is a hash table of {@link TimerTask}s whose hash + * function is 'dead line of the task'. The default number of ticks per wheel + * (i.e. the size of the wheel) is 512. You could specify a larger value + * if you are going to schedule a lot of timeouts. + * + *

Implementation Details

+ * + * {@link HashedWheelTimer} is based on + * 'Hashed + * and Hierarchical Timing Wheels: data structures to efficiently implement a + * timer facility'. More comprehensive slides are located + * here. + * + * @author kimchy (shay.banon) + */ +public class HashedWheelTimer implements Timer { + + private final Logger logger; + + private static final AtomicInteger id = new AtomicInteger(); + + // I'd say 64 active timer threads are obvious misuse. + private static final int MISUSE_WARNING_THRESHOLD = 64; + private static final AtomicInteger activeInstances = new AtomicInteger(); + private static final AtomicBoolean loggedMisuseWarning = new AtomicBoolean(); + + private final Worker worker = new Worker(); + final Thread workerThread; + final AtomicBoolean shutdown = new AtomicBoolean(); + + private final long roundDuration; + final long tickDuration; + final Set[] wheel; + final ReusableIterator[] iterators; + final int mask; + final ReadWriteLock lock = new ReentrantReadWriteLock(); + volatile int wheelCursor; + + /** + * Creates a new timer with the default number of ticks per wheel. + * + * @param threadFactory a {@link ThreadFactory} that creates a + * background {@link Thread} which is dedicated to + * {@link TimerTask} execution. + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + */ + public HashedWheelTimer(Logger logger, ThreadFactory threadFactory, long tickDuration, TimeUnit unit) { + this(logger, threadFactory, tickDuration, unit, 512); + } + + /** + * Creates a new timer. + * + * @param threadFactory a {@link ThreadFactory} that creates a + * background {@link Thread} which is dedicated to + * {@link TimerTask} execution. + * @param tickDuration the duration between tick + * @param unit the time unit of the {@code tickDuration} + * @param ticksPerWheel the size of the wheel + */ + public HashedWheelTimer(Logger logger, ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { + this.logger = logger; + + if (threadFactory == null) { + throw new NullPointerException("threadFactory"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (tickDuration <= 0) { + throw new IllegalArgumentException( + "tickDuration must be greater than 0: " + tickDuration); + } + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException( + "ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + + // Normalize ticksPerWheel to power of two and initialize the wheel. + wheel = createWheel(ticksPerWheel); + iterators = createIterators(wheel); + mask = wheel.length - 1; + + // Convert tickDuration to milliseconds. + this.tickDuration = tickDuration = unit.toMillis(tickDuration); + + // Prevent overflow. + if (tickDuration == Long.MAX_VALUE || + tickDuration >= Long.MAX_VALUE / wheel.length) { + throw new IllegalArgumentException( + "tickDuration is too long: " + + tickDuration + ' ' + unit); + } + + roundDuration = tickDuration * wheel.length; + + workerThread = threadFactory.newThread(new ThreadRenamingRunnable( + worker, "Hashed wheel timer #" + id.incrementAndGet())); + + // Misuse check + int activeInstances = HashedWheelTimer.activeInstances.incrementAndGet(); + if (activeInstances >= MISUSE_WARNING_THRESHOLD && + loggedMisuseWarning.compareAndSet(false, true)) { + logger.debug( + "There are too many active " + + HashedWheelTimer.class.getSimpleName() + " instances (" + + activeInstances + ") - you should share the small number " + + "of instances to avoid excessive resource consumption."); + } + } + + @SuppressWarnings("unchecked") + private static Set[] createWheel(int ticksPerWheel) { + if (ticksPerWheel <= 0) { + throw new IllegalArgumentException( + "ticksPerWheel must be greater than 0: " + ticksPerWheel); + } + if (ticksPerWheel > 1073741824) { + throw new IllegalArgumentException( + "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); + } + + ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); + Set[] wheel = new Set[ticksPerWheel]; + for (int i = 0; i < wheel.length; i++) { + wheel[i] = new MapBackedSet( + new ConcurrentIdentityHashMap(16, 0.95f, 4)); + } + return wheel; + } + + @SuppressWarnings("unchecked") + private static ReusableIterator[] createIterators(Set[] wheel) { + ReusableIterator[] iterators = new ReusableIterator[wheel.length]; + for (int i = 0; i < wheel.length; i++) { + iterators[i] = (ReusableIterator) wheel[i].iterator(); + } + return iterators; + } + + private static int normalizeTicksPerWheel(int ticksPerWheel) { + int normalizedTicksPerWheel = 1; + while (normalizedTicksPerWheel < ticksPerWheel) { + normalizedTicksPerWheel <<= 1; + } + return normalizedTicksPerWheel; + } + + /** + * Starts the background thread explicitly. The background thread will + * start automatically on demand even if you did not call this method. + * + * @throws IllegalStateException if this timer has been + * {@linkplain #stop() stopped} already + */ + public synchronized void start() { + if (shutdown.get()) { + throw new IllegalStateException("cannot be started once stopped"); + } + + if (!workerThread.isAlive()) { + workerThread.start(); + } + } + + public synchronized Set stop() { + if (!shutdown.compareAndSet(false, true)) { + return Collections.emptySet(); + } + + boolean interrupted = false; + while (workerThread.isAlive()) { + workerThread.interrupt(); + try { + workerThread.join(100); + } catch (InterruptedException e) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + + activeInstances.decrementAndGet(); + + Set unprocessedTimeouts = new HashSet(); + for (Set bucket : wheel) { + unprocessedTimeouts.addAll(bucket); + bucket.clear(); + } + + return Collections.unmodifiableSet(unprocessedTimeouts); + } + + public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + final long currentTime = System.currentTimeMillis(); + + if (task == null) { + throw new NullPointerException("task"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + + delay = unit.toMillis(delay); + if (delay < tickDuration) { + delay = tickDuration; + } + + if (!workerThread.isAlive()) { + start(); + } + + // Prepare the required parameters to create the timeout object. + HashedWheelTimeout timeout; + final long lastRoundDelay = delay % roundDuration; + final long lastTickDelay = delay % tickDuration; + final long relativeIndex = + lastRoundDelay / tickDuration + (lastTickDelay != 0 ? 1 : 0); + final long deadline = currentTime + delay; + + final long remainingRounds = + delay / roundDuration - (delay % roundDuration == 0 ? 1 : 0); + + // Add the timeout to the wheel. + lock.readLock().lock(); + try { + timeout = + new HashedWheelTimeout( + task, deadline, + (int) (wheelCursor + relativeIndex & mask), + remainingRounds); + + wheel[timeout.stopIndex].add(timeout); + } finally { + lock.readLock().unlock(); + } + + return timeout; + } + + private final class Worker implements Runnable { + + private long startTime; + private long tick; + + Worker() { + super(); + } + + public void run() { + List expiredTimeouts = + new ArrayList(); + + startTime = System.currentTimeMillis(); + tick = 1; + + while (!shutdown.get()) { + waitForNextTick(); + fetchExpiredTimeouts(expiredTimeouts); + notifyExpiredTimeouts(expiredTimeouts); + } + } + + private void fetchExpiredTimeouts( + List expiredTimeouts) { + + // Find the expired timeouts and decrease the round counter + // if necessary. Note that we don't send the notification + // immediately to make sure the listeners are called without + // an exclusive lock. + lock.writeLock().lock(); + try { + int oldBucketHead = wheelCursor; + int newBucketHead = oldBucketHead + 1 & mask; + wheelCursor = newBucketHead; + + ReusableIterator i = iterators[oldBucketHead]; + fetchExpiredTimeouts(expiredTimeouts, i); + } finally { + lock.writeLock().unlock(); + } + } + + private void fetchExpiredTimeouts( + List expiredTimeouts, + ReusableIterator i) { + + long currentDeadline = System.currentTimeMillis() + tickDuration; + i.rewind(); + while (i.hasNext()) { + HashedWheelTimeout timeout = i.next(); + if (timeout.remainingRounds <= 0) { + if (timeout.deadline < currentDeadline) { + i.remove(); + expiredTimeouts.add(timeout); + } else { + // A rare case where a timeout is put for the next + // round: just wait for the next round. + } + } else { + timeout.remainingRounds--; + } + } + } + + private void notifyExpiredTimeouts( + List expiredTimeouts) { + // Notify the expired timeouts. + for (int i = expiredTimeouts.size() - 1; i >= 0; i--) { + expiredTimeouts.get(i).expire(); + } + + // Clean up the temporary list. + expiredTimeouts.clear(); + } + + private void waitForNextTick() { + for (; ;) { + final long currentTime = System.currentTimeMillis(); + final long sleepTime = tickDuration * tick - (currentTime - startTime); + + if (sleepTime <= 0) { + break; + } + + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + if (shutdown.get()) { + return; + } + } + } + + // Reset the tick if overflow is expected. + if (tickDuration * tick > Long.MAX_VALUE - tickDuration) { + startTime = System.currentTimeMillis(); + tick = 1; + } else { + // Increase the tick if overflow is not likely to happen. + tick++; + } + } + } + + private final class HashedWheelTimeout implements Timeout { + + private final TimerTask task; + final int stopIndex; + final long deadline; + volatile long remainingRounds; + private volatile boolean cancelled; + + HashedWheelTimeout( + TimerTask task, long deadline, int stopIndex, long remainingRounds) { + this.task = task; + this.deadline = deadline; + this.stopIndex = stopIndex; + this.remainingRounds = remainingRounds; + } + + public Timer getTimer() { + return HashedWheelTimer.this; + } + + public TimerTask getTask() { + return task; + } + + public void cancel() { + if (isExpired()) { + return; + } + + cancelled = true; + + // Might be called more than once, but doesn't matter. + wheel[stopIndex].remove(this); + } + + public boolean isCancelled() { + return cancelled; + } + + public boolean isExpired() { + return cancelled || System.currentTimeMillis() > deadline; + } + + public void expire() { + if (cancelled) { + return; + } + + try { + task.run(this); + } catch (Throwable t) { + logger.warn( + "An exception was thrown by " + + TimerTask.class.getSimpleName() + ".", t); + } + } + + @Override + public String toString() { + long currentTime = System.currentTimeMillis(); + long remaining = deadline - currentTime; + + StringBuilder buf = new StringBuilder(192); + buf.append(getClass().getSimpleName()); + buf.append('('); + + buf.append("deadline: "); + if (remaining > 0) { + buf.append(remaining); + buf.append(" ms later, "); + } else if (remaining < 0) { + buf.append(-remaining); + buf.append(" ms ago, "); + } else { + buf.append("now, "); + } + + if (isCancelled()) { + buf.append(", cancelled"); + } + + return buf.append(')').toString(); + } + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timeout.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timeout.java new file mode 100644 index 00000000000..ebcbc076123 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timeout.java @@ -0,0 +1,58 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.timer; + +/** + * A handle associated with a {@link TimerTask} that is returned by a + * {@link Timer}. + * + * @author kimchy (Shay Banon) + */ +public interface Timeout { + + /** + * Returns the {@link Timer} that created this handle. + */ + Timer getTimer(); + + /** + * Returns the {@link TimerTask} which is associated with this handle. + */ + TimerTask getTask(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been expired. + */ + boolean isExpired(); + + /** + * Returns {@code true} if and only if the {@link TimerTask} associated + * with this handle has been cancelled. + */ + boolean isCancelled(); + + /** + * Cancels the {@link TimerTask} associated with this handle. It the + * task has been executed or cancelled already, it will return with no + * side effect. + */ + void cancel(); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timer.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timer.java new file mode 100644 index 00000000000..4e2c0f37fe1 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/Timer.java @@ -0,0 +1,51 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.timer; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * Schedules {@link TimerTask}s for one-time future execution in a background + * thread. + * + * @author kimchy (Shay Banon) + */ +public interface Timer { + + /** + * Schedules the specified {@link TimerTask} for one-time execution after + * the specified delay. + * + * @return a handle which is associated with the specified task + * @throws IllegalStateException if this timer has been + * {@linkplain #stop() stopped} already + */ + Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + + /** + * Releases all resources acquired by this {@link Timer} and cancels all + * tasks which were scheduled but not executed yet. + * + * @return the handles associated with the tasks which were canceled by + * this method + */ + Set stop(); +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/TimerTask.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/TimerTask.java new file mode 100644 index 00000000000..7eeb8f09248 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/timer/TimerTask.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.timer; + +/** + * A task which is executed after the delay specified with + * {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)}. + * + * @author kimchy (Shay Banon) + */ +public interface TimerTask { + + /** + * Executed after the delay specified with + * {@link Timer#newTimeout(TimerTask, long, java.util.concurrent.TimeUnit)}. + * + * @param timeout a handle which is associated with this task + */ + void run(Timeout timeout) throws Exception; +} diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java index ddf05acb9ac..f282fcc7e7d 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/SingleInstanceEmbeddedSearchTests.java @@ -20,6 +20,8 @@ package org.elasticsearch.test.integration.search; import org.elasticsearch.client.Client; +import org.elasticsearch.search.Scroll; +import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -35,6 +37,7 @@ import org.elasticsearch.search.query.QuerySearchRequest; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.server.internal.InternalServer; import org.elasticsearch.test.integration.AbstractServersTests; +import org.elasticsearch.util.TimeValue; import org.elasticsearch.util.trove.ExtTIntArrayList; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -169,6 +172,27 @@ public class SingleInstanceEmbeddedSearchTests extends AbstractServersTests { assertThat(queryResult.facets().countFacet("age1").count(), equalTo(1l)); } + @Test public void testQueryFetchKeepAliveTimeout() throws Exception { + QuerySearchResult queryResult = searchService.executeQueryPhase(searchRequest(searchSource().query(termQuery("name", "test1"))).scroll(new Scroll(TimeValue.timeValueMillis(10)))); + assertThat(queryResult.topDocs().totalHits, equalTo(1)); + + ShardDoc[] sortedShardList = searchPhaseController.sortDocs(newArrayList(queryResult)); + Map docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList); + assertThat(docIdsToLoad.size(), equalTo(1)); + assertThat(docIdsToLoad.values().iterator().next().size(), equalTo(1)); + + // sleep more than the 100ms the timeout wheel it set to + Thread.sleep(300); + + try { + searchService.executeFetchPhase(new FetchSearchRequest(queryResult.id(), docIdsToLoad.values().iterator().next())); + assert true : "context should be missing since it timed out"; + } catch (SearchContextMissingException e) { + // all is well + } + } + + private InternalSearchRequest searchRequest(SearchSourceBuilder builder) { return new InternalSearchRequest("test", 0, builder.build()); }