Remove implementations of `TransportChannel` (#27388)

Right now we have unnecessary implementations of `TransportChannel`.
Additionally, there are methods on the interface that are not used. This
commit removes unnecessary implementations and methods.
This commit is contained in:
Tim Brooks 2017-11-15 09:48:07 -07:00 committed by GitHub
parent 50a2459adf
commit a8f916911a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 35 additions and 138 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import java.io.IOException;
import java.util.function.Supplier;
public class RequestHandlerRegistry<Request extends TransportRequest> {
@ -64,7 +63,7 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
} else {
boolean success = false;
try {
handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
@ -91,38 +90,4 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
return handler.toString();
}
private static class TransportChannelWrapper extends DelegatingTransportChannel {
private final Task task;
private final TaskManager taskManager;
TransportChannelWrapper(TaskManager taskManager, Task task, TransportChannel channel) {
super(channel);
this.task = task;
this.taskManager = taskManager;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
endTask();
super.sendResponse(response);
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
endTask();
super.sendResponse(response, options);
}
@Override
public void sendResponse(Exception exception) throws IOException {
endTask();
super.sendResponse(exception);
}
private void endTask() {
taskManager.unregister(task);
}
}
}

View File

@ -20,24 +20,22 @@
package org.elasticsearch.transport;
import org.elasticsearch.Version;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import java.io.IOException;
/**
* Wrapper around transport channel that delegates all requests to the
* underlying channel
*/
public class DelegatingTransportChannel implements TransportChannel {
public class TaskTransportChannel implements TransportChannel {
private final Task task;
private final TaskManager taskManager;
private final TransportChannel channel;
protected DelegatingTransportChannel(TransportChannel channel) {
TaskTransportChannel(TaskManager taskManager, Task task, TransportChannel channel) {
this.channel = channel;
}
@Override
public String action() {
return channel.action();
this.task = task;
this.taskManager = taskManager;
}
@Override
@ -45,11 +43,6 @@ public class DelegatingTransportChannel implements TransportChannel {
return channel.getProfileName();
}
@Override
public long getRequestId() {
return channel.getRequestId();
}
@Override
public String getChannelType() {
return channel.getChannelType();
@ -57,25 +50,32 @@ public class DelegatingTransportChannel implements TransportChannel {
@Override
public void sendResponse(TransportResponse response) throws IOException {
endTask();
channel.sendResponse(response);
}
@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
endTask();
channel.sendResponse(response, options);
}
@Override
public void sendResponse(Exception exception) throws IOException {
endTask();
channel.sendResponse(exception);
}
public TransportChannel getChannel() {
return channel;
}
@Override
public Version getVersion() {
return channel.getVersion();
}
public TransportChannel getChannel() {
return channel;
}
private void endTask() {
taskManager.unregister(task);
}
}

View File

@ -24,17 +24,18 @@ import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
public final class TcpTransportChannel<Channel> implements TransportChannel {
private final TcpTransport<Channel> transport;
protected final Version version;
protected final String action;
protected final long requestId;
private final Version version;
private final String action;
private final long requestId;
private final String profileName;
private final long reservedBytes;
private final AtomicBoolean released = new AtomicBoolean();
private final String channelType;
private final Channel channel;
public TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
TcpTransportChannel(TcpTransport<Channel> transport, Channel channel, String channelType, String action,
long requestId, Version version, String profileName, long reservedBytes) {
this.version = version;
this.channel = channel;
@ -51,11 +52,6 @@ public final class TcpTransportChannel<Channel> implements TransportChannel {
return profileName;
}
@Override
public String action() {
return this.action;
}
@Override
public void sendResponse(TransportResponse response) throws IOException {
sendResponse(response, TransportResponseOptions.EMPTY);
@ -78,6 +74,7 @@ public final class TcpTransportChannel<Channel> implements TransportChannel {
release(true);
}
}
private Exception releaseBy;
private void release(boolean isExceptionResponse) {
@ -91,23 +88,18 @@ public final class TcpTransportChannel<Channel> implements TransportChannel {
}
}
@Override
public long getRequestId() {
return requestId;
}
@Override
public String getChannelType() {
return channelType;
}
public Channel getChannel() {
return channel;
}
@Override
public Version getVersion() {
return version;
}
public Channel getChannel() {
return channel;
}
}

View File

@ -28,12 +28,8 @@ import java.io.IOException;
*/
public interface TransportChannel {
String action();
String getProfileName();
long getRequestId();
String getChannelType();
void sendResponse(TransportResponse response) throws IOException;

View File

@ -1117,8 +1117,8 @@ public class TransportService extends AbstractLifecycleComponent {
final TransportService service;
final ThreadPool threadPool;
DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId,
TransportService service, ThreadPool threadPool) {
DirectResponseChannel(Logger logger, DiscoveryNode localNode, String action, long requestId, TransportService service,
ThreadPool threadPool) {
this.logger = logger;
this.localNode = localNode;
this.action = action;
@ -1127,11 +1127,6 @@ public class TransportService extends AbstractLifecycleComponent {
this.threadPool = threadPool;
}
@Override
public String action() {
return action;
}
@Override
public String getProfileName() {
return DIRECT_RESPONSE_PROFILE;
@ -1177,13 +1172,7 @@ public class TransportService extends AbstractLifecycleComponent {
if (ThreadPool.Names.SAME.equals(executor)) {
processException(handler, rtx);
} else {
threadPool.executor(handler.executor()).execute(new Runnable() {
@SuppressWarnings({"unchecked"})
@Override
public void run() {
processException(handler, rtx);
}
});
threadPool.executor(handler.executor()).execute(() -> processException(handler, rtx));
}
}
}
@ -1205,11 +1194,6 @@ public class TransportService extends AbstractLifecycleComponent {
}
}
@Override
public long getRequestId() {
return requestId;
}
@Override
public String getChannelType() {
return "direct";

View File

@ -471,11 +471,6 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
return capturedResponse;
}
@Override
public String action() {
return null;
}
@Override
public String getProfileName() {
return "";
@ -494,11 +489,6 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
public void sendResponse(Exception exception) throws IOException {
}
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return "test";

View File

@ -1237,11 +1237,6 @@ public class TransportReplicationActionTests extends ESTestCase {
public TransportChannel createTransportChannel(final PlainActionFuture<TestResponse> listener) {
return new TransportChannel() {
@Override
public String action() {
return null;
}
@Override
public String getProfileName() {
return "";
@ -1262,11 +1257,6 @@ public class TransportReplicationActionTests extends ESTestCase {
listener.onFailure(exception);
}
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return "replica_test";

View File

@ -914,11 +914,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
error.set(null);
}
@Override
public String action() {
return "_noop_";
}
@Override
public String getProfileName() {
return "_noop_";
@ -942,11 +937,6 @@ public class PublishClusterStateActionTests extends ESTestCase {
assertThat(response.get(), nullValue());
}
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return "capturing";

View File

@ -47,7 +47,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.zen.PublishClusterStateActionTests.AssertingAckListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
@ -378,21 +377,12 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
} else {
AtomicBoolean sendResponse = new AtomicBoolean(false);
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), new TransportChannel() {
@Override
public String action() {
return null;
}
@Override
public String getProfileName() {
return null;
}
@Override
public long getRequestId() {
return 0;
}
@Override
public String getChannelType() {
return null;