diff --git a/client/pom.xml b/client/pom.xml index efc6ccd59ca..aa00fc5df59 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -112,10 +112,6 @@ com.google.inject.extensions guice-servlet - - com.google.inject.extensions - guice-multibindings - com.ibm.icu icu4j diff --git a/client/src/main/java/com/metamx/druid/initialization/DruidNode.java b/client/src/main/java/com/metamx/druid/initialization/DruidNode.java index 35bfb5415b3..43f78ac0682 100644 --- a/client/src/main/java/com/metamx/druid/initialization/DruidNode.java +++ b/client/src/main/java/com/metamx/druid/initialization/DruidNode.java @@ -31,6 +31,8 @@ import javax.validation.constraints.NotNull; */ public class DruidNode { + private String hostNoPort; + @JsonProperty("service") @NotNull private String serviceName = null; @@ -54,35 +56,37 @@ public class DruidNode if (port == null) { if (host == null) { - setHostAndPort(null, -1); + setHostAndPort(null, -1, null); } else if (host.contains(":")) { + final String[] hostParts = host.split(":"); try { - setHostAndPort(host, Integer.parseInt(host.split(":")[1])); + setHostAndPort(host, Integer.parseInt(hostParts[1]), hostParts[0]); } - catch (Exception e) { - setHostAndPort(host, -1); + catch (NumberFormatException e) { + setHostAndPort(host, -1, hostParts[0]); } } else { final int openPort = SocketUtil.findOpenPort(8080); - setHostAndPort(String.format("%s:%d", host, openPort), openPort); + setHostAndPort(String.format("%s:%d", host, openPort), openPort, host); } } else { if (host == null || host.contains(":")) { - setHostAndPort(host, port); + setHostAndPort(host, port, host == null ? null : host.split(":")[0]); } else { - setHostAndPort(String.format("%s:%d", host, port), port); + setHostAndPort(String.format("%s:%d", host, port), port, host); } } } - private void setHostAndPort(String host, int port) + private void setHostAndPort(String host, int port, String hostNoPort) { this.host = host; this.port = port; + this.hostNoPort = hostNoPort; } public String getServiceName() @@ -100,6 +104,11 @@ public class DruidNode return port; } + public String getHostNoPort() + { + return hostNoPort; + } + @Override public String toString() { diff --git a/client/src/main/java/com/metamx/druid/initialization/Initialization.java b/client/src/main/java/com/metamx/druid/initialization/Initialization.java index c4660cc2bfe..353938bf69a 100644 --- a/client/src/main/java/com/metamx/druid/initialization/Initialization.java +++ b/client/src/main/java/com/metamx/druid/initialization/Initialization.java @@ -220,12 +220,14 @@ public class Initialization @Override public void start() throws Exception { + log.info("Starting Curator"); framework.start(); } @Override public void stop() { + log.info("Stopping Curator"); framework.close(); } } diff --git a/client/src/main/java/com/metamx/druid/query/filter/SelectorDimFilter.java b/client/src/main/java/com/metamx/druid/query/filter/SelectorDimFilter.java index 039d9893411..945c753c652 100644 --- a/client/src/main/java/com/metamx/druid/query/filter/SelectorDimFilter.java +++ b/client/src/main/java/com/metamx/druid/query/filter/SelectorDimFilter.java @@ -39,7 +39,6 @@ public class SelectorDimFilter implements DimFilter ) { Preconditions.checkArgument(dimension != null, "dimension must not be null"); - Preconditions.checkArgument(value != null, "value must not be null"); this.dimension = dimension; this.value = value; } @@ -48,7 +47,7 @@ public class SelectorDimFilter implements DimFilter public byte[] getCacheKey() { byte[] dimensionBytes = dimension.getBytes(); - byte[] valueBytes = value.getBytes(); + byte[] valueBytes = value == null ? new byte[]{} : value.getBytes(); return ByteBuffer.allocate(1 + dimensionBytes.length + valueBytes.length) .put(DimFilterCacheHelper.SELECTOR_CACHE_ID) diff --git a/common/pom.xml b/common/pom.xml index ca373dee814..af7e334f6b7 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -103,6 +103,10 @@ com.google.inject guice + + com.google.inject.extensions + guice-multibindings + org.jdbi jdbi diff --git a/common/src/main/java/com/metamx/druid/guava/DSuppliers.java b/common/src/main/java/com/metamx/druid/guava/DSuppliers.java new file mode 100644 index 00000000000..ce0768afd6e --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guava/DSuppliers.java @@ -0,0 +1,41 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guava; + +import com.google.common.base.Supplier; + +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class DSuppliers +{ + public static Supplier of(final AtomicReference ref) + { + return new Supplier() + { + @Override + public T get() + { + return ref.get(); + } + }; + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/JacksonConfigManagerModule.java b/common/src/main/java/com/metamx/druid/guice/JacksonConfigManagerModule.java index 05838c04383..8bb33dafabf 100644 --- a/common/src/main/java/com/metamx/druid/guice/JacksonConfigManagerModule.java +++ b/common/src/main/java/com/metamx/druid/guice/JacksonConfigManagerModule.java @@ -19,7 +19,7 @@ public class JacksonConfigManagerModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.manager.config", ConfigManagerConfig.class); - binder.bind(JacksonConfigManager.class); + binder.bind(JacksonConfigManager.class).in(LazySingleton.class); } @Provides @ManageLifecycle diff --git a/common/src/main/java/com/metamx/druid/guice/JacksonConfigProvider.java b/common/src/main/java/com/metamx/druid/guice/JacksonConfigProvider.java new file mode 100644 index 00000000000..0d4bb53f01d --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/JacksonConfigProvider.java @@ -0,0 +1,94 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Supplier; +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Key; +import com.google.inject.Provider; +import com.google.inject.util.Types; +import com.metamx.druid.config.JacksonConfigManager; +import com.metamx.druid.guava.DSuppliers; + +/** + */ +public class JacksonConfigProvider implements Provider> +{ + public static void bind(Binder binder, String key, Class clazz, T defaultVal) + { + binder.bind(Key.get(Types.newParameterizedType(Supplier.class, clazz))) + .toProvider((Provider) of(key, clazz, defaultVal)) + .in(LazySingleton.class); + } + + public static JacksonConfigProvider of(String key, Class clazz) + { + return of(key, clazz, null); + } + + public static JacksonConfigProvider of(String key, Class clazz, T defaultVal) + { + return new JacksonConfigProvider(key, clazz, null, defaultVal); + } + + public static JacksonConfigProvider of(String key, TypeReference clazz) + { + return of(key, clazz, null); + } + + public static JacksonConfigProvider of(String key, TypeReference typeRef, T defaultVal) + { + return new JacksonConfigProvider(key, null, typeRef, defaultVal); + } + + private final String key; + private final Class clazz; + private final TypeReference typeRef; + private final T defaultVal; + private JacksonConfigManager configManager; + + JacksonConfigProvider(String key, Class clazz, TypeReference typeRef, T defaultVal) + { + this.key = key; + this.clazz = clazz; + this.typeRef = typeRef; + this.defaultVal = defaultVal; + } + + @Inject + public void configure(JacksonConfigManager configManager) + { + this.configManager = configManager; + } + + @Override + public Supplier get() + { + if (clazz == null) { + return DSuppliers.of(configManager.watch(key, typeRef, defaultVal)); + } + else { + return DSuppliers.of(configManager.watch(key, clazz, defaultVal)); + } + } + +} diff --git a/common/src/main/java/com/metamx/druid/guice/ListProvider.java b/common/src/main/java/com/metamx/druid/guice/ListProvider.java new file mode 100644 index 00000000000..86d7a03c24a --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/ListProvider.java @@ -0,0 +1,74 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Provider; + +import java.lang.annotation.Annotation; +import java.util.List; + +/** + */ +public class ListProvider implements Provider> +{ + private final List> itemsToLoad = Lists.newArrayList(); + private Injector injector; + + public ListProvider add(Class clazz) + { + return add(Key.get(clazz)); + } + + public ListProvider add(Class clazz, Class annotation) + { + return add(Key.get(clazz, annotation)); + } + + public ListProvider add(Class clazz, Annotation annotation) + { + return add(Key.get(clazz, annotation)); + } + + public ListProvider add(Key key) + { + itemsToLoad.add(key); + return this; + } + + @Inject + private void configure(Injector injector) + { + this.injector = injector; + } + + @Override + public List get() + { + List retVal = Lists.newArrayListWithExpectedSize(itemsToLoad.size()); + for (Key key : itemsToLoad) { + retVal.add(injector.getInstance(key)); + } + return retVal; + } +} diff --git a/common/src/main/java/com/metamx/druid/guice/PolyBind.java b/common/src/main/java/com/metamx/druid/guice/PolyBind.java new file mode 100644 index 00000000000..195f2923306 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guice/PolyBind.java @@ -0,0 +1,157 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Provider; +import com.google.inject.ProvisionException; +import com.google.inject.TypeLiteral; +import com.google.inject.binder.ScopedBindingBuilder; +import com.google.inject.multibindings.MapBinder; +import com.google.inject.util.Types; + +import javax.annotation.Nullable; +import java.lang.reflect.ParameterizedType; +import java.util.Map; +import java.util.Properties; + +/** + * Provides the ability to create "polymorphic" bindings. Where the polymorphism is actually just making a decision + * based on a value in a Properties. + * + * The workflow is that you first create a choice by calling createChoice(). Then you create options using the binder + * returned by the optionBinder() method. Multiple different modules can call optionBinder and all options will be + * reflected at injection time as long as equivalent interface Key objects are passed into the various methods. + */ +public class PolyBind +{ + /** + * Sets up a "choice" for the injector to resolve at injection time. + * + * @param binder the binder for the injector that is being configured + * @param property the property that will be checked to determine the implementation choice + * @param interfaceKey the interface that will be injected using this choice + * @param defaultKey the default instance to be injected if the property doesn't match a choice. Can be null + * @param interface type + * @return A ScopedBindingBuilder so that scopes can be added to the binding, if required. + */ + public static ScopedBindingBuilder createChoice( + Binder binder, + String property, + Key interfaceKey, + @Nullable Key defaultKey + ) + { + return binder.bind(interfaceKey).toProvider(new ConfiggedProvider(interfaceKey, property, defaultKey)); + } + + /** + * Binds an option for a specific choice. The choice must already be registered on the injector for this to work. + * + * @param binder the binder for the injector that is being configured + * @param interfaceKey the interface that will have an option added to it. This must equal the + * Key provided to createChoice + * @param interface type + * @return A MapBinder that can be used to create the actual option bindings. + */ + public static MapBinder optionBinder(Binder binder, Key interfaceKey) + { + final TypeLiteral interfaceType = interfaceKey.getTypeLiteral(); + + if (interfaceKey.getAnnotation() != null) { + return MapBinder.newMapBinder( + binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotation() + ); + } + else if (interfaceKey.getAnnotationType() != null) { + return MapBinder.newMapBinder( + binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotationType() + ); + } + else { + return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), interfaceType); + } + } + + static class ConfiggedProvider implements Provider + { + private final Key key; + private final String property; + private final Key defaultKey; + + private Injector injector; + private Properties props; + + ConfiggedProvider( + Key key, + String property, + Key defaultKey + ) + { + this.key = key; + this.property = property; + this.defaultKey = defaultKey; + } + + @Inject + void configure(Injector injector, Properties props) + { + this.injector = injector; + this.props = props; + } + + @Override + @SuppressWarnings("unchecked") + public T get() + { + final ParameterizedType mapType = Types.mapOf( + String.class, Types.newParameterizedType(Provider.class, key.getTypeLiteral().getType()) + ); + + final Map> implsMap; + if (key.getAnnotation() != null) { + implsMap = (Map>) injector.getInstance(Key.get(mapType, key.getAnnotation())); + } + else if (key.getAnnotationType() != null) { + implsMap = (Map>) injector.getInstance(Key.get(mapType, key.getAnnotation())); + } + else { + implsMap = (Map>) injector.getInstance(Key.get(mapType)); + } + + final String implName = props.getProperty(property); + final Provider provider = implsMap.get(implName); + + if (provider == null) { + if (defaultKey == null) { + throw new ProvisionException( + String.format("Unknown provider[%s] of %s, known options[%s]", implName, key, implsMap.keySet()) + ); + } + return injector.getInstance(defaultKey); + } + + return provider.get(); + } + } +} diff --git a/common/src/test/java/com/metamx/druid/guice/PolyBindTest.java b/common/src/test/java/com/metamx/druid/guice/PolyBindTest.java new file mode 100644 index 00000000000..be688e7b560 --- /dev/null +++ b/common/src/test/java/com/metamx/druid/guice/PolyBindTest.java @@ -0,0 +1,128 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.common.collect.Iterables; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.multibindings.MapBinder; +import com.google.inject.name.Names; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Properties; + +/** + */ +public class PolyBindTest +{ + private Properties props; + private Injector injector; + + public void setUp(Module... modules) throws Exception + { + props = new Properties(); + injector = Guice.createInjector( + Iterables.concat( + Arrays.asList( + new Module() + { + @Override + public void configure(Binder binder) + { + binder.bind(Properties.class).toInstance(props); + PolyBind.createChoice(binder, "billy", Key.get(Gogo.class), Key.get(GoA.class)); + } + } + ), + Arrays.asList(modules) + ) + ); + } + + @Test + public void testSanity() throws Exception + { + setUp( + new Module() + { + @Override + public void configure(Binder binder) + { + final MapBinder gogoBinder = PolyBind.optionBinder(binder, Key.get(Gogo.class)); + gogoBinder.addBinding("a").to(GoA.class); + gogoBinder.addBinding("b").to(GoB.class); + + PolyBind.createChoice( + binder, "billy", Key.get(Gogo.class, Names.named("reverse")), Key.get(GoB.class) + ); + final MapBinder annotatedGogoBinder = PolyBind.optionBinder( + binder, Key.get(Gogo.class, Names.named("reverse")) + ); + annotatedGogoBinder.addBinding("a").to(GoB.class); + annotatedGogoBinder.addBinding("b").to(GoA.class); + } + } + ); + + + Assert.assertEquals("A", injector.getInstance(Gogo.class).go()); + Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go()); + props.setProperty("billy.type", "b"); + Assert.assertEquals("B", injector.getInstance(Gogo.class).go()); + Assert.assertEquals("A", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go()); + props.setProperty("billy.type", "a"); + Assert.assertEquals("A", injector.getInstance(Gogo.class).go()); + Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go()); + props.setProperty("billy.type", "b"); + Assert.assertEquals("B", injector.getInstance(Gogo.class).go()); + Assert.assertEquals("A", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go()); + props.setProperty("billy.type", "c"); + Assert.assertEquals("A", injector.getInstance(Gogo.class).go()); + Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go()); + } + + public static interface Gogo + { + public String go(); + } + + public static class GoA implements Gogo + { + @Override + public String go() + { + return "A"; + } + } + + public static class GoB implements Gogo + { + @Override + public String go() + { + return "B"; + } + } +} diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index 9607f56d542..7495d0b9013 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -48,25 +48,6 @@ java-util - - com.amazonaws - aws-java-sdk - 1.3.27 - - - javax.mail - mail - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - commons-io commons-io diff --git a/indexing-service/src/main/java/com/metamx/druid/guice/OverlordModule.java b/indexing-service/src/main/java/com/metamx/druid/guice/OverlordModule.java new file mode 100644 index 00000000000..0b1c908d40c --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/guice/OverlordModule.java @@ -0,0 +1,168 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guice; + +import com.google.inject.Binder; +import com.google.inject.Key; +import com.google.inject.Module; +import com.google.inject.TypeLiteral; +import com.google.inject.multibindings.MapBinder; +import com.metamx.druid.http.RedirectFilter; +import com.metamx.druid.http.RedirectInfo; +import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory; +import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; +import com.metamx.druid.indexing.common.actions.TaskActionToolbox; +import com.metamx.druid.indexing.common.tasklogs.NoopTaskLogs; +import com.metamx.druid.indexing.common.tasklogs.S3TaskLogs; +import com.metamx.druid.indexing.common.tasklogs.S3TaskLogsConfig; +import com.metamx.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; +import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer; +import com.metamx.druid.indexing.common.tasklogs.TaskLogs; +import com.metamx.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; +import com.metamx.druid.indexing.coordinator.DbTaskStorage; +import com.metamx.druid.indexing.coordinator.ForkingTaskRunnerFactory; +import com.metamx.druid.indexing.coordinator.HeapMemoryTaskStorage; +import com.metamx.druid.indexing.coordinator.IndexerDBCoordinator; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerFactory; +import com.metamx.druid.indexing.coordinator.TaskLockbox; +import com.metamx.druid.indexing.coordinator.TaskMaster; +import com.metamx.druid.indexing.coordinator.TaskQueue; +import com.metamx.druid.indexing.coordinator.TaskRunnerFactory; +import com.metamx.druid.indexing.coordinator.TaskStorage; +import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter; +import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig; +import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; +import com.metamx.druid.indexing.coordinator.http.OverlordRedirectInfo; +import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy; +import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy; +import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy; +import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig; +import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; +import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactoryImpl; +import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementStrategy; +import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; +import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; +import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; + +import java.util.List; + +/** + */ +public class OverlordModule implements Module +{ + @Override + public void configure(Binder binder) + { + binder.bind(TaskMaster.class).in(ManageLifecycle.class); + + binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class); + binder.bind(new TypeLiteral>(){}) + .toProvider( + new ListProvider() + .add(TaskRunnerTaskLogStreamer.class) + .add(TaskLogs.class) + ) + .in(LazySingleton.class); + + binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class); + binder.bind(TaskActionToolbox.class).in(LazySingleton.class); + binder.bind(TaskQueue.class).in(LazySingleton.class); // Lifecycle managed by TaskMaster instead + binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class); + binder.bind(TaskLockbox.class).in(LazySingleton.class); + binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); + binder.bind(ResourceManagementSchedulerFactory.class) + .to(ResourceManagementSchedulerFactoryImpl.class) + .in(LazySingleton.class); + + configureTaskStorage(binder); + configureRunners(binder); + configureAutoscale(binder); + + binder.bind(RedirectFilter.class).in(LazySingleton.class); + binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class); + } + + private void configureTaskStorage(Binder binder) + { + PolyBind.createChoice( + binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class) + ); + final MapBinder storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class)); + + storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class); + binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class); + + storageBinder.addBinding("db").to(DbTaskStorage.class); + binder.bind(DbTaskStorage.class).in(LazySingleton.class); + + PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(NoopTaskLogs.class)); + final MapBinder taskLogBinder = PolyBind.optionBinder(binder, Key.get(TaskLogs.class)); + + JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class); + taskLogBinder.addBinding("s3").to(S3TaskLogs.class); + binder.bind(S3TaskLogs.class).in(LazySingleton.class); + + taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class); + binder.bind(NoopTaskLogs.class).in(LazySingleton.class); + + } + + private void configureRunners(Binder binder) + { + PolyBind.createChoice( + binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class) + ); + final MapBinder biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class)); + + JsonConfigProvider.bind(binder, "druid.indexer.runner", ForkingTaskRunnerConfig.class); + biddy.addBinding("local").to(ForkingTaskRunnerFactory.class); + binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class); + + JsonConfigProvider.bind(binder, "druid.indexer.runner", RemoteTaskRunnerConfig.class); + biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class); + } + + private void configureAutoscale(Binder binder) + { + JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class); + binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class); + + JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null); + + PolyBind.createChoice( + binder, + "druid.indexer.autoscale.strategy", + Key.get(AutoScalingStrategy.class), + Key.get(NoopAutoScalingStrategy.class) + ); + + final MapBinder autoScalingBinder = PolyBind.optionBinder( + binder, Key.get(AutoScalingStrategy.class) + ); + autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class); + binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class); + + autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class); + binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class); + + JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class); + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/LocalTaskActionClientFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/LocalTaskActionClientFactory.java index 0f7cdeaa748..e3f44f506d2 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/LocalTaskActionClientFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/LocalTaskActionClientFactory.java @@ -19,6 +19,7 @@ package com.metamx.druid.indexing.common.actions; +import com.google.inject.Inject; import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.coordinator.TaskStorage; @@ -29,6 +30,7 @@ public class LocalTaskActionClientFactory implements TaskActionClientFactory private final TaskStorage storage; private final TaskActionToolbox toolbox; + @Inject public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox) { this.storage = storage; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentInsertAction.java index 4a4dd43f67e..76512a0a109 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentInsertAction.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentInsertAction.java @@ -65,7 +65,7 @@ public class SegmentInsertAction implements TaskAction> throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments); } - final Set retVal = toolbox.getMergerDBCoordinator().announceHistoricalSegments(segments); + final Set retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments); // Emit metrics final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder() diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUnusedAction.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUnusedAction.java index efff8dc0a65..adfbfc628e6 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUnusedAction.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUnusedAction.java @@ -49,7 +49,7 @@ public class SegmentListUnusedAction implements TaskAction> @Override public List perform(Task task, TaskActionToolbox toolbox) throws IOException { - return toolbox.getMergerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval); + return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval); } @Override diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUsedAction.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUsedAction.java index 2d86b393f0c..100c6d15e8f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUsedAction.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentListUsedAction.java @@ -49,7 +49,7 @@ public class SegmentListUsedAction implements TaskAction> @Override public List perform(Task task, TaskActionToolbox toolbox) throws IOException { - return toolbox.getMergerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval); + return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval); } @Override diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentNukeAction.java index d2fdc5be392..db142096696 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentNukeAction.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/SegmentNukeAction.java @@ -44,7 +44,7 @@ public class SegmentNukeAction implements TaskAction throw new ISE("Segments not covered by locks for task: %s", task.getId()); } - toolbox.getMergerDBCoordinator().deleteSegments(segments); + toolbox.getIndexerDBCoordinator().deleteSegments(segments); // Emit metrics final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder() diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/TaskActionToolbox.java index 81f8131c77b..c18dbacd747 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/TaskActionToolbox.java @@ -2,10 +2,11 @@ package com.metamx.druid.indexing.common.actions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import com.google.inject.Inject; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexing.common.TaskLock; import com.metamx.druid.indexing.common.task.Task; -import com.metamx.druid.indexing.coordinator.MergerDBCoordinator; +import com.metamx.druid.indexing.coordinator.IndexerDBCoordinator; import com.metamx.druid.indexing.coordinator.TaskLockbox; import com.metamx.druid.indexing.coordinator.TaskQueue; import com.metamx.emitter.service.ServiceEmitter; @@ -17,19 +18,20 @@ public class TaskActionToolbox { private final TaskQueue taskQueue; private final TaskLockbox taskLockbox; - private final MergerDBCoordinator mergerDBCoordinator; + private final IndexerDBCoordinator indexerDBCoordinator; private final ServiceEmitter emitter; + @Inject public TaskActionToolbox( TaskQueue taskQueue, TaskLockbox taskLockbox, - MergerDBCoordinator mergerDBCoordinator, + IndexerDBCoordinator indexerDBCoordinator, ServiceEmitter emitter ) { this.taskQueue = taskQueue; this.taskLockbox = taskLockbox; - this.mergerDBCoordinator = mergerDBCoordinator; + this.indexerDBCoordinator = indexerDBCoordinator; this.emitter = emitter; } @@ -43,9 +45,9 @@ public class TaskActionToolbox return taskLockbox; } - public MergerDBCoordinator getMergerDBCoordinator() + public IndexerDBCoordinator getIndexerDBCoordinator() { - return mergerDBCoordinator; + return indexerDBCoordinator; } public ServiceEmitter getEmitter() diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java index dc3ad87a9cb..808d5e3fad1 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java @@ -278,7 +278,7 @@ public class RealtimeIndexTask extends AbstractTask realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer); realtimePlumberSchool.setSegmentPublisher(segmentPublisher); realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView()); - realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter()); + realtimePlumberSchool.setEmitter(toolbox.getEmitter()); if (this.rejectionPolicyFactory != null) { realtimePlumberSchool.setRejectionPolicyFactory(rejectionPolicyFactory); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java index a09f2ef7e00..8e684ffa06e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogs.java @@ -1,9 +1,9 @@ package com.metamx.druid.indexing.common.tasklogs; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.io.InputSupplier; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import org.jets3t.service.ServiceException; import org.jets3t.service.StorageService; @@ -21,15 +21,14 @@ public class S3TaskLogs implements TaskLogs { private static final Logger log = new Logger(S3TaskLogs.class); - private final String bucket; - private final String prefix; private final StorageService service; + private final S3TaskLogsConfig config; - public S3TaskLogs(String bucket, String prefix, RestS3Service service) + @Inject + public S3TaskLogs(S3TaskLogsConfig config, RestS3Service service) { - this.bucket = Preconditions.checkNotNull(bucket, "bucket"); - this.prefix = Preconditions.checkNotNull(prefix, "prefix"); - this.service = Preconditions.checkNotNull(service, "service"); + this.config = config; + this.service = service; } @Override @@ -38,7 +37,7 @@ public class S3TaskLogs implements TaskLogs final String taskKey = getTaskLogKey(taskid); try { - final StorageObject objectDetails = service.getObjectDetails(bucket, taskKey, null, null, null, null); + final StorageObject objectDetails = service.getObjectDetails(config.getS3Bucket(), taskKey, null, null, null, null); return Optional.>of( new InputSupplier() @@ -59,7 +58,7 @@ public class S3TaskLogs implements TaskLogs } return service.getObject( - bucket, + config.getS3Bucket(), taskKey, null, null, @@ -95,7 +94,7 @@ public class S3TaskLogs implements TaskLogs final StorageObject object = new StorageObject(logFile); object.setKey(taskKey); - service.putObject(bucket, object); + service.putObject(config.getS3Bucket(), object); } catch (Exception e) { Throwables.propagateIfInstanceOf(e, IOException.class); @@ -105,6 +104,6 @@ public class S3TaskLogs implements TaskLogs private String getTaskLogKey(String taskid) { - return String.format("%s/%s/log", prefix, taskid); + return String.format("%s/%s/log", config.getS3Prefix(), taskid); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogsConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogsConfig.java new file mode 100644 index 00000000000..5642d1e3b05 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/S3TaskLogsConfig.java @@ -0,0 +1,47 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.common.tasklogs; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.NotNull; + +/** + */ +public class S3TaskLogsConfig +{ + @JsonProperty + @NotNull + private String s3Bucket = null; + + @JsonProperty + @NotNull + private String s3Prefix = null; + + public String getS3Bucket() + { + return s3Bucket; + } + + public String getS3Prefix() + { + return s3Prefix; + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/SwitchingTaskLogProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java similarity index 73% rename from indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/SwitchingTaskLogProvider.java rename to indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java index 4acad86386b..a573d8fc17e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/SwitchingTaskLogProvider.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/SwitchingTaskLogStreamer.java @@ -3,6 +3,7 @@ package com.metamx.druid.indexing.common.tasklogs; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.io.InputSupplier; +import com.google.inject.Inject; import java.io.IOException; import java.io.InputStream; @@ -11,11 +12,12 @@ import java.util.List; /** * Provides task logs based on a series of underlying task log providers. */ -public class SwitchingTaskLogProvider implements TaskLogProvider +public class SwitchingTaskLogStreamer implements TaskLogStreamer { - private final List providers; + private final List providers; - public SwitchingTaskLogProvider(List providers) + @Inject + public SwitchingTaskLogStreamer(List providers) { this.providers = ImmutableList.copyOf(providers); } @@ -23,7 +25,7 @@ public class SwitchingTaskLogProvider implements TaskLogProvider @Override public Optional> streamTaskLog(String taskid, long offset) throws IOException { - for (TaskLogProvider provider : providers) { + for (TaskLogStreamer provider : providers) { final Optional> stream = provider.streamTaskLog(taskid, offset); if (stream.isPresent()) { return stream; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogStreamer.java similarity index 95% rename from indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogProvider.java rename to indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogStreamer.java index 46b1bfc2b9c..89bc46aa22f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogProvider.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogStreamer.java @@ -9,7 +9,7 @@ import java.io.InputStream; /** * Something that knows how to stream logs for tasks. */ -public interface TaskLogProvider +public interface TaskLogStreamer { /** * Stream log for a task. diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogs.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogs.java index 0c1994d6073..b42aa29fa4b 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogs.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskLogs.java @@ -1,5 +1,5 @@ package com.metamx.druid.indexing.common.tasklogs; -public interface TaskLogs extends TaskLogProvider, TaskLogPusher +public interface TaskLogs extends TaskLogStreamer, TaskLogPusher { } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java new file mode 100644 index 00000000000..b6fb47aca3d --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/tasklogs/TaskRunnerTaskLogStreamer.java @@ -0,0 +1,54 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.common.tasklogs; + +import com.google.common.base.Optional; +import com.google.common.io.InputSupplier; +import com.google.inject.Inject; +import com.metamx.druid.indexing.coordinator.TaskMaster; +import com.metamx.druid.indexing.coordinator.TaskRunner; + +import java.io.IOException; +import java.io.InputStream; + +/** +*/ +public class TaskRunnerTaskLogStreamer implements TaskLogStreamer +{ + private final TaskMaster taskMaster; + + @Inject + public TaskRunnerTaskLogStreamer( + final TaskMaster taskMaster + ) { + this.taskMaster = taskMaster; + } + + @Override + public Optional> streamTaskLog(String taskid, long offset) throws IOException + { + final TaskRunner runner = taskMaster.getTaskRunner().orNull(); + if (runner instanceof TaskLogStreamer) { + return ((TaskLogStreamer) runner).streamTaskLog(taskid, offset); + } else { + return Optional.absent(); + } + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java index a9c4ad7c60c..c92257e1413 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/DbTaskStorage.java @@ -28,11 +28,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.inject.Inject; +import com.metamx.druid.db.DbTablesConfig; import com.metamx.druid.indexing.common.TaskLock; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.actions.TaskAction; import com.metamx.druid.indexing.common.task.Task; -import com.metamx.druid.indexing.coordinator.config.IndexerDbConnectorConfig; import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; import org.skife.jdbi.v2.Handle; @@ -46,15 +47,16 @@ import java.util.Map; public class DbTaskStorage implements TaskStorage { private final ObjectMapper jsonMapper; - private final IndexerDbConnectorConfig dbConnectorConfig; + private final DbTablesConfig dbTables; private final IDBI dbi; private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); - public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, IDBI dbi) + @Inject + public DbTaskStorage(ObjectMapper jsonMapper, DbTablesConfig dbTables, IDBI dbi) { this.jsonMapper = jsonMapper; - this.dbConnectorConfig = dbConnectorConfig; + this.dbTables = dbTables; this.dbi = dbi; } @@ -82,7 +84,7 @@ public class DbTaskStorage implements TaskStorage handle.createStatement( String.format( "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", - dbConnectorConfig.getTaskTable() + dbTables.getTasksTable() ) ) .bind("id", task.getId()) @@ -123,7 +125,7 @@ public class DbTaskStorage implements TaskStorage return handle.createStatement( String.format( "UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1", - dbConnectorConfig.getTaskTable() + dbTables.getTasksTable() ) ) .bind("id", status.getId()) @@ -152,7 +154,7 @@ public class DbTaskStorage implements TaskStorage handle.createQuery( String.format( "SELECT payload FROM %s WHERE id = :id", - dbConnectorConfig.getTaskTable() + dbTables.getTasksTable() ) ) .bind("id", taskid) @@ -182,7 +184,7 @@ public class DbTaskStorage implements TaskStorage handle.createQuery( String.format( "SELECT status_payload FROM %s WHERE id = :id", - dbConnectorConfig.getTaskTable() + dbTables.getTasksTable() ) ) .bind("id", taskid) @@ -212,7 +214,7 @@ public class DbTaskStorage implements TaskStorage handle.createQuery( String.format( "SELECT id, payload, status_payload FROM %s WHERE active = 1", - dbConnectorConfig.getTaskTable() + dbTables.getTasksTable() ) ) .list(); @@ -261,7 +263,7 @@ public class DbTaskStorage implements TaskStorage return handle.createStatement( String.format( "INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)", - dbConnectorConfig.getTaskLockTable() + dbTables.getTaskLockTable() ) ) .bind("task_id", taskid) @@ -296,7 +298,7 @@ public class DbTaskStorage implements TaskStorage return handle.createStatement( String.format( "DELETE FROM %s WHERE id = :id", - dbConnectorConfig.getTaskLockTable() + dbTables.getTaskLockTable() ) ) .bind("id", id) @@ -341,7 +343,7 @@ public class DbTaskStorage implements TaskStorage return handle.createStatement( String.format( "INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)", - dbConnectorConfig.getTaskLogTable() + dbTables.getTaskLogTable() ) ) .bind("task_id", task.getId()) @@ -365,7 +367,7 @@ public class DbTaskStorage implements TaskStorage handle.createQuery( String.format( "SELECT log_payload FROM %s WHERE task_id = :task_id", - dbConnectorConfig.getTaskLogTable() + dbTables.getTaskLogTable() ) ) .bind("task_id", taskid) @@ -402,7 +404,7 @@ public class DbTaskStorage implements TaskStorage handle.createQuery( String.format( "SELECT id, lock_payload FROM %s WHERE task_id = :task_id", - dbConnectorConfig.getTaskLockTable() + dbTables.getTaskLockTable() ) ) .bind("task_id", taskid) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java index 7934a088a3b..3c223a3f8de 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java @@ -38,12 +38,14 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.ISE; import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.druid.guice.annotations.Self; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; -import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; import com.metamx.druid.indexing.common.tasklogs.TaskLogPusher; +import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer; import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig; import com.metamx.druid.indexing.worker.executor.ExecutorMain; +import com.metamx.druid.initialization.DruidNode; import com.metamx.emitter.EmittingLogger; import org.apache.commons.io.FileUtils; @@ -59,12 +61,12 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; /** * Runs tasks in separate processes using {@link ExecutorMain}. */ -public class ForkingTaskRunner implements TaskRunner, TaskLogProvider +public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer { private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class); private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property."; @@ -72,6 +74,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider private final ForkingTaskRunnerConfig config; private final Properties props; private final TaskLogPusher taskLogPusher; + private final DruidNode node; private final ListeningExecutorService exec; private final ObjectMapper jsonMapper; @@ -81,15 +84,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider ForkingTaskRunnerConfig config, Properties props, TaskLogPusher taskLogPusher, - ExecutorService exec, - ObjectMapper jsonMapper + ObjectMapper jsonMapper, + @Self DruidNode node ) { this.config = config; this.props = props; this.taskLogPusher = taskLogPusher; - this.exec = MoreExecutors.listeningDecorator(exec); this.jsonMapper = jsonMapper; + this.node = node; + + this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(config.maxForks())); } @Override @@ -113,7 +118,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider public TaskStatus call() { final String attemptUUID = UUID.randomUUID().toString(); - final File taskDir = new File(config.getBaseTaskDir(), task.getId()); + final File taskDir = new File(config.getTaskDir(), task.getId()); final File attemptDir = new File(taskDir, attemptUUID); final ProcessHolder processHolder; @@ -147,17 +152,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider final List command = Lists.newArrayList(); final int childPort = findUnusedPort(); - final String childHost = String.format(config.getHostPattern(), childPort); + final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort); command.add(config.getJavaCommand()); command.add("-cp"); - command.add(config.getJavaClasspath()); + command.add(config.getClasspath()); Iterables.addAll( command, Splitter.on(CharMatcher.WHITESPACE) .omitEmptyStrings() - .split(config.getJavaOptions()) + .split(config.getJavaOpts()) ); for (String propName : props.stringPropertyNames()) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunnerFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunnerFactory.java new file mode 100644 index 00000000000..9dfc4cdb093 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunnerFactory.java @@ -0,0 +1,61 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.inject.Inject; +import com.metamx.druid.guice.annotations.Self; +import com.metamx.druid.indexing.common.tasklogs.TaskLogs; +import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig; +import com.metamx.druid.initialization.DruidNode; + +import java.util.Properties; + +/** +*/ +public class ForkingTaskRunnerFactory implements TaskRunnerFactory +{ + private final ForkingTaskRunnerConfig config; + private final Properties props; + private final ObjectMapper jsonMapper; + private final TaskLogs persistentTaskLogs; + private final DruidNode node; + + @Inject + public ForkingTaskRunnerFactory( + final ForkingTaskRunnerConfig config, + final Properties props, + final ObjectMapper jsonMapper, + final TaskLogs persistentTaskLogs, + @Self DruidNode node + ) { + this.config = config; + this.props = props; + this.jsonMapper = jsonMapper; + this.persistentTaskLogs = persistentTaskLogs; + this.node = node; + } + + @Override + public TaskRunner build() + { + return new ForkingTaskRunner(config, props, persistentTaskLogs, jsonMapper, node); + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/MergerDBCoordinator.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/IndexerDBCoordinator.java similarity index 98% rename from indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/MergerDBCoordinator.java rename to indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/IndexerDBCoordinator.java index 3f893d79cdd..cfaa56bdb69 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/MergerDBCoordinator.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/IndexerDBCoordinator.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; +import com.google.inject.Inject; import com.metamx.common.logger.Logger; import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.VersionedIntervalTimeline; @@ -52,16 +53,17 @@ import java.util.Set; /** */ -public class MergerDBCoordinator +public class IndexerDBCoordinator { - private static final Logger log = new Logger(MergerDBCoordinator.class); + private static final Logger log = new Logger(IndexerDBCoordinator.class); private final ObjectMapper jsonMapper; private final DbConnectorConfig dbConnectorConfig; private final DbTablesConfig dbTables; private final IDBI dbi; - public MergerDBCoordinator( + @Inject + public IndexerDBCoordinator( ObjectMapper jsonMapper, DbConnectorConfig dbConnectorConfig, DbTablesConfig dbTables, diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index 88b0a6d43ca..c12077eb8a2 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -25,6 +25,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -38,10 +39,11 @@ import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.curator.cache.PathChildrenCacheFactory; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; -import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; +import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer; import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.worker.Worker; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.InputStreamResponseHandler; @@ -73,7 +75,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; /** * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes. @@ -90,7 +91,7 @@ import java.util.concurrent.atomic.AtomicReference; *

* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. */ -public class RemoteTaskRunner implements TaskRunner, TaskLogProvider +public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer { private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class); private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); @@ -98,10 +99,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; + private final ZkPathsConfig zkPaths; private final CuratorFramework cf; private final PathChildrenCacheFactory pathChildrenCacheFactory; private final PathChildrenCache workerPathCache; - private final AtomicReference workerSetupData; + private final Supplier workerSetupData; private final HttpClient httpClient; // all workers that exist in ZK @@ -120,17 +122,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider public RemoteTaskRunner( ObjectMapper jsonMapper, RemoteTaskRunnerConfig config, + ZkPathsConfig zkPaths, CuratorFramework cf, PathChildrenCacheFactory pathChildrenCacheFactory, - AtomicReference workerSetupData, + Supplier workerSetupData, HttpClient httpClient ) { this.jsonMapper = jsonMapper; this.config = config; + this.zkPaths = zkPaths; this.cf = cf; this.pathChildrenCacheFactory = pathChildrenCacheFactory; - this.workerPathCache = pathChildrenCacheFactory.make(cf, config.getIndexerAnnouncementPath()); + this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath()); this.workerSetupData = workerSetupData; this.httpClient = httpClient; } @@ -440,7 +444,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider private void cleanup(final String workerId, final String taskId) { runningTasks.remove(taskId); - final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId); + final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId); try { cf.delete().guaranteed().forPath(statusPath); } @@ -490,11 +494,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId()); byte[] rawBytes = jsonMapper.writeValueAsBytes(task); - if (rawBytes.length > config.getMaxNumBytes()) { - throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes()); + if (rawBytes.length > config.getMaxZnodeBytes()) { + throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes()); } - String taskPath = JOINER.join(config.getIndexerTaskPath(), theWorker.getHost(), task.getId()); + String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), theWorker.getHost(), task.getId()); if (cf.checkExists().forPath(taskPath) == null) { cf.create() @@ -541,7 +545,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider log.info("Worker[%s] reportin' for duty!", worker.getHost()); try { - final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost()); + final String workerStatusPath = JOINER.join(zkPaths.getIndexerStatusPath(), worker.getHost()); final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath); final ZkWorker zkWorker = new ZkWorker( worker, @@ -649,10 +653,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider if (zkWorker != null) { try { for (String assignedTask : cf.getChildren() - .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) { + .forPath(JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost()))) { RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask); if (taskRunnerWorkItem != null) { - String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask); + String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost(), assignedTask); if (cf.checkExists().forPath(taskPath) != null) { cf.delete().guaranteed().forPath(taskPath); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerFactory.java new file mode 100644 index 00000000000..fda47dbfe42 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerFactory.java @@ -0,0 +1,77 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Supplier; +import com.google.inject.Inject; +import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory; +import com.metamx.druid.guice.annotations.Global; +import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; +import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; +import com.metamx.druid.initialization.ZkPathsConfig; +import com.metamx.http.client.HttpClient; +import org.apache.curator.framework.CuratorFramework; + +/** +*/ +public class RemoteTaskRunnerFactory implements TaskRunnerFactory +{ + private final CuratorFramework curator; + private final RemoteTaskRunnerConfig remoteTaskRunnerConfig; + private final ZkPathsConfig zkPaths; + private final ObjectMapper jsonMapper; + private final Supplier setupDataWatch; + private final HttpClient httpClient; + + @Inject + public RemoteTaskRunnerFactory( + final CuratorFramework curator, + final RemoteTaskRunnerConfig remoteTaskRunnerConfig, + final ZkPathsConfig zkPaths, + final ObjectMapper jsonMapper, + final Supplier setupDataWatch, + @Global final HttpClient httpClient + ) { + this.curator = curator; + this.remoteTaskRunnerConfig = remoteTaskRunnerConfig; + this.zkPaths = zkPaths; + this.jsonMapper = jsonMapper; + this.setupDataWatch = setupDataWatch; + this.httpClient = httpClient; + } + + @Override + public TaskRunner build() + { + return new RemoteTaskRunner( + jsonMapper, + remoteTaskRunnerConfig, + zkPaths, + curator, + new SimplePathChildrenCacheFactory + .Builder() + .withCompressed(remoteTaskRunnerConfig.isCompressZnodes()) + .build(), + setupDataWatch, + httpClient + ); + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskLockbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskLockbox.java index b47e66aa573..5787238ab79 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskLockbox.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskLockbox.java @@ -28,6 +28,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.inject.Inject; import com.metamx.common.IAE; import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; @@ -63,7 +64,10 @@ public class TaskLockbox private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class); - public TaskLockbox(TaskStorage taskStorage) + @Inject + public TaskLockbox( + TaskStorage taskStorage + ) { this.taskStorage = taskStorage; } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMaster.java similarity index 88% rename from indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java rename to indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMaster.java index d1823d36cda..f8ef078f771 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMaster.java @@ -21,20 +21,21 @@ package com.metamx.druid.indexing.coordinator; import com.google.common.base.Optional; import com.google.common.base.Throwables; -import com.metamx.common.ISE; +import com.google.inject.Inject; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.guice.annotations.Self; import com.metamx.druid.indexing.common.actions.TaskActionClient; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.task.Task; -import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.indexing.coordinator.exec.TaskConsumer; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; import com.metamx.druid.initialization.DruidNode; import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import org.apache.curator.framework.CuratorFramework; @@ -42,13 +43,14 @@ import org.apache.curator.framework.recipes.leader.LeaderSelector; import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Encapsulates the indexer leadership lifecycle. */ -public class TaskMasterLifecycle +public class TaskMaster { private final LeaderSelector leaderSelector; private final ReentrantLock giant = new ReentrantLock(); @@ -56,17 +58,20 @@ public class TaskMasterLifecycle private final TaskQueue taskQueue; private final TaskActionClientFactory taskActionClientFactory; + private final AtomicReference leaderLifecycleRef = new AtomicReference(null); + private volatile boolean leading = false; private volatile TaskRunner taskRunner; private volatile ResourceManagementScheduler resourceManagementScheduler; - private static final EmittingLogger log = new EmittingLogger(TaskMasterLifecycle.class); + private static final EmittingLogger log = new EmittingLogger(TaskMaster.class); - public TaskMasterLifecycle( + @Inject + public TaskMaster( final TaskQueue taskQueue, final TaskActionClientFactory taskActionClientFactory, - final IndexerCoordinatorConfig indexerCoordinatorConfig, - final DruidNode node, + @Self final DruidNode node, + final ZkPathsConfig zkPaths, final TaskRunnerFactory runnerFactory, final ResourceManagementSchedulerFactory managementSchedulerFactory, final CuratorFramework curator, @@ -78,7 +83,7 @@ public class TaskMasterLifecycle this.taskActionClientFactory = taskActionClientFactory; this.leaderSelector = new LeaderSelector( - curator, indexerCoordinatorConfig.getIndexerLeaderLatchPath(), new LeaderSelectorListener() + curator, zkPaths.getIndexerLeaderLatchPath(), new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception @@ -101,6 +106,11 @@ public class TaskMasterLifecycle // Sensible order to start stuff: final Lifecycle leaderLifecycle = new Lifecycle(); + if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) { + log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition") + .emit(); + } + leaderLifecycle.addManagedInstance(taskRunner); leaderLifecycle.addHandler( new Lifecycle.Handler() @@ -122,10 +132,7 @@ public class TaskMasterLifecycle Initialization.announceDefaultService(node, serviceAnnouncer, leaderLifecycle); leaderLifecycle.addManagedInstance(taskConsumer); - if ("remote".equalsIgnoreCase(indexerCoordinatorConfig.getRunnerImpl())) { - if (!(taskRunner instanceof RemoteTaskRunner)) { - throw new ISE("WTF?! We configured a remote runner and got %s", taskRunner.getClass()); - } + if (taskRunner instanceof RemoteTaskRunner) { resourceManagementScheduler = managementSchedulerFactory.build((RemoteTaskRunner) taskRunner); leaderLifecycle.addManagedInstance(resourceManagementScheduler); } @@ -144,7 +151,6 @@ public class TaskMasterLifecycle finally { log.info("Bowing out!"); stopLeading(); - leaderLifecycle.stop(); } } catch (Exception e) { @@ -167,7 +173,7 @@ public class TaskMasterLifecycle } ); - leaderSelector.setId(indexerCoordinatorConfig.getServerName()); + leaderSelector.setId(node.getHost()); leaderSelector.autoRequeue(); } @@ -216,6 +222,10 @@ public class TaskMasterLifecycle if (leading) { leading = false; mayBeStopped.signalAll(); + final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null); + if (leaderLifecycle != null) { + leaderLifecycle.stop(); + } } } finally { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java index 252728b32dc..6c3f169c580 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java @@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; +import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.indexing.common.TaskLock; @@ -68,6 +69,7 @@ public class TaskQueue private static final EmittingLogger log = new EmittingLogger(TaskQueue.class); + @Inject public TaskQueue(TaskStorage taskStorage, TaskLockbox taskLockbox) { this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage"); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskStorageQueryAdapter.java index 09000104521..463bf540b21 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskStorageQueryAdapter.java @@ -25,6 +25,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.inject.Inject; import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexing.common.TaskStatus; @@ -44,6 +45,7 @@ public class TaskStorageQueryAdapter { private final TaskStorage storage; + @Inject public TaskStorageQueryAdapter(TaskStorage storage) { this.storage = storage; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java index 9ee3a5ab1b7..b0a69e9a14a 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java @@ -1,47 +1,85 @@ package com.metamx.druid.indexing.coordinator.config; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Lists; import com.metamx.druid.indexing.worker.executor.ExecutorMain; -import org.skife.config.Config; -import org.skife.config.Default; -import java.io.File; +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; import java.util.List; -public abstract class ForkingTaskRunnerConfig +public class ForkingTaskRunnerConfig { - @Config("druid.indexer.taskDir") - @Default("/tmp/persistent") - public abstract File getBaseTaskDir(); + @JsonProperty + @Min(1) + private int maxForks = 1; - @Config("druid.indexer.fork.java") - @Default("java") - public abstract String getJavaCommand(); + @JsonProperty + @NotNull + private String taskDir = "/tmp/persistent"; - @Config("druid.indexer.fork.opts") - @Default("") - public abstract String getJavaOptions(); + @JsonProperty + @NotNull + private String javaCommand = "java"; - @Config("druid.indexer.fork.classpath") - public String getJavaClasspath() { - return System.getProperty("java.class.path"); + @JsonProperty + @NotNull + private String javaOpts = ""; + + @JsonProperty + @NotNull + private String classpath = System.getProperty("java.class.path"); + + @JsonProperty + @NotNull + private String mainClass = ExecutorMain.class.getName(); + + @JsonProperty + @Min(1024) @Max(65535) + private int startPort = 8080; + + @JsonProperty + @NotNull + List allowedPrefixes = Lists.newArrayList("com.metamx", "druid", "io.druid"); + + public int maxForks() + { + return maxForks; + } + + public String getTaskDir() + { + return taskDir; + } + + public String getJavaCommand() + { + return javaCommand; + } + + public String getJavaOpts() + { + return javaOpts; + } + + public String getClasspath() + { + return classpath; } - @Config("druid.indexer.fork.main") public String getMainClass() { - return ExecutorMain.class.getName(); + return mainClass; } - @Config("druid.indexer.fork.hostpattern") - public abstract String getHostPattern(); + public int getStartPort() + { + return startPort; + } - @Config("druid.indexer.fork.startport") - public abstract int getStartPort(); - - @Config("druid.indexer.properties.prefixes") public List getAllowedPrefixes() { - return Lists.newArrayList("com.metamx", "druid"); + return allowedPrefixes; } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java index f8d2cf8c3dc..70c72c74972 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java @@ -36,10 +36,6 @@ public abstract class IndexerCoordinatorConfig extends ZkPathsConfig @Config("druid.host") public abstract String getServerName(); - @Config("druid.indexer.threads") - @Default("1") - public abstract int getNumLocalThreads(); - @Config("druid.indexer.runner") @Default("local") public abstract String getRunnerImpl(); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java index 2acadd3ae83..fbf3bb1cc8e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java @@ -19,25 +19,47 @@ package com.metamx.druid.indexing.coordinator.config; -import com.metamx.druid.indexing.common.config.IndexerZkConfig; +import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; -import org.skife.config.DefaultNull; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotNull; /** */ -public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig +public class RemoteTaskRunnerConfig { - @Config("druid.indexer.taskAssignmentTimeoutDuration") - @Default("PT5M") - public abstract Duration getTaskAssignmentTimeoutDuration(); + @JsonProperty + @NotNull + private Duration taskAssignmentTimeoutDuration = new Duration("PT5M"); - @Config("druid.curator.compress") - @Default("false") - public abstract boolean enableCompression(); + @JsonProperty + private boolean compressZnodes = false; - @Config("druid.indexer.worker.version") - @DefaultNull - public abstract String getWorkerVersion(); + @JsonProperty + private String workerVersion = null; + + @JsonProperty + @Min(10 * 1024) + private long maxZnodeBytes = 512 * 1024; + + public Duration getTaskAssignmentTimeoutDuration() + { + return taskAssignmentTimeoutDuration; + } + + public boolean isCompressZnodes() + { + return compressZnodes; + } + + public String getWorkerVersion() + { + return workerVersion; + } + + public long getMaxZnodeBytes() + { + return maxZnodeBytes; + } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java index 61191da91e5..1a6c8bf36da 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java @@ -24,12 +24,9 @@ import com.amazonaws.services.ec2.AmazonEC2Client; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; -import com.google.common.base.Optional; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import com.google.common.io.InputSupplier; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -47,15 +44,14 @@ import com.metamx.druid.QueryableNode; import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.JacksonConfigManager; -import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.db.DbTablesConfig; +import com.metamx.druid.guava.DSuppliers; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.RedirectFilter; -import com.metamx.druid.http.RedirectInfo; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; @@ -66,22 +62,22 @@ import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; import com.metamx.druid.indexing.common.tasklogs.NoopTaskLogs; import com.metamx.druid.indexing.common.tasklogs.S3TaskLogs; -import com.metamx.druid.indexing.common.tasklogs.SwitchingTaskLogProvider; -import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; +import com.metamx.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer; +import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer; import com.metamx.druid.indexing.common.tasklogs.TaskLogs; +import com.metamx.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer; import com.metamx.druid.indexing.coordinator.DbTaskStorage; -import com.metamx.druid.indexing.coordinator.ForkingTaskRunner; +import com.metamx.druid.indexing.coordinator.ForkingTaskRunnerFactory; import com.metamx.druid.indexing.coordinator.HeapMemoryTaskStorage; -import com.metamx.druid.indexing.coordinator.MergerDBCoordinator; +import com.metamx.druid.indexing.coordinator.IndexerDBCoordinator; import com.metamx.druid.indexing.coordinator.RemoteTaskRunner; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerFactory; import com.metamx.druid.indexing.coordinator.TaskLockbox; -import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle; +import com.metamx.druid.indexing.coordinator.TaskMaster; import com.metamx.druid.indexing.coordinator.TaskQueue; -import com.metamx.druid.indexing.coordinator.TaskRunner; import com.metamx.druid.indexing.coordinator.TaskRunnerFactory; import com.metamx.druid.indexing.coordinator.TaskStorage; import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter; -import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig; import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.indexing.coordinator.config.IndexerDbConnectorConfig; @@ -93,8 +89,8 @@ import com.metamx.druid.indexing.coordinator.scaling.NoopResourceManagementSched import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; +import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig; import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy; -import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.initialization.CuratorDiscoveryConfig; import com.metamx.druid.initialization.DruidNode; @@ -113,7 +109,6 @@ import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorSchedulerConfig; import com.metamx.metrics.SysMonitor; -import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.servlet.DefaultServlet; @@ -127,12 +122,8 @@ import org.jets3t.service.security.AWSCredentials; import org.joda.time.Duration; import org.skife.config.ConfigurationObjectFactory; -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; import java.util.List; import java.util.Properties; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; @@ -155,7 +146,7 @@ public class IndexerCoordinatorNode extends QueryableNode dbTables = null; private IndexerCoordinatorConfig config = null; - private MergerDBCoordinator mergerDBCoordinator = null; + private IndexerDBCoordinator indexerDBCoordinator = null; private ServiceDiscovery serviceDiscovery = null; private ServiceAnnouncer serviceAnnouncer = null; private TaskStorage taskStorage = null; @@ -166,9 +157,9 @@ public class IndexerCoordinatorNode extends QueryableNode providers = Lists.newArrayList(); + if (taskLogStreamer == null) { + final List providers = Lists.newArrayList(); - // Use our TaskRunner if it is also a TaskLogProvider - providers.add( - new TaskLogProvider() - { - @Override - public Optional> streamTaskLog(String taskid, long offset) throws IOException - { - final TaskRunner runner = taskMasterLifecycle.getTaskRunner().orNull(); - if (runner instanceof TaskLogProvider) { - return ((TaskLogProvider) runner).streamTaskLog(taskid, offset); - } else { - return Optional.absent(); - } - } - } - ); + // Use our TaskRunner if it is also a TaskLogStreamer + providers.add(new TaskRunnerTaskLogStreamer(IndexerCoordinatorNode.this.taskMaster)); // Use our persistent log storage providers.add(persistentTaskLogs); - taskLogProvider = new SwitchingTaskLogProvider(providers); + taskLogStreamer = new SwitchingTaskLogStreamer(providers); } } @@ -569,8 +515,8 @@ public class IndexerCoordinatorNode extends QueryableNode workerSetupData = configManager.watch( - WorkerSetupData.CONFIG_KEY, WorkerSetupData.class - ); - - AutoScalingStrategy strategy; - if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) { - strategy = new EC2AutoScalingStrategy( - getJsonMapper(), - new AmazonEC2Client( - new BasicAWSCredentials( - PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"), - PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey") - ) - ), - getConfigFactory().build(EC2AutoScalingStrategyConfig.class), - workerSetupData - ); - } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) { - strategy = new NoopAutoScalingStrategy(); - } else { - throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl()); - } - - return new ResourceManagementScheduler( - runner, - new SimpleResourceManagementStrategy( - strategy, - getConfigFactory().build(SimpleResourceManagmentConfig.class), - workerSetupData - ), - getConfigFactory().build(ResourceManagementSchedulerConfig.class), - scalingScheduledExec - ); - } - }; + resourceManagementSchedulerFactory = new WithOpResourceManagementSchedulerFactory(configManager); } } } @@ -800,4 +675,68 @@ public class IndexerCoordinatorNode extends QueryableNode workerSetupData = configManager.watch( + WorkerSetupData.CONFIG_KEY, WorkerSetupData.class + ); + + AutoScalingStrategy strategy; + if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) { + strategy = new EC2AutoScalingStrategy( + getJsonMapper(), + new AmazonEC2Client( + new BasicAWSCredentials( + PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"), + PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey") + ) + ), + null, // TODO: eliminate + DSuppliers.of(workerSetupData) + ); + } else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) { + strategy = new NoopAutoScalingStrategy(); + } else { + throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl()); + } + + return new ResourceManagementScheduler( + runner, + new SimpleResourceManagementStrategy( + strategy, + getConfigFactory().build(SimpleResourceManagementConfig.class), + DSuppliers.of(workerSetupData) + ), + getConfigFactory().build(ResourceManagementSchedulerConfig.class), + scalingScheduledExec + ); + } + } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorResource.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorResource.java index 15a7e88b82d..63be1366c8c 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorResource.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorResource.java @@ -35,8 +35,8 @@ import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.actions.TaskActionClient; import com.metamx.druid.indexing.common.actions.TaskActionHolder; import com.metamx.druid.indexing.common.task.Task; -import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; -import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle; +import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer; +import com.metamx.druid.indexing.coordinator.TaskMaster; import com.metamx.druid.indexing.coordinator.TaskQueue; import com.metamx.druid.indexing.coordinator.TaskRunner; import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; @@ -87,9 +87,9 @@ public class IndexerCoordinatorResource } }; - private final TaskMasterLifecycle taskMasterLifecycle; + private final TaskMaster taskMaster; private final TaskStorageQueryAdapter taskStorageQueryAdapter; - private final TaskLogProvider taskLogProvider; + private final TaskLogStreamer taskLogStreamer; private final JacksonConfigManager configManager; private final ObjectMapper jsonMapper; @@ -97,16 +97,16 @@ public class IndexerCoordinatorResource @Inject public IndexerCoordinatorResource( - TaskMasterLifecycle taskMasterLifecycle, + TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, - TaskLogProvider taskLogProvider, + TaskLogStreamer taskLogStreamer, JacksonConfigManager configManager, ObjectMapper jsonMapper ) throws Exception { - this.taskMasterLifecycle = taskMasterLifecycle; + this.taskMaster = taskMaster; this.taskStorageQueryAdapter = taskStorageQueryAdapter; - this.taskLogProvider = taskLogProvider; + this.taskLogStreamer = taskLogStreamer; this.configManager = configManager; this.jsonMapper = jsonMapper; } @@ -137,7 +137,7 @@ public class IndexerCoordinatorResource public Response taskPost(final Task task) { return asLeaderWith( - taskMasterLifecycle.getTaskQueue(), + taskMaster.getTaskQueue(), new Function() { @Override @@ -173,7 +173,7 @@ public class IndexerCoordinatorResource public Response doShutdown(@PathParam("taskid") final String taskid) { return asLeaderWith( - taskMasterLifecycle.getTaskRunner(), + taskMaster.getTaskRunner(), new Function() { @Override @@ -241,7 +241,7 @@ public class IndexerCoordinatorResource public Response doAction(final TaskActionHolder holder) { return asLeaderWith( - taskMasterLifecycle.getTaskActionClient(holder.getTask()), + taskMaster.getTaskActionClient(holder.getTask()), new Function() { @Override @@ -278,7 +278,7 @@ public class IndexerCoordinatorResource { if (full != null) { return asLeaderWith( - taskMasterLifecycle.getTaskRunner(), + taskMaster.getTaskRunner(), new Function() { @Override @@ -291,7 +291,7 @@ public class IndexerCoordinatorResource } return asLeaderWith( - taskMasterLifecycle.getTaskRunner(), + taskMaster.getTaskRunner(), new Function() { @Override @@ -317,7 +317,7 @@ public class IndexerCoordinatorResource { if (full != null) { return asLeaderWith( - taskMasterLifecycle.getTaskRunner(), + taskMaster.getTaskRunner(), new Function() { @Override @@ -330,7 +330,7 @@ public class IndexerCoordinatorResource } return asLeaderWith( - taskMasterLifecycle.getTaskRunner(), + taskMaster.getTaskRunner(), new Function() { @Override @@ -353,7 +353,7 @@ public class IndexerCoordinatorResource public Response getWorkers() { return asLeaderWith( - taskMasterLifecycle.getTaskRunner(), + taskMaster.getTaskRunner(), new Function() { @Override @@ -371,7 +371,7 @@ public class IndexerCoordinatorResource public Response getScalingState() { return asLeaderWith( - taskMasterLifecycle.getResourceManagementScheduler(), + taskMaster.getResourceManagementScheduler(), new Function() { @Override @@ -392,7 +392,7 @@ public class IndexerCoordinatorResource ) { try { - final Optional> stream = taskLogProvider.streamTaskLog(taskid, offset); + final Optional> stream = taskLogStreamer.streamTaskLog(taskid, offset); if (stream.isPresent()) { return Response.ok(stream.get().getInput()).build(); } else { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorServletModule.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorServletModule.java index 64383a667a1..7eb8ce7e436 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorServletModule.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorServletModule.java @@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; import com.google.inject.Provides; import com.metamx.druid.config.JacksonConfigManager; -import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; -import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle; +import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer; +import com.metamx.druid.indexing.coordinator.TaskMaster; import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter; import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig; import com.metamx.emitter.service.ServiceEmitter; @@ -40,27 +40,27 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule private final ObjectMapper jsonMapper; private final IndexerCoordinatorConfig indexerCoordinatorConfig; private final ServiceEmitter emitter; - private final TaskMasterLifecycle taskMasterLifecycle; + private final TaskMaster taskMaster; private final TaskStorageQueryAdapter taskStorageQueryAdapter; - private final TaskLogProvider taskLogProvider; + private final TaskLogStreamer taskLogStreamer; private final JacksonConfigManager configManager; public IndexerCoordinatorServletModule( ObjectMapper jsonMapper, IndexerCoordinatorConfig indexerCoordinatorConfig, ServiceEmitter emitter, - TaskMasterLifecycle taskMasterLifecycle, + TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, - TaskLogProvider taskLogProvider, + TaskLogStreamer taskLogStreamer, JacksonConfigManager configManager ) { this.jsonMapper = jsonMapper; this.indexerCoordinatorConfig = indexerCoordinatorConfig; this.emitter = emitter; - this.taskMasterLifecycle = taskMasterLifecycle; + this.taskMaster = taskMaster; this.taskStorageQueryAdapter = taskStorageQueryAdapter; - this.taskLogProvider = taskLogProvider; + this.taskLogStreamer = taskLogStreamer; this.configManager = configManager; } @@ -72,9 +72,9 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule bind(ObjectMapper.class).toInstance(jsonMapper); bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig); bind(ServiceEmitter.class).toInstance(emitter); - bind(TaskMasterLifecycle.class).toInstance(taskMasterLifecycle); + bind(TaskMaster.class).toInstance(taskMaster); bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter); - bind(TaskLogProvider.class).toInstance(taskLogProvider); + bind(TaskLogStreamer.class).toInstance(taskLogStreamer); bind(JacksonConfigManager.class).toInstance(configManager); serve("/*").with(GuiceContainer.class); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java index 84897f424bd..fe91431c163 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/OldIndexerCoordinatorResource.java @@ -3,8 +3,8 @@ package com.metamx.druid.indexing.coordinator.http; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import com.metamx.druid.config.JacksonConfigManager; -import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; -import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle; +import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer; +import com.metamx.druid.indexing.coordinator.TaskMaster; import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter; import javax.ws.rs.Path; @@ -17,13 +17,13 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource { @Inject public OldIndexerCoordinatorResource( - TaskMasterLifecycle taskMasterLifecycle, + TaskMaster taskMaster, TaskStorageQueryAdapter taskStorageQueryAdapter, - TaskLogProvider taskLogProvider, + TaskLogStreamer taskLogStreamer, JacksonConfigManager configManager, ObjectMapper jsonMapper ) throws Exception { - super(taskMasterLifecycle, taskStorageQueryAdapter, taskLogProvider, configManager, jsonMapper); + super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager, jsonMapper); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/OverlordRedirectInfo.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/OverlordRedirectInfo.java new file mode 100644 index 00000000000..1d767c713fd --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/OverlordRedirectInfo.java @@ -0,0 +1,57 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator.http; + +import com.google.common.base.Throwables; +import com.google.inject.Inject; +import com.metamx.druid.http.RedirectInfo; +import com.metamx.druid.indexing.coordinator.TaskMaster; + +import java.net.URL; + +/** +*/ +public class OverlordRedirectInfo implements RedirectInfo +{ + private final TaskMaster taskMaster; + + @Inject + public OverlordRedirectInfo(TaskMaster taskMaster) + { + this.taskMaster = taskMaster; + } + + @Override + public boolean doLocal() + { + return taskMaster.isLeading(); + } + + @Override + public URL getRedirectURL(String queryString, String requestURI) + { + try { + return new URL(String.format("http://%s%s", taskMaster.getLeader(), requestURI)); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingData.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingData.java index 8a7bc1840d5..46b13fa9a74 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingData.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingData.java @@ -25,12 +25,12 @@ import java.util.List; /** */ -public class AutoScalingData +public class AutoScalingData { private final List nodeIds; - private final List nodes; + private final List nodes; - public AutoScalingData(List nodeIds, List nodes) + public AutoScalingData(List nodeIds, List nodes) { this.nodeIds = nodeIds; this.nodes = nodes; @@ -42,7 +42,7 @@ public class AutoScalingData return nodeIds; } - public List getNodes() + public List getNodes() { return nodes; } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingStrategy.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingStrategy.java index 68a3170c54a..aab2175a50f 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingStrategy.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/AutoScalingStrategy.java @@ -24,11 +24,11 @@ import java.util.List; /** * The AutoScalingStrategy has the actual methods to provision and terminate worker nodes. */ -public interface AutoScalingStrategy +public interface AutoScalingStrategy { - public AutoScalingData provision(); + public AutoScalingData provision(); - public AutoScalingData terminate(List ips); + public AutoScalingData terminate(List ips); /** * Provides a lookup of ip addresses to node ids diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java index 45b5573674a..9141905733c 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategy.java @@ -19,7 +19,7 @@ package com.metamx.druid.indexing.coordinator.scaling; -import com.amazonaws.services.ec2.AmazonEC2Client; +import com.amazonaws.services.ec2.AmazonEC2; import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesResult; import com.amazonaws.services.ec2.model.Filter; @@ -30,8 +30,9 @@ import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; -import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig; +import com.google.inject.Inject; import com.metamx.druid.indexing.coordinator.setup.EC2NodeData; import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; @@ -40,24 +41,24 @@ import org.apache.commons.codec.binary.Base64; import javax.annotation.Nullable; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; /** */ -public class EC2AutoScalingStrategy implements AutoScalingStrategy +public class EC2AutoScalingStrategy implements AutoScalingStrategy { private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class); private final ObjectMapper jsonMapper; - private final AmazonEC2Client amazonEC2Client; - private final EC2AutoScalingStrategyConfig config; - private final AtomicReference workerSetupDataRef; + private final AmazonEC2 amazonEC2Client; + private final SimpleResourceManagementConfig config; + private final Supplier workerSetupDataRef; + @Inject public EC2AutoScalingStrategy( ObjectMapper jsonMapper, - AmazonEC2Client amazonEC2Client, - EC2AutoScalingStrategyConfig config, - AtomicReference workerSetupDataRef + AmazonEC2 amazonEC2Client, + SimpleResourceManagementConfig config, + Supplier workerSetupDataRef ) { this.jsonMapper = jsonMapper; @@ -67,7 +68,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy } @Override - public AutoScalingData provision() + public AutoScalingData provision() { try { WorkerSetupData setupData = workerSetupDataRef.get(); @@ -110,7 +111,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy log.info("Created instances: %s", instanceIds); - return new AutoScalingData( + return new AutoScalingData( Lists.transform( result.getReservation().getInstances(), new Function() @@ -133,10 +134,10 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy } @Override - public AutoScalingData terminate(List ips) + public AutoScalingData terminate(List ips) { if (ips.isEmpty()) { - return new AutoScalingData(Lists.newArrayList(), Lists.newArrayList()); + return new AutoScalingData(Lists.newArrayList(), Lists.newArrayList()); } DescribeInstancesResult result = amazonEC2Client.describeInstances( @@ -169,7 +170,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy ) ); - return new AutoScalingData( + return new AutoScalingData( Lists.transform( ips, new Function() diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopAutoScalingStrategy.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopAutoScalingStrategy.java index 84881deb49a..3d036c8f09a 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopAutoScalingStrategy.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/NoopAutoScalingStrategy.java @@ -26,19 +26,19 @@ import java.util.List; /** * This class just logs when scaling should occur. */ -public class NoopAutoScalingStrategy implements AutoScalingStrategy +public class NoopAutoScalingStrategy implements AutoScalingStrategy { private static final EmittingLogger log = new EmittingLogger(NoopAutoScalingStrategy.class); @Override - public AutoScalingData provision() + public AutoScalingData provision() { log.info("If I were a real strategy I'd create something now"); return null; } @Override - public AutoScalingData terminate(List ips) + public AutoScalingData terminate(List ips) { log.info("If I were a real strategy I'd terminate %s now", ips); return null; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java index 0cfdc94c76a..0e359635202 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java @@ -76,41 +76,32 @@ public class ResourceManagementScheduler ScheduledExecutors.scheduleAtFixedRate( exec, - config.getProvisionResourcesDuration(), + config.getProvisionPeriod().toStandardDuration(), new Runnable() { @Override public void run() { - resourceManagementStrategy.doProvision( - taskRunner.getPendingTasks(), - taskRunner.getWorkers() - ); + resourceManagementStrategy.doProvision(taskRunner.getPendingTasks(), taskRunner.getWorkers()); } } ); // Schedule termination of worker nodes periodically - Period period = new Period(config.getTerminateResourcesDuration()); - PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null); + Period period = config.getTerminatePeriod(); + PeriodGranularity granularity = new PeriodGranularity(period, config.getOriginTime(), null); final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis())); ScheduledExecutors.scheduleAtFixedRate( exec, - new Duration( - System.currentTimeMillis(), - startTime - ), - config.getTerminateResourcesDuration(), + new Duration(System.currentTimeMillis(), startTime), + config.getTerminatePeriod().toStandardDuration(), new Runnable() { @Override public void run() { - resourceManagementStrategy.doTerminate( - taskRunner.getPendingTasks(), - taskRunner.getWorkers() - ); + resourceManagementStrategy.doTerminate(taskRunner.getPendingTasks(), taskRunner.getWorkers()); } } ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerConfig.java index 4850ecaaf02..eb689b40f81 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerConfig.java @@ -19,24 +19,43 @@ package com.metamx.druid.indexing.coordinator.scaling; +import com.fasterxml.jackson.annotation.JsonProperty; import org.joda.time.DateTime; -import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; +import org.joda.time.Period; /** */ -public abstract class ResourceManagementSchedulerConfig +public class ResourceManagementSchedulerConfig { - @Config("druid.indexer.provisionResources.duration") - @Default("PT1M") - public abstract Duration getProvisionResourcesDuration(); + @JsonProperty + private boolean doAutoscale = false; - @Config("druid.indexer.terminateResources.duration") - @Default("PT1H") - public abstract Duration getTerminateResourcesDuration(); + @JsonProperty + private Period provisionPeriod = new Period("PT1M"); - @Config("druid.indexer.terminateResources.originDateTime") - @Default("2012-01-01T00:55:00.000Z") - public abstract DateTime getTerminateResourcesOriginDateTime(); + @JsonProperty + private Period terminatePeriod = new Period("PT1H"); + + @JsonProperty + private DateTime originTime = new DateTime("2012-01-01T00:55:00.000Z"); + + public boolean isDoAutoscale() + { + return doAutoscale; + } + + public Period getProvisionPeriod() + { + return provisionPeriod; + } + + public Period getTerminatePeriod() + { + return terminatePeriod; + } + + public DateTime getOriginTime() + { + return originTime; + } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerFactoryImpl.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerFactoryImpl.java new file mode 100644 index 00000000000..c828051d150 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerFactoryImpl.java @@ -0,0 +1,56 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator.scaling; + +import com.google.inject.Inject; +import com.metamx.common.concurrent.ScheduledExecutorFactory; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunner; + +/** + */ +public class ResourceManagementSchedulerFactoryImpl implements ResourceManagementSchedulerFactory +{ + private final ResourceManagementSchedulerConfig config; + private final ResourceManagementStrategy strategy; + private final ScheduledExecutorFactory executorFactory; + + @Inject + public ResourceManagementSchedulerFactoryImpl( + ResourceManagementStrategy strategy, + ResourceManagementSchedulerConfig config, + ScheduledExecutorFactory executorFactory + ) + { + this.config = config; + this.strategy = strategy; + this.executorFactory = executorFactory; + } + + @Override + public ResourceManagementScheduler build(RemoteTaskRunner runner) + { + if (config.isDoAutoscale()) { + return new ResourceManagementScheduler(runner, strategy, config, executorFactory.create(1, "ScalingExec--%d")); + } + else { + return new NoopResourceManagementScheduler(); + } + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementConfig.java new file mode 100644 index 00000000000..deb529e7a09 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementConfig.java @@ -0,0 +1,112 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator.scaling; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.joda.time.Period; + +/** + */ +public class SimpleResourceManagementConfig +{ + @JsonProperty + private Period workerIdleTimeout = new Period("PT10m"); + + @JsonProperty + private Period maxScalingDuration = new Period("PT15M"); + + @JsonProperty + private int numEventsToTrack = 50; + + @JsonProperty + private Period pendingTaskTimeout = new Period("PT30s"); + + @JsonProperty + private String workerVersion = null; + + @JsonProperty + private int workerPort = 8080; + + public Period getWorkerIdleTimeout() + { + return workerIdleTimeout; + } + + public SimpleResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout) + { + this.workerIdleTimeout = workerIdleTimeout; + return this; + } + + public Period getMaxScalingDuration() + { + return maxScalingDuration; + } + + public SimpleResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration) + { + this.maxScalingDuration = maxScalingDuration; + return this; + } + + public int getNumEventsToTrack() + { + return numEventsToTrack; + } + + public SimpleResourceManagementConfig setNumEventsToTrack(int numEventsToTrack) + { + this.numEventsToTrack = numEventsToTrack; + return this; + } + + public Period getPendingTaskTimeout() + { + return pendingTaskTimeout; + } + + public SimpleResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout) + { + this.pendingTaskTimeout = pendingTaskTimeout; + return this; + } + + public String getWorkerVersion() + { + return workerVersion; + } + + public SimpleResourceManagementConfig setWorkerVersion(String workerVersion) + { + this.workerVersion = workerVersion; + return this; + } + + public int getWorkerPort() + { + return workerPort; + } + + public SimpleResourceManagementConfig setWorkerPort(int workerPort) + { + this.workerPort = workerPort; + return this; + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java index 0f9fd927245..a3800345fb1 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -21,9 +21,11 @@ package com.metamx.druid.indexing.coordinator.scaling; import com.google.common.base.Function; import com.google.common.base.Predicate; +import com.google.common.base.Supplier; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.google.inject.Inject; import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; @@ -37,7 +39,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.atomic.AtomicReference; /** */ @@ -46,8 +47,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class); private final AutoScalingStrategy autoScalingStrategy; - private final SimpleResourceManagmentConfig config; - private final AtomicReference workerSetupdDataRef; + private final SimpleResourceManagementConfig config; + private final Supplier workerSetupdDataRef; private final ScalingStats scalingStats; private final ConcurrentSkipListSet currentlyProvisioning = new ConcurrentSkipListSet(); @@ -56,10 +57,11 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat private volatile DateTime lastProvisionTime = new DateTime(); private volatile DateTime lastTerminateTime = new DateTime(); + @Inject public SimpleResourceManagementStrategy( AutoScalingStrategy autoScalingStrategy, - SimpleResourceManagmentConfig config, - AtomicReference workerSetupdDataRef + SimpleResourceManagementConfig config, + Supplier workerSetupdDataRef ) { this.autoScalingStrategy = autoScalingStrategy; @@ -96,7 +98,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat List workerNodeIds = autoScalingStrategy.ipToIdLookup( Lists.newArrayList( - Iterables.transform( + Iterables.transform( zkWorkers, new Function() { @@ -134,7 +136,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat durSinceLastProvision ); - if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { + if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { log.makeAlert("Worker node provisioning taking too long!") .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) .addData("provisioningCount", currentlyProvisioning.size()) @@ -198,7 +200,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat { return input.getRunningTasks().isEmpty() && System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis() - >= config.getMaxWorkerIdleTimeMillisBeforeDeletion(); + >= config.getWorkerIdleTimeout().getMillis(); } } ) @@ -240,7 +242,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat currentlyTerminating ); - if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { + if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) { log.makeAlert("Worker node termination taking too long!") .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) .addData("terminatingCount", currentlyTerminating.size()) @@ -263,11 +265,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat { long now = System.currentTimeMillis(); for (TaskRunnerWorkItem pendingTask : pendingTasks) { - if (new Duration(pendingTask.getQueueInsertionTime().getMillis(), now).isEqual(config.getMaxPendingTaskDuration()) - || - new Duration( - pendingTask.getQueueInsertionTime().getMillis(), now - ).isLongerThan(config.getMaxPendingTaskDuration())) { + final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now); + final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration(); + if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) { return true; } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagmentConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagmentConfig.java deleted file mode 100644 index 184e1aba7ed..00000000000 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagmentConfig.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.indexing.coordinator.scaling; - -import org.joda.time.Duration; -import org.skife.config.Config; -import org.skife.config.Default; -import org.skife.config.DefaultNull; - -/** - */ -public abstract class SimpleResourceManagmentConfig -{ - @Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion") - @Default("600000") - public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion(); - - @Config("druid.indexer.maxScalingDuration") - @Default("PT15M") - public abstract Duration getMaxScalingDuration(); - - @Config("druid.indexer.numEventsToTrack") - @Default("50") - public abstract int getNumEventsToTrack(); - - @Config("druid.indexer.maxPendingTaskDuration") - @Default("PT30S") - public abstract Duration getMaxPendingTaskDuration(); - - @Config("druid.indexer.worker.version") - @DefaultNull - public abstract String getWorkerVersion(); -} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java index 90890ca7a37..7391fdf4e93 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java @@ -366,8 +366,7 @@ public class WorkerNode extends QueryableNode if (taskLogConfig.getLogStorageBucket() != null) { initializeS3Service(); persistentTaskLogs = new S3TaskLogs( - taskLogConfig.getLogStorageBucket(), - taskLogConfig.getLogStoragePrefix(), + null, // TODO: eliminate s3Service ); } else { @@ -383,8 +382,8 @@ public class WorkerNode extends QueryableNode getConfigFactory().build(ForkingTaskRunnerConfig.class), getProps(), persistentTaskLogs, - Executors.newFixedThreadPool(workerConfig.getCapacity()), - getJsonMapper() + getJsonMapper(), + null // todo: eliminate ); } } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 0c39406b821..cc897a00b9c 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -15,6 +15,7 @@ import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory; +import com.metamx.druid.guava.DSuppliers; import com.metamx.druid.indexing.TestTask; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolboxFactory; @@ -27,6 +28,7 @@ import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.worker.Worker; import com.metamx.druid.indexing.worker.WorkerCuratorCoordinator; import com.metamx.druid.indexing.worker.WorkerTaskMonitor; +import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; @@ -354,9 +356,17 @@ public class RemoteTaskRunnerTest remoteTaskRunner = new RemoteTaskRunner( jsonMapper, new TestRemoteTaskRunnerConfig(), + new ZkPathsConfig() + { + @Override + public String getZkBasePath() + { + return basePath; + } + }, cf, new SimplePathChildrenCacheFactory.Builder().build(), - new AtomicReference(new WorkerSetupData("0", 0, 1, null, null)), + DSuppliers.of(new AtomicReference(new WorkerSetupData("0", 0, 1, null, null))), null ); @@ -381,17 +391,11 @@ public class RemoteTaskRunnerTest private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig { @Override - public boolean enableCompression() + public boolean isCompressZnodes() { return false; } - @Override - public String getZkBasePath() - { - return basePath; - } - @Override public Duration getTaskAssignmentTimeoutDuration() { @@ -399,7 +403,7 @@ public class RemoteTaskRunnerTest } @Override - public long getMaxNumBytes() + public long getMaxZnodeBytes() { return 1000; } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java index 77e7c2fcb49..a67ec0326f6 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java @@ -87,7 +87,7 @@ public class TaskLifecycleTest private TaskLockbox tl = null; private TaskQueue tq = null; private TaskRunner tr = null; - private MockMergerDBCoordinator mdc = null; + private MockIndexerDBCoordinator mdc = null; private TaskActionClientFactory tac = null; private TaskToolboxFactory tb = null; private TaskConsumer tc = null; @@ -410,12 +410,12 @@ public class TaskLifecycleTest return status; } - private static class MockMergerDBCoordinator extends MergerDBCoordinator + private static class MockIndexerDBCoordinator extends IndexerDBCoordinator { final private Set published = Sets.newHashSet(); final private Set nuked = Sets.newHashSet(); - private MockMergerDBCoordinator() + private MockIndexerDBCoordinator() { super(null, null, null, null); } @@ -462,9 +462,9 @@ public class TaskLifecycleTest } } - private static MockMergerDBCoordinator newMockMDC() + private static MockIndexerDBCoordinator newMockMDC() { - return new MockMergerDBCoordinator(); + return new MockIndexerDBCoordinator(); } private static ServiceEmitter newMockEmitter() diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java index 27442ed1cdc..39df4467800 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -28,7 +28,7 @@ import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.RunInstancesResult; import com.amazonaws.services.ec2.model.TerminateInstancesRequest; import com.google.common.collect.Lists; -import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig; +import com.metamx.druid.guava.DSuppliers; import com.metamx.druid.indexing.coordinator.setup.EC2NodeData; import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; @@ -77,21 +77,8 @@ public class EC2AutoScalingStrategyTest strategy = new EC2AutoScalingStrategy( new DefaultObjectMapper(), amazonEC2Client, - new EC2AutoScalingStrategyConfig() - { - @Override - public String getWorkerPort() - { - return "8080"; - } - - @Override - public String getWorkerVersion() - { - return ""; - } - }, - workerSetupData + new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""), + DSuppliers.of(workerSetupData) ); } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index fcbc1d4113b..5f183e83ce3 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.guava.DSuppliers; import com.metamx.druid.indexing.TestTask; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; @@ -38,8 +39,8 @@ import com.metamx.emitter.service.ServiceEventBuilder; import junit.framework.Assert; import org.easymock.EasyMock; import org.joda.time.DateTime; -import org.joda.time.Duration; import org.joda.time.Interval; +import org.joda.time.Period; import org.junit.Before; import org.junit.Test; @@ -88,39 +89,13 @@ public class SimpleResourceManagementStrategyTest ); simpleResourceManagementStrategy = new SimpleResourceManagementStrategy( autoScalingStrategy, - new SimpleResourceManagmentConfig() - { - @Override - public int getMaxWorkerIdleTimeMillisBeforeDeletion() - { - return 0; - } - - @Override - public Duration getMaxScalingDuration() - { - return new Duration(1000); - } - - @Override - public int getNumEventsToTrack() - { - return 1; - } - - @Override - public Duration getMaxPendingTaskDuration() - { - return new Duration(0); - } - - @Override - public String getWorkerVersion() - { - return ""; - } - }, - workerSetupData + new SimpleResourceManagementConfig() + .setWorkerIdleTimeout(new Period(0)) + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(1) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(""), + DSuppliers.of(workerSetupData) ); } diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/TestAutoScalingStrategy.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/TestAutoScalingStrategy.java index e86180c79ee..b11a859e5fe 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/TestAutoScalingStrategy.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/TestAutoScalingStrategy.java @@ -23,16 +23,16 @@ import java.util.List; /** */ -public class TestAutoScalingStrategy implements AutoScalingStrategy +public class TestAutoScalingStrategy implements AutoScalingStrategy { @Override - public AutoScalingData provision() + public AutoScalingData provision() { return null; } @Override - public AutoScalingData terminate(List ips) + public AutoScalingData terminate(List ips) { return null; } diff --git a/pom.xml b/pom.xml index e7a23cdc662..61dd45f9417 100644 --- a/pom.xml +++ b/pom.xml @@ -113,6 +113,25 @@ commons-lang 2.6 + + com.amazonaws + aws-java-sdk + 1.3.27 + + + javax.mail + mail + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + com.ning compress-lzf diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index aa26c735da7..e425a70bec8 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -68,7 +68,6 @@ import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; -import com.sun.istack.internal.NotNull; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -99,32 +98,16 @@ public class RealtimePlumberSchool implements PlumberSchool private final IndexGranularity segmentGranularity; private final Object handoffCondition = new Object(); - @JacksonInject - @NotNull - private ServiceEmitter emitter; + private volatile ServiceEmitter emitter; + private volatile QueryRunnerFactoryConglomerate conglomerate = null; + private volatile DataSegmentPusher dataSegmentPusher = null; + private volatile DataSegmentAnnouncer segmentAnnouncer = null; + private volatile SegmentPublisher segmentPublisher = null; + private volatile ServerView serverView = null; private volatile VersioningPolicy versioningPolicy = null; private volatile RejectionPolicyFactory rejectionPolicyFactory = null; - @JacksonInject - @NotNull - private volatile QueryRunnerFactoryConglomerate conglomerate = null; - - @JacksonInject - @NotNull - private volatile DataSegmentPusher dataSegmentPusher = null; - - @JacksonInject - @NotNull - private volatile DataSegmentAnnouncer segmentAnnouncer = null; - - @JacksonInject - @NotNull - private volatile SegmentPublisher segmentPublisher = null; - - @JacksonInject - @NotNull - private volatile ServerView serverView = null; @JsonCreator public RealtimePlumberSchool( @@ -156,6 +139,42 @@ public class RealtimePlumberSchool implements PlumberSchool this.rejectionPolicyFactory = factory; } + @JacksonInject + public void setEmitter(ServiceEmitter emitter) + { + this.emitter = emitter; + } + + @JacksonInject + public void setConglomerate(QueryRunnerFactoryConglomerate conglomerate) + { + this.conglomerate = conglomerate; + } + + @JacksonInject + public void setDataSegmentPusher(DataSegmentPusher dataSegmentPusher) + { + this.dataSegmentPusher = dataSegmentPusher; + } + + @JacksonInject + public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer) + { + this.segmentAnnouncer = segmentAnnouncer; + } + + @JacksonInject + public void setSegmentPublisher(SegmentPublisher segmentPublisher) + { + this.segmentPublisher = segmentPublisher; + } + + @JacksonInject + public void setServerView(ServerView serverView) + { + this.serverView = serverView; + } + @Override public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics) { diff --git a/server/pom.xml b/server/pom.xml index 56dcde1f43c..2799613f39f 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -72,6 +72,10 @@ commons-io commons-io + + com.amazonaws + aws-java-sdk + com.ning compress-lzf diff --git a/server/src/main/java/com/metamx/druid/guice/S3Module.java b/server/src/main/java/com/metamx/druid/guice/S3Module.java index e60827285a3..7cf04b762e9 100644 --- a/server/src/main/java/com/metamx/druid/guice/S3Module.java +++ b/server/src/main/java/com/metamx/druid/guice/S3Module.java @@ -19,14 +19,17 @@ package com.metamx.druid.guice; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.services.ec2.AmazonEC2; +import com.amazonaws.services.ec2.AmazonEC2Client; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.ProvisionException; -import com.metamx.druid.loading.S3CredentialsConfig; +import com.metamx.druid.loading.AWSCredentialsConfig; import org.jets3t.service.S3ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; /** */ @@ -35,18 +38,39 @@ public class S3Module implements Module @Override public void configure(Binder binder) { - JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class); + JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class); } @Provides @LazySingleton - public RestS3Service getRestS3Service(S3CredentialsConfig config) + public AWSCredentials getAWSCredentials(AWSCredentialsConfig config) + { + return new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey()); + } + + @Provides + @LazySingleton + public org.jets3t.service.security.AWSCredentials getJets3tAWSCredentials(AWSCredentialsConfig config) + { + return new org.jets3t.service.security.AWSCredentials(config.getAccessKey(), config.getSecretKey()); + } + + @Provides + @LazySingleton + public RestS3Service getRestS3Service(org.jets3t.service.security.AWSCredentials credentials) { try { - return new RestS3Service(new AWSCredentials(config.getAccessKey(), config.getSecretKey())); + return new RestS3Service(credentials); } catch (S3ServiceException e) { throw new ProvisionException("Unable to create a RestS3Service", e); } } + + @Provides + @LazySingleton + public AmazonEC2 getEc2Client(AWSCredentials credentials) + { + return new AmazonEC2Client(credentials); + } } diff --git a/server/src/main/java/com/metamx/druid/loading/S3CredentialsConfig.java b/server/src/main/java/com/metamx/druid/loading/AWSCredentialsConfig.java similarity index 90% rename from server/src/main/java/com/metamx/druid/loading/S3CredentialsConfig.java rename to server/src/main/java/com/metamx/druid/loading/AWSCredentialsConfig.java index 252cf3c92df..dd8decfab54 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3CredentialsConfig.java +++ b/server/src/main/java/com/metamx/druid/loading/AWSCredentialsConfig.java @@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; /** */ -public class S3CredentialsConfig +public class AWSCredentialsConfig { @JsonProperty private String accessKey = ""; diff --git a/services/pom.xml b/services/pom.xml index aee7193de5d..fb69d5c83cc 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -47,6 +47,11 @@ druid-examples ${project.parent.version} + + com.metamx.druid + druid-indexing-service + ${project.parent.version} + io.airlift airline diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java new file mode 100644 index 00000000000..c7e43955bae --- /dev/null +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -0,0 +1,118 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012, 2013 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package io.druid.cli; + +import com.google.inject.Injector; +import com.google.inject.servlet.GuiceFilter; +import com.metamx.common.logger.Logger; +import com.metamx.druid.curator.CuratorModule; +import com.metamx.druid.curator.discovery.DiscoveryModule; +import com.metamx.druid.guice.DbConnectorModule; +import com.metamx.druid.guice.HttpClientModule; +import com.metamx.druid.guice.JacksonConfigManagerModule; +import com.metamx.druid.guice.LifecycleModule; +import com.metamx.druid.guice.OverlordModule; +import com.metamx.druid.guice.S3Module; +import com.metamx.druid.guice.ServerModule; +import com.metamx.druid.http.RedirectFilter; +import com.metamx.druid.indexing.coordinator.TaskMaster; +import com.metamx.druid.indexing.coordinator.http.IndexerCoordinatorResource; +import com.metamx.druid.initialization.EmitterModule; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.JettyServerInitializer; +import com.metamx.druid.initialization.JettyServerModule; +import com.metamx.druid.metrics.MetricsModule; +import io.airlift.command.Command; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.server.handler.ResourceHandler; +import org.eclipse.jetty.servlet.DefaultServlet; +import org.eclipse.jetty.servlet.FilterHolder; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.servlets.GzipFilter; +import org.eclipse.jetty.util.resource.ResourceCollection; + +/** + */ +@Command( + name = "overlord", + description = "Runs an Overlord node, see https://github.com/metamx/druid/wiki/Indexing-Service for a description" +) +public class CliOverlord extends ServerRunnable +{ + private static Logger log = new Logger(CliOverlord.class); + + public CliOverlord() + { + super(log); + } + + @Override + protected Injector getInjector() + { + return Initialization.makeInjector( + new LifecycleModule(), + EmitterModule.class, + HttpClientModule.global(), + CuratorModule.class, + new MetricsModule(), + ServerModule.class, + new S3Module(), + new DbConnectorModule(), + new JacksonConfigManagerModule(), + new JettyServerModule(new OverlordJettyServerInitializer()) + .addResource(IndexerCoordinatorResource.class), + new DiscoveryModule(), + new OverlordModule() + ); + } + + private static class OverlordJettyServerInitializer implements JettyServerInitializer + { + @Override + public void initialize(Server server, Injector injector) + { + ResourceHandler resourceHandler = new ResourceHandler(); + resourceHandler.setBaseResource( + new ResourceCollection( + new String[]{ + TaskMaster.class.getClassLoader().getResource("static").toExternalForm(), + TaskMaster.class.getClassLoader().getResource("indexer_static").toExternalForm() + } + ) + ); + + final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); + root.setContextPath("/"); + + HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()}); + server.setHandler(handlerList); + + root.addServlet(new ServletHolder(new DefaultServlet()), "/*"); + root.addFilter(GzipFilter.class, "/*", null); + root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null); + root.addFilter(GuiceFilter.class, "/*", null); + } + } +} diff --git a/services/src/main/java/io/druid/cli/Main.java b/services/src/main/java/io/druid/cli/Main.java index d2f290e26c8..0ad07dd5a8b 100644 --- a/services/src/main/java/io/druid/cli/Main.java +++ b/services/src/main/java/io/druid/cli/Main.java @@ -21,6 +21,7 @@ package io.druid.cli; import io.airlift.command.Cli; import io.airlift.command.Help; +import io.airlift.command.ParseException; /** */ @@ -38,8 +39,22 @@ public class Main builder.withGroup("server") .withDescription("Run one of the Druid server types.") .withDefaultCommand(Help.class) - .withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliRealtimeExample.class); + .withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliOverlord.class); - builder.build().parse(args).run(); + builder.withGroup("example") + .withDescription("Run an example") + .withDefaultCommand(Help.class) + .withCommands(CliRealtimeExample.class); + + final Cli cli = builder.build(); + try { + cli.parse(args).run(); + } + catch (ParseException e) { + System.out.println("ERROR!!!!"); + System.out.println(e.getMessage()); + System.out.println("==="); + cli.parse(new String[]{"help"}).run(); + } } }