From 0f5ff617f134e095b89a7eaa04e4006039af7fff Mon Sep 17 00:00:00 2001 From: kimchy Date: Mon, 22 Feb 2010 01:19:06 +0200 Subject: [PATCH] when failing to send a message using the transport (connect / serialization), call the response handler with it by default --- .../org/elasticsearch/ExceptionsHelper.java | 2 ++ .../TransportClientNodesService.java | 3 ++ .../transport/RemoteTransportException.java | 4 +++ .../SendRequestTransportException.java | 36 +++++++++++++++++++ .../transport/TransportService.java | 30 +++++++++++++--- 5 files changed, 70 insertions(+), 5 deletions(-) create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/transport/SendRequestTransportException.java diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/ExceptionsHelper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/ExceptionsHelper.java index 658523915a7..981da3d0608 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/ExceptionsHelper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/ExceptionsHelper.java @@ -45,7 +45,9 @@ public final class ExceptionsHelper { StringBuilder sb = new StringBuilder(); while (t != null) { if (t.getMessage() != null) { + sb.append(t.getClass().getSimpleName()).append("{"); sb.append(t.getMessage()); + sb.append("}"); if (!newLines) { sb.append("; "); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 9add94d60f1..4dde28a8739 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -87,6 +87,9 @@ public class TransportClientNodesService extends AbstractComponent implements Cl this.nodesSamplerInterval = componentSettings.getAsTime("nodesSamplerInterval", timeValueSeconds(1)); this.nodesSamplerFuture = threadPool.scheduleWithFixedDelay(nodesSampler, nodesSamplerInterval); + + // we want the transport service to throw connect exceptions, so we can retry + transportService.throwConnectException(true); } public ImmutableList transportAddresses() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/RemoteTransportException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/RemoteTransportException.java index 9d11feec84d..4abae142a50 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/RemoteTransportException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/RemoteTransportException.java @@ -54,6 +54,10 @@ public class RemoteTransportException extends TransportException implements Elas return null; } + protected Throwable fillStack() { + return super.fillInStackTrace(); + } + private static String buildMessage(String name, TransportAddress address, String action) { StringBuilder sb = new StringBuilder(); if (name != null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/SendRequestTransportException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/SendRequestTransportException.java new file mode 100644 index 00000000000..5d2c219ca17 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/SendRequestTransportException.java @@ -0,0 +1,36 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.node.Node; + +/** + * @author kimchy (shay.banon) + */ +public class SendRequestTransportException extends RemoteTransportException { + + public SendRequestTransportException(Node node, String action, Throwable cause) { + super(node.name(), node.address(), action, cause); + } + + @Override public Throwable fillInStackTrace() { + return fillStack(); + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java index b1b2935960b..c50912a7573 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportService.java @@ -30,7 +30,6 @@ import org.elasticsearch.util.io.Streamable; import org.elasticsearch.util.settings.Settings; import org.elasticsearch.util.transport.BoundTransportAddress; -import java.io.IOException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; @@ -38,7 +37,7 @@ import static org.elasticsearch.util.concurrent.ConcurrentMaps.*; import static org.elasticsearch.util.settings.ImmutableSettings.Builder.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class TransportService extends AbstractComponent implements LifecycleComponent { @@ -52,6 +51,8 @@ public class TransportService extends AbstractComponent implements LifecycleComp final AtomicLong requestIds = new AtomicLong(); + private boolean throwConnectException = false; + public TransportService(Transport transport) { this(EMPTY_SETTINGS, transport); } @@ -124,6 +125,17 @@ public class TransportService extends AbstractComponent implements LifecycleComp } } + /** + * Set to true to indicate that a {@link ConnectTransportException} should be thrown when + * sending a message (otherwise, it will be passed to the response handler). Defaults to false. + * + *

This is useful when logic based on connect failure is needed without having to wrap the handler, + * for example, in case of retries across several nodes. + */ + public void throwConnectException(boolean throwConnectException) { + this.throwConnectException = throwConnectException; + } + public TransportFuture submitRequest(Node node, String action, Streamable message, TransportResponseHandler handler) throws TransportException { PlainTransportFuture futureHandler = new PlainTransportFuture(handler); @@ -133,12 +145,20 @@ public class TransportService extends AbstractComponent implements LifecycleComp public void sendRequest(Node node, String action, Streamable message, TransportResponseHandler handler) throws TransportException { + final long requestId = newRequestId(); try { - final long requestId = newRequestId(); clientHandlers.put(requestId, handler); transport.sendRequest(node, requestId, action, message, handler); - } catch (IOException e) { - throw new TransportException("Can't serialize request", e); + } catch (Exception e) { + // usually happen either because we failed to connect to the node + // or because we failed serializing the message + clientHandlers.remove(requestId); + if (throwConnectException) { + if (e instanceof ConnectTransportException) { + throw (ConnectTransportException) e; + } + } + handler.handleException(new SendRequestTransportException(node, action, e)); } }