Transport: Support local (JVM level) transport. Closes #3.

This commit is contained in:
kimchy 2010-02-11 19:29:25 +02:00
parent b61964a2b8
commit 847db717c6
12 changed files with 673 additions and 12 deletions

View File

@ -40,7 +40,7 @@ public class DiscoveryModule extends AbstractModule {
@Override
protected void configure() {
Class<? extends Module> defaultDiscoveryModule = null;
Class<? extends Module> defaultDiscoveryModule;
if (settings.getAsBoolean("node.local", false)) {
defaultDiscoveryModule = LocalDiscoveryModule.class;
} else {

View File

@ -34,6 +34,7 @@ import org.elasticsearch.util.settings.Settings;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@ -42,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong;
import static com.google.common.collect.Sets.*;
import static org.elasticsearch.cluster.ClusterState.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
/**
* @author kimchy (Shay Banon)
@ -67,7 +67,8 @@ public class LocalDiscovery extends AbstractComponent implements Discovery {
private final CopyOnWriteArrayList<InitialStateDiscoveryListener> initialStateListeners = new CopyOnWriteArrayList<InitialStateDiscoveryListener>();
private static final ConcurrentMap<ClusterName, ClusterGroup> clusterGroups = newConcurrentMap();
// use CHM here and not ConcurrentMaps#new since we want to be able to agentify this using TC later on...
private static final ConcurrentMap<ClusterName, ClusterGroup> clusterGroups = new ConcurrentHashMap<ClusterName, ClusterGroup>();
private static final AtomicLong nodeIdGenerator = new AtomicLong();

View File

@ -21,6 +21,7 @@ package org.elasticsearch.transport;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import org.elasticsearch.transport.local.LocalTransportModule;
import org.elasticsearch.util.Classes;
import org.elasticsearch.util.settings.Settings;
@ -42,12 +43,16 @@ public class TransportModule extends AbstractModule {
bind(TransportService.class).asEagerSingleton();
bind(TransportServiceManagement.class).asEagerSingleton();
Class<? extends Module> defaultTransportModule = null;
Class<? extends Module> defaultTransportModule;
if (settings.getAsBoolean("node.local", false)) {
defaultTransportModule = LocalTransportModule.class;
} else {
try {
Classes.getDefaultClassLoader().loadClass("org.elasticsearch.transport.netty.NettyTransport");
defaultTransportModule = (Class<? extends Module>) Classes.getDefaultClassLoader().loadClass("org.elasticsearch.transport.netty.NettyTransportModule");
} catch (ClassNotFoundException e) {
// TODO default to the local one
defaultTransportModule = LocalTransportModule.class;
}
}
Class<? extends Module> moduleClass = settings.getAsClass("transport.type", defaultTransportModule, "org.elasticsearch.transport.", "TransportModule");

View File

@ -0,0 +1,236 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.local;
import com.google.inject.Inject;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.Nullable;
import org.elasticsearch.util.component.AbstractComponent;
import org.elasticsearch.util.component.Lifecycle;
import org.elasticsearch.util.io.*;
import org.elasticsearch.util.settings.ImmutableSettings;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress;
import org.elasticsearch.util.transport.LocalTransportAddress;
import org.elasticsearch.util.transport.TransportAddress;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.transport.Transport.Helper.*;
import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
/**
* @author kimchy (Shay Banon)
*/
public class LocalTransport extends AbstractComponent implements Transport {
private final Lifecycle lifecycle = new Lifecycle();
private final ThreadPool threadPool;
private volatile TransportServiceAdapter transportServiceAdapter;
private volatile BoundTransportAddress boundAddress;
private volatile LocalTransportAddress localAddress;
private final static ConcurrentMap<TransportAddress, LocalTransport> transports = newConcurrentMap();
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
public LocalTransport(ThreadPool threadPool) {
this(ImmutableSettings.Builder.EMPTY_SETTINGS, threadPool);
}
@Inject public LocalTransport(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
}
@Override public Lifecycle.State lifecycleState() {
return this.lifecycle.state();
}
@Override public Transport start() throws ElasticSearchException {
if (!lifecycle.moveToStarted()) {
return this;
}
localAddress = new LocalTransportAddress(Long.toString(transportAddressIdGenerator.incrementAndGet()));
transports.put(localAddress, this);
boundAddress = new BoundTransportAddress(localAddress, localAddress);
return this;
}
@Override public Transport stop() throws ElasticSearchException {
if (!lifecycle.moveToStopped()) {
return this;
}
transports.remove(localAddress);
return this;
}
@Override public void close() throws ElasticSearchException {
if (lifecycle.started()) {
stop();
}
if (!lifecycle.moveToClosed()) {
return;
}
}
@Override public void transportServiceAdapter(TransportServiceAdapter transportServiceAdapter) {
this.transportServiceAdapter = transportServiceAdapter;
}
@Override public BoundTransportAddress boundAddress() {
return boundAddress;
}
@Override public void nodesAdded(Iterable<Node> nodes) {
}
@Override public void nodesRemoved(Iterable<Node> nodes) {
}
@Override public <T extends Streamable> void sendRequest(final Node node, final long requestId, final String action,
final Streamable message, final TransportResponseHandler<T> handler) throws IOException, TransportException {
ByteArrayDataOutputStream stream = ByteArrayDataOutputStream.Cached.cached();
stream.writeLong(requestId);
byte status = 0;
status = setRequest(status);
stream.writeByte(status); // 0 for request, 1 for response.
stream.writeUTF(action);
message.writeTo(stream);
final LocalTransport targetTransport = transports.get(node.address());
if (targetTransport == null) {
throw new ConnectTransportException(node, "Failed to connect");
}
final byte[] data = stream.copiedByteArray();
threadPool.execute(new Runnable() {
@Override public void run() {
targetTransport.messageReceived(data, action, LocalTransport.this, handler);
}
});
}
ThreadPool threadPool() {
return this.threadPool;
}
void messageReceived(byte[] data, String action, LocalTransport sourceTransport, @Nullable final TransportResponseHandler responseHandler) {
ByteArrayDataInputStream stream = new ByteArrayDataInputStream(data);
try {
long requestId = stream.readLong();
byte status = stream.readByte();
boolean isRequest = isRequest(status);
if (isRequest) {
handleRequest(stream, requestId, sourceTransport);
} else {
final TransportResponseHandler handler = transportServiceAdapter.remove(requestId);
if (handler == null) {
throw new ResponseHandlerNotFoundTransportException(requestId);
}
if (Transport.Helper.isError(status)) {
handlerResponseError(stream, handler);
} else {
handleResponse(stream, handler);
}
}
} catch (Exception e) {
if (responseHandler != null) {
responseHandler.handleException(new RemoteTransportException(nodeName(), localAddress, action, e));
} else {
logger.warn("Failed to receive message for action [" + action + "]", e);
}
}
}
private void handleRequest(DataInputStream stream, long requestId, LocalTransport sourceTransport) throws Exception {
final String action = stream.readUTF();
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, sourceTransport, action, requestId);
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}
final Streamable streamable = handler.newInstance();
streamable.readFrom(stream);
handler.messageReceived(streamable, transportChannel);
}
private void handleResponse(DataInputStream buffer, final TransportResponseHandler handler) {
final Streamable streamable = handler.newInstance();
try {
streamable.readFrom(buffer);
} catch (Exception e) {
handleException(handler, new TransportSerializationException("Failed to deserialize response of type [" + streamable.getClass().getName() + "]", e));
return;
}
if (handler.spawn()) {
threadPool.execute(new Runnable() {
@SuppressWarnings({"unchecked"}) @Override public void run() {
try {
handler.handleResponse(streamable);
} catch (Exception e) {
handleException(handler, new ResponseHandlerFailureTransportException("Failed to handler response", e));
}
}
});
} else {
try {
//noinspection unchecked
handler.handleResponse(streamable);
} catch (Exception e) {
handleException(handler, new ResponseHandlerFailureTransportException("Failed to handler response", e));
}
}
}
private void handlerResponseError(DataInputStream buffer, final TransportResponseHandler handler) {
Throwable error;
try {
ThrowableObjectInputStream ois = new ThrowableObjectInputStream(new DataInputInputStream(buffer));
error = (Throwable) ois.readObject();
} catch (Exception e) {
error = new TransportSerializationException("Failed to deserialize exception response from stream", e);
}
handleException(handler, error);
}
private void handleException(final TransportResponseHandler handler, Throwable error) {
if (!(error instanceof RemoteTransportException)) {
error = new RemoteTransportException("None remote transport exception", error);
}
final RemoteTransportException rtx = (RemoteTransportException) error;
handler.handleException(rtx);
}
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.local;
import org.elasticsearch.transport.NotSerializableTransportException;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.util.io.ByteArrayDataOutputStream;
import org.elasticsearch.util.io.Streamable;
import org.elasticsearch.util.io.ThrowableObjectOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
/**
* @author kimchy (Shay Banon)
*/
public class LocalTransportChannel implements TransportChannel {
private final LocalTransport sourceTransport;
// the transport we will *send to*
private final LocalTransport targetTransport;
private final String action;
private final long requestId;
public LocalTransportChannel(LocalTransport sourceTransport, LocalTransport targetTransport, String action, long requestId) {
this.sourceTransport = sourceTransport;
this.targetTransport = targetTransport;
this.action = action;
this.requestId = requestId;
}
@Override public String action() {
return action;
}
@Override public void sendResponse(Streamable message) throws IOException {
ByteArrayDataOutputStream stream = ByteArrayDataOutputStream.Cached.cached();
stream.writeLong(requestId);
byte status = 0;
status = Transport.Helper.setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
message.writeTo(stream);
final byte[] data = stream.copiedByteArray();
targetTransport.threadPool().execute(new Runnable() {
@Override public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);
}
});
}
@Override public void sendResponse(Throwable error) throws IOException {
ByteArrayDataOutputStream stream;
try {
stream = ByteArrayDataOutputStream.Cached.cached();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
} catch (NotSerializableException e) {
stream = ByteArrayDataOutputStream.Cached.cached();
writeResponseExceptionHeader(stream);
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(), targetTransport.boundAddress().boundAddress(), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(stream);
too.writeObject(tx);
too.close();
}
final byte[] data = stream.copiedByteArray();
targetTransport.threadPool().execute(new Runnable() {
@Override public void run() {
targetTransport.messageReceived(data, action, sourceTransport, null);
}
});
}
private void writeResponseExceptionHeader(ByteArrayDataOutputStream stream) throws IOException {
stream.writeLong(requestId);
byte status = 0;
status = Transport.Helper.setResponse(status);
status = Transport.Helper.setError(status);
stream.writeByte(status);
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.local;
import com.google.inject.Inject;
import org.elasticsearch.jmx.MBean;
/**
* @author kimchy (Shay Banon)
*/
@MBean(objectName = "service=transport,transportType=local", description = "Local Transport")
public class LocalTransportManagement {
private final LocalTransport transport;
@Inject public LocalTransportManagement(LocalTransport transport) {
this.transport = transport;
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.local;
import com.google.inject.AbstractModule;
import org.elasticsearch.transport.Transport;
/**
* @author kimchy (Shay Banon)
*/
public class LocalTransportModule extends AbstractModule {
@Override protected void configure() {
bind(LocalTransport.class).asEagerSingleton();
bind(Transport.class).to(LocalTransport.class).asEagerSingleton();
bind(LocalTransportManagement.class).asEagerSingleton();
}
}

View File

@ -90,13 +90,13 @@ public class NettyTransportChannel implements TransportChannel {
os.flush();
buffer.markWriterIndex();
try {
RemoteTransportException tx = new RemoteTransportException(transport.settings().get("name"), transport.wrapAddress(channel.getLocalAddress()), action, error);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(os);
too.writeObject(tx);
too.close();
} catch (NotSerializableException e) {
buffer.resetWriterIndex();
RemoteTransportException tx = new RemoteTransportException(transport.settings().get("name"), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, new NotSerializableTransportException(error));
ThrowableObjectOutputStream too = new ThrowableObjectOutputStream(os);
too.writeObject(tx);
too.close();

View File

@ -29,7 +29,7 @@ import org.elasticsearch.jmx.ManagedAttribute;
@MBean(objectName = "service=transport,transportType=netty", description = "Netty Transport")
public class NettyTransportManagement {
private NettyTransport transport;
private final NettyTransport transport;
@Inject public NettyTransportManagement(NettyTransport transport) {
this.transport = transport;

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.util.transport;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* @author kimchy (Shay Banon)
*/
public class LocalTransportAddress implements TransportAddress {
private String id;
LocalTransportAddress() {
}
public LocalTransportAddress(String id) {
this.id = id;
}
public String id() {
return this.id;
}
@Override public short uniqueAddressTypeId() {
return 2;
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
id = in.readUTF();
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeUTF(id);
}
@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
LocalTransportAddress that = (LocalTransportAddress) o;
if (id != null ? !id.equals(that.id) : that.id != null) return false;
return true;
}
@Override public int hashCode() {
return id != null ? id.hashCode() : 0;
}
@Override public String toString() {
return "local[" + id + "]";
}
}

View File

@ -49,6 +49,7 @@ public abstract class TransportAddressSerializers {
try {
addAddressType(DummyTransportAddress.INSTANCE);
addAddressType(new InetSocketTransportAddress());
addAddressType(new LocalTransportAddress());
} catch (Exception e) {
logger.warn("Failed to add InetSocketTransportAddress", e);
}

View File

@ -0,0 +1,168 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.local;
import org.elasticsearch.cluster.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.dynamic.DynamicThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.util.io.Streamable;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
public class SimpleLocalTransportTests {
private ThreadPool threadPool;
private TransportService serviceA;
private TransportService serviceB;
private Node serviceANode;
private Node serviceBNode;
@BeforeClass public void setUp() {
threadPool = new DynamicThreadPool();
serviceA = new TransportService(new LocalTransport(threadPool)).start();
serviceANode = new Node("A", serviceA.boundAddress().publishAddress());
serviceB = new TransportService(new LocalTransport(threadPool)).start();
serviceBNode = new Node("B", serviceB.boundAddress().publishAddress());
}
@AfterClass public void tearDown() {
serviceA.close();
serviceB.close();
threadPool.shutdown();
}
@Test public void testHelloWorld() {
serviceA.registerHandler("sayHello", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessage("hello " + request.message));
} catch (IOException e) {
e.printStackTrace();
assertThat(e.getMessage(), false, equalTo(true));
}
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
System.out.println("got response: " + response.message);
assertThat("hello moshe", equalTo(response.message));
}
@Override public void handleException(RemoteTransportException exp) {
exp.printStackTrace();
assertThat("got exception instead of a response: " + exp.getMessage(), false, equalTo(true));
}
});
try {
StringMessage message = res.get();
assertThat("hello moshe", equalTo(message.message));
} catch (Exception e) {
assertThat(e.getMessage(), false, equalTo(true));
}
System.out.println("after ...");
}
@Test public void testErrorMessage() {
serviceA.registerHandler("sayHelloException", new BaseTransportRequestHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void messageReceived(StringMessage request, TransportChannel channel) throws Exception {
System.out.println("got message: " + request.message);
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
}
});
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHelloException",
new StringMessage("moshe"), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() {
return new StringMessage();
}
@Override public void handleResponse(StringMessage response) {
assertThat("got response instead of exception", false, equalTo(true));
}
@Override public void handleException(RemoteTransportException exp) {
assertThat("bad message !!!", equalTo(exp.getCause().getMessage()));
}
});
try {
res.txGet();
assertThat("exception should be thrown", false, equalTo(true));
} catch (Exception e) {
assertThat("bad message !!!", equalTo(e.getCause().getMessage()));
}
System.out.println("after ...");
}
private class StringMessage implements Streamable {
private String message;
private StringMessage(String message) {
this.message = message;
}
private StringMessage() {
}
@Override public void readFrom(DataInput in) throws IOException, ClassNotFoundException {
message = in.readUTF();
}
@Override public void writeTo(DataOutput out) throws IOException {
out.writeUTF(message);
}
}
}