From 847db717c66509a817e1b965226ee1e44c08918d Mon Sep 17 00:00:00 2001 From: kimchy Date: Thu, 11 Feb 2010 19:29:25 +0200 Subject: [PATCH] Transport: Support local (JVM level) transport. Closes #3. --- .../discovery/DiscoveryModule.java | 2 +- .../discovery/local/LocalDiscovery.java | 5 +- .../transport/TransportModule.java | 17 +- .../transport/local/LocalTransport.java | 236 ++++++++++++++++++ .../local/LocalTransportChannel.java | 105 ++++++++ .../local/LocalTransportManagement.java | 36 +++ .../transport/local/LocalTransportModule.java | 35 +++ .../netty/NettyTransportChannel.java | 4 +- .../netty/NettyTransportManagement.java | 2 +- .../util/transport/LocalTransportAddress.java | 74 ++++++ .../TransportAddressSerializers.java | 1 + .../local/SimpleLocalTransportTests.java | 168 +++++++++++++ 12 files changed, 673 insertions(+), 12 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportManagement.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/LocalTransportAddress.java create mode 100644 modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java index 2026346ea6d..59a031e2419 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java @@ -40,7 +40,7 @@ public class DiscoveryModule extends AbstractModule { @Override protected void configure() { - Class defaultDiscoveryModule = null; + Class defaultDiscoveryModule; if (settings.getAsBoolean("node.local", false)) { defaultDiscoveryModule = LocalDiscoveryModule.class; } else { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index fc512a1d1e0..55b4cfdd2d0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -34,6 +34,7 @@ import org.elasticsearch.util.settings.Settings; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -42,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong; import static com.google.common.collect.Sets.*; import static org.elasticsearch.cluster.ClusterState.*; -import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; /** * @author kimchy (Shay Banon) @@ -67,7 +67,8 @@ public class LocalDiscovery extends AbstractComponent implements Discovery { private final CopyOnWriteArrayList initialStateListeners = new CopyOnWriteArrayList(); - private static final ConcurrentMap clusterGroups = newConcurrentMap(); + // use CHM here and not ConcurrentMaps#new since we want to be able to agentify this using TC later on... + private static final ConcurrentMap clusterGroups = new ConcurrentHashMap(); private static final AtomicLong nodeIdGenerator = new AtomicLong(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportModule.java index e15295d6971..962b4b5eb06 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportModule.java @@ -21,6 +21,7 @@ package org.elasticsearch.transport; import com.google.inject.AbstractModule; import com.google.inject.Module; +import org.elasticsearch.transport.local.LocalTransportModule; import org.elasticsearch.util.Classes; import org.elasticsearch.util.settings.Settings; @@ -42,12 +43,16 @@ public class TransportModule extends AbstractModule { bind(TransportService.class).asEagerSingleton(); bind(TransportServiceManagement.class).asEagerSingleton(); - Class defaultTransportModule = null; - try { - Classes.getDefaultClassLoader().loadClass("org.elasticsearch.transport.netty.NettyTransport"); - defaultTransportModule = (Class) Classes.getDefaultClassLoader().loadClass("org.elasticsearch.transport.netty.NettyTransportModule"); - } catch (ClassNotFoundException e) { - // TODO default to the local one + Class defaultTransportModule; + if (settings.getAsBoolean("node.local", false)) { + defaultTransportModule = LocalTransportModule.class; + } else { + try { + Classes.getDefaultClassLoader().loadClass("org.elasticsearch.transport.netty.NettyTransport"); + defaultTransportModule = (Class) Classes.getDefaultClassLoader().loadClass("org.elasticsearch.transport.netty.NettyTransportModule"); + } catch (ClassNotFoundException e) { + defaultTransportModule = LocalTransportModule.class; + } } Class moduleClass = settings.getAsClass("transport.type", defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java new file mode 100644 index 00000000000..ff3723bc6be --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransport.java @@ -0,0 +1,236 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.local; + +import com.google.inject.Inject; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.cluster.node.Node; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; +import org.elasticsearch.util.Nullable; +import org.elasticsearch.util.component.AbstractComponent; +import org.elasticsearch.util.component.Lifecycle; +import org.elasticsearch.util.io.*; +import org.elasticsearch.util.settings.ImmutableSettings; +import org.elasticsearch.util.settings.Settings; +import org.elasticsearch.util.transport.BoundTransportAddress; +import org.elasticsearch.util.transport.LocalTransportAddress; +import org.elasticsearch.util.transport.TransportAddress; + +import java.io.DataInputStream; +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.elasticsearch.transport.Transport.Helper.*; +import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; + +/** + * @author kimchy (Shay Banon) + */ +public class LocalTransport extends AbstractComponent implements Transport { + + private final Lifecycle lifecycle = new Lifecycle(); + + private final ThreadPool threadPool; + + private volatile TransportServiceAdapter transportServiceAdapter; + + private volatile BoundTransportAddress boundAddress; + + private volatile LocalTransportAddress localAddress; + + private final static ConcurrentMap transports = newConcurrentMap(); + + private static final AtomicLong transportAddressIdGenerator = new AtomicLong(); + + public LocalTransport(ThreadPool threadPool) { + this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool); + } + + @Inject public LocalTransport(Settings settings, ThreadPool threadPool) { + super(settings); + this.threadPool = threadPool; + } + + @Override public Lifecycle.State lifecycleState() { + return this.lifecycle.state(); + } + + @Override public Transport start() throws ElasticSearchException { + if (!lifecycle.moveToStarted()) { + return this; + } + localAddress = new LocalTransportAddress(Long.toString(transportAddressIdGenerator.incrementAndGet())); + transports.put(localAddress, this); + boundAddress = new BoundTransportAddress(localAddress, localAddress); + return this; + } + + @Override public Transport stop() throws ElasticSearchException { + if (!lifecycle.moveToStopped()) { + return this; + } + transports.remove(localAddress); + return this; + } + + @Override public void close() throws ElasticSearchException { + if (lifecycle.started()) { + stop(); + } + if (!lifecycle.moveToClosed()) { + return; + } + } + + @Override public void transportServiceAdapter(TransportServiceAdapter transportServiceAdapter) { + this.transportServiceAdapter = transportServiceAdapter; + } + + @Override public BoundTransportAddress boundAddress() { + return boundAddress; + } + + @Override public void nodesAdded(Iterable nodes) { + } + + @Override public void nodesRemoved(Iterable nodes) { + } + + @Override public void sendRequest(final Node node, final long requestId, final String action, + final Streamable message, final TransportResponseHandler handler) throws IOException, TransportException { + ByteArrayDataOutputStream stream = ByteArrayDataOutputStream.Cached.cached(); + + stream.writeLong(requestId); + byte status = 0; + status = setRequest(status); + stream.writeByte(status); // 0 for request, 1 for response. + + stream.writeUTF(action); + message.writeTo(stream); + + final LocalTransport targetTransport = transports.get(node.address()); + if (targetTransport == null) { + throw new ConnectTransportException(node, "Failed to connect"); + } + + final byte[] data = stream.copiedByteArray(); + threadPool.execute(new Runnable() { + @Override public void run() { + targetTransport.messageReceived(data, action, LocalTransport.this, handler); + } + }); + } + + ThreadPool threadPool() { + return this.threadPool; + } + + void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final TransportResponseHandler responseHandler) { + ByteArrayDataInputStream stream = new ByteArrayDataInputStream(data); + + try { + long requestId = stream.readLong(); + byte status = stream.readByte(); + boolean isRequest = isRequest(status); + + if (isRequest) { + handleRequest(stream, requestId, sourceTransport); + } else { + final TransportResponseHandler handler = transportServiceAdapter.remove(requestId); + if (handler == null) { + throw new ResponseHandlerNotFoundTransportException(requestId); + } + if (Transport.Helper.isError(status)) { + handlerResponseError(stream, handler); + } else { + handleResponse(stream, handler); + } + } + } catch (Exception e) { + if (responseHandler != null) { + responseHandler.handleException(new RemoteTransportException(nodeName(), localAddress, action, e)); + } else { + logger.warn("Failed to receive message for action [" + action + "]", e); + } + } + } + + private void handleRequest(DataInputStream stream, long requestId, LocalTransport sourceTransport) throws Exception { + final String action = stream.readUTF(); + final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId); + final TransportRequestHandler handler = transportServiceAdapter.handler(action); + if (handler == null) { + throw new ActionNotFoundTransportException("Action [" + action + "] not found"); + } + final Streamable streamable = handler.newInstance(); + streamable.readFrom(stream); + handler.messageReceived(streamable, transportChannel); + } + + + private void handleResponse(DataInputStream buffer, final TransportResponseHandler handler) { + final Streamable streamable = handler.newInstance(); + try { + streamable.readFrom(buffer); + } catch (Exception e) { + handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e)); + return; + } + if (handler.spawn()) { + threadPool.execute(new Runnable() { + @SuppressWarnings({"unchecked"}) @Override public void run() { + try { + handler.handleResponse(streamable); + } catch (Exception e) { + handleException(handler, new ResponseHandlerFailureTransportException("Failed to handler response", e)); + } + } + }); + } else { + try { + //noinspection unchecked + handler.handleResponse(streamable); + } catch (Exception e) { + handleException(handler, new ResponseHandlerFailureTransportException("Failed to handler response", e)); + } + } + } + + private void handlerResponseError(DataInputStream buffer, final TransportResponseHandler handler) { + Throwable error; + try { + ThrowableObjectInputStream ois = new ThrowableObjectInputStream(new DataInputInputStream(buffer)); + error = (Throwable) ois.readObject(); + } catch (Exception e) { + error = new TransportSerializationException("Failed to deserialize exception response from stream", e); + } + handleException(handler, error); + } + + private void handleException(final TransportResponseHandler handler, Throwable error) { + if (!(error instanceof RemoteTransportException)) { + error = new RemoteTransportException("None remote transport exception", error); + } + final RemoteTransportException rtx = (RemoteTransportException) error; + handler.handleException(rtx); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java new file mode 100644 index 00000000000..afd5ac9af51 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportChannel.java @@ -0,0 +1,105 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.local; + +import org.elasticsearch.transport.NotSerializableTransportException; +import org.elasticsearch.transport.RemoteTransportException; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.util.io.ByteArrayDataOutputStream; +import org.elasticsearch.util.io.Streamable; +import org.elasticsearch.util.io.ThrowableObjectOutputStream; + +import java.io.IOException; +import java.io.NotSerializableException; + +/** + * @author kimchy (Shay Banon) + */ +public class LocalTransportChannel implements TransportChannel { + + private final LocalTransport sourceTransport; + + // the transport we will *send to* + private final LocalTransport targetTransport; + + private final String action; + + private final long requestId; + + public LocalTransportChannel(LocalTransport sourceTransport, LocalTransport targetTransport, String action, long requestId) { + this.sourceTransport = sourceTransport; + this.targetTransport = targetTransport; + this.action = action; + this.requestId = requestId; + } + + @Override public String action() { + return action; + } + + @Override public void sendResponse(Streamable message) throws IOException { + ByteArrayDataOutputStream stream = ByteArrayDataOutputStream.Cached.cached(); + stream.writeLong(requestId); + byte status = 0; + status = Transport.Helper.setResponse(status); + stream.writeByte(status); // 0 for request, 1 for response. + message.writeTo(stream); + final byte[] data = stream.copiedByteArray(); + targetTransport.threadPool().execute(new Runnable() { + @Override public void run() { + targetTransport.messageReceived(data, action, sourceTransport, null); + } + }); + } + + @Override public void sendResponse(Throwable error) throws IOException { + ByteArrayDataOutputStream stream; + try { + stream = ByteArrayDataOutputStream.Cached.cached(); + writeResponseExceptionHeader(stream); + RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error); + ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); + too.writeObject(tx); + too.close(); + } catch (NotSerializableException e) { + stream = ByteArrayDataOutputStream.Cached.cached(); + writeResponseExceptionHeader(stream); + RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error)); + ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream); + too.writeObject(tx); + too.close(); + } + final byte[] data = stream.copiedByteArray(); + targetTransport.threadPool().execute(new Runnable() { + @Override public void run() { + targetTransport.messageReceived(data, action, sourceTransport, null); + } + }); + } + + private void writeResponseExceptionHeader(ByteArrayDataOutputStream stream) throws IOException { + stream.writeLong(requestId); + byte status = 0; + status = Transport.Helper.setResponse(status); + status = Transport.Helper.setError(status); + stream.writeByte(status); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportManagement.java new file mode 100644 index 00000000000..f93fedb7d83 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportManagement.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.local; + +import com.google.inject.Inject; +import org.elasticsearch.jmx.MBean; + +/** + * @author kimchy (Shay Banon) + */ +@MBean(objectName = "service=transport,transportType=local", description = "Local Transport") +public class LocalTransportManagement { + + private final LocalTransport transport; + + @Inject public LocalTransportManagement(LocalTransport transport) { + this.transport = transport; + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java new file mode 100644 index 00000000000..d3f33c8ca9e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.local; + +import com.google.inject.AbstractModule; +import org.elasticsearch.transport.Transport; + +/** + * @author kimchy (Shay Banon) + */ +public class LocalTransportModule extends AbstractModule { + + @Override protected void configure() { + bind(LocalTransport.class).asEagerSingleton(); + bind(Transport.class).to(LocalTransport.class).asEagerSingleton(); + bind(LocalTransportManagement.class).asEagerSingleton(); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index bb473a06149..ca9d55b175e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -90,13 +90,13 @@ public class NettyTransportChannel implements TransportChannel { os.flush(); buffer.markWriterIndex(); try { - RemoteTransportException tx = new RemoteTransportException(transport.settings().get("name"), transport.wrapAddress(channel.getLocalAddress()), action, error); + RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(os); too.writeObject(tx); too.close(); } catch (NotSerializableException e) { buffer.resetWriterIndex(); - RemoteTransportException tx = new RemoteTransportException(transport.settings().get("name"), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); + RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error)); ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(os); too.writeObject(tx); too.close(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java index 8f11b2aa2b1..6496942e91b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransportManagement.java @@ -29,7 +29,7 @@ import org.elasticsearch.jmx.ManagedAttribute; @MBean(objectName = "service=transport,transportType=netty", description = "Netty Transport") public class NettyTransportManagement { - private NettyTransport transport; + private final NettyTransport transport; @Inject public NettyTransportManagement(NettyTransport transport) { this.transport = transport; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/LocalTransportAddress.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/LocalTransportAddress.java new file mode 100644 index 00000000000..a919d19043f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/LocalTransportAddress.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.util.transport; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * @author kimchy (Shay Banon) + */ +public class LocalTransportAddress implements TransportAddress { + + private String id; + + LocalTransportAddress() { + } + + public LocalTransportAddress(String id) { + this.id = id; + } + + public String id() { + return this.id; + } + + @Override public short uniqueAddressTypeId() { + return 2; + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + id = in.readUTF(); + } + + @Override public void writeTo(DataOutput out) throws IOException { + out.writeUTF(id); + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LocalTransportAddress that = (LocalTransportAddress) o; + + if (id != null ? !id.equals(that.id) : that.id != null) return false; + + return true; + } + + @Override public int hashCode() { + return id != null ? id.hashCode() : 0; + } + + @Override public String toString() { + return "local[" + id + "]"; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/TransportAddressSerializers.java b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/TransportAddressSerializers.java index 7f9f716400e..a5012a63cff 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/TransportAddressSerializers.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/util/transport/TransportAddressSerializers.java @@ -49,6 +49,7 @@ public abstract class TransportAddressSerializers { try { addAddressType(DummyTransportAddress.INSTANCE); addAddressType(new InetSocketTransportAddress()); + addAddressType(new LocalTransportAddress()); } catch (Exception e) { logger.warn("Failed to add InetSocketTransportAddress", e); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java new file mode 100644 index 00000000000..00806ca672c --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/local/SimpleLocalTransportTests.java @@ -0,0 +1,168 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.local; + +import org.elasticsearch.cluster.node.Node; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.dynamic.DynamicThreadPool; +import org.elasticsearch.transport.*; +import org.elasticsearch.util.io.Streamable; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +public class SimpleLocalTransportTests { + + private ThreadPool threadPool; + + private TransportService serviceA; + private TransportService serviceB; + private Node serviceANode; + private Node serviceBNode; + + @BeforeClass public void setUp() { + threadPool = new DynamicThreadPool(); + + serviceA = new TransportService(new LocalTransport(threadPool)).start(); + serviceANode = new Node("A", serviceA.boundAddress().publishAddress()); + + serviceB = new TransportService(new LocalTransport(threadPool)).start(); + serviceBNode = new Node("B", serviceB.boundAddress().publishAddress()); + } + + @AfterClass public void tearDown() { + serviceA.close(); + serviceB.close(); + + threadPool.shutdown(); + } + + @Test public void testHelloWorld() { + serviceA.registerHandler("sayHello", new BaseTransportRequestHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void messageReceived(StringMessage request, TransportChannel channel) { + System.out.println("got message: " + request.message); + assertThat("moshe", equalTo(request.message)); + try { + channel.sendResponse(new StringMessage("hello " + request.message)); + } catch (IOException e) { + e.printStackTrace(); + assertThat(e.getMessage(), false, equalTo(true)); + } + } + }); + + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHello", + new StringMessage("moshe"), new BaseTransportResponseHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void handleResponse(StringMessage response) { + System.out.println("got response: " + response.message); + assertThat("hello moshe", equalTo(response.message)); + } + + @Override public void handleException(RemoteTransportException exp) { + exp.printStackTrace(); + assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true)); + } + }); + + try { + StringMessage message = res.get(); + assertThat("hello moshe", equalTo(message.message)); + } catch (Exception e) { + assertThat(e.getMessage(), false, equalTo(true)); + } + + System.out.println("after ..."); + } + + @Test public void testErrorMessage() { + serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception { + System.out.println("got message: " + request.message); + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); + } + }); + + TransportFuture res = serviceB.submitRequest(serviceANode, "sayHelloException", + new StringMessage("moshe"), new BaseTransportResponseHandler() { + @Override public StringMessage newInstance() { + return new StringMessage(); + } + + @Override public void handleResponse(StringMessage response) { + assertThat("got response instead of exception", false, equalTo(true)); + } + + @Override public void handleException(RemoteTransportException exp) { + assertThat("bad message !!!", equalTo(exp.getCause().getMessage())); + } + }); + + try { + res.txGet(); + assertThat("exception should be thrown", false, equalTo(true)); + } catch (Exception e) { + assertThat("bad message !!!", equalTo(e.getCause().getMessage())); + } + + System.out.println("after ..."); + + } + + private class StringMessage implements Streamable { + + private String message; + + private StringMessage(String message) { + this.message = message; + } + + private StringMessage() { + } + + @Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException { + message = in.readUTF(); + } + + @Override public void writeTo(DataOutput out) throws IOException { + out.writeUTF(message); + } + } + +} \ No newline at end of file