From aa073e4e737ec32007b1445cb6ded5ef89f6bd90 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 14 Nov 2012 15:37:11 -0600 Subject: [PATCH] 1) Add verification of memory settings to initialization. (Fixes #21) --- .../java/com/metamx/druid/BaseServerNode.java | 33 +++++++++--- .../metamx/druid/DruidProcessingConfig.java | 34 +++++++++++++ .../com/metamx/druid/http/MasterMain.java | 13 +++-- .../druid/initialization/ServerInit.java | 50 +++++++++++++++++-- .../com/metamx/druid/master/DruidMaster.java | 2 +- .../druid/master/DruidMasterConfig.java | 5 +- 6 files changed, 121 insertions(+), 16 deletions(-) create mode 100644 server/src/main/java/com/metamx/druid/DruidProcessingConfig.java diff --git a/server/src/main/java/com/metamx/druid/BaseServerNode.java b/server/src/main/java/com/metamx/druid/BaseServerNode.java index 0813762c2d4..cdc7a68e4f6 100644 --- a/server/src/main/java/com/metamx/druid/BaseServerNode.java +++ b/server/src/main/java/com/metamx/druid/BaseServerNode.java @@ -20,6 +20,7 @@ package com.metamx.druid; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; @@ -28,7 +29,6 @@ import com.metamx.druid.initialization.ServerInit; import com.metamx.druid.query.DefaultQueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; -import com.metamx.druid.utils.PropUtils; import org.codehaus.jackson.map.ObjectMapper; import org.skife.config.ConfigurationObjectFactory; @@ -41,6 +41,7 @@ import java.util.Properties; public abstract class BaseServerNode extends QueryableNode { private final Map, QueryRunnerFactory> additionalFactories = Maps.newLinkedHashMap(); + private DruidProcessingConfig processingConfig = null; private QueryRunnerFactoryConglomerate conglomerate = null; private StupidPool computeScratchPool = null; @@ -68,6 +69,12 @@ public abstract class BaseServerNode extends QueryableN return computeScratchPool; } + public DruidProcessingConfig getProcessingConfig() + { + initializeProcessingConfig(); + return processingConfig; + } + @SuppressWarnings("unchecked") public T setConglomerate(QueryRunnerFactoryConglomerate conglomerate) { @@ -82,6 +89,13 @@ public abstract class BaseServerNode extends QueryableN return (T) this; } + @SuppressWarnings("unchecked") + public T setProcessingConfig(DruidProcessingConfig processingConfig) + { + checkFieldNotSetAndSet("processingConfig", processingConfig); + return (T) this; + } + @SuppressWarnings("unchecked") public T registerQueryRunnerFactory(Class queryClazz, QueryRunnerFactory factory) { @@ -99,11 +113,7 @@ public abstract class BaseServerNode extends QueryableN private void initializeComputeScratchPool() { if (computeScratchPool == null) { - setComputeScratchPool( - ServerInit.makeComputeScratchPool( - PropUtils.getPropertyAsInt(getProps(), "druid.computation.buffer.size", 1024 * 1024 * 1024) - ) - ); + setComputeScratchPool(ServerInit.makeComputeScratchPool(getProcessingConfig())); } } @@ -121,4 +131,15 @@ public abstract class BaseServerNode extends QueryableN setConglomerate(new DefaultQueryRunnerFactoryConglomerate(factories)); } } + + private void initializeProcessingConfig() + { + if (processingConfig == null) { + setProcessingConfig( + getConfigFactory().buildWithReplacements( + DruidProcessingConfig.class, ImmutableMap.of("base_path", "druid.processing") + ) + ); + } + } } diff --git a/server/src/main/java/com/metamx/druid/DruidProcessingConfig.java b/server/src/main/java/com/metamx/druid/DruidProcessingConfig.java new file mode 100644 index 00000000000..b497ae8f6ce --- /dev/null +++ b/server/src/main/java/com/metamx/druid/DruidProcessingConfig.java @@ -0,0 +1,34 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid; + +import com.metamx.common.concurrent.ExecutorServiceConfig; +import org.skife.config.Config; + +/** + */ +public abstract class DruidProcessingConfig extends ExecutorServiceConfig +{ + @Config({"druid.computation.buffer.size", "${base_path}.buffer.sizeBytes"}) + public int intermediateComputeSizeBytes() + { + return 1024 * 1024 * 1024; + } +} diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 80b0a3deb61..5fd06793ab4 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -169,11 +169,14 @@ public class MasterMain lifecycle ); - final ServiceProvider serviceProvider = Initialization.makeServiceProvider( - druidMasterConfig.getMergerServiceName(), - serviceDiscovery, - lifecycle - ); + ServiceProvider serviceProvider = null; + if (druidMasterConfig.getMergerServiceName() != null) { + serviceProvider = Initialization.makeServiceProvider( + druidMasterConfig.getMergerServiceName(), + serviceDiscovery, + lifecycle + ); + } final DruidClusterInfo druidClusterInfo = new DruidClusterInfo( configFactory.build(DruidClusterInfoConfig.class), diff --git a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java index 5c4599cfec0..8b388726089 100644 --- a/server/src/main/java/com/metamx/druid/initialization/ServerInit.java +++ b/server/src/main/java/com/metamx/druid/initialization/ServerInit.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import com.metamx.druid.DruidProcessingConfig; import com.metamx.druid.GroupByQueryEngine; import com.metamx.druid.GroupByQueryEngineConfig; import com.metamx.druid.Query; @@ -49,6 +50,7 @@ import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.skife.config.ConfigurationObjectFactory; +import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -57,6 +59,8 @@ import java.util.concurrent.atomic.AtomicLong; */ public class ServerInit { + private static Logger log = new Logger(ServerInit.class); + public static StorageAdapterLoader makeDefaultQueryableLoader( RestS3Service s3Client, QueryableLoaderConfig config @@ -85,9 +89,41 @@ public class ServerInit return delegateLoader; } - public static StupidPool makeComputeScratchPool(int computationBufferSize) + public static StupidPool makeComputeScratchPool(DruidProcessingConfig config) { - return new ComputeScratchPool(computationBufferSize); + try { + Class vmClass = Class.forName("sun.misc.VM"); + Object maxDirectMemoryObj = vmClass.getMethod("maxDirectMemory").invoke(null); + + if (maxDirectMemoryObj == null || !(maxDirectMemoryObj instanceof Number)) { + log.info("Cannot determine maxDirectMemory from[%s]", maxDirectMemoryObj); + } else { + long maxDirectMemory = ((Number) maxDirectMemoryObj).longValue(); + + final long memoryNeeded = (long) config.intermediateComputeSizeBytes() * (config.getNumThreads() + 1); + if (maxDirectMemory < memoryNeeded) { + throw new ISE( + "Not enough direct memory. Please adjust -XX:MaxDirectMemory or druid.computation.buffer.size: " + + "maxDirectMemory[%,d], memoryNeeded[%,d], druid.computation.buffer.size[%,d], numThreads[%,d]", + maxDirectMemory, memoryNeeded, config.intermediateComputeSizeBytes(), config.getNumThreads() + ); + } + } + } + catch (ClassNotFoundException e) { + log.info("No VM class, cannot do memory check."); + } + catch (NoSuchMethodException e) { + log.info("VM.maxDirectMemory doesn't exist, cannot do memory check."); + } + catch (InvocationTargetException e) { + log.warn(e, "static method shouldn't throw this"); + } + catch (IllegalAccessException e) { + log.warn(e, "public method, shouldn't throw this"); + } + + return new ComputeScratchPool(config.intermediateComputeSizeBytes()); } public static Map, QueryRunnerFactory> initDefaultQueryTypes( @@ -97,7 +133,15 @@ public class ServerInit { Map, QueryRunnerFactory> queryRunners = Maps.newLinkedHashMap(); queryRunners.put(TimeseriesQuery.class, new TimeseriesQueryRunnerFactory()); - queryRunners.put(GroupByQuery.class, new GroupByQueryRunnerFactory(new GroupByQueryEngine(configFactory.build(GroupByQueryEngineConfig.class), computationBufferPool))); + queryRunners.put( + GroupByQuery.class, + new GroupByQueryRunnerFactory( + new GroupByQueryEngine( + configFactory.build(GroupByQueryEngineConfig.class), + computationBufferPool + ) + ) + ); queryRunners.put(SearchQuery.class, new SearchQueryRunnerFactory()); queryRunners.put(TimeBoundaryQuery.class, new TimeBoundaryQueryRunnerFactory()); return queryRunners; diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index c1a99fca888..e0cf7acc723 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -426,7 +426,7 @@ public class DruidMaster final List> masterRunnables = Lists.newArrayList(); masterRunnables.add(Pair.of(new MasterComputeManagerRunnable(), config.getMasterPeriod())); - if (config.isMergeSegments()){ + if (config.isMergeSegments() && serviceProvider != null){ masterRunnables.add(Pair.of(new MasterSegmentMergerRunnable(), config.getMasterSegmentMergerPeriod())); } diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index 1e6f46e0abd..e97c1588e48 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -64,7 +64,10 @@ public abstract class DruidMasterConfig } @Config("druid.master.merger.service") - public abstract String getMergerServiceName(); + public String getMergerServiceName() + { + return null; + } @Config("druid.master.merge.threshold") public long getMergeThreshold()