[Discovery] add a debug log if a node responds to a publish request after publishing timed out.

This commit is contained in:
Boaz Leskes 2014-08-16 15:36:37 +02:00
parent 5932371f21
commit ff8b7409f7
1 changed files with 13 additions and 7 deletions

View File

@ -39,6 +39,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*; import org.elasticsearch.transport.*;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* *
@ -82,12 +83,15 @@ public class PublishClusterStateAction extends AbstractComponent {
publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size() - 1, ackListener)); publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size() - 1, ackListener));
} }
private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { private void publish(final ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) {
DiscoveryNode localNode = nodesProvider.nodes().localNode(); DiscoveryNode localNode = nodesProvider.nodes().localNode();
Map<Version, BytesReference> serializedStates = Maps.newHashMap(); Map<Version, BytesReference> serializedStates = Maps.newHashMap();
final AtomicBoolean timedOutWaitingForNodes = new AtomicBoolean(false);
final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
for (final DiscoveryNode node : clusterState.nodes()) { for (final DiscoveryNode node : clusterState.nodes()) {
if (node.equals(localNode)) { if (node.equals(localNode)) {
continue; continue;
@ -122,28 +126,30 @@ public class PublishClusterStateAction extends AbstractComponent {
@Override @Override
public void handleResponse(TransportResponse.Empty response) { public void handleResponse(TransportResponse.Empty response) {
if (timedOutWaitingForNodes.get()) {
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node, clusterState.version(), publishTimeout);
}
publishResponseHandler.onResponse(node); publishResponseHandler.onResponse(node);
} }
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
logger.debug("failed to send cluster state to [{}]", exp, node); logger.debug("failed to send cluster state to {}", exp, node);
publishResponseHandler.onFailure(node, exp); publishResponseHandler.onFailure(node, exp);
} }
}); });
} catch (Throwable t) { } catch (Throwable t) {
logger.debug("error sending cluster state to [{}]", t, node); logger.debug("error sending cluster state to {}", t, node);
publishResponseHandler.onFailure(node, t); publishResponseHandler.onFailure(node, t);
} }
} }
TimeValue publishTimeout = discoverySettings.getPublishTimeout();
if (publishTimeout.millis() > 0) { if (publishTimeout.millis() > 0) {
// only wait if the publish timeout is configured... // only wait if the publish timeout is configured...
try { try {
boolean awaited = publishResponseHandler.awaitAllNodes(publishTimeout); timedOutWaitingForNodes.set(!publishResponseHandler.awaitAllNodes(publishTimeout));
if (!awaited) { if (timedOutWaitingForNodes.get()) {
logger.debug("awaiting all nodes to process published state {} timed out, timeout {}", clusterState.version(), publishTimeout); logger.debug("timed out waiting for all nodes to process published state [{}] (timeout [{}])", clusterState.version(), publishTimeout);
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore & restore interrupt // ignore & restore interrupt