Bulk API: Allow to control if its compressed or not using `action.bulk.compress` (defaults to true which is current behavior), closes #1850.

This commit is contained in:
Shay Banon 2012-04-05 20:47:38 +03:00
parent 824b0bd347
commit 5348c41924
9 changed files with 42 additions and 14 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.action;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions;
/**
@ -50,7 +51,7 @@ public abstract class GenericAction<Request extends ActionRequest, Response exte
/**
* Optional request options for the action.
*/
public TransportRequestOptions options() {
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.EMPTY;
}

View File

@ -22,10 +22,13 @@ package org.elasticsearch.action;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
@ -33,16 +36,20 @@ import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
/**
* A generic proxy that will execute the given action against a specific node.
*/
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> {
public class TransportActionNodeProxy<Request extends ActionRequest, Response extends ActionResponse> extends AbstractComponent {
protected final TransportService transportService;
private final GenericAction<Request, Response> action;
private final TransportRequestOptions transportOptions;
@Inject
public TransportActionNodeProxy(GenericAction<Request, Response> action, TransportService transportService) {
public TransportActionNodeProxy(Settings settings, GenericAction<Request, Response> action, TransportService transportService) {
super(settings);
this.action = action;
this.transportService = transportService;
this.transportOptions = action.transportOptions(settings);
}
public ActionFuture<Response> execute(DiscoveryNode node, Request request) throws ElasticSearchException {
@ -53,7 +60,7 @@ public class TransportActionNodeProxy<Request extends ActionRequest, Response ex
}
public void execute(DiscoveryNode node, final Request request, final ActionListener<Response> listener) {
transportService.sendRequest(node, action.name(), request, action.options(), new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, action.name(), request, transportOptions, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
return action.newResponse();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.bulk;
import org.elasticsearch.action.Action;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.TransportRequestOptions;
/**
@ -45,7 +46,10 @@ public class BulkAction extends Action<BulkRequest, BulkResponse, BulkRequestBui
}
@Override
public TransportRequestOptions options() {
return TransportRequestOptions.options().withLowType().withCompress(true);
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.options()
.withType(TransportRequestOptions.Type.fromString(settings.get("action.bulk.transport.type", TransportRequestOptions.Type.LOW.toString())))
.withCompress(settings.getAsBoolean("action.bulk.compress", true)
);
}
}

View File

@ -85,8 +85,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
@Override
protected TransportRequestOptions transportOptions() {
// low type since we don't want the large bulk requests to cause high latency on typical requests
return TransportRequestOptions.options().withCompress(true).withLowType();
return BulkAction.INSTANCE.transportOptions(settings);
}
@Override

View File

@ -73,6 +73,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
protected final WriteConsistencyLevel defaultWriteConsistencyLevel;
protected final TransportRequestOptions transportOptions;
final String transportAction;
final String transportReplicaAction;
final String executor;
@ -95,6 +97,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
transportService.registerHandler(transportAction, new OperationTransportHandler());
transportService.registerHandler(transportReplicaAction, new ReplicaOperationTransportHandler());
this.transportOptions = transportOptions();
this.defaultReplicationType = ReplicationType.fromString(settings.get("action.replication_type", "sync"));
this.defaultWriteConsistencyLevel = WriteConsistencyLevel.fromString(settings.get("action.write_consistency", "quorum"));
}
@ -431,7 +435,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportAction, request, transportOptions(), new BaseTransportResponseHandler<Response>() {
transportService.sendRequest(node, transportAction, request, transportOptions, new BaseTransportResponseHandler<Response>() {
@Override
public Response newInstance() {
@ -625,7 +629,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions(), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(VoidStreamable vResponse) {
finishIfPossible();

View File

@ -59,7 +59,7 @@ public class InternalTransportClient extends AbstractClient implements InternalC
MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<Action, TransportActionNodeProxy>();
for (GenericAction action : actions.values()) {
if (action instanceof Action) {
actionsBuilder.put((Action) action, new TransportActionNodeProxy(action, transportService));
actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.actions = actionsBuilder.immutableMap();

View File

@ -54,7 +54,7 @@ public class InternalTransportClusterAdminClient extends AbstractClusterAdminCli
MapBuilder<ClusterAction, TransportActionNodeProxy> actionsBuilder = new MapBuilder<ClusterAction, TransportActionNodeProxy>();
for (GenericAction action : actions.values()) {
if (action instanceof ClusterAction) {
actionsBuilder.put((ClusterAction) action, new TransportActionNodeProxy(action, transportService));
actionsBuilder.put((ClusterAction) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.actions = actionsBuilder.immutableMap();

View File

@ -54,7 +54,7 @@ public class InternalTransportIndicesAdminClient extends AbstractIndicesAdminCli
MapBuilder<IndicesAction, TransportActionNodeProxy> actionsBuilder = new MapBuilder<IndicesAction, TransportActionNodeProxy>();
for (GenericAction action : actions.values()) {
if (action instanceof IndicesAction) {
actionsBuilder.put((IndicesAction) action, new TransportActionNodeProxy(action, transportService));
actionsBuilder.put((IndicesAction) action, new TransportActionNodeProxy(settings, action, transportService));
}
}
this.actions = actionsBuilder.immutableMap();

View File

@ -19,6 +19,7 @@
package org.elasticsearch.transport;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.unit.TimeValue;
/**
@ -35,7 +36,19 @@ public class TransportRequestOptions {
public static enum Type {
LOW,
MED,
HIGH
HIGH;
public static Type fromString(String type) {
if ("low".equalsIgnoreCase(type)) {
return LOW;
} else if ("med".equalsIgnoreCase(type)) {
return MED;
} else if ("high".equalsIgnoreCase(type)) {
return HIGH;
} else {
throw new ElasticSearchIllegalArgumentException("failed to match transport type for [" + type + "]");
}
}
}
private TimeValue timeout;