Do not log unsuccessful join attempt each time (#39756)
When testing with 57 master-eligible nodes and a single node crash, we saw messy elections in which multiple nodes were attempting to become master. JoinHelper logged 105 long log messages with lengthy stack traces during one such election. To address this, we decided to log each of these messages only at debug level. If the cluster is failing to form, we will log the last unsuccessful join attempt, if any (along with a timestamp), at WARN level. (cherry picked from commit 17a148cc27b5ac6c2e04ef5ae344da05a8a90902)
This commit is contained in:
parent
8f7ab84c5c
commit
9300826d8a
|
@ -54,14 +54,16 @@ public class ClusterFormationFailureHelper {
|
||||||
private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
|
private final Supplier<ClusterFormationState> clusterFormationStateSupplier;
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final TimeValue clusterFormationWarningTimeout;
|
private final TimeValue clusterFormationWarningTimeout;
|
||||||
|
private final Runnable logLastFailedJoinAttempt;
|
||||||
@Nullable // if no warning is scheduled
|
@Nullable // if no warning is scheduled
|
||||||
private volatile WarningScheduler warningScheduler;
|
private volatile WarningScheduler warningScheduler;
|
||||||
|
|
||||||
public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
|
public ClusterFormationFailureHelper(Settings settings, Supplier<ClusterFormationState> clusterFormationStateSupplier,
|
||||||
ThreadPool threadPool) {
|
ThreadPool threadPool, Runnable logLastFailedJoinAttempt) {
|
||||||
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
|
this.clusterFormationStateSupplier = clusterFormationStateSupplier;
|
||||||
this.threadPool = threadPool;
|
this.threadPool = threadPool;
|
||||||
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
|
this.clusterFormationWarningTimeout = DISCOVERY_CLUSTER_FORMATION_WARNING_TIMEOUT_SETTING.get(settings);
|
||||||
|
this.logLastFailedJoinAttempt = logLastFailedJoinAttempt;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isRunning() {
|
public boolean isRunning() {
|
||||||
|
@ -94,6 +96,7 @@ public class ClusterFormationFailureHelper {
|
||||||
@Override
|
@Override
|
||||||
protected void doRun() {
|
protected void doRun() {
|
||||||
if (isActive()) {
|
if (isActive()) {
|
||||||
|
logLastFailedJoinAttempt.run();
|
||||||
logger.warn(clusterFormationStateSupplier.get().getDescription());
|
logger.warn(clusterFormationStateSupplier.get().getDescription());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,7 +177,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
|
||||||
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
|
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
|
||||||
transportService::getLocalNode);
|
transportService::getLocalNode);
|
||||||
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
|
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
|
||||||
transportService.getThreadPool());
|
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ClusterFormationState getClusterFormationState() {
|
private ClusterFormationState getClusterFormationState() {
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.cluster.coordination;
|
package org.elasticsearch.cluster.coordination;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.Level;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
|
@ -25,6 +26,7 @@ import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
import org.elasticsearch.cluster.ClusterStateTaskConfig;
|
||||||
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
import org.elasticsearch.cluster.ClusterStateTaskListener;
|
||||||
|
import org.elasticsearch.cluster.NotMasterException;
|
||||||
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
|
import org.elasticsearch.cluster.coordination.Coordinator.Mode;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
|
@ -60,6 +62,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.function.LongSupplier;
|
import java.util.function.LongSupplier;
|
||||||
|
@ -85,6 +88,8 @@ public class JoinHelper {
|
||||||
|
|
||||||
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
|
private final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = Collections.synchronizedSet(new HashSet<>());
|
||||||
|
|
||||||
|
private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
|
||||||
|
|
||||||
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
|
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
|
||||||
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
|
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
|
||||||
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
|
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
|
||||||
|
@ -199,6 +204,54 @@ public class JoinHelper {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// package-private for testing
|
||||||
|
static class FailedJoinAttempt {
|
||||||
|
private final DiscoveryNode destination;
|
||||||
|
private final JoinRequest joinRequest;
|
||||||
|
private final TransportException exception;
|
||||||
|
private final long timestamp;
|
||||||
|
|
||||||
|
FailedJoinAttempt(DiscoveryNode destination, JoinRequest joinRequest, TransportException exception) {
|
||||||
|
this.destination = destination;
|
||||||
|
this.joinRequest = joinRequest;
|
||||||
|
this.exception = exception;
|
||||||
|
this.timestamp = System.nanoTime();
|
||||||
|
}
|
||||||
|
|
||||||
|
void logNow() {
|
||||||
|
logger.log(getLogLevel(exception),
|
||||||
|
() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest),
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
|
||||||
|
static Level getLogLevel(TransportException e) {
|
||||||
|
Throwable cause = e.unwrapCause();
|
||||||
|
if (cause instanceof CoordinationStateRejectedException ||
|
||||||
|
cause instanceof FailedToCommitClusterStateException ||
|
||||||
|
cause instanceof NotMasterException) {
|
||||||
|
return Level.DEBUG;
|
||||||
|
}
|
||||||
|
return Level.INFO;
|
||||||
|
}
|
||||||
|
|
||||||
|
void logWarnWithTimestamp() {
|
||||||
|
logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ms ago, failed to join {} with {}",
|
||||||
|
TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)),
|
||||||
|
destination,
|
||||||
|
joinRequest),
|
||||||
|
exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void logLastFailedJoinAttempt() {
|
||||||
|
FailedJoinAttempt attempt = lastFailedJoinAttempt.get();
|
||||||
|
if (attempt != null) {
|
||||||
|
attempt.logWarnWithTimestamp();
|
||||||
|
lastFailedJoinAttempt.compareAndSet(attempt, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin, Runnable onCompletion) {
|
public void sendJoinRequest(DiscoveryNode destination, Optional<Join> optionalJoin, Runnable onCompletion) {
|
||||||
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
|
assert destination.isMasterNode() : "trying to join master-ineligible " + destination;
|
||||||
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
|
final JoinRequest joinRequest = new JoinRequest(transportService.getLocalNode(), optionalJoin);
|
||||||
|
@ -226,6 +279,7 @@ public class JoinHelper {
|
||||||
public void handleResponse(Empty response) {
|
public void handleResponse(Empty response) {
|
||||||
pendingOutgoingJoins.remove(dedupKey);
|
pendingOutgoingJoins.remove(dedupKey);
|
||||||
logger.debug("successfully joined {} with {}", destination, joinRequest);
|
logger.debug("successfully joined {} with {}", destination, joinRequest);
|
||||||
|
lastFailedJoinAttempt.set(null);
|
||||||
onCompletion.run();
|
onCompletion.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -233,6 +287,9 @@ public class JoinHelper {
|
||||||
public void handleException(TransportException exp) {
|
public void handleException(TransportException exp) {
|
||||||
pendingOutgoingJoins.remove(dedupKey);
|
pendingOutgoingJoins.remove(dedupKey);
|
||||||
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
|
logger.info(() -> new ParameterizedMessage("failed to join {} with {}", destination, joinRequest), exp);
|
||||||
|
FailedJoinAttempt attempt = new FailedJoinAttempt(destination, joinRequest, exp);
|
||||||
|
attempt.logNow();
|
||||||
|
lastFailedJoinAttempt.set(attempt);
|
||||||
onCompletion.run();
|
onCompletion.run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,13 +65,14 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
|
||||||
= new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
|
= new DeterministicTaskQueue(Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(), random());
|
||||||
|
|
||||||
final AtomicLong warningCount = new AtomicLong();
|
final AtomicLong warningCount = new AtomicLong();
|
||||||
|
final AtomicLong logLastFailedJoinAttemptWarningCount = new AtomicLong();
|
||||||
|
|
||||||
final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
|
final ClusterFormationFailureHelper clusterFormationFailureHelper = new ClusterFormationFailureHelper(settingsBuilder.build(),
|
||||||
() -> {
|
() -> {
|
||||||
warningCount.incrementAndGet();
|
warningCount.incrementAndGet();
|
||||||
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
|
return new ClusterFormationState(Settings.EMPTY, clusterState, emptyList(), emptyList(), 0L);
|
||||||
},
|
},
|
||||||
deterministicTaskQueue.getThreadPool());
|
deterministicTaskQueue.getThreadPool(), () -> logLastFailedJoinAttemptWarningCount.incrementAndGet());
|
||||||
|
|
||||||
deterministicTaskQueue.runAllTasks();
|
deterministicTaskQueue.runAllTasks();
|
||||||
assertThat("should not schedule anything yet", warningCount.get(), is(0L));
|
assertThat("should not schedule anything yet", warningCount.get(), is(0L));
|
||||||
|
@ -105,8 +106,10 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
|
||||||
deterministicTaskQueue.runAllTasksInTimeOrder();
|
deterministicTaskQueue.runAllTasksInTimeOrder();
|
||||||
|
|
||||||
assertThat(warningCount.get(), is(5L));
|
assertThat(warningCount.get(), is(5L));
|
||||||
|
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
|
||||||
|
|
||||||
warningCount.set(0);
|
warningCount.set(0);
|
||||||
|
logLastFailedJoinAttemptWarningCount.set(0);
|
||||||
clusterFormationFailureHelper.start();
|
clusterFormationFailureHelper.start();
|
||||||
clusterFormationFailureHelper.stop();
|
clusterFormationFailureHelper.stop();
|
||||||
clusterFormationFailureHelper.start();
|
clusterFormationFailureHelper.start();
|
||||||
|
@ -127,6 +130,7 @@ public class ClusterFormationFailureHelperTests extends ESTestCase {
|
||||||
deterministicTaskQueue.runAllTasksInTimeOrder();
|
deterministicTaskQueue.runAllTasksInTimeOrder();
|
||||||
|
|
||||||
assertThat(warningCount.get(), is(5L));
|
assertThat(warningCount.get(), is(5L));
|
||||||
|
assertThat(logLastFailedJoinAttemptWarningCount.get(), is(5L));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDescriptionOnMasterIneligibleNodes() {
|
public void testDescriptionOnMasterIneligibleNodes() {
|
||||||
|
|
|
@ -18,12 +18,16 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.cluster.coordination;
|
package org.elasticsearch.cluster.coordination;
|
||||||
|
|
||||||
|
import org.apache.logging.log4j.Level;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.cluster.NotMasterException;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.test.transport.CapturingTransport;
|
import org.elasticsearch.test.transport.CapturingTransport;
|
||||||
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
|
import org.elasticsearch.test.transport.CapturingTransport.CapturedRequest;
|
||||||
|
import org.elasticsearch.transport.RemoteTransportException;
|
||||||
|
import org.elasticsearch.transport.TransportException;
|
||||||
import org.elasticsearch.transport.TransportResponse;
|
import org.elasticsearch.transport.TransportResponse;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
@ -32,6 +36,7 @@ import java.util.Optional;
|
||||||
|
|
||||||
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
|
import static org.hamcrest.core.Is.is;
|
||||||
|
|
||||||
public class JoinHelperTests extends ESTestCase {
|
public class JoinHelperTests extends ESTestCase {
|
||||||
|
|
||||||
|
@ -107,4 +112,23 @@ public class JoinHelperTests extends ESTestCase {
|
||||||
capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
|
capturingTransport.handleRemoteError(capturedRequest2a.requestId, new CoordinationStateRejectedException("dummy"));
|
||||||
assertFalse(joinHelper.isJoinPending());
|
assertFalse(joinHelper.isJoinPending());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testFailedJoinAttemptLogLevel() {
|
||||||
|
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(new TransportException("generic transport exception")), is(Level.INFO));
|
||||||
|
|
||||||
|
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
|
||||||
|
new RemoteTransportException("remote transport exception with generic cause", new Exception())), is(Level.INFO));
|
||||||
|
|
||||||
|
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
|
||||||
|
new RemoteTransportException("caused by CoordinationStateRejectedException",
|
||||||
|
new CoordinationStateRejectedException("test"))), is(Level.DEBUG));
|
||||||
|
|
||||||
|
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
|
||||||
|
new RemoteTransportException("caused by FailedToCommitClusterStateException",
|
||||||
|
new FailedToCommitClusterStateException("test"))), is(Level.DEBUG));
|
||||||
|
|
||||||
|
assertThat(JoinHelper.FailedJoinAttempt.getLogLevel(
|
||||||
|
new RemoteTransportException("caused by NotMasterException",
|
||||||
|
new NotMasterException("test"))), is(Level.DEBUG));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue