minor transport refactoring, add options to send response, though no options to set yet

This commit is contained in:
kimchy 2010-08-14 02:25:58 +03:00
parent b3afca2589
commit 7c5f0fe405
9 changed files with 64 additions and 22 deletions

View File

@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
/** /**
@ -34,8 +33,8 @@ import java.io.IOException;
public interface Transport extends LifecycleComponent<Transport> { public interface Transport extends LifecycleComponent<Transport> {
class Helper { class Helper {
public static final byte TRANSPORT_TYPE = 1; public static final byte TRANSPORT_TYPE = 1 << 0;
public static final byte RESPONSE_TYPE = 1 << 1; public static final byte ERROR = 1 << 1;
public static boolean isRequest(byte value) { public static boolean isRequest(byte value) {
return (value & TRANSPORT_TYPE) == 0; return (value & TRANSPORT_TYPE) == 0;
@ -52,14 +51,13 @@ public interface Transport extends LifecycleComponent<Transport> {
} }
public static boolean isError(byte value) { public static boolean isError(byte value) {
return (value & RESPONSE_TYPE) != 0; return (value & ERROR) != 0;
} }
public static byte setError(byte value) { public static byte setError(byte value) {
value |= RESPONSE_TYPE; value |= ERROR;
return value; return value;
} }
} }
void transportServiceAdapter(TransportServiceAdapter service); void transportServiceAdapter(TransportServiceAdapter service);
@ -97,5 +95,5 @@ public interface Transport extends LifecycleComponent<Transport> {
/** /**
* Sends the request to the node. * Sends the request to the node.
*/ */
<T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException; <T extends Streamable> void sendRequest(DiscoveryNode node, long requestId, String action, Streamable message, TransportRequestOptions options) throws IOException, TransportException;
} }

View File

@ -24,12 +24,17 @@ import org.elasticsearch.common.io.stream.Streamable;
import java.io.IOException; import java.io.IOException;
/** /**
* @author kimchy (Shay Banon) * A transport channel allows to send a response to a request on the channel.
*
* @author kimchy (shay.banon)
*/ */
public interface TransportChannel { public interface TransportChannel {
String action(); String action();
void sendResponse(Streamable message) throws IOException; void sendResponse(Streamable message) throws IOException;
void sendResponse(Streamable message, TransportResponseOptions options) throws IOException;
void sendResponse(Throwable error) throws IOException; void sendResponse(Throwable error) throws IOException;
} }

View File

@ -26,6 +26,8 @@ import org.elasticsearch.common.unit.TimeValue;
*/ */
public class TransportRequestOptions { public class TransportRequestOptions {
public static TransportRequestOptions EMPTY = options();
public static TransportRequestOptions options() { public static TransportRequestOptions options() {
return new TransportRequestOptions(); return new TransportRequestOptions();
} }

View File

@ -0,0 +1,32 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
/**
* @author kimchy (shay.banon)
*/
public class TransportResponseOptions {
public static final TransportResponseOptions EMPTY = options();
public static TransportResponseOptions options() {
return new TransportResponseOptions();
}
}

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.timer.TimerService; import org.elasticsearch.timer.TimerService;
import javax.annotation.Nullable;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -156,11 +155,11 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, Streamable message, public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, Streamable message,
TransportResponseHandler<T> handler) throws TransportException { TransportResponseHandler<T> handler) throws TransportException {
return submitRequest(node, action, message, null, handler); return submitRequest(node, action, message, TransportRequestOptions.EMPTY, handler);
} }
public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, Streamable message, public <T extends Streamable> TransportFuture<T> submitRequest(DiscoveryNode node, String action, Streamable message,
@Nullable TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException { TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
PlainTransportFuture<T> futureHandler = new PlainTransportFuture<T>(handler); PlainTransportFuture<T> futureHandler = new PlainTransportFuture<T>(handler);
sendRequest(node, action, message, options, futureHandler); sendRequest(node, action, message, options, futureHandler);
return futureHandler; return futureHandler;
@ -168,15 +167,15 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message, public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message,
final TransportResponseHandler<T> handler) throws TransportException { final TransportResponseHandler<T> handler) throws TransportException {
sendRequest(node, action, message, null, handler); sendRequest(node, action, message, TransportRequestOptions.EMPTY, handler);
} }
public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message, public <T extends Streamable> void sendRequest(final DiscoveryNode node, final String action, final Streamable message,
@Nullable final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException { final TransportRequestOptions options, final TransportResponseHandler<T> handler) throws TransportException {
final long requestId = newRequestId(); final long requestId = newRequestId();
Timeout timeoutX = null; Timeout timeoutX = null;
try { try {
if (options != null && options.timeout() != null) { if (options.timeout() != null) {
timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), options.timeout(), TimerService.ExecutionType.THREADED); timeoutX = timerService.newTimeout(new TimeoutTimerTask(requestId), options.timeout(), TimerService.ExecutionType.THREADED);
} }
clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutX)); clientHandlers.put(requestId, new RequestHolder<T>(handler, node, action, timeoutX));

View File

@ -134,7 +134,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
} }
} }
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, @Nullable TransportRequestOptions options) throws IOException, TransportException { @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
stream.writeLong(requestId); stream.writeLong(requestId);

View File

@ -23,16 +23,13 @@ import org.elasticsearch.common.io.ThrowableObjectOutputStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.HandlesStreamOutput; import org.elasticsearch.common.io.stream.HandlesStreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.transport.NotSerializableTransportException; import org.elasticsearch.transport.*;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import java.io.IOException; import java.io.IOException;
import java.io.NotSerializableException; import java.io.NotSerializableException;
/** /**
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public class LocalTransportChannel implements TransportChannel { public class LocalTransportChannel implements TransportChannel {
@ -57,6 +54,10 @@ public class LocalTransportChannel implements TransportChannel {
} }
@Override public void sendResponse(Streamable message) throws IOException { @Override public void sendResponse(Streamable message) throws IOException {
sendResponse(message, TransportResponseOptions.EMPTY);
}
@Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
stream.writeLong(requestId); stream.writeLong(requestId);
byte status = 0; byte status = 0;

View File

@ -53,7 +53,6 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -383,7 +382,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private static final byte[] LENGTH_PLACEHOLDER = new byte[4]; private static final byte[] LENGTH_PLACEHOLDER = new byte[4];
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, @Nullable TransportRequestOptions options) throws IOException, TransportException { @Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable streamable, TransportRequestOptions options) throws IOException, TransportException {
Channel targetChannel = nodeChannel(node); Channel targetChannel = nodeChannel(node);

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.transport.NotSerializableTransportException; import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportResponseOptions;
import java.io.IOException; import java.io.IOException;
import java.io.NotSerializableException; import java.io.NotSerializableException;
@ -62,6 +63,10 @@ public class NettyTransportChannel implements TransportChannel {
} }
@Override public void sendResponse(Streamable message) throws IOException { @Override public void sendResponse(Streamable message) throws IOException {
sendResponse(message, TransportResponseOptions.EMPTY);
}
@Override public void sendResponse(Streamable message, TransportResponseOptions options) throws IOException {
HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles(); HandlesStreamOutput stream = BytesStreamOutput.Cached.cachedHandles();
stream.writeBytes(LENGTH_PLACEHOLDER); // fake size stream.writeBytes(LENGTH_PLACEHOLDER); // fake size
stream.writeLong(requestId); stream.writeLong(requestId);
@ -69,6 +74,7 @@ public class NettyTransportChannel implements TransportChannel {
status = setResponse(status); status = setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response. stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream); message.writeTo(stream);
stream.flush();
byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray(); byte[] data = ((BytesStreamOutput) stream.wrappedOut()).copiedByteArray();
ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data); ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(data);
buffer.setInt(0, buffer.writerIndex() - 4); // update real size. buffer.setInt(0, buffer.writerIndex() - 4); // update real size.