From 7c5f0fe40528e63f6a7b88d3cd0ea95d23e9181b Mon Sep 17 00:00:00 2001 From: kimchy Date: Sat, 14 Aug 2010 02:25:58 +0300 Subject: [PATCH] minor transport refactoring, add options to send response, though no options to set yet --- .../elasticsearch/transport/Transport.java | 12 +++---- .../transport/TransportChannel.java | 7 +++- .../transport/TransportRequestOptions.java | 2 ++ .../transport/TransportResponseOptions.java | 32 +++++++++++++++++++ .../transport/TransportService.java | 11 +++---- .../transport/local/LocalTransport.java | 2 +- .../local/LocalTransportChannel.java | 11 ++++--- .../transport/netty/NettyTransport.java | 3 +- .../netty/NettyTransportChannel.java | 6 ++++ 9 files changed, 64 insertions(+), 22 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java index 0728b5e34d5..9c39aa520ac 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/Transport.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; -import javax.annotation.Nullable; import java.io.IOException; /** @@ -34,8 +33,8 @@ import java.io.IOException; public interface Transport extends LifecycleComponent { class Helper { - public static final byte TRANSPORT_TYPE = 1; - public static final byte RESPONSE_TYPE = 1 << 1; + public static final byte TRANSPORT_TYPE = 1 << 0; + public static final byte ERROR = 1 << 1; public static boolean isRequest(byte value) { return (value & TRANSPORT_TYPE) == 0; @@ -52,14 +51,13 @@ public interface Transport extends LifecycleComponent { } public static boolean isError(byte value) { - return (value & RESPONSE_TYPE) != 0; + return (value & ERROR) != 0; } public static byte setError(byte value) { - value |= RESPONSE_TYPE; + value |= ERROR; return value; } - } void transportServiceAdapter(TransportServiceAdapter service); @@ -97,5 +95,5 @@ public interface Transport extends LifecycleComponent { /** * Sends the request to the node. */ - void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException; + void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, TransportRequestOptions options) throws IOException, TransportException; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportChannel.java index 19d7c21e898..dae9a9624eb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -24,12 +24,17 @@ import org.elasticsearch.common.io.stream.Streamable; import java.io.IOException; /** - * @author kimchy (Shay Banon) + * A transport channel allows to send a response to a request on the channel. + * + * @author kimchy (shay.banon) */ public interface TransportChannel { + String action(); void sendResponse(Streamable message) throws IOException; + void sendResponse(Streamable message, TransportResponseOptions options) throws IOException; + void sendResponse(Throwable error) throws IOException; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java index ba69a472fd8..beeb8fd586f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java @@ -26,6 +26,8 @@ import org.elasticsearch.common.unit.TimeValue; */ public class TransportRequestOptions { + public static TransportRequestOptions EMPTY = options(); + public static TransportRequestOptions options() { return new TransportRequestOptions(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java new file mode 100644 index 00000000000..b4e642e552d --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java @@ -0,0 +1,32 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +/** + * @author kimchy (shay.banon) + */ +public class TransportResponseOptions { + + public static final TransportResponseOptions EMPTY = options(); + + public static TransportResponseOptions options() { + return new TransportResponseOptions(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index 7f9d641b42c..2c237ca619b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.timer.TimerService; -import javax.annotation.Nullable; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -156,11 +155,11 @@ public class TransportService extends AbstractLifecycleComponent TransportFuture submitRequest(DiscoveryNode node, String action, Streamable message, TransportResponseHandler handler) throws TransportException { - return submitRequest(node, action, message, null, handler); + return submitRequest(node, action, message, TransportRequestOptions.EMPTY, handler); } public TransportFuture submitRequest(DiscoveryNode node, String action, Streamable message, - @Nullable TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { + TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { PlainTransportFuture futureHandler = new PlainTransportFuture(handler); sendRequest(node, action, message, options, futureHandler); return futureHandler; @@ -168,15 +167,15 @@ public class TransportService extends AbstractLifecycleComponent void sendRequest(final DiscoveryNode node, final String action, final Streamable message, final TransportResponseHandler handler) throws TransportException { - sendRequest(node, action, message, null, handler); + sendRequest(node, action, message, TransportRequestOptions.EMPTY, handler); } public void sendRequest(final DiscoveryNode node, final String action, final Streamable message, - @Nullable final TransportRequestOptions options, final TransportResponseHandler handler) throws TransportException { + final TransportRequestOptions options, final TransportResponseHandler handler) throws TransportException { final long requestId = newRequestId(); Timeout timeoutX = null; try { - if (options != null && options.timeout() != null) { + if (options.timeout() != null) { timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), options.timeout(), TimerService.ExecutionType.THREADED); } clientHandlers.put(requestId, new RequestHolder(handler, node, action, timeoutX)); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java index 12177ca91ca..0907077c9a5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -134,7 +134,7 @@ public class LocalTransport extends AbstractLifecycleComponent implem } } - @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException { + @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); stream.writeLong(requestId); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java index aa5b4933894..faa363d1584 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -23,16 +23,13 @@ import org.elasticsearch.common.io.ThrowableObjectOutputStream; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.Streamable; -import org.elasticsearch.transport.NotSerializableTransportException; -import org.elasticsearch.transport.RemoteTransportException; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.*; import java.io.IOException; import java.io.NotSerializableException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class LocalTransportChannel implements TransportChannel { @@ -57,6 +54,10 @@ public class LocalTransportChannel implements TransportChannel { } @Override public void sendResponse(Streamable message) throws IOException { + sendResponse(message, TransportResponseOptions.EMPTY); + } + + @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); stream.writeLong(requestId); byte status = 0; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index c7e909c228b..600442ab7b8 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -53,7 +53,6 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; -import javax.annotation.Nullable; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -383,7 +382,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; - @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, @Nullable TransportRequestOptions options) throws IOException, TransportException { + @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, TransportRequestOptions options) throws IOException, TransportException { Channel targetChannel = nodeChannel(node); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index b551687c6e2..7276c5fcaa9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.netty.channel.Channel; import org.elasticsearch.transport.NotSerializableTransportException; import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportResponseOptions; import java.io.IOException; import java.io.NotSerializableException; @@ -62,6 +63,10 @@ public class NettyTransportChannel implements TransportChannel { } @Override public void sendResponse(Streamable message) throws IOException { + sendResponse(message, TransportResponseOptions.EMPTY); + } + + @Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException { HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); stream.writeBytes(LENGTH_PLACEHOLDER); // fake size stream.writeLong(requestId); @@ -69,6 +74,7 @@ public class NettyTransportChannel implements TransportChannel { status = setResponse(status); stream.writeByte(status); // 0 for request, 1 for response. message.writeTo(stream); + stream.flush(); byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); buffer.setInt(0, buffer.writerIndex() - 4); // update real size.