Merge branch 'master' into more_settings

This commit is contained in:
Simon Willnauer 2016-01-27 17:45:13 +01:00
commit cefa5da08c
278 changed files with 1987 additions and 2941 deletions

View File

@ -32,10 +32,6 @@ public abstract class ActionRequest<Request extends ActionRequest<Request>> exte
public ActionRequest() {
super();
}
protected ActionRequest(ActionRequest<?> request) {
super(request);
// this does not set the listenerThreaded API, if needed, its up to the caller to set it
// since most times, we actually want it to not be threaded...
// this.listenerThreaded = request.listenerThreaded();

View File

@ -49,12 +49,6 @@ public abstract class ActionRequestBuilder<Request extends ActionRequest, Respon
return this.request;
}
@SuppressWarnings("unchecked")
public final RequestBuilder putHeader(String key, Object value) {
request.putHeader(key, value);
return (RequestBuilder) this;
}
public ListenableActionFuture<Response> execute() {
PlainListenableActionFuture<Response> future = new PlainListenableActionFuture<>(threadPool);
execute(future);

View File

@ -141,7 +141,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
}
assert waitFor >= 0;
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger);
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
final ClusterState state = observer.observedState();
if (waitFor == 0 || request.timeout().millis() == 0) {
listener.onResponse(getResponse(request, state, waitFor, request.timeout().millis() == 0));

View File

@ -102,7 +102,7 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHo
}
NodeRequest(String nodeId, NodesHotThreadsRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -96,7 +96,7 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
}
NodeInfoRequest(String nodeId, NodesInfoRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -96,7 +96,7 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
}
NodeStatsRequest(String nodeId, NodesStatsRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.action.admin.cluster.snapshots.status;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
@ -146,8 +145,8 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
public Request() {
}
public Request(ActionRequest<?> request, String[] nodesIds) {
super(request, nodesIds);
public Request(String[] nodesIds) {
super(nodesIds);
}
public Request snapshotIds(SnapshotId[] snapshotIds) {
@ -214,7 +213,7 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
}
NodeRequest(String nodeId, TransportNodesSnapshotsStatus.Request request) {
super(request, nodeId);
super(nodeId);
snapshotIds = request.snapshotIds;
}

View File

@ -110,7 +110,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
snapshotIds[i] = currentSnapshots.get(i).snapshotId();
}
TransportNodesSnapshotsStatus.Request nodesRequest = new TransportNodesSnapshotsStatus.Request(request, nodesIds.toArray(new String[nodesIds.size()]))
TransportNodesSnapshotsStatus.Request nodesRequest = new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(new String[nodesIds.size()]))
.snapshotIds(snapshotIds).timeout(request.masterNodeTimeout());
transportNodesSnapshotsStatus.execute(nodesRequest, new ActionListener<TransportNodesSnapshotsStatus.NodesSnapshotStatus>() {
@Override

View File

@ -132,7 +132,7 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
}
ClusterStatsNodeRequest(String nodeId, ClusterStatsRequest request) {
super(request, nodeId);
super(nodeId);
this.request = request;
}

View File

@ -57,7 +57,7 @@ public class TransportRenderSearchTemplateAction extends HandledTransportAction<
@Override
protected void doRun() throws Exception {
ExecutableScript executable = scriptService.executable(request.template(), ScriptContext.Standard.SEARCH, request, Collections.emptyMap());
ExecutableScript executable = scriptService.executable(request.template(), ScriptContext.Standard.SEARCH, Collections.emptyMap());
BytesReference processedTemplate = (BytesReference) executable.run();
RenderSearchTemplateResponse response = new RenderSearchTemplateResponse();
response.source(processedTemplate);

View File

@ -81,14 +81,6 @@ public class CreateIndexRequest extends AcknowledgedRequest<CreateIndexRequest>
public CreateIndexRequest() {
}
/**
* Constructs a new request to create an index that was triggered by a different request,
* provided as an argument so that its headers and context can be copied to the new request.
*/
public CreateIndexRequest(ActionRequest request) {
super(request);
}
/**
* Constructs a new request to create an index with the specified name.
*/

View File

@ -42,17 +42,6 @@ public class FlushRequest extends BroadcastRequest<FlushRequest> {
private boolean force = false;
private boolean waitIfOngoing = false;
public FlushRequest() {
}
/**
* Copy constructor that creates a new flush request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public FlushRequest(ActionRequest originalRequest) {
super(originalRequest);
}
/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
* be flushed.

View File

@ -31,7 +31,7 @@ public class ShardFlushRequest extends ReplicationRequest<ShardFlushRequest> {
private FlushRequest request = new FlushRequest();
public ShardFlushRequest(FlushRequest request, ShardId shardId) {
super(request, shardId);
super(shardId);
this.request = request;
}

View File

@ -36,17 +36,6 @@ import java.util.Arrays;
*/
public class SyncedFlushRequest extends BroadcastRequest<SyncedFlushRequest> {
public SyncedFlushRequest() {
}
/**
* Copy constructor that creates a new synced flush request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public SyncedFlushRequest(ActionRequest originalRequest) {
super(originalRequest);
}
/**
* Constructs a new synced flush request against one or more indices. If nothing is provided, all indices will
* be sync flushed.

View File

@ -42,7 +42,6 @@ public class GetFieldMappingsIndexRequest extends SingleShardRequest<GetFieldMap
}
GetFieldMappingsIndexRequest(GetFieldMappingsRequest other, String index, boolean probablySingleFieldRequest) {
super(other);
this.probablySingleFieldRequest = probablySingleFieldRequest;
this.includeDefaults = other.includeDefaults();
this.types = other.types();

View File

@ -33,17 +33,6 @@ import org.elasticsearch.action.support.broadcast.BroadcastRequest;
*/
public class RefreshRequest extends BroadcastRequest<RefreshRequest> {
public RefreshRequest() {
}
/**
* Copy constructor that creates a new refresh request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public RefreshRequest(ActionRequest originalRequest) {
super(originalRequest);
}
public RefreshRequest(String... indices) {
super(indices);
}

View File

@ -54,7 +54,7 @@ public class TransportRefreshAction extends TransportBroadcastReplicationAction<
@Override
protected BasicReplicationRequest newShardRequest(RefreshRequest request, ShardId shardId) {
return new BasicReplicationRequest(request, shardId);
return new BasicReplicationRequest(shardId);
}
@Override

View File

@ -69,14 +69,6 @@ public class BulkRequest extends ActionRequest<BulkRequest> implements Composite
public BulkRequest() {
}
/**
* Creates a bulk request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public BulkRequest(ActionRequest<?> request) {
super(request);
}
/**
* Adds a list of requests to be executed. Either index or delete requests.
*/

View File

@ -41,7 +41,7 @@ public class BulkShardRequest extends ReplicationRequest<BulkShardRequest> {
}
BulkShardRequest(BulkRequest bulkRequest, ShardId shardId, boolean refresh, BulkItemRequest[] items) {
super(bulkRequest, shardId);
super(shardId);
this.items = items;
this.refresh = refresh;
}

View File

@ -114,7 +114,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
for (Map.Entry<String, Set<String>> entry : indicesAndTypes.entrySet()) {
final String index = entry.getKey();
if (autoCreateIndex.shouldAutoCreate(index, state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(bulkRequest);
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(index);
for (String type : entry.getValue()) {
createIndexRequest.mapping(type);

View File

@ -92,7 +92,7 @@ public class DeleteRequest extends ReplicationRequest<DeleteRequest> implements
* The new request will inherit though headers and context from the original request that caused it.
*/
public DeleteRequest(DeleteRequest request, ActionRequest originalRequest) {
super(request, originalRequest);
super(request);
this.type = request.type();
this.id = request.id();
this.routing = request.routing();
@ -102,14 +102,6 @@ public class DeleteRequest extends ReplicationRequest<DeleteRequest> implements
this.versionType = request.versionType();
}
/**
* Creates a delete request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public DeleteRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();

View File

@ -72,7 +72,7 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
protected void doExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
createIndexAction.execute(new CreateIndexRequest(request).index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.get;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.RealtimeRequest;
import org.elasticsearch.action.ValidateActions;
@ -72,8 +71,7 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
* Copy constructor that creates a new get request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public GetRequest(GetRequest getRequest, ActionRequest originalRequest) {
super(originalRequest);
public GetRequest(GetRequest getRequest) {
this.index = getRequest.index;
this.type = getRequest.type;
this.id = getRequest.id;
@ -98,14 +96,6 @@ public class GetRequest extends SingleShardRequest<GetRequest> implements Realti
this.type = "_all";
}
/**
* Constructs a new get request starting from the provided request, meaning that it will
* inherit its headers and context, and against the specified index.
*/
public GetRequest(ActionRequest request, String index) {
super(request, index);
}
/**
* Constructs a new get request against the specified index with the type and id.
*

View File

@ -266,18 +266,6 @@ public class MultiGetRequest extends ActionRequest<MultiGetRequest> implements I
List<Item> items = new ArrayList<>();
public MultiGetRequest() {
}
/**
* Creates a multi get request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public MultiGetRequest(ActionRequest request) {
super(request);
}
public List<Item> getItems() {
return this.items;
}

View File

@ -45,7 +45,7 @@ public class MultiGetShardRequest extends SingleShardRequest<MultiGetShardReques
}
MultiGetShardRequest(MultiGetRequest multiGetRequest, String index, int shardId) {
super(multiGetRequest, index);
super(index);
this.shardId = shardId;
locations = new IntArrayList();
items = new ArrayList<>();

View File

@ -21,7 +21,6 @@ package org.elasticsearch.action.index;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocumentRequest;
import org.elasticsearch.action.RoutingMissingException;
@ -160,20 +159,12 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
public IndexRequest() {
}
/**
* Creates an index request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public IndexRequest(ActionRequest request) {
super(request);
}
/**
* Copy constructor that creates a new index request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public IndexRequest(IndexRequest indexRequest, ActionRequest originalRequest) {
super(indexRequest, originalRequest);
public IndexRequest(IndexRequest indexRequest) {
super(indexRequest);
this.type = indexRequest.type;
this.id = indexRequest.id;
this.routing = indexRequest.routing;

View File

@ -88,7 +88,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
ClusterState state = clusterService.state();
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(request);
CreateIndexRequest createIndexRequest = new CreateIndexRequest();
createIndexRequest.index(request.index());
createIndexRequest.mapping(request.type());
createIndexRequest.cause("auto(index api)");

View File

@ -158,7 +158,7 @@ public final class IngestActionFilter extends AbstractComponent implements Actio
if (itemResponses.isEmpty()) {
return bulkRequest;
} else {
BulkRequest modifiedBulkRequest = new BulkRequest(bulkRequest);
BulkRequest modifiedBulkRequest = new BulkRequest();
modifiedBulkRequest.refresh(bulkRequest.refresh());
modifiedBulkRequest.consistencyLevel(bulkRequest.consistencyLevel());
modifiedBulkRequest.timeout(bulkRequest.timeout());

View File

@ -66,7 +66,6 @@ public class PercolateRequest extends BroadcastRequest<PercolateRequest> impleme
}
PercolateRequest(PercolateRequest request, BytesReference docSource) {
super(request);
this.indices = request.indices();
this.documentType = request.documentType();
this.routing = request.routing();
@ -274,7 +273,7 @@ public class PercolateRequest extends BroadcastRequest<PercolateRequest> impleme
source = in.readBytesReference();
docSource = in.readBytesReference();
if (in.readBoolean()) {
getRequest = new GetRequest(null);
getRequest = new GetRequest();
getRequest.readFrom(in);
}
onlyCount = in.readBoolean();

View File

@ -97,7 +97,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
}
if (!existingDocsRequests.isEmpty()) {
final MultiGetRequest multiGetRequest = new MultiGetRequest(request);
final MultiGetRequest multiGetRequest = new MultiGetRequest();
for (GetRequest getRequest : existingDocsRequests) {
multiGetRequest.add(
new MultiGetRequest.Item(getRequest.index(), getRequest.type(), getRequest.id())
@ -200,7 +200,7 @@ public class TransportMultiPercolateAction extends HandledTransportAction<MultiP
ShardId shardId = shard.shardId();
TransportShardMultiPercolateAction.Request requests = requestsByShard.get(shardId);
if (requests == null) {
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(multiPercolateRequest, shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
requestsByShard.put(shardId, requests = new TransportShardMultiPercolateAction.Request(shardId.getIndex(), shardId.getId(), percolateRequest.preference()));
}
logger.trace("Adding shard[{}] percolate request for item[{}]", shardId, slot);
requests.add(new TransportShardMultiPercolateAction.Request.Item(slot, new PercolateShardRequest(shardId, percolateRequest)));

View File

@ -74,7 +74,7 @@ public class TransportPercolateAction extends TransportBroadcastAction<Percolate
request.startTime = System.currentTimeMillis();
if (request.getRequest() != null) {
//create a new get request to make sure it has the same headers and context as the original percolate request
GetRequest getRequest = new GetRequest(request.getRequest(), request);
GetRequest getRequest = new GetRequest(request.getRequest());
getAction.execute(getRequest, new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
@ -150,7 +150,7 @@ public class TransportPercolateAction extends TransportBroadcastAction<Percolate
} else {
PercolatorService.ReduceResult result = null;
try {
result = percolatorService.reduce(onlyCount, shardResults, request);
result = percolatorService.reduce(onlyCount, shardResults);
} catch (IOException e) {
throw new ElasticsearchException("error during reduce phase", e);
}

View File

@ -117,8 +117,8 @@ public class TransportShardMultiPercolateAction extends TransportSingleShardActi
public Request() {
}
Request(MultiPercolateRequest multiPercolateRequest, String concreteIndex, int shardId, String preference) {
super(multiPercolateRequest, concreteIndex);
Request(String concreteIndex, int shardId, String preference) {
super(concreteIndex);
this.shardId = shardId;
this.preference = preference;
this.items = new ArrayList<>();

View File

@ -37,17 +37,6 @@ public class ClearScrollRequest extends ActionRequest<ClearScrollRequest> {
private List<String> scrollIds;
public ClearScrollRequest() {
}
/**
* Creates a clear scroll request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public ClearScrollRequest(ActionRequest request) {
super(request);
}
public List<String> getScrollIds() {
return scrollIds;
}

View File

@ -80,8 +80,7 @@ public class SearchRequest extends ActionRequest<SearchRequest> implements Indic
* Copy constructor that creates a new search request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public SearchRequest(SearchRequest searchRequest, ActionRequest originalRequest) {
super(originalRequest);
public SearchRequest(SearchRequest searchRequest) {
this.searchType = searchRequest.searchType;
this.indices = searchRequest.indices;
this.routing = searchRequest.routing;
@ -94,15 +93,6 @@ public class SearchRequest extends ActionRequest<SearchRequest> implements Indic
this.indicesOptions = searchRequest.indicesOptions;
}
/**
* Constructs a new search request starting from the provided request, meaning that it will
* inherit its headers and context
*/
public SearchRequest(ActionRequest request) {
super(request);
this.source = new SearchSourceBuilder();
}
/**
* Constructs a new search request against the indices. No indices provided here means that search
* will run against all indices.

View File

@ -46,14 +46,6 @@ public class SearchScrollRequest extends ActionRequest<SearchScrollRequest> {
this.scrollId = scrollId;
}
/**
* Creates a scroll request caused by some other request, which is provided as an
* argument so that its headers and context can be copied to the new request
*/
public SearchScrollRequest(ActionRequest request) {
super(request);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;

View File

@ -59,7 +59,7 @@ public class TransportMultiSearchAction extends HandledTransportAction<MultiSear
final AtomicInteger counter = new AtomicInteger(responses.length());
for (int i = 0; i < responses.length(); i++) {
final int index = i;
SearchRequest searchRequest = new SearchRequest(request.requests().get(i), request);
SearchRequest searchRequest = new SearchRequest(request.requests().get(i));
searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {

View File

@ -135,7 +135,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc
public void doRun() throws IOException {
sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults, request);
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);

View File

@ -211,7 +211,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults,
fetchResults, request);
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);

View File

@ -82,7 +82,7 @@ public class TransportSearchQueryAndFetchAction extends TransportSearchTypeActio
boolean useScroll = request.scroll() != null;
sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
firstResults, request);
firstResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = buildScrollId(request.searchType(), firstResults, null);

View File

@ -146,7 +146,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi
@Override
public void doRun() throws IOException {
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
fetchResults, request);
fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), firstResults, null);

View File

@ -193,7 +193,7 @@ public class TransportSearchScrollQueryAndFetchAction extends AbstractComponent
private void innerFinishHim() throws Exception {
ScoreDoc[] sortedShardList = searchPhaseController.sortDocs(true, queryFetchResults);
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults,
queryFetchResults, request);
queryFetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();

View File

@ -208,7 +208,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
IntArrayList docIds = entry.value;
final QuerySearchResult querySearchResult = queryResults.get(entry.index);
ScoreDoc lastEmittedDoc = lastEmittedDocPerShard[entry.index];
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(request, querySearchResult.id(), docIds, lastEmittedDoc);
ShardFetchRequest shardFetchRequest = new ShardFetchRequest(querySearchResult.id(), docIds, lastEmittedDoc);
DiscoveryNode node = nodes.get(querySearchResult.shardTarget().nodeId());
searchService.sendExecuteFetchScroll(node, shardFetchRequest, new ActionListener<FetchSearchResult>() {
@Override
@ -243,7 +243,7 @@ public class TransportSearchScrollQueryThenFetchAction extends AbstractComponent
}
private void innerFinishHim() {
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults, request);
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
String scrollId = null;
if (request.scroll() != null) {
scrollId = request.scrollId();

View File

@ -143,7 +143,7 @@ public class TransportSuggestAction extends TransportBroadcastAction<SuggestRequ
throw new IllegalArgumentException("suggest content missing");
}
final SuggestionSearchContext context = suggestPhase.parseElement().parseInternal(parser, indexService.mapperService(),
indexService.fieldData(), request.shardId().getIndex(), request.shardId().id(), request);
indexService.fieldData(), request.shardId().getIndex(), request.shardId().id());
final Suggest result = suggestPhase.execute(context, searcher.searcher());
return new ShardSuggestResponse(request.shardId(), result);
}

View File

@ -23,111 +23,100 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MapperService;
import java.util.ArrayList;
import java.util.List;
/**
* Encapsulates the logic of whether a new index should be automatically created when
* a write operation is about to happen in a non existing index.
*/
public final class AutoCreateIndex {
private final boolean needToCheck;
private final boolean globallyDisabled;
private final boolean dynamicMappingDisabled;
private final String[] matches;
private final String[] matches2;
private final IndexNameExpressionResolver resolver;
public static final Setting<AutoCreate> AUTO_CREATE_INDEX_SETTING = new Setting<>("action.auto_create_index", "true", AutoCreate::new, false, Setting.Scope.CLUSTER);
private final boolean dynamicMappingDisabled;
private final IndexNameExpressionResolver resolver;
private final AutoCreate autoCreate;
@Inject
public AutoCreateIndex(Settings settings, IndexNameExpressionResolver resolver) {
this.resolver = resolver;
dynamicMappingDisabled = !MapperService.INDEX_MAPPER_DYNAMIC_SETTING.get(settings);
final AutoCreate autoCreate = AUTO_CREATE_INDEX_SETTING.get(settings);
if (autoCreate.autoCreateIndex) {
needToCheck = true;
globallyDisabled = false;
matches = autoCreate.indices;
if (matches != null) {
matches2 = new String[matches.length];
for (int i = 0; i < matches.length; i++) {
matches2[i] = matches[i].substring(1);
}
} else {
matches2 = null;
}
} else {
needToCheck = false;
globallyDisabled = true;
matches = null;
matches2 = null;
}
this.autoCreate = AUTO_CREATE_INDEX_SETTING.get(settings);
}
/**
* Do we really need to check if an index should be auto created?
*/
public boolean needToCheck() {
return this.needToCheck;
return this.autoCreate.autoCreateIndex;
}
/**
* Should the index be auto created?
*/
public boolean shouldAutoCreate(String index, ClusterState state) {
if (!needToCheck) {
if (autoCreate.autoCreateIndex == false) {
return false;
}
boolean exists = resolver.hasIndexOrAlias(index, state);
if (exists) {
if (dynamicMappingDisabled) {
return false;
}
if (globallyDisabled || dynamicMappingDisabled) {
if (resolver.hasIndexOrAlias(index, state)) {
return false;
}
// matches not set, default value of "true"
if (matches == null) {
if (autoCreate.expressions.isEmpty()) {
return true;
}
for (int i = 0; i < matches.length; i++) {
char c = matches[i].charAt(0);
if (c == '-') {
if (Regex.simpleMatch(matches2[i], index)) {
return false;
}
} else if (c == '+') {
if (Regex.simpleMatch(matches2[i], index)) {
return true;
}
} else {
if (Regex.simpleMatch(matches[i], index)) {
return true;
}
for (Tuple<String, Boolean> expression : autoCreate.expressions) {
String indexExpression = expression.v1();
boolean include = expression.v2();
if (Regex.simpleMatch(indexExpression, index)) {
return include;
}
}
return false;
}
public static class AutoCreate {
private static class AutoCreate {
private final boolean autoCreateIndex;
private final String[] indices;
private final List<Tuple<String, Boolean>> expressions;
public AutoCreate(String value) {
private AutoCreate(String value) {
boolean autoCreateIndex;
String[] indices = null;
List<Tuple<String, Boolean>> expressions = new ArrayList<>();
try {
autoCreateIndex = Booleans.parseBooleanExact(value);
} catch (IllegalArgumentException ex) {
try {
indices = Strings.commaDelimitedListToStringArray(value);
for (String string : indices) {
if (string == null || string.length() == 0) {
throw new IllegalArgumentException("Can't parse [" + value + "] for setting [action.auto_create_index] must be either [true, false, or a comma seperated list of index patterns]");
String[] patterns = Strings.commaDelimitedListToStringArray(value);
for (String pattern : patterns) {
if (pattern == null || pattern.length() == 0) {
throw new IllegalArgumentException("Can't parse [" + value + "] for setting [action.auto_create_index] must be either [true, false, or a comma separated list of index patterns]");
}
Tuple<String, Boolean> expression;
if (pattern.startsWith("-")) {
if (pattern.length() == 1) {
throw new IllegalArgumentException("Can't parse [" + value + "] for setting [action.auto_create_index] must contain an index name after [-]");
}
expression = new Tuple<>(pattern.substring(1), false);
} else if(pattern.startsWith("+")) {
if (pattern.length() == 1) {
throw new IllegalArgumentException("Can't parse [" + value + "] for setting [action.auto_create_index] must contain an index name after [+]");
}
expression = new Tuple<>(pattern.substring(1), true);
} else {
expression = new Tuple<>(pattern, true);
}
expressions.add(expression);
}
autoCreateIndex = true;
} catch (IllegalArgumentException ex1) {
@ -135,7 +124,7 @@ public final class AutoCreateIndex {
throw ex1;
}
}
this.indices = indices;
this.expressions = expressions;
this.autoCreateIndex = autoCreateIndex;
}
}

View File

@ -38,11 +38,6 @@ public class ChildTaskRequest extends TransportRequest {
private long parentTaskId;
protected ChildTaskRequest() {
}
protected ChildTaskRequest(TransportRequest parentTaskRequest) {
super(parentTaskRequest);
}
public void setParentTask(String parentTaskNode, long parentTaskId) {

View File

@ -37,11 +37,6 @@ public class BroadcastRequest<Request extends BroadcastRequest<Request>> extends
private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpenAndForbidClosed();
public BroadcastRequest() {
}
protected BroadcastRequest(ActionRequest<?> originalRequest) {
super(originalRequest);
}
protected BroadcastRequest(String[] indices) {

View File

@ -42,7 +42,6 @@ public abstract class BroadcastShardRequest extends TransportRequest implements
}
protected BroadcastShardRequest(ShardId shardId, BroadcastRequest request) {
super(request);
this.shardId = shardId;
this.originalIndices = new OriginalIndices(request);
}

View File

@ -433,7 +433,6 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
}
public NodeRequest(String nodeId, Request request, List<ShardRouting> shards) {
super(request);
this.indicesLevelRequest = request;
this.shards = shards;
this.nodeId = nodeId;

View File

@ -42,10 +42,6 @@ public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Requ
protected AcknowledgedRequest() {
}
protected AcknowledgedRequest(ActionRequest<?> request) {
super(request);
}
/**
* Allows to set the timeout
* @param timeout timeout as a string (e.g. 1s)

View File

@ -36,11 +36,6 @@ public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Reques
protected TimeValue masterNodeTimeout = DEFAULT_MASTER_NODE_TIMEOUT;
protected MasterNodeRequest() {
}
protected MasterNodeRequest(ActionRequest<?> request) {
super(request);
}
/**

View File

@ -121,7 +121,7 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
}
public void start() {
this.observer = new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger);
this.observer = new ClusterStateObserver(clusterService, request.masterNodeTimeout(), logger, threadPool.getThreadContext());
doStart();
}

View File

@ -36,8 +36,7 @@ public abstract class BaseNodeRequest extends ChildTaskRequest {
}
protected BaseNodeRequest(BaseNodesRequest request, String nodeId) {
super(request);
protected BaseNodeRequest(String nodeId) {
this.nodeId = nodeId;
}

View File

@ -43,11 +43,6 @@ public abstract class BaseNodesRequest<Request extends BaseNodesRequest<Request>
}
protected BaseNodesRequest(ActionRequest<?> request, String... nodesIds) {
super(request);
this.nodesIds = nodesIds;
}
protected BaseNodesRequest(String... nodesIds) {
this.nodesIds = nodesIds;
}

View File

@ -30,22 +30,13 @@ import org.elasticsearch.index.shard.ShardId;
*/
public class BasicReplicationRequest extends ReplicationRequest<BasicReplicationRequest> {
public BasicReplicationRequest() {
}
/**
* Creates a new request that inherits headers and context from the request
* provided as argument.
*/
public BasicReplicationRequest(ActionRequest<?> request) {
super(request);
}
/**
* Creates a new request with resolved shard id
*/
public BasicReplicationRequest(ActionRequest<?> request, ShardId shardId) {
super(request, shardId);
public BasicReplicationRequest(ShardId shardId) {
super(shardId);
}
/**

View File

@ -58,35 +58,20 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
}
/**
* Creates a new request that inherits headers and context from the request provided as argument.
*/
public ReplicationRequest(ActionRequest<?> request) {
super(request);
}
/**
* Creates a new request with resolved shard id
*/
public ReplicationRequest(ActionRequest<?> request, ShardId shardId) {
super(request);
public ReplicationRequest(ShardId shardId) {
this.index = shardId.getIndex();
this.shardId = shardId;
}
/**
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
*/
protected ReplicationRequest(Request request) {
this(request, request);
}
/**
* Copy constructor that creates a new request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
protected ReplicationRequest(Request request, ActionRequest<?> originalRequest) {
super(originalRequest);
protected ReplicationRequest(Request request) {
this.timeout = request.timeout();
this.index = request.index();
this.consistencyLevel = request.consistencyLevel();

View File

@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IndexShard;
@ -297,7 +298,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
private final TransportChannel channel;
// important: we pass null as a timeout as failing a replica is
// something we want to avoid at all costs
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
private final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
AsyncReplicaAction(ReplicaRequest request, TransportChannel channel) {
this.request = request;
@ -308,9 +309,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request);
final ThreadContext threadContext = threadPool.getThreadContext();
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
context.close();
// Forking a thread on local node via transport service so that custom transport service have an
// opportunity to execute custom logic before the replica operation begins
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
@ -406,7 +410,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
ReroutePhase(Request request, ActionListener<Response> listener) {
this.request = request;
this.listener = listener;
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
}
@Override
@ -510,9 +514,12 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
finishAsFailed(failure);
return;
}
final ThreadContext threadContext = threadPool.getThreadContext();
final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
context.close();
run();
}
@ -523,6 +530,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override
public void onTimeout(TimeValue timeout) {
context.close();
// Try one more time...
run();
}

View File

@ -124,7 +124,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
}
public void start() {
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger);
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
doStart();
}

View File

@ -56,15 +56,6 @@ public abstract class SingleShardRequest<Request extends SingleShardRequest<Requ
this.index = index;
}
protected SingleShardRequest(ActionRequest<?> request) {
super(request);
}
protected SingleShardRequest(ActionRequest<?> request, String index) {
super(request);
this.index = index;
}
/**
* @return a validation exception if the index property hasn't been set
*/

View File

@ -61,15 +61,6 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
return null;
}
/**
* Get information about tasks from nodes based on the nodes ids specified.
* If none are passed, information for all nodes will be returned.
*/
public BaseTasksRequest(ActionRequest<?> request, String... nodesIds) {
super(request);
this.nodesIds = nodesIds;
}
/**
* Get information about tasks from nodes based on the nodes ids specified.
* If none are passed, information for all nodes will be returned.

View File

@ -291,7 +291,7 @@ public abstract class TransportTasksAction<
}
protected NodeTaskRequest(TasksRequest tasksRequest) {
super(tasksRequest);
super();
this.tasksRequest = tasksRequest;
}

View File

@ -41,8 +41,8 @@ public class MultiTermVectorsShardRequest extends SingleShardRequest<MultiTermVe
}
MultiTermVectorsShardRequest(MultiTermVectorsRequest request, String index, int shardId) {
super(request, index);
MultiTermVectorsShardRequest(String index, int shardId) {
super(index);
this.shardId = shardId;
locations = new IntArrayList();
requests = new ArrayList<>();

View File

@ -82,7 +82,7 @@ public class TransportMultiTermVectorsAction extends HandledTransportAction<Mult
termVectorsRequest.id(), termVectorsRequest.routing());
MultiTermVectorsShardRequest shardRequest = shardRequests.get(shardId);
if (shardRequest == null) {
shardRequest = new MultiTermVectorsShardRequest(request, shardId.index().name(), shardId.id());
shardRequest = new MultiTermVectorsShardRequest(shardId.index().name(), shardId.id());
shardRequest.preference(request.preference);
shardRequests.put(shardId, shardRequest);
}

View File

@ -113,7 +113,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
protected void doExecute(final UpdateRequest request, final ActionListener<UpdateResponse> listener) {
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
if (autoCreateIndex.shouldAutoCreate(request.index(), clusterService.state())) {
createIndexAction.execute(new CreateIndexRequest(request).index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(update api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse result) {
innerExecute(request, listener);
@ -164,12 +164,12 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
}
protected void shardOperation(final UpdateRequest request, final ActionListener<UpdateResponse> listener, final int retryCount) {
IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
IndexShard indexShard = indexService.getShard(request.shardId());
final IndexService indexService = indicesService.indexServiceSafe(request.concreteIndex());
final IndexShard indexShard = indexService.getShard(request.shardId());
final UpdateHelper.Result result = updateHelper.prepare(request, indexShard);
switch (result.operation()) {
case UPSERT:
IndexRequest upsertRequest = new IndexRequest(result.action(), request);
IndexRequest upsertRequest = new IndexRequest((IndexRequest)result.action());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference upsertSourceBytes = upsertRequest.source();
indexAction.execute(upsertRequest, new ActionListener<IndexResponse>() {
@ -206,7 +206,7 @@ public class TransportUpdateAction extends TransportInstanceSingleOperationActio
});
break;
case INDEX:
IndexRequest indexRequest = new IndexRequest(result.action(), request);
IndexRequest indexRequest = new IndexRequest((IndexRequest)result.action());
// we fetch it from the index request so we don't generate the bytes twice, its already done in the index request
final BytesReference indexSourceBytes = indexRequest.source();
indexAction.execute(indexRequest, new ActionListener<IndexResponse>() {

View File

@ -44,6 +44,7 @@ import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptContext;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.fetch.source.FetchSourceContext;
@ -99,7 +100,7 @@ public class UpdateHelper extends AbstractComponent {
// Tell the script that this is a create and not an update
ctx.put("op", "create");
ctx.put("_source", upsertDoc);
ctx = executeScript(request, ctx);
ctx = executeScript(request.script, ctx);
//Allow the script to set TTL using ctx._ttl
if (ttl == null) {
ttl = getTTLFromScriptContext(ctx);
@ -193,7 +194,7 @@ public class UpdateHelper extends AbstractComponent {
ctx.put("_ttl", originalTtl);
ctx.put("_source", sourceAndContent.v2());
ctx = executeScript(request, ctx);
ctx = executeScript(request.script, ctx);
operation = (String) ctx.get("op");
@ -243,14 +244,14 @@ public class UpdateHelper extends AbstractComponent {
}
}
private Map<String, Object> executeScript(UpdateRequest request, Map<String, Object> ctx) {
private Map<String, Object> executeScript(Script script, Map<String, Object> ctx) {
try {
if (scriptService != null) {
ExecutableScript script = scriptService.executable(request.script, ScriptContext.Standard.UPDATE, request, Collections.emptyMap());
script.setNextVar("ctx", ctx);
script.run();
ExecutableScript executableScript = scriptService.executable(script, ScriptContext.Standard.UPDATE, Collections.emptyMap());
executableScript.setNextVar("ctx", ctx);
executableScript.run();
// we need to unwrap the ctx...
ctx = (Map<String, Object>) script.unwrap(ctx);
ctx = (Map<String, Object>) executableScript.unwrap(ctx);
}
} catch (Exception e) {
throw new IllegalArgumentException("failed to execute script", e);

View File

@ -19,8 +19,12 @@
package org.elasticsearch.client;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -80,13 +84,12 @@ import org.elasticsearch.action.termvectors.TermVectorsResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import java.util.function.Function;
import java.util.Map;
/**
* A client provides a one stop interface for performing actions/operations against the cluster.
@ -608,5 +611,9 @@ public interface Client extends ElasticsearchClient, Releasable {
*/
Settings settings();
Headers headers();
/**
* Returns a new lightweight Client that applies all given headers to each of the requests
* issued from it.
*/
Client filterWithHeader(Map<String, String> headers);
}

View File

@ -42,7 +42,7 @@ public abstract class FilterClient extends AbstractClient {
* @see #in()
*/
public FilterClient(Client in) {
super(in.settings(), in.threadPool(), in.headers());
super(in.settings(), in.threadPool());
this.in = in;
}

View File

@ -27,7 +27,6 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.GenericAction;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
@ -44,8 +43,8 @@ public class NodeClient extends AbstractClient {
private final Map<GenericAction, TransportAction> actions;
@Inject
public NodeClient(Settings settings, ThreadPool threadPool, Headers headers, Map<GenericAction, TransportAction> actions) {
super(settings, threadPool, headers);
public NodeClient(Settings settings, ThreadPool threadPool, Map<GenericAction, TransportAction> actions) {
super(settings, threadPool);
this.actions = unmodifiableMap(actions);
}

View File

@ -20,7 +20,6 @@
package org.elasticsearch.client.node;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.common.inject.AbstractModule;
/**
@ -30,7 +29,6 @@ public class NodeClientModule extends AbstractModule {
@Override
protected void configure() {
bind(Headers.class).asEagerSingleton();
bind(Client.class).to(NodeClient.class).asEagerSingleton();
}
}

View File

@ -332,13 +332,17 @@ import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Map;
/**
*
*/
@ -346,23 +350,15 @@ public abstract class AbstractClient extends AbstractComponent implements Client
private final ThreadPool threadPool;
private final Admin admin;
private final Headers headers;
private final ThreadedActionListener.Wrapper threadedWrapper;
public AbstractClient(Settings settings, ThreadPool threadPool, Headers headers) {
public AbstractClient(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
this.headers = headers;
this.admin = new Admin(this);
this.threadedWrapper = new ThreadedActionListener.Wrapper(logger, settings, threadPool);
}
@Override
public Headers headers() {
return this.headers;
}
@Override
public final Settings settings() {
return this.settings;
@ -398,7 +394,6 @@ public abstract class AbstractClient extends AbstractComponent implements Client
@Override
public final <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(
Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
headers.applyTo(request);
listener = threadedWrapper.wrap(listener);
doExecute(action, request, listener);
}
@ -1757,4 +1752,17 @@ public abstract class AbstractClient extends AbstractComponent implements Client
execute(GetSettingsAction.INSTANCE, request, listener);
}
}
@Override
public Client filterWithHeader(Map<String, String> headers) {
return new FilterClient(this) {
@Override
protected <Request extends ActionRequest<Request>, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
ThreadContext threadContext = threadPool().getThreadContext();
try (ThreadContext.StoredContext ctx = threadContext.stashAndMergeHeaders(headers)) {
super.doExecute(action, request, listener);
}
}
};
}
}

View File

@ -1,65 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.support;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportMessage;
/**
* Client request headers picked up from the client settings. Applied to every
* request sent by the client (both transport and node clients)
*/
public class Headers {
public static final String PREFIX = "request.headers";
public static final Headers EMPTY = new Headers(Settings.EMPTY) {
@Override
public <M extends TransportMessage<?>> M applyTo(M message) {
return message;
}
};
private final Settings headers;
@Inject
public Headers(Settings settings) {
headers = resolveHeaders(settings);
}
public <M extends TransportMessage<?>> M applyTo(M message) {
for (String key : headers.names()) {
if (!message.hasHeader(key)) {
message.putHeader(key, headers.get(key));
}
}
return message;
}
public Settings headers() {
return headers;
}
static Settings resolveHeaders(Settings settings) {
Settings headers = settings.getAsSettings(PREFIX);
return headers != null ? headers : Settings.EMPTY;
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cache.recycler.PageCacheRecycler;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.ClusterNameModule;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -177,7 +176,7 @@ public class TransportClient extends AbstractClient {
private final TransportProxyClient proxy;
private TransportClient(Injector injector) {
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class));
this.injector = injector;
nodesService = injector.getInstance(TransportClientNodesService.class);
proxy = injector.getInstance(TransportProxyClient.class);

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAct
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
@ -80,8 +79,6 @@ public class TransportClientNodesService extends AbstractComponent {
private final Version minCompatibilityVersion;
private final Headers headers;
// nodes that are added to be discovered
private volatile List<DiscoveryNode> listedNodes = Collections.emptyList();
@ -109,13 +106,12 @@ public class TransportClientNodesService extends AbstractComponent {
@Inject
public TransportClientNodesService(Settings settings, ClusterName clusterName, TransportService transportService,
ThreadPool threadPool, Headers headers, Version version) {
ThreadPool threadPool, Version version) {
super(settings);
this.clusterName = clusterName;
this.transportService = transportService;
this.threadPool = threadPool;
this.minCompatibilityVersion = version.minimumCompatibilityVersion();
this.headers = headers;
this.nodesSamplerInterval = CLIENT_TRANSPORT_NODES_SAMPLER_INTERVAL.get(this.settings);
this.pingTimeout = CLIENT_TRANSPORT_PING_TIMEOUT.get(this.settings).millis();
@ -364,7 +360,7 @@ public class TransportClientNodesService extends AbstractComponent {
}
try {
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
headers.applyTo(new LivenessRequest()),
new LivenessRequest(),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
@ -434,8 +430,7 @@ public class TransportClientNodesService extends AbstractComponent {
return;
}
}
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
transportService.sendRequest(listedNode, ClusterStateAction.NAME, Requests.clusterStateRequest().clear().nodes(true).local(true),
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
new BaseTransportResponseHandler<ClusterStateResponse>() {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import java.util.concurrent.atomic.AtomicReference;
@ -44,6 +45,7 @@ public class ClusterStateObserver {
};
private final ClusterService clusterService;
private final ThreadContext contextHolder;
volatile TimeValue timeOutValue;
@ -55,8 +57,8 @@ public class ClusterStateObserver {
volatile boolean timedOut;
public ClusterStateObserver(ClusterService clusterService, ESLogger logger) {
this(clusterService, new TimeValue(60000), logger);
public ClusterStateObserver(ClusterService clusterService, ESLogger logger, ThreadContext contextHolder) {
this(clusterService, new TimeValue(60000), logger, contextHolder);
}
/**
@ -64,7 +66,7 @@ public class ClusterStateObserver {
* will fail any existing or new #waitForNextChange calls. Set to null
* to wait indefinitely
*/
public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger) {
public ClusterStateObserver(ClusterService clusterService, @Nullable TimeValue timeout, ESLogger logger, ThreadContext contextHolder) {
this.clusterService = clusterService;
this.lastObservedState = new AtomicReference<>(new ObservedState(clusterService.state()));
this.timeOutValue = timeout;
@ -72,6 +74,7 @@ public class ClusterStateObserver {
this.startTimeNS = System.nanoTime();
}
this.logger = logger;
this.contextHolder = contextHolder;
}
/** last cluster state observer by this observer. Note that this may not be the current one */
@ -146,7 +149,7 @@ public class ClusterStateObserver {
listener.onNewClusterState(newState.clusterState);
} else {
logger.trace("observer: sampled state rejected by predicate ({}). adding listener to ClusterService", newState);
ObservingContext context = new ObservingContext(listener, changePredicate);
ObservingContext context = new ObservingContext(new ContextPreservingListener(listener, contextHolder.newStoredContext()), changePredicate);
if (!observingContext.compareAndSet(null, context)) {
throw new ElasticsearchException("already waiting for a cluster state change");
}
@ -317,4 +320,33 @@ public class ClusterStateObserver {
return "version [" + clusterState.version() + "], status [" + status + "]";
}
}
private final static class ContextPreservingListener implements Listener {
private final Listener delegate;
private final ThreadContext.StoredContext tempContext;
private ContextPreservingListener(Listener delegate, ThreadContext.StoredContext storedContext) {
this.tempContext = storedContext;
this.delegate = delegate;
}
@Override
public void onNewClusterState(ClusterState state) {
tempContext.restore();
delegate.onNewClusterState(state);
}
@Override
public void onClusterServiceClose() {
tempContext.restore();
delegate.onClusterServiceClose();
}
@Override
public void onTimeout(TimeValue timeout) {
tempContext.restore();
delegate.onTimeout(timeout);
}
}
}

View File

@ -74,13 +74,15 @@ public class ShardStateAction extends AbstractComponent {
private final TransportService transportService;
private final ClusterService clusterService;
private final ThreadPool threadPool;
@Inject
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
AllocationService allocationService, RoutingService routingService) {
AllocationService allocationService, RoutingService routingService, ThreadPool threadPool) {
super(settings);
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
transportService.registerRequestHandler(SHARD_STARTED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardStartedTransportHandler(clusterService, new ShardStartedClusterStateTaskExecutor(allocationService, logger), logger));
transportService.registerRequestHandler(SHARD_FAILED_ACTION_NAME, ShardRoutingEntry::new, ThreadPool.Names.SAME, new ShardFailedTransportHandler(clusterService, new ShardFailedClusterStateTaskExecutor(allocationService, routingService, logger), logger));
@ -124,7 +126,7 @@ public class ShardStateAction extends AbstractComponent {
}
public void shardFailed(final ShardRouting shardRouting, final String indexUUID, final String message, @Nullable final Throwable failure, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, failure);
sendShardAction(SHARD_FAILED_ACTION_NAME, observer, shardRoutingEntry, listener);
}
@ -290,7 +292,7 @@ public class ShardStateAction extends AbstractComponent {
}
public void shardStarted(final ShardRouting shardRouting, String indexUUID, final String message, Listener listener) {
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
ShardRoutingEntry shardRoutingEntry = new ShardRoutingEntry(shardRouting, indexUUID, message, null);
sendShardAction(SHARD_STARTED_ACTION_NAME, observer, shardRoutingEntry, listener);
}

View File

@ -190,7 +190,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
protected void doStart() {
add(localNodeMasterListeners);
this.clusterState = ClusterState.builder(clusterState).blocks(initialBlocks).build();
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME));
this.updateTasksExecutor = EsExecutors.newSinglePrioritizing(UPDATE_THREAD_NAME, daemonThreadFactory(settings, UPDATE_THREAD_NAME), threadPool.getThreadContext());
this.reconnectToNodes = threadPool.schedule(reconnectInterval, ThreadPool.Names.GENERIC, new ReconnectToNodes());
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
// note, we rely on the fact that its a new id each time we start, see FD and "kill -9" handling

View File

@ -1,153 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
*
*/
public class ContextAndHeaderHolder implements HasContextAndHeaders {
private ObjectObjectHashMap<Object, Object> context;
protected Map<String, Object> headers;
@SuppressWarnings("unchecked")
@Override
public final synchronized <V> V putInContext(Object key, Object value) {
if (context == null) {
context = new ObjectObjectHashMap<>(2);
}
return (V) context.put(key, value);
}
@Override
public final synchronized void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map) {
if (map == null) {
return;
}
if (context == null) {
context = new ObjectObjectHashMap<>(map);
} else {
context.putAll(map);
}
}
@SuppressWarnings("unchecked")
@Override
public final synchronized <V> V getFromContext(Object key) {
return context != null ? (V) context.get(key) : null;
}
@SuppressWarnings("unchecked")
@Override
public final synchronized <V> V getFromContext(Object key, V defaultValue) {
V value = getFromContext(key);
return value == null ? defaultValue : value;
}
@Override
public final synchronized boolean hasInContext(Object key) {
return context != null && context.containsKey(key);
}
@Override
public final synchronized int contextSize() {
return context != null ? context.size() : 0;
}
@Override
public final synchronized boolean isContextEmpty() {
return context == null || context.isEmpty();
}
@Override
public synchronized ImmutableOpenMap<Object, Object> getContext() {
return context != null ? ImmutableOpenMap.copyOf(context) : ImmutableOpenMap.of();
}
@Override
public synchronized void copyContextFrom(HasContext other) {
if (other == null) {
return;
}
synchronized (other) {
ImmutableOpenMap<Object, Object> otherContext = other.getContext();
if (otherContext == null) {
return;
}
if (context == null) {
ObjectObjectHashMap<Object, Object> map = new ObjectObjectHashMap<>(other.getContext().size());
map.putAll(otherContext);
this.context = map;
} else {
context.putAll(otherContext);
}
}
}
@SuppressWarnings("unchecked")
@Override
public final void putHeader(String key, Object value) {
if (headers == null) {
headers = new HashMap<>();
}
headers.put(key, value);
}
@SuppressWarnings("unchecked")
@Override
public final <V> V getHeader(String key) {
return headers != null ? (V) headers.get(key) : null;
}
@Override
public final boolean hasHeader(String key) {
return headers != null && headers.containsKey(key);
}
@Override
public Set<String> getHeaders() {
return headers != null ? headers.keySet() : Collections.<String>emptySet();
}
@Override
public void copyHeadersFrom(HasHeaders from) {
if (from != null && from.getHeaders() != null && !from.getHeaders().isEmpty()) {
for (String headerName : from.getHeaders()) {
putHeader(headerName, from.getHeader(headerName));
}
}
}
@Override
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {
copyContextFrom(other);
copyHeadersFrom(other);
}
}

View File

@ -1,111 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import java.util.Set;
public class DelegatingHasContextAndHeaders implements HasContextAndHeaders {
private HasContextAndHeaders delegate;
public DelegatingHasContextAndHeaders(HasContextAndHeaders delegate) {
this.delegate = delegate;
}
@Override
public <V> void putHeader(String key, V value) {
delegate.putHeader(key, value);
}
@Override
public void copyContextAndHeadersFrom(HasContextAndHeaders other) {
delegate.copyContextAndHeadersFrom(other);
}
@Override
public <V> V getHeader(String key) {
return delegate.getHeader(key);
}
@Override
public boolean hasHeader(String key) {
return delegate.hasHeader(key);
}
@Override
public <V> V putInContext(Object key, Object value) {
return delegate.putInContext(key, value);
}
@Override
public Set<String> getHeaders() {
return delegate.getHeaders();
}
@Override
public void copyHeadersFrom(HasHeaders from) {
delegate.copyHeadersFrom(from);
}
@Override
public void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map) {
delegate.putAllInContext(map);
}
@Override
public <V> V getFromContext(Object key) {
return delegate.getFromContext(key);
}
@Override
public <V> V getFromContext(Object key, V defaultValue) {
return delegate.getFromContext(key, defaultValue);
}
@Override
public boolean hasInContext(Object key) {
return delegate.hasInContext(key);
}
@Override
public int contextSize() {
return delegate.contextSize();
}
@Override
public boolean isContextEmpty() {
return delegate.isContextEmpty();
}
@Override
public ImmutableOpenMap<Object, Object> getContext() {
return delegate.getContext();
}
@Override
public void copyContextFrom(HasContext other) {
delegate.copyContextFrom(other);
}
}

View File

@ -1,82 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import com.carrotsearch.hppc.ObjectObjectAssociativeContainer;
import org.elasticsearch.common.collect.ImmutableOpenMap;
public interface HasContext {
/**
* Attaches the given value to the context.
*
* @return The previous value that was associated with the given key in the context, or
* {@code null} if there was none.
*/
<V> V putInContext(Object key, Object value);
/**
* Attaches the given values to the context
*/
void putAllInContext(ObjectObjectAssociativeContainer<Object, Object> map);
/**
* @return The context value that is associated with the given key
*
* @see #putInContext(Object, Object)
*/
<V> V getFromContext(Object key);
/**
* @param defaultValue The default value that should be returned for the given key, if no
* value is currently associated with it.
*
* @return The value that is associated with the given key in the context
*
* @see #putInContext(Object, Object)
*/
<V> V getFromContext(Object key, V defaultValue);
/**
* Checks if the context contains an entry with the given key
*/
boolean hasInContext(Object key);
/**
* @return The number of values attached in the context.
*/
int contextSize();
/**
* Checks if the context is empty.
*/
boolean isContextEmpty();
/**
* @return A safe immutable copy of the current context.
*/
ImmutableOpenMap<Object, Object> getContext();
/**
* Copies the context from the given context holder to this context holder. Any shared keys between
* the two context will be overridden by the given context holder.
*/
void copyContextFrom(HasContext other);
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
/**
* marker interface
*/
public interface HasContextAndHeaders extends HasContext, HasHeaders {
/**
* copies over the context and the headers
* @param other another object supporting headers and context
*/
void copyContextAndHeadersFrom(HasContextAndHeaders other);
}

View File

@ -1,38 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common;
import java.util.Set;
/**
*
*/
public interface HasHeaders {
<V> void putHeader(String key, V value);
<V> V getHeader(String key);
boolean hasHeader(String key);
Set<String> getHeaders();
void copyHeadersFrom(HasHeaders from);
}

View File

@ -22,7 +22,6 @@ package org.elasticsearch.common.network;
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.client.support.Headers;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.support.TransportProxyClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -376,7 +375,6 @@ public class NetworkModule extends AbstractModule {
transportTypes.bindType(binder(), settings, TRANSPORT_TYPE_KEY, defaultTransport);
if (transportClient) {
bind(Headers.class).asEagerSingleton();
bind(TransportProxyClient.class).asEagerSingleton();
bind(TransportClientNodesService.class).asEagerSingleton();
} else {

View File

@ -59,6 +59,7 @@ import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.analysis.HunspellService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.recovery.RecoverySettings;
@ -144,6 +145,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
FsRepository.REPOSITORIES_LOCATION_SETTING,
IndexStoreConfig.INDICES_STORE_THROTTLE_TYPE_SETTING,
IndexStoreConfig.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
IndicesTTLService.INDICES_TTL_INTERVAL_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MetaData.SETTING_READ_ONLY_SETTING,

View File

@ -54,30 +54,30 @@ public class EsExecutors {
return PROCESSORS_SETTING.get(settings);
}
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
public static PrioritizedEsThreadPoolExecutor newSinglePrioritizing(String name, ThreadFactory threadFactory, ThreadContext contextHolder) {
return new PrioritizedEsThreadPoolExecutor(name, 1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory, contextHolder);
}
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
// we force the execution, since we might run into concurrency issues in offer for ScalingBlockingQueue
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy());
EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder);
queue.executor = executor;
return executor;
}
public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy());
public static EsThreadPoolExecutor newCached(String name, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
return new EsThreadPoolExecutor(name, 0, Integer.MAX_VALUE, keepAliveTime, unit, new SynchronousQueue<Runnable>(), threadFactory, new EsAbortPolicy(), contextHolder);
}
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory) {
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) {
BlockingQueue<Runnable> queue;
if (queueCapacity < 0) {
queue = ConcurrentCollections.newBlockingQueue();
} else {
queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity);
}
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy());
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
}
public static String threadName(Settings settings, String ... names) {

View File

@ -24,12 +24,14 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
/**
* An extension to thread pool executor, allowing (in the future) to add specific additional stats to it.
*/
public class EsThreadPoolExecutor extends ThreadPoolExecutor {
private final ThreadContext contextHolder;
private volatile ShutdownListener listener;
private final Object monitor = new Object();
@ -38,13 +40,14 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
*/
private final String name;
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy());
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
}
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler) {
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.name = name;
this.contextHolder = contextHolder;
}
public void shutdown(ShutdownListener listener) {
@ -80,7 +83,11 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
}
@Override
public void execute(Runnable command) {
public void execute(final Runnable command) {
doExecute(wrapRunnable(command));
}
protected void doExecute(final Runnable command) {
try {
super.execute(command);
} catch (EsRejectedExecutionException ex) {
@ -99,6 +106,14 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
}
}
/**
* Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
* {@link Runnable} instances rather than potentially wrapped ones.
*/
public Stream<Runnable> getTasks() {
return this.getQueue().stream().map(this::unwrap);
}
@Override
public String toString() {
StringBuilder b = new StringBuilder();
@ -116,4 +131,112 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
b.append(super.toString()).append(']');
return b.toString();
}
protected Runnable wrapRunnable(Runnable command) {
final Runnable wrappedCommand;
if (command instanceof AbstractRunnable) {
wrappedCommand = new FilterAbstractRunnable(contextHolder, (AbstractRunnable) command);
} else {
wrappedCommand = new FilterRunnable(contextHolder, command);
}
return wrappedCommand;
}
protected Runnable unwrap(Runnable runnable) {
if (runnable instanceof FilterAbstractRunnable) {
return ((FilterAbstractRunnable) runnable).in;
} else if (runnable instanceof FilterRunnable) {
return ((FilterRunnable) runnable).in;
}
return runnable;
}
private class FilterAbstractRunnable extends AbstractRunnable {
private final ThreadContext contextHolder;
private final AbstractRunnable in;
private final ThreadContext.StoredContext ctx;
FilterAbstractRunnable(ThreadContext contextHolder, AbstractRunnable in) {
this.contextHolder = contextHolder;
ctx = contextHolder.newStoredContext();
this.in = in;
}
@Override
public boolean isForceExecution() {
return in.isForceExecution();
}
@Override
public void onAfter() {
in.onAfter();
}
@Override
public void onFailure(Throwable t) {
in.onFailure(t);
}
@Override
public void onRejection(Throwable t) {
in.onRejection(t);
}
@Override
protected void doRun() throws Exception {
boolean started = false;
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore();
started = true;
in.doRun();
} catch (IllegalStateException ex) {
if (started || isShutdown() == false) {
throw ex;
}
// if we hit an ISE here we have been shutting down
// this comes from the threadcontext and barfs if
// our threadpool has been shutting down
}
}
@Override
public String toString() {
return in.toString();
}
}
private class FilterRunnable implements Runnable {
private final ThreadContext contextHolder;
private final Runnable in;
private final ThreadContext.StoredContext ctx;
FilterRunnable(ThreadContext contextHolder, Runnable in) {
this.contextHolder = contextHolder;
ctx = contextHolder.newStoredContext();
this.in = in;
}
@Override
public void run() {
boolean started = false;
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
ctx.restore();
started = true;
in.run();
} catch (IllegalStateException ex) {
if (started || isShutdown() == false) {
throw ex;
}
// if we hit an ISE here we have been shutting down
// this comes from the threadcontext and barfs if
// our threadpool has been shutting down
}
}
@Override
public String toString() {
return in.toString();
}
}
}

View File

@ -47,8 +47,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong insertionOrder = new AtomicLong();
private Queue<Runnable> current = ConcurrentCollections.newQueue();
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
PrioritizedEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) {
super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<>(), threadFactory, contextHolder);
}
public Pending[] getPending() {
@ -88,10 +88,14 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
for (Runnable runnable : runnables) {
if (runnable instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) runnable;
pending.add(new Pending(t.runnable, t.priority(), t.insertionOrder, executing));
pending.add(new Pending(unwrap(t.runnable), t.priority(), t.insertionOrder, executing));
} else if (runnable instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) runnable;
pending.add(new Pending(t.task, t.priority, t.insertionOrder, executing));
Object task = t.task;
if (t.task instanceof Runnable) {
task = unwrap((Runnable) t.task);
}
pending.add(new Pending(task, t.priority, t.insertionOrder, executing));
}
}
}
@ -107,12 +111,8 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
if (command instanceof PrioritizedRunnable) {
command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
} else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
}
super.execute(command);
command = wrapRunnable(command);
doExecute(command);
if (timeout.nanos() >= 0) {
if (command instanceof TieBreakingPrioritizedRunnable) {
((TieBreakingPrioritizedRunnable) command).scheduleTimeout(timer, timeoutCallback, timeout);
@ -125,21 +125,31 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
}
@Override
public void execute(Runnable command) {
protected Runnable wrapRunnable(Runnable command) {
if (command instanceof PrioritizedRunnable) {
command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
} else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
if ((command instanceof TieBreakingPrioritizedRunnable)) {
return command;
}
Priority priority = ((PrioritizedRunnable) command).priority();
return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), priority, insertionOrder.incrementAndGet());
} else if (command instanceof PrioritizedFutureTask) {
return command;
} else { // it might be a callable wrapper...
if (command instanceof TieBreakingPrioritizedRunnable) {
return command;
}
return new TieBreakingPrioritizedRunnable(super.wrapRunnable(command), Priority.NORMAL, insertionOrder.incrementAndGet());
}
super.execute(command);
}
@Override
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
if (!(runnable instanceof PrioritizedRunnable)) {
runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
}
return new PrioritizedFutureTask<>((PrioritizedRunnable) runnable, value, insertionOrder.incrementAndGet());
Priority priority = ((PrioritizedRunnable) runnable).priority();
return new PrioritizedFutureTask<>(runnable, priority, value, insertionOrder.incrementAndGet());
}
@Override
@ -147,7 +157,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
if (!(callable instanceof PrioritizedCallable)) {
callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
}
return new PrioritizedFutureTask<>((PrioritizedCallable<T>) callable, insertionOrder.incrementAndGet());
return new PrioritizedFutureTask<>((PrioritizedCallable)callable, insertionOrder.incrementAndGet());
}
public static class Pending {
@ -173,10 +183,6 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private ScheduledFuture<?> timeoutFuture;
private boolean started = false;
TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
this(runnable, runnable.priority(), insertionOrder);
}
TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long insertionOrder) {
super(priority);
this.runnable = runnable;
@ -233,6 +239,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
runnable = null;
timeoutFuture = null;
}
}
}
@ -242,10 +249,10 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
final Priority priority;
final long insertionOrder;
public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long insertionOrder) {
public PrioritizedFutureTask(Runnable runnable, Priority priority, T value, long insertionOrder) {
super(runnable, value);
this.task = runnable;
this.priority = runnable.priority();
this.priority = priority;
this.insertionOrder = insertionOrder;
}
@ -265,4 +272,5 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
return insertionOrder < pft.insertionOrder ? -1 : 1;
}
}
}

View File

@ -0,0 +1,357 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.util.concurrent;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with
* a thread. It allows to store and retrieve header information across method calls, network calls as well as threads spawned from a
* thread that has a {@link ThreadContext} associated with. Threads spawned from a {@link org.elasticsearch.threadpool.ThreadPool} have out of the box
* support for {@link ThreadContext} and all threads spawned will inherit the {@link ThreadContext} from the thread that it is forking from.".
* Network calls will also preserve the senders headers automatically.
* <p>
* Consumers of ThreadContext usually don't need to interact with adding or stashing contexts. Every elasticsearch thread is managed by a thread pool or executor
* being responsible for stashing and restoring the threads context. For instance if a network request is received, all headers are deserialized from the network
* and directly added as the headers of the threads {@link ThreadContext} (see {@link #readHeaders(StreamInput)}. In order to not modify the context that is currently
* active on this thread the network code uses a try/with pattern to stash it's current context, read headers into a fresh one and once the request is handled or a handler thread
* is forked (which in turn inherits the context) it restores the previous context. For instance:
* </p>
* <pre>
* // current context is stashed and replaced with a default context
* try (StoredContext context = threadContext.stashContext()) {
* threadContext.readHeaders(in); // read headers into current context
* if (fork) {
* threadPool.execute(() -&gt; request.handle()); // inherits context
* } else {
* request.handle();
* }
* }
* // previous context is restored on StoredContext#close()
* </pre>
*
*/
public final class ThreadContext implements Closeable, Writeable<ThreadContext.ThreadContextStruct>{
public static final String PREFIX = "request.headers";
private final Map<String, String> defaultHeader;
private static final ThreadContextStruct DEFAULT_CONTEXT = new ThreadContextStruct(Collections.emptyMap());
private final ContextThreadLocal threadLocal;
/**
* Creates a new ThreadContext instance
* @param settings the settings to read the default request headers from
*/
public ThreadContext(Settings settings) {
Settings headers = settings.getAsSettings(PREFIX);
if (headers == null) {
this.defaultHeader = Collections.emptyMap();
} else {
Map<String, String> defaultHeader = new HashMap<>();
for (String key : headers.names()) {
defaultHeader.put(key, headers.get(key));
}
this.defaultHeader = Collections.unmodifiableMap(defaultHeader);
}
threadLocal = new ContextThreadLocal();
}
@Override
public void close() throws IOException {
threadLocal.close();
}
/**
* Removes the current context and resets a default context. The removed context can be
* restored when closing the returned {@link StoredContext}
*/
public StoredContext stashContext() {
final ThreadContextStruct context = threadLocal.get();
threadLocal.set(null);
return () -> {
threadLocal.set(context);
};
}
/**
* Removes the current context and resets a new context that contains a merge of the current headers and the given headers. The removed context can be
* restored when closing the returned {@link StoredContext}. The merge strategy is that headers that are already existing are preserved unless they are defaults.
*/
public StoredContext stashAndMergeHeaders(Map<String, String> headers) {
final ThreadContextStruct context = threadLocal.get();
Map<String, String> newHeader = new HashMap<>(headers);
newHeader.putAll(context.headers);
threadLocal.set(DEFAULT_CONTEXT.putHeaders(newHeader));
return () -> {
threadLocal.set(context);
};
}
/**
* Just like {@link #stashContext()} but no default context is set.
*/
public StoredContext newStoredContext() {
final ThreadContextStruct context = threadLocal.get();
return () -> {
threadLocal.set(context);
};
}
@Override
public void writeTo(StreamOutput out) throws IOException {
threadLocal.get().writeTo(out, defaultHeader);
}
@Override
public ThreadContextStruct readFrom(StreamInput in) throws IOException {
return DEFAULT_CONTEXT.readFrom(in);
}
/**
* Reads the headers from the stream into the current context
*/
public void readHeaders(StreamInput in) throws IOException {
threadLocal.set(readFrom(in));
}
/**
* Returns the header for the given key or <code>null</code> if not present
*/
public String getHeader(String key) {
String value = threadLocal.get().headers.get(key);
if (value == null) {
return defaultHeader.get(key);
}
return value;
}
/**
* Returns all of the current contexts headers
*/
public Map<String, String> getHeaders() {
HashMap<String, String> map = new HashMap<>(defaultHeader);
map.putAll(threadLocal.get().headers);
return Collections.unmodifiableMap(map);
}
/**
* Copies all header key, value pairs into the current context
*/
public void copyHeaders(Iterable<Map.Entry<String, String>> headers) {
threadLocal.set(threadLocal.get().copyHeaders(headers));
}
/**
* Puts a header into the context
*/
public void putHeader(String key, String value) {
threadLocal.set(threadLocal.get().putPersistent(key, value));
}
/**
* Puts all of the given headers into this context
*/
public void putHeader(Map<String, String> header) {
threadLocal.set(threadLocal.get().putHeaders(header));
}
/**
* Puts a transient header object into this context
*/
public void putTransient(String key, Object value) {
threadLocal.set(threadLocal.get().putTransient(key, value));
}
/**
* Returns a transient header object or <code>null</code> if there is no header for the given key
*/
public <T> T getTransient(String key) {
return (T) threadLocal.get().transientHeaders.get(key);
}
public interface StoredContext extends AutoCloseable {
@Override
void close();
default void restore() {
close();
}
}
static final class ThreadContextStruct implements Writeable<ThreadContextStruct> {
private final Map<String,String> headers;
private final Map<String, Object> transientHeaders;
private ThreadContextStruct(StreamInput in) throws IOException {
int numValues = in.readVInt();
Map<String, String> headers = numValues == 0 ? Collections.emptyMap() : new HashMap<>(numValues);
for (int i = 0; i < numValues; i++) {
headers.put(in.readString(), in.readString());
}
this.headers = headers;
this.transientHeaders = Collections.emptyMap();
}
private ThreadContextStruct(Map<String, String> headers, Map<String, Object> transientHeaders) {
this.headers = headers;
this.transientHeaders = transientHeaders;
}
private ThreadContextStruct(Map<String, String> headers) {
this(headers, Collections.emptyMap());
}
private ThreadContextStruct putPersistent(String key, String value) {
Map<String, String> newHeaders = new HashMap<>(this.headers);
putSingleHeader(key, value, newHeaders);
return new ThreadContextStruct(newHeaders, transientHeaders);
}
private void putSingleHeader(String key, String value, Map<String, String> newHeaders) {
final String existingValue;
if ((existingValue = newHeaders.putIfAbsent(key, value)) != null) {
throw new IllegalArgumentException("value for key [" + key + "] already present");
}
}
private ThreadContextStruct putHeaders(Map<String, String> headers) {
if (headers.isEmpty()) {
return this;
} else {
final Map<String, String> newHeaders = new HashMap<>();
for (Map.Entry<String, String> entry : headers.entrySet()) {
putSingleHeader(entry.getKey(), entry.getValue(), newHeaders);
}
newHeaders.putAll(this.headers);
return new ThreadContextStruct(newHeaders, transientHeaders);
}
}
private ThreadContextStruct putTransient(String key, Object value) {
Map<String, Object> newTransient = new HashMap<>(this.transientHeaders);
if (newTransient.putIfAbsent(key, value) != null) {
throw new IllegalArgumentException("value for key [" + key + "] already present");
}
return new ThreadContextStruct(headers, newTransient);
}
boolean isEmpty() {
return headers.isEmpty() && transientHeaders.isEmpty();
}
private ThreadContextStruct copyHeaders(Iterable<Map.Entry<String, String>> headers) {
Map<String, String> newHeaders = new HashMap<>();
for (Map.Entry<String, String> header : headers) {
newHeaders.put(header.getKey(), header.getValue());
}
return putHeaders(newHeaders);
}
@Override
public ThreadContextStruct readFrom(StreamInput in) throws IOException {
return new ThreadContextStruct(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException("use the other write to");
}
public void writeTo(StreamOutput out, Map<String, String> defaultHeaders) throws IOException {
final Map<String, String> headers;
if (defaultHeaders.isEmpty()) {
headers = this.headers;
} else {
headers = new HashMap<>(defaultHeaders);
headers.putAll(this.headers);
}
int keys = headers.size();
out.writeVInt(keys);
for (Map.Entry<String, String> entry : headers.entrySet()) {
out.writeString(entry.getKey());
out.writeString(entry.getValue());
}
}
}
private static class ContextThreadLocal extends CloseableThreadLocal<ThreadContextStruct> {
private final AtomicBoolean closed = new AtomicBoolean(false);
@Override
public void set(ThreadContextStruct object) {
try {
if (object == DEFAULT_CONTEXT) {
super.set(null);
} else {
super.set(object);
}
} catch (NullPointerException ex) {
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
ensureOpen();
throw ex;
}
}
@Override
public ThreadContextStruct get() {
try {
ThreadContextStruct threadContextStruct = super.get();
if (threadContextStruct != null) {
return threadContextStruct;
}
return DEFAULT_CONTEXT;
} catch (NullPointerException ex) {
/* This is odd but CloseableThreadLocal throws a NPE if it was closed but still accessed.
to get a real exception we call ensureOpen() to tell the user we are already closed.*/
ensureOpen();
throw ex;
}
}
private void ensureOpen() {
if (closed.get()) {
throw new IllegalStateException("threadcontext is already closed");
}
}
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
super.close();
}
}
}
}

View File

@ -169,7 +169,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
transportService.registerRequestHandler(ACTION_NAME, UnicastPingRequest::new, ThreadPool.Names.SAME, new UnicastPingRequestHandler());
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(settings, "[unicast_connect]");
unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory);
unicastConnectExecutor = EsExecutors.newScaling("unicast_connect", 0, concurrentConnects, 60, TimeUnit.SECONDS, threadFactory, threadPool.getThreadContext());
}
@Override

View File

@ -183,7 +183,7 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
}
NodeRequest(String nodeId, TransportNodesListGatewayMetaState.Request request) {
super(request, nodeId);
super(nodeId);
}
@Override

View File

@ -247,7 +247,7 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
}
NodeRequest(String nodeId, TransportNodesListGatewayStartedShards.Request request) {
super(request, nodeId);
super(nodeId);
this.shardId = request.shardId();
this.indexUUID = request.getIndexUUID();
}

View File

@ -21,39 +21,27 @@ package org.elasticsearch.http;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestFilter;
import org.elasticsearch.rest.RestFilterChain;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.OK;
/**
* A component to serve http requests, backed by rest handlers.
*/
public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
public class HttpServer extends AbstractLifecycleComponent<HttpServer> implements HttpServerAdapter {
private final Environment environment;
@ -73,22 +61,9 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
this.restController = restController;
this.nodeService = nodeService;
nodeService.setHttpServer(this);
transport.httpServerAdapter(new Dispatcher(this));
transport.httpServerAdapter(this);
}
static class Dispatcher implements HttpServerAdapter {
private final HttpServer server;
Dispatcher(HttpServer server) {
this.server = server;
}
@Override
public void dispatchRequest(HttpRequest request, HttpChannel channel) {
server.internalDispatchRequest(request, channel);
}
}
@Override
protected void doStart() {
@ -118,12 +93,12 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> {
return transport.stats();
}
public void internalDispatchRequest(final HttpRequest request, final HttpChannel channel) {
public void dispatchRequest(HttpRequest request, HttpChannel channel, ThreadContext threadContext) {
if (request.rawPath().equals("/favicon.ico")) {
handleFavicon(request, channel);
return;
}
restController.dispatchRequest(request, channel);
restController.dispatchRequest(request, channel, threadContext);
}
void handleFavicon(HttpRequest request, HttpChannel channel) {

View File

@ -19,10 +19,12 @@
package org.elasticsearch.http;
import org.elasticsearch.common.util.concurrent.ThreadContext;
/**
*
*/
public interface HttpServerAdapter {
void dispatchRequest(HttpRequest request, HttpChannel channel);
void dispatchRequest(HttpRequest request, HttpChannel channel, ThreadContext context);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.http.netty;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.netty.pipelining.OrderedUpstreamMessageEvent;
import org.elasticsearch.rest.support.RestUtils;
import org.jboss.netty.channel.ChannelHandler;
@ -41,12 +42,14 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
private final Pattern corsPattern;
private final boolean httpPipeliningEnabled;
private final boolean detailedErrorsEnabled;
private final ThreadContext threadContext;
public HttpRequestHandler(NettyHttpServerTransport serverTransport, boolean detailedErrorsEnabled) {
public HttpRequestHandler(NettyHttpServerTransport serverTransport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
this.serverTransport = serverTransport;
this.corsPattern = RestUtils.checkCorsSettingForRegex(serverTransport.settings().get(NettyHttpServerTransport.SETTING_CORS_ALLOW_ORIGIN));
this.httpPipeliningEnabled = serverTransport.pipelining;
this.detailedErrorsEnabled = detailedErrorsEnabled;
this.threadContext = threadContext;
}
@Override
@ -60,6 +63,7 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
request = (HttpRequest) e.getMessage();
}
threadContext.copyHeaders(request.headers());
// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
// when reading, or using a cumalation buffer
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpInfo;
@ -47,6 +48,7 @@ import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty.pipelining.HttpPipeliningHandler;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.AdaptiveReceiveBufferSizePredictorFactory;
@ -136,6 +138,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
protected final String publishHosts[];
protected final boolean detailedErrorsEnabled;
protected final ThreadPool threadPool;
protected int publishPort;
@ -164,10 +167,11 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
@Inject
@SuppressForbidden(reason = "sets org.jboss.netty.epollBugWorkaround based on netty.epollBugWorkaround")
// TODO: why be confusing like this? just let the user do it with the netty parameter instead!
public NettyHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays) {
public NettyHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool) {
super(settings);
this.networkService = networkService;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
if (settings.getAsBoolean("netty.epollBugWorkaround", false)) {
System.setProperty("org.jboss.netty.epollBugWorkaround", "true");
@ -384,7 +388,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}
protected void dispatchRequest(HttpRequest request, HttpChannel channel) {
httpServerAdapter.dispatchRequest(request, channel);
httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
}
protected void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
@ -409,7 +413,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new HttpChannelPipelineFactory(this, detailedErrorsEnabled);
return new HttpChannelPipelineFactory(this, detailedErrorsEnabled, threadPool.getThreadContext());
}
protected static class HttpChannelPipelineFactory implements ChannelPipelineFactory {
@ -417,9 +421,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
protected final NettyHttpServerTransport transport;
protected final HttpRequestHandler requestHandler;
public HttpChannelPipelineFactory(NettyHttpServerTransport transport, boolean detailedErrorsEnabled) {
public HttpChannelPipelineFactory(NettyHttpServerTransport transport, boolean detailedErrorsEnabled, ThreadContext threadContext) {
this.transport = transport;
this.requestHandler = new HttpRequestHandler(transport, detailedErrorsEnabled);
this.requestHandler = new HttpRequestHandler(transport, detailedErrorsEnabled, threadContext);
}
@Override

View File

@ -22,7 +22,6 @@ package org.elasticsearch.index.cache.query.index;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Weight;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.cache.query.QueryCache;
@ -36,7 +35,6 @@ public class IndexQueryCache extends AbstractIndexComponent implements QueryCach
final IndicesQueryCache indicesQueryCache;
@Inject
public IndexQueryCache(IndexSettings indexSettings, IndicesQueryCache indicesQueryCache) {
super(indexSettings);
this.indicesQueryCache = indicesQueryCache;

View File

@ -31,7 +31,6 @@ import org.elasticsearch.index.cache.query.QueryCache;
*/
public class NoneQueryCache extends AbstractIndexComponent implements QueryCache {
@Inject
public NoneQueryCache(IndexSettings indexSettings) {
super(indexSettings);
logger.debug("Using no query cache");

View File

@ -240,7 +240,6 @@ public class GeoShapeQueryBuilder extends AbstractQueryBuilder<GeoShapeQueryBuil
ShapeBuilder shapeToQuery = shape;
if (shapeToQuery == null) {
GetRequest getRequest = new GetRequest(indexedShapeIndex, indexedShapeType, indexedShapeId);
getRequest.copyContextAndHeadersFrom(SearchContext.current());
shapeToQuery = fetch(context.getClient(), getRequest, indexedShapePath);
}
MappedFieldType fieldType = context.fieldMapper(fieldName);

View File

@ -917,7 +917,6 @@ public class MoreLikeThisQueryBuilder extends AbstractQueryBuilder<MoreLikeThisQ
for (Item item : unlikeItems) {
request.add(item.toTermVectorsRequest());
}
request.copyContextAndHeadersFrom(searchContext);
return client.multiTermVectors(request).actionGet();
}

View File

@ -364,8 +364,8 @@ public class QueryShardContext {
/*
* Executes the given template, and returns the response.
*/
public BytesReference executeQueryTemplate(Template template, SearchContext searchContext) {
ExecutableScript executable = getScriptService().executable(template, ScriptContext.Standard.SEARCH, searchContext, Collections.emptyMap());
public BytesReference executeQueryTemplate(Template template) {
ExecutableScript executable = getScriptService().executable(template, ScriptContext.Standard.SEARCH, Collections.emptyMap());
return (BytesReference) executable.run();
}

View File

@ -100,7 +100,7 @@ public class TemplateQueryBuilder extends AbstractQueryBuilder<TemplateQueryBuil
@Override
protected Query doToQuery(QueryShardContext context) throws IOException {
BytesReference querySource = context.executeQueryTemplate(template, SearchContext.current());
BytesReference querySource = context.executeQueryTemplate(template);
try (XContentParser qSourceParser = XContentFactory.xContent(querySource).createParser(querySource)) {
final QueryShardContext contextCopy = new QueryShardContext(context);
contextCopy.reset(qSourceParser);

View File

@ -249,7 +249,6 @@ public class TermsQueryBuilder extends AbstractQueryBuilder<TermsQueryBuilder> {
List<Object> terms = new ArrayList<>();
GetRequest getRequest = new GetRequest(termsLookup.index(), termsLookup.type(), termsLookup.id())
.preference("_local").routing(termsLookup.routing());
getRequest.copyContextAndHeadersFrom(SearchContext.current());
final GetResponse getResponse = client.get(getRequest).actionGet();
if (getResponse.isExists()) {
List<Object> extractedValues = XContentMapValues.extractRawValues(termsLookup.path(), getResponse.getSourceAsMap());

Some files were not shown because too many files have changed in this diff Show More