Replace ContextAndHeaders with a ThreadPool based ThreadLocal implementation

ContextAndHeaders has a massive impact on the core infrastructure since it has to
be manually passed on to all relevant places across threads/network calls etc. For the same reason
it's also very error prone and easily forgotten on potentially relevant APIs.

The new ThreadContext is associated with a ThreadPool (node or transport client) and ensures that
headers and context registered on a current thread are inherited to new threads spawned, send across
the network to be deserialized on the receiver end as well as restored on the response handling thread
once the response is received.
This commit is contained in:
Simon Willnauer 2015-12-23 14:46:54 +01:00
parent a2796b555f
commit 574d1b35b3
256 changed files with 1462 additions and 2656 deletions

View File

@ -34,13 +34,6 @@ public abstract class ActionRequest<T extends ActionRequest> extends TransportRe
super();
}
protected ActionRequest(ActionRequest request) {
super(request);
// this does not set the listenerThreaded API, if needed, its up to the caller to set it
// since most times, we actually want it to not be threaded...
// this.listenerThreaded = request.listenerThreaded();
}
public abstract ActionRequestValidationException validate();
@Override

View File

@ -49,12 +49,6 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
return this.request;
}
@SuppressWarnings("unchecked")
public final RequestBuilder putHeader(String key, Object value) {
request.putHeader(key, value);
return (RequestBuilder) this;
}
public ListenableActionFuture<Response> execute() {
PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(threadPool);
execute(future);

View File

@ -141,7 +141,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
}
assert waitFor >= 0;
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger);
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
final ClusterState state = observer.observedState();
if (waitFor == 0 || request.timeout().millis() == 0) {
listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));

View File

@ -102,7 +102,7 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHo
}
NodeRequest(String nodeId, NodesHotThreadsRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -96,7 +96,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
}
NodeInfoRequest(String nodeId, NodesInfoRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -96,7 +96,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
}
NodeStatsRequest(String nodeId, NodesStatsRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.admin.cluster.snapshots.status;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
@ -146,8 +145,8 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public Request() {
}
public Request(ActionRequest request, String[] nodesIds) {
super(request, nodesIds);
public Request(String[] nodesIds) {
super(nodesIds);
}
public Request snapshotIds(SnapshotId[] snapshotIds) {
@ -213,7 +212,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
}
NodeRequest(String nodeId, TransportNodesSnapshotsStatus.Request request) {
super(request, nodeId);
super(nodeId);
snapshotIds = request.snapshotIds;
}

View File

@ -110,7 +110,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
snapshotIds[i] = currentSnapshots.get(i).snapshotId();
}
TransportNodesSnapshotsStatus.Request nodesRequest = new TransportNodesSnapshotsStatus.Request(request, nodesIds.toArray(new String[nodesIds.size()]))
TransportNodesSnapshotsStatus.Request nodesRequest = new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
.snapshotIds(snapshotIds).timeout(request.masterNodeTimeout());
transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
@Override

View File

@ -132,7 +132,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
}
ClusterStatsNodeRequest(String nodeId, ClusterStatsRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -57,7 +57,7 @@ public class TransportRenderSearchTemplateAction extends HandledTransportAction<
@Override
protected void doRun() throws Exception {
ExecutableScript executable = scriptService.executable(request.template(), ScriptContext.Standard.SEARCH, request, Collections.emptyMap());
ExecutableScript executable = scriptService.executable(request.template(), ScriptContext.Standard.SEARCH, Collections.emptyMap());
BytesReference processedTemplate = (BytesReference) executable.run();
RenderSearchTemplateResponse response = new RenderSearchTemplateResponse();
response.source(processedTemplate);

View File

@ -81,14 +81,6 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
public CreateIndexRequest() {
}
/**
* Constructs a new request to create an index that was triggered by a different request,
* provided as an argument so that its headers and context can be copied to the new request.
*/
public CreateIndexRequest(ActionRequest request) {
super(request);
}
/**
* Constructs a new request to create an index with the specified name.
*/

View File

@ -42,17 +42,6 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
private boolean force = false;
private boolean waitIfOngoing = false;
public FlushRequest() {
}
/**
* Copy constructor that creates a new flush request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public FlushRequest(ActionRequest originalRequest) {
super(originalRequest);
}
/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
* be flushed.

View File

@ -31,7 +31,7 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
private FlushRequest request = new FlushRequest();
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(request, shardId);
super(shardId);
this.request = request;
}

View File

@ -36,17 +36,6 @@ import java.util.Arrays;
*/
public class SyncedFlushRequest extends BroadcastRequest<SyncedFlushRequest> {
public SyncedFlushRequest() {
}
/**
* Copy constructor that creates a new synced flush request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public SyncedFlushRequest(ActionRequest originalRequest) {
super(originalRequest);
}
/**
* Constructs a new synced flush request against one or more indices. If nothing is provided, all indices will
* be sync flushed.

View File

@ -42,7 +42,6 @@ public class GetFieldMappingsIndexRequest extends SingleShardRequest<GetFieldMap
}
GetFieldMappingsIndexRequest(GetFieldMappingsRequest other, String index, boolean probablySingleFieldRequest) {
super(other);
this.probablySingleFieldRequest = probablySingleFieldRequest;
this.includeDefaults = other.includeDefaults();
this.types = other.types();

View File

@ -33,17 +33,6 @@ import org.elasticsearch.action.support.broadcast.BroadcastRequest;
*/
public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
public RefreshRequest() {
}
/**
* Copy constructor that creates a new refresh request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public RefreshRequest(ActionRequest originalRequest) {
super(originalRequest);
}
public RefreshRequest(String... indices) {
super(indices);
}

View File

@ -54,7 +54,7 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction<
@Override
protected ReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
return new ReplicationRequest(request, shardId);
return new ReplicationRequest(shardId);
}
@Override

View File

@ -68,14 +68,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
public BulkRequest() {
}
/**
* Creates a bulk request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public BulkRequest(ActionRequest request) {
super(request);
}
/**
* Adds a list of requests to be executed. Either index or delete requests.
*/

View File

@ -41,7 +41,7 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
}
BulkShardRequest(BulkRequest bulkRequest, ShardId shardId, boolean refresh, BulkItemRequest[] items) {
super(bulkRequest, shardId);
super(shardId);
this.items = items;
this.refresh = refresh;
}

View File

@ -114,7 +114,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
for (Map.Entry<String, Set<String>> entry : indicesAndTypes.entrySet()) {
final String index = entry.getKey();
if (autoCreateIndex.shouldAutoCreate(index, state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(bulkRequest);
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
for (String type : entry.getValue()) {
createIndexRequest.mapping(type);

View File

@ -92,7 +92,7 @@ public class DeleteRequest extends ReplicationRequest<DeleteRequest> implements
* The new request will inherit though headers and context from the original request that caused it.
*/
public DeleteRequest(DeleteRequest request, ActionRequest originalRequest) {
super(request, originalRequest);
super(request);
this.type = request.type();
this.id = request.id();
this.routing = request.routing();
@ -102,14 +102,6 @@ public class DeleteRequest extends ReplicationRequest<DeleteRequest> implements
this.versionType = request.versionType();
}
/**
* Creates a delete request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public DeleteRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();

View File

@ -72,7 +72,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
protected void doExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
createIndexAction.execute(new CreateIndexRequest(request).index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.get;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.RealtimeRequest;
import org.elasticsearch.action.ValidateActions;
@ -72,8 +71,7 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
* Copy constructor that creates a new get request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public GetRequest(GetRequest getRequest, ActionRequest originalRequest) {
super(originalRequest);
public GetRequest(GetRequest getRequest) {
this.index = getRequest.index;
this.type = getRequest.type;
this.id = getRequest.id;
@ -98,14 +96,6 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
this.type = "_all";
}
/**
* Constructs a new get request starting from the provided request, meaning that it will
* inherit its headers and context, and against the specified index.
*/
public GetRequest(ActionRequest request, String index) {
super(request, index);
}
/**
* Constructs a new get request against the specified index with the type and id.
*

View File

@ -266,18 +266,6 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> implements I
List<Item> items = new ArrayList<>();
public MultiGetRequest() {
}
/**
* Creates a multi get request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public MultiGetRequest(ActionRequest request) {
super(request);
}
public List<Item> getItems() {
return this.items;
}

View File

@ -45,7 +45,7 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
}
MultiGetShardRequest(MultiGetRequest multiGetRequest, String index, int shardId) {
super(multiGetRequest, index);
super(index);
this.shardId = shardId;
locations = new IntArrayList();
items = new ArrayList<>();

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.index;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.RoutingMissingException;
@ -158,20 +157,12 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
public IndexRequest() {
}
/**
* Creates an index request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public IndexRequest(ActionRequest request) {
super(request);
}
/**
* Copy constructor that creates a new index request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public IndexRequest(IndexRequest indexRequest, ActionRequest originalRequest) {
super(indexRequest, originalRequest);
public IndexRequest(IndexRequest indexRequest) {
super(indexRequest);
this.type = indexRequest.type;
this.id = indexRequest.id;
this.routing = indexRequest.routing;

View File

@ -88,7 +88,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(request.index());
createIndexRequest.mapping(request.type());
createIndexRequest.cause("auto(index api)");

View File

@ -66,7 +66,6 @@ public class PercolateRequest extends BroadcastRequest<PercolateRequest> impleme
}
PercolateRequest(PercolateRequest request, BytesReference docSource) {
super(request);
this.indices = request.indices();
this.documentType = request.documentType();
this.routing = request.routing();
@ -274,7 +273,7 @@ public class PercolateRequest extends BroadcastRequest<PercolateRequest> impleme
source = in.readBytesReference();
docSource = in.readBytesReference();
if (in.readBoolean()) {
getRequest = new GetRequest(null);
getRequest = new GetRequest();
getRequest.readFrom(in);
}
onlyCount = in.readBoolean();

View File

@ -97,7 +97,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
}
if (!existingDocsRequests.isEmpty()) {
final MultiGetRequest multiGetRequest = new MultiGetRequest(request);
final MultiGetRequest multiGetRequest = new MultiGetRequest();
for (GetRequest getRequest : existingDocsRequests) {
multiGetRequest.add(
new MultiGetRequest.Item(getRequest.index(), getRequest.type(), getRequest.id())
@ -200,7 +200,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
ShardId shardId = shard.shardId();
TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
if (requests == null) {
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(multiPercolateRequest, shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
}
logger.trace("Adding shard[{}] percolate request for item[{}]", shardId, slot);
requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));

View File

@ -74,7 +74,7 @@ public class TransportPercolateAction extends TransportBroadcastAction<Percolate
request.startTime = System.currentTimeMillis();
if (request.getRequest() != null) {
//create a new get request to make sure it has the same headers and context as the original percolate request
GetRequest getRequest = new GetRequest(request.getRequest(), request);
GetRequest getRequest = new GetRequest(request.getRequest());
getAction.execute(getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
@ -150,7 +150,7 @@ public class TransportPercolateAction extends TransportBroadcastAction<Percolate
} else {
PercolatorService.ReduceResult result = null;
try {
result = percolatorService.reduce(onlyCount, shardResults, request);
result = percolatorService.reduce(onlyCount, shardResults);
} catch (IOException e) {
throw new ElasticsearchException("error during reduce phase", e);
}

View File

@ -118,8 +118,8 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
public Request() {
}
Request(MultiPercolateRequest multiPercolateRequest, String concreteIndex, int shardId, String preference) {
super(multiPercolateRequest, concreteIndex);
Request(String concreteIndex, int shardId, String preference) {
super(concreteIndex);
this.shardId = shardId;
this.preference = preference;
this.items = new ArrayList<>();

View File

@ -37,17 +37,6 @@ public class ClearScrollRequest extends ActionRequest<ClearScrollRequest> {
private List<String> scrollIds;
public ClearScrollRequest() {
}
/**
* Creates a clear scroll request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public ClearScrollRequest(ActionRequest request) {
super(request);
}
public List<String> getScrollIds() {
return scrollIds;
}

View File

@ -80,8 +80,7 @@ public class SearchRequest extends ActionRequest<SearchRequest> implements Indic
* Copy constructor that creates a new search request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public SearchRequest(SearchRequest searchRequest, ActionRequest originalRequest) {
super(originalRequest);
public SearchRequest(SearchRequest searchRequest) {
this.searchType = searchRequest.searchType;
this.indices = searchRequest.indices;
this.routing = searchRequest.routing;
@ -94,15 +93,6 @@ public class SearchRequest extends ActionRequest<SearchRequest> implements Indic
this.indicesOptions = searchRequest.indicesOptions;
}
/**
* Constructs a new search request starting from the provided request, meaning that it will
* inherit its headers and context
*/
public SearchRequest(ActionRequest request) {
super(request);
this.source = new SearchSourceBuilder();
}
/**
* Constructs a new search request against the indices. No indices provided here means that search
* will run against all indices.

View File

@ -46,14 +46,6 @@ public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
this.scrollId = scrollId;
}
/**
* Creates a scroll request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public SearchScrollRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

View File

@ -59,7 +59,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
final AtomicInteger counter = new AtomicInteger(responses.length());
for (int i = 0; i < responses.length(); i++) {
final int index = i;
SearchRequest searchRequest = new SearchRequest(request.requests().get(i), request);
SearchRequest searchRequest = new SearchRequest(request.requests().get(i));
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {

View File

@ -135,7 +135,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
public void doRun() throws IOException {
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults, request);
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);

View File

@ -211,7 +211,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
fetchResults, request);
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);

View File

@ -82,7 +82,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
firstResults, request);
firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);

View File

@ -146,7 +146,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
fetchResults, request);
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);

View File

@ -193,7 +193,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
private void innerFinishHim() throws Exception {
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults, request);
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();

View File

@ -208,7 +208,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
IntArrayList docIds = entry.value;
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
@Override
@ -243,7 +243,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
private void innerFinishHim() {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, request);
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();

View File

@ -143,7 +143,7 @@ public class TransportSuggestAction extends TransportBroadcastAction<SuggestRequ
throw new IllegalArgumentException("suggest content missing");
}
final SuggestionSearchContext context = suggestPhase.parseElement().parseInternal(parser, indexService.mapperService(),
indexService.fieldData(), request.shardId().getIndex(), request.shardId().id(), request);
indexService.fieldData(), request.shardId().getIndex(), request.shardId().id());
final Suggest result = suggestPhase.execute(context, searcher.searcher());
return new ShardSuggestResponse(request.shardId(), result);
}

View File

@ -38,11 +38,6 @@ public class ChildTaskRequest extends TransportRequest {
private long parentTaskId;
protected ChildTaskRequest() {
}
protected ChildTaskRequest(TransportRequest parentTaskRequest) {
super(parentTaskRequest);
}
public void setParentTask(String parentTaskNode, long parentTaskId) {

View File

@ -37,11 +37,6 @@ public class BroadcastRequest<T extends BroadcastRequest> extends ActionRequest<
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
public BroadcastRequest() {
}
protected BroadcastRequest(ActionRequest originalRequest) {
super(originalRequest);
}
protected BroadcastRequest(String[] indices) {

View File

@ -42,7 +42,6 @@ public abstract class BroadcastShardRequest extends TransportRequest implements
}
protected BroadcastShardRequest(ShardId shardId, BroadcastRequest request) {
super(request);
this.shardId = shardId;
this.originalIndices = new OriginalIndices(request);
}

View File

@ -433,7 +433,6 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
}
public NodeRequest(String nodeId, Request request, List<ShardRouting> shards) {
super(request);
this.indicesLevelRequest = request;
this.shards = shards;
this.nodeId = nodeId;

View File

@ -42,10 +42,6 @@ public abstract class AcknowledgedRequest<T extends MasterNodeRequest> extends M
protected AcknowledgedRequest() {
}
protected AcknowledgedRequest(ActionRequest request) {
super(request);
}
/**
* Allows to set the timeout
* @param timeout timeout as a string (e.g. 1s)

View File

@ -36,11 +36,6 @@ public abstract class MasterNodeRequest<T extends MasterNodeRequest> extends Act
protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;
protected MasterNodeRequest() {
}
protected MasterNodeRequest(ActionRequest request) {
super(request);
}
/**

View File

@ -121,7 +121,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
}
public void start() {
this.observer = new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger);
this.observer = new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
doStart();
}

View File

@ -36,8 +36,7 @@ public abstract class BaseNodeRequest extends ChildTaskRequest {
}
protected BaseNodeRequest(BaseNodesRequest request, String nodeId) {
super(request);
protected BaseNodeRequest(String nodeId) {
this.nodeId = nodeId;
}

View File

@ -43,11 +43,6 @@ public abstract class BaseNodesRequest<T extends BaseNodesRequest> extends Actio
}
protected BaseNodesRequest(ActionRequest request, String... nodesIds) {
super(request);
this.nodesIds = nodesIds;
}
protected BaseNodesRequest(String... nodesIds) {
this.nodesIds = nodesIds;
}

View File

@ -58,35 +58,20 @@ public class ReplicationRequest<T extends ReplicationRequest> extends ActionRequ
}
/**
* Creates a new request that inherits headers and context from the request provided as argument.
*/
public ReplicationRequest(ActionRequest request) {
super(request);
}
/**
* Creates a new request with resolved shard id
*/
public ReplicationRequest(ActionRequest request, ShardId shardId) {
super(request);
public ReplicationRequest(ShardId shardId) {
this.index = shardId.getIndex();
this.shardId = shardId;
}
/**
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
*/
protected ReplicationRequest(T request) {
this(request, request);
}
/**
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
protected ReplicationRequest(T request, ActionRequest originalRequest) {
super(originalRequest);
protected ReplicationRequest(T request) {
this.timeout = request.timeout();
this.index = request.index();
this.consistencyLevel = request.consistencyLevel();

View File

@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
@ -302,7 +303,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final TransportChannel channel;
// important: we pass null as a timeout as failing a replica is
// something we want to avoid at all costs
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) {
this.request = request;
@ -313,9 +314,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request);
final ThreadContext threadContext = threadPool.getThreadContext();
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
context.close();
// Forking a thread on local node via transport service so that custom transport service have an
// opportunity to execute custom logic before the replica operation begins
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
@ -411,7 +415,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
ReroutePhase(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
}
@Override
@ -515,9 +519,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
finishAsFailed(failure);
return;
}
final ThreadContext threadContext = threadPool.getThreadContext();
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
context.close();
run();
}
@ -528,6 +535,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override
public void onTimeout(TimeValue timeout) {
context.close();
// Try one more time...
run();
}

View File

@ -124,7 +124,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
}
public void start() {
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
doStart();
}

View File

@ -52,16 +52,7 @@ public abstract class SingleShardRequest<T extends SingleShardRequest> extends A
public SingleShardRequest() {
}
protected SingleShardRequest(String index) {
this.index = index;
}
protected SingleShardRequest(ActionRequest request) {
super(request);
}
protected SingleShardRequest(ActionRequest request, String index) {
super(request);
public SingleShardRequest(String index) {
this.index = index;
}

View File

@ -61,15 +61,6 @@ public class BaseTasksRequest<T extends BaseTasksRequest> extends ActionRequest<
return null;
}
/**
* Get information about tasks from nodes based on the nodes ids specified.
* If none are passed, information for all nodes will be returned.
*/
public BaseTasksRequest(ActionRequest request, String... nodesIds) {
super(request);
this.nodesIds = nodesIds;
}
/**
* Get information about tasks from nodes based on the nodes ids specified.
* If none are passed, information for all nodes will be returned.

View File

@ -291,7 +291,7 @@ public abstract class TransportTasksAction<
}
protected NodeTaskRequest(TasksRequest tasksRequest) {
super(tasksRequest);
super();
this.tasksRequest = tasksRequest;
}

View File

@ -41,8 +41,8 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
}
MultiTermVectorsShardRequest(MultiTermVectorsRequest request, String index, int shardId) {
super(request, index);
MultiTermVectorsShardRequest(String index, int shardId) {
super(index);
this.shardId = shardId;
locations = new IntArrayList();
requests = new ArrayList<>();

View File

@ -82,7 +82,7 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction<Mult
termVectorsRequest.id(), termVectorsRequest.routing());
MultiTermVectorsShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
shardRequest = new MultiTermVectorsShardRequest(request, shardId.index().name(), shardId.id());
shardRequest = new MultiTermVectorsShardRequest(shardId.index().name(), shardId.id());
shardRequest.preference(request.preference);
shardRequests.put(shardId, shardRequest);
}

View File

@ -113,7 +113,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
createIndexAction.execute(new CreateIndexRequest(request).index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);
@ -164,12 +164,12 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) {
IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
IndexShard indexShard = indexService.getShard(request.shardId());
final IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
final IndexShard indexShard = indexService.getShard(request.shardId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
switch (result.operation()) {
case UPSERT:
IndexRequest upsertRequest = new IndexRequest(result.action(), request);
IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@ -206,7 +206,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case INDEX:
IndexRequest indexRequest = new IndexRequest(result.action(), request);
IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {

View File

@ -44,6 +44,7 @@ import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.fetch.source.FetchSourceContext;
@ -99,7 +100,7 @@ public class UpdateHelper extends AbstractComponent {
// Tell the script that this is a create and not an update
ctx.put("op", "create");
ctx.put("_source", upsertDoc);
ctx = executeScript(request, ctx);
ctx = executeScript(request.script, ctx);
//Allow the script to set TTL using ctx._ttl
if (ttl == null) {
ttl = getTTLFromScriptContext(ctx);
@ -193,7 +194,7 @@ public class UpdateHelper extends AbstractComponent {
ctx.put("_ttl", originalTtl);
ctx.put("_source", sourceAndContent.v2());
ctx = executeScript(request, ctx);
ctx = executeScript(request.script, ctx);
operation = (String) ctx.get("op");
@ -243,14 +244,14 @@ public class UpdateHelper extends AbstractComponent {
}
}
private Map<String, Object> executeScript(UpdateRequest request, Map<String, Object> ctx) {
private Map<String, Object> executeScript(Script script, Map<String, Object> ctx) {
try {
if (scriptService != null) {
ExecutableScript script = scriptService.executable(request.script, ScriptContext.Standard.UPDATE, request, Collections.emptyMap());
script.setNextVar("ctx", ctx);
script.run();
ExecutableScript executableScript = scriptService.executable(script, ScriptContext.Standard.UPDATE, Collections.emptyMap());
executableScript.setNextVar("ctx", ctx);
executableScript.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
ctx = (Map<String, Object>) executableScript.unwrap(ctx);
}
} catch (Exception e) {
throw new IllegalArgumentException("failed to execute script", e);

View File

@ -19,8 +19,12 @@
package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -80,11 +84,12 @@ import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import java.util.Map;
/**
* A client provides a one stop interface for performing actions/operations against the cluster.
* <p>
@ -597,5 +602,5 @@ public interface Client extends ElasticsearchClient, Releasable {
*/
Settings settings();
Headers headers();
Client filterWithHeader(Map<String, String> headers);
}

View File

@ -42,7 +42,7 @@ public abstract class FilterClient extends AbstractClient {
* @see #in()
*/
public FilterClient(Client in) {
super(in.settings(), in.threadPool(), in.headers());
super(in.settings(), in.threadPool());
this.in = in;
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -44,8 +43,8 @@ public class NodeClient extends AbstractClient {
private final Map<GenericAction, TransportAction> actions;
@Inject
public NodeClient(Settings settings, ThreadPool threadPool, Headers headers, Map<GenericAction, TransportAction> actions) {
super(settings, threadPool, headers);
public NodeClient(Settings settings, ThreadPool threadPool, Map<GenericAction, TransportAction> actions) {
super(settings, threadPool);
this.actions = unmodifiableMap(actions);
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.client.node;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.inject.AbstractModule;
/**
@ -30,7 +29,6 @@ public class NodeClientModule extends AbstractModule {
@Override
protected void configure() {
bind(Headers.class).asEagerSingleton();
bind(Client.class).to(NodeClient.class).asEagerSingleton();
}
}

View File

@ -317,12 +317,16 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
/**
*
*/
@ -330,23 +334,15 @@ public abstract class AbstractClient extends AbstractComponent implements Client
private final ThreadPool threadPool;
private final Admin admin;
private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
public AbstractClient(Settings settings, ThreadPool threadPool, Headers headers) {
public AbstractClient(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
this.headers = headers;
this.admin = new Admin(this);
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
}
@Override
public Headers headers() {
return this.headers;
}
@Override
public final Settings settings() {
return this.settings;
@ -379,7 +375,6 @@ public abstract class AbstractClient extends AbstractComponent implements Client
*/
@Override
public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
doExecute(action, request, listener);
}
@ -1672,4 +1667,18 @@ public abstract class AbstractClient extends AbstractComponent implements Client
execute(GetSettingsAction.INSTANCE, request, listener);
}
}
@Override
public Client filterWithHeader(Map<String, String> headers) {
return new FilterClient(this) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
ThreadContext threadContext = threadPool().getThreadContext();
try (ThreadContext.StoredContext ctx = threadContext.stashContext()) {
threadContext.putHeader(headers);
super.doExecute(action, request, listener);
}
}
};
}
}

View File

@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.support;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportMessage;
/**
* Client request headers picked up from the client settings. Applied to every
* request sent by the client (both transport and node clients)
*/
public class Headers {
public static final String PREFIX = "request.headers";
public static final Headers EMPTY = new Headers(Settings.EMPTY) {
@Override
public <M extends TransportMessage<?>> M applyTo(M message) {
return message;
}
};
private final Settings headers;
@Inject
public Headers(Settings settings) {
headers = resolveHeaders(settings);
}
public <M extends TransportMessage<?>> M applyTo(M message) {
for (String key : headers.names()) {
if (!message.hasHeader(key)) {
message.putHeader(key, headers.get(key));
}
}
return message;
}
public Settings headers() {
return headers;
}
static Settings resolveHeaders(Settings settings) {
Settings headers = settings.getAsSettings(PREFIX);
return headers != null ? headers : Settings.EMPTY;
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -173,7 +172,7 @@ public class TransportClient extends AbstractClient {
private final TransportProxyClient proxy;
private TransportClient(Injector injector) {
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class));
this.injector = injector;
nodesService = injector.getInstance(TransportClientNodesService.class);
proxy = injector.getInstance(TransportProxyClient.class);

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAct
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
@ -79,8 +78,6 @@ public class TransportClientNodesService extends AbstractComponent {
private final Version minCompatibilityVersion;
private final Headers headers;
// nodes that are added to be discovered
private volatile List<DiscoveryNode> listedNodes = Collections.emptyList();
@ -103,13 +100,12 @@ public class TransportClientNodesService extends AbstractComponent {
@Inject
public TransportClientNodesService(Settings settings, ClusterName clusterName, TransportService transportService,
ThreadPool threadPool, Headers headers, Version version) {
ThreadPool threadPool, Version version) {
super(settings);
this.clusterName = clusterName;
this.transportService = transportService;
this.threadPool = threadPool;
this.minCompatibilityVersion = version.minimumCompatibilityVersion();
this.headers = headers;
this.nodesSamplerInterval = this.settings.getAsTime("client.transport.nodes_sampler_interval", timeValueSeconds(5));
this.pingTimeout = this.settings.getAsTime("client.transport.ping_timeout", timeValueSeconds(5)).millis();
@ -358,7 +354,7 @@ public class TransportClientNodesService extends AbstractComponent {
}
try {
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
headers.applyTo(new LivenessRequest()),
new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
@ -428,8 +424,7 @@ public class TransportClientNodesService extends AbstractComponent {
return;
}
}
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
transportService.sendRequest(listedNode, ClusterStateAction.NAME, Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
new BaseTransportResponseHandler<ClusterStateResponse>() {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import java.util.concurrent.atomic.AtomicReference;
@ -44,6 +45,7 @@ public class ClusterStateObserver {
};
private final ClusterService clusterService;
private final ThreadContext contextHolder;
volatile TimeValue timeOutValue;
@ -55,8 +57,8 @@ public class ClusterStateObserver {
volatile boolean timedOut;
public ClusterStateObserver(ClusterService clusterService, ESLogger logger) {
this(clusterService, new TimeValue(60000), logger);
public ClusterStateObserver(ClusterService clusterService, ESLogger logger, ThreadContext contextHolder) {
this(clusterService, new TimeValue(60000), logger, contextHolder);
}
/**
@ -64,7 +66,7 @@ public class ClusterStateObserver {
* will fail any existing or new #waitForNextChange calls. Set to null
* to wait indefinitely
*/
public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger) {
public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger, ThreadContext contextHolder) {
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.timeOutValue = timeout;
@ -72,6 +74,7 @@ public class ClusterStateObserver {
this.startTimeNS = System.nanoTime();
}
this.logger = logger;
this.contextHolder = contextHolder;
}
/** last cluster state observer by this observer. Note that this may not be the current one */
@ -146,7 +149,7 @@ public class ClusterStateObserver {
listener.onNewClusterState(newState.clusterState);
} else {
logger.trace("observer: sampled state rejected by predicate ({}). adding listener to ClusterService", newState);
ObservingContext context = new ObservingContext(listener, changePredicate);
ObservingContext context = new ObservingContext(new ContextPreservingListener(listener, contextHolder.newStoredContext()), changePredicate);
if (!observingContext.compareAndSet(null, context)) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
@ -317,4 +320,33 @@ public class ClusterStateObserver {
return "version [" + clusterState.version() + "], status [" + status + "]";
}
}
private final static class ContextPreservingListener implements Listener {
private final Listener delegate;
private final ThreadContext.StoredContext tempContext;
private ContextPreservingListener(Listener delegate, ThreadContext.StoredContext storedContext) {
this.tempContext = storedContext;
this.delegate = delegate;
}
@Override
public void onNewClusterState(ClusterState state) {
tempContext.restore();
delegate.onNewClusterState(state);
}
@Override
public void onClusterServiceClose() {
tempContext.restore();
delegate.onClusterServiceClose();
}
@Override
public void onTimeout(TimeValue timeout) {
tempContext.restore();
delegate.onTimeout(timeout);
}
}
}

View File

@ -190,7 +190,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
protected void doStart() {
add(localNodeMasterListeners);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext());
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling

View File

@ -1,153 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
*
*/
public class ContextAndHeaderHolder implements HasContextAndHeaders {
private ObjectObjectHashMap<Object, Object> context;
protected Map<String, Object> headers;
@SuppressWarnings("unchecked")
@Override
public final synchronized <V> V putInContext(Object key, Object value) {
if (context == null) {
context = new ObjectObjectHashMap<>(2);
}
return (V) context.put(key, value);
}
@Override
public final synchronized void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map) {
if (map == null) {
return;
}
if (context == null) {
context = new ObjectObjectHashMap<>(map);
} else {
context.putAll(map);
}
}
@SuppressWarnings("unchecked")
@Override
public final synchronized <V> V getFromContext(Object key) {
return context != null ? (V) context.get(key) : null;
}
@SuppressWarnings("unchecked")
@Override
public final synchronized <V> V getFromContext(Object key, V defaultValue) {
V value = getFromContext(key);
return value == null ? defaultValue : value;
}
@Override
public final synchronized boolean hasInContext(Object key) {
return context != null && context.containsKey(key);
}
@Override
public final synchronized int contextSize() {
return context != null ? context.size() : 0;
}
@Override
public final synchronized boolean isContextEmpty() {
return context == null || context.isEmpty();
}
@Override
public synchronized ImmutableOpenMap<Object, Object> getContext() {
return context != null ? ImmutableOpenMap.copyOf(context) : ImmutableOpenMap.of();
}
@Override
public synchronized void copyContextFrom(HasContext other) {
if (other == null) {
return;
}
synchronized (other) {
ImmutableOpenMap<Object, Object> otherContext = other.getContext();
if (otherContext == null) {
return;
}
if (context == null) {
ObjectObjectHashMap<Object, Object> map = new ObjectObjectHashMap<>(other.getContext().size());
map.putAll(otherContext);
this.context = map;
} else {
context.putAll(otherContext);
}
}
}
@SuppressWarnings("unchecked")
@Override
public final void putHeader(String key, Object value) {
if (headers == null) {
headers = new HashMap<>();
}
headers.put(key, value);
}
@SuppressWarnings("unchecked")
@Override
public final <V> V getHeader(String key) {
return headers != null ? (V) headers.get(key) : null;
}
@Override
public final boolean hasHeader(String key) {
return headers != null && headers.containsKey(key);
}
@Override
public Set<String> getHeaders() {
return headers != null ? headers.keySet() : Collections.<String>emptySet();
}
@Override
public void copyHeadersFrom(HasHeaders from) {
if (from != null && from.getHeaders() != null && !from.getHeaders().isEmpty()) {
for (String headerName : from.getHeaders()) {
putHeader(headerName, from.getHeader(headerName));
}
}
}
@Override
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {
copyContextFrom(other);
copyHeadersFrom(other);
}
}

View File

@ -1,111 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import java.util.Set;
public class DelegatingHasContextAndHeaders implements HasContextAndHeaders {
private HasContextAndHeaders delegate;
public DelegatingHasContextAndHeaders(HasContextAndHeaders delegate) {
this.delegate = delegate;
}
@Override
public <V> void putHeader(String key, V value) {
delegate.putHeader(key, value);
}
@Override
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {
delegate.copyContextAndHeadersFrom(other);
}
@Override
public <V> V getHeader(String key) {
return delegate.getHeader(key);
}
@Override
public boolean hasHeader(String key) {
return delegate.hasHeader(key);
}
@Override
public <V> V putInContext(Object key, Object value) {
return delegate.putInContext(key, value);
}
@Override
public Set<String> getHeaders() {
return delegate.getHeaders();
}
@Override
public void copyHeadersFrom(HasHeaders from) {
delegate.copyHeadersFrom(from);
}
@Override
public void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map) {
delegate.putAllInContext(map);
}
@Override
public <V> V getFromContext(Object key) {
return delegate.getFromContext(key);
}
@Override
public <V> V getFromContext(Object key, V defaultValue) {
return delegate.getFromContext(key, defaultValue);
}
@Override
public boolean hasInContext(Object key) {
return delegate.hasInContext(key);
}
@Override
public int contextSize() {
return delegate.contextSize();
}
@Override
public boolean isContextEmpty() {
return delegate.isContextEmpty();
}
@Override
public ImmutableOpenMap<Object, Object> getContext() {
return delegate.getContext();
}
@Override
public void copyContextFrom(HasContext other) {
delegate.copyContextFrom(other);
}
}

View File

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import org.elasticsearch.common.collect.ImmutableOpenMap;
public interface HasContext {
/**
* Attaches the given value to the context.
*
* @return The previous value that was associated with the given key in the context, or
* {@code null} if there was none.
*/
<V> V putInContext(Object key, Object value);
/**
* Attaches the given values to the context
*/
void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map);
/**
* @return The context value that is associated with the given key
*
* @see #putInContext(Object, Object)
*/
<V> V getFromContext(Object key);
/**
* @param defaultValue The default value that should be returned for the given key, if no
* value is currently associated with it.
*
* @return The value that is associated with the given key in the context
*
* @see #putInContext(Object, Object)
*/
<V> V getFromContext(Object key, V defaultValue);
/**
* Checks if the context contains an entry with the given key
*/
boolean hasInContext(Object key);
/**
* @return The number of values attached in the context.
*/
int contextSize();
/**
* Checks if the context is empty.
*/
boolean isContextEmpty();
/**
* @return A safe immutable copy of the current context.
*/
ImmutableOpenMap<Object, Object> getContext();
/**
* Copies the context from the given context holder to this context holder. Any shared keys between
* the two context will be overridden by the given context holder.
*/
void copyContextFrom(HasContext other);
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
/**
* marker interface
*/
public interface HasContextAndHeaders extends HasContext, HasHeaders {
/**
* copies over the context and the headers
* @param other another object supporting headers and context
*/
void copyContextAndHeadersFrom(HasContextAndHeaders other);
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import java.util.Set;
/**
*
*/
public interface HasHeaders {
<V> void putHeader(String key, V value);
<V> V getHeader(String key);
boolean hasHeader(String key);
Set<String> getHeaders();
void copyHeadersFrom(HasHeaders from);
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.common.network;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -361,7 +360,6 @@ public class NetworkModule extends AbstractModule {
transportTypes.bindType(binder(), settings, TRANSPORT_TYPE_KEY, defaultTransport);
if (transportClient) {
bind(Headers.class).asEagerSingleton();
bind(TransportProxyClient.class).asEagerSingleton();
bind(TransportClientNodesService.class).asEagerSingleton();
} else {

View File

@ -60,30 +60,30 @@ public class EsExecutors {
return settings.getAsInt(PROCESSORS, defaultValue);
}
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder);
}
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy());
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
queue.executor = executor;
return executor;
}
public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy());
public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy(), contextHolder);
}
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory) {
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) {
BlockingQueue<Runnable> queue;
if (queueCapacity < 0) {
queue = ConcurrentCollections.newBlockingQueue();
} else {
queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
}
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy());
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
}
public static String threadName(Settings settings, String ... names) {

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
*/
public class EsThreadPoolExecutor extends ThreadPoolExecutor {
private final ThreadContext contextHolder;
private volatile ShutdownListener listener;
private final Object monitor = new Object();
@ -38,13 +39,14 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
*/
private final String name;
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy());
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
}
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) {
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.name = name;
this.contextHolder = contextHolder;
}
public void shutdown(ShutdownListener listener) {
@ -80,7 +82,11 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
}
@Override
public void execute(Runnable command) {
public void execute(final Runnable command) {
doExecute(wrapRunnable(command));
}
protected void doExecute(final Runnable command) {
try {
super.execute(command);
} catch (EsRejectedExecutionException ex) {
@ -116,4 +122,94 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
b.append(super.toString()).append(']');
return b.toString();
}
protected Runnable wrapRunnable(Runnable command) {
final Runnable wrappedCommand;
if (command instanceof AbstractRunnable) {
wrappedCommand = new FilterAbstractRunnable(contextHolder, (AbstractRunnable) command);
} else {
wrappedCommand = new FilterRunnable(contextHolder, command);
}
return wrappedCommand;
}
protected Runnable unwrap(Runnable runnable) {
if (runnable instanceof FilterAbstractRunnable) {
return ((FilterAbstractRunnable) runnable).in;
} else if (runnable instanceof FilterRunnable) {
return ((FilterRunnable) runnable).in;
}
return runnable;
}
private static class FilterAbstractRunnable extends AbstractRunnable {
private final ThreadContext contextHolder;
private final AbstractRunnable in;
private final ThreadContext.StoredContext ctx;
FilterAbstractRunnable(ThreadContext contextHolder, AbstractRunnable in) {
this.contextHolder = contextHolder;
ctx = contextHolder.newStoredContext();
this.in = in;
}
@Override
public boolean isForceExecution() {
return in.isForceExecution();
}
@Override
public void onAfter() {
in.onAfter();
}
@Override
public void onFailure(Throwable t) {
in.onFailure(t);
}
@Override
public void onRejection(Throwable t) {
in.onRejection(t);
}
@Override
protected void doRun() throws Exception {
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore();
in.doRun();
}
}
@Override
public String toString() {
return in.toString();
}
}
private static class FilterRunnable implements Runnable {
private final ThreadContext contextHolder;
private final Runnable in;
private final ThreadContext.StoredContext ctx;
FilterRunnable(ThreadContext contextHolder, Runnable in) {
this.contextHolder = contextHolder;
ctx = contextHolder.newStoredContext();
this.in = in;
}
@Override
public void run() {
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore();
in.run();
}
}
@Override
public String toString() {
return in.toString();
}
}
}

View File

@ -47,8 +47,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue();
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
}
public Pending[] getPending() {
@ -88,10 +88,14 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
for (Runnable runnable : runnables) {
if (runnable instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
pending.add(new Pending(t.runnable, t.priority(), t.insertionOrder, executing));
pending.add(new Pending(unwrap(t.runnable), t.priority(), t.insertionOrder, executing));
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
pending.add(new Pending(t.task, t.priority, t.insertionOrder, executing));
Object task = t.task;
if (t.task instanceof Runnable) {
task = unwrap((Runnable) t.task);
}
pending.add(new Pending(task, t.priority, t.insertionOrder, executing));
}
}
}
@ -107,12 +111,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
if (command instanceof PrioritizedRunnable) {
command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
} else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
}
super.execute(command);
command = wrapRunnable(command);
doExecute(command);
if (timeout.nanos() >= 0) {
if (command instanceof TieBreakingPrioritizedRunnable) {
((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
@ -125,21 +125,31 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
@Override
public void execute(Runnable command) {
protected Runnable wrapRunnable(Runnable command) {
if (command instanceof PrioritizedRunnable) {
command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
} else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
if ((command instanceof TieBreakingPrioritizedRunnable)) {
return command;
}
Priority priority = ((PrioritizedRunnable) command).priority();
return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
} else if (command instanceof PrioritizedFutureTask) {
return command;
} else { // it might be a callable wrapper...
if (command instanceof TieBreakingPrioritizedRunnable) {
return command;
}
return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
}
super.execute(command);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (!(runnable instanceof PrioritizedRunnable)) {
runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
}
return new PrioritizedFutureTask<>((PrioritizedRunnable) runnable, value, insertionOrder.incrementAndGet());
Priority priority = ((PrioritizedRunnable) runnable).priority();
return new PrioritizedFutureTask<>(runnable, priority, value, insertionOrder.incrementAndGet());
}
@Override
@ -147,7 +157,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
if (!(callable instanceof PrioritizedCallable)) {
callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
}
return new PrioritizedFutureTask<>((PrioritizedCallable<T>) callable, insertionOrder.incrementAndGet());
return new PrioritizedFutureTask<>((PrioritizedCallable)callable, insertionOrder.incrementAndGet());
}
public static class Pending {
@ -173,10 +183,6 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private ScheduledFuture<?> timeoutFuture;
private boolean started = false;
TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
this(runnable, runnable.priority(), insertionOrder);
}
TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long insertionOrder) {
super(priority);
this.runnable = runnable;
@ -233,6 +239,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
runnable = null;
timeoutFuture = null;
}
}
}
@ -242,10 +249,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
final Priority priority;
final long insertionOrder;
public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long insertionOrder) {
public PrioritizedFutureTask(Runnable runnable, Priority priority, T value, long insertionOrder) {
super(runnable, value);
this.task = runnable;
this.priority = runnable.priority();
this.priority = priority;
this.insertionOrder = insertionOrder;
}
@ -265,4 +272,5 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
return insertionOrder < pft.insertionOrder ? -1 : 1;
}
}
}

View File

@ -0,0 +1,288 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A ThreadContext a map of string headers and a transient map of keyed objects that are associated with
* a thread. It allows to store and retrieve header information across method calls, network calls as well as threads spawned from a
* thread that has a {@link ThreadContext} associated with. Threads spawned from a {@link org.elasticsearch.threadpool.ThreadPool} have out of the box
* support for {@link ThreadContext} and all threads spawned will inherit the {@link ThreadContext} from the thread that is forking off.
* Network calls will also preserve the senders heaaders automatically.
*/
public final class ThreadContext implements Closeable, Writeable<ThreadContext.ThreadContextStruct>{
public static final String PREFIX = "request.headers";
private final ThreadContextStruct defaultContext;
private final ContextThreadLocal threadLocal;
/**
* Creates a new ThreadContext instance
* @param settings the settings to read the default request headers from
*/
public ThreadContext(Settings settings) {
Settings headers = settings.getAsSettings(PREFIX);
if (headers == null) {
this.defaultContext = new ThreadContextStruct(Collections.emptyMap());
} else {
Map<String, String> defaultHeader = new HashMap<>();
for (String key : headers.names()) {
defaultHeader.put(key, headers.get(key));
}
this.defaultContext = new ThreadContextStruct(defaultHeader);
}
threadLocal = new ContextThreadLocal(defaultContext);
}
@Override
public void close() throws IOException {
threadLocal.close();
}
/**
* Removes the current context and resets a default context. The removed context can be
* restored when closing the returned {@link StoredContext}
*/
public StoredContext stashContext() {
final ThreadContextStruct context = threadLocal.get();
threadLocal.set(null);
return () -> {
threadLocal.set(context);
};
}
/**
* Just like {@link #stashContext()} but no default context is set.
*/
public StoredContext newStoredContext() {
final ThreadContextStruct context = threadLocal.get();
return () -> {
threadLocal.set(context);
};
}
@Override
public void writeTo(StreamOutput out) throws IOException {
threadLocal.get().writeTo(out);
}
@Override
public ThreadContextStruct readFrom(StreamInput in) throws IOException {
return defaultContext.readFrom(in);
}
/**
* Reads the headers from the stream into the current context
*/
public void readHeaders(StreamInput in) throws IOException {
threadLocal.set(readFrom(in));
}
/**
* Returns the header for the given key or <code>null</code> if not present
*/
public String getHeader(String key) {
return threadLocal.get().headers.get(key);
}
/**
* Returns all of the current contexts headers
*/
public Map<String, String> getHeaders() {
return threadLocal.get().headers;
}
/**
* Copies all header key, value pairs into the current context
*/
public void copyHeaders(Iterable<Map.Entry<String, String>> headers) {
threadLocal.set(threadLocal.get().copyHeaders(headers));
}
/**
* Puts a header into the context
*/
public void putHeader(String key, String value) {
putHeader(Collections.singletonMap(key, value));
}
/**
* Puts all of the given headers into this context
*/
public void putHeader(Map<String, String> header) {
threadLocal.set(threadLocal.get().putHeaders(header));
}
/**
* Puts a transient header object into this context
*/
public void putTransient(String key, Object value) {
threadLocal.set(threadLocal.get().putTransient(key, value));
}
/**
* Returns a transient header object or <code>null</code> if there is no header for the given key
*/
public <T> T getTransient(String key) {
return (T) threadLocal.get().transientHeaders.get(key);
}
public interface StoredContext extends AutoCloseable {
@Override
void close();
default void restore() {
close();
}
}
static final class ThreadContextStruct implements Writeable<ThreadContextStruct> {
private final Map<String,String> headers;
private final Map<String, Object> transientHeaders;
private ThreadContextStruct(StreamInput in) throws IOException {
int numValues = in.readVInt();
Map<String, String> headers = numValues == 0 ? Collections.emptyMap() : new HashMap<>(numValues);
for (int i = 0; i < numValues; i++) {
headers.put(in.readString(), in.readString());
}
this.headers = headers;
this.transientHeaders = Collections.emptyMap();
}
private ThreadContextStruct(Map<String, String> headers, Map<String, Object> transientHeaders) {
this.headers = headers;
this.transientHeaders = transientHeaders;
}
private ThreadContextStruct(Map<String, String> headers) {
this(headers, Collections.emptyMap());
}
private ThreadContextStruct putHeaders(Map<String, String> headers) {
if (headers.isEmpty()) {
return this;
} else {
Map<String, String> newHeaders = new HashMap<>(this.headers);
newHeaders.putAll(headers);
return new ThreadContextStruct(newHeaders, transientHeaders);
}
}
private ThreadContextStruct putTransient(String key, Object value) {
Map<String, Object> newTransient = new HashMap<>(this.transientHeaders);
if (newTransient.putIfAbsent(key, value) != null) {
throw new IllegalArgumentException("value for key [" + key + "] already present");
}
return new ThreadContextStruct(headers, newTransient);
}
boolean isEmpty() {
return headers.isEmpty() && transientHeaders.isEmpty();
}
private ThreadContextStruct copyHeaders(Iterable<Map.Entry<String, String>> headers) {
Map<String, String> newHeaders = new HashMap<>();
for (Map.Entry<String, String> header : headers) {
newHeaders.put(header.getKey(), header.getValue());
}
return putHeaders(newHeaders);
}
@Override
public ThreadContextStruct readFrom(StreamInput in) throws IOException {
return new ThreadContextStruct(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
int keys = headers.size();
out.writeVInt(keys);
for (Map.Entry<String, String> entry : headers.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
}
}
private static class ContextThreadLocal extends CloseableThreadLocal<ThreadContextStruct> {
private final ThreadContextStruct defaultStruct;
private final AtomicBoolean closed = new AtomicBoolean(false);
private ContextThreadLocal(ThreadContextStruct defaultStruct) {
this.defaultStruct = defaultStruct;
}
@Override
public void set(ThreadContextStruct object) {
try {
if (object == defaultStruct) {
super.set(null);
} else {
super.set(object);
}
} catch (NullPointerException ex) {
ensureOpen();
throw ex;
}
}
@Override
public ThreadContextStruct get() {
try {
ThreadContextStruct threadContextStruct = super.get();
if (threadContextStruct != null) {
return threadContextStruct;
}
return defaultStruct;
} catch (NullPointerException ex) {
ensureOpen();
throw ex;
}
}
private void ensureOpen() {
if (closed.get()) {
throw new IllegalStateException("threadcontext is already closed");
}
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
super.close();
}
}
}
}

View File

@ -170,7 +170,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, new UnicastPingRequestHandler());
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
}
@Override

View File

@ -183,7 +183,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
}
NodeRequest(String nodeId, TransportNodesListGatewayMetaState.Request request) {
super(request, nodeId);
super(nodeId);
}
@Override

View File

@ -247,7 +247,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
}
NodeRequest(String nodeId, TransportNodesListGatewayStartedShards.Request request) {
super(request, nodeId);
super(nodeId);
this.shardId = request.shardId();
this.indexUUID = request.getIndexUUID();
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.BytesRestResponse;
@ -53,7 +54,7 @@ import static org.elasticsearch.rest.RestStatus.OK;
/**
*
*/
public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
public class HttpServer extends AbstractLifecycleComponent<HttpServer> implements HttpServerAdapter {
private final Environment environment;
@ -79,23 +80,9 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
nodeService.setHttpServer(this);
this.disableSites = this.settings.getAsBoolean("http.disable_sites", false);
transport.httpServerAdapter(new Dispatcher(this));
transport.httpServerAdapter(this);
}
static class Dispatcher implements HttpServerAdapter {
private final HttpServer server;
Dispatcher(HttpServer server) {
this.server = server;
}
@Override
public void dispatchRequest(HttpRequest request, HttpChannel channel) {
server.internalDispatchRequest(request, channel);
}
}
@Override
protected void doStart() {
@ -125,7 +112,7 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
return transport.stats();
}
public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
public void dispatchRequest(HttpRequest request, HttpChannel channel, ThreadContext threadContext) {
String rawPath = request.rawPath();
if (rawPath.startsWith("/_plugin/")) {
RestFilterChain filterChain = restController.filterChain(pluginSiteFilter);
@ -135,7 +122,7 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
handleFavicon(request, channel);
return;
}
restController.dispatchRequest(request, channel);
restController.dispatchRequest(request, channel, threadContext);
}

View File

@ -19,10 +19,12 @@
package org.elasticsearch.http;
import org.elasticsearch.common.util.concurrent.ThreadContext;
/**
*
*/
public interface HttpServerAdapter {
void dispatchRequest(HttpRequest request, HttpChannel channel);
void dispatchRequest(HttpRequest request, HttpChannel channel, ThreadContext context);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.http.netty;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.channel.ChannelHandler;
@ -41,12 +42,14 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
private final Pattern corsPattern;
private final boolean httpPipeliningEnabled;
private final boolean detailedErrorsEnabled;
private final ThreadContext threadContext;
public HttpRequestHandler(NettyHttpServerTransport serverTransport, boolean detailedErrorsEnabled) {
public HttpRequestHandler(NettyHttpServerTransport serverTransport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
this.serverTransport = serverTransport;
this.corsPattern = RestUtils.checkCorsSettingForRegex(serverTransport.settings().get(NettyHttpServerTransport.SETTING_CORS_ALLOW_ORIGIN));
this.httpPipeliningEnabled = serverTransport.pipelining;
this.detailedErrorsEnabled = detailedErrorsEnabled;
this.threadContext = threadContext;
}
@Override
@ -60,6 +63,7 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
request = (HttpRequest) e.getMessage();
}
threadContext.copyHeaders(request.headers());
// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
// when reading, or using a cumalation buffer
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpInfo;
@ -47,6 +48,7 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
@ -139,6 +141,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
protected final String publishHosts[];
protected final boolean detailedErrorsEnabled;
private final ThreadPool threadPool;
protected int publishPort;
@ -167,10 +170,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
@Inject
@SuppressForbidden(reason = "sets org.jboss.netty.epollBugWorkaround based on netty.epollBugWorkaround")
// TODO: why be confusing like this? just let the user do it with the netty parameter instead!
public NettyHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays) {
public NettyHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool) {
super(settings);
this.networkService = networkService;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
if (settings.getAsBoolean("netty.epollBugWorkaround", false)) {
System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
@ -389,7 +393,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}
protected void dispatchRequest(HttpRequest request, HttpChannel channel) {
httpServerAdapter.dispatchRequest(request, channel);
httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
}
protected void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
@ -414,7 +418,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new HttpChannelPipelineFactory(this, detailedErrorsEnabled);
return new HttpChannelPipelineFactory(this, detailedErrorsEnabled, threadPool.getThreadContext());
}
protected static class HttpChannelPipelineFactory implements ChannelPipelineFactory {
@ -422,9 +426,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
protected final NettyHttpServerTransport transport;
protected final HttpRequestHandler requestHandler;
public HttpChannelPipelineFactory(NettyHttpServerTransport transport, boolean detailedErrorsEnabled) {
public HttpChannelPipelineFactory(NettyHttpServerTransport transport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
this.transport = transport;
this.requestHandler = new HttpRequestHandler(transport, detailedErrorsEnabled);
this.requestHandler = new HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
}
@Override

View File

@ -240,7 +240,6 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
ShapeBuilder shapeToQuery = shape;
if (shapeToQuery == null) {
GetRequest getRequest = new GetRequest(indexedShapeIndex, indexedShapeType, indexedShapeId);
getRequest.copyContextAndHeadersFrom(SearchContext.current());
shapeToQuery = fetch(context.getClient(), getRequest, indexedShapePath);
}
MappedFieldType fieldType = context.fieldMapper(fieldName);

View File

@ -917,7 +917,6 @@ public class MoreLikeThisQueryBuilder extends AbstractQueryBuilder<MoreLikeThisQ
for (Item item : unlikeItems) {
request.add(item.toTermVectorsRequest());
}
request.copyContextAndHeadersFrom(searchContext);
return client.multiTermVectors(request).actionGet();
}

View File

@ -364,8 +364,8 @@ public class QueryShardContext {
/*
* Executes the given template, and returns the response.
*/
public BytesReference executeQueryTemplate(Template template, SearchContext searchContext) {
ExecutableScript executable = getScriptService().executable(template, ScriptContext.Standard.SEARCH, searchContext, Collections.emptyMap());
public BytesReference executeQueryTemplate(Template template) {
ExecutableScript executable = getScriptService().executable(template, ScriptContext.Standard.SEARCH, Collections.emptyMap());
return (BytesReference) executable.run();
}

View File

@ -100,7 +100,7 @@ public class TemplateQueryBuilder extends AbstractQueryBuilder<TemplateQueryBuil
@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
BytesReference querySource = context.executeQueryTemplate(template, SearchContext.current());
BytesReference querySource = context.executeQueryTemplate(template);
try (XContentParser qSourceParser = XContentFactory.xContent(querySource).createParser(querySource)) {
final QueryShardContext contextCopy = new QueryShardContext(context);
contextCopy.reset(qSourceParser);

View File

@ -249,7 +249,6 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
List<Object> terms = new ArrayList<>();
GetRequest getRequest = new GetRequest(termsLookup.index(), termsLookup.type(), termsLookup.id())
.preference("_local").routing(termsLookup.routing());
getRequest.copyContextAndHeadersFrom(SearchContext.current());
final GetResponse getResponse = client.get(getRequest).actionGet();
if (getResponse.isExists()) {
List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());

View File

@ -740,5 +740,4 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
public AnalysisRegistry getAnalysis() {
return analysisRegistry;
}
}

View File

@ -83,7 +83,6 @@ public class RecoverySettings extends AbstractComponent {
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);
this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.bytes() <= 0) {
rateLimiter = null;

View File

@ -308,7 +308,7 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe
@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
translog.totalOperations(request.totalTranslogOps());

View File

@ -74,27 +74,23 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final IndicesService indicesService;
private final ClusterService clusterService;
private final TransportService transportService;
private final ThreadPool threadPool;
private TimeValue deleteShardTimeout;
@Inject
public IndicesStore(Settings settings, IndicesService indicesService,
ClusterService clusterService, TransportService transportService) {
ClusterService clusterService, TransportService transportService, ThreadPool threadPool) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.transportService = transportService;
this.threadPool = threadPool;
transportService.registerRequestHandler(ACTION_SHARD_EXISTS, ShardActiveRequest::new, ThreadPool.Names.SAME, new ShardActiveRequestHandler());
this.deleteShardTimeout = settings.getAsTime(INDICES_STORE_DELETE_SHARD_TIMEOUT, new TimeValue(30, TimeUnit.SECONDS));
clusterService.addLast(this);
}
IndicesStore() {
super(Settings.EMPTY);
indicesService = null;
this.clusterService = null;
this.transportService = null;
}
@Override
public void close() {
clusterService.remove(this);
@ -278,6 +274,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
@Override
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception {
IndexShard indexShard = getShard(request);
// make sure shard is really there before register cluster state observer
if (indexShard == null) {
channel.sendResponse(new ShardActiveResponse(false, clusterService.localNode()));
@ -288,7 +285,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
// in general, using a cluster state observer here is a workaround for the fact that we cannot listen on shard state changes explicitly.
// instead we wait for the cluster state changes because we know any shard state change will trigger or be
// triggered by a cluster state change.
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger);
ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout, logger, threadPool.getThreadContext());
// check if shard is active. if so, all is good
boolean shardActive = shardActive(indexShard);
if (shardActive) {
@ -348,7 +345,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
logger.trace("shard exists request meant for cluster[{}], but this is cluster[{}], ignoring request", request.clusterName, thisClusterName);
return null;
}
ShardId shardId = request.shardId;
IndexService indexService = indicesService.indexService(shardId.index().getName());
if (indexService != null && indexService.indexUUID().equals(request.indexUUID)) {
@ -356,6 +352,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
}
return null;
}
}
private static class ShardActiveRequest extends TransportRequest {

View File

@ -344,7 +344,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
}
NodeRequest(String nodeId, TransportNodesListShardStoreMetaData.Request request) {
super(request, nodeId);
super(nodeId);
this.shardId = request.shardId;
this.unallocated = request.unallocated;
}

View File

@ -31,9 +31,6 @@ import org.apache.lucene.util.Counter;
import org.elasticsearch.action.percolate.PercolateShardRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.common.HasContext;
import org.elasticsearch.common.HasContextAndHeaders;
import org.elasticsearch.common.HasHeaders;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.lease.Releasables;
@ -125,7 +122,7 @@ public class PercolateContext extends SearchContext {
public PercolateContext(PercolateShardRequest request, SearchShardTarget searchShardTarget, IndexShard indexShard,
IndexService indexService, PageCacheRecycler pageCacheRecycler,
BigArrays bigArrays, ScriptService scriptService, Query aliasFilter, ParseFieldMatcher parseFieldMatcher) {
super(parseFieldMatcher, request);
super(parseFieldMatcher);
this.indexShard = indexShard;
this.indexService = indexService;
this.fieldDataService = indexService.fieldData();
@ -146,7 +143,7 @@ public class PercolateContext extends SearchContext {
// for testing:
PercolateContext(PercolateShardRequest request, SearchShardTarget searchShardTarget, MapperService mapperService) {
super(null, request);
super(null);
this.searchShardTarget = searchShardTarget;
this.mapperService = mapperService;
this.indexService = null;
@ -679,82 +676,6 @@ public class PercolateContext extends SearchContext {
throw new UnsupportedOperationException();
}
@Override
public <V> V putInContext(Object key, Object value) {
assert false : "percolatecontext does not support contexts & headers";
return null;
}
@Override
public void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map) {
assert false : "percolatocontext does not support contexts & headers";
}
@Override
public <V> V getFromContext(Object key) {
return null;
}
@Override
public <V> V getFromContext(Object key, V defaultValue) {
return defaultValue;
}
@Override
public boolean hasInContext(Object key) {
return false;
}
@Override
public int contextSize() {
return 0;
}
@Override
public boolean isContextEmpty() {
return true;
}
@Override
public ImmutableOpenMap<Object, Object> getContext() {
return ImmutableOpenMap.of();
}
@Override
public void copyContextFrom(HasContext other) {
assert false : "percolatecontext does not support contexts & headers";
}
@Override
public <V> void putHeader(String key, V value) {
assert false : "percolatecontext does not support contexts & headers";
}
@Override
public <V> V getHeader(String key) {
return null;
}
@Override
public boolean hasHeader(String key) {
return false;
}
@Override
public Set<String> getHeaders() {
return Collections.emptySet();
}
@Override
public void copyHeadersFrom(HasHeaders from) {
assert false : "percolatecontext does not support contexts & headers";
}
@Override
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {
assert false : "percolatecontext does not support contexts & headers";
}
@Override
public Map<Class<?>, Collector> queryCollectors() {
return queryCollectors;

View File

@ -39,7 +39,6 @@ import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.HasContextAndHeaders;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.component.AbstractComponent;
@ -135,14 +134,14 @@ public class PercolatorService extends AbstractComponent {
multi = new MultiDocumentPercolatorIndex(cache);
}
public ReduceResult reduce(boolean onlyCount, List<PercolateShardResponse> shardResponses, HasContextAndHeaders headersContext) throws IOException {
public ReduceResult reduce(boolean onlyCount, List<PercolateShardResponse> shardResponses) throws IOException {
if (onlyCount) {
long finalCount = 0;
for (PercolateShardResponse shardResponse : shardResponses) {
finalCount += shardResponse.topDocs().totalHits;
}
InternalAggregations reducedAggregations = reduceAggregations(shardResponses, headersContext);
InternalAggregations reducedAggregations = reduceAggregations(shardResponses);
return new PercolatorService.ReduceResult(finalCount, reducedAggregations);
} else {
int requestedSize = shardResponses.get(0).requestedSize();
@ -162,7 +161,7 @@ public class PercolatorService extends AbstractComponent {
Map<String, HighlightField> hl = shardResponse.hls().get(doc.doc);
matches[i] = new PercolateResponse.Match(new Text(shardResponse.getIndex()), new Text(id), doc.score, hl);
}
InternalAggregations reducedAggregations = reduceAggregations(shardResponses, headersContext);
InternalAggregations reducedAggregations = reduceAggregations(shardResponses);
return new PercolatorService.ReduceResult(foundMatches, matches, reducedAggregations);
}
}
@ -307,7 +306,7 @@ public class PercolatorService extends AbstractComponent {
cache.close();
}
private InternalAggregations reduceAggregations(List<PercolateShardResponse> shardResults, HasContextAndHeaders headersContext) {
private InternalAggregations reduceAggregations(List<PercolateShardResponse> shardResults) {
if (shardResults.get(0).aggregations() == null) {
return null;
}
@ -316,7 +315,7 @@ public class PercolatorService extends AbstractComponent {
for (PercolateShardResponse shardResult : shardResults) {
aggregationsList.add(shardResult.aggregations());
}
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, new InternalAggregation.ReduceContext(bigArrays, scriptService, headersContext));
InternalAggregations aggregations = InternalAggregations.reduce(aggregationsList, new InternalAggregation.ReduceContext(bigArrays, scriptService));
if (aggregations != null) {
List<SiblingPipelineAggregator> pipelineAggregators = shardResults.get(0).pipelineAggregators();
if (pipelineAggregators != null) {
@ -324,7 +323,7 @@ public class PercolatorService extends AbstractComponent {
return (InternalAggregation) p;
}).collect(Collectors.toList());
for (SiblingPipelineAggregator pipelineAggregator : pipelineAggregators) {
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), new InternalAggregation.ReduceContext(bigArrays, scriptService, headersContext));
InternalAggregation newAgg = pipelineAggregator.doReduce(new InternalAggregations(newAggs), new InternalAggregation.ReduceContext(bigArrays, scriptService));
newAggs.add(newAgg);
}
aggregations = new InternalAggregations(newAggs);

Some files were not shown because too many files have changed in this diff Show More