Use try-with-resources with MockLogAppender (#1595)

I previously added a helper that started a MockLogAppender to ensure it
was never added to a Logger before it was started. I subsequently found
the opposite case in RolloverIT.java where the appender was stopped
before it was closed, therefore creating a race where a concurrently
running test in the same JVM could cause a logging failure. This seems
like a really easy mistake to make when writing a test or introduce when
refactoring a test. I've made a change to use try-with-resources to
ensure that proper setup and teardown is done. This should make it much
harder to introduce this particular test bug in the future.
Unfortunately, it did involve touching a lot of files. The changes here
are purely structural to leverage try-with-resources; no testing logic
has been changed.

Signed-off-by: Andrew Ross <andrross@amazon.com>
This commit is contained in:
Andrew Ross 2021-12-21 15:00:55 -06:00 committed by GitHub
parent ca40ba9c64
commit 96d55966a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 839 additions and 1021 deletions

View File

@@ -36,7 +36,6 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.opensearch.OpenSearchNetty4IntegTestCase; import org.opensearch.OpenSearchNetty4IntegTestCase;
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.opensearch.common.logging.Loggers;
import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.InternalTestCluster; import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.MockLogAppender; import org.opensearch.test.MockLogAppender;
@@ -53,17 +52,15 @@ public class OpenSearchLoggingHandlerIT extends OpenSearchNetty4IntegTestCase {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
appender = MockLogAppender.createStarted(); appender = MockLogAppender.createForLoggers(
Loggers.addAppender(LogManager.getLogger(OpenSearchLoggingHandler.class), appender); LogManager.getLogger(OpenSearchLoggingHandler.class),
Loggers.addAppender(LogManager.getLogger(TransportLogger.class), appender); LogManager.getLogger(TransportLogger.class),
Loggers.addAppender(LogManager.getLogger(TcpTransport.class), appender); LogManager.getLogger(TcpTransport.class)
);
} }
public void tearDown() throws Exception { public void tearDown() throws Exception {
Loggers.removeAppender(LogManager.getLogger(OpenSearchLoggingHandler.class), appender); appender.close();
Loggers.removeAppender(LogManager.getLogger(TransportLogger.class), appender);
Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.stop();
super.tearDown(); super.tearDown();
} }

View File

@@ -37,7 +37,6 @@ import org.apache.logging.log4j.LogManager;
import org.opensearch.NioIntegTestCase; import org.opensearch.NioIntegTestCase;
import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest; import org.opensearch.action.admin.cluster.node.hotthreads.NodesHotThreadsRequest;
import org.opensearch.common.logging.Loggers;
import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.InternalTestCluster; import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.MockLogAppender; import org.opensearch.test.MockLogAppender;
@@ -54,15 +53,11 @@ public class NioTransportLoggingIT extends NioIntegTestCase {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
appender = MockLogAppender.createStarted(); appender = MockLogAppender.createForLoggers(LogManager.getLogger(TransportLogger.class), LogManager.getLogger(TcpTransport.class));
Loggers.addAppender(LogManager.getLogger(TransportLogger.class), appender);
Loggers.addAppender(LogManager.getLogger(TcpTransport.class), appender);
} }
public void tearDown() throws Exception { public void tearDown() throws Exception {
Loggers.removeAppender(LogManager.getLogger(TransportLogger.class), appender); appender.close();
Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.stop();
super.tearDown(); super.tearDown();
} }

View File

@@ -44,7 +44,6 @@ import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AutoExpandReplicas; import org.opensearch.cluster.metadata.AutoExpandReplicas;
import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.time.DateFormatter; import org.opensearch.common.time.DateFormatter;
import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeUnit;
@@ -263,23 +262,21 @@ public class RolloverIT extends OpenSearchIntegTestCase {
ensureGreen(); ensureGreen();
Logger allocationServiceLogger = LogManager.getLogger(AllocationService.class); Logger allocationServiceLogger = LogManager.getLogger(AllocationService.class);
MockLogAppender appender = new MockLogAppender(); final RolloverResponse response;
appender.start(); try (MockLogAppender appender = MockLogAppender.createForLoggers(allocationServiceLogger)) {
appender.addExpectation( appender.addExpectation(
new MockLogAppender.UnseenEventExpectation( new MockLogAppender.UnseenEventExpectation(
"no related message logged on dry run", "no related message logged on dry run",
AllocationService.class.getName(), AllocationService.class.getName(),
Level.INFO, Level.INFO,
"*test_index*" "*test_index*"
) )
); );
Loggers.addAppender(allocationServiceLogger, appender);
final RolloverResponse response = client().admin().indices().prepareRolloverIndex("test_alias").dryRun(true).get(); response = client().admin().indices().prepareRolloverIndex("test_alias").dryRun(true).get();
appender.assertAllExpectationsMatched(); appender.assertAllExpectationsMatched();
appender.stop(); }
Loggers.removeAppender(allocationServiceLogger, appender);
assertThat(response.getOldIndex(), equalTo("test_index-1")); assertThat(response.getOldIndex(), equalTo("test_index-1"));
assertThat(response.getNewIndex(), equalTo("test_index-000002")); assertThat(response.getNewIndex(), equalTo("test_index-000002"));

View File

@@ -59,7 +59,6 @@ import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.common.Priority; import org.opensearch.common.Priority;
import org.opensearch.common.io.FileSystemUtils; import org.opensearch.common.io.FileSystemUtils;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue; import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.internal.io.IOUtils; import org.opensearch.core.internal.io.IOUtils;
@@ -458,74 +457,68 @@ public class ClusterRerouteIT extends OpenSearchIntegTestCase {
Logger actionLogger = LogManager.getLogger(TransportClusterRerouteAction.class); Logger actionLogger = LogManager.getLogger(TransportClusterRerouteAction.class);
MockLogAppender dryRunMockLog = new MockLogAppender(); try (MockLogAppender dryRunMockLog = MockLogAppender.createForLoggers(actionLogger)) {
dryRunMockLog.start(); dryRunMockLog.addExpectation(
dryRunMockLog.addExpectation( new MockLogAppender.UnseenEventExpectation(
new MockLogAppender.UnseenEventExpectation( "no completed message logged on dry run",
"no completed message logged on dry run", TransportClusterRerouteAction.class.getName(),
TransportClusterRerouteAction.class.getName(), Level.INFO,
Level.INFO, "allocated an empty primary*"
"allocated an empty primary*" )
) );
);
Loggers.addAppender(actionLogger, dryRunMockLog);
AllocationCommand dryRunAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true); AllocationCommand dryRunAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
ClusterRerouteResponse dryRunResponse = client().admin() ClusterRerouteResponse dryRunResponse = client().admin()
.cluster() .cluster()
.prepareReroute() .prepareReroute()
.setExplain(randomBoolean()) .setExplain(randomBoolean())
.setDryRun(true) .setDryRun(true)
.add(dryRunAllocation) .add(dryRunAllocation)
.execute() .execute()
.actionGet(); .actionGet();
// during a dry run, messages exist but are not logged or exposed // during a dry run, messages exist but are not logged or exposed
assertThat(dryRunResponse.getExplanations().getYesDecisionMessages(), hasSize(1)); assertThat(dryRunResponse.getExplanations().getYesDecisionMessages(), hasSize(1));
assertThat(dryRunResponse.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary")); assertThat(dryRunResponse.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary"));
dryRunMockLog.assertAllExpectationsMatched(); dryRunMockLog.assertAllExpectationsMatched();
dryRunMockLog.stop(); }
Loggers.removeAppender(actionLogger, dryRunMockLog);
MockLogAppender allocateMockLog = new MockLogAppender(); try (MockLogAppender allocateMockLog = MockLogAppender.createForLoggers(actionLogger)) {
allocateMockLog.start(); allocateMockLog.addExpectation(
allocateMockLog.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "message for first allocate empty primary",
"message for first allocate empty primary", TransportClusterRerouteAction.class.getName(),
TransportClusterRerouteAction.class.getName(), Level.INFO,
Level.INFO, "allocated an empty primary*" + nodeName1 + "*"
"allocated an empty primary*" + nodeName1 + "*" )
) );
); allocateMockLog.addExpectation(
allocateMockLog.addExpectation( new MockLogAppender.UnseenEventExpectation(
new MockLogAppender.UnseenEventExpectation( "no message for second allocate empty primary",
"no message for second allocate empty primary", TransportClusterRerouteAction.class.getName(),
TransportClusterRerouteAction.class.getName(), Level.INFO,
Level.INFO, "allocated an empty primary*" + nodeName2 + "*"
"allocated an empty primary*" + nodeName2 + "*" )
) );
);
Loggers.addAppender(actionLogger, allocateMockLog);
AllocationCommand yesDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true); AllocationCommand yesDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand(indexName, 0, nodeName1, true);
AllocationCommand noDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand("noexist", 1, nodeName2, true); AllocationCommand noDecisionAllocation = new AllocateEmptyPrimaryAllocationCommand("noexist", 1, nodeName2, true);
ClusterRerouteResponse response = client().admin() ClusterRerouteResponse response = client().admin()
.cluster() .cluster()
.prepareReroute() .prepareReroute()
.setExplain(true) // so we get a NO decision back rather than an exception .setExplain(true) // so we get a NO decision back rather than an exception
.add(yesDecisionAllocation) .add(yesDecisionAllocation)
.add(noDecisionAllocation) .add(noDecisionAllocation)
.execute() .execute()
.actionGet(); .actionGet();
assertThat(response.getExplanations().getYesDecisionMessages(), hasSize(1)); assertThat(response.getExplanations().getYesDecisionMessages(), hasSize(1));
assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary")); assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString("allocated an empty primary"));
assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString(nodeName1)); assertThat(response.getExplanations().getYesDecisionMessages().get(0), containsString(nodeName1));
allocateMockLog.assertAllExpectationsMatched(); allocateMockLog.assertAllExpectationsMatched();
allocateMockLog.stop(); }
Loggers.removeAppender(actionLogger, allocateMockLog);
} }
public void testClusterRerouteWithBlocks() { public void testClusterRerouteWithBlocks() {

View File

@@ -39,7 +39,6 @@ import org.apache.logging.log4j.core.LogEvent;
import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.JoinHelper; import org.opensearch.cluster.coordination.JoinHelper;
import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.node.Node.DiscoverySettings; import org.opensearch.node.Node.DiscoverySettings;
import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.OpenSearchIntegTestCase;
@@ -119,73 +118,66 @@ public class SingleNodeDiscoveryIT extends OpenSearchIntegTestCase {
} }
public void testCannotJoinNodeWithSingleNodeDiscovery() throws Exception { public void testCannotJoinNodeWithSingleNodeDiscovery() throws Exception {
MockLogAppender mockAppender = new MockLogAppender(); Logger clusterLogger = LogManager.getLogger(JoinHelper.class);
mockAppender.start(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(clusterLogger)) {
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation("test", JoinHelper.class.getCanonicalName(), Level.INFO, "failed to join") { new MockLogAppender.SeenEventExpectation("test", JoinHelper.class.getCanonicalName(), Level.INFO, "failed to join") {
@Override
public boolean innerMatch(final LogEvent event) {
return event.getThrown() != null
&& event.getThrown().getClass() == RemoteTransportException.class
&& event.getThrown().getCause() != null
&& event.getThrown().getCause().getClass() == IllegalStateException.class
&& event.getThrown()
.getCause()
.getMessage()
.contains("cannot join node with [discovery.type] set to [single-node]");
}
}
);
final TransportService service = internalCluster().getInstance(TransportService.class);
final int port = service.boundAddress().publishAddress().getPort();
final NodeConfigurationSource configurationSource = new NodeConfigurationSource() {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put("discovery.type", "zen")
.put("transport.type", getTestTransportType())
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s")
/*
* We align the port ranges of the two as then with zen discovery these two
* nodes would find each other.
*/
.put("transport.port", port + "-" + (port + 5 - 1))
.build();
}
@Override @Override
public boolean innerMatch(final LogEvent event) { public Path nodeConfigPath(int nodeOrdinal) {
return event.getThrown() != null return null;
&& event.getThrown().getClass() == RemoteTransportException.class
&& event.getThrown().getCause() != null
&& event.getThrown().getCause().getClass() == IllegalStateException.class
&& event.getThrown()
.getCause()
.getMessage()
.contains("cannot join node with [discovery.type] set to [single-node]");
} }
} };
); try (
final TransportService service = internalCluster().getInstance(TransportService.class); InternalTestCluster other = new InternalTestCluster(
final int port = service.boundAddress().publishAddress().getPort(); randomLong(),
final NodeConfigurationSource configurationSource = new NodeConfigurationSource() { createTempDir(),
@Override false,
public Settings nodeSettings(int nodeOrdinal) { false,
return Settings.builder() 1,
.put("discovery.type", "zen") 1,
.put("transport.type", getTestTransportType()) internalCluster().getClusterName(),
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") configurationSource,
/* 0,
* We align the port ranges of the two as then with zen discovery these two "other",
* nodes would find each other. Arrays.asList(getTestTransportPlugin(), MockHttpTransport.TestPlugin.class),
*/ Function.identity()
.put("transport.port", port + "-" + (port + 5 - 1)) )
.build(); ) {
}
@Override
public Path nodeConfigPath(int nodeOrdinal) {
return null;
}
};
try (
InternalTestCluster other = new InternalTestCluster(
randomLong(),
createTempDir(),
false,
false,
1,
1,
internalCluster().getClusterName(),
configurationSource,
0,
"other",
Arrays.asList(getTestTransportPlugin(), MockHttpTransport.TestPlugin.class),
Function.identity()
)
) {
Logger clusterLogger = LogManager.getLogger(JoinHelper.class);
Loggers.addAppender(clusterLogger, mockAppender);
try {
other.beforeTest(random(), 0); other.beforeTest(random(), 0);
final ClusterState first = internalCluster().getInstance(ClusterService.class).state(); final ClusterState first = internalCluster().getInstance(ClusterService.class).state();
assertThat(first.nodes().getSize(), equalTo(1)); assertThat(first.nodes().getSize(), equalTo(1));
assertBusy(() -> mockAppender.assertAllExpectationsMatched()); assertBusy(() -> mockAppender.assertAllExpectationsMatched());
} finally {
Loggers.removeAppender(clusterLogger, mockAppender);
mockAppender.stop();
} }
} }
} }

View File

@@ -40,7 +40,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.PathUtils;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.test.AbstractBootstrapCheckTestCase; import org.opensearch.test.AbstractBootstrapCheckTestCase;
import org.opensearch.test.MockLogAppender; import org.opensearch.test.MockLogAppender;
@@ -157,48 +156,42 @@ public class MaxMapCountCheckTests extends AbstractBootstrapCheckTestCase {
final IOException ioException = new IOException("fatal"); final IOException ioException = new IOException("fatal");
when(reader.readLine()).thenThrow(ioException); when(reader.readLine()).thenThrow(ioException);
final Logger logger = LogManager.getLogger("testGetMaxMapCountIOException"); final Logger logger = LogManager.getLogger("testGetMaxMapCountIOException");
final MockLogAppender appender = new MockLogAppender(); try (MockLogAppender appender = MockLogAppender.createForLoggers(logger)) {
appender.start(); appender.addExpectation(
appender.addExpectation( new ParameterizedMessageLoggingExpectation(
new ParameterizedMessageLoggingExpectation( "expected logged I/O exception",
"expected logged I/O exception", "testGetMaxMapCountIOException",
"testGetMaxMapCountIOException", Level.WARN,
Level.WARN, "I/O exception while trying to read [{}]",
"I/O exception while trying to read [{}]", new Object[] { procSysVmMaxMapCountPath },
new Object[] { procSysVmMaxMapCountPath }, e -> ioException == e
e -> ioException == e )
) );
); assertThat(check.getMaxMapCount(logger), equalTo(-1L));
Loggers.addAppender(logger, appender); appender.assertAllExpectationsMatched();
assertThat(check.getMaxMapCount(logger), equalTo(-1L)); verify(reader).close();
appender.assertAllExpectationsMatched(); }
verify(reader).close();
Loggers.removeAppender(logger, appender);
appender.stop();
} }
{ {
reset(reader); reset(reader);
when(reader.readLine()).thenReturn("eof"); when(reader.readLine()).thenReturn("eof");
final Logger logger = LogManager.getLogger("testGetMaxMapCountNumberFormatException"); final Logger logger = LogManager.getLogger("testGetMaxMapCountNumberFormatException");
final MockLogAppender appender = new MockLogAppender(); try (MockLogAppender appender = MockLogAppender.createForLoggers(logger)) {
appender.start(); appender.addExpectation(
appender.addExpectation( new ParameterizedMessageLoggingExpectation(
new ParameterizedMessageLoggingExpectation( "expected logged number format exception",
"expected logged number format exception", "testGetMaxMapCountNumberFormatException",
"testGetMaxMapCountNumberFormatException", Level.WARN,
Level.WARN, "unable to parse vm.max_map_count [{}]",
"unable to parse vm.max_map_count [{}]", new Object[] { "eof" },
new Object[] { "eof" }, e -> e instanceof NumberFormatException && e.getMessage().equals("For input string: \"eof\"")
e -> e instanceof NumberFormatException && e.getMessage().equals("For input string: \"eof\"") )
) );
); assertThat(check.getMaxMapCount(logger), equalTo(-1L));
Loggers.addAppender(logger, appender); appender.assertAllExpectationsMatched();
assertThat(check.getMaxMapCount(logger), equalTo(-1L)); verify(reader).close();
appender.assertAllExpectationsMatched(); }
verify(reader).close();
Loggers.removeAppender(logger, appender);
appender.stop();
} }
} }

View File

@@ -34,6 +34,7 @@ package org.opensearch.cluster;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchTimeoutException; import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.Version; import org.opensearch.Version;
import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListener;
@@ -46,7 +47,6 @@ import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.UUIDs; import org.opensearch.common.UUIDs;
import org.opensearch.common.component.Lifecycle; import org.opensearch.common.component.Lifecycle;
import org.opensearch.common.component.LifecycleListener; import org.opensearch.common.component.LifecycleListener;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.BoundTransportAddress; import org.opensearch.common.transport.BoundTransportAddress;
import org.opensearch.common.transport.TransportAddress; import org.opensearch.common.transport.TransportAddress;
@@ -352,10 +352,8 @@ public class NodeConnectionsServiceTests extends OpenSearchTestCase {
for (DiscoveryNode disconnectedNode : disconnectedNodes) { for (DiscoveryNode disconnectedNode : disconnectedNodes) {
transportService.disconnectFromNode(disconnectedNode); transportService.disconnectFromNode(disconnectedNode);
} }
MockLogAppender appender = new MockLogAppender(); final Logger logger = LogManager.getLogger("org.opensearch.cluster.NodeConnectionsService");
try { try (MockLogAppender appender = MockLogAppender.createForLoggers(logger)) {
appender.start();
Loggers.addAppender(LogManager.getLogger("org.opensearch.cluster.NodeConnectionsService"), appender);
for (DiscoveryNode targetNode : targetNodes) { for (DiscoveryNode targetNode : targetNodes) {
if (disconnectedNodes.contains(targetNode)) { if (disconnectedNodes.contains(targetNode)) {
appender.addExpectation( appender.addExpectation(
@@ -396,9 +394,6 @@ public class NodeConnectionsServiceTests extends OpenSearchTestCase {
runTasksUntil(deterministicTaskQueue, CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis()); runTasksUntil(deterministicTaskQueue, CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(Settings.EMPTY).millis());
appender.assertAllExpectationsMatched(); appender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(LogManager.getLogger("org.opensearch.cluster.NodeConnectionsService"), appender);
appender.stop();
} }
for (DiscoveryNode disconnectedNode : disconnectedNodes) { for (DiscoveryNode disconnectedNode : disconnectedNodes) {
transportService.disconnectFromNode(disconnectedNode); transportService.disconnectFromNode(disconnectedNode);
@@ -409,10 +404,8 @@ public class NodeConnectionsServiceTests extends OpenSearchTestCase {
for (DiscoveryNode disconnectedNode : disconnectedNodes) { for (DiscoveryNode disconnectedNode : disconnectedNodes) {
transportService.disconnectFromNode(disconnectedNode); transportService.disconnectFromNode(disconnectedNode);
} }
appender = new MockLogAppender();
try { try (MockLogAppender appender = MockLogAppender.createForLoggers(logger)) {
appender.start();
Loggers.addAppender(LogManager.getLogger("org.opensearch.cluster.NodeConnectionsService"), appender);
for (DiscoveryNode targetNode : targetNodes) { for (DiscoveryNode targetNode : targetNodes) {
if (disconnectedNodes.contains(targetNode) && newTargetNodes.get(targetNode.getId()) != null) { if (disconnectedNodes.contains(targetNode) && newTargetNodes.get(targetNode.getId()) != null) {
appender.addExpectation( appender.addExpectation(
@@ -493,9 +486,6 @@ public class NodeConnectionsServiceTests extends OpenSearchTestCase {
service.connectToNodes(newTargetNodes, () -> {}); service.connectToNodes(newTargetNodes, () -> {});
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
appender.assertAllExpectationsMatched(); appender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(LogManager.getLogger("org.opensearch.cluster.NodeConnectionsService"), appender);
appender.stop();
} }
} }

View File

@@ -33,7 +33,6 @@ package org.opensearch.cluster.coordination;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.LogEvent;
import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchException;
import org.opensearch.Version; import org.opensearch.Version;
@@ -48,7 +47,6 @@ import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.regex.Regex; import org.opensearch.common.regex.Regex;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.Settings.Builder; import org.opensearch.common.settings.Settings.Builder;
@@ -1265,19 +1263,12 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
cluster1.clusterNodes.add(newNode); cluster1.clusterNodes.add(newNode);
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(JoinHelper.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation("test1", JoinHelper.class.getCanonicalName(), Level.INFO, "*failed to join*")
new MockLogAppender.SeenEventExpectation("test1", JoinHelper.class.getCanonicalName(), Level.INFO, "*failed to join*") );
); cluster1.runFor(DEFAULT_STABILISATION_TIME, "failing join validation");
Logger joinLogger = LogManager.getLogger(JoinHelper.class);
Loggers.addAppender(joinLogger, mockAppender);
cluster1.runFor(DEFAULT_STABILISATION_TIME, "failing join validation");
try {
mockAppender.assertAllExpectationsMatched(); mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(joinLogger, mockAppender);
mockAppender.stop();
} }
assertEquals(0, newNode.getLastAppliedClusterState().version()); assertEquals(0, newNode.getLastAppliedClusterState().version());
@@ -1554,10 +1545,11 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
} }
for (int i = scaledRandomIntBetween(1, 10); i >= 0; i--) { for (int i = scaledRandomIntBetween(1, 10); i >= 0; i--) {
final MockLogAppender mockLogAppender = new MockLogAppender(); try (
try { MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(
mockLogAppender.start(); LogManager.getLogger(ClusterFormationFailureHelper.class)
Loggers.addAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender); )
) {
mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() { mockLogAppender.addExpectation(new MockLogAppender.LoggingExpectation() {
final Set<DiscoveryNode> nodesLogged = new HashSet<>(); final Set<DiscoveryNode> nodesLogged = new HashSet<>();
@@ -1599,9 +1591,6 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
}); });
cluster.runFor(warningDelayMillis + DEFAULT_DELAY_VARIABILITY, "waiting for warning to be emitted"); cluster.runFor(warningDelayMillis + DEFAULT_DELAY_VARIABILITY, "waiting for warning to be emitted");
mockLogAppender.assertAllExpectationsMatched(); mockLogAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(LogManager.getLogger(ClusterFormationFailureHelper.class), mockLogAppender);
mockLogAppender.stop();
} }
} }
} }
@@ -1613,11 +1602,12 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
cluster.stabilise(); cluster.stabilise();
final ClusterNode brokenNode = cluster.getAnyNodeExcept(cluster.getAnyLeader()); final ClusterNode brokenNode = cluster.getAnyNodeExcept(cluster.getAnyLeader());
final MockLogAppender mockLogAppender = new MockLogAppender(); try (
try { MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(
mockLogAppender.start(); LogManager.getLogger(Coordinator.CoordinatorPublication.class),
Loggers.addAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender); LogManager.getLogger(LagDetector.class)
Loggers.addAppender(LogManager.getLogger(LagDetector.class), mockLogAppender); )
) {
mockLogAppender.addExpectation( mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
@@ -1683,10 +1673,6 @@ public class CoordinatorTests extends AbstractCoordinatorTestCase {
); );
mockLogAppender.assertAllExpectationsMatched(); mockLogAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(LogManager.getLogger(Coordinator.CoordinatorPublication.class), mockLogAppender);
Loggers.removeAppender(LogManager.getLogger(LagDetector.class), mockLogAppender);
mockLogAppender.stop();
} }
} }
} }

View File

@@ -33,7 +33,6 @@ package org.opensearch.cluster.routing.allocation;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version; import org.opensearch.Version;
import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterInfo; import org.opensearch.cluster.ClusterInfo;
@@ -51,7 +50,6 @@ import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.common.Priority; import org.opensearch.common.Priority;
import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.index.shard.ShardId; import org.opensearch.index.shard.ShardId;
@@ -659,25 +657,30 @@ public class DiskThresholdMonitorTests extends OpenSearchAllocationTestCase {
private void assertNoLogging(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages) private void assertNoLogging(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages)
throws IllegalAccessException { throws IllegalAccessException {
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(DiskThresholdMonitor.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation(
new MockLogAppender.UnseenEventExpectation("any INFO message", DiskThresholdMonitor.class.getCanonicalName(), Level.INFO, "*") "any INFO message",
); DiskThresholdMonitor.class.getCanonicalName(),
mockAppender.addExpectation( Level.INFO,
new MockLogAppender.UnseenEventExpectation("any WARN message", DiskThresholdMonitor.class.getCanonicalName(), Level.WARN, "*") "*"
); )
);
mockAppender.addExpectation(
new MockLogAppender.UnseenEventExpectation(
"any WARN message",
DiskThresholdMonitor.class.getCanonicalName(),
Level.WARN,
"*"
)
);
Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class); for (int i = between(1, 3); i >= 0; i--) {
Loggers.addAppender(diskThresholdMonitorLogger, mockAppender); monitor.onNewInfo(clusterInfo(diskUsages));
}
for (int i = between(1, 3); i >= 0; i--) { mockAppender.assertAllExpectationsMatched();
monitor.onNewInfo(clusterInfo(diskUsages));
} }
mockAppender.assertAllExpectationsMatched();
Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender);
mockAppender.stop();
} }
private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages, String message) private void assertRepeatedWarningMessages(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages, String message)
@ -701,28 +704,24 @@ public class DiskThresholdMonitorTests extends OpenSearchAllocationTestCase {
private void assertLogging(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages, Level level, String message) private void assertLogging(DiskThresholdMonitor monitor, ImmutableOpenMap<String, DiskUsage> diskUsages, Level level, String message)
throws IllegalAccessException { throws IllegalAccessException {
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(DiskThresholdMonitor.class))) {
mockAppender.start(); mockAppender.start();
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation("expected message", DiskThresholdMonitor.class.getCanonicalName(), level, message) new MockLogAppender.SeenEventExpectation("expected message", DiskThresholdMonitor.class.getCanonicalName(), level, message)
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.UnseenEventExpectation( new MockLogAppender.UnseenEventExpectation(
"any message of another level", "any message of another level",
DiskThresholdMonitor.class.getCanonicalName(), DiskThresholdMonitor.class.getCanonicalName(),
level == Level.INFO ? Level.WARN : Level.INFO, level == Level.INFO ? Level.WARN : Level.INFO,
"*" "*"
) )
); );
Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class); monitor.onNewInfo(clusterInfo(diskUsages));
Loggers.addAppender(diskThresholdMonitorLogger, mockAppender);
monitor.onNewInfo(clusterInfo(diskUsages)); mockAppender.assertAllExpectationsMatched();
}
mockAppender.assertAllExpectationsMatched();
Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender);
mockAppender.stop();
} }
private static ClusterInfo clusterInfo(ImmutableOpenMap<String, DiskUsage> diskUsages) { private static ClusterInfo clusterInfo(ImmutableOpenMap<String, DiskUsage> diskUsages) {

View File

@ -34,7 +34,6 @@ package org.opensearch.cluster.service;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version; import org.opensearch.Version;
import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterState;
@ -47,7 +46,6 @@ import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener; import org.opensearch.cluster.service.ClusterApplier.ClusterApplyListener;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue; import org.opensearch.common.unit.TimeValue;
@ -131,36 +129,32 @@ public class ClusterApplierServiceTests extends OpenSearchTestCase {
@TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level") @TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level")
public void testClusterStateUpdateLogging() throws Exception { public void testClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterApplierService.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test1",
"test1", ClusterApplierService.class.getCanonicalName(),
ClusterApplierService.class.getCanonicalName(), Level.DEBUG,
Level.DEBUG, "*processing [test1]: took [1s] no change in cluster state"
"*processing [test1]: took [1s] no change in cluster state" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test2",
"test2", ClusterApplierService.class.getCanonicalName(),
ClusterApplierService.class.getCanonicalName(), Level.TRACE,
Level.TRACE, "*failed to execute cluster state applier in [2s]*"
"*failed to execute cluster state applier in [2s]*" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test3",
"test3", ClusterApplierService.class.getCanonicalName(),
ClusterApplierService.class.getCanonicalName(), Level.DEBUG,
Level.DEBUG, "*processing [test3]: took [0s] no change in cluster state*"
"*processing [test3]: took [0s] no change in cluster state*" )
) );
);
Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
Loggers.addAppender(clusterLogger, mockAppender);
try {
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis(); clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
clusterApplierService.runOnApplierThread( clusterApplierService.runOnApplierThread(
"test1", "test1",
@ -198,46 +192,39 @@ public class ClusterApplierServiceTests extends OpenSearchTestCase {
} }
}); });
assertBusy(mockAppender::assertAllExpectationsMatched); assertBusy(mockAppender::assertAllExpectationsMatched);
} finally {
Loggers.removeAppender(clusterLogger, mockAppender);
mockAppender.stop();
} }
} }
@TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level") @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level")
public void testLongClusterStateUpdateLogging() throws Exception { public void testLongClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(ClusterApplierService.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation(
new MockLogAppender.UnseenEventExpectation( "test1 shouldn't see because setting is too low",
"test1 shouldn't see because setting is too low", ClusterApplierService.class.getCanonicalName(),
ClusterApplierService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*cluster state applier task [test1] took [*] which is above the warn threshold of *"
"*cluster state applier task [test1] took [*] which is above the warn threshold of *" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test2",
"test2", ClusterApplierService.class.getCanonicalName(),
ClusterApplierService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*cluster state applier task [test2] took [32s] which is above the warn threshold of [*]: "
"*cluster state applier task [test2] took [32s] which is above the warn threshold of [*]: " + "[running task [test2]] took [*"
+ "[running task [test2]] took [*" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test4",
"test4", ClusterApplierService.class.getCanonicalName(),
ClusterApplierService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: "
"*cluster state applier task [test3] took [34s] which is above the warn threshold of [*]: " + "[running task [test3]] took [*"
+ "[running task [test3]] took [*" )
) );
);
Logger clusterLogger = LogManager.getLogger(ClusterApplierService.class);
Loggers.addAppender(clusterLogger, mockAppender);
try {
final CountDownLatch latch = new CountDownLatch(4); final CountDownLatch latch = new CountDownLatch(4);
final CountDownLatch processedFirstTask = new CountDownLatch(1); final CountDownLatch processedFirstTask = new CountDownLatch(1);
clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis(); clusterApplierService.currentTimeOverride = threadPool.relativeTimeInMillis();
@ -301,11 +288,8 @@ public class ClusterApplierServiceTests extends OpenSearchTestCase {
} }
}); });
latch.await(); latch.await();
} finally { mockAppender.assertAllExpectationsMatched();
Loggers.removeAppender(clusterLogger, mockAppender);
mockAppender.stop();
} }
mockAppender.assertAllExpectationsMatched();
} }
public void testLocalNodeMasterListenerCallbacks() { public void testLocalNodeMasterListenerCallbacks() {

View File

@ -34,7 +34,6 @@ package org.opensearch.cluster.service;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchException;
import org.opensearch.Version; import org.opensearch.Version;
import org.opensearch.cluster.AckedClusterStateUpdateTask; import org.opensearch.cluster.AckedClusterStateUpdateTask;
@ -54,7 +53,6 @@ import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable; import org.opensearch.common.Nullable;
import org.opensearch.common.Priority; import org.opensearch.common.Priority;
import org.opensearch.common.collect.Tuple; import org.opensearch.common.collect.Tuple;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue; import org.opensearch.common.unit.TimeValue;
@ -327,168 +325,163 @@ public class MasterServiceTests extends OpenSearchTestCase {
@TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level") @TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level")
public void testClusterStateUpdateLogging() throws Exception { public void testClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test1 start",
"test1 start", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.DEBUG,
Level.DEBUG, "executing cluster state update for [test1]"
"executing cluster state update for [test1]" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test1 computation",
"test1 computation", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.DEBUG,
Level.DEBUG, "took [1s] to compute cluster state update for [test1]"
"took [1s] to compute cluster state update for [test1]" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test1 notification",
"test1 notification", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.DEBUG,
Level.DEBUG, "took [0s] to notify listeners on unchanged cluster state for [test1]"
"took [0s] to notify listeners on unchanged cluster state for [test1]" )
) );
);
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test2 start", "test2 start",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.DEBUG, Level.DEBUG,
"executing cluster state update for [test2]" "executing cluster state update for [test2]"
) )
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test2 failure", "test2 failure",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.TRACE, Level.TRACE,
"failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*" "failed to execute cluster state update (on version: [*], uuid: [*]) for [test2]*"
) )
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test2 computation", "test2 computation",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.DEBUG, Level.DEBUG,
"took [2s] to compute cluster state update for [test2]" "took [2s] to compute cluster state update for [test2]"
) )
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test2 notification", "test2 notification",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.DEBUG, Level.DEBUG,
"took [0s] to notify listeners on unchanged cluster state for [test2]" "took [0s] to notify listeners on unchanged cluster state for [test2]"
) )
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test3 start", "test3 start",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.DEBUG, Level.DEBUG,
"executing cluster state update for [test3]" "executing cluster state update for [test3]"
) )
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test3 computation", "test3 computation",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.DEBUG, Level.DEBUG,
"took [3s] to compute cluster state update for [test3]" "took [3s] to compute cluster state update for [test3]"
) )
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test3 notification", "test3 notification",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.DEBUG, Level.DEBUG,
"took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]" "took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [test3]"
) )
); );
mockAppender.addExpectation( mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation( new MockLogAppender.SeenEventExpectation(
"test4", "test4",
MasterService.class.getCanonicalName(), MasterService.class.getCanonicalName(),
Level.DEBUG, Level.DEBUG,
"executing cluster state update for [test4]" "executing cluster state update for [test4]"
) )
); );
Logger clusterLogger = LogManager.getLogger(MasterService.class); try (MasterService masterService = createMasterService(true)) {
Loggers.addAppender(clusterLogger, mockAppender); masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
try (MasterService masterService = createMasterService(true)) { @Override
masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { public ClusterState execute(ClusterState currentState) {
@Override relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis();
public ClusterState execute(ClusterState currentState) { return currentState;
relativeTimeInMillis += TimeValue.timeValueSeconds(1).millis(); }
return currentState;
}
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {} public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {}
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); fail();
} }
}); });
masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis(); relativeTimeInMillis += TimeValue.timeValueSeconds(2).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task"); throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
} }
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail(); fail();
} }
@Override @Override
public void onFailure(String source, Exception e) {} public void onFailure(String source, Exception e) {}
}); });
masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis(); relativeTimeInMillis += TimeValue.timeValueSeconds(3).millis();
return ClusterState.builder(currentState).incrementVersion().build(); return ClusterState.builder(currentState).incrementVersion().build();
} }
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
relativeTimeInMillis += TimeValue.timeValueSeconds(4).millis(); relativeTimeInMillis += TimeValue.timeValueSeconds(4).millis();
} }
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); fail();
} }
}); });
masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
return currentState; return currentState;
} }
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {} public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {}
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); fail();
} }
}); });
assertBusy(mockAppender::assertAllExpectationsMatched); assertBusy(mockAppender::assertAllExpectationsMatched);
} finally { }
Loggers.removeAppender(clusterLogger, mockAppender);
mockAppender.stop();
} }
} }
@ -741,233 +734,228 @@ public class MasterServiceTests extends OpenSearchTestCase {
@TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level") @TestLogging(value = "org.opensearch.cluster.service:WARN", reason = "to ensure that we log cluster state events on WARN level")
public void testLongClusterStateUpdateLogging() throws Exception { public void testLongClusterStateUpdateLogging() throws Exception {
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation(
new MockLogAppender.UnseenEventExpectation( "test1 shouldn't log because it was fast enough",
"test1 shouldn't log because it was fast enough", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*took*test1*"
"*took*test1*" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test2",
"test2", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test2]"
"*took [*], which is over [10s], to compute cluster state update for [test2]" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test3",
"test3", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test3]"
"*took [*], which is over [10s], to compute cluster state update for [test3]" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test4",
"test4", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*took [*], which is over [10s], to compute cluster state update for [test4]"
"*took [*], which is over [10s], to compute cluster state update for [test4]" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.UnseenEventExpectation(
new MockLogAppender.UnseenEventExpectation( "test5 should not log despite publishing slowly",
"test5 should not log despite publishing slowly", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.WARN,
Level.WARN, "*took*test5*"
"*took*test5*" )
) );
); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test6 should log due to slow and failing publication",
"test6 should log due to slow and failing publication", MasterService.class.getCanonicalName(),
MasterService.class.getCanonicalName(), Level.WARN,
Level.WARN, "took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [test6]:*"
"took [*] and then failed to publish updated cluster state (version: *, uuid: *) for [test6]:*" )
)
);
Logger clusterLogger = LogManager.getLogger(MasterService.class);
Loggers.addAppender(clusterLogger, mockAppender);
try (
MasterService masterService = new MasterService(
Settings.builder()
.put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
.put(Node.NODE_NAME_SETTING.getKey(), "test_node")
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
)
) {
final DiscoveryNode localNode = new DiscoveryNode(
"node1",
buildNewFakeTransportAddress(),
emptyMap(),
emptySet(),
Version.CURRENT
); );
final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build();
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
if (event.source().contains("test5")) {
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+ randomLongBetween(1, 1000000);
}
if (event.source().contains("test6")) {
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+ randomLongBetween(1, 1000000);
throw new OpenSearchException("simulated error during slow publication which should trigger logging");
}
clusterStateRef.set(event.state());
publishListener.onResponse(null);
});
masterService.setClusterStateSupplier(clusterStateRef::get);
masterService.start();
final CountDownLatch latch = new CountDownLatch(6); try (
final CountDownLatch processedFirstTask = new CountDownLatch(1); MasterService masterService = new MasterService(
masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() { Settings.builder()
@Override .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), MasterServiceTests.class.getSimpleName())
public ClusterState execute(ClusterState currentState) { .put(Node.NODE_NAME_SETTING.getKey(), "test_node")
relativeTimeInMillis += randomLongBetween( .build(),
0L, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis() threadPool
); )
return currentState; ) {
}
@Override final DiscoveryNode localNode = new DiscoveryNode(
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { "node1",
latch.countDown(); buildNewFakeTransportAddress(),
processedFirstTask.countDown(); emptyMap(),
} emptySet(),
Version.CURRENT
);
final ClusterState initialClusterState = ClusterState.builder(new ClusterName(MasterServiceTests.class.getSimpleName()))
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).masterNodeId(localNode.getId()))
.blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
.build();
final AtomicReference<ClusterState> clusterStateRef = new AtomicReference<>(initialClusterState);
masterService.setClusterStatePublisher((event, publishListener, ackListener) -> {
if (event.source().contains("test5")) {
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY)
.millis() + randomLongBetween(1, 1000000);
}
if (event.source().contains("test6")) {
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY)
.millis() + randomLongBetween(1, 1000000);
throw new OpenSearchException("simulated error during slow publication which should trigger logging");
}
clusterStateRef.set(event.state());
publishListener.onResponse(null);
});
masterService.setClusterStateSupplier(clusterStateRef::get);
masterService.start();
@Override final CountDownLatch latch = new CountDownLatch(6);
public void onFailure(String source, Exception e) { final CountDownLatch processedFirstTask = new CountDownLatch(1);
fail(); masterService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
} @Override
}); public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += randomLongBetween(
0L,
MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
);
return currentState;
}
processedFirstTask.await(); @Override
masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
@Override latch.countDown();
public ClusterState execute(ClusterState currentState) { processedFirstTask.countDown();
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis() }
+ randomLongBetween(1, 1000000);
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void onFailure(String source, Exception e) {
fail(); fail();
} }
});
@Override processedFirstTask.await();
public void onFailure(String source, Exception e) { masterService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
latch.countDown(); @Override
} public ClusterState execute(ClusterState currentState) {
}); relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY)
masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() { .millis() + randomLongBetween(1, 1000000);
@Override throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
public ClusterState execute(ClusterState currentState) { }
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis()
+ randomLongBetween(1, 1000000);
return ClusterState.builder(currentState).incrementVersion().build();
}
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown(); fail();
} }
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); latch.countDown();
} }
}); });
masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() { masterService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY).millis() relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY)
+ randomLongBetween(1, 1000000); .millis() + randomLongBetween(1, 1000000);
return currentState; return ClusterState.builder(currentState).incrementVersion().build();
} }
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown(); latch.countDown();
} }
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); fail();
} }
}); });
masterService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() { masterService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build(); relativeTimeInMillis += MasterService.MASTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING.get(Settings.EMPTY)
} .millis() + randomLongBetween(1, 1000000);
return currentState;
}
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown(); latch.countDown();
} }
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); fail();
} }
}); });
masterService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() { masterService.submitStateUpdateTask("test5", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).incrementVersion().build(); return ClusterState.builder(currentState).incrementVersion().build();
} }
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail(); latch.countDown();
} }
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); // maybe we should notify here? fail();
} }
}); });
// Additional update task to make sure all previous logging made it to the loggerName masterService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() {
// We don't check logging for this on since there is no guarantee that it will occur before our check @Override
masterService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() { public ClusterState execute(ClusterState currentState) {
@Override return ClusterState.builder(currentState).incrementVersion().build();
public ClusterState execute(ClusterState currentState) { }
return currentState;
}
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown(); fail();
} }
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
fail(); fail(); // maybe we should notify here?
} }
}); });
latch.await(); // Additional update task to make sure all previous logging made it to the loggerName
} finally { // We don't check logging for this on since there is no guarantee that it will occur before our check
Loggers.removeAppender(clusterLogger, mockAppender); masterService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() {
mockAppender.stop(); @Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
@Override
public void onFailure(String source, Exception e) {
fail();
}
});
latch.await();
}
mockAppender.assertAllExpectationsMatched();
} }
mockAppender.assertAllExpectationsMatched();
} }
public void testAcking() throws InterruptedException { public void testAcking() throws InterruptedException {

View File

@ -37,7 +37,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.LogEvent;
import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.collect.Tuple; import org.opensearch.common.collect.Tuple;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.AbstractScopedSettings.SettingUpdater; import org.opensearch.common.settings.AbstractScopedSettings.SettingUpdater;
import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.unit.ByteSizeUnit; import org.opensearch.common.unit.ByteSizeUnit;
@ -1479,32 +1478,26 @@ public class SettingTests extends OpenSearchTestCase {
); );
final IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY); final IndexSettings settings = new IndexSettings(metadata, Settings.EMPTY);
final MockLogAppender mockLogAppender = new MockLogAppender();
mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"message",
"org.opensearch.common.settings.IndexScopedSettings",
Level.INFO,
"updating [index.refresh_interval] from [20s] to [10s]"
) {
@Override
public boolean innerMatch(LogEvent event) {
return event.getMarker().getName().equals(" [index1]");
}
}
);
mockLogAppender.start();
final Logger logger = LogManager.getLogger(IndexScopedSettings.class); final Logger logger = LogManager.getLogger(IndexScopedSettings.class);
try { try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(logger)) {
Loggers.addAppender(logger, mockLogAppender); mockLogAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"message",
"org.opensearch.common.settings.IndexScopedSettings",
Level.INFO,
"updating [index.refresh_interval] from [20s] to [10s]"
) {
@Override
public boolean innerMatch(LogEvent event) {
return event.getMarker().getName().equals(" [index1]");
}
}
);
settings.updateIndexMetadata( settings.updateIndexMetadata(
newIndexMeta("index1", Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s").build()) newIndexMeta("index1", Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "10s").build())
); );
mockLogAppender.assertAllExpectationsMatched(); mockLogAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(logger, mockLogAppender);
mockLogAppender.stop();
} }
} }
} }

View File

@ -35,7 +35,6 @@ import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.opensearch.common.Strings; import org.opensearch.common.Strings;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.common.xcontent.json.JsonXContent;
@ -143,14 +142,10 @@ public class SettingsFilterTests extends OpenSearchTestCase {
private void assertExpectedLogMessages(Consumer<Logger> consumer, MockLogAppender.LoggingExpectation... expectations) private void assertExpectedLogMessages(Consumer<Logger> consumer, MockLogAppender.LoggingExpectation... expectations)
throws IllegalAccessException { throws IllegalAccessException {
Logger testLogger = LogManager.getLogger("org.opensearch.test"); Logger testLogger = LogManager.getLogger("org.opensearch.test");
MockLogAppender appender = MockLogAppender.createStarted(); try (MockLogAppender appender = MockLogAppender.createForLoggers(testLogger)) {
Loggers.addAppender(testLogger, appender);
try {
Arrays.stream(expectations).forEach(appender::addExpectation); Arrays.stream(expectations).forEach(appender::addExpectation);
consumer.accept(testLogger); consumer.accept(testLogger);
appender.assertAllExpectationsMatched(); appender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(testLogger, appender);
} }
} }

View File

@ -42,7 +42,6 @@ import org.opensearch.action.ActionListener;
import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Nullable; import org.opensearch.common.Nullable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress; import org.opensearch.common.transport.TransportAddress;
import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.OpenSearchTestCase;
@ -174,26 +173,20 @@ public class HandshakingTransportAddressConnectorTests extends OpenSearchTestCas
FailureListener failureListener = new FailureListener(); FailureListener failureListener = new FailureListener();
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"message",
HandshakingTransportAddressConnector.class.getCanonicalName(),
Level.WARN,
"*completed handshake with [*] but followup connection failed*"
)
);
Logger targetLogger = LogManager.getLogger(HandshakingTransportAddressConnector.class); Logger targetLogger = LogManager.getLogger(HandshakingTransportAddressConnector.class);
Loggers.addAppender(targetLogger, mockAppender); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(targetLogger)) {
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"message",
HandshakingTransportAddressConnector.class.getCanonicalName(),
Level.WARN,
"*completed handshake with [*] but followup connection failed*"
)
);
try {
handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener); handshakingTransportAddressConnector.connectToRemoteMasterNode(discoveryAddress, failureListener);
failureListener.assertFailure(); failureListener.assertFailure();
mockAppender.assertAllExpectationsMatched(); mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(targetLogger, mockAppender);
mockAppender.stop();
} }
} }

View File

@ -51,7 +51,6 @@ import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.common.collect.Tuple; import org.opensearch.common.collect.Tuple;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.common.xcontent.NamedXContentRegistry;
@ -568,18 +567,11 @@ public class IncrementalClusterStateWriterTests extends OpenSearchAllocationTest
IncrementalClusterStateWriter incrementalClusterStateWriter, IncrementalClusterStateWriter incrementalClusterStateWriter,
MockLogAppender.LoggingExpectation expectation MockLogAppender.LoggingExpectation expectation
) throws IllegalAccessException, WriteStateException { ) throws IllegalAccessException, WriteStateException {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(expectation);
Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class); Logger classLogger = LogManager.getLogger(IncrementalClusterStateWriter.class);
Loggers.addAppender(classLogger, mockAppender); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(classLogger)) {
mockAppender.addExpectation(expectation);
try {
incrementalClusterStateWriter.updateClusterState(clusterState); incrementalClusterStateWriter.updateClusterState(clusterState);
} finally { mockAppender.assertAllExpectationsMatched();
Loggers.removeAppender(classLogger, mockAppender);
mockAppender.stop();
} }
mockAppender.assertAllExpectationsMatched();
} }
} }

View File

@ -51,7 +51,6 @@ import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.UUIDs; import org.opensearch.common.UUIDs;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.BigArrays;
@ -1178,23 +1177,16 @@ public class PersistedClusterStateServiceTests extends OpenSearchTestCase {
PersistedClusterStateService.Writer writer, PersistedClusterStateService.Writer writer,
MockLogAppender.LoggingExpectation expectation MockLogAppender.LoggingExpectation expectation
) throws IllegalAccessException, IOException { ) throws IllegalAccessException, IOException {
MockLogAppender mockAppender = new MockLogAppender();
mockAppender.start();
mockAppender.addExpectation(expectation);
Logger classLogger = LogManager.getLogger(PersistedClusterStateService.class); Logger classLogger = LogManager.getLogger(PersistedClusterStateService.class);
Loggers.addAppender(classLogger, mockAppender); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(classLogger)) {
mockAppender.addExpectation(expectation);
try {
if (previousState == null) { if (previousState == null) {
writer.writeFullStateAndCommit(currentTerm, clusterState); writer.writeFullStateAndCommit(currentTerm, clusterState);
} else { } else {
writer.writeIncrementalStateAndCommit(currentTerm, previousState, clusterState); writer.writeIncrementalStateAndCommit(currentTerm, previousState, clusterState);
} }
} finally { mockAppender.assertAllExpectationsMatched();
Loggers.removeAppender(classLogger, mockAppender);
mockAppender.stop();
} }
mockAppender.assertAllExpectationsMatched();
} }
@Override @Override

View File

@ -37,7 +37,6 @@ import org.apache.logging.log4j.LogManager;
import org.opensearch.common.UUIDs; import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkService; import org.opensearch.common.network.NetworkService;
import org.opensearch.common.network.NetworkUtils; import org.opensearch.common.network.NetworkUtils;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
@ -268,12 +267,8 @@ public class AbstractHttpServerTransportTests extends OpenSearchTestCase {
.put(HttpTransportSettings.SETTING_HTTP_TRACE_LOG_EXCLUDE.getKey(), excludeSettings) .put(HttpTransportSettings.SETTING_HTTP_TRACE_LOG_EXCLUDE.getKey(), excludeSettings)
.build() .build()
); );
MockLogAppender appender = new MockLogAppender();
final String traceLoggerName = "org.opensearch.http.HttpTracer"; final String traceLoggerName = "org.opensearch.http.HttpTracer";
try { try (MockLogAppender appender = MockLogAppender.createForLoggers(LogManager.getLogger(traceLoggerName))) {
appender.start();
Loggers.addAppender(LogManager.getLogger(traceLoggerName), appender);
final String opaqueId = UUIDs.randomBase64UUID(random()); final String opaqueId = UUIDs.randomBase64UUID(random());
appender.addExpectation( appender.addExpectation(
new MockLogAppender.PatternSeenEventExpectation( new MockLogAppender.PatternSeenEventExpectation(
@ -342,9 +337,6 @@ public class AbstractHttpServerTransportTests extends OpenSearchTestCase {
transport.incomingRequest(fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel()); transport.incomingRequest(fakeRestRequestExcludedPath.getHttpRequest(), fakeRestRequestExcludedPath.getHttpChannel());
appender.assertAllExpectationsMatched(); appender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(LogManager.getLogger(traceLoggerName), appender);
appender.stop();
} }
} }
} }

View File

@ -34,7 +34,6 @@ package org.opensearch.ingest;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce; import org.apache.lucene.util.SetOnce;
import org.opensearch.OpenSearchParseException; import org.opensearch.OpenSearchParseException;
import org.opensearch.ResourceNotFoundException; import org.opensearch.ResourceNotFoundException;
@ -59,7 +58,6 @@ import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService; import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentBuilder;
@ -544,24 +542,17 @@ public class IngestServiceTests extends OpenSearchTestCase {
); );
ClusterState previousClusterState = clusterState; ClusterState previousClusterState = clusterState;
clusterState = IngestService.innerPut(putRequest, clusterState); clusterState = IngestService.innerPut(putRequest, clusterState);
MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(IngestService.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "test1",
"test1", IngestService.class.getCanonicalName(),
IngestService.class.getCanonicalName(), Level.WARN,
Level.WARN, "failed to update ingest pipelines"
"failed to update ingest pipelines" )
) );
);
Logger ingestLogger = LogManager.getLogger(IngestService.class);
Loggers.addAppender(ingestLogger, mockAppender);
try {
ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState));
mockAppender.assertAllExpectationsMatched(); mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(ingestLogger, mockAppender);
mockAppender.stop();
} }
pipeline = ingestService.getPipeline(id); pipeline = ingestService.getPipeline(id);
assertNotNull(pipeline); assertNotNull(pipeline);

View File

@ -34,13 +34,11 @@ package org.opensearch.monitor.fs;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.mockfile.FilterFileChannel; import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.mockfile.FilterFileSystemProvider; import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.opensearch.cluster.coordination.DeterministicTaskQueue; import org.opensearch.cluster.coordination.DeterministicTaskQueue;
import org.opensearch.common.io.PathUtils; import org.opensearch.common.io.PathUtils;
import org.opensearch.common.io.PathUtilsForTesting; import org.opensearch.common.io.PathUtilsForTesting;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.env.NodeEnvironment; import org.opensearch.env.NodeEnvironment;
@ -158,12 +156,10 @@ public class FsHealthServiceTests extends OpenSearchTestCase {
PathUtilsForTesting.installMock(fileSystem); PathUtilsForTesting.installMock(fileSystem);
final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
MockLogAppender mockAppender = new MockLogAppender(); try (
mockAppender.start(); MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(FsHealthService.class));
NodeEnvironment env = newNodeEnvironment()
Logger logger = LogManager.getLogger(FsHealthService.class); ) {
Loggers.addAppender(logger, mockAppender);
try (NodeEnvironment env = newNodeEnvironment()) {
FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env); FsHealthService fsHealthService = new FsHealthService(settings, clusterSettings, testThreadPool, env);
int counter = 0; int counter = 0;
for (Path path : env.nodeDataPaths()) { for (Path path : env.nodeDataPaths()) {
@ -183,8 +179,6 @@ public class FsHealthServiceTests extends OpenSearchTestCase {
assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount()); assertEquals(env.nodeDataPaths().length, disruptFileSystemProvider.getInjectedPathCount());
assertBusy(mockAppender::assertAllExpectationsMatched); assertBusy(mockAppender::assertAllExpectationsMatched);
} finally { } finally {
Loggers.removeAppender(logger, mockAppender);
mockAppender.stop();
PathUtilsForTesting.teardown(); PathUtilsForTesting.teardown();
ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS); ThreadPool.terminate(testThreadPool, 500, TimeUnit.MILLISECONDS);
} }

View File

@ -32,9 +32,7 @@
package org.opensearch.plugins; package org.opensearch.plugins;
import org.opensearch.common.logging.Loggers;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
@ -737,45 +735,43 @@ public class PluginsServiceTests extends OpenSearchTestCase {
public void testFindPluginDirs() throws Exception { public void testFindPluginDirs() throws Exception {
final Path plugins = createTempDir(); final Path plugins = createTempDir();
final MockLogAppender mockLogAppender = new MockLogAppender(); try (MockLogAppender mockLogAppender = MockLogAppender.createForLoggers(LogManager.getLogger(PluginsService.class))) {
mockLogAppender.start(); mockLogAppender.addExpectation(
mockLogAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "[.test] warning",
"[.test] warning", "org.opensearch.plugins.PluginsService",
"org.opensearch.plugins.PluginsService", Level.WARN,
Level.WARN, "Non-plugin file located in the plugins folder with the following name: [.DS_Store]"
"Non-plugin file located in the plugins folder with the following name: [.DS_Store]" )
) );
);
final Logger testLogger = LogManager.getLogger(PluginsService.class);
Loggers.addAppender(testLogger, mockLogAppender);
final Path fake = plugins.resolve("fake"); final Path fake = plugins.resolve("fake");
Path testFile = plugins.resolve(".DS_Store"); Path testFile = plugins.resolve(".DS_Store");
Files.createFile(testFile); Files.createFile(testFile);
PluginTestUtil.writePluginProperties( PluginTestUtil.writePluginProperties(
fake, fake,
"description", "description",
"description", "description",
"name", "name",
"fake", "fake",
"version", "version",
"1.0.0", "1.0.0",
"opensearch.version", "opensearch.version",
Version.CURRENT.toString(), Version.CURRENT.toString(),
"java.version", "java.version",
System.getProperty("java.specification.version"), System.getProperty("java.specification.version"),
"classname", "classname",
"test.DummyPlugin" "test.DummyPlugin"
); );
try (InputStream jar = PluginsServiceTests.class.getResourceAsStream("dummy-plugin.jar")) { try (InputStream jar = PluginsServiceTests.class.getResourceAsStream("dummy-plugin.jar")) {
Files.copy(jar, fake.resolve("plugin.jar")); Files.copy(jar, fake.resolve("plugin.jar"));
}
assertThat(PluginsService.findPluginDirs(plugins), containsInAnyOrder(fake));
mockLogAppender.assertAllExpectationsMatched();
} }
assertThat(PluginsService.findPluginDirs(plugins), containsInAnyOrder(fake));
mockLogAppender.assertAllExpectationsMatched();
} }
public void testExistingMandatoryClasspathPlugin() { public void testExistingMandatoryClasspathPlugin() {

View File

@ -34,7 +34,6 @@ package org.opensearch.transport;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchException;
import org.opensearch.Version; import org.opensearch.Version;
import org.opensearch.action.ActionListener; import org.opensearch.action.ActionListener;
@ -46,7 +45,6 @@ import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.InputStreamStreamInput; import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue; import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays; import org.opensearch.common.util.BigArrays;
@ -260,20 +258,16 @@ public class InboundHandlerTests extends OpenSearchTestCase {
// response so we must just close the connection on an error. To avoid the failure disappearing into a black hole we at least log // response so we must just close the connection on an error. To avoid the failure disappearing into a black hole we at least log
// it. // it.
final MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(InboundHandler.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "expected message",
"expected message", InboundHandler.class.getCanonicalName(),
InboundHandler.class.getCanonicalName(), Level.WARN,
Level.WARN, "could not send error response to handshake"
"could not send error response to handshake" )
) );
);
final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
Loggers.addAppender(inboundHandlerLogger, mockAppender);
try {
final AtomicBoolean isClosed = new AtomicBoolean(); final AtomicBoolean isClosed = new AtomicBoolean();
channel.addCloseListener(ActionListener.wrap(() -> assertTrue(isClosed.compareAndSet(false, true)))); channel.addCloseListener(ActionListener.wrap(() -> assertTrue(isClosed.compareAndSet(false, true))));
@ -293,28 +287,21 @@ public class InboundHandlerTests extends OpenSearchTestCase {
assertTrue(isClosed.get()); assertTrue(isClosed.get());
assertNull(channel.getMessageCaptor().get()); assertNull(channel.getMessageCaptor().get());
mockAppender.assertAllExpectationsMatched(); mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(inboundHandlerLogger, mockAppender);
mockAppender.stop();
} }
} }
public void testLogsSlowInboundProcessing() throws Exception { public void testLogsSlowInboundProcessing() throws Exception {
final MockLogAppender mockAppender = new MockLogAppender(); try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(InboundHandler.class))) {
mockAppender.start(); mockAppender.addExpectation(
mockAppender.addExpectation( new MockLogAppender.SeenEventExpectation(
new MockLogAppender.SeenEventExpectation( "expected message",
"expected message", InboundHandler.class.getCanonicalName(),
InboundHandler.class.getCanonicalName(), Level.WARN,
Level.WARN, "handling inbound transport message "
"handling inbound transport message " )
) );
);
final Logger inboundHandlerLogger = LogManager.getLogger(InboundHandler.class);
Loggers.addAppender(inboundHandlerLogger, mockAppender);
handler.setSlowLogThreshold(TimeValue.timeValueMillis(5L)); handler.setSlowLogThreshold(TimeValue.timeValueMillis(5L));
try {
final Version remoteVersion = Version.CURRENT; final Version remoteVersion = Version.CURRENT;
final long requestId = randomNonNegativeLong(); final long requestId = randomNonNegativeLong();
final Header requestHeader = new Header( final Header requestHeader = new Header(
@ -336,9 +323,6 @@ public class InboundHandlerTests extends OpenSearchTestCase {
handler.inboundMessage(channel, requestMessage); handler.inboundMessage(channel, requestMessage);
assertNotNull(channel.getMessageCaptor().get()); assertNotNull(channel.getMessageCaptor().get());
mockAppender.assertAllExpectationsMatched(); mockAppender.assertAllExpectationsMatched();
} finally {
Loggers.removeAppender(inboundHandlerLogger, mockAppender);
mockAppender.stop();
} }
} }

View File

@ -40,7 +40,6 @@ import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.component.Lifecycle; import org.opensearch.common.component.Lifecycle;
import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.NetworkService; import org.opensearch.common.network.NetworkService;
import org.opensearch.common.network.NetworkUtils; import org.opensearch.common.network.NetworkUtils;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
@ -524,12 +523,7 @@ public class TcpTransportTests extends OpenSearchTestCase {
MockLogAppender.LoggingExpectation... expectations MockLogAppender.LoggingExpectation... expectations
) throws IllegalAccessException { ) throws IllegalAccessException {
final TestThreadPool testThreadPool = new TestThreadPool("test"); final TestThreadPool testThreadPool = new TestThreadPool("test");
MockLogAppender appender = new MockLogAppender(); try (MockLogAppender appender = MockLogAppender.createForLoggers(LogManager.getLogger(TcpTransport.class))) {
try {
appender.start();
Loggers.addAppender(LogManager.getLogger(TcpTransport.class), appender);
for (MockLogAppender.LoggingExpectation expectation : expectations) { for (MockLogAppender.LoggingExpectation expectation : expectations) {
appender.addExpectation(expectation); appender.addExpectation(expectation);
} }
@ -568,8 +562,6 @@ public class TcpTransportTests extends OpenSearchTestCase {
appender.assertAllExpectationsMatched(); appender.assertAllExpectationsMatched();
} finally { } finally {
Loggers.removeAppender(LogManager.getLogger(TcpTransport.class), appender);
appender.stop();
ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
} }
} }

View File

@ -38,7 +38,6 @@ import org.opensearch.action.admin.cluster.stats.ClusterStatsAction;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest; import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest;
import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.OpenSearchTestCase;
@ -51,57 +50,44 @@ import static org.mockito.Mockito.mock;
@TestLogging(value = "org.opensearch.transport.TransportLogger:trace", reason = "to ensure we log network events on TRACE level") @TestLogging(value = "org.opensearch.transport.TransportLogger:trace", reason = "to ensure we log network events on TRACE level")
public class TransportLoggerTests extends OpenSearchTestCase { public class TransportLoggerTests extends OpenSearchTestCase {
public void testLoggingHandler() throws Exception {
try (MockLogAppender appender = MockLogAppender.createForLoggers(LogManager.getLogger(TransportLogger.class))) {
final String writePattern = ".*\\[length: \\d+"
+ ", request id: \\d+"
+ ", type: request"
+ ", version: .*"
+ ", header size: \\d+B"
+ ", action: cluster:monitor/stats]"
+ " WRITE: \\d+B";
final MockLogAppender.LoggingExpectation writeExpectation = new MockLogAppender.PatternSeenEventExpectation(
"hot threads request",
TransportLogger.class.getCanonicalName(),
Level.TRACE,
writePattern
);
private MockLogAppender appender; final String readPattern = ".*\\[length: \\d+"
+ ", request id: \\d+"
+ ", type: request"
+ ", version: .*"
+ ", header size: \\d+B"
+ ", action: cluster:monitor/stats]"
+ " READ: \\d+B";
public void setUp() throws Exception { final MockLogAppender.LoggingExpectation readExpectation = new MockLogAppender.PatternSeenEventExpectation(
super.setUp(); "cluster monitor request",
appender = MockLogAppender.createStarted(); TransportLogger.class.getCanonicalName(),
Loggers.addAppender(LogManager.getLogger(TransportLogger.class), appender); Level.TRACE,
} readPattern
);
public void tearDown() throws Exception { appender.addExpectation(writeExpectation);
Loggers.removeAppender(LogManager.getLogger(TransportLogger.class), appender); appender.addExpectation(readExpectation);
appender.stop(); BytesReference bytesReference = buildRequest();
super.tearDown(); TransportLogger.logInboundMessage(mock(TcpChannel.class), bytesReference.slice(6, bytesReference.length() - 6));
} TransportLogger.logOutboundMessage(mock(TcpChannel.class), bytesReference);
appender.assertAllExpectationsMatched();
public void testLoggingHandler() throws IOException { }
final String writePattern = ".*\\[length: \\d+"
+ ", request id: \\d+"
+ ", type: request"
+ ", version: .*"
+ ", header size: \\d+B"
+ ", action: cluster:monitor/stats]"
+ " WRITE: \\d+B";
final MockLogAppender.LoggingExpectation writeExpectation = new MockLogAppender.PatternSeenEventExpectation(
"hot threads request",
TransportLogger.class.getCanonicalName(),
Level.TRACE,
writePattern
);
final String readPattern = ".*\\[length: \\d+"
+ ", request id: \\d+"
+ ", type: request"
+ ", version: .*"
+ ", header size: \\d+B"
+ ", action: cluster:monitor/stats]"
+ " READ: \\d+B";
final MockLogAppender.LoggingExpectation readExpectation = new MockLogAppender.PatternSeenEventExpectation(
"cluster monitor request",
TransportLogger.class.getCanonicalName(),
Level.TRACE,
readPattern
);
appender.addExpectation(writeExpectation);
appender.addExpectation(readExpectation);
BytesReference bytesReference = buildRequest();
TransportLogger.logInboundMessage(mock(TcpChannel.class), bytesReference.slice(6, bytesReference.length() - 6));
TransportLogger.logOutboundMessage(mock(TcpChannel.class), bytesReference);
appender.assertAllExpectationsMatched();
} }
private BytesReference buildRequest() throws IOException { private BytesReference buildRequest() throws IOException {

View File

@ -32,11 +32,15 @@
package org.opensearch.test; package org.opensearch.test;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender; import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.filter.RegexFilter; import org.apache.logging.log4j.core.filter.RegexFilter;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.regex.Regex; import org.opensearch.common.regex.Regex;
import java.util.Arrays;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -47,32 +51,43 @@ import static org.hamcrest.MatcherAssert.assertThat;
/** /**
* Test appender that can be used to verify that certain events were logged correctly * Test appender that can be used to verify that certain events were logged correctly
*/ */
public class MockLogAppender extends AbstractAppender { public class MockLogAppender extends AbstractAppender implements AutoCloseable {
private static final String COMMON_PREFIX = System.getProperty("opensearch.logger.prefix", "org.opensearch."); private static final String COMMON_PREFIX = System.getProperty("opensearch.logger.prefix", "org.opensearch.");
private final List<LoggingExpectation> expectations; private final List<LoggingExpectation> expectations;
private final List<Logger> loggers;
/** /**
* Creates and starts a MockLogAppender. Generally preferred over using the constructor * Creates an instance and adds it as an appender to the given Loggers. Upon
* directly because adding an unstarted appender to the static logging context can cause * closure, this instance will then remove itself from the Loggers it was added
* difficult-to-identify errors in the tests and this method makes it impossible to do * to. It is strongly recommended to use this class in a try-with-resources block
* that. * to guarantee that it is properly removed from all Loggers. Since the logging
* state is static and therefore global within a JVM, it can cause unrelated
* tests to fail if, for example, they trigger a logging statement that tried to
* write to a closed MockLogAppender instance.
*/ */
public static MockLogAppender createStarted() throws IllegalAccessException { public static MockLogAppender createForLoggers(Logger... loggers) throws IllegalAccessException {
final MockLogAppender appender = new MockLogAppender(); final MockLogAppender appender = new MockLogAppender(
RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null),
Collections.unmodifiableList(Arrays.asList(loggers))
);
appender.start(); appender.start();
for (Logger logger : loggers) {
Loggers.addAppender(logger, appender);
}
return appender; return appender;
} }
public MockLogAppender() throws IllegalAccessException { private MockLogAppender(RegexFilter filter, List<Logger> loggers) {
super("mock", RegexFilter.createFilter(".*(\n.*)*", new String[0], false, null, null), null); super("mock", filter, null);
/* /*
* We use a copy-on-write array list since log messages could be appended while we are setting up expectations. When that occurs, * We use a copy-on-write array list since log messages could be appended while we are setting up expectations. When that occurs,
* we would run into a concurrent modification exception from the iteration over the expectations in #append, concurrent with a * we would run into a concurrent modification exception from the iteration over the expectations in #append, concurrent with a
* modification from #addExpectation. * modification from #addExpectation.
*/ */
expectations = new CopyOnWriteArrayList<>(); this.expectations = new CopyOnWriteArrayList<>();
this.loggers = loggers;
} }
public void addExpectation(LoggingExpectation expectation) { public void addExpectation(LoggingExpectation expectation) {
@ -92,6 +107,14 @@ public class MockLogAppender extends AbstractAppender {
} }
} }
@Override
public void close() {
for (Logger logger : loggers) {
Loggers.removeAppender(logger, this);
}
this.stop();
}
public interface LoggingExpectation { public interface LoggingExpectation {
void match(LogEvent event); void match(LogEvent event);

View File

@ -34,6 +34,7 @@ package org.opensearch.transport;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
@ -51,7 +52,6 @@ import org.opensearch.common.SuppressForbidden;
import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.network.CloseableChannel; import org.opensearch.common.network.CloseableChannel;
import org.opensearch.common.network.NetworkAddress; import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.network.NetworkUtils; import org.opensearch.common.network.NetworkUtils;
@ -1202,10 +1202,8 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
.build() .build()
); );
MockLogAppender appender = new MockLogAppender(); final Logger logger = LogManager.getLogger("org.opensearch.transport.TransportService.tracer");
try { try (MockLogAppender appender = MockLogAppender.createForLoggers(logger)) {
appender.start();
Loggers.addAppender(LogManager.getLogger("org.opensearch.transport.TransportService.tracer"), appender);
final String requestSent = ".*\\[internal:test].*sent to.*\\{TS_B}.*"; final String requestSent = ".*\\[internal:test].*sent to.*\\{TS_B}.*";
final MockLogAppender.LoggingExpectation requestSentExpectation = new MockLogAppender.PatternSeenEventExpectation( final MockLogAppender.LoggingExpectation requestSentExpectation = new MockLogAppender.PatternSeenEventExpectation(
"sent request", "sent request",
@ -1291,9 +1289,6 @@ public abstract class AbstractSimpleTransportTestCase extends OpenSearchTestCase
future.txGet(); future.txGet();
assertBusy(appender::assertAllExpectationsMatched); assertBusy(appender::assertAllExpectationsMatched);
} finally {
Loggers.removeAppender(LogManager.getLogger("org.opensearch.transport.TransportService.tracer"), appender);
appender.stop();
} }
} }

View File

@ -35,7 +35,6 @@ package org.opensearch.transport.nio;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.opensearch.common.CheckedRunnable; import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.Settings;
import org.opensearch.nio.ServerChannelContext; import org.opensearch.nio.ServerChannelContext;
import org.opensearch.nio.SocketChannelContext; import org.opensearch.nio.SocketChannelContext;
@ -54,73 +53,60 @@ import static org.mockito.Mockito.mock;
public class TestEventHandlerTests extends OpenSearchTestCase { public class TestEventHandlerTests extends OpenSearchTestCase {
private MockLogAppender appender;
public void setUp() throws Exception {
super.setUp();
appender = new MockLogAppender();
Loggers.addAppender(LogManager.getLogger(MockNioTransport.class), appender);
appender.start();
}
public void tearDown() throws Exception {
Loggers.removeAppender(LogManager.getLogger(MockNioTransport.class), appender);
appender.stop();
super.tearDown();
}
public void testLogOnElapsedTime() throws Exception { public void testLogOnElapsedTime() throws Exception {
long start = System.nanoTime(); try (MockLogAppender appender = MockLogAppender.createForLoggers(LogManager.getLogger(MockNioTransport.class))) {
long end = start + TimeUnit.MILLISECONDS.toNanos(400); long start = System.nanoTime();
AtomicBoolean isStart = new AtomicBoolean(true); long end = start + TimeUnit.MILLISECONDS.toNanos(400);
LongSupplier timeSupplier = () -> { AtomicBoolean isStart = new AtomicBoolean(true);
if (isStart.compareAndSet(true, false)) { LongSupplier timeSupplier = () -> {
return start; if (isStart.compareAndSet(true, false)) {
} else if (isStart.compareAndSet(false, true)) { return start;
return end; } else if (isStart.compareAndSet(false, true)) {
} return end;
throw new IllegalStateException("Cannot update isStart"); }
}; throw new IllegalStateException("Cannot update isStart");
final ThreadPool threadPool = mock(ThreadPool.class); };
doAnswer(i -> timeSupplier.getAsLong()).when(threadPool).relativeTimeInNanos(); final ThreadPool threadPool = mock(ThreadPool.class);
TestEventHandler eventHandler = new TestEventHandler( doAnswer(i -> timeSupplier.getAsLong()).when(threadPool).relativeTimeInNanos();
e -> {}, TestEventHandler eventHandler = new TestEventHandler(
() -> null, e -> {},
new MockNioTransport.TransportThreadWatchdog(threadPool, Settings.EMPTY) () -> null,
); new MockNioTransport.TransportThreadWatchdog(threadPool, Settings.EMPTY)
ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
SocketChannelContext socketChannelContext = mock(SocketChannelContext.class);
RuntimeException exception = new RuntimeException("boom");
Map<String, CheckedRunnable<Exception>> tests = new HashMap<>();
tests.put("acceptChannel", () -> eventHandler.acceptChannel(serverChannelContext));
tests.put("acceptException", () -> eventHandler.acceptException(serverChannelContext, exception));
tests.put("registrationException", () -> eventHandler.registrationException(socketChannelContext, exception));
tests.put("handleConnect", () -> eventHandler.handleConnect(socketChannelContext));
tests.put("connectException", () -> eventHandler.connectException(socketChannelContext, exception));
tests.put("handleRead", () -> eventHandler.handleRead(socketChannelContext));
tests.put("readException", () -> eventHandler.readException(socketChannelContext, exception));
tests.put("handleWrite", () -> eventHandler.handleWrite(socketChannelContext));
tests.put("writeException", () -> eventHandler.writeException(socketChannelContext, exception));
tests.put("handleTask", () -> eventHandler.handleTask(mock(Runnable.class)));
tests.put("taskException", () -> eventHandler.taskException(exception));
tests.put("handleClose", () -> eventHandler.handleClose(socketChannelContext));
tests.put("closeException", () -> eventHandler.closeException(socketChannelContext, exception));
tests.put("genericChannelException", () -> eventHandler.genericChannelException(socketChannelContext, exception));
for (Map.Entry<String, CheckedRunnable<Exception>> entry : tests.entrySet()) {
String message = "*Slow execution on network thread*";
MockLogAppender.LoggingExpectation slowExpectation = new MockLogAppender.SeenEventExpectation(
entry.getKey(),
MockNioTransport.class.getCanonicalName(),
Level.WARN,
message
); );
appender.addExpectation(slowExpectation);
entry.getValue().run(); ServerChannelContext serverChannelContext = mock(ServerChannelContext.class);
appender.assertAllExpectationsMatched(); SocketChannelContext socketChannelContext = mock(SocketChannelContext.class);
RuntimeException exception = new RuntimeException("boom");
Map<String, CheckedRunnable<Exception>> tests = new HashMap<>();
tests.put("acceptChannel", () -> eventHandler.acceptChannel(serverChannelContext));
tests.put("acceptException", () -> eventHandler.acceptException(serverChannelContext, exception));
tests.put("registrationException", () -> eventHandler.registrationException(socketChannelContext, exception));
tests.put("handleConnect", () -> eventHandler.handleConnect(socketChannelContext));
tests.put("connectException", () -> eventHandler.connectException(socketChannelContext, exception));
tests.put("handleRead", () -> eventHandler.handleRead(socketChannelContext));
tests.put("readException", () -> eventHandler.readException(socketChannelContext, exception));
tests.put("handleWrite", () -> eventHandler.handleWrite(socketChannelContext));
tests.put("writeException", () -> eventHandler.writeException(socketChannelContext, exception));
tests.put("handleTask", () -> eventHandler.handleTask(mock(Runnable.class)));
tests.put("taskException", () -> eventHandler.taskException(exception));
tests.put("handleClose", () -> eventHandler.handleClose(socketChannelContext));
tests.put("closeException", () -> eventHandler.closeException(socketChannelContext, exception));
tests.put("genericChannelException", () -> eventHandler.genericChannelException(socketChannelContext, exception));
for (Map.Entry<String, CheckedRunnable<Exception>> entry : tests.entrySet()) {
String message = "*Slow execution on network thread*";
MockLogAppender.LoggingExpectation slowExpectation = new MockLogAppender.SeenEventExpectation(
entry.getKey(),
MockNioTransport.class.getCanonicalName(),
Level.WARN,
message
);
appender.addExpectation(slowExpectation);
entry.getValue().run();
appender.assertAllExpectationsMatched();
}
} }
} }
} }