when failing to send a message using the transport (connect / serialization), call the response handler with it by default

This commit is contained in:
kimchy 2010-02-22 01:19:06 +02:00
parent 16a7dd137d
commit 0f5ff617f1
5 changed files with 70 additions and 5 deletions

View File

@ -45,7 +45,9 @@ public final class ExceptionsHelper {
StringBuilder sb = new StringBuilder();
while (t != null) {
if (t.getMessage() != null) {
sb.append(t.getClass().getSimpleName()).append("{");
sb.append(t.getMessage());
sb.append("}");
if (!newLines) {
sb.append("; ");
}

View File

@ -87,6 +87,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl
this.nodesSamplerInterval = componentSettings.getAsTime("nodesSamplerInterval", timeValueSeconds(1));
this.nodesSamplerFuture = threadPool.scheduleWithFixedDelay(nodesSampler, nodesSamplerInterval);
// we want the transport service to throw connect exceptions, so we can retry
transportService.throwConnectException(true);
}
public ImmutableList<TransportAddress> transportAddresses() {

View File

@ -54,6 +54,10 @@ public class RemoteTransportException extends TransportException implements Elas
return null;
}
protected Throwable fillStack() {
return super.fillInStackTrace();
}
private static String buildMessage(String name, TransportAddress address, String action) {
StringBuilder sb = new StringBuilder();
if (name != null) {

View File

@ -0,0 +1,36 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.cluster.node.Node;
/**
* @author kimchy (shay.banon)
*/
public class SendRequestTransportException extends RemoteTransportException {
public SendRequestTransportException(Node node, String action, Throwable cause) {
super(node.name(), node.address(), action, cause);
}
@Override public Throwable fillInStackTrace() {
return fillStack();
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.util.io.Streamable;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.transport.BoundTransportAddress;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
@ -38,7 +37,7 @@ import static org.elasticsearch.util.concurrent.ConcurrentMaps.*;
import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*;
/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TransportService extends AbstractComponent implements LifecycleComponent<TransportService> {
@ -52,6 +51,8 @@ public class TransportService extends AbstractComponent implements LifecycleComp
final AtomicLong requestIds = new AtomicLong();
private boolean throwConnectException = false;
public TransportService(Transport transport) {
this(EMPTY_SETTINGS, transport);
}
@ -124,6 +125,17 @@ public class TransportService extends AbstractComponent implements LifecycleComp
}
}
/**
* Set to <tt>true</tt> to indicate that a {@link ConnectTransportException} should be thrown when
* sending a message (otherwise, it will be passed to the response handler). Defaults to <tt>false</tt>.
*
* <p>This is useful when logic based on connect failure is needed without having to wrap the handler,
* for example, in case of retries across several nodes.
*/
public void throwConnectException(boolean throwConnectException) {
this.throwConnectException = throwConnectException;
}
public <T extends Streamable> TransportFuture<T> submitRequest(Node node, String action, Streamable message,
TransportResponseHandler<T> handler) throws TransportException {
PlainTransportFuture<T> futureHandler = new PlainTransportFuture<T>(handler);
@ -133,12 +145,20 @@ public class TransportService extends AbstractComponent implements LifecycleComp
public <T extends Streamable> void sendRequest(Node node, String action, Streamable message,
TransportResponseHandler<T> handler) throws TransportException {
try {
final long requestId = newRequestId();
try {
clientHandlers.put(requestId, handler);
transport.sendRequest(node, requestId, action, message, handler);
} catch (IOException e) {
throw new TransportException("Can't serialize request", e);
} catch (Exception e) {
// usually happen either because we failed to connect to the node
// or because we failed serializing the message
clientHandlers.remove(requestId);
if (throwConnectException) {
if (e instanceof ConnectTransportException) {
throw (ConnectTransportException) e;
}
}
handler.handleException(new SendRequestTransportException(node, action, e));
}
}