Always compress based on the settings (#36522)

Currently TransportRequestOptions allows specific requests to request
compression. This commit removes this and always compresses based on the
settings. Additionally, it removes TransportResponseOptions as they
are unused.

This closes #36399.
This commit is contained in:
Tim Brooks 2018-12-12 09:39:15 -07:00 committed by GitHub
parent 02d0f163a4
commit 7f612d5dd8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 45 additions and 163 deletions

View File

@ -102,7 +102,6 @@ public class TransportGetTaskAction extends HandledTransportAction<GetTaskReques
if (request.getTimeout() != null) {
builder.withTimeout(request.getTimeout());
}
builder.withCompress(false);
DiscoveryNode node = clusterService.state().nodes().get(request.getTaskId().getNodeId());
if (node == null) {
// Node is no longer part of the cluster! Try and look the task up from the results index.

View File

@ -69,11 +69,6 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
this.snapshotShardsService = snapshotShardsService;
}
@Override
protected boolean transportCompress() {
return true; // compress since the metadata can become large
}
@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);

View File

@ -39,9 +39,6 @@ public class BulkAction extends Action<BulkResponse> {
@Override
public TransportRequestOptions transportOptions(Settings settings) {
return TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.BULK)
.withCompress(settings.getAsBoolean("action.bulk.compress", true)
).build();
return TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK).build();
}
}

View File

@ -82,10 +82,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
new AsyncAction(task, request, listener).start();
}
protected boolean transportCompress() {
return false;
}
/**
* Map the responses into {@code nodeResponseClass} responses and {@link FailedNodeException}s.
*
@ -173,7 +169,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
if (request.timeout() != null) {
builder.withTimeout(request.timeout());
}
builder.withCompress(transportCompress());
for (int i = 0; i < nodes.length; i++) {
final int idx = i;
final DiscoveryNode node = nodes[i];

View File

@ -212,10 +212,6 @@ public abstract class TransportTasksAction<
*/
protected abstract void taskOperation(TasksRequest request, OperationTask task, ActionListener<TaskResponse> listener);
protected boolean transportCompress() {
return false;
}
private class AsyncAction {
private final TasksRequest request;
@ -255,7 +251,6 @@ public abstract class TransportTasksAction<
if (request.getTimeout() != null) {
builder.withTimeout(request.getTimeout());
}
builder.withCompress(transportCompress());
for (int i = 0; i < nodesIds.length; i++) {
final String nodeId = nodesIds[i];
final int idx = i;

View File

@ -78,6 +78,10 @@ public class PublicationTransportHandler {
private final AtomicLong fullClusterStateReceivedCount = new AtomicLong();
private final AtomicLong incompatibleClusterStateDiffReceivedCount = new AtomicLong();
private final AtomicLong compatibleClusterStateDiffReceivedCount = new AtomicLong();
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).build();
public PublicationTransportHandler(TransportService transportService, NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
@ -213,7 +217,6 @@ public class PublicationTransportHandler {
@Override
public void sendApplyCommit(DiscoveryNode destination, ApplyCommitRequest applyCommitRequest,
ActionListener<TransportResponse.Empty> responseActionListener) {
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
final String actionName;
final TransportRequest transportRequest;
if (Coordinator.isZen1Node(destination)) {
@ -223,7 +226,7 @@ public class PublicationTransportHandler {
actionName = COMMIT_STATE_ACTION_NAME;
transportRequest = applyCommitRequest;
}
transportService.sendRequest(destination, actionName, transportRequest, options,
transportService.sendRequest(destination, actionName, transportRequest, stateRequestOptions,
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
@ -254,11 +257,6 @@ public class PublicationTransportHandler {
ActionListener<PublishWithJoinResponse> responseActionListener, boolean sendDiffs,
Map<Version, BytesReference> serializedStates) {
try {
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
final TransportRequestOptions options = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
final BytesTransportRequest request = new BytesTransportRequest(bytes, node.getVersion());
final Consumer<TransportException> transportExceptionHandler = exp -> {
if (sendDiffs && exp.unwrapCause() instanceof IncompatibleClusterStateVersionException) {
@ -304,7 +302,7 @@ public class PublicationTransportHandler {
actionName = PUBLISH_STATE_ACTION_NAME;
transportResponseHandler = publishWithJoinResponseHandler;
}
transportService.sendRequest(node, actionName, request, options, transportResponseHandler);
transportService.sendRequest(node, actionName, request, stateRequestOptions, transportResponseHandler);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", node), e);
responseActionListener.onFailure(e);

View File

@ -76,6 +76,11 @@ public class PublishClusterStateAction {
public static final String SEND_ACTION_NAME = "internal:discovery/zen/publish/send";
public static final String COMMIT_ACTION_NAME = "internal:discovery/zen/publish/commit";
// -> no need to put a timeout on the options, because we want the state response to eventually be received
// and not log an error if it arrives after the timeout
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).build();
public interface IncomingClusterStateListener {
/**
@ -284,14 +289,9 @@ public class PublishClusterStateAction {
final boolean sendDiffs, final Map<Version, BytesReference> serializedStates) {
try {
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
TransportRequestOptions options = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
transportService.sendRequest(node, SEND_ACTION_NAME,
new BytesTransportRequest(bytes, node.getVersion()),
options,
stateRequestOptions,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
@ -324,12 +324,9 @@ public class PublishClusterStateAction {
try {
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]",
clusterState.stateUUID(), clusterState.version(), node);
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
// no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
transportService.sendRequest(node, COMMIT_ACTION_NAME,
new CommitClusterStateRequest(clusterState.stateUUID()),
options,
stateRequestOptions,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override

View File

@ -66,11 +66,6 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
return future;
}
@Override
protected boolean transportCompress() {
return true; // compress since the metadata can become large
}
@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId);

View File

@ -91,11 +91,6 @@ public class TransportNodesListGatewayStartedShards extends
execute(new Request(shardId, nodes), listener);
}
@Override
protected boolean transportCompress() {
return true; // this can become big...
}
@Override
protected NodeRequest newNodeRequest(String nodeId, Request request) {
return new NodeRequest(nodeId, request);

View File

@ -62,12 +62,10 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
this.recoverySettings = recoverySettings;
this.onSourceThrottle = onSourceThrottle;
this.translogOpsRequestOptions = TransportRequestOptions.builder()
.withCompress(true)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionLongTimeout())
.build();
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withCompress(false) // lucene files are already compressed and therefore compressing this won't really help much so
// we are saving the cpu for other things
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())

View File

@ -195,7 +195,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on
private final ReadWriteLock closeLock = new ReentrantReadWriteLock();
private final boolean compressResponses;
private final boolean compressAllResponses;
private volatile BoundTransportAddress boundAddress;
private final String transportName;
@ -220,16 +220,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
this.pageCacheRecycler = pageCacheRecycler;
this.circuitBreakerService = circuitBreakerService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.compressResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.compressAllResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
this.transportLogger = new TransportLogger();
this.handshaker = new TransportHandshaker(version, threadPool,
(node, channel, requestId, v) -> sendRequestToChannel(node, channel, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, new TransportHandshaker.HandshakeRequest(version),
TransportRequestOptions.EMPTY, v, TransportStatus.setHandshake((byte) 0)),
TransportRequestOptions.EMPTY, v, false, TransportStatus.setHandshake((byte) 0)),
(v, features, channel, response, requestId) -> sendResponse(v, features, channel, response, requestId,
TransportHandshaker.HANDSHAKE_ACTION_NAME, TransportResponseOptions.EMPTY, TransportStatus.setHandshake((byte) 0)));
TransportHandshaker.HANDSHAKE_ACTION_NAME, false, TransportStatus.setHandshake((byte) 0)));
this.keepAlive = new TransportKeepAlive(threadPool, this::internalSendMessage);
this.nodeName = Node.NODE_NAME_SETTING.get(settings);
@ -337,11 +337,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
throw new NodeNotConnectedException(node, "connection already closed");
}
TcpChannel channel = channel(options.type());
if (compress) {
options = TransportRequestOptions.builder(options).withCompress(true).build();
}
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0);
sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), compress, (byte) 0);
}
}
@ -768,11 +764,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action,
final TransportRequest request, TransportRequestOptions options, Version channelVersion,
byte status) throws IOException, TransportException {
boolean compressRequest, byte status) throws IOException, TransportException {
// only compress if asked and the request is not bytes. Otherwise only
// the header part is compressed, and the "body" can't be extracted as compressed
final boolean compressMessage = options.compress() && canCompress(request);
final boolean compressMessage = compressRequest && canCompress(request);
status = TransportStatus.setRequest(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
@ -871,8 +867,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final TransportResponse response,
final long requestId,
final String action,
final TransportResponseOptions options) throws IOException {
sendResponse(nodeVersion, features, channel, response, requestId, action, options, (byte) 0);
final boolean compress) throws IOException {
sendResponse(nodeVersion, features, channel, response, requestId, action, compress, (byte) 0);
}
private void sendResponse(
@ -882,18 +878,16 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final TransportResponse response,
final long requestId,
final String action,
TransportResponseOptions options,
boolean compress,
byte status) throws IOException {
if (compressResponses && options.compress() == false) {
options = TransportResponseOptions.builder(options).withCompress(true).build();
}
boolean compressMessage = compress || compressAllResponses;
status = TransportStatus.setResponse(status);
ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays);
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress());
CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, compressMessage);
boolean addedReleaseListener = false;
try {
if (options.compress()) {
if (compressMessage) {
status = TransportStatus.setCompress(status);
}
threadPool.getThreadContext().writeTo(stream);
@ -901,10 +895,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
stream.setFeatures(features);
BytesReference message = buildMessage(requestId, status, nodeVersion, response, stream);
final TransportResponseOptions finalOptions = options;
// this might be called in a different thread
ReleaseListener releaseListener = new ReleaseListener(stream,
() -> messageListener.onResponseSent(requestId, action, response, finalOptions));
() -> messageListener.onResponseSent(requestId, action, response));
internalSendMessage(channel, message, releaseListener);
addedReleaseListener = true;
} finally {
@ -1530,9 +1523,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
@Override
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {
public void onResponseSent(long requestId, String action, TransportResponse response) {
for (TransportMessageListener listener : listeners) {
listener.onResponseSent(requestId, action, response, finalOptions);
listener.onResponseSent(requestId, action, response);
}
}

View File

@ -61,13 +61,7 @@ public final class TcpTransportChannel implements TransportChannel {
@Override
public void sendResponse(TransportResponse response) throws IOException {
try {
TransportResponseOptions options;
if (compressResponse) {
options = TransportResponseOptions.builder().withCompress(true).build();
} else {
options = TransportResponseOptions.EMPTY;
}
transport.sendResponse(version, features, channel, response, requestId, action, options);
transport.sendResponse(version, features, channel, response, requestId, action, compressResponse);
} finally {
release(false);
}

View File

@ -35,9 +35,8 @@ public interface TransportMessageListener {
* @param requestId the request ID (unique per client)
* @param action the request action
* @param response the response send
* @param finalOptions the response options
*/
default void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions finalOptions) {}
default void onResponseSent(long requestId, String action, TransportResponse response) {}
/***
* Called for every failed action response after the response has been passed to the underlying network implementation.

View File

@ -24,12 +24,10 @@ import org.elasticsearch.common.unit.TimeValue;
public class TransportRequestOptions {
private final TimeValue timeout;
private final boolean compress;
private final Type type;
private TransportRequestOptions(TimeValue timeout, boolean compress, Type type) {
private TransportRequestOptions(TimeValue timeout, Type type) {
this.timeout = timeout;
this.compress = compress;
this.type = type;
}
@ -37,10 +35,6 @@ public class TransportRequestOptions {
return this.timeout;
}
public boolean compress() {
return this.compress;
}
public Type type() {
return this.type;
}
@ -60,15 +54,11 @@ public class TransportRequestOptions {
}
public static Builder builder(TransportRequestOptions options) {
return new Builder()
.withTimeout(options.timeout)
.withCompress(options.compress)
.withType(options.type());
return new Builder().withTimeout(options.timeout).withType(options.type());
}
public static class Builder {
private TimeValue timeout;
private boolean compress;
private Type type = Type.REG;
private Builder() {
@ -83,18 +73,13 @@ public class TransportRequestOptions {
return this;
}
public Builder withCompress(boolean compress) {
this.compress = compress;
return this;
}
public Builder withType(Type type) {
this.type = type;
return this;
}
public TransportRequestOptions build() {
return new TransportRequestOptions(timeout, compress, type);
return new TransportRequestOptions(timeout, type);
}
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
public class TransportResponseOptions {
private final boolean compress;
private TransportResponseOptions(boolean compress) {
this.compress = compress;
}
public boolean compress() {
return this.compress;
}
public static final TransportResponseOptions EMPTY = TransportResponseOptions.builder().build();
public static Builder builder() {
return new Builder();
}
public static Builder builder(TransportResponseOptions options) {
return new Builder().withCompress(options.compress);
}
public static class Builder {
private boolean compress;
public Builder withCompress(boolean compress) {
this.compress = compress;
return this;
}
public TransportResponseOptions build() {
return new TransportResponseOptions(compress);
}
}
}

View File

@ -897,13 +897,15 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
}
/** called by the {@link Transport} implementation once a response was sent to calling node */
public void onResponseSent(long requestId, String action, TransportResponse response, TransportResponseOptions options) {
@Override
public void onResponseSent(long requestId, String action, TransportResponse response) {
if (traceEnabled() && shouldTraceAction(action)) {
traceResponseSent(requestId, action);
}
}
/** called by the {@link Transport} implementation after an exception was sent as a response to an incoming request */
@Override
public void onResponseSent(long requestId, String action, Exception e) {
if (traceEnabled() && shouldTraceAction(action)) {
traceResponseSent(requestId, action, e);
@ -918,6 +920,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
* called by the {@link Transport} implementation when an incoming request arrives but before
* any parsing of it has happened (with the exception of the requestId and action)
*/
@Override
public void onRequestReceived(long requestId, String action) {
try {
blockIncomingRequestsLatch.await();
@ -1172,7 +1175,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran
@Override
public void sendResponse(TransportResponse response) throws IOException {
service.onResponseSent(requestId, action, response, TransportResponseOptions.EMPTY);
service.onResponseSent(requestId, action, response);
final TransportResponseHandler handler = service.responseHandlers.onResponseReceived(requestId, service);
// ignore if its null, the service logs it
if (handler != null) {

View File

@ -260,7 +260,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
res = serviceB.submitRequest(nodeA, "internal:sayHello", new StringMessageRequest("moshe"),
TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler<StringMessageResponse>() {
TransportRequestOptions.EMPTY, new TransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse read(StreamInput in) throws IOException {
return new StringMessageResponse(in);
@ -519,7 +519,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
TransportFuture<TransportResponse.Empty> res = serviceC.submitRequest(nodeA, "internal:sayHello",
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(),
TransportRequest.Empty.INSTANCE, TransportRequestOptions.EMPTY,
new TransportResponseHandler<TransportResponse.Empty>() {
@Override
public TransportResponse.Empty read(StreamInput in) {
@ -572,7 +572,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile);
TransportFuture<StringMessageResponse> res = serviceC.submitRequest(nodeA, "internal:sayHello",
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(),
new StringMessageRequest("moshe"), TransportRequestOptions.EMPTY,
new TransportResponseHandler<StringMessageResponse>() {
@Override
public StringMessageResponse read(StreamInput in) throws IOException {
@ -1795,7 +1795,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
DiscoveryNode node = randomFrom(nodeA, nodeB, nodeC);
logger.debug("send secondary request from {} to {} - {}", toNodeMap.get(service), node, request.info);
service.sendRequest(node, "internal:action1", new TestRequest("secondary " + request.info),
TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
TransportRequestOptions.EMPTY,
new TransportResponseHandler<TestResponse>() {
@Override
public TestResponse read(StreamInput in) throws IOException {
@ -1886,7 +1886,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
DiscoveryNode node = randomFrom(nodeC, nodeB, nodeA);
logger.debug("send from {} to {}", toNodeMap.get(service), node);
service.sendRequest(node, "internal:action1", new TestRequest("REQ[" + i + "]"),
TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TestResponseHandler(i));
TransportRequestOptions.EMPTY, new TestResponseHandler(i));
}
logger.debug("waiting for response");
fail.set(randomBoolean());