Use transport service to handle RetryOnReplicaException and execute replica action on the current node.

Transport service will delegate to threadpool internally.
This commit is contained in:
Martijn van Groningen 2015-12-10 12:20:19 +01:00
parent 5f7b863067
commit ec908ddfd6
2 changed files with 82 additions and 2 deletions

View File

@ -300,11 +300,15 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
@Override
public void onFailure(Throwable t) {
if (t instanceof RetryOnReplicaException) {
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, actionName, request);
logger.trace("Retrying operation on replica, action [{}], request [{}]", t, transportReplicaAction, request);
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
threadPool.executor(executor).execute(AsyncReplicaAction.this);
// Forking a thread on local node via transport service so that custom transport service have an
// opportunity to execute custom logic before the replica operation begins
String extraMessage = "action [" + transportReplicaAction + "], request[" + request + "]";
TransportChannelResponseHandler<TransportResponse.Empty> handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage);
transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler);
}
@Override

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
/**
* Base class for delegating transport response to a transport channel
*/
public abstract class TransportChannelResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {
/**
* Convenience method for delegating an empty response to the provided changed
*/
public static TransportChannelResponseHandler<TransportResponse.Empty> emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) {
return new TransportChannelResponseHandler<TransportResponse.Empty>(logger, channel, extraInfoOnError) {
@Override
public TransportResponse.Empty newInstance() {
return TransportResponse.Empty.INSTANCE;
}
};
}
private final ESLogger logger;
private final TransportChannel channel;
private final String extraInfoOnError;
protected TransportChannelResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) {
this.logger = logger;
this.channel = channel;
this.extraInfoOnError = extraInfoOnError;
}
@Override
public void handleResponse(T response) {
try {
channel.sendResponse(response);
} catch (IOException e) {
handleException(new TransportException(e));
}
}
@Override
public void handleException(TransportException exp) {
try {
channel.sendResponse(exp);
} catch (IOException e) {
logger.debug("failed to send failure {}", e, extraInfoOnError == null ? "" : "(" + extraInfoOnError + ")");
}
}
@Override
public String executor() {
return ThreadPool.Names.SAME;
}
}