mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-28 02:48:38 +00:00
Logging: Use settings when building daemon threads (#32751)
Subclasses of `EsIntegTestCase` run multiple Elasticsearch nodes in the same JVM and when we log we look at the name of the thread to figure out the node name. This makes sure that all calls to `daemonThreadFactory` include the node name. Closes #32574 I'd like to follow this up with more drastic changes that make it impossible to do this incorrectly but that change is much larger than this and I'd like to get these log lines fixed up sooner rather than later.
This commit is contained in:
parent
0749b18181
commit
462e91d362
server/src/main/java/org/elasticsearch
x-pack/plugin
core/src/main/java/org/elasticsearch
rollup/src
main/java/org/elasticsearch/xpack/rollup
test/java/org/elasticsearch/xpack/rollup/job
@ -160,11 +160,13 @@ public class EsExecutors {
|
||||
if (Node.NODE_NAME_SETTING.exists(settings)) {
|
||||
return threadName(Node.NODE_NAME_SETTING.get(settings), namePrefix);
|
||||
} else {
|
||||
// TODO this should only be allowed in tests
|
||||
return threadName("", namePrefix);
|
||||
}
|
||||
}
|
||||
|
||||
public static String threadName(final String nodeName, final String namePrefix) {
|
||||
// TODO missing node names should only be allowed in tests
|
||||
return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]";
|
||||
}
|
||||
|
||||
|
@ -233,7 +233,7 @@ public class IndicesService extends AbstractLifecycleComponent
|
||||
|
||||
@Override
|
||||
protected void doStop() {
|
||||
ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown"));
|
||||
ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown"));
|
||||
|
||||
// Copy indices because we modify it asynchronously in the body of the loop
|
||||
final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet());
|
||||
|
@ -120,7 +120,7 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.clock = clock;
|
||||
this.scheduler = new SchedulerEngine(clock);
|
||||
this.scheduler = new SchedulerEngine(settings, clock);
|
||||
this.licenseState = licenseState;
|
||||
this.operationModeFileWatcher = new OperationModeFileWatcher(resourceWatcherService,
|
||||
XPackPlugin.resolveConfigFile(env, "license_mode"), logger,
|
||||
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.scheduler;
|
||||
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
import org.elasticsearch.common.util.concurrent.FutureUtils;
|
||||
@ -92,9 +93,9 @@ public class SchedulerEngine {
|
||||
private final Clock clock;
|
||||
private final List<Listener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
public SchedulerEngine(Clock clock) {
|
||||
public SchedulerEngine(Settings settings, Clock clock) {
|
||||
this.clock = clock;
|
||||
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler"));
|
||||
this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler"));
|
||||
}
|
||||
|
||||
public void register(Listener listener) {
|
||||
|
@ -194,7 +194,7 @@ public class Rollup extends Plugin implements ActionPlugin, PersistentTaskPlugin
|
||||
return emptyList();
|
||||
}
|
||||
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(getClock());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock());
|
||||
return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool));
|
||||
}
|
||||
|
||||
|
@ -11,6 +11,7 @@ import org.elasticsearch.action.search.ShardSearchFailure;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.node.Node;
|
||||
import org.elasticsearch.persistent.PersistentTaskState;
|
||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
@ -47,6 +48,9 @@ import static org.mockito.Mockito.when;
|
||||
|
||||
public class RollupJobTaskTests extends ESTestCase {
|
||||
|
||||
private static final Settings SETTINGS = Settings.builder()
|
||||
.put(Node.NODE_NAME_SETTING.getKey(), "test")
|
||||
.build();
|
||||
private static ThreadPool pool = new TestThreadPool("test");
|
||||
|
||||
@AfterClass
|
||||
@ -62,7 +66,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
@ -75,7 +79,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.ABORTING, Collections.singletonMap("foo", "bar"), randomBoolean());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
@ -88,7 +92,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPING, Collections.singletonMap("foo", "bar"), randomBoolean());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
@ -101,7 +105,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
@ -114,7 +118,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), false);
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
@ -128,7 +132,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), true);
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
@ -141,7 +145,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
null, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
@ -154,7 +158,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED));
|
||||
@ -641,7 +645,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job,
|
||||
status, client, schedulerEngine, pool, Collections.emptyMap());
|
||||
assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED));
|
||||
@ -748,7 +752,7 @@ public class RollupJobTaskTests extends ESTestCase {
|
||||
RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean());
|
||||
Client client = mock(Client.class);
|
||||
when(client.settings()).thenReturn(Settings.EMPTY);
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC());
|
||||
SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC());
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(2);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user