better thread naming

This commit is contained in:
Shay Banon 2012-06-23 18:35:42 +02:00
parent 3163499aef
commit 6fb836c25e
40 changed files with 100 additions and 264 deletions

View File

@ -224,7 +224,7 @@ public class Bootstrap {
// bail out
}
}
}, "es[keepAlive]");
}, "elasticsearch[keepAlive]");
keepAliveThread.setDaemon(false);
keepAliveThread.start();
} catch (Throwable e) {

View File

@ -57,8 +57,8 @@ import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.EnvironmentModule;
import org.elasticsearch.monitor.MonitorService;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.common;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import java.util.Arrays;

View File

@ -0,0 +1,37 @@
package org.elasticsearch.common.netty;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
/**
*/
public class NettyStaticSetup {
private static EsThreadNameDeterminer ES_THREAD_NAME_DETERMINER = new EsThreadNameDeterminer();
public static class EsThreadNameDeterminer implements ThreadNameDeterminer {
@Override
public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
// we control the thread name with a context, so use both
return currentThreadName + "{" + proposedThreadName + "}";
}
}
static {
InternalLoggerFactory.setDefaultFactory(new NettyInternalESLoggerFactory() {
@Override
public InternalLogger newInstance(String name) {
return super.newInstance(name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty."));
}
});
ThreadRenamingRunnable.setThreadNameDeterminer(ES_THREAD_NAME_DETERMINER);
}
public static void setup() {
}
}

View File

@ -1,61 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.thread;
/**
* Overrides the thread name proposed by {@link ThreadRenamingRunnable}.
*
*
*/
public interface ThreadNameDeterminer {
/**
* {@link ThreadNameDeterminer} that accepts the proposed thread name
* as is.
*/
ThreadNameDeterminer PROPOSED = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return proposedThreadName;
}
};
/**
* {@link ThreadNameDeterminer} that rejects the proposed thread name and
* retains the current one.
*/
ThreadNameDeterminer CURRENT = new ThreadNameDeterminer() {
public String determineThreadName(String currentThreadName,
String proposedThreadName) throws Exception {
return null;
}
};
/**
* Overrides the thread name proposed by {@link ThreadRenamingRunnable}.
*
* @param currentThreadName the current thread name
* @param proposedThreadName the proposed new thread name
* @return the actual new thread name.
* If {@code null} is returned, the proposed thread name is
* discarded (i.e. no rename).
*/
String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception;
}

View File

@ -1,122 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.thread;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
/**
* A {@link Runnable} that changes the current thread name and reverts it back
* when its execution ends. To change the default thread names set by Netty,
* use {@link #setThreadNameDeterminer(ThreadNameDeterminer)}.
*
*
*/
public class ThreadRenamingRunnable implements Runnable {
private static final ESLogger logger = Loggers.getLogger(ThreadRenamingRunnable.class);
private static volatile ThreadNameDeterminer threadNameDeterminer =
ThreadNameDeterminer.PROPOSED;
/**
* Returns the {@link ThreadNameDeterminer} which overrides the proposed
* new thread name.
*/
public static ThreadNameDeterminer getThreadNameDeterminer() {
return threadNameDeterminer;
}
/**
* Sets the {@link ThreadNameDeterminer} which overrides the proposed new
* thread name. Please note that the specified {@link ThreadNameDeterminer}
* affects only new {@link ThreadRenamingRunnable}s; the existing instances
* are not affected at all. Therefore, you should make sure to call this
* method at the earliest possible point (i.e. before any Netty worker
* thread starts) for consistent thread naming. Otherwise, you might see
* the default thread names and the new names appear at the same time in
* the full thread dump.
*/
public static void setThreadNameDeterminer(ThreadNameDeterminer threadNameDeterminer) {
if (threadNameDeterminer == null) {
throw new NullPointerException("threadNameDeterminer");
}
ThreadRenamingRunnable.threadNameDeterminer = threadNameDeterminer;
}
private final Runnable runnable;
private final String proposedThreadName;
/**
* Creates a new instance which wraps the specified {@code runnable}
* and changes the thread name to the specified thread name when the
* specified {@code runnable} is running.
*/
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName) {
if (runnable == null) {
throw new NullPointerException("runnable");
}
if (proposedThreadName == null) {
throw new NullPointerException("proposedThreadName");
}
this.runnable = runnable;
this.proposedThreadName = proposedThreadName;
}
public void run() {
final Thread currentThread = Thread.currentThread();
final String oldThreadName = currentThread.getName();
final String newThreadName = getNewThreadName(oldThreadName);
// Change the thread name before starting the actual runnable.
boolean renamed = false;
if (!oldThreadName.equals(newThreadName)) {
try {
currentThread.setName(newThreadName);
renamed = true;
} catch (SecurityException e) {
logger.debug("Failed to rename a thread due to security restriction.", e);
}
}
// Run the actual runnable and revert the name back when it ends.
try {
runnable.run();
} finally {
if (renamed) {
// Revert the name back if the current thread was renamed.
// We do not check the exception here because we know it works.
currentThread.setName(oldThreadName);
}
}
}
private String getNewThreadName(String currentThreadName) {
String newThreadName = null;
try {
newThreadName = getThreadNameDeterminer().determineThreadName(currentThreadName, proposedThreadName);
} catch (Throwable t) {
logger.warn("Failed to determine the thread name", t);
}
return newThreadName == null ? currentThreadName : newThreadName;
}
}

View File

@ -23,6 +23,7 @@ import jsr166y.LinkedTransferQueue;
import org.elasticsearch.common.settings.Settings;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
@ -56,44 +57,37 @@ public class EsExecutors {
} else {
name = "elasticsearch[" + name + "]";
}
return name + namePrefix;
return name + "[" + namePrefix + "]";
}
public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
return daemonThreadFactory(threadName(settings, namePrefix));
}
/**
* A priority based thread factory, for all Thread priority constants:
* <tt>Thread.MIN_PRIORITY, Thread.NORM_PRIORITY, Thread.MAX_PRIORITY</tt>;
* <p/>
* This factory is used instead of Executers.DefaultThreadFactory to allow
* manipulation of priority and thread owner name.
*
* @param namePrefix a name prefix for this thread
* @return a thread factory based on given priority.
*/
public static ThreadFactory daemonThreadFactory(String namePrefix) {
final ThreadFactory f = java.util.concurrent.Executors.defaultThreadFactory();
final String o = namePrefix + "-";
return new EsThreadFactory(namePrefix);
}
return new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = f.newThread(r);
static class EsThreadFactory implements ThreadFactory {
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
/*
* Thread name: owner-pool-N-thread-M, where N is the sequence
* number of this factory, and M is the sequence number of the
* thread created by this factory.
*/
t.setName(o + t.getName());
public EsThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
/* override default definition t.setDaemon(false); */
t.setDaemon(true);
return t;
}
};
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
0);
t.setDaemon(true);
return t;
}
}
/**

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.common.thread;
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.logging.ESLogger;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.common.thread;
package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

View File

@ -22,6 +22,7 @@ package org.elasticsearch.http.netty;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.netty.NettyStaticSetup;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
@ -35,15 +36,12 @@ import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.http.*;
import org.elasticsearch.http.HttpRequest;
import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.netty.NettyInternalESLoggerFactory;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
@ -60,12 +58,7 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport> implements HttpServerTransport {
static {
InternalLoggerFactory.setDefaultFactory(new NettyInternalESLoggerFactory() {
@Override
public InternalLogger newInstance(String name) {
return super.newInstance(name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty."));
}
});
NettyStaticSetup.setup();
}
private final NetworkService networkService;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import java.io.IOException;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.bytes;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.bytes;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.doubles;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
/**
*

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.doubles;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
/**
*

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.floats;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.floats;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.ints;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.ints;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -23,7 +23,7 @@ import gnu.trove.list.array.TLongArrayList;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.FieldCache;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.NumericFieldData;
import org.elasticsearch.index.field.data.support.FieldDataLoader;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.longs;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
import org.joda.time.DateTimeZone;
import org.joda.time.MutableDateTime;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.longs;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
import org.joda.time.DateTimeZone;
import org.joda.time.MutableDateTime;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.shorts;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.field.data.shorts;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
/**

View File

@ -21,7 +21,7 @@ package org.elasticsearch.index.field.data.strings;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
/**
*

View File

@ -21,7 +21,7 @@ package org.elasticsearch.index.field.data.strings;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
/**
*

View File

@ -22,8 +22,8 @@ package org.elasticsearch.index.mapper.geo;
import gnu.trove.list.array.TDoubleArrayList;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.support.FieldDataLoader;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.mapper.geo;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
import org.elasticsearch.index.search.geo.GeoHashUtils;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.mapper.geo;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.field.data.doubles.DoubleFieldData;
import org.elasticsearch.index.search.geo.GeoHashUtils;

View File

@ -24,6 +24,7 @@ import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.policy.EnableMergePolicy;
import org.elasticsearch.index.settings.IndexSettings;
@ -111,7 +112,7 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
@Override
protected MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
MergeThread thread = super.getMergeThread(writer, merge);
thread.setName("[" + shardId.index().name() + "][" + shardId.id() + "]: " + thread.getName());
thread.setName(EsExecutors.threadName(provider.indexSettings(), "[" + shardId.index().name() + "][" + shardId.id() + "]: " + thread.getName()));
return thread;
}

View File

@ -46,7 +46,7 @@ import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.DiscoveryService;
import org.elasticsearch.env.Environment;

View File

@ -20,7 +20,7 @@
package org.elasticsearch.rest;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
/**
*

View File

@ -20,7 +20,7 @@
package org.elasticsearch.rest;
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;

View File

@ -22,7 +22,7 @@ package org.elasticsearch.search.dfs;
import com.google.common.collect.ImmutableMap;
import gnu.trove.set.hash.THashSet;
import org.apache.lucene.index.Term;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchPhase;
import org.elasticsearch.search.internal.SearchContext;

View File

@ -29,7 +29,7 @@ import org.apache.lucene.search.Scorer;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldDataType;
import org.elasticsearch.index.field.data.longs.LongFieldData;

View File

@ -27,7 +27,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Scorer;
import org.elasticsearch.common.CacheRecycler;
import org.elasticsearch.common.collect.BoundedTreeSet;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.index.cache.field.data.FieldDataCache;
import org.elasticsearch.index.field.data.FieldData;
import org.elasticsearch.index.field.data.FieldDataType;

View File

@ -23,7 +23,7 @@ import com.google.common.collect.Iterators;
import gnu.trove.map.hash.TIntObjectHashMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.thread.ThreadLocals;
import org.elasticsearch.common.util.concurrent.ThreadLocals;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.search.SearchHit;

View File

@ -104,7 +104,7 @@ public class ThreadPool extends AbstractComponent {
executors.put(Names.SNAPSHOT, build(Names.SNAPSHOT, "scaling", groupSettings.get(Names.SNAPSHOT), settingsBuilder().put("keep_alive", "5m").put("size", 5).build()));
executors.put(Names.SAME, new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(Names.SAME, "same")));
this.executors = ImmutableMap.copyOf(executors);
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "[scheduler]"));
this.scheduler = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "scheduler"));
this.scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
this.scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
@ -216,7 +216,7 @@ public class ThreadPool extends AbstractComponent {
settings = ImmutableSettings.Builder.EMPTY_SETTINGS;
}
String type = settings.get("type", defaultType);
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, "[" + name + "]");
ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(this.settings, name);
if ("same".equals(type)) {
logger.debug("creating thread_pool [{}], type [{}]", name, type);
return new ExecutorHolder(MoreExecutors.sameThreadExecutor(), new Info(name, type));

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.netty.NettyStaticSetup;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
@ -51,8 +52,6 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.jboss.netty.logging.InternalLogger;
import org.jboss.netty.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
@ -81,12 +80,7 @@ import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadF
public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {
static {
InternalLoggerFactory.setDefaultFactory(new NettyInternalESLoggerFactory() {
@Override
public InternalLogger newInstance(String name) {
return super.newInstance(name.replace("org.jboss.netty.", "netty.").replace("org.jboss.netty.", "netty."));
}
});
NettyStaticSetup.setup();
}
private final NetworkService networkService;

View File

@ -20,10 +20,10 @@
package org.elasticsearch.test.unit.common.util.concurrent;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadBarrier;
import org.testng.annotations.Test;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -161,7 +161,7 @@ public class EsExecutorsTests {
}
});
assertThat("Should have thrown RejectedExecutionException", false, equalTo(true));
} catch (RejectedExecutionException e) {
} catch (EsRejectedExecutionException e) {
//caught expected exception
}