From ec908ddfd652cc66639653d98021b6cce9894f38 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 10 Dec 2015 12:20:19 +0100 Subject: [PATCH] Use transport service to handle RetryOnReplicaException and execute replica action on the current node. Transport service will delegate to threadpool internally. --- .../TransportReplicationAction.java | 8 +- .../TransportChannelResponseHandler.java | 76 +++++++++++++++++++ 2 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 26c439c0a3d..d17cc02c5b0 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -300,11 +300,15 @@ public abstract class TransportReplicationAction handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage); + transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler); } @Override diff --git a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java new file mode 100644 index 00000000000..8c042cd1937 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; + +/** + * Base class for delegating transport response to a transport channel + */ +public abstract class TransportChannelResponseHandler implements TransportResponseHandler { + + /** + * Convenience method for delegating an empty response to the provided changed + */ + public static TransportChannelResponseHandler emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) { + return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { + @Override + public TransportResponse.Empty newInstance() { + return TransportResponse.Empty.INSTANCE; + } + }; + } + + private final ESLogger logger; + private final TransportChannel channel; + private final String extraInfoOnError; + + protected TransportChannelResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) { + this.logger = logger; + this.channel = channel; + this.extraInfoOnError = extraInfoOnError; + } + + @Override + public void handleResponse(T response) { + try { + channel.sendResponse(response); + } catch (IOException e) { + handleException(new TransportException(e)); + } + } + + @Override + public void handleException(TransportException exp) { + try { + channel.sendResponse(exp); + } catch (IOException e) { + logger.debug("failed to send failure {}", e, extraInfoOnError == null ? "" : "(" + extraInfoOnError + ")"); + } + } + + @Override + public String executor() { + return ThreadPool.Names.SAME; + } +}