diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index 9e6e4018a2f..bdbcacb8fd1 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -31,17 +31,15 @@ import com.metamx.common.logger.Logger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; import io.druid.collections.StupidPool; -import io.druid.concurrent.Execs; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Processing; import io.druid.query.MetricsEmittingExecutorService; import io.druid.query.PrioritizedExecutorService; import io.druid.server.DruidProcessingConfig; +import io.druid.server.VMUtils; -import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; /** @@ -82,37 +80,26 @@ public class DruidProcessingModule implements Module public StupidPool getIntermediateResultsPool(DruidProcessingConfig config) { try { - Class vmClass = Class.forName("sun.misc.VM"); - Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null); + long maxDirectMemory = VMUtils.getMaxDirectMemory(); - if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) { - log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj); - } else { - long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue(); - - final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1); - if (maxDirectMemory < memoryNeeded) { - throw new ProvisionException( - String.format( - "Not enough direct memory. Please adjust -XX:MaxDirectMemorySize or druid.computation.buffer.size: " - + "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]", - maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads() - ) - ); - } + final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1); + if (maxDirectMemory < memoryNeeded) { + throw new ProvisionException( + String.format( + "Not enough direct memory. Please adjust -XX:MaxDirectMemorySize, druid.computation.buffer.size, or druid.processing.numThreads: " + + "maxDirectMemory[%,d], memoryNeeded[%,d] = druid.computation.buffer.size[%,d] * ( druid.processing.numThreads[%,d] + 1 )", + maxDirectMemory, + memoryNeeded, + config.intermediateComputeSizeBytes(), + config.getNumThreads() + ) + ); } + } catch(UnsupportedOperationException e) { + log.info(e.getMessage()); } - catch (ClassNotFoundException e) { - log.info("No VM class, cannot do memory check."); - } - catch (NoSuchMethodException e) { - log.info("VM.maxDirectMemory doesn't exist, cannot do memory check."); - } - catch (InvocationTargetException e) { - log.warn(e, "static method shouldn't throw this"); - } - catch (IllegalAccessException e) { - log.warn(e, "public method, shouldn't throw this"); + catch(RuntimeException e) { + log.warn(e, e.getMessage()); } return new IntermediateProcessingBufferPool(config.intermediateComputeSizeBytes()); diff --git a/server/src/main/java/io/druid/server/DruidProcessingConfig.java b/server/src/main/java/io/druid/server/DruidProcessingConfig.java index ada4eef609d..af596f2da02 100644 --- a/server/src/main/java/io/druid/server/DruidProcessingConfig.java +++ b/server/src/main/java/io/druid/server/DruidProcessingConfig.java @@ -31,4 +31,12 @@ public abstract class DruidProcessingConfig extends ExecutorServiceConfig { return 1024 * 1024 * 1024; } + + @Override @Config(value = "${base_path}.numThreads") + public int getNumThreads() + { + // default to leaving one core for background tasks + final int processors = Runtime.getRuntime().availableProcessors(); + return processors > 1 ? processors - 1 : processors; + } } diff --git a/server/src/main/java/io/druid/server/VMUtils.java b/server/src/main/java/io/druid/server/VMUtils.java new file mode 100644 index 00000000000..bc694f59355 --- /dev/null +++ b/server/src/main/java/io/druid/server/VMUtils.java @@ -0,0 +1,51 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013, 2014 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.server; + +import java.lang.reflect.InvocationTargetException; + +public class VMUtils +{ + public static long getMaxDirectMemory() throws UnsupportedOperationException + { + try { + Class vmClass = Class.forName("sun.misc.VM"); + Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null); + + if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) { + throw new UnsupportedOperationException(String.format("Cannot determine maxDirectMemory from [%s]", maxDirectMemoryObj)); + } else { + return ((Number) maxDirectMemoryObj).longValue(); + } + } + catch (ClassNotFoundException e) { + throw new UnsupportedOperationException("No VM class, cannot do memory check.", e); + } + catch (NoSuchMethodException e) { + throw new UnsupportedOperationException("VM.maxDirectMemory doesn't exist, cannot do memory check.", e); + } + catch (InvocationTargetException e) { + throw new RuntimeException("static method shouldn't throw this", e); + } + catch (IllegalAccessException e) { + throw new RuntimeException("public method, shouldn't throw this", e); + } + } +} diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index aec1ee56867..fa5e40df923 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -42,7 +42,7 @@ import java.util.concurrent.Executor; */ @Command( name = "realtime", - description = "Runs a standalone realtime node for examples, see http://druid.io/docs/0.6.73/Realtime.html for a description" + description = "Runs a standalone realtime node for examples, see http://druid.io/docs/latest/Realtime.html for a description" ) public class CliRealtimeExample extends ServerRunnable {