Merge branch 'master' into feature-suggest-refactoring
This commit is contained in:
commit
75e93c2eca
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -179,7 +180,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(UpgradeRequest request, final ActionListener<UpgradeResponse> listener) {
|
||||
protected void doExecute(Task task, UpgradeRequest request, final ActionListener<UpgradeResponse> listener) {
|
||||
ActionListener<UpgradeResponse> settingsUpdateListener = new ActionListener<UpgradeResponse>() {
|
||||
@Override
|
||||
public void onResponse(UpgradeResponse upgradeResponse) {
|
||||
|
@ -199,7 +200,7 @@ public class TransportUpgradeAction extends TransportBroadcastByNodeAction<Upgra
|
|||
listener.onFailure(e);
|
||||
}
|
||||
};
|
||||
super.doExecute(request, settingsUpdateListener);
|
||||
super.doExecute(task, request, settingsUpdateListener);
|
||||
}
|
||||
|
||||
private void updateSettings(final UpgradeResponse upgradeResponse, final ActionListener<UpgradeResponse> listener) {
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.elasticsearch.search.SearchService;
|
|||
import org.elasticsearch.search.internal.DefaultSearchContext;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -89,9 +90,9 @@ public class TransportValidateQueryAction extends TransportBroadcastAction<Valid
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(ValidateQueryRequest request, ActionListener<ValidateQueryResponse> listener) {
|
||||
protected void doExecute(Task task, ValidateQueryRequest request, ActionListener<ValidateQueryResponse> listener) {
|
||||
request.nowInMillis = System.currentTimeMillis();
|
||||
super.doExecute(request, listener);
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.elasticsearch.index.shard.IndexShard;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -69,27 +70,27 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||
protected void doExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||
ClusterState state = clusterService.state();
|
||||
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
|
||||
createIndexAction.execute(new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
|
||||
createIndexAction.execute(task, new CreateIndexRequest().index(request.index()).cause("auto(delete api)").masterNodeTimeout(request.timeout()), new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
innerExecute(request, listener);
|
||||
innerExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable e) {
|
||||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
|
||||
// we have the index, do it
|
||||
innerExecute(request, listener);
|
||||
innerExecute(task, request, listener);
|
||||
} else {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
} else {
|
||||
innerExecute(request, listener);
|
||||
innerExecute(task, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,8 +115,8 @@ public class TransportDeleteAction extends TransportReplicationAction<DeleteRequ
|
|||
request.setShardId(shardId);
|
||||
}
|
||||
|
||||
private void innerExecute(final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||
super.doExecute(request, listener);
|
||||
private void innerExecute(Task task, final DeleteRequest request, final ActionListener<DeleteResponse> listener) {
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -48,6 +48,7 @@ import org.elasticsearch.index.shard.ShardId;
|
|||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndexAlreadyExistsException;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -84,7 +85,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
||||
protected void doExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
||||
// if we don't have a master, we don't have metadata, that's fine, let it find a master using create index API
|
||||
ClusterState state = clusterService.state();
|
||||
if (autoCreateIndex.shouldAutoCreate(request.index(), state)) {
|
||||
|
@ -93,10 +94,10 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
|
|||
createIndexRequest.mapping(request.type());
|
||||
createIndexRequest.cause("auto(index api)");
|
||||
createIndexRequest.masterNodeTimeout(request.timeout());
|
||||
createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||
createIndexAction.execute(task, createIndexRequest, new ActionListener<CreateIndexResponse>() {
|
||||
@Override
|
||||
public void onResponse(CreateIndexResponse result) {
|
||||
innerExecute(request, listener);
|
||||
innerExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -104,7 +105,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
|
|||
if (ExceptionsHelper.unwrapCause(e) instanceof IndexAlreadyExistsException) {
|
||||
// we have the index, do it
|
||||
try {
|
||||
innerExecute(request, listener);
|
||||
innerExecute(task, request, listener);
|
||||
} catch (Throwable e1) {
|
||||
listener.onFailure(e1);
|
||||
}
|
||||
|
@ -114,7 +115,7 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
|
|||
}
|
||||
});
|
||||
} else {
|
||||
innerExecute(request, listener);
|
||||
innerExecute(task, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -129,8 +130,8 @@ public class TransportIndexAction extends TransportReplicationAction<IndexReques
|
|||
request.setShardId(shardId);
|
||||
}
|
||||
|
||||
private void innerExecute(final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
||||
super.doExecute(request, listener);
|
||||
private void innerExecute(Task task, final IndexRequest request, final ActionListener<IndexResponse> listener) {
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.engine.DocumentMissingException;
|
||||
import org.elasticsearch.percolator.PercolateException;
|
||||
import org.elasticsearch.percolator.PercolatorService;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -70,7 +71,7 @@ public class TransportPercolateAction extends TransportBroadcastAction<Percolate
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
|
||||
protected void doExecute(Task task, final PercolateRequest request, final ActionListener<PercolateResponse> listener) {
|
||||
request.startTime = System.currentTimeMillis();
|
||||
if (request.getRequest() != null) {
|
||||
//create a new get request to make sure it has the same headers and context as the original percolate request
|
||||
|
@ -84,7 +85,7 @@ public class TransportPercolateAction extends TransportBroadcastAction<Percolate
|
|||
}
|
||||
|
||||
BytesReference docSource = getResponse.getSourceAsBytesRef();
|
||||
TransportPercolateAction.super.doExecute(new PercolateRequest(request, docSource), listener);
|
||||
TransportPercolateAction.super.doExecute(task, new PercolateRequest(request, docSource), listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -93,7 +94,7 @@ public class TransportPercolateAction extends TransportBroadcastAction<Percolate
|
|||
}
|
||||
});
|
||||
} else {
|
||||
super.doExecute(request, listener);
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* Base class for action requests that can have associated child tasks
|
||||
*/
|
||||
public abstract class ChildTaskActionRequest<Request extends ActionRequest<Request>> extends ActionRequest<Request> {
|
||||
|
||||
private String parentTaskNode;
|
||||
|
||||
private long parentTaskId;
|
||||
|
||||
protected ChildTaskActionRequest() {
|
||||
|
||||
}
|
||||
|
||||
public void setParentTask(String parentTaskNode, long parentTaskId) {
|
||||
this.parentTaskNode = parentTaskNode;
|
||||
this.parentTaskId = parentTaskId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
parentTaskNode = in.readOptionalString();
|
||||
parentTaskId = in.readLong();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeOptionalString(parentTaskNode);
|
||||
out.writeLong(parentTaskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action) {
|
||||
return new Task(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
|
||||
}
|
||||
|
||||
}
|
|
@ -19,10 +19,8 @@
|
|||
|
||||
package org.elasticsearch.action.support;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.tasks.ChildTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
||||
|
@ -61,6 +59,6 @@ public class ChildTaskRequest extends TransportRequest {
|
|||
|
||||
@Override
|
||||
public Task createTask(long id, String type, String action) {
|
||||
return new ChildTask(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
|
||||
return new Task(id, type, action, this::getDescription, parentTaskNode, parentTaskId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,13 +44,14 @@ public abstract class HandledTransportAction<Request extends ActionRequest<Reque
|
|||
class TransportHandler implements TransportRequestHandler<Request> {
|
||||
|
||||
@Override
|
||||
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
messageReceived(request, channel);
|
||||
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
|
||||
execute(request, new ActionListener<Response>() {
|
||||
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
// We already got the task created on the netty layer - no need to create it again on the transport layer
|
||||
execute(task, request, new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response response) {
|
||||
try {
|
||||
|
|
|
@ -66,6 +66,11 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
|
|||
return future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Use this method when the transport action call should result in creation of a new task associated with the call.
|
||||
*
|
||||
* This is a typical behavior.
|
||||
*/
|
||||
public final Task execute(Request request, ActionListener<Response> listener) {
|
||||
Task task = taskManager.register("transport", actionName, request);
|
||||
if (task == null) {
|
||||
|
@ -88,7 +93,10 @@ public abstract class TransportAction<Request extends ActionRequest<Request>, Re
|
|||
return task;
|
||||
}
|
||||
|
||||
private final void execute(Task task, Request request, ActionListener<Response> listener) {
|
||||
/**
|
||||
* Use this method when the transport action should continue to run in the context of the current task
|
||||
*/
|
||||
public final void execute(Task task, Request request, ActionListener<Response> listener) {
|
||||
|
||||
ActionRequestValidationException validationException = request.validate();
|
||||
if (validationException != null) {
|
||||
|
|
|
@ -21,18 +21,18 @@ package org.elasticsearch.action.support.broadcast;
|
|||
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.OriginalIndices;
|
||||
import org.elasticsearch.action.support.ChildTaskRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class BroadcastShardRequest extends TransportRequest implements IndicesRequest {
|
||||
public abstract class BroadcastShardRequest extends ChildTaskRequest implements IndicesRequest {
|
||||
|
||||
private ShardId shardId;
|
||||
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.ShardIterator;
|
|||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.TransportChannel;
|
||||
|
@ -69,8 +70,13 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
new AsyncBroadcastAction(request, listener).start();
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
new AsyncBroadcastAction(task, request, listener).start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void doExecute(Request request, ActionListener<Response> listener) {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState);
|
||||
|
@ -93,6 +99,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
|||
|
||||
protected class AsyncBroadcastAction {
|
||||
|
||||
private final Task task;
|
||||
private final Request request;
|
||||
private final ActionListener<Response> listener;
|
||||
private final ClusterState clusterState;
|
||||
|
@ -102,7 +109,8 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
|||
private final AtomicInteger counterOps = new AtomicInteger();
|
||||
private final AtomicReferenceArray shardsResponses;
|
||||
|
||||
protected AsyncBroadcastAction(Request request, ActionListener<Response> listener) {
|
||||
protected AsyncBroadcastAction(Task task, Request request, ActionListener<Response> listener) {
|
||||
this.task = task;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
||||
|
@ -158,6 +166,7 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
|||
} else {
|
||||
try {
|
||||
final ShardRequest shardRequest = newShardRequest(shardIt.size(), shard, request);
|
||||
shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
DiscoveryNode node = nodes.get(shard.currentNodeId());
|
||||
if (node == null) {
|
||||
// no node connected, act as failure
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.elasticsearch.action.IndicesRequest;
|
|||
import org.elasticsearch.action.NoShardAvailableActionException;
|
||||
import org.elasticsearch.action.ShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.ChildTaskRequest;
|
||||
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
|
||||
import org.elasticsearch.action.support.HandledTransportAction;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
|
@ -44,6 +45,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.NodeShouldNotConnectException;
|
||||
|
@ -206,11 +208,17 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|||
protected abstract ClusterBlockException checkRequestBlock(ClusterState state, Request request, String[] concreteIndices);
|
||||
|
||||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
new AsyncAction(request, listener).start();
|
||||
protected final void doExecute(Request request, ActionListener<Response> listener) {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
new AsyncAction(task, request, listener).start();
|
||||
}
|
||||
|
||||
protected class AsyncAction {
|
||||
private final Task task;
|
||||
private final Request request;
|
||||
private final ActionListener<Response> listener;
|
||||
private final ClusterState clusterState;
|
||||
|
@ -220,7 +228,8 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|||
private final AtomicInteger counter = new AtomicInteger();
|
||||
private List<NoShardAvailableActionException> unavailableShardExceptions = new ArrayList<>();
|
||||
|
||||
protected AsyncAction(Request request, ActionListener<Response> listener) {
|
||||
protected AsyncAction(Task task, Request request, ActionListener<Response> listener) {
|
||||
this.task = task;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
||||
|
@ -290,6 +299,9 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|||
private void sendNodeRequest(final DiscoveryNode node, List<ShardRouting> shards, final int nodeIndex) {
|
||||
try {
|
||||
NodeRequest nodeRequest = new NodeRequest(node.getId(), request, shards);
|
||||
if (task != null) {
|
||||
nodeRequest.setParentTask(clusterService.localNode().id(), task.getId());
|
||||
}
|
||||
transportService.sendRequest(node, transportNodeBroadcastAction, nodeRequest, new BaseTransportResponseHandler<NodeResponse>() {
|
||||
@Override
|
||||
public NodeResponse newInstance() {
|
||||
|
@ -422,7 +434,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
|||
}
|
||||
}
|
||||
|
||||
public class NodeRequest extends TransportRequest implements IndicesRequest {
|
||||
public class NodeRequest extends ChildTaskRequest implements IndicesRequest {
|
||||
private String nodeId;
|
||||
|
||||
private List<ShardRouting> shards;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.action.support.master;
|
||||
|
||||
import org.elasticsearch.action.ActionRequest;
|
||||
import org.elasticsearch.action.support.ChildTaskActionRequest;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -29,7 +30,7 @@ import java.io.IOException;
|
|||
/**
|
||||
* A based request for master based operation.
|
||||
*/
|
||||
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ActionRequest<Request> {
|
||||
public abstract class MasterNodeRequest<Request extends MasterNodeRequest<Request>> extends ChildTaskActionRequest<Request> {
|
||||
|
||||
public static final TimeValue DEFAULT_MASTER_NODE_TIMEOUT = TimeValue.timeValueSeconds(30);
|
||||
|
||||
|
|
|
@ -113,6 +113,9 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
|
|||
AsyncSingleAction(Task task, Request request, ActionListener<Response> listener) {
|
||||
this.task = task;
|
||||
this.request = request;
|
||||
if (task != null) {
|
||||
request.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
}
|
||||
// TODO do we really need to wrap it in a listener? the handlers should be cheap
|
||||
if ((listener instanceof ThreadedActionListener) == false) {
|
||||
listener = new ThreadedActionListener<>(logger, threadPool, ThreadPool.Names.LISTENER, listener);
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionRequest;
|
|||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.IndicesRequest;
|
||||
import org.elasticsearch.action.WriteConsistencyLevel;
|
||||
import org.elasticsearch.action.support.ChildTaskActionRequest;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -38,7 +39,7 @@ import static org.elasticsearch.action.ValidateActions.addValidationError;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class ReplicationRequest<Request extends ReplicationRequest<Request>> extends ActionRequest<Request> implements IndicesRequest {
|
||||
public abstract class ReplicationRequest<Request extends ReplicationRequest<Request>> extends ChildTaskActionRequest<Request> implements IndicesRequest {
|
||||
|
||||
public static final TimeValue DEFAULT_TIMEOUT = new TimeValue(1, TimeUnit.MINUTES);
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.CountDown;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -67,8 +68,14 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
|||
this.clusterService = clusterService;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void doExecute(final Request request, final ActionListener<Response> listener) {
|
||||
protected final void doExecute(final Request request, final ActionListener<Response> listener) {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
final ClusterState clusterState = clusterService.state();
|
||||
List<ShardId> shards = shards(request, clusterState);
|
||||
final CopyOnWriteArrayList<ShardResponse> shardsResponses = new CopyOnWriteArrayList();
|
||||
|
@ -107,12 +114,14 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
|||
}
|
||||
}
|
||||
};
|
||||
shardExecute(request, shardId, shardActionListener);
|
||||
shardExecute(task, request, shardId, shardActionListener);
|
||||
}
|
||||
}
|
||||
|
||||
protected void shardExecute(Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
|
||||
replicatedBroadcastShardAction.execute(newShardRequest(request, shardId), shardActionListener);
|
||||
protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener<ShardResponse> shardActionListener) {
|
||||
ShardRequest shardRequest = newShardRequest(request, shardId);
|
||||
shardRequest.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
replicatedBroadcastShardAction.execute(shardRequest, shardActionListener);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -61,6 +61,7 @@ import org.elasticsearch.index.translog.Translog;
|
|||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.node.NodeClosedException;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.BaseTransportResponseHandler;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
|
@ -134,8 +135,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Request request, ActionListener<Response> listener) {
|
||||
new ReroutePhase(request, listener).run();
|
||||
protected final void doExecute(Request request, ActionListener<Response> listener) {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
|
||||
new ReroutePhase(task, request, listener).run();
|
||||
}
|
||||
|
||||
protected abstract Response newResponseInstance();
|
||||
|
@ -244,8 +250,8 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
|
||||
class OperationTransportHandler implements TransportRequestHandler<Request> {
|
||||
@Override
|
||||
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
|
||||
execute(request, new ActionListener<Response>() {
|
||||
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
|
||||
execute(task, request, new ActionListener<Response>() {
|
||||
@Override
|
||||
public void onResponse(Response result) {
|
||||
try {
|
||||
|
@ -265,6 +271,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(Request request, TransportChannel channel) throws Exception {
|
||||
throw new UnsupportedOperationException("the task parameter is required for this operation");
|
||||
}
|
||||
}
|
||||
|
||||
class PrimaryOperationTransportHandler implements TransportRequestHandler<Request> {
|
||||
|
@ -407,8 +418,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
|
|||
private final ClusterStateObserver observer;
|
||||
private final AtomicBoolean finished = new AtomicBoolean();
|
||||
|
||||
ReroutePhase(Request request, ActionListener<Response> listener) {
|
||||
ReroutePhase(Task task, Request request, ActionListener<Response> listener) {
|
||||
this.request = request;
|
||||
if (task != null) {
|
||||
this.request.setParentTask(clusterService.localNode().getId(), task.getId());
|
||||
}
|
||||
this.listener = listener;
|
||||
this.observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext());
|
||||
}
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
|
|||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.tasks.ChildTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -164,20 +163,13 @@ public class BaseTasksRequest<Request extends BaseTasksRequest<Request>> extends
|
|||
if (actions() != null && actions().length > 0 && Regex.simpleMatch(actions(), task.getAction()) == false) {
|
||||
return false;
|
||||
}
|
||||
if (parentNode() != null || parentTaskId() != BaseTasksRequest.ALL_TASKS) {
|
||||
if (task instanceof ChildTask) {
|
||||
if (parentNode() != null) {
|
||||
if (parentNode().equals(((ChildTask) task).getParentNode()) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (parentTaskId() != BaseTasksRequest.ALL_TASKS) {
|
||||
if (parentTaskId() != ((ChildTask) task).getParentId()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// This is not a child task and we need to match parent node or id
|
||||
if (parentNode() != null) {
|
||||
if (parentNode().equals(task.getParentNode()) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
if (parentTaskId() != BaseTasksRequest.ALL_TASKS) {
|
||||
if (parentTaskId() != task.getParentId()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.search.SearchService;
|
|||
import org.elasticsearch.search.controller.SearchPhaseController;
|
||||
import org.elasticsearch.search.dfs.AggregatedDfs;
|
||||
import org.elasticsearch.search.dfs.DfsSearchResult;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
|
@ -69,9 +70,9 @@ public class TransportDfsOnlyAction extends TransportBroadcastAction<DfsOnlyRequ
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void doExecute(DfsOnlyRequest request, ActionListener<DfsOnlyResponse> listener) {
|
||||
protected void doExecute(Task task, DfsOnlyRequest request, ActionListener<DfsOnlyResponse> listener) {
|
||||
request.nowInMillis = System.currentTimeMillis();
|
||||
super.doExecute(request, listener);
|
||||
super.doExecute(task, request, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -181,11 +181,6 @@ public abstract class ParseContext {
|
|||
this.in = in;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean flyweight() {
|
||||
return in.flyweight();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocumentMapperParser docMapperParser() {
|
||||
return in.docMapperParser();
|
||||
|
@ -411,11 +406,6 @@ public abstract class ParseContext {
|
|||
this.dynamicMappingsUpdate = null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean flyweight() {
|
||||
return sourceToParse.flyweight();
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocumentMapperParser docMapperParser() {
|
||||
return this.docMapperParser;
|
||||
|
@ -580,8 +570,6 @@ public abstract class ParseContext {
|
|||
}
|
||||
}
|
||||
|
||||
public abstract boolean flyweight();
|
||||
|
||||
public abstract DocumentMapperParser docMapperParser();
|
||||
|
||||
/**
|
||||
|
@ -658,6 +646,7 @@ public abstract class ParseContext {
|
|||
|
||||
public abstract SourceToParse sourceToParse();
|
||||
|
||||
@Nullable
|
||||
public abstract BytesReference source();
|
||||
|
||||
// only should be used by SourceFieldMapper to update with a compressed source
|
||||
|
|
|
@ -46,8 +46,6 @@ public class SourceToParse {
|
|||
|
||||
private final XContentParser parser;
|
||||
|
||||
private boolean flyweight = false;
|
||||
|
||||
private String index;
|
||||
|
||||
private String type;
|
||||
|
@ -106,15 +104,6 @@ public class SourceToParse {
|
|||
return this;
|
||||
}
|
||||
|
||||
public SourceToParse flyweight(boolean flyweight) {
|
||||
this.flyweight = flyweight;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean flyweight() {
|
||||
return this.flyweight;
|
||||
}
|
||||
|
||||
public String id() {
|
||||
return this.id;
|
||||
}
|
||||
|
|
|
@ -220,7 +220,7 @@ public class IdFieldMapper extends MetadataFieldMapper {
|
|||
|
||||
@Override
|
||||
public void postParse(ParseContext context) throws IOException {
|
||||
if (context.id() == null && !context.sourceToParse().flyweight()) {
|
||||
if (context.id() == null) {
|
||||
throw new MapperParsingException("No id found while parsing the content source");
|
||||
}
|
||||
// it either get built in the preParse phase, or get parsed...
|
||||
|
|
|
@ -228,9 +228,7 @@ public class ParentFieldMapper extends MetadataFieldMapper {
|
|||
|
||||
@Override
|
||||
public void postParse(ParseContext context) throws IOException {
|
||||
if (context.sourceToParse().flyweight() == false) {
|
||||
parse(context);
|
||||
}
|
||||
parse(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -251,10 +251,11 @@ public class SourceFieldMapper extends MetadataFieldMapper {
|
|||
if (!fieldType().stored()) {
|
||||
return;
|
||||
}
|
||||
if (context.flyweight()) {
|
||||
BytesReference source = context.source();
|
||||
// Percolate and tv APIs may not set the source and that is ok, because these APIs will not index any data
|
||||
if (source == null) {
|
||||
return;
|
||||
}
|
||||
BytesReference source = context.source();
|
||||
|
||||
boolean filtered = (includes != null && includes.length > 0) || (excludes != null && excludes.length > 0);
|
||||
if (filtered) {
|
||||
|
|
|
@ -212,7 +212,7 @@ public class TTLFieldMapper extends MetadataFieldMapper {
|
|||
|
||||
@Override
|
||||
protected void parseCreateField(ParseContext context, List<Field> fields) throws IOException, AlreadyExpiredException {
|
||||
if (enabledState.enabled && !context.sourceToParse().flyweight()) {
|
||||
if (enabledState.enabled) {
|
||||
long ttl = context.sourceToParse().ttl();
|
||||
if (ttl <= 0 && defaultTTL > 0) { // no ttl provided so we use the default value
|
||||
ttl = defaultTTL;
|
||||
|
|
|
@ -149,7 +149,7 @@ public class UidFieldMapper extends MetadataFieldMapper {
|
|||
|
||||
@Override
|
||||
public void postParse(ParseContext context) throws IOException {
|
||||
if (context.id() == null && !context.sourceToParse().flyweight()) {
|
||||
if (context.id() == null) {
|
||||
throw new MapperParsingException("No id found while parsing the content source");
|
||||
}
|
||||
// if we did not have the id as part of the sourceToParse, then we need to parse it here
|
||||
|
|
|
@ -126,9 +126,7 @@ public class PercolatorFieldMapper extends FieldMapper {
|
|||
public Mapper parse(ParseContext context) throws IOException {
|
||||
QueryShardContext queryShardContext = new QueryShardContext(this.queryShardContext);
|
||||
Query query = PercolatorQueriesRegistry.parseQuery(queryShardContext, mapUnmappedFieldAsString, context.parser());
|
||||
if (context.flyweight() == false) {
|
||||
ExtractQueryTermsService.extractQueryTerms(query, context.doc(), queryTermsField.name(), unknownQueryField.name(), queryTermsField.fieldType());
|
||||
}
|
||||
ExtractQueryTermsService.extractQueryTerms(query, context.doc(), queryTermsField.name(), unknownQueryField.name(), queryTermsField.fieldType());
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -71,12 +71,10 @@ import static org.elasticsearch.index.mapper.SourceToParse.source;
|
|||
|
||||
public class TermVectorsService {
|
||||
|
||||
private final MappingUpdatedAction mappingUpdatedAction;
|
||||
private final TransportDfsOnlyAction dfsAction;
|
||||
|
||||
@Inject
|
||||
public TermVectorsService(MappingUpdatedAction mappingUpdatedAction, TransportDfsOnlyAction dfsAction) {
|
||||
this.mappingUpdatedAction = mappingUpdatedAction;
|
||||
public TermVectorsService(TransportDfsOnlyAction dfsAction) {
|
||||
this.dfsAction = dfsAction;
|
||||
}
|
||||
|
||||
|
@ -293,16 +291,11 @@ public class TermVectorsService {
|
|||
|
||||
private ParsedDocument parseDocument(IndexShard indexShard, String index, String type, BytesReference doc) throws Throwable {
|
||||
MapperService mapperService = indexShard.mapperService();
|
||||
|
||||
// TODO: make parsing not dynamically create fields not in the original mapping
|
||||
DocumentMapperForType docMapper = mapperService.documentMapperWithAutoCreate(type);
|
||||
ParsedDocument parsedDocument = docMapper.getDocumentMapper().parse(source(doc).index(index).type(type).flyweight(true));
|
||||
ParsedDocument parsedDocument = docMapper.getDocumentMapper().parse(source(doc).index(index).type(type).id("_id_for_tv_api"));
|
||||
if (docMapper.getMapping() != null) {
|
||||
parsedDocument.addDynamicMappingsUpdate(docMapper.getMapping());
|
||||
}
|
||||
if (parsedDocument.dynamicMappingsUpdate() != null) {
|
||||
mappingUpdatedAction.updateMappingOnMasterSynchronously(index, type, parsedDocument.dynamicMappingsUpdate());
|
||||
}
|
||||
return parsedDocument;
|
||||
}
|
||||
|
||||
|
|
|
@ -119,12 +119,15 @@ public class OsProbe {
|
|||
}
|
||||
// fallback
|
||||
}
|
||||
if (Constants.WINDOWS) {
|
||||
return null;
|
||||
}
|
||||
if (getSystemLoadAverage == null) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
double oneMinuteLoadAverage = (double) getSystemLoadAverage.invoke(osMxBean);
|
||||
return new double[] { oneMinuteLoadAverage, -1, -1 };
|
||||
return new double[] { oneMinuteLoadAverage >= 0 ? oneMinuteLoadAverage : -1, -1, -1 };
|
||||
} catch (Throwable t) {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -89,7 +90,7 @@ public class OsStats implements Streamable, ToXContent {
|
|||
if (cpu != null) {
|
||||
builder.startObject(Fields.CPU);
|
||||
builder.field(Fields.PERCENT, cpu.getPercent());
|
||||
if (cpu.getLoadAverage() != null) {
|
||||
if (cpu.getLoadAverage() != null && Arrays.stream(cpu.getLoadAverage()).anyMatch(load -> load != -1)) {
|
||||
builder.startObject(Fields.LOAD_AVERAGE);
|
||||
if (cpu.getLoadAverage()[0] != -1) {
|
||||
builder.field(Fields.LOAD_AVERAGE_1M, cpu.getLoadAverage()[0]);
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.lucene.search.Query;
|
|||
import org.elasticsearch.ElasticsearchParseException;
|
||||
import org.elasticsearch.action.percolate.PercolateShardRequest;
|
||||
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
|
@ -34,6 +35,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
|||
import org.elasticsearch.index.mapper.DocumentMapperForType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||
import org.elasticsearch.index.mapper.SourceToParse;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.search.SearchParseElement;
|
||||
import org.elasticsearch.search.aggregations.AggregationPhase;
|
||||
|
@ -93,7 +95,7 @@ public class PercolateDocumentParser {
|
|||
|
||||
DocumentMapperForType docMapper = mapperService.documentMapperWithAutoCreate(request.documentType());
|
||||
String index = context.shardTarget().index();
|
||||
doc = docMapper.getDocumentMapper().parse(source(parser).index(index).type(request.documentType()).flyweight(true));
|
||||
doc = docMapper.getDocumentMapper().parse(source(parser).index(index).type(request.documentType()).id("_id_for_percolate_api"));
|
||||
if (docMapper.getMapping() != null) {
|
||||
doc.addDynamicMappingsUpdate(docMapper.getMapping());
|
||||
}
|
||||
|
@ -202,19 +204,15 @@ public class PercolateDocumentParser {
|
|||
}
|
||||
|
||||
private ParsedDocument parseFetchedDoc(PercolateContext context, BytesReference fetchedDoc, MapperService mapperService, String index, String type) {
|
||||
try (XContentParser parser = XContentFactory.xContent(fetchedDoc).createParser(fetchedDoc)) {
|
||||
DocumentMapperForType docMapper = mapperService.documentMapperWithAutoCreate(type);
|
||||
ParsedDocument doc = docMapper.getDocumentMapper().parse(source(parser).index(index).type(type).flyweight(true));
|
||||
if (doc == null) {
|
||||
throw new ElasticsearchParseException("No doc to percolate in the request");
|
||||
}
|
||||
if (context.highlight() != null) {
|
||||
doc.setSource(fetchedDoc);
|
||||
}
|
||||
return doc;
|
||||
} catch (Throwable e) {
|
||||
throw new ElasticsearchParseException("failed to parse request", e);
|
||||
DocumentMapperForType docMapper = mapperService.documentMapperWithAutoCreate(type);
|
||||
ParsedDocument doc = docMapper.getDocumentMapper().parse(source(fetchedDoc).index(index).type(type).id("_id_for_percolate_api"));
|
||||
if (doc == null) {
|
||||
throw new ElasticsearchParseException("No doc to percolate in the request");
|
||||
}
|
||||
if (context.highlight() != null) {
|
||||
doc.setSource(fetchedDoc);
|
||||
}
|
||||
return doc;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -27,14 +27,14 @@ import org.elasticsearch.search.suggest.phrase.DirectCandidateGenerator.Candidat
|
|||
import java.io.IOException;
|
||||
//TODO public for tests
|
||||
public final class LaplaceScorer extends WordScorer {
|
||||
|
||||
|
||||
public static final WordScorerFactory FACTORY = new WordScorer.WordScorerFactory() {
|
||||
@Override
|
||||
public WordScorer newScorer(IndexReader reader, Terms terms, String field, double realWordLikelyhood, BytesRef separator) throws IOException {
|
||||
return new LaplaceScorer(reader, terms, field, realWordLikelyhood, separator, 0.5);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
private double alpha;
|
||||
|
||||
public LaplaceScorer(IndexReader reader, Terms terms, String field,
|
||||
|
@ -42,7 +42,11 @@ public final class LaplaceScorer extends WordScorer {
|
|||
super(reader, terms, field, realWordLikelyhood, separator);
|
||||
this.alpha = alpha;
|
||||
}
|
||||
|
||||
|
||||
double alpha() {
|
||||
return this.alpha;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double scoreBigram(Candidate word, Candidate w_1) throws IOException {
|
||||
SuggestUtils.join(separator, spare, w_1.term, word.term);
|
||||
|
|
|
@ -41,7 +41,19 @@ public final class LinearInterpoatingScorer extends WordScorer {
|
|||
this.bigramLambda = bigramLambda / sum;
|
||||
this.trigramLambda = trigramLambda / sum;
|
||||
}
|
||||
|
||||
|
||||
double trigramLambda() {
|
||||
return this.trigramLambda;
|
||||
}
|
||||
|
||||
double bigramLambda() {
|
||||
return this.bigramLambda;
|
||||
}
|
||||
|
||||
double unigramLambda() {
|
||||
return this.unigramLambda;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double scoreBigram(Candidate word, Candidate w_1) throws IOException {
|
||||
SuggestUtils.join(separator, spare, w_1.term, word.term);
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.elasticsearch.script.Template;
|
|||
import org.elasticsearch.search.suggest.SuggestContextParser;
|
||||
import org.elasticsearch.search.suggest.SuggestUtils;
|
||||
import org.elasticsearch.search.suggest.SuggestionSearchContext;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.Laplace;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.StupidBackoff;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionContext.DirectCandidateGenerator;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -262,7 +264,7 @@ public final class PhraseSuggestParser implements SuggestContextParser {
|
|||
});
|
||||
} else if ("laplace".equals(fieldName)) {
|
||||
ensureNoSmoothing(suggestion);
|
||||
double theAlpha = 0.5;
|
||||
double theAlpha = Laplace.DEFAULT_LAPLACE_ALPHA;
|
||||
|
||||
while ((token = parser.nextToken()) != Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
|
@ -283,7 +285,7 @@ public final class PhraseSuggestParser implements SuggestContextParser {
|
|||
|
||||
} else if ("stupid_backoff".equals(fieldName) || "stupidBackoff".equals(fieldName)) {
|
||||
ensureNoSmoothing(suggestion);
|
||||
double theDiscount = 0.4;
|
||||
double theDiscount = StupidBackoff.DEFAULT_BACKOFF_DISCOUNT;
|
||||
while ((token = parser.nextToken()) != Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
fieldName = parser.currentName();
|
||||
|
|
|
@ -18,13 +18,25 @@
|
|||
*/
|
||||
package org.elasticsearch.search.suggest.phrase;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.Terms;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.ParsingException;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser.Token;
|
||||
import org.elasticsearch.index.query.QueryParseContext;
|
||||
import org.elasticsearch.script.Template;
|
||||
import org.elasticsearch.search.suggest.SuggestionBuilder;
|
||||
|
||||
import org.elasticsearch.search.suggest.phrase.WordScorer.WordScorerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
|
@ -376,7 +388,14 @@ public final class PhraseSuggestionBuilder extends SuggestionBuilder<PhraseSugge
|
|||
* </p>
|
||||
*/
|
||||
public static final class StupidBackoff extends SmoothingModel {
|
||||
private final double discount;
|
||||
/**
|
||||
* Default discount parameter for {@link StupidBackoff} smoothing
|
||||
*/
|
||||
public static final double DEFAULT_BACKOFF_DISCOUNT = 0.4;
|
||||
private double discount = DEFAULT_BACKOFF_DISCOUNT;
|
||||
static final StupidBackoff PROTOTYPE = new StupidBackoff(DEFAULT_BACKOFF_DISCOUNT);
|
||||
private static final String NAME = "stupid_backoff";
|
||||
private static final ParseField DISCOUNT_FIELD = new ParseField("discount");
|
||||
|
||||
/**
|
||||
* Creates a Stupid-Backoff smoothing model.
|
||||
|
@ -385,15 +404,70 @@ public final class PhraseSuggestionBuilder extends SuggestionBuilder<PhraseSugge
|
|||
* the discount given to lower order ngrams if the higher order ngram doesn't exits
|
||||
*/
|
||||
public StupidBackoff(double discount) {
|
||||
super("stupid_backoff");
|
||||
this.discount = discount;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the discount parameter of the model
|
||||
*/
|
||||
public double getDiscount() {
|
||||
return this.discount;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("discount", discount);
|
||||
builder.field(DISCOUNT_FIELD.getPreferredName(), discount);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeDouble(discount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StupidBackoff readFrom(StreamInput in) throws IOException {
|
||||
return new StupidBackoff(in.readDouble());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doEquals(SmoothingModel other) {
|
||||
StupidBackoff otherModel = (StupidBackoff) other;
|
||||
return Objects.equals(discount, otherModel.discount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int hashCode() {
|
||||
return Objects.hash(discount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SmoothingModel fromXContent(QueryParseContext parseContext) throws IOException {
|
||||
XContentParser parser = parseContext.parser();
|
||||
XContentParser.Token token;
|
||||
String fieldName = null;
|
||||
double discount = DEFAULT_BACKOFF_DISCOUNT;
|
||||
while ((token = parser.nextToken()) != Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
fieldName = parser.currentName();
|
||||
}
|
||||
if (token.isValue() && parseContext.parseFieldMatcher().match(fieldName, DISCOUNT_FIELD)) {
|
||||
discount = parser.doubleValue();
|
||||
}
|
||||
}
|
||||
return new StupidBackoff(discount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WordScorerFactory buildWordScorerFactory() {
|
||||
return (IndexReader reader, Terms terms, String field, double realWordLikelyhood, BytesRef separator)
|
||||
-> new StupidBackoffScorer(reader, terms, field, realWordLikelyhood, separator, discount);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -406,39 +480,119 @@ public final class PhraseSuggestionBuilder extends SuggestionBuilder<PhraseSugge
|
|||
* </p>
|
||||
*/
|
||||
public static final class Laplace extends SmoothingModel {
|
||||
private final double alpha;
|
||||
private double alpha = DEFAULT_LAPLACE_ALPHA;
|
||||
private static final String NAME = "laplace";
|
||||
private static final ParseField ALPHA_FIELD = new ParseField("alpha");
|
||||
/**
|
||||
* Default alpha parameter for laplace smoothing
|
||||
*/
|
||||
public static final double DEFAULT_LAPLACE_ALPHA = 0.5;
|
||||
static final Laplace PROTOTYPE = new Laplace(DEFAULT_LAPLACE_ALPHA);
|
||||
|
||||
/**
|
||||
* Creates a Laplace smoothing model.
|
||||
*
|
||||
*/
|
||||
public Laplace(double alpha) {
|
||||
super("laplace");
|
||||
this.alpha = alpha;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the laplace model alpha parameter
|
||||
*/
|
||||
public double getAlpha() {
|
||||
return this.alpha;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("alpha", alpha);
|
||||
builder.field(ALPHA_FIELD.getPreferredName(), alpha);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeDouble(alpha);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SmoothingModel readFrom(StreamInput in) throws IOException {
|
||||
return new Laplace(in.readDouble());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doEquals(SmoothingModel other) {
|
||||
Laplace otherModel = (Laplace) other;
|
||||
return Objects.equals(alpha, otherModel.alpha);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int hashCode() {
|
||||
return Objects.hash(alpha);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SmoothingModel fromXContent(QueryParseContext parseContext) throws IOException {
|
||||
XContentParser parser = parseContext.parser();
|
||||
XContentParser.Token token;
|
||||
String fieldName = null;
|
||||
double alpha = DEFAULT_LAPLACE_ALPHA;
|
||||
while ((token = parser.nextToken()) != Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
fieldName = parser.currentName();
|
||||
}
|
||||
if (token.isValue() && parseContext.parseFieldMatcher().match(fieldName, ALPHA_FIELD)) {
|
||||
alpha = parser.doubleValue();
|
||||
}
|
||||
}
|
||||
return new Laplace(alpha);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WordScorerFactory buildWordScorerFactory() {
|
||||
return (IndexReader reader, Terms terms, String field, double realWordLikelyhood, BytesRef separator)
|
||||
-> new LaplaceScorer(reader, terms, field, realWordLikelyhood, separator, alpha);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static abstract class SmoothingModel implements ToXContent {
|
||||
private final String type;
|
||||
|
||||
protected SmoothingModel(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
public static abstract class SmoothingModel implements NamedWriteable<SmoothingModel>, ToXContent {
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(type);
|
||||
builder.startObject(getWriteableName());
|
||||
innerToXContent(builder,params);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
return true;
|
||||
}
|
||||
if (obj == null || getClass() != obj.getClass()) {
|
||||
return false;
|
||||
}
|
||||
@SuppressWarnings("unchecked")
|
||||
SmoothingModel other = (SmoothingModel) obj;
|
||||
return doEquals(other);
|
||||
}
|
||||
|
||||
public abstract SmoothingModel fromXContent(QueryParseContext parseContext) throws IOException;
|
||||
|
||||
public abstract WordScorerFactory buildWordScorerFactory();
|
||||
|
||||
/**
|
||||
* subtype specific implementation of "equals".
|
||||
*/
|
||||
protected abstract boolean doEquals(SmoothingModel other);
|
||||
|
||||
protected abstract XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException;
|
||||
}
|
||||
|
||||
|
@ -451,9 +605,14 @@ public final class PhraseSuggestionBuilder extends SuggestionBuilder<PhraseSugge
|
|||
* </p>
|
||||
*/
|
||||
public static final class LinearInterpolation extends SmoothingModel {
|
||||
private static final String NAME = "linear";
|
||||
static final LinearInterpolation PROTOTYPE = new LinearInterpolation(0.8, 0.1, 0.1);
|
||||
private final double trigramLambda;
|
||||
private final double bigramLambda;
|
||||
private final double unigramLambda;
|
||||
private static final ParseField TRIGRAM_FIELD = new ParseField("trigram_lambda");
|
||||
private static final ParseField BIGRAM_FIELD = new ParseField("bigram_lambda");
|
||||
private static final ParseField UNIGRAM_FIELD = new ParseField("unigram_lambda");
|
||||
|
||||
/**
|
||||
* Creates a linear interpolation smoothing model.
|
||||
|
@ -468,19 +627,110 @@ public final class PhraseSuggestionBuilder extends SuggestionBuilder<PhraseSugge
|
|||
* the unigram lambda
|
||||
*/
|
||||
public LinearInterpolation(double trigramLambda, double bigramLambda, double unigramLambda) {
|
||||
super("linear");
|
||||
double sum = trigramLambda + bigramLambda + unigramLambda;
|
||||
if (Math.abs(sum - 1.0) > 0.001) {
|
||||
throw new IllegalArgumentException("linear smoothing lambdas must sum to 1");
|
||||
}
|
||||
this.trigramLambda = trigramLambda;
|
||||
this.bigramLambda = bigramLambda;
|
||||
this.unigramLambda = unigramLambda;
|
||||
}
|
||||
|
||||
public double getTrigramLambda() {
|
||||
return this.trigramLambda;
|
||||
}
|
||||
|
||||
public double getBigramLambda() {
|
||||
return this.bigramLambda;
|
||||
}
|
||||
|
||||
public double getUnigramLambda() {
|
||||
return this.unigramLambda;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected XContentBuilder innerToXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.field("trigram_lambda", trigramLambda);
|
||||
builder.field("bigram_lambda", bigramLambda);
|
||||
builder.field("unigram_lambda", unigramLambda);
|
||||
builder.field(TRIGRAM_FIELD.getPreferredName(), trigramLambda);
|
||||
builder.field(BIGRAM_FIELD.getPreferredName(), bigramLambda);
|
||||
builder.field(UNIGRAM_FIELD.getPreferredName(), unigramLambda);
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeDouble(trigramLambda);
|
||||
out.writeDouble(bigramLambda);
|
||||
out.writeDouble(unigramLambda);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LinearInterpolation readFrom(StreamInput in) throws IOException {
|
||||
return new LinearInterpolation(in.readDouble(), in.readDouble(), in.readDouble());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doEquals(SmoothingModel other) {
|
||||
final LinearInterpolation otherModel = (LinearInterpolation) other;
|
||||
return Objects.equals(trigramLambda, otherModel.trigramLambda) &&
|
||||
Objects.equals(bigramLambda, otherModel.bigramLambda) &&
|
||||
Objects.equals(unigramLambda, otherModel.unigramLambda);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int hashCode() {
|
||||
return Objects.hash(trigramLambda, bigramLambda, unigramLambda);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LinearInterpolation fromXContent(QueryParseContext parseContext) throws IOException {
|
||||
XContentParser parser = parseContext.parser();
|
||||
XContentParser.Token token;
|
||||
String fieldName = null;
|
||||
double trigramLambda = 0.0;
|
||||
double bigramLambda = 0.0;
|
||||
double unigramLambda = 0.0;
|
||||
ParseFieldMatcher matcher = parseContext.parseFieldMatcher();
|
||||
while ((token = parser.nextToken()) != Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
fieldName = parser.currentName();
|
||||
} else if (token.isValue()) {
|
||||
if (matcher.match(fieldName, TRIGRAM_FIELD)) {
|
||||
trigramLambda = parser.doubleValue();
|
||||
if (trigramLambda < 0) {
|
||||
throw new IllegalArgumentException("trigram_lambda must be positive");
|
||||
}
|
||||
} else if (matcher.match(fieldName, BIGRAM_FIELD)) {
|
||||
bigramLambda = parser.doubleValue();
|
||||
if (bigramLambda < 0) {
|
||||
throw new IllegalArgumentException("bigram_lambda must be positive");
|
||||
}
|
||||
} else if (matcher.match(fieldName, UNIGRAM_FIELD)) {
|
||||
unigramLambda = parser.doubleValue();
|
||||
if (unigramLambda < 0) {
|
||||
throw new IllegalArgumentException("unigram_lambda must be positive");
|
||||
}
|
||||
} else {
|
||||
throw new IllegalArgumentException(
|
||||
"suggester[phrase][smoothing][linear] doesn't support field [" + fieldName + "]");
|
||||
}
|
||||
} else {
|
||||
throw new ParsingException(parser.getTokenLocation(), "[" + NAME + "] unknown token [" + token + "] after [" + fieldName + "]");
|
||||
}
|
||||
}
|
||||
return new LinearInterpolation(trigramLambda, bigramLambda, unigramLambda);
|
||||
}
|
||||
|
||||
@Override
|
||||
public WordScorerFactory buildWordScorerFactory() {
|
||||
return (IndexReader reader, Terms terms, String field, double realWordLikelyhood, BytesRef separator) ->
|
||||
new LinearInterpoatingScorer(reader, terms, field, realWordLikelyhood, separator, trigramLambda, bigramLambda,
|
||||
unigramLambda);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -42,6 +42,10 @@ public class StupidBackoffScorer extends WordScorer {
|
|||
this.discount = discount;
|
||||
}
|
||||
|
||||
double discount() {
|
||||
return this.discount;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected double scoreBigram(Candidate word, Candidate w_1) throws IOException {
|
||||
SuggestUtils.join(separator, spare, w_1.term, word.term);
|
||||
|
|
|
@ -1,57 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.tasks;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.inject.Provider;
|
||||
|
||||
/**
|
||||
* Child task
|
||||
*/
|
||||
public class ChildTask extends Task {
|
||||
|
||||
private final String parentNode;
|
||||
|
||||
private final long parentId;
|
||||
|
||||
public ChildTask(long id, String type, String action, Provider<String> description, String parentNode, long parentId) {
|
||||
super(id, type, action, description);
|
||||
this.parentNode = parentNode;
|
||||
this.parentId = parentId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns parent node of the task or null if task doesn't have any parent tasks
|
||||
*/
|
||||
public String getParentNode() {
|
||||
return parentNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns id of the parent task or -1L if task doesn't have any parent tasks
|
||||
*/
|
||||
public long getParentId() {
|
||||
return parentId;
|
||||
}
|
||||
|
||||
public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) {
|
||||
return new TaskInfo(node, getId(), getType(), getAction(), detailed ? getDescription() : null, parentNode, parentId);
|
||||
}
|
||||
}
|
|
@ -29,6 +29,8 @@ import org.elasticsearch.common.inject.Provider;
|
|||
*/
|
||||
public class Task {
|
||||
|
||||
public static final long NO_PARENT_ID = 0;
|
||||
|
||||
private final long id;
|
||||
|
||||
private final String type;
|
||||
|
@ -37,15 +39,27 @@ public class Task {
|
|||
|
||||
private final Provider<String> description;
|
||||
|
||||
private final String parentNode;
|
||||
|
||||
private final long parentId;
|
||||
|
||||
|
||||
public Task(long id, String type, String action, Provider<String> description) {
|
||||
this(id, type, action, description, null, NO_PARENT_ID);
|
||||
}
|
||||
|
||||
public Task(long id, String type, String action, Provider<String> description, String parentNode, long parentId) {
|
||||
this.id = id;
|
||||
this.type = type;
|
||||
this.action = action;
|
||||
this.description = description;
|
||||
this.parentNode = parentNode;
|
||||
this.parentId = parentId;
|
||||
}
|
||||
|
||||
|
||||
public TaskInfo taskInfo(DiscoveryNode node, boolean detailed) {
|
||||
return new TaskInfo(node, id, type, action, detailed ? getDescription() : null);
|
||||
return new TaskInfo(node, getId(), getType(), getAction(), detailed ? getDescription() : null, parentNode, parentId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -76,4 +90,18 @@ public class Task {
|
|||
return description.get();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the parent node of the task or null if the task doesn't have any parent tasks
|
||||
*/
|
||||
public String getParentNode() {
|
||||
return parentNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns id of the parent task or NO_PARENT_ID if the task doesn't have any parent tasks
|
||||
*/
|
||||
public long getParentId() {
|
||||
return parentId;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,9 +25,11 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
/**
|
||||
|
@ -61,9 +63,9 @@ public class TaskManager extends AbstractComponent {
|
|||
/**
|
||||
* Unregister the task
|
||||
*/
|
||||
public void unregister(Task task) {
|
||||
public Task unregister(Task task) {
|
||||
logger.trace("unregister task for id: {}", task.getId());
|
||||
tasks.remove(task.getId());
|
||||
return tasks.remove(task.getId());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -72,5 +74,4 @@ public class TaskManager extends AbstractComponent {
|
|||
public Map<Long, Task> getTasks() {
|
||||
return Collections.unmodifiableMap(new HashMap<>(tasks));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -117,7 +117,7 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
setTracerLogExclude(TRACE_LOG_EXCLUDE_SETTING.get(settings));
|
||||
tracerLog = Loggers.getLogger(logger, ".tracer");
|
||||
adapter = createAdapter();
|
||||
taskManager = new TaskManager(settings);
|
||||
taskManager = createTaskManager();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -141,6 +141,10 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
return new Adapter();
|
||||
}
|
||||
|
||||
protected TaskManager createTaskManager() {
|
||||
return new TaskManager(settings);
|
||||
}
|
||||
|
||||
// These need to be optional as they don't exist in the context of a transport client
|
||||
@Inject(optional = true)
|
||||
public void setDynamicSettings(ClusterSettings clusterSettings) {
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.action.admin.cluster.node.tasks;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.regex.Regex;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.tasks.MockTaskManagerListener;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* MockTaskManagerListener that records all task registration/unregistration events
|
||||
*/
|
||||
public class RecordingTaskManagerListener implements MockTaskManagerListener {
|
||||
|
||||
private String[] actionMasks;
|
||||
private DiscoveryNode localNode;
|
||||
|
||||
private List<Tuple<Boolean, TaskInfo>> events = new ArrayList<>();
|
||||
|
||||
public RecordingTaskManagerListener(DiscoveryNode localNode, String... actionMasks) {
|
||||
this.actionMasks = actionMasks;
|
||||
this.localNode = localNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onTaskRegistered(Task task) {
|
||||
if (Regex.simpleMatch(actionMasks, task.getAction())) {
|
||||
events.add(new Tuple<>(true, task.taskInfo(localNode, true)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onTaskUnregistered(Task task) {
|
||||
if (Regex.simpleMatch(actionMasks, task.getAction())) {
|
||||
events.add(new Tuple<>(false, task.taskInfo(localNode, true)));
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized List<Tuple<Boolean, TaskInfo>> getEvents() {
|
||||
return Collections.unmodifiableList(new ArrayList<>(events));
|
||||
}
|
||||
|
||||
public synchronized List<TaskInfo> getRegistrationEvents() {
|
||||
List<TaskInfo> events = this.events.stream().filter(Tuple::v1).map(Tuple::v2).collect(Collectors.toList());
|
||||
return Collections.unmodifiableList(events);
|
||||
}
|
||||
|
||||
public synchronized List<TaskInfo> getUnregistrationEvents() {
|
||||
List<TaskInfo> events = this.events.stream().filter(event -> event.v1() == false).map(Tuple::v2).collect(Collectors.toList());
|
||||
return Collections.unmodifiableList(events);
|
||||
}
|
||||
|
||||
public synchronized void reset() {
|
||||
events.clear();
|
||||
}
|
||||
|
||||
}
|
|
@ -18,21 +18,277 @@
|
|||
*/
|
||||
package org.elasticsearch.action.admin.cluster.node.tasks;
|
||||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
|
||||
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
|
||||
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
|
||||
import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeAction;
|
||||
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction;
|
||||
import org.elasticsearch.action.percolate.PercolateAction;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
/**
|
||||
* Integration tests for task management API
|
||||
* <p>
|
||||
* We need at least 2 nodes so we have a master node a non-master node
|
||||
*/
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, minNumDataNodes = 2)
|
||||
public class TasksIT extends ESIntegTestCase {
|
||||
|
||||
private Map<Tuple<String, String>, RecordingTaskManagerListener> listeners = new HashMap<>();
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(MockTaskManager.USE_MOCK_TASK_MANAGER, true)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testTaskCounts() {
|
||||
// Run only on data nodes
|
||||
ListTasksResponse response = client().admin().cluster().prepareListTasks("data:true").setActions(ListTasksAction.NAME + "[n]").get();
|
||||
assertThat(response.getTasks().size(), greaterThanOrEqualTo(cluster().numDataNodes()));
|
||||
}
|
||||
|
||||
public void testMasterNodeOperationTasks() {
|
||||
registerTaskManageListeners(ClusterHealthAction.NAME);
|
||||
|
||||
// First run the health on the master node - should produce only one task on the master node
|
||||
internalCluster().masterClient().admin().cluster().prepareHealth().get();
|
||||
assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, Tuple::v1)); // counting only registration events
|
||||
assertEquals(1, numberOfEvents(ClusterHealthAction.NAME, event -> event.v1() == false)); // counting only unregistration events
|
||||
|
||||
resetTaskManageListeners(ClusterHealthAction.NAME);
|
||||
|
||||
// Now run the health on a non-master node - should produce one task on master and one task on another node
|
||||
internalCluster().nonMasterClient().admin().cluster().prepareHealth().get();
|
||||
assertEquals(2, numberOfEvents(ClusterHealthAction.NAME, Tuple::v1)); // counting only registration events
|
||||
assertEquals(2, numberOfEvents(ClusterHealthAction.NAME, event -> event.v1() == false)); // counting only unregistration events
|
||||
List<TaskInfo> tasks = findEvents(ClusterHealthAction.NAME, Tuple::v1);
|
||||
|
||||
// Verify that one of these tasks is a parent of another task
|
||||
if (tasks.get(0).getParentNode() == null) {
|
||||
assertParentTask(Collections.singletonList(tasks.get(1)), tasks.get(0));
|
||||
} else {
|
||||
assertParentTask(Collections.singletonList(tasks.get(0)), tasks.get(1));
|
||||
}
|
||||
}
|
||||
|
||||
public void testTransportReplicationAllShardsTasks() {
|
||||
registerTaskManageListeners(PercolateAction.NAME); // main task
|
||||
registerTaskManageListeners(PercolateAction.NAME + "[s]"); // shard level tasks
|
||||
createIndex("test");
|
||||
ensureGreen("test"); // Make sure all shards are allocated
|
||||
client().preparePercolate().setIndices("test").setDocumentType("foo").setSource("{}").get();
|
||||
|
||||
// the percolate operation should produce one main task
|
||||
NumShards numberOfShards = getNumShards("test");
|
||||
assertEquals(1, numberOfEvents(PercolateAction.NAME, Tuple::v1));
|
||||
// and then one operation per shard
|
||||
assertEquals(numberOfShards.totalNumShards, numberOfEvents(PercolateAction.NAME + "[s]", Tuple::v1));
|
||||
|
||||
// the shard level tasks should have the main task as a parent
|
||||
assertParentTask(findEvents(PercolateAction.NAME + "[s]", Tuple::v1), findEvents(PercolateAction.NAME, Tuple::v1).get(0));
|
||||
}
|
||||
|
||||
public void testTransportBroadcastByNodeTasks() {
|
||||
registerTaskManageListeners(UpgradeAction.NAME); // main task
|
||||
registerTaskManageListeners(UpgradeAction.NAME + "[n]"); // node level tasks
|
||||
createIndex("test");
|
||||
ensureGreen("test"); // Make sure all shards are allocated
|
||||
client().admin().indices().prepareUpgrade("test").get();
|
||||
|
||||
// the percolate operation should produce one main task
|
||||
assertEquals(1, numberOfEvents(UpgradeAction.NAME, Tuple::v1));
|
||||
// and then one operation per each node where shards are located
|
||||
assertEquals(internalCluster().nodesInclude("test").size(), numberOfEvents(UpgradeAction.NAME + "[n]", Tuple::v1));
|
||||
|
||||
// all node level tasks should have the main task as a parent
|
||||
assertParentTask(findEvents(UpgradeAction.NAME + "[n]", Tuple::v1), findEvents(UpgradeAction.NAME, Tuple::v1).get(0));
|
||||
}
|
||||
|
||||
public void testTransportReplicationSingleShardTasks() {
|
||||
registerTaskManageListeners(ValidateQueryAction.NAME); // main task
|
||||
registerTaskManageListeners(ValidateQueryAction.NAME + "[s]"); // shard level tasks
|
||||
createIndex("test");
|
||||
ensureGreen("test"); // Make sure all shards are allocated
|
||||
client().admin().indices().prepareValidateQuery("test").get();
|
||||
|
||||
// the validate operation should produce one main task
|
||||
assertEquals(1, numberOfEvents(ValidateQueryAction.NAME, Tuple::v1));
|
||||
// and then one operation
|
||||
assertEquals(1, numberOfEvents(ValidateQueryAction.NAME + "[s]", Tuple::v1));
|
||||
// the shard level operation should have the main task as its parent
|
||||
assertParentTask(findEvents(ValidateQueryAction.NAME + "[s]", Tuple::v1), findEvents(ValidateQueryAction.NAME, Tuple::v1).get(0));
|
||||
}
|
||||
|
||||
|
||||
public void testTransportBroadcastReplicationTasks() {
|
||||
registerTaskManageListeners(RefreshAction.NAME); // main task
|
||||
registerTaskManageListeners(RefreshAction.NAME + "[s]"); // shard level tasks
|
||||
registerTaskManageListeners(RefreshAction.NAME + "[s][*]"); // primary and replica shard tasks
|
||||
createIndex("test");
|
||||
ensureGreen("test"); // Make sure all shards are allocated
|
||||
client().admin().indices().prepareRefresh("test").get();
|
||||
|
||||
// the refresh operation should produce one main task
|
||||
NumShards numberOfShards = getNumShards("test");
|
||||
|
||||
logger.debug("number of shards, total: [{}], primaries: [{}] ", numberOfShards.totalNumShards, numberOfShards.numPrimaries);
|
||||
logger.debug("main events {}", numberOfEvents(RefreshAction.NAME, Tuple::v1));
|
||||
logger.debug("main event node {}", findEvents(RefreshAction.NAME, Tuple::v1).get(0).getNode().name());
|
||||
logger.debug("[s] events {}", numberOfEvents(RefreshAction.NAME + "[s]", Tuple::v1));
|
||||
logger.debug("[s][*] events {}", numberOfEvents(RefreshAction.NAME + "[s][*]", Tuple::v1));
|
||||
logger.debug("nodes with the index {}", internalCluster().nodesInclude("test"));
|
||||
|
||||
assertEquals(1, numberOfEvents(RefreshAction.NAME, Tuple::v1));
|
||||
// Because it's broadcast replication action we will have as many [s] level requests
|
||||
// as we have primary shards on the coordinating node plus we will have one task per primary outside of the
|
||||
// coordinating node due to replication.
|
||||
// If all primaries are on the coordinating node, the number of tasks should be equal to the number of primaries
|
||||
// If all primaries are not on the coordinating node, the number of tasks should be equal to the number of primaries times 2
|
||||
assertThat(numberOfEvents(RefreshAction.NAME + "[s]", Tuple::v1), greaterThanOrEqualTo(numberOfShards.numPrimaries));
|
||||
assertThat(numberOfEvents(RefreshAction.NAME + "[s]", Tuple::v1), lessThanOrEqualTo(numberOfShards.numPrimaries * 2));
|
||||
|
||||
// Verify that all [s] events have the proper parent
|
||||
// This is complicated because if the shard task runs on the same node it has main task as a parent
|
||||
// but if it runs on non-coordinating node it would have another intermediate [s] task on the coordinating node as a parent
|
||||
TaskInfo mainTask = findEvents(RefreshAction.NAME, Tuple::v1).get(0);
|
||||
List<TaskInfo> sTasks = findEvents(RefreshAction.NAME + "[s]", Tuple::v1);
|
||||
for (TaskInfo taskInfo : sTasks) {
|
||||
if (mainTask.getNode().equals(taskInfo.getNode())) {
|
||||
// This shard level task runs on the same node as a parent task - it should have the main task as a direct parent
|
||||
assertParentTask(Collections.singletonList(taskInfo), mainTask);
|
||||
} else {
|
||||
String description = taskInfo.getDescription();
|
||||
// This shard level task runs on another node - it should have a corresponding shard level task on the node where main task is running
|
||||
List<TaskInfo> sTasksOnRequestingNode = findEvents(RefreshAction.NAME + "[s]",
|
||||
event -> event.v1() && mainTask.getNode().equals(event.v2().getNode()) && description.equals(event.v2().getDescription()));
|
||||
// There should be only one parent task
|
||||
assertEquals(1, sTasksOnRequestingNode.size());
|
||||
assertParentTask(Collections.singletonList(taskInfo), sTasksOnRequestingNode.get(0));
|
||||
}
|
||||
}
|
||||
|
||||
// we will have as many [s][p] and [s][r] tasks as we have primary and replica shards
|
||||
assertEquals(numberOfShards.totalNumShards, numberOfEvents(RefreshAction.NAME + "[s][*]", Tuple::v1));
|
||||
|
||||
// we the [s][p] and [s][r] tasks should have a corresponding [s] task on the same node as a parent
|
||||
List<TaskInfo> spEvents = findEvents(RefreshAction.NAME + "[s][*]", Tuple::v1);
|
||||
for (TaskInfo taskInfo : spEvents) {
|
||||
List<TaskInfo> sTask;
|
||||
if (taskInfo.getAction().endsWith("[s][p]")) {
|
||||
// A [s][p] level task should have a corresponding [s] level task on the same node
|
||||
sTask = findEvents(RefreshAction.NAME + "[s]",
|
||||
event -> event.v1() && taskInfo.getNode().equals(event.v2().getNode()) && taskInfo.getDescription().equals(event.v2().getDescription()));
|
||||
} else {
|
||||
// A [s][r] level task should have a corresponding [s] level task on the a different node (where primary is located)
|
||||
sTask = findEvents(RefreshAction.NAME + "[s]",
|
||||
event -> event.v1() && taskInfo.getParentNode().equals(event.v2().getNode().getId()) && taskInfo.getDescription().equals(event.v2().getDescription()));
|
||||
}
|
||||
// There should be only one parent task
|
||||
assertEquals(1, sTask.size());
|
||||
assertParentTask(Collections.singletonList(taskInfo), sTask.get(0));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void tearDown() throws Exception {
|
||||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
|
||||
((MockTaskManager)internalCluster().getInstance(ClusterService.class, entry.getKey().v1()).getTaskManager()).removeListener(entry.getValue());
|
||||
}
|
||||
listeners.clear();
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers recording task event listeners with the given action mask on all nodes
|
||||
*/
|
||||
private void registerTaskManageListeners(String actionMasks) {
|
||||
for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
|
||||
DiscoveryNode node = clusterService.localNode();
|
||||
RecordingTaskManagerListener listener = new RecordingTaskManagerListener(node, Strings.splitStringToArray(actionMasks, ','));
|
||||
((MockTaskManager)clusterService.getTaskManager()).addListener(listener);
|
||||
RecordingTaskManagerListener oldListener = listeners.put(new Tuple<>(node.name(), actionMasks), listener);
|
||||
assertNull(oldListener);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Resets all recording task event listeners with the given action mask on all nodes
|
||||
*/
|
||||
private void resetTaskManageListeners(String actionMasks) {
|
||||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
|
||||
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
|
||||
entry.getValue().reset();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of events that satisfy the criteria across all nodes
|
||||
*
|
||||
* @param actionMasks action masks to match
|
||||
* @return number of events that satisfy the criteria
|
||||
*/
|
||||
private int numberOfEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
|
||||
return findEvents(actionMasks, criteria).size();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns all events that satisfy the criteria across all nodes
|
||||
*
|
||||
* @param actionMasks action masks to match
|
||||
* @return number of events that satisfy the criteria
|
||||
*/
|
||||
private List<TaskInfo> findEvents(String actionMasks, Function<Tuple<Boolean, TaskInfo>, Boolean> criteria) {
|
||||
List<TaskInfo> events = new ArrayList<>();
|
||||
for (Map.Entry<Tuple<String, String>, RecordingTaskManagerListener> entry : listeners.entrySet()) {
|
||||
if (actionMasks == null || entry.getKey().v2().equals(actionMasks)) {
|
||||
for (Tuple<Boolean, TaskInfo> taskEvent : entry.getValue().getEvents()) {
|
||||
if (criteria.apply(taskEvent)) {
|
||||
events.add(taskEvent.v2());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that all tasks in the tasks list have the same parentTask
|
||||
*/
|
||||
private void assertParentTask(List<TaskInfo> tasks, TaskInfo parentTask) {
|
||||
for (TaskInfo task : tasks) {
|
||||
assertNotNull(task.getParentNode());
|
||||
assertEquals(parentTask.getNode().getId(), task.getParentNode());
|
||||
assertEquals(parentTask.getId(), task.getParentId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,10 +48,11 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
|||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.ChildTask;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.cluster.TestClusterService;
|
||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.transport.local.LocalTransport;
|
||||
|
@ -95,12 +96,11 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
threadPool = null;
|
||||
}
|
||||
|
||||
@Before
|
||||
public final void setupTestNodes() throws Exception {
|
||||
public void setupTestNodes(Settings settings) {
|
||||
nodesCount = randomIntBetween(2, 10);
|
||||
testNodes = new TestNode[nodesCount];
|
||||
for (int i = 0; i < testNodes.length; i++) {
|
||||
testNodes[i] = new TestNode("node" + i, threadPool, Settings.EMPTY);
|
||||
testNodes[i] = new TestNode("node" + i, threadPool, settings);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -113,11 +113,20 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
|
||||
private static class TestNode implements Releasable {
|
||||
public TestNode(String name, ThreadPool threadPool, Settings settings) {
|
||||
clusterService = new TestClusterService(threadPool);
|
||||
transportService = new TransportService(Settings.EMPTY,
|
||||
new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
|
||||
threadPool);
|
||||
transportService = new TransportService(settings,
|
||||
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
|
||||
threadPool){
|
||||
@Override
|
||||
protected TaskManager createTaskManager() {
|
||||
if (settings.getAsBoolean(MockTaskManager.USE_MOCK_TASK_MANAGER, false)) {
|
||||
return new MockTaskManager(settings);
|
||||
} else {
|
||||
return super.createTaskManager();
|
||||
}
|
||||
}
|
||||
};
|
||||
transportService.start();
|
||||
clusterService = new TestClusterService(threadPool, transportService);
|
||||
discoveryNode = new DiscoveryNode(name, transportService.boundAddress().publishAddress(), Version.CURRENT);
|
||||
transportListTasksAction = new TransportListTasksAction(settings, clusterName, threadPool, clusterService, transportService,
|
||||
new ActionFilters(Collections.emptySet()), new IndexNameExpressionResolver(settings));
|
||||
|
@ -150,6 +159,15 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public static RecordingTaskManagerListener[] setupListeners(TestNode[] nodes, String... actionMasks) {
|
||||
RecordingTaskManagerListener[] listeners = new RecordingTaskManagerListener[nodes.length];
|
||||
for (int i = 0; i < nodes.length; i++) {
|
||||
listeners[i] = new RecordingTaskManagerListener(nodes[i].discoveryNode, actionMasks);
|
||||
((MockTaskManager)(nodes[i].clusterService.getTaskManager())).addListener(listeners[i]);
|
||||
}
|
||||
return listeners;
|
||||
}
|
||||
|
||||
public static class NodeRequest extends BaseNodeRequest {
|
||||
protected String requestName;
|
||||
private boolean enableTaskManager;
|
||||
|
@ -180,7 +198,7 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
|
||||
@Override
|
||||
public String getDescription() {
|
||||
return "NodeRequest[" + requestName + ", " + enableTaskManager + "]";
|
||||
return "NodeRequest[" + requestName + ", " + enableTaskManager + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -464,6 +482,7 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testRunningTasksCount() throws Exception {
|
||||
setupTestNodes(Settings.EMPTY);
|
||||
connectNodes(testNodes);
|
||||
CountDownLatch checkLatch = new CountDownLatch(1);
|
||||
CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
|
@ -553,6 +572,7 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFindChildTasks() throws Exception {
|
||||
setupTestNodes(Settings.EMPTY);
|
||||
connectNodes(testNodes);
|
||||
CountDownLatch checkLatch = new CountDownLatch(1);
|
||||
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
|
||||
|
@ -586,10 +606,11 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testTaskManagementOptOut() throws Exception {
|
||||
setupTestNodes(Settings.EMPTY);
|
||||
connectNodes(testNodes);
|
||||
CountDownLatch checkLatch = new CountDownLatch(1);
|
||||
// Starting actions that disable task manager
|
||||
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false));
|
||||
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false));
|
||||
|
||||
TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];
|
||||
|
||||
|
@ -606,6 +627,7 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testTasksDescriptions() throws Exception {
|
||||
setupTestNodes(Settings.EMPTY);
|
||||
connectNodes(testNodes);
|
||||
CountDownLatch checkLatch = new CountDownLatch(1);
|
||||
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
|
||||
|
@ -637,8 +659,11 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testFailedTasksCount() throws ExecutionException, InterruptedException, IOException {
|
||||
Settings settings = Settings.builder().put(MockTaskManager.USE_MOCK_TASK_MANAGER, true).build();
|
||||
setupTestNodes(settings);
|
||||
connectNodes(testNodes);
|
||||
TestNodesAction[] actions = new TestNodesAction[nodesCount];
|
||||
RecordingTaskManagerListener[] listeners = setupListeners(testNodes, "testAction*");
|
||||
for (int i = 0; i < testNodes.length; i++) {
|
||||
final int node = i;
|
||||
actions[i] = new TestNodesAction(Settings.EMPTY, "testAction", clusterName, threadPool, testNodes[i].clusterService, testNodes[i].transportService) {
|
||||
|
@ -656,9 +681,21 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
NodesRequest request = new NodesRequest("Test Request");
|
||||
NodesResponse responses = actions[0].execute(request).get();
|
||||
assertEquals(nodesCount, responses.failureCount());
|
||||
|
||||
// Make sure that actions are still registered in the task manager on all nodes
|
||||
// Twice on the coordinating node and once on all other nodes.
|
||||
assertEquals(4, listeners[0].getEvents().size());
|
||||
assertEquals(2, listeners[0].getRegistrationEvents().size());
|
||||
assertEquals(2, listeners[0].getUnregistrationEvents().size());
|
||||
for (int i = 1; i < listeners.length; i++) {
|
||||
assertEquals(2, listeners[i].getEvents().size());
|
||||
assertEquals(1, listeners[i].getRegistrationEvents().size());
|
||||
assertEquals(1, listeners[i].getUnregistrationEvents().size());
|
||||
}
|
||||
}
|
||||
|
||||
public void testTaskLevelActionFailures() throws ExecutionException, InterruptedException, IOException {
|
||||
setupTestNodes(Settings.EMPTY);
|
||||
connectNodes(testNodes);
|
||||
CountDownLatch checkLatch = new CountDownLatch(1);
|
||||
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch);
|
||||
|
@ -672,7 +709,7 @@ public class TransportTasksActionTests extends ESTestCase {
|
|||
@Override
|
||||
protected TestTaskResponse taskOperation(TestTasksRequest request, Task task) {
|
||||
logger.info("Task action on node " + node);
|
||||
if (failTaskOnNode == node && ((ChildTask) task).getParentNode() != null) {
|
||||
if (failTaskOnNode == node && task.getParentNode() != null) {
|
||||
logger.info("Failing on node " + node);
|
||||
throw new RuntimeException("Task level failure");
|
||||
}
|
||||
|
|
|
@ -242,7 +242,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
|||
.addGlobalBlock(new ClusterBlock(1, "test-block", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
|
||||
try {
|
||||
action.new AsyncAction(request, listener).start();
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
fail("expected ClusterBlockException");
|
||||
} catch (ClusterBlockException expected) {
|
||||
assertEquals("blocked by: [SERVICE_UNAVAILABLE/1/test-block];", expected.getMessage());
|
||||
|
@ -257,7 +257,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
|||
.addIndexBlock(TEST_INDEX, new ClusterBlock(1, "test-block", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
|
||||
try {
|
||||
action.new AsyncAction(request, listener).start();
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
fail("expected ClusterBlockException");
|
||||
} catch (ClusterBlockException expected) {
|
||||
assertEquals("blocked by: [SERVICE_UNAVAILABLE/1/test-block];", expected.getMessage());
|
||||
|
@ -268,7 +268,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
|||
Request request = new Request(new String[]{TEST_INDEX});
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
|
||||
action.new AsyncAction(request, listener).start();
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
|
||||
|
||||
ShardsIterator shardIt = clusterService.state().routingTable().allShards(new String[]{TEST_INDEX});
|
||||
|
@ -302,7 +302,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
|||
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder));
|
||||
|
||||
action.new AsyncAction(request, listener).start();
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
|
||||
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
|
||||
|
||||
|
@ -389,7 +389,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
|||
clusterService.setState(ClusterState.builder(clusterService.state()).nodes(builder));
|
||||
}
|
||||
|
||||
action.new AsyncAction(request, listener).start();
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
|
||||
|
||||
ShardsIterator shardIt = clusterService.state().getRoutingTable().allShards(new String[]{TEST_INDEX});
|
||||
|
|
|
@ -41,6 +41,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.cluster.TestClusterService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -207,7 +208,7 @@ public class BroadcastReplicationTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected void shardExecute(DummyBroadcastRequest request, ShardId shardId, ActionListener<ReplicationResponse> shardActionListener) {
|
||||
protected void shardExecute(Task task, DummyBroadcastRequest request, ShardId shardId, ActionListener<ReplicationResponse> shardActionListener) {
|
||||
capturedShardRequests.add(new Tuple<>(shardId, shardActionListener));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -140,7 +140,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
ClusterBlocks.Builder block = ClusterBlocks.builder()
|
||||
.addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("primary phase should fail operation", listener, ClusterBlockException.class);
|
||||
|
||||
|
@ -148,13 +148,13 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
.addGlobalBlock(new ClusterBlock(1, "retryable", true, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
clusterService.setState(ClusterState.builder(clusterService.state()).blocks(block));
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(new Request().timeout("5ms"), listener);
|
||||
reroutePhase = action.new ReroutePhase(null, new Request().timeout("5ms"), listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
|
||||
|
||||
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(new Request(), listener);
|
||||
reroutePhase = action.new ReroutePhase(null, new Request(), listener);
|
||||
reroutePhase.run();
|
||||
assertFalse("primary phase should wait on retryable block", listener.isDone());
|
||||
|
||||
|
@ -180,13 +180,13 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
|
||||
Request request = new Request(shardId).timeout("1ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
|
||||
|
||||
request = new Request(shardId);
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(request, listener);
|
||||
reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
reroutePhase.run();
|
||||
assertFalse("unassigned primary didn't cause a retry", listener.isDone());
|
||||
|
||||
|
@ -211,12 +211,12 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
|
||||
Request request = new Request(new ShardId("unknown_index", "_na_", 0)).timeout("1ms");
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
|
||||
request = new Request(new ShardId(index, "_na_", 10)).timeout("1ms");
|
||||
listener = new PlainActionFuture<>();
|
||||
reroutePhase = action.new ReroutePhase(request, listener);
|
||||
reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
reroutePhase.run();
|
||||
assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class);
|
||||
}
|
||||
|
@ -234,7 +234,7 @@ public class TransportReplicationActionTests extends ESTestCase {
|
|||
Request request = new Request(shardId);
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(request, listener);
|
||||
TransportReplicationAction.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
|
||||
reroutePhase.run();
|
||||
assertThat(request.shardId(), equalTo(shardId));
|
||||
logger.info("--> primary is assigned to [{}], checking request forwarded", primaryNodeId);
|
||||
|
|
|
@ -870,53 +870,6 @@ public class GetTermVectorsIT extends AbstractTermVectorsTestCase {
|
|||
checkBrownFoxTermVector(resp.getFields(), "field1", false);
|
||||
}
|
||||
|
||||
public void testArtificialNonExistingField() throws Exception {
|
||||
// setup indices
|
||||
Settings.Builder settings = settingsBuilder()
|
||||
.put(indexSettings())
|
||||
.put("index.analysis.analyzer", "standard");
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(settings)
|
||||
.addMapping("type1", "field1", "type=string"));
|
||||
ensureGreen();
|
||||
|
||||
// index just one doc
|
||||
List<IndexRequestBuilder> indexBuilders = new ArrayList<>();
|
||||
indexBuilders.add(client().prepareIndex()
|
||||
.setIndex("test")
|
||||
.setType("type1")
|
||||
.setId("1")
|
||||
.setRouting("1")
|
||||
.setSource("field1", "some text"));
|
||||
indexRandom(true, indexBuilders);
|
||||
|
||||
// request tvs from artificial document
|
||||
XContentBuilder doc = jsonBuilder()
|
||||
.startObject()
|
||||
.field("field1", "the quick brown fox jumps over the lazy dog")
|
||||
.field("non_existing", "the quick brown fox jumps over the lazy dog")
|
||||
.endObject();
|
||||
|
||||
for (int i = 0; i < 2; i++) {
|
||||
TermVectorsResponse resp = client().prepareTermVectors()
|
||||
.setIndex("test")
|
||||
.setType("type1")
|
||||
.setDoc(doc)
|
||||
.setRouting("" + i)
|
||||
.setOffsets(true)
|
||||
.setPositions(true)
|
||||
.setFieldStatistics(true)
|
||||
.setTermStatistics(true)
|
||||
.get();
|
||||
assertThat(resp.isExists(), equalTo(true));
|
||||
checkBrownFoxTermVector(resp.getFields(), "field1", false);
|
||||
// we should have created a mapping for this field
|
||||
assertMappingOnMaster("test", "type1", "non_existing");
|
||||
// and return the generated term vectors
|
||||
checkBrownFoxTermVector(resp.getFields(), "non_existing", false);
|
||||
}
|
||||
}
|
||||
|
||||
public void testPerFieldAnalyzer() throws IOException {
|
||||
int numFields = 25;
|
||||
|
||||
|
|
|
@ -89,7 +89,6 @@ public abstract class AbstractShapeBuilderTestCase<SB extends ShapeBuilder> exte
|
|||
}
|
||||
XContentBuilder builder = testShape.toXContent(contentBuilder, ToXContent.EMPTY_PARAMS);
|
||||
XContentParser shapeParser = XContentHelper.createParser(builder.bytes());
|
||||
XContentHelper.createParser(builder.bytes());
|
||||
shapeParser.nextToken();
|
||||
ShapeBuilder parsedShape = ShapeBuilder.parse(shapeParser);
|
||||
assertNotSame(testShape, parsedShape);
|
||||
|
|
|
@ -29,7 +29,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
|
||||
public class OsProbeTests extends ESTestCase {
|
||||
|
@ -56,22 +55,29 @@ public class OsProbeTests extends ESTestCase {
|
|||
}
|
||||
if (Constants.WINDOWS) {
|
||||
// load average is unavailable on Windows
|
||||
if (loadAverage != null) {
|
||||
assertThat(loadAverage[0], equalTo((double) -1));
|
||||
assertThat(loadAverage[1], equalTo((double) -1));
|
||||
assertThat(loadAverage[2], equalTo((double) -1));
|
||||
}
|
||||
assertNull(loadAverage);
|
||||
} else if (Constants.LINUX) {
|
||||
// we should be able to get the load average
|
||||
assertNotNull(loadAverage);
|
||||
assertThat(loadAverage[0], greaterThanOrEqualTo((double) 0));
|
||||
assertThat(loadAverage[1], greaterThanOrEqualTo((double) 0));
|
||||
assertThat(loadAverage[2], greaterThanOrEqualTo((double) 0));
|
||||
} else {
|
||||
} else if (Constants.FREE_BSD) {
|
||||
// five- and fifteen-minute load averages not available if linprocfs is not mounted at /compat/linux/proc
|
||||
assertNotNull(loadAverage);
|
||||
assertThat(loadAverage[0], greaterThanOrEqualTo((double) 0));
|
||||
assertThat(loadAverage[1], anyOf(equalTo((double) -1), greaterThanOrEqualTo((double) 0)));
|
||||
assertThat(loadAverage[2], anyOf(equalTo((double) -1), greaterThanOrEqualTo((double) 0)));
|
||||
} else if (Constants.MAC_OS_X) {
|
||||
// one minute load average is available, but 10-minute and 15-minute load averages are not
|
||||
// load average can be negative if not available or not computed yet, otherwise it should be >= 0
|
||||
assertNotNull(loadAverage);
|
||||
assertThat(loadAverage[0], greaterThanOrEqualTo((double) 0));
|
||||
assertThat(loadAverage[1], equalTo((double) -1));
|
||||
assertThat(loadAverage[2], equalTo((double) -1));
|
||||
} else {
|
||||
// unknown system, but the best case is that we have the one-minute load average
|
||||
if (loadAverage != null) {
|
||||
assertThat(loadAverage[0], anyOf(lessThan((double) 0), greaterThanOrEqualTo((double) 0)));
|
||||
assertThat(loadAverage[0], anyOf(equalTo((double) -1), greaterThanOrEqualTo((double) 0)));
|
||||
assertThat(loadAverage[1], equalTo((double) -1));
|
||||
assertThat(loadAverage[2], equalTo((double) -1));
|
||||
}
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.suggest.phrase;
|
||||
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.Laplace;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.SmoothingModel;
|
||||
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class LaplaceModelTests extends SmoothingModelTestCase {
|
||||
|
||||
@Override
|
||||
protected SmoothingModel createTestModel() {
|
||||
return new Laplace(randomDoubleBetween(0.0, 10.0, false));
|
||||
}
|
||||
|
||||
/**
|
||||
* mutate the given model so the returned smoothing model is different
|
||||
*/
|
||||
@Override
|
||||
protected Laplace createMutation(SmoothingModel input) {
|
||||
Laplace original = (Laplace) input;
|
||||
return new Laplace(original.getAlpha() + 0.1);
|
||||
}
|
||||
|
||||
@Override
|
||||
void assertWordScorer(WordScorer wordScorer, SmoothingModel input) {
|
||||
Laplace model = (Laplace) input;
|
||||
assertThat(wordScorer, instanceOf(LaplaceScorer.class));
|
||||
assertEquals(model.getAlpha(), ((LaplaceScorer) wordScorer).alpha(), Double.MIN_VALUE);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.suggest.phrase;
|
||||
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.LinearInterpolation;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.SmoothingModel;
|
||||
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class LinearInterpolationModelTests extends SmoothingModelTestCase {
|
||||
|
||||
@Override
|
||||
protected SmoothingModel createTestModel() {
|
||||
double trigramLambda = randomDoubleBetween(0.0, 10.0, false);
|
||||
double bigramLambda = randomDoubleBetween(0.0, 10.0, false);
|
||||
double unigramLambda = randomDoubleBetween(0.0, 10.0, false);
|
||||
// normalize so parameters sum to 1
|
||||
double sum = trigramLambda + bigramLambda + unigramLambda;
|
||||
return new LinearInterpolation(trigramLambda / sum, bigramLambda / sum, unigramLambda / sum);
|
||||
}
|
||||
|
||||
/**
|
||||
* mutate the given model so the returned smoothing model is different
|
||||
*/
|
||||
@Override
|
||||
protected LinearInterpolation createMutation(SmoothingModel input) {
|
||||
LinearInterpolation original = (LinearInterpolation) input;
|
||||
// swap two values permute original lambda values
|
||||
switch (randomIntBetween(0, 2)) {
|
||||
case 0:
|
||||
// swap first two
|
||||
return new LinearInterpolation(original.getBigramLambda(), original.getTrigramLambda(), original.getUnigramLambda());
|
||||
case 1:
|
||||
// swap last two
|
||||
return new LinearInterpolation(original.getTrigramLambda(), original.getUnigramLambda(), original.getBigramLambda());
|
||||
case 2:
|
||||
default:
|
||||
// swap first and last
|
||||
return new LinearInterpolation(original.getUnigramLambda(), original.getBigramLambda(), original.getTrigramLambda());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
void assertWordScorer(WordScorer wordScorer, SmoothingModel in) {
|
||||
LinearInterpolation testModel = (LinearInterpolation) in;
|
||||
LinearInterpoatingScorer testScorer = (LinearInterpoatingScorer) wordScorer;
|
||||
assertThat(wordScorer, instanceOf(LinearInterpoatingScorer.class));
|
||||
assertEquals(testModel.getTrigramLambda(), (testScorer).trigramLambda(), 1e-15);
|
||||
assertEquals(testModel.getBigramLambda(), (testScorer).bigramLambda(), 1e-15);
|
||||
assertEquals(testModel.getUnigramLambda(), (testScorer).unigramLambda(), 1e-15);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,196 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.suggest.phrase;
|
||||
|
||||
import org.apache.lucene.analysis.Analyzer;
|
||||
import org.apache.lucene.analysis.core.WhitespaceAnalyzer;
|
||||
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper;
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.document.TextField;
|
||||
import org.apache.lucene.index.DirectoryReader;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
import org.apache.lucene.index.IndexWriterConfig;
|
||||
import org.apache.lucene.index.MultiFields;
|
||||
import org.apache.lucene.store.RAMDirectory;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.lucene.BytesRefs;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.index.query.QueryParseContext;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.Laplace;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.LinearInterpolation;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.SmoothingModel;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.StupidBackoff;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public abstract class SmoothingModelTestCase extends ESTestCase {
|
||||
|
||||
private static NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
/**
|
||||
* setup for the whole base test class
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void init() {
|
||||
if (namedWriteableRegistry == null) {
|
||||
namedWriteableRegistry = new NamedWriteableRegistry();
|
||||
namedWriteableRegistry.registerPrototype(SmoothingModel.class, Laplace.PROTOTYPE);
|
||||
namedWriteableRegistry.registerPrototype(SmoothingModel.class, LinearInterpolation.PROTOTYPE);
|
||||
namedWriteableRegistry.registerPrototype(SmoothingModel.class, StupidBackoff.PROTOTYPE);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
namedWriteableRegistry = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* create random model that is put under test
|
||||
*/
|
||||
protected abstract SmoothingModel createTestModel();
|
||||
|
||||
/**
|
||||
* mutate the given model so the returned smoothing model is different
|
||||
*/
|
||||
protected abstract SmoothingModel createMutation(SmoothingModel original) throws IOException;
|
||||
|
||||
/**
|
||||
* Test that creates new smoothing model from a random test smoothing model and checks both for equality
|
||||
*/
|
||||
public void testFromXContent() throws IOException {
|
||||
QueryParseContext context = new QueryParseContext(new IndicesQueriesRegistry(Settings.settingsBuilder().build(), Collections.emptyMap()));
|
||||
context.parseFieldMatcher(new ParseFieldMatcher(Settings.EMPTY));
|
||||
|
||||
SmoothingModel testModel = createTestModel();
|
||||
XContentBuilder contentBuilder = XContentFactory.contentBuilder(randomFrom(XContentType.values()));
|
||||
if (randomBoolean()) {
|
||||
contentBuilder.prettyPrint();
|
||||
}
|
||||
contentBuilder.startObject();
|
||||
testModel.innerToXContent(contentBuilder, ToXContent.EMPTY_PARAMS);
|
||||
contentBuilder.endObject();
|
||||
XContentParser parser = XContentHelper.createParser(contentBuilder.bytes());
|
||||
context.reset(parser);
|
||||
parser.nextToken(); // go to start token, real parsing would do that in the outer element parser
|
||||
SmoothingModel prototype = (SmoothingModel) namedWriteableRegistry.getPrototype(SmoothingModel.class,
|
||||
testModel.getWriteableName());
|
||||
SmoothingModel parsedModel = prototype.fromXContent(context);
|
||||
assertNotSame(testModel, parsedModel);
|
||||
assertEquals(testModel, parsedModel);
|
||||
assertEquals(testModel.hashCode(), parsedModel.hashCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test the WordScorer emitted by the smoothing model
|
||||
*/
|
||||
public void testBuildWordScorer() throws IOException {
|
||||
SmoothingModel testModel = createTestModel();
|
||||
|
||||
Map<String, Analyzer> mapping = new HashMap<>();
|
||||
mapping.put("field", new WhitespaceAnalyzer());
|
||||
PerFieldAnalyzerWrapper wrapper = new PerFieldAnalyzerWrapper(new WhitespaceAnalyzer(), mapping);
|
||||
IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(wrapper));
|
||||
Document doc = new Document();
|
||||
doc.add(new Field("field", "someText", TextField.TYPE_NOT_STORED));
|
||||
writer.addDocument(doc);
|
||||
DirectoryReader ir = DirectoryReader.open(writer, false);
|
||||
|
||||
WordScorer wordScorer = testModel.buildWordScorerFactory().newScorer(ir, MultiFields.getTerms(ir , "field"), "field", 0.9d, BytesRefs.toBytesRef(" "));
|
||||
assertWordScorer(wordScorer, testModel);
|
||||
}
|
||||
|
||||
/**
|
||||
* implementation dependant assertions on the wordScorer produced by the smoothing model under test
|
||||
*/
|
||||
abstract void assertWordScorer(WordScorer wordScorer, SmoothingModel testModel);
|
||||
|
||||
/**
|
||||
* Test serialization and deserialization of the tested model.
|
||||
*/
|
||||
public void testSerialization() throws IOException {
|
||||
SmoothingModel testModel = createTestModel();
|
||||
SmoothingModel deserializedModel = copyModel(testModel);
|
||||
assertEquals(testModel, deserializedModel);
|
||||
assertEquals(testModel.hashCode(), deserializedModel.hashCode());
|
||||
assertNotSame(testModel, deserializedModel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test equality and hashCode properties
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testEqualsAndHashcode() throws IOException {
|
||||
SmoothingModel firstModel = createTestModel();
|
||||
assertFalse("smoothing model is equal to null", firstModel.equals(null));
|
||||
assertFalse("smoothing model is equal to incompatible type", firstModel.equals(""));
|
||||
assertTrue("smoothing model is not equal to self", firstModel.equals(firstModel));
|
||||
assertThat("same smoothing model's hashcode returns different values if called multiple times", firstModel.hashCode(),
|
||||
equalTo(firstModel.hashCode()));
|
||||
assertThat("different smoothing models should not be equal", createMutation(firstModel), not(equalTo(firstModel)));
|
||||
|
||||
SmoothingModel secondModel = copyModel(firstModel);
|
||||
assertTrue("smoothing model is not equal to self", secondModel.equals(secondModel));
|
||||
assertTrue("smoothing model is not equal to its copy", firstModel.equals(secondModel));
|
||||
assertTrue("equals is not symmetric", secondModel.equals(firstModel));
|
||||
assertThat("smoothing model copy's hashcode is different from original hashcode", secondModel.hashCode(), equalTo(firstModel.hashCode()));
|
||||
|
||||
SmoothingModel thirdModel = copyModel(secondModel);
|
||||
assertTrue("smoothing model is not equal to self", thirdModel.equals(thirdModel));
|
||||
assertTrue("smoothing model is not equal to its copy", secondModel.equals(thirdModel));
|
||||
assertThat("smoothing model copy's hashcode is different from original hashcode", secondModel.hashCode(), equalTo(thirdModel.hashCode()));
|
||||
assertTrue("equals is not transitive", firstModel.equals(thirdModel));
|
||||
assertThat("smoothing model copy's hashcode is different from original hashcode", firstModel.hashCode(), equalTo(thirdModel.hashCode()));
|
||||
assertTrue("equals is not symmetric", thirdModel.equals(secondModel));
|
||||
assertTrue("equals is not symmetric", thirdModel.equals(firstModel));
|
||||
}
|
||||
|
||||
static SmoothingModel copyModel(SmoothingModel original) throws IOException {
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
original.writeTo(output);
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
|
||||
SmoothingModel prototype = (SmoothingModel) namedWriteableRegistry.getPrototype(SmoothingModel.class, original.getWriteableName());
|
||||
return prototype.readFrom(in);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.suggest.phrase;
|
||||
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.SmoothingModel;
|
||||
import org.elasticsearch.search.suggest.phrase.PhraseSuggestionBuilder.StupidBackoff;
|
||||
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
||||
public class StupidBackoffModelTests extends SmoothingModelTestCase {
|
||||
|
||||
@Override
|
||||
protected SmoothingModel createTestModel() {
|
||||
return new StupidBackoff(randomDoubleBetween(0.0, 10.0, false));
|
||||
}
|
||||
|
||||
/**
|
||||
* mutate the given model so the returned smoothing model is different
|
||||
*/
|
||||
@Override
|
||||
protected StupidBackoff createMutation(SmoothingModel input) {
|
||||
StupidBackoff original = (StupidBackoff) input;
|
||||
return new StupidBackoff(original.getDiscount() + 0.1);
|
||||
}
|
||||
|
||||
@Override
|
||||
void assertWordScorer(WordScorer wordScorer, SmoothingModel input) {
|
||||
assertThat(wordScorer, instanceOf(StupidBackoffScorer.class));
|
||||
StupidBackoff testModel = (StupidBackoff) input;
|
||||
assertEquals(testModel.getDiscount(), ((StupidBackoffScorer) wordScorer).discount(), Double.MIN_VALUE);
|
||||
}
|
||||
}
|
|
@ -225,6 +225,7 @@ public class TribeIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/16299")
|
||||
public void testOnConflictDrop() throws Exception {
|
||||
logger.info("create 2 indices, test1 on t1, and test2 on t2");
|
||||
assertAcked(cluster().client().admin().indices().prepareCreate("conflict"));
|
||||
|
|
|
@ -18,6 +18,7 @@ your application to Elasticsearch 3.0.
|
|||
* <<breaking_30_percolator>>
|
||||
* <<breaking_30_packaging>>
|
||||
* <<breaking_30_scripting>>
|
||||
* <<breaking_30_term_vectors>>
|
||||
|
||||
[[breaking_30_search_changes]]
|
||||
=== Warmers
|
||||
|
@ -707,3 +708,8 @@ Previously script mode settings (e.g., "script.inline: true",
|
|||
values `off`, `false`, `0`, and `no` for disabling a scripting mode.
|
||||
The variants `on`, `1`, and `yes ` for enabling and `off`, `0`,
|
||||
and `no` for disabling are no longer supported.
|
||||
|
||||
[[breaking_30_term_vectors]]
|
||||
=== Term vectors
|
||||
|
||||
The term vectors APIs no longer persist unmapped fields in the mappings.
|
||||
|
|
|
@ -92,6 +92,7 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.groovy.GroovyPlugin;
|
||||
import org.elasticsearch.search.action.SearchServiceTransportAction;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
@ -821,7 +822,7 @@ public class IndicesRequestTests extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(T request, TransportChannel channel) throws Exception {
|
||||
public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
|
||||
synchronized (InterceptingTransportService.this) {
|
||||
if (actions.contains(action)) {
|
||||
List<TransportRequest> requestList = requests.get(action);
|
||||
|
@ -834,7 +835,12 @@ public class IndicesRequestTests extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
requestHandler.messageReceived(request, channel);
|
||||
requestHandler.messageReceived(request, channel, task);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(T request, TransportChannel channel) throws Exception {
|
||||
messageReceived(request, channel, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ public class SizeFieldMapper extends MetadataFieldMapper {
|
|||
if (!enabledState.enabled) {
|
||||
return;
|
||||
}
|
||||
if (context.flyweight()) {
|
||||
if (context.source() == null) {
|
||||
return;
|
||||
}
|
||||
fields.add(new IntegerFieldMapper.CustomIntegerNumericField(context.source().length(), fieldType()));
|
||||
|
|
|
@ -49,6 +49,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
|||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
|
@ -77,6 +78,11 @@ public class TestClusterService implements ClusterService {
|
|||
taskManager = new TaskManager(Settings.EMPTY);
|
||||
}
|
||||
|
||||
public TestClusterService(ThreadPool threadPool, TransportService transportService) {
|
||||
this(ClusterState.builder(new ClusterName("test")).build(), threadPool);
|
||||
taskManager = transportService.getTaskManager();
|
||||
}
|
||||
|
||||
public TestClusterService(ClusterState state) {
|
||||
this(state, null);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.tasks;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.tasks.Task;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.transport.TransportRequest;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
/**
|
||||
* A mock task manager that allows adding listeners for events
|
||||
*/
|
||||
public class MockTaskManager extends TaskManager {
|
||||
|
||||
public static final String USE_MOCK_TASK_MANAGER = "tests.mock.taskmanager.enabled";
|
||||
|
||||
private final Collection<MockTaskManagerListener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
public MockTaskManager(Settings settings) {
|
||||
super(settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Task register(String type, String action, TransportRequest request) {
|
||||
Task task = super.register(type, action, request);
|
||||
if (task != null) {
|
||||
for (MockTaskManagerListener listener : listeners) {
|
||||
try {
|
||||
listener.onTaskRegistered(task);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to notify task manager listener about unregistering the task with id {}", t, task.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
return task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Task unregister(Task task) {
|
||||
Task removedTask = super.unregister(task);
|
||||
if (removedTask != null) {
|
||||
for (MockTaskManagerListener listener : listeners) {
|
||||
try {
|
||||
listener.onTaskUnregistered(task);
|
||||
} catch (Throwable t) {
|
||||
logger.warn("failed to notify task manager listener about unregistering the task with id {}", t, task.getId());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
logger.warn("trying to remove the same with id {} twice", task.getId());
|
||||
}
|
||||
return removedTask;
|
||||
}
|
||||
|
||||
public void addListener(MockTaskManagerListener listener) {
|
||||
listeners.add(listener);
|
||||
}
|
||||
|
||||
public void removeListener(MockTaskManagerListener listener) {
|
||||
listeners.remove(listener);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.test.tasks;
|
||||
|
||||
import org.elasticsearch.tasks.Task;
|
||||
|
||||
/**
|
||||
* Listener for task registration/unregistration
|
||||
*/
|
||||
public interface MockTaskManagerListener {
|
||||
void onTaskRegistered(Task task);
|
||||
|
||||
void onTaskUnregistered(Task task);
|
||||
}
|
|
@ -34,6 +34,8 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.tasks.TaskManager;
|
||||
import org.elasticsearch.test.tasks.MockTaskManager;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.ConnectTransportException;
|
||||
import org.elasticsearch.transport.RequestHandlerRegistry;
|
||||
|
@ -100,6 +102,15 @@ public class MockTransportService extends TransportService {
|
|||
return transportAddresses.toArray(new TransportAddress[transportAddresses.size()]);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected TaskManager createTaskManager() {
|
||||
if (settings.getAsBoolean(MockTaskManager.USE_MOCK_TASK_MANAGER, false)) {
|
||||
return new MockTaskManager(settings);
|
||||
} else {
|
||||
return super.createTaskManager();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Clears all the registered rules.
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue