mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 12:56:53 +00:00
Cleanup suppressed overlength line for action.support package (#34889)
Clean up lines over 140 characters in the `org.elasticsearch.action.support.*` packages Relates to #34884
This commit is contained in:
parent
baa144e844
commit
329a94be0c
@ -80,24 +80,6 @@
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]cluster[/\\]tasks[/\\]TransportPendingClusterTasksAction.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]MultiSearchRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchPhaseController.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]DelegatingActionListener.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]IndicesOptions.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]BroadcastOperationRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]TransportBroadcastAction.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeAction.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]AcknowledgedRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]MasterNodeOperationRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]MasterNodeReadOperationRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]info[/\\]ClusterInfoRequest.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]info[/\\]ClusterInfoRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]info[/\\]TransportClusterInfoAction.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]nodes[/\\]NodesOperationRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]ReplicationRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]TransportBroadcastReplicationAction.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]InstanceShardOperationRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]TransportInstanceSingleOperationAction.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]shard[/\\]SingleShardOperationRequestBuilder.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]shard[/\\]TransportSingleShardAction.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]MultiTermVectorsRequest.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]TermVectorsRequest.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]TermVectorsResponse.java" checks="LineLength" />
|
||||
@ -266,11 +248,6 @@
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]cluster[/\\]state[/\\]ClusterStateRequestTests.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]admin[/\\]cluster[/\\]stats[/\\]ClusterStatsIT.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]search[/\\]SearchRequestBuilderTests.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]WaitActiveShardCountIT.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]broadcast[/\\]node[/\\]TransportBroadcastByNodeActionTests.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]master[/\\]TransportMasterNodeActionTests.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]replication[/\\]BroadcastReplicationTests.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]support[/\\]single[/\\]instance[/\\]TransportInstanceSingleOperationActionTests.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]AbstractTermVectorsTestCase.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]GetTermVectorsIT.java" checks="LineLength" />
|
||||
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]action[/\\]termvectors[/\\]MultiTermVectorsIT.java" checks="LineLength" />
|
||||
|
@ -25,7 +25,8 @@ package org.elasticsearch.action.support;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
|
||||
public abstract class DelegatingActionListener<Instigator extends ActionResponse, Delegated extends ActionResponse> implements ActionListener<Instigator> {
|
||||
public abstract class DelegatingActionListener<Instigator extends ActionResponse, Delegated extends ActionResponse>
|
||||
implements ActionListener<Instigator> {
|
||||
|
||||
ActionListener<Delegated> delegatedActionListener;
|
||||
|
||||
|
@ -263,12 +263,15 @@ public class IndicesOptions implements ToXContentFragment {
|
||||
}
|
||||
}
|
||||
|
||||
public static IndicesOptions fromOptions(boolean ignoreUnavailable, boolean allowNoIndices, boolean expandToOpenIndices, boolean expandToClosedIndices) {
|
||||
public static IndicesOptions fromOptions(boolean ignoreUnavailable, boolean allowNoIndices, boolean expandToOpenIndices,
|
||||
boolean expandToClosedIndices) {
|
||||
return fromOptions(ignoreUnavailable, allowNoIndices, expandToOpenIndices, expandToClosedIndices, true, false, false);
|
||||
}
|
||||
|
||||
public static IndicesOptions fromOptions(boolean ignoreUnavailable, boolean allowNoIndices, boolean expandToOpenIndices, boolean expandToClosedIndices, IndicesOptions defaultOptions) {
|
||||
return fromOptions(ignoreUnavailable, allowNoIndices, expandToOpenIndices, expandToClosedIndices, defaultOptions.allowAliasesToMultipleIndices(), defaultOptions.forbidClosedIndices(), defaultOptions.ignoreAliases());
|
||||
public static IndicesOptions fromOptions(boolean ignoreUnavailable, boolean allowNoIndices, boolean expandToOpenIndices,
|
||||
boolean expandToClosedIndices, IndicesOptions defaultOptions) {
|
||||
return fromOptions(ignoreUnavailable, allowNoIndices, expandToOpenIndices, expandToClosedIndices,
|
||||
defaultOptions.allowAliasesToMultipleIndices(), defaultOptions.forbidClosedIndices(), defaultOptions.ignoreAliases());
|
||||
}
|
||||
|
||||
public static IndicesOptions fromOptions(boolean ignoreUnavailable, boolean allowNoIndices, boolean expandToOpenIndices,
|
||||
@ -327,7 +330,8 @@ public class IndicesOptions implements ToXContentFragment {
|
||||
"allow_no_indices".equals(name) || "allowNoIndices".equals(name);
|
||||
}
|
||||
|
||||
public static IndicesOptions fromParameters(Object wildcardsString, Object ignoreUnavailableString, Object allowNoIndicesString, IndicesOptions defaultSettings) {
|
||||
public static IndicesOptions fromParameters(Object wildcardsString, Object ignoreUnavailableString, Object allowNoIndicesString,
|
||||
IndicesOptions defaultSettings) {
|
||||
if (wildcardsString == null && ignoreUnavailableString == null && allowNoIndicesString == null) {
|
||||
return defaultSettings;
|
||||
}
|
||||
|
@ -24,8 +24,11 @@ import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.support.IndicesOptions;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public abstract class BroadcastOperationRequestBuilder<Request extends BroadcastRequest<Request>, Response extends BroadcastResponse, RequestBuilder extends BroadcastOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response> {
|
||||
public abstract class BroadcastOperationRequestBuilder<
|
||||
Request extends BroadcastRequest<Request>,
|
||||
Response extends BroadcastResponse,
|
||||
RequestBuilder extends BroadcastOperationRequestBuilder<Request, Response, RequestBuilder>
|
||||
> extends ActionRequestBuilder<Request, Response> {
|
||||
|
||||
protected BroadcastOperationRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
super(client, action, request);
|
||||
|
@ -51,8 +51,12 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public abstract class TransportBroadcastAction<Request extends BroadcastRequest<Request>, Response extends BroadcastResponse, ShardRequest extends BroadcastShardRequest, ShardResponse extends BroadcastShardResponse>
|
||||
extends HandledTransportAction<Request, Response> {
|
||||
public abstract class TransportBroadcastAction<
|
||||
Request extends BroadcastRequest<Request>,
|
||||
Response extends BroadcastResponse,
|
||||
ShardRequest extends BroadcastShardRequest,
|
||||
ShardResponse extends BroadcastShardResponse
|
||||
> extends HandledTransportAction<Request, Response> {
|
||||
|
||||
protected final ClusterService clusterService;
|
||||
protected final TransportService transportService;
|
||||
@ -62,8 +66,9 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
||||
private final String shardExecutor;
|
||||
|
||||
protected TransportBroadcastAction(Settings settings, String actionName, ClusterService clusterService,
|
||||
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<Request> request, Supplier<ShardRequest> shardRequest, String shardExecutor) {
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
Supplier<ShardRequest> shardRequest, String shardExecutor) {
|
||||
super(settings, actionName, transportService, actionFilters, request);
|
||||
this.clusterService = clusterService;
|
||||
this.transportService = transportService;
|
||||
@ -172,28 +177,29 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
|
||||
// no node connected, act as failure
|
||||
onOperation(shard, shardIt, shardIndex, new NoShardAvailableActionException(shardIt.shardId()));
|
||||
} else {
|
||||
transportService.sendRequest(node, transportShardAction, shardRequest, new TransportResponseHandler<ShardResponse>() {
|
||||
@Override
|
||||
public ShardResponse read(StreamInput in) throws IOException {
|
||||
ShardResponse response = newShardResponse();
|
||||
response.readFrom(in);
|
||||
return response;
|
||||
}
|
||||
transportService.sendRequest(node, transportShardAction, shardRequest,
|
||||
new TransportResponseHandler<ShardResponse>() {
|
||||
@Override
|
||||
public ShardResponse read(StreamInput in) throws IOException {
|
||||
ShardResponse response = newShardResponse();
|
||||
response.readFrom(in);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(ShardResponse response) {
|
||||
onOperation(shard, shardIndex, response);
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(ShardResponse response) {
|
||||
onOperation(shard, shardIndex, response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException e) {
|
||||
onOperation(shard, shardIt, shardIndex, e);
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException e) {
|
||||
onOperation(shard, shardIt, shardIndex, e);
|
||||
}
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
|
@ -143,7 +143,8 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
||||
successfulShards += response.getSuccessfulShards();
|
||||
for (BroadcastShardOperationFailedException throwable : response.getExceptions()) {
|
||||
if (!TransportActions.isShardNotAvailableException(throwable)) {
|
||||
exceptions.add(new DefaultShardOperationFailedException(throwable.getShardId().getIndexName(), throwable.getShardId().getId(), throwable));
|
||||
exceptions.add(new DefaultShardOperationFailedException(throwable.getShardId().getIndexName(),
|
||||
throwable.getShardId().getId(), throwable));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -173,7 +174,9 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
||||
* @param clusterState the cluster state
|
||||
* @return the response
|
||||
*/
|
||||
protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List<ShardOperationResult> results, List<DefaultShardOperationFailedException> shardFailures, ClusterState clusterState);
|
||||
protected abstract Response newResponse(Request request, int totalShards, int successfulShards, int failedShards,
|
||||
List<ShardOperationResult> results, List<DefaultShardOperationFailedException> shardFailures,
|
||||
ClusterState clusterState);
|
||||
|
||||
/**
|
||||
* Deserialize a request from an input stream
|
||||
@ -417,7 +420,8 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
|
||||
channel.sendResponse(new NodeResponse(request.getNodeId(), totalShards, results, accumulatedExceptions));
|
||||
}
|
||||
|
||||
private void onShardOperation(final NodeRequest request, final Object[] shardResults, final int shardIndex, final ShardRouting shardRouting) {
|
||||
private void onShardOperation(final NodeRequest request, final Object[] shardResults, final int shardIndex,
|
||||
final ShardRouting shardRouting) {
|
||||
try {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("[{}] executing operation for shard [{}]", actionName, shardRouting.shortSummary());
|
||||
|
@ -25,7 +25,8 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
/**
|
||||
* Base request builder for master node operations that support acknowledgements
|
||||
*/
|
||||
public abstract class AcknowledgedRequestBuilder<Request extends AcknowledgedRequest<Request>, Response extends AcknowledgedResponse, RequestBuilder extends AcknowledgedRequestBuilder<Request, Response, RequestBuilder>>
|
||||
public abstract class AcknowledgedRequestBuilder<Request extends AcknowledgedRequest<Request>, Response extends AcknowledgedResponse,
|
||||
RequestBuilder extends AcknowledgedRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected AcknowledgedRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -28,7 +28,8 @@ import org.elasticsearch.common.unit.TimeValue;
|
||||
/**
|
||||
* Base request builder for master node operations
|
||||
*/
|
||||
public abstract class MasterNodeOperationRequestBuilder<Request extends MasterNodeRequest<Request>, Response extends ActionResponse, RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
public abstract class MasterNodeOperationRequestBuilder<Request extends MasterNodeRequest<Request>, Response extends ActionResponse,
|
||||
RequestBuilder extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response> {
|
||||
|
||||
protected MasterNodeOperationRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -26,7 +26,8 @@ import org.elasticsearch.client.ElasticsearchClient;
|
||||
/**
|
||||
* Base request builder for master node read operations that can be executed on the local node as well
|
||||
*/
|
||||
public abstract class MasterNodeReadOperationRequestBuilder<Request extends MasterNodeReadRequest<Request>, Response extends ActionResponse, RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
public abstract class MasterNodeReadOperationRequestBuilder<Request extends MasterNodeReadRequest<Request>, Response extends ActionResponse,
|
||||
RequestBuilder extends MasterNodeReadOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends MasterNodeOperationRequestBuilder<Request, Response, RequestBuilder> {
|
||||
|
||||
protected MasterNodeReadOperationRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -28,7 +28,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public abstract class ClusterInfoRequest<Request extends ClusterInfoRequest<Request>> extends MasterNodeReadRequest<Request> implements IndicesRequest.Replaceable {
|
||||
public abstract class ClusterInfoRequest<Request extends ClusterInfoRequest<Request>> extends MasterNodeReadRequest<Request>
|
||||
implements IndicesRequest.Replaceable {
|
||||
|
||||
private String[] indices = Strings.EMPTY_ARRAY;
|
||||
private String[] types = Strings.EMPTY_ARRAY;
|
||||
|
@ -25,7 +25,9 @@ import org.elasticsearch.action.support.master.MasterNodeReadOperationRequestBui
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.util.ArrayUtils;
|
||||
|
||||
public abstract class ClusterInfoRequestBuilder<Request extends ClusterInfoRequest<Request>, Response extends ActionResponse, Builder extends ClusterInfoRequestBuilder<Request, Response, Builder>> extends MasterNodeReadOperationRequestBuilder<Request, Response, Builder> {
|
||||
public abstract class ClusterInfoRequestBuilder<Request extends ClusterInfoRequest<Request>, Response extends ActionResponse,
|
||||
Builder extends ClusterInfoRequestBuilder<Request, Response, Builder>>
|
||||
extends MasterNodeReadOperationRequestBuilder<Request, Response, Builder> {
|
||||
|
||||
|
||||
protected ClusterInfoRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -52,5 +52,6 @@ public abstract class TransportClusterInfoAction<Request extends ClusterInfoRequ
|
||||
doMasterOperation(request, concreteIndices, state, listener);
|
||||
}
|
||||
|
||||
protected abstract void doMasterOperation(Request request, String[] concreteIndices, ClusterState state, ActionListener<Response> listener);
|
||||
protected abstract void doMasterOperation(Request request, String[] concreteIndices, ClusterState state,
|
||||
ActionListener<Response> listener);
|
||||
}
|
||||
|
@ -24,7 +24,8 @@ import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
public abstract class NodesOperationRequestBuilder<Request extends BaseNodesRequest<Request>, Response extends BaseNodesResponse, RequestBuilder extends NodesOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
public abstract class NodesOperationRequestBuilder<Request extends BaseNodesRequest<Request>, Response extends BaseNodesResponse,
|
||||
RequestBuilder extends NodesOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response> {
|
||||
|
||||
protected NodesOperationRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -26,7 +26,8 @@ import org.elasticsearch.action.support.ActiveShardCount;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
public abstract class ReplicationRequestBuilder<Request extends ReplicationRequest<Request>, Response extends ActionResponse, RequestBuilder extends ReplicationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
public abstract class ReplicationRequestBuilder<Request extends ReplicationRequest<Request>, Response extends ActionResponse,
|
||||
RequestBuilder extends ReplicationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response> {
|
||||
|
||||
protected ReplicationRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -51,7 +51,8 @@ import java.util.function.Supplier;
|
||||
* Base class for requests that should be executed on all shards of an index or several indices.
|
||||
* This action sends shard requests to all primary shards of the indices and they are then replicated like write requests
|
||||
*/
|
||||
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest<Request>, Response extends BroadcastResponse, ShardRequest extends ReplicationRequest<ShardRequest>, ShardResponse extends ReplicationResponse>
|
||||
public abstract class TransportBroadcastReplicationAction<Request extends BroadcastRequest<Request>, Response extends BroadcastResponse,
|
||||
ShardRequest extends ReplicationRequest<ShardRequest>, ShardResponse extends ReplicationResponse>
|
||||
extends HandledTransportAction<Request, Response> {
|
||||
|
||||
private final TransportReplicationAction replicatedBroadcastShardAction;
|
||||
@ -60,7 +61,8 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
||||
|
||||
public TransportBroadcastReplicationAction(String name, Supplier<Request> request, Settings settings, ClusterService clusterService,
|
||||
TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, TransportReplicationAction replicatedBroadcastShardAction) {
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
TransportReplicationAction replicatedBroadcastShardAction) {
|
||||
super(settings, name, transportService, actionFilters, request);
|
||||
this.replicatedBroadcastShardAction = replicatedBroadcastShardAction;
|
||||
this.clusterService = clusterService;
|
||||
@ -96,7 +98,8 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
||||
if (TransportActions.isShardNotAvailableException(e)) {
|
||||
failures = new ReplicationResponse.ShardInfo.Failure[0];
|
||||
} else {
|
||||
ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId, null, e, ExceptionsHelper.status(e), true);
|
||||
ReplicationResponse.ShardInfo.Failure failure = new ReplicationResponse.ShardInfo.Failure(shardId, null, e,
|
||||
ExceptionsHelper.status(e), true);
|
||||
failures = new ReplicationResponse.ShardInfo.Failure[totalNumCopies];
|
||||
Arrays.fill(failures, failure);
|
||||
}
|
||||
@ -126,7 +129,8 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
||||
for (String index : concreteIndices) {
|
||||
IndexMetaData indexMetaData = clusterState.metaData().getIndices().get(index);
|
||||
if (indexMetaData != null) {
|
||||
for (IntObjectCursor<IndexShardRoutingTable> shardRouting : clusterState.getRoutingTable().indicesRouting().get(index).getShards()) {
|
||||
for (IntObjectCursor<IndexShardRoutingTable> shardRouting
|
||||
: clusterState.getRoutingTable().indicesRouting().get(index).getShards()) {
|
||||
shardIds.add(shardRouting.value.shardId());
|
||||
}
|
||||
}
|
||||
@ -156,7 +160,8 @@ public abstract class TransportBroadcastReplicationAction<Request extends Broadc
|
||||
shardFailures = new ArrayList<>();
|
||||
}
|
||||
for (ReplicationResponse.ShardInfo.Failure failure : shardResponse.getShardInfo().getFailures()) {
|
||||
shardFailures.add(new DefaultShardOperationFailedException(new BroadcastShardOperationFailedException(failure.fullShardId(), failure.getCause())));
|
||||
shardFailures.add(new DefaultShardOperationFailedException(
|
||||
new BroadcastShardOperationFailedException(failure.fullShardId(), failure.getCause())));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -25,7 +25,8 @@ import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
public abstract class InstanceShardOperationRequestBuilder<Request extends InstanceShardOperationRequest<Request>, Response extends ActionResponse, RequestBuilder extends InstanceShardOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
public abstract class InstanceShardOperationRequestBuilder<Request extends InstanceShardOperationRequest<Request>,
|
||||
Response extends ActionResponse, RequestBuilder extends InstanceShardOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response> {
|
||||
|
||||
protected InstanceShardOperationRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -51,8 +51,10 @@ import org.elasticsearch.transport.TransportService;
|
||||
import java.io.IOException;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public abstract class TransportInstanceSingleOperationAction<Request extends InstanceShardOperationRequest<Request>, Response extends ActionResponse>
|
||||
extends HandledTransportAction<Request, Response> {
|
||||
public abstract class TransportInstanceSingleOperationAction<
|
||||
Request extends InstanceShardOperationRequest<Request>,
|
||||
Response extends ActionResponse
|
||||
> extends HandledTransportAction<Request, Response> {
|
||||
|
||||
protected final ThreadPool threadPool;
|
||||
protected final ClusterService clusterService;
|
||||
@ -64,7 +66,8 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
||||
|
||||
protected TransportInstanceSingleOperationAction(Settings settings, String actionName, ThreadPool threadPool,
|
||||
ClusterService clusterService, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<Request> request) {
|
||||
super(settings, actionName, transportService, actionFilters, request);
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
@ -216,9 +219,12 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
|
||||
Exception listenFailure = failure;
|
||||
if (listenFailure == null) {
|
||||
if (shardIt == null) {
|
||||
listenFailure = new UnavailableShardsException(request.concreteIndex(), -1, "Timeout waiting for [{}], request: {}", request.timeout(), actionName);
|
||||
listenFailure = new UnavailableShardsException(request.concreteIndex(), -1, "Timeout waiting for [{}], request: {}",
|
||||
request.timeout(), actionName);
|
||||
} else {
|
||||
listenFailure = new UnavailableShardsException(shardIt.shardId(), "[{}] shardIt, [{}] active : Timeout waiting for [{}], request: {}", shardIt.size(), shardIt.sizeActive(), request.timeout(), actionName);
|
||||
listenFailure = new UnavailableShardsException(shardIt.shardId(),
|
||||
"[{}] shardIt, [{}] active : Timeout waiting for [{}], request: {}", shardIt.size(), shardIt.sizeActive(),
|
||||
request.timeout(), actionName);
|
||||
}
|
||||
}
|
||||
listener.onFailure(listenFailure);
|
||||
|
@ -24,7 +24,8 @@ import org.elasticsearch.action.ActionRequestBuilder;
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.client.ElasticsearchClient;
|
||||
|
||||
public abstract class SingleShardOperationRequestBuilder<Request extends SingleShardRequest<Request>, Response extends ActionResponse, RequestBuilder extends SingleShardOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
public abstract class SingleShardOperationRequestBuilder<Request extends SingleShardRequest<Request>, Response extends ActionResponse,
|
||||
RequestBuilder extends SingleShardOperationRequestBuilder<Request, Response, RequestBuilder>>
|
||||
extends ActionRequestBuilder<Request, Response> {
|
||||
|
||||
protected SingleShardOperationRequestBuilder(ElasticsearchClient client, Action<Response> action, Request request) {
|
||||
|
@ -60,7 +60,8 @@ import static org.elasticsearch.action.support.TransportActions.isShardNotAvaila
|
||||
* the read operation can be performed on other shard copies. Concrete implementations can provide their own list
|
||||
* of candidate shards to try the read operation on.
|
||||
*/
|
||||
public abstract class TransportSingleShardAction<Request extends SingleShardRequest<Request>, Response extends ActionResponse> extends TransportAction<Request, Response> {
|
||||
public abstract class TransportSingleShardAction<Request extends SingleShardRequest<Request>, Response extends ActionResponse>
|
||||
extends TransportAction<Request, Response> {
|
||||
|
||||
protected final ThreadPool threadPool;
|
||||
protected final ClusterService clusterService;
|
||||
@ -71,8 +72,9 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
||||
private final String executor;
|
||||
|
||||
protected TransportSingleShardAction(Settings settings, String actionName, ThreadPool threadPool, ClusterService clusterService,
|
||||
TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<Request> request, String executor) {
|
||||
TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
String executor) {
|
||||
super(settings, actionName, actionFilters, transportService.getTaskManager());
|
||||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
@ -181,7 +183,8 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
||||
public void start() {
|
||||
if (shardIt == null) {
|
||||
// just execute it on the local node
|
||||
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {
|
||||
transportService.sendRequest(clusterService.localNode(), transportShardAction, internalRequest.request(),
|
||||
new TransportResponseHandler<Response>() {
|
||||
@Override
|
||||
public Response read(StreamInput in) throws IOException {
|
||||
Response response = newResponse();
|
||||
@ -211,7 +214,8 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
||||
|
||||
private void onFailure(ShardRouting shardRouting, Exception e) {
|
||||
if (e != null) {
|
||||
logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting, internalRequest.request()), e);
|
||||
logger.trace(() -> new ParameterizedMessage("{}: failed to execute [{}]", shardRouting,
|
||||
internalRequest.request()), e);
|
||||
}
|
||||
perform(e);
|
||||
}
|
||||
@ -226,9 +230,11 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
||||
if (shardRouting == null) {
|
||||
Exception failure = lastFailure;
|
||||
if (failure == null || isShardNotAvailableException(failure)) {
|
||||
failure = new NoShardAvailableActionException(null, LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
|
||||
failure = new NoShardAvailableActionException(null,
|
||||
LoggerMessageFormat.format("No shard available for [{}]", internalRequest.request()), failure);
|
||||
} else {
|
||||
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null, internalRequest.request()), failure);
|
||||
logger.debug(() -> new ParameterizedMessage("{}: failed to execute [{}]", null,
|
||||
internalRequest.request()), failure);
|
||||
}
|
||||
listener.onFailure(failure);
|
||||
return;
|
||||
@ -246,29 +252,30 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
|
||||
node
|
||||
);
|
||||
}
|
||||
transportService.sendRequest(node, transportShardAction, internalRequest.request(), new TransportResponseHandler<Response>() {
|
||||
transportService.sendRequest(node, transportShardAction, internalRequest.request(),
|
||||
new TransportResponseHandler<Response>() {
|
||||
|
||||
@Override
|
||||
public Response read(StreamInput in) throws IOException {
|
||||
Response response = newResponse();
|
||||
response.readFrom(in);
|
||||
return response;
|
||||
}
|
||||
@Override
|
||||
public Response read(StreamInput in) throws IOException {
|
||||
Response response = newResponse();
|
||||
response.readFrom(in);
|
||||
return response;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(final Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(final Response response) {
|
||||
listener.onResponse(response);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
onFailure(shardRouting, exp);
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
onFailure(shardRouting, exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -54,7 +54,8 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
|
||||
fail("can't index, does not enough active shard copies");
|
||||
} catch (UnavailableShardsException e) {
|
||||
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
|
||||
assertThat(e.getMessage(), startsWith("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request:"));
|
||||
assertThat(e.getMessage(),
|
||||
startsWith("[test][0] Not enough active copies to meet shard count of [2] (have 1, needed 2). Timeout: [100ms], request:"));
|
||||
// but really, all is well
|
||||
}
|
||||
|
||||
@ -83,12 +84,14 @@ public class WaitActiveShardCountIT extends ESIntegTestCase {
|
||||
fail("can't index, not enough active shard copies");
|
||||
} catch (UnavailableShardsException e) {
|
||||
assertThat(e.status(), equalTo(RestStatus.SERVICE_UNAVAILABLE));
|
||||
assertThat(e.getMessage(), startsWith("[test][0] Not enough active copies to meet shard count of [" + ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request:"));
|
||||
assertThat(e.getMessage(), startsWith("[test][0] Not enough active copies to meet shard count of ["
|
||||
+ ActiveShardCount.ALL + "] (have 2, needed 3). Timeout: [100ms], request:"));
|
||||
// but really, all is well
|
||||
}
|
||||
|
||||
allowNodes("test", 3);
|
||||
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(3).setWaitForGreenStatus().execute().actionGet();
|
||||
clusterHealth = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForActiveShards(3)
|
||||
.setWaitForGreenStatus().execute().actionGet();
|
||||
logger.info("Done Cluster Health, status {}", clusterHealth.getStatus());
|
||||
assertThat(clusterHealth.isTimedOut(), equalTo(false));
|
||||
assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.GREEN));
|
||||
|
@ -114,11 +114,15 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
|
||||
class TestTransportBroadcastByNodeAction extends TransportBroadcastByNodeAction<Request, Response, TransportBroadcastByNodeAction.EmptyResult> {
|
||||
class TestTransportBroadcastByNodeAction
|
||||
extends TransportBroadcastByNodeAction<Request, Response, TransportBroadcastByNodeAction.EmptyResult> {
|
||||
private final Map<ShardRouting, Object> shards = new HashMap<>();
|
||||
|
||||
TestTransportBroadcastByNodeAction(Settings settings, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request, String executor) {
|
||||
super(settings, "indices:admin/test", TransportBroadcastByNodeActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request, executor);
|
||||
TestTransportBroadcastByNodeAction(Settings settings, TransportService transportService, ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request,
|
||||
String executor) {
|
||||
super(settings, "indices:admin/test", TransportBroadcastByNodeActionTests.this.clusterService, transportService,
|
||||
actionFilters, indexNameExpressionResolver, request, executor);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -127,7 +131,9 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Response newResponse(Request request, int totalShards, int successfulShards, int failedShards, List<EmptyResult> emptyResults, List<DefaultShardOperationFailedException> shardFailures, ClusterState clusterState) {
|
||||
protected Response newResponse(Request request, int totalShards, int successfulShards, int failedShards,
|
||||
List<EmptyResult> emptyResults, List<DefaultShardOperationFailedException> shardFailures,
|
||||
ClusterState clusterState) {
|
||||
return new Response(totalShards, successfulShards, failedShards, shardFailures);
|
||||
}
|
||||
|
||||
@ -226,7 +232,8 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
||||
totalIndexShards += numberOfShards;
|
||||
for (int j = 0; j < numberOfShards; j++) {
|
||||
final ShardId shardId = new ShardId(index, "_na_", ++shardIndex);
|
||||
ShardRouting shard = TestShardRouting.newShardRouting(index, shardId.getId(), node.getId(), true, ShardRoutingState.STARTED);
|
||||
ShardRouting shard = TestShardRouting.newShardRouting(index, shardId.getId(), node.getId(), true,
|
||||
ShardRoutingState.STARTED);
|
||||
IndexShardRoutingTable.Builder indexShard = new IndexShardRoutingTable.Builder(shardId);
|
||||
indexShard.addShard(shard);
|
||||
indexRoutingTable.addIndexShard(indexShard.build());
|
||||
@ -263,7 +270,8 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
|
||||
ClusterBlocks.Builder block = ClusterBlocks.builder()
|
||||
.addGlobalBlock(new ClusterBlock(1, "test-block", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
.addGlobalBlock(new ClusterBlock(1, "test-block", false, true, false, RestStatus.SERVICE_UNAVAILABLE,
|
||||
ClusterBlockLevel.ALL));
|
||||
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
|
||||
try {
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
@ -278,7 +286,8 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
||||
PlainActionFuture<Response> listener = new PlainActionFuture<>();
|
||||
|
||||
ClusterBlocks.Builder block = ClusterBlocks.builder()
|
||||
.addIndexBlock(TEST_INDEX, new ClusterBlock(1, "test-block", false, true, false, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
|
||||
.addIndexBlock(TEST_INDEX, new ClusterBlock(1, "test-block", false, true, false, RestStatus.SERVICE_UNAVAILABLE,
|
||||
ClusterBlockLevel.ALL));
|
||||
setState(clusterService, ClusterState.builder(clusterService.state()).blocks(block));
|
||||
try {
|
||||
action.new AsyncAction(null, request, listener).start();
|
||||
@ -450,7 +459,8 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
|
||||
}
|
||||
}
|
||||
totalSuccessfulShards += shardResults.size();
|
||||
TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse(entry.getKey(), shards.size(), shardResults, exceptions);
|
||||
TransportBroadcastByNodeAction.NodeResponse nodeResponse = action.new NodeResponse(entry.getKey(), shards.size(),
|
||||
shardResults, exceptions);
|
||||
transport.handleResponse(requestId, nodeResponse);
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +130,8 @@ public class TransportMasterNodeActionTests extends ESTestCase {
|
||||
class Response extends ActionResponse {}
|
||||
|
||||
class Action extends TransportMasterNodeAction<Request, Response> {
|
||||
Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService, ThreadPool threadPool) {
|
||||
Action(Settings settings, String actionName, TransportService transportService, ClusterService clusterService,
|
||||
ThreadPool threadPool) {
|
||||
super(settings, actionName, transportService, clusterService, threadPool,
|
||||
new ActionFilters(new HashSet<>()), new IndexNameExpressionResolver(Settings.EMPTY), Request::new);
|
||||
}
|
||||
@ -176,7 +177,8 @@ public class TransportMasterNodeActionTests extends ESTestCase {
|
||||
|
||||
new Action(Settings.EMPTY, "internal:testAction", transportService, clusterService, threadPool) {
|
||||
@Override
|
||||
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener) throws Exception {
|
||||
protected void masterOperation(Task task, Request request, ClusterState state, ActionListener<Response> listener)
|
||||
throws Exception {
|
||||
if (masterOperationFailure) {
|
||||
listener.onFailure(exception);
|
||||
} else {
|
||||
|
@ -171,7 +171,8 @@ public class BroadcastReplicationTests extends ESTestCase {
|
||||
if (shardsSucceeded == 1 && randomBoolean()) {
|
||||
//sometimes add failure (no failure means shard unavailable)
|
||||
failures = new ReplicationResponse.ShardInfo.Failure[1];
|
||||
failures[0] = new ReplicationResponse.ShardInfo.Failure(shardRequests.v1(), null, new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false);
|
||||
failures[0] = new ReplicationResponse.ShardInfo.Failure(shardRequests.v1(), null,
|
||||
new Exception("pretend shard failed"), RestStatus.GATEWAY_TIMEOUT, false);
|
||||
failed++;
|
||||
}
|
||||
replicationResponse.setShardInfo(new ReplicationResponse.ShardInfo(2, shardsSucceeded, failures));
|
||||
@ -204,14 +205,16 @@ public class BroadcastReplicationTests extends ESTestCase {
|
||||
assertThat(shards.get(0), equalTo(shardId));
|
||||
}
|
||||
|
||||
private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<DummyBroadcastRequest, BroadcastResponse, BasicReplicationRequest, ReplicationResponse> {
|
||||
protected final Set<Tuple<ShardId, ActionListener<ReplicationResponse>>> capturedShardRequests = ConcurrentCollections.newConcurrentSet();
|
||||
private class TestBroadcastReplicationAction extends TransportBroadcastReplicationAction<DummyBroadcastRequest, BroadcastResponse,
|
||||
BasicReplicationRequest, ReplicationResponse> {
|
||||
protected final Set<Tuple<ShardId, ActionListener<ReplicationResponse>>> capturedShardRequests =
|
||||
ConcurrentCollections.newConcurrentSet();
|
||||
|
||||
TestBroadcastReplicationAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> replicatedBroadcastShardAction) {
|
||||
TransportReplicationAction<BasicReplicationRequest, BasicReplicationRequest, ReplicationResponse> action) {
|
||||
super("internal:test-broadcast-replication-action", DummyBroadcastRequest::new, settings, clusterService, transportService,
|
||||
actionFilters, indexNameExpressionResolver, replicatedBroadcastShardAction);
|
||||
actionFilters, indexNameExpressionResolver, action);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -231,7 +234,8 @@ public class BroadcastReplicationTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void shardExecute(Task task, DummyBroadcastRequest request, ShardId shardId, ActionListener<ReplicationResponse> shardActionListener) {
|
||||
protected void shardExecute(Task task, DummyBroadcastRequest request, ShardId shardId,
|
||||
ActionListener<ReplicationResponse> shardActionListener) {
|
||||
capturedShardRequests.add(new Tuple<>(shardId, shardActionListener));
|
||||
}
|
||||
}
|
||||
@ -241,7 +245,8 @@ public class BroadcastReplicationTests extends ESTestCase {
|
||||
FlushResponse flushResponse = ActionTestUtils.executeBlocking(flushAction, new FlushRequest(index));
|
||||
Date endDate = new Date();
|
||||
long maxTime = 500;
|
||||
assertThat("this should not take longer than " + maxTime + " ms. The request hangs somewhere", endDate.getTime() - beginDate.getTime(), lessThanOrEqualTo(maxTime));
|
||||
assertThat("this should not take longer than " + maxTime + " ms. The request hangs somewhere",
|
||||
endDate.getTime() - beginDate.getTime(), lessThanOrEqualTo(maxTime));
|
||||
return flushResponse;
|
||||
}
|
||||
|
||||
|
@ -88,8 +88,11 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
|
||||
class TestTransportInstanceSingleOperationAction extends TransportInstanceSingleOperationAction<Request, Response> {
|
||||
private final Map<ShardId, Object> shards = new HashMap<>();
|
||||
|
||||
TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
|
||||
super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService, actionFilters, indexNameExpressionResolver, request);
|
||||
TestTransportInstanceSingleOperationAction(Settings settings, String actionName, TransportService transportService,
|
||||
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
Supplier<Request> request) {
|
||||
super(settings, actionName, THREAD_POOL, TransportInstanceSingleOperationActionTests.this.clusterService, transportService,
|
||||
actionFilters, indexNameExpressionResolver, request);
|
||||
}
|
||||
|
||||
public Map<ShardId, Object> getResults() {
|
||||
@ -212,7 +215,8 @@ public class TransportInstanceSingleOperationActionTests extends ESTestCase {
|
||||
long requestId = transport.capturedRequests()[0].requestId;
|
||||
transport.clear();
|
||||
// this should not trigger retry or anything and the listener should report exception immediately
|
||||
transport.handleRemoteError(requestId, new TransportException("a generic transport exception", new Exception("generic test exception")));
|
||||
transport.handleRemoteError(requestId, new TransportException("a generic transport exception",
|
||||
new Exception("generic test exception")));
|
||||
|
||||
try {
|
||||
// result should return immediately
|
||||
|
Loading…
x
Reference in New Issue
Block a user