1) IndexCoordinator appears to work as the CliOverlord now, yay!

This commit is contained in:
cheddar 2013-08-23 14:11:34 -05:00
parent 1aa99f5878
commit b897c2cb22
68 changed files with 1867 additions and 589 deletions

View File

@ -112,10 +112,6 @@
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>com.ibm.icu</groupId>
<artifactId>icu4j</artifactId>

View File

@ -31,6 +31,8 @@ import javax.validation.constraints.NotNull;
*/
public class DruidNode
{
private String hostNoPort;
@JsonProperty("service")
@NotNull
private String serviceName = null;
@ -54,35 +56,37 @@ public class DruidNode
if (port == null) {
if (host == null) {
setHostAndPort(null, -1);
setHostAndPort(null, -1, null);
}
else if (host.contains(":")) {
final String[] hostParts = host.split(":");
try {
setHostAndPort(host, Integer.parseInt(host.split(":")[1]));
setHostAndPort(host, Integer.parseInt(hostParts[1]), hostParts[0]);
}
catch (Exception e) {
setHostAndPort(host, -1);
catch (NumberFormatException e) {
setHostAndPort(host, -1, hostParts[0]);
}
}
else {
final int openPort = SocketUtil.findOpenPort(8080);
setHostAndPort(String.format("%s:%d", host, openPort), openPort);
setHostAndPort(String.format("%s:%d", host, openPort), openPort, host);
}
}
else {
if (host == null || host.contains(":")) {
setHostAndPort(host, port);
setHostAndPort(host, port, host == null ? null : host.split(":")[0]);
}
else {
setHostAndPort(String.format("%s:%d", host, port), port);
setHostAndPort(String.format("%s:%d", host, port), port, host);
}
}
}
private void setHostAndPort(String host, int port)
private void setHostAndPort(String host, int port, String hostNoPort)
{
this.host = host;
this.port = port;
this.hostNoPort = hostNoPort;
}
public String getServiceName()
@ -100,6 +104,11 @@ public class DruidNode
return port;
}
public String getHostNoPort()
{
return hostNoPort;
}
@Override
public String toString()
{

View File

@ -220,12 +220,14 @@ public class Initialization
@Override
public void start() throws Exception
{
log.info("Starting Curator");
framework.start();
}
@Override
public void stop()
{
log.info("Stopping Curator");
framework.close();
}
}

View File

@ -39,7 +39,6 @@ public class SelectorDimFilter implements DimFilter
)
{
Preconditions.checkArgument(dimension != null, "dimension must not be null");
Preconditions.checkArgument(value != null, "value must not be null");
this.dimension = dimension;
this.value = value;
}
@ -48,7 +47,7 @@ public class SelectorDimFilter implements DimFilter
public byte[] getCacheKey()
{
byte[] dimensionBytes = dimension.getBytes();
byte[] valueBytes = value.getBytes();
byte[] valueBytes = value == null ? new byte[]{} : value.getBytes();
return ByteBuffer.allocate(1 + dimensionBytes.length + valueBytes.length)
.put(DimFilterCacheHelper.SELECTOR_CACHE_ID)

View File

@ -103,6 +103,10 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>

View File

@ -0,0 +1,41 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guava;
import com.google.common.base.Supplier;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class DSuppliers
{
public static <T> Supplier<T> of(final AtomicReference<T> ref)
{
return new Supplier<T>()
{
@Override
public T get()
{
return ref.get();
}
};
}
}

View File

@ -19,7 +19,7 @@ public class JacksonConfigManagerModule implements Module
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.manager.config", ConfigManagerConfig.class);
binder.bind(JacksonConfigManager.class);
binder.bind(JacksonConfigManager.class).in(LazySingleton.class);
}
@Provides @ManageLifecycle

View File

@ -0,0 +1,94 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.util.Types;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.guava.DSuppliers;
/**
*/
public class JacksonConfigProvider<T> implements Provider<Supplier<T>>
{
public static <T> void bind(Binder binder, String key, Class<T> clazz, T defaultVal)
{
binder.bind(Key.get(Types.newParameterizedType(Supplier.class, clazz)))
.toProvider((Provider) of(key, clazz, defaultVal))
.in(LazySingleton.class);
}
public static <T> JacksonConfigProvider<T> of(String key, Class<T> clazz)
{
return of(key, clazz, null);
}
public static <T> JacksonConfigProvider<T> of(String key, Class<T> clazz, T defaultVal)
{
return new JacksonConfigProvider<T>(key, clazz, null, defaultVal);
}
public static <T> JacksonConfigProvider<T> of(String key, TypeReference<T> clazz)
{
return of(key, clazz, null);
}
public static <T> JacksonConfigProvider<T> of(String key, TypeReference<T> typeRef, T defaultVal)
{
return new JacksonConfigProvider<T>(key, null, typeRef, defaultVal);
}
private final String key;
private final Class<T> clazz;
private final TypeReference<T> typeRef;
private final T defaultVal;
private JacksonConfigManager configManager;
JacksonConfigProvider(String key, Class<T> clazz, TypeReference<T> typeRef, T defaultVal)
{
this.key = key;
this.clazz = clazz;
this.typeRef = typeRef;
this.defaultVal = defaultVal;
}
@Inject
public void configure(JacksonConfigManager configManager)
{
this.configManager = configManager;
}
@Override
public Supplier<T> get()
{
if (clazz == null) {
return DSuppliers.of(configManager.watch(key, typeRef, defaultVal));
}
else {
return DSuppliers.of(configManager.watch(key, clazz, defaultVal));
}
}
}

View File

@ -0,0 +1,74 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provider;
import java.lang.annotation.Annotation;
import java.util.List;
/**
*/
public class ListProvider<T> implements Provider<List<T>>
{
private final List<Key<? extends T>> itemsToLoad = Lists.newArrayList();
private Injector injector;
public ListProvider<T> add(Class<? extends T> clazz)
{
return add(Key.get(clazz));
}
public ListProvider<T> add(Class<? extends T> clazz, Class<? extends Annotation> annotation)
{
return add(Key.get(clazz, annotation));
}
public ListProvider<T> add(Class<? extends T> clazz, Annotation annotation)
{
return add(Key.get(clazz, annotation));
}
public ListProvider<T> add(Key<? extends T> key)
{
itemsToLoad.add(key);
return this;
}
@Inject
private void configure(Injector injector)
{
this.injector = injector;
}
@Override
public List<T> get()
{
List<T> retVal = Lists.newArrayListWithExpectedSize(itemsToLoad.size());
for (Key<? extends T> key : itemsToLoad) {
retVal.add(injector.getInstance(key));
}
return retVal;
}
}

View File

@ -0,0 +1,157 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import com.google.inject.binder.ScopedBindingBuilder;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.util.Types;
import javax.annotation.Nullable;
import java.lang.reflect.ParameterizedType;
import java.util.Map;
import java.util.Properties;
/**
* Provides the ability to create "polymorphic" bindings. Where the polymorphism is actually just making a decision
* based on a value in a Properties.
*
* The workflow is that you first create a choice by calling createChoice(). Then you create options using the binder
* returned by the optionBinder() method. Multiple different modules can call optionBinder and all options will be
* reflected at injection time as long as equivalent interface Key objects are passed into the various methods.
*/
public class PolyBind
{
/**
* Sets up a "choice" for the injector to resolve at injection time.
*
* @param binder the binder for the injector that is being configured
* @param property the property that will be checked to determine the implementation choice
* @param interfaceKey the interface that will be injected using this choice
* @param defaultKey the default instance to be injected if the property doesn't match a choice. Can be null
* @param <T> interface type
* @return A ScopedBindingBuilder so that scopes can be added to the binding, if required.
*/
public static <T> ScopedBindingBuilder createChoice(
Binder binder,
String property,
Key<T> interfaceKey,
@Nullable Key<? extends T> defaultKey
)
{
return binder.bind(interfaceKey).toProvider(new ConfiggedProvider<T>(interfaceKey, property, defaultKey));
}
/**
* Binds an option for a specific choice. The choice must already be registered on the injector for this to work.
*
* @param binder the binder for the injector that is being configured
* @param interfaceKey the interface that will have an option added to it. This must equal the
* Key provided to createChoice
* @param <T> interface type
* @return A MapBinder that can be used to create the actual option bindings.
*/
public static <T> MapBinder<String, T> optionBinder(Binder binder, Key<T> interfaceKey)
{
final TypeLiteral<T> interfaceType = interfaceKey.getTypeLiteral();
if (interfaceKey.getAnnotation() != null) {
return MapBinder.newMapBinder(
binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotation()
);
}
else if (interfaceKey.getAnnotationType() != null) {
return MapBinder.newMapBinder(
binder, TypeLiteral.get(String.class), interfaceType, interfaceKey.getAnnotationType()
);
}
else {
return MapBinder.newMapBinder(binder, TypeLiteral.get(String.class), interfaceType);
}
}
static class ConfiggedProvider<T> implements Provider<T>
{
private final Key<T> key;
private final String property;
private final Key<? extends T> defaultKey;
private Injector injector;
private Properties props;
ConfiggedProvider(
Key<T> key,
String property,
Key<? extends T> defaultKey
)
{
this.key = key;
this.property = property;
this.defaultKey = defaultKey;
}
@Inject
void configure(Injector injector, Properties props)
{
this.injector = injector;
this.props = props;
}
@Override
@SuppressWarnings("unchecked")
public T get()
{
final ParameterizedType mapType = Types.mapOf(
String.class, Types.newParameterizedType(Provider.class, key.getTypeLiteral().getType())
);
final Map<String, Provider<T>> implsMap;
if (key.getAnnotation() != null) {
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType, key.getAnnotation()));
}
else if (key.getAnnotationType() != null) {
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType, key.getAnnotation()));
}
else {
implsMap = (Map<String, Provider<T>>) injector.getInstance(Key.get(mapType));
}
final String implName = props.getProperty(property);
final Provider<T> provider = implsMap.get(implName);
if (provider == null) {
if (defaultKey == null) {
throw new ProvisionException(
String.format("Unknown provider[%s] of %s, known options[%s]", implName, key, implsMap.keySet())
);
}
return injector.getInstance(defaultKey);
}
return provider.get();
}
}
}

View File

@ -0,0 +1,128 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.common.collect.Iterables;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Names;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Properties;
/**
*/
public class PolyBindTest
{
private Properties props;
private Injector injector;
public void setUp(Module... modules) throws Exception
{
props = new Properties();
injector = Guice.createInjector(
Iterables.concat(
Arrays.asList(
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(Properties.class).toInstance(props);
PolyBind.createChoice(binder, "billy", Key.get(Gogo.class), Key.get(GoA.class));
}
}
),
Arrays.asList(modules)
)
);
}
@Test
public void testSanity() throws Exception
{
setUp(
new Module()
{
@Override
public void configure(Binder binder)
{
final MapBinder<String,Gogo> gogoBinder = PolyBind.optionBinder(binder, Key.get(Gogo.class));
gogoBinder.addBinding("a").to(GoA.class);
gogoBinder.addBinding("b").to(GoB.class);
PolyBind.createChoice(
binder, "billy", Key.get(Gogo.class, Names.named("reverse")), Key.get(GoB.class)
);
final MapBinder<String,Gogo> annotatedGogoBinder = PolyBind.optionBinder(
binder, Key.get(Gogo.class, Names.named("reverse"))
);
annotatedGogoBinder.addBinding("a").to(GoB.class);
annotatedGogoBinder.addBinding("b").to(GoA.class);
}
}
);
Assert.assertEquals("A", injector.getInstance(Gogo.class).go());
Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy.type", "b");
Assert.assertEquals("B", injector.getInstance(Gogo.class).go());
Assert.assertEquals("A", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy.type", "a");
Assert.assertEquals("A", injector.getInstance(Gogo.class).go());
Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy.type", "b");
Assert.assertEquals("B", injector.getInstance(Gogo.class).go());
Assert.assertEquals("A", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
props.setProperty("billy.type", "c");
Assert.assertEquals("A", injector.getInstance(Gogo.class).go());
Assert.assertEquals("B", injector.getInstance(Key.get(Gogo.class, Names.named("reverse"))).go());
}
public static interface Gogo
{
public String go();
}
public static class GoA implements Gogo
{
@Override
public String go()
{
return "A";
}
}
public static class GoB implements Gogo
{
@Override
public String go()
{
return "B";
}
}
}

View File

@ -48,25 +48,6 @@
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.3.27</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>

View File

@ -0,0 +1,168 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionToolbox;
import com.metamx.druid.indexing.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.indexing.common.tasklogs.S3TaskLogs;
import com.metamx.druid.indexing.common.tasklogs.S3TaskLogsConfig;
import com.metamx.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.common.tasklogs.TaskLogs;
import com.metamx.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import com.metamx.druid.indexing.coordinator.DbTaskStorage;
import com.metamx.druid.indexing.coordinator.ForkingTaskRunnerFactory;
import com.metamx.druid.indexing.coordinator.HeapMemoryTaskStorage;
import com.metamx.druid.indexing.coordinator.IndexerDBCoordinator;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerFactory;
import com.metamx.druid.indexing.coordinator.TaskLockbox;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.TaskQueue;
import com.metamx.druid.indexing.coordinator.TaskRunnerFactory;
import com.metamx.druid.indexing.coordinator.TaskStorage;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.http.OverlordRedirectInfo;
import com.metamx.druid.indexing.coordinator.scaling.AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.EC2AutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.NoopAutoScalingStrategy;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactoryImpl;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementStrategy;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import java.util.List;
/**
*/
public class OverlordModule implements Module
{
@Override
public void configure(Binder binder)
{
binder.bind(TaskMaster.class).in(ManageLifecycle.class);
binder.bind(TaskLogStreamer.class).to(SwitchingTaskLogStreamer.class).in(LazySingleton.class);
binder.bind(new TypeLiteral<List<TaskLogStreamer>>(){})
.toProvider(
new ListProvider<TaskLogStreamer>()
.add(TaskRunnerTaskLogStreamer.class)
.add(TaskLogs.class)
)
.in(LazySingleton.class);
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(TaskQueue.class).in(LazySingleton.class); // Lifecycle managed by TaskMaster instead
binder.bind(IndexerDBCoordinator.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(ResourceManagementSchedulerFactory.class)
.to(ResourceManagementSchedulerFactoryImpl.class)
.in(LazySingleton.class);
configureTaskStorage(binder);
configureRunners(binder);
configureAutoscale(binder);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);
}
private void configureTaskStorage(Binder binder)
{
PolyBind.createChoice(
binder, "druid.indexer.storage.type", Key.get(TaskStorage.class), Key.get(HeapMemoryTaskStorage.class)
);
final MapBinder<String, TaskStorage> storageBinder = PolyBind.optionBinder(binder, Key.get(TaskStorage.class));
storageBinder.addBinding("local").to(HeapMemoryTaskStorage.class);
binder.bind(HeapMemoryTaskStorage.class).in(LazySingleton.class);
storageBinder.addBinding("db").to(DbTaskStorage.class);
binder.bind(DbTaskStorage.class).in(LazySingleton.class);
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(NoopTaskLogs.class));
final MapBinder<String, TaskLogs> taskLogBinder = PolyBind.optionBinder(binder, Key.get(TaskLogs.class));
JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
taskLogBinder.addBinding("s3").to(S3TaskLogs.class);
binder.bind(S3TaskLogs.class).in(LazySingleton.class);
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
}
private void configureRunners(Binder binder)
{
PolyBind.createChoice(
binder, "druid.indexer.runner.type", Key.get(TaskRunnerFactory.class), Key.get(ForkingTaskRunnerFactory.class)
);
final MapBinder<String, TaskRunnerFactory> biddy = PolyBind.optionBinder(binder, Key.get(TaskRunnerFactory.class));
JsonConfigProvider.bind(binder, "druid.indexer.runner", ForkingTaskRunnerConfig.class);
biddy.addBinding("local").to(ForkingTaskRunnerFactory.class);
binder.bind(ForkingTaskRunnerFactory.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.runner", RemoteTaskRunnerConfig.class);
biddy.addBinding("remote").to(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
binder.bind(RemoteTaskRunnerFactory.class).in(LazySingleton.class);
}
private void configureAutoscale(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", ResourceManagementSchedulerConfig.class);
binder.bind(ResourceManagementStrategy.class).to(SimpleResourceManagementStrategy.class).in(LazySingleton.class);
JacksonConfigProvider.bind(binder, WorkerSetupData.CONFIG_KEY, WorkerSetupData.class, null);
PolyBind.createChoice(
binder,
"druid.indexer.autoscale.strategy",
Key.get(AutoScalingStrategy.class),
Key.get(NoopAutoScalingStrategy.class)
);
final MapBinder<String, AutoScalingStrategy> autoScalingBinder = PolyBind.optionBinder(
binder, Key.get(AutoScalingStrategy.class)
);
autoScalingBinder.addBinding("ec2").to(EC2AutoScalingStrategy.class);
binder.bind(EC2AutoScalingStrategy.class).in(LazySingleton.class);
autoScalingBinder.addBinding("noop").to(NoopAutoScalingStrategy.class);
binder.bind(NoopAutoScalingStrategy.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.indexing.common.actions;
import com.google.inject.Inject;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.TaskStorage;
@ -29,6 +30,7 @@ public class LocalTaskActionClientFactory implements TaskActionClientFactory
private final TaskStorage storage;
private final TaskActionToolbox toolbox;
@Inject
public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox)
{
this.storage = storage;

View File

@ -65,7 +65,7 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
throw new ISE("Segments not covered by locks for task[%s]: %s", task.getId(), segments);
}
final Set<DataSegment> retVal = toolbox.getMergerDBCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -49,7 +49,7 @@ public class SegmentListUnusedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getMergerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
}
@Override

View File

@ -49,7 +49,7 @@ public class SegmentListUsedAction implements TaskAction<List<DataSegment>>
@Override
public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
{
return toolbox.getMergerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
}
@Override

View File

@ -44,7 +44,7 @@ public class SegmentNukeAction implements TaskAction<Void>
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}
toolbox.getMergerDBCoordinator().deleteSegments(segments);
toolbox.getIndexerDBCoordinator().deleteSegments(segments);
// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()

View File

@ -2,10 +2,11 @@ package com.metamx.druid.indexing.common.actions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.MergerDBCoordinator;
import com.metamx.druid.indexing.coordinator.IndexerDBCoordinator;
import com.metamx.druid.indexing.coordinator.TaskLockbox;
import com.metamx.druid.indexing.coordinator.TaskQueue;
import com.metamx.emitter.service.ServiceEmitter;
@ -17,19 +18,20 @@ public class TaskActionToolbox
{
private final TaskQueue taskQueue;
private final TaskLockbox taskLockbox;
private final MergerDBCoordinator mergerDBCoordinator;
private final IndexerDBCoordinator indexerDBCoordinator;
private final ServiceEmitter emitter;
@Inject
public TaskActionToolbox(
TaskQueue taskQueue,
TaskLockbox taskLockbox,
MergerDBCoordinator mergerDBCoordinator,
IndexerDBCoordinator indexerDBCoordinator,
ServiceEmitter emitter
)
{
this.taskQueue = taskQueue;
this.taskLockbox = taskLockbox;
this.mergerDBCoordinator = mergerDBCoordinator;
this.indexerDBCoordinator = indexerDBCoordinator;
this.emitter = emitter;
}
@ -43,9 +45,9 @@ public class TaskActionToolbox
return taskLockbox;
}
public MergerDBCoordinator getMergerDBCoordinator()
public IndexerDBCoordinator getIndexerDBCoordinator()
{
return mergerDBCoordinator;
return indexerDBCoordinator;
}
public ServiceEmitter getEmitter()

View File

@ -278,7 +278,7 @@ public class RealtimeIndexTask extends AbstractTask
realtimePlumberSchool.setSegmentAnnouncer(lockingSegmentAnnouncer);
realtimePlumberSchool.setSegmentPublisher(segmentPublisher);
realtimePlumberSchool.setServerView(toolbox.getNewSegmentServerView());
realtimePlumberSchool.setServiceEmitter(toolbox.getEmitter());
realtimePlumberSchool.setEmitter(toolbox.getEmitter());
if (this.rejectionPolicyFactory != null) {
realtimePlumberSchool.setRejectionPolicyFactory(rejectionPolicyFactory);

View File

@ -1,9 +1,9 @@
package com.metamx.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageService;
@ -21,15 +21,14 @@ public class S3TaskLogs implements TaskLogs
{
private static final Logger log = new Logger(S3TaskLogs.class);
private final String bucket;
private final String prefix;
private final StorageService service;
private final S3TaskLogsConfig config;
public S3TaskLogs(String bucket, String prefix, RestS3Service service)
@Inject
public S3TaskLogs(S3TaskLogsConfig config, RestS3Service service)
{
this.bucket = Preconditions.checkNotNull(bucket, "bucket");
this.prefix = Preconditions.checkNotNull(prefix, "prefix");
this.service = Preconditions.checkNotNull(service, "service");
this.config = config;
this.service = service;
}
@Override
@ -38,7 +37,7 @@ public class S3TaskLogs implements TaskLogs
final String taskKey = getTaskLogKey(taskid);
try {
final StorageObject objectDetails = service.getObjectDetails(bucket, taskKey, null, null, null, null);
final StorageObject objectDetails = service.getObjectDetails(config.getS3Bucket(), taskKey, null, null, null, null);
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
@ -59,7 +58,7 @@ public class S3TaskLogs implements TaskLogs
}
return service.getObject(
bucket,
config.getS3Bucket(),
taskKey,
null,
null,
@ -95,7 +94,7 @@ public class S3TaskLogs implements TaskLogs
final StorageObject object = new StorageObject(logFile);
object.setKey(taskKey);
service.putObject(bucket, object);
service.putObject(config.getS3Bucket(), object);
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, IOException.class);
@ -105,6 +104,6 @@ public class S3TaskLogs implements TaskLogs
private String getTaskLogKey(String taskid)
{
return String.format("%s/%s/log", prefix, taskid);
return String.format("%s/%s/log", config.getS3Prefix(), taskid);
}
}

View File

@ -0,0 +1,47 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.common.tasklogs;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
/**
*/
public class S3TaskLogsConfig
{
@JsonProperty
@NotNull
private String s3Bucket = null;
@JsonProperty
@NotNull
private String s3Prefix = null;
public String getS3Bucket()
{
return s3Bucket;
}
public String getS3Prefix()
{
return s3Prefix;
}
}

View File

@ -3,6 +3,7 @@ package com.metamx.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStream;
@ -11,11 +12,12 @@ import java.util.List;
/**
* Provides task logs based on a series of underlying task log providers.
*/
public class SwitchingTaskLogProvider implements TaskLogProvider
public class SwitchingTaskLogStreamer implements TaskLogStreamer
{
private final List<TaskLogProvider> providers;
private final List<TaskLogStreamer> providers;
public SwitchingTaskLogProvider(List<TaskLogProvider> providers)
@Inject
public SwitchingTaskLogStreamer(List<TaskLogStreamer> providers)
{
this.providers = ImmutableList.copyOf(providers);
}
@ -23,7 +25,7 @@ public class SwitchingTaskLogProvider implements TaskLogProvider
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
{
for (TaskLogProvider provider : providers) {
for (TaskLogStreamer provider : providers) {
final Optional<InputSupplier<InputStream>> stream = provider.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return stream;

View File

@ -9,7 +9,7 @@ import java.io.InputStream;
/**
* Something that knows how to stream logs for tasks.
*/
public interface TaskLogProvider
public interface TaskLogStreamer
{
/**
* Stream log for a task.

View File

@ -1,5 +1,5 @@
package com.metamx.druid.indexing.common.tasklogs;
public interface TaskLogs extends TaskLogProvider, TaskLogPusher
public interface TaskLogs extends TaskLogStreamer, TaskLogPusher
{
}

View File

@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import java.io.IOException;
import java.io.InputStream;
/**
*/
public class TaskRunnerTaskLogStreamer implements TaskLogStreamer
{
private final TaskMaster taskMaster;
@Inject
public TaskRunnerTaskLogStreamer(
final TaskMaster taskMaster
) {
this.taskMaster = taskMaster;
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
{
final TaskRunner runner = taskMaster.getTaskRunner().orNull();
if (runner instanceof TaskLogStreamer) {
return ((TaskLogStreamer) runner).streamTaskLog(taskid, offset);
} else {
return Optional.absent();
}
}
}

View File

@ -28,11 +28,12 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.actions.TaskAction;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.config.IndexerDbConnectorConfig;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
@ -46,15 +47,16 @@ import java.util.Map;
public class DbTaskStorage implements TaskStorage
{
private final ObjectMapper jsonMapper;
private final IndexerDbConnectorConfig dbConnectorConfig;
private final DbTablesConfig dbTables;
private final IDBI dbi;
private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class);
public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, IDBI dbi)
@Inject
public DbTaskStorage(ObjectMapper jsonMapper, DbTablesConfig dbTables, IDBI dbi)
{
this.jsonMapper = jsonMapper;
this.dbConnectorConfig = dbConnectorConfig;
this.dbTables = dbTables;
this.dbi = dbi;
}
@ -82,7 +84,7 @@ public class DbTaskStorage implements TaskStorage
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
dbConnectorConfig.getTaskTable()
dbTables.getTasksTable()
)
)
.bind("id", task.getId())
@ -123,7 +125,7 @@ public class DbTaskStorage implements TaskStorage
return handle.createStatement(
String.format(
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
dbConnectorConfig.getTaskTable()
dbTables.getTasksTable()
)
)
.bind("id", status.getId())
@ -152,7 +154,7 @@ public class DbTaskStorage implements TaskStorage
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE id = :id",
dbConnectorConfig.getTaskTable()
dbTables.getTasksTable()
)
)
.bind("id", taskid)
@ -182,7 +184,7 @@ public class DbTaskStorage implements TaskStorage
handle.createQuery(
String.format(
"SELECT status_payload FROM %s WHERE id = :id",
dbConnectorConfig.getTaskTable()
dbTables.getTasksTable()
)
)
.bind("id", taskid)
@ -212,7 +214,7 @@ public class DbTaskStorage implements TaskStorage
handle.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = 1",
dbConnectorConfig.getTaskTable()
dbTables.getTasksTable()
)
)
.list();
@ -261,7 +263,7 @@ public class DbTaskStorage implements TaskStorage
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)",
dbConnectorConfig.getTaskLockTable()
dbTables.getTaskLockTable()
)
)
.bind("task_id", taskid)
@ -296,7 +298,7 @@ public class DbTaskStorage implements TaskStorage
return handle.createStatement(
String.format(
"DELETE FROM %s WHERE id = :id",
dbConnectorConfig.getTaskLockTable()
dbTables.getTaskLockTable()
)
)
.bind("id", id)
@ -341,7 +343,7 @@ public class DbTaskStorage implements TaskStorage
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)",
dbConnectorConfig.getTaskLogTable()
dbTables.getTaskLogTable()
)
)
.bind("task_id", task.getId())
@ -365,7 +367,7 @@ public class DbTaskStorage implements TaskStorage
handle.createQuery(
String.format(
"SELECT log_payload FROM %s WHERE task_id = :task_id",
dbConnectorConfig.getTaskLogTable()
dbTables.getTaskLogTable()
)
)
.bind("task_id", taskid)
@ -402,7 +404,7 @@ public class DbTaskStorage implements TaskStorage
handle.createQuery(
String.format(
"SELECT id, lock_payload FROM %s WHERE task_id = :task_id",
dbConnectorConfig.getTaskLockTable()
dbTables.getTaskLockTable()
)
)
.bind("task_id", taskid)

View File

@ -38,12 +38,14 @@ import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.common.tasklogs.TaskLogPusher;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
@ -59,12 +61,12 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Runs tasks in separate processes using {@link ExecutorMain}.
*/
public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
public class ForkingTaskRunner implements TaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(ForkingTaskRunner.class);
private static final String CHILD_PROPERTY_PREFIX = "druid.indexer.fork.property.";
@ -72,6 +74,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
private final ForkingTaskRunnerConfig config;
private final Properties props;
private final TaskLogPusher taskLogPusher;
private final DruidNode node;
private final ListeningExecutorService exec;
private final ObjectMapper jsonMapper;
@ -81,15 +84,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
ForkingTaskRunnerConfig config,
Properties props,
TaskLogPusher taskLogPusher,
ExecutorService exec,
ObjectMapper jsonMapper
ObjectMapper jsonMapper,
@Self DruidNode node
)
{
this.config = config;
this.props = props;
this.taskLogPusher = taskLogPusher;
this.exec = MoreExecutors.listeningDecorator(exec);
this.jsonMapper = jsonMapper;
this.node = node;
this.exec = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(config.maxForks()));
}
@Override
@ -113,7 +118,7 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
public TaskStatus call()
{
final String attemptUUID = UUID.randomUUID().toString();
final File taskDir = new File(config.getBaseTaskDir(), task.getId());
final File taskDir = new File(config.getTaskDir(), task.getId());
final File attemptDir = new File(taskDir, attemptUUID);
final ProcessHolder processHolder;
@ -147,17 +152,17 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
final List<String> command = Lists.newArrayList();
final int childPort = findUnusedPort();
final String childHost = String.format(config.getHostPattern(), childPort);
final String childHost = String.format("%s:%d", node.getHostNoPort(), childPort);
command.add(config.getJavaCommand());
command.add("-cp");
command.add(config.getJavaClasspath());
command.add(config.getClasspath());
Iterables.addAll(
command,
Splitter.on(CharMatcher.WHITESPACE)
.omitEmptyStrings()
.split(config.getJavaOptions())
.split(config.getJavaOpts())
);
for (String propName : props.stringPropertyNames()) {

View File

@ -0,0 +1,61 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.indexing.common.tasklogs.TaskLogs;
import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.initialization.DruidNode;
import java.util.Properties;
/**
*/
public class ForkingTaskRunnerFactory implements TaskRunnerFactory
{
private final ForkingTaskRunnerConfig config;
private final Properties props;
private final ObjectMapper jsonMapper;
private final TaskLogs persistentTaskLogs;
private final DruidNode node;
@Inject
public ForkingTaskRunnerFactory(
final ForkingTaskRunnerConfig config,
final Properties props,
final ObjectMapper jsonMapper,
final TaskLogs persistentTaskLogs,
@Self DruidNode node
) {
this.config = config;
this.props = props;
this.jsonMapper = jsonMapper;
this.persistentTaskLogs = persistentTaskLogs;
this.node = node;
}
@Override
public TaskRunner build()
{
return new ForkingTaskRunner(config, props, persistentTaskLogs, jsonMapper, node);
}
}

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
@ -52,16 +53,17 @@ import java.util.Set;
/**
*/
public class MergerDBCoordinator
public class IndexerDBCoordinator
{
private static final Logger log = new Logger(MergerDBCoordinator.class);
private static final Logger log = new Logger(IndexerDBCoordinator.class);
private final ObjectMapper jsonMapper;
private final DbConnectorConfig dbConnectorConfig;
private final DbTablesConfig dbTables;
private final IDBI dbi;
public MergerDBCoordinator(
@Inject
public IndexerDBCoordinator(
ObjectMapper jsonMapper,
DbConnectorConfig dbConnectorConfig,
DbTablesConfig dbTables,

View File

@ -25,6 +25,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@ -38,10 +39,11 @@ import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.cache.PathChildrenCacheFactory;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler;
@ -73,7 +75,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/**
* The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes.
@ -90,7 +91,7 @@ import java.util.concurrent.atomic.AtomicReference;
* <p/>
* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
*/
public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
public class RemoteTaskRunner implements TaskRunner, TaskLogStreamer
{
private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
@ -98,10 +99,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final ZkPathsConfig zkPaths;
private final CuratorFramework cf;
private final PathChildrenCacheFactory pathChildrenCacheFactory;
private final PathChildrenCache workerPathCache;
private final AtomicReference<WorkerSetupData> workerSetupData;
private final Supplier<WorkerSetupData> workerSetupData;
private final HttpClient httpClient;
// all workers that exist in ZK
@ -120,17 +122,19 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
public RemoteTaskRunner(
ObjectMapper jsonMapper,
RemoteTaskRunnerConfig config,
ZkPathsConfig zkPaths,
CuratorFramework cf,
PathChildrenCacheFactory pathChildrenCacheFactory,
AtomicReference<WorkerSetupData> workerSetupData,
Supplier<WorkerSetupData> workerSetupData,
HttpClient httpClient
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.zkPaths = zkPaths;
this.cf = cf;
this.pathChildrenCacheFactory = pathChildrenCacheFactory;
this.workerPathCache = pathChildrenCacheFactory.make(cf, config.getIndexerAnnouncementPath());
this.workerPathCache = pathChildrenCacheFactory.make(cf, zkPaths.getIndexerAnnouncementPath());
this.workerSetupData = workerSetupData;
this.httpClient = httpClient;
}
@ -440,7 +444,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private void cleanup(final String workerId, final String taskId)
{
runningTasks.remove(taskId);
final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId);
final String statusPath = JOINER.join(zkPaths.getIndexerStatusPath(), workerId, taskId);
try {
cf.delete().guaranteed().forPath(statusPath);
}
@ -490,11 +494,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
log.info("Coordinator asking Worker[%s] to add task[%s]", theWorker.getHost(), task.getId());
byte[] rawBytes = jsonMapper.writeValueAsBytes(task);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
if (rawBytes.length > config.getMaxZnodeBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxZnodeBytes());
}
String taskPath = JOINER.join(config.getIndexerTaskPath(), theWorker.getHost(), task.getId());
String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), theWorker.getHost(), task.getId());
if (cf.checkExists().forPath(taskPath) == null) {
cf.create()
@ -541,7 +545,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
log.info("Worker[%s] reportin' for duty!", worker.getHost());
try {
final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost());
final String workerStatusPath = JOINER.join(zkPaths.getIndexerStatusPath(), worker.getHost());
final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final ZkWorker zkWorker = new ZkWorker(
worker,
@ -649,10 +653,10 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (zkWorker != null) {
try {
for (String assignedTask : cf.getChildren()
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) {
.forPath(JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost()))) {
RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask);
if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
String taskPath = JOINER.join(zkPaths.getIndexerTaskPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath);
}

View File

@ -0,0 +1,77 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.metamx.druid.guice.annotations.Global;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.http.client.HttpClient;
import org.apache.curator.framework.CuratorFramework;
/**
*/
public class RemoteTaskRunnerFactory implements TaskRunnerFactory
{
private final CuratorFramework curator;
private final RemoteTaskRunnerConfig remoteTaskRunnerConfig;
private final ZkPathsConfig zkPaths;
private final ObjectMapper jsonMapper;
private final Supplier<WorkerSetupData> setupDataWatch;
private final HttpClient httpClient;
@Inject
public RemoteTaskRunnerFactory(
final CuratorFramework curator,
final RemoteTaskRunnerConfig remoteTaskRunnerConfig,
final ZkPathsConfig zkPaths,
final ObjectMapper jsonMapper,
final Supplier<WorkerSetupData> setupDataWatch,
@Global final HttpClient httpClient
) {
this.curator = curator;
this.remoteTaskRunnerConfig = remoteTaskRunnerConfig;
this.zkPaths = zkPaths;
this.jsonMapper = jsonMapper;
this.setupDataWatch = setupDataWatch;
this.httpClient = httpClient;
}
@Override
public TaskRunner build()
{
return new RemoteTaskRunner(
jsonMapper,
remoteTaskRunnerConfig,
zkPaths,
curator,
new SimplePathChildrenCacheFactory
.Builder()
.withCompressed(remoteTaskRunnerConfig.isCompressZnodes())
.build(),
setupDataWatch,
httpClient
);
}
}

View File

@ -28,6 +28,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.IAE;
import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable;
@ -63,7 +64,10 @@ public class TaskLockbox
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
public TaskLockbox(TaskStorage taskStorage)
@Inject
public TaskLockbox(
TaskStorage taskStorage
)
{
this.taskStorage = taskStorage;
}

View File

@ -21,20 +21,21 @@ package com.metamx.druid.indexing.coordinator;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import org.apache.curator.framework.CuratorFramework;
@ -42,13 +43,14 @@ import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* Encapsulates the indexer leadership lifecycle.
*/
public class TaskMasterLifecycle
public class TaskMaster
{
private final LeaderSelector leaderSelector;
private final ReentrantLock giant = new ReentrantLock();
@ -56,17 +58,20 @@ public class TaskMasterLifecycle
private final TaskQueue taskQueue;
private final TaskActionClientFactory taskActionClientFactory;
private final AtomicReference<Lifecycle> leaderLifecycleRef = new AtomicReference<Lifecycle>(null);
private volatile boolean leading = false;
private volatile TaskRunner taskRunner;
private volatile ResourceManagementScheduler resourceManagementScheduler;
private static final EmittingLogger log = new EmittingLogger(TaskMasterLifecycle.class);
private static final EmittingLogger log = new EmittingLogger(TaskMaster.class);
public TaskMasterLifecycle(
@Inject
public TaskMaster(
final TaskQueue taskQueue,
final TaskActionClientFactory taskActionClientFactory,
final IndexerCoordinatorConfig indexerCoordinatorConfig,
final DruidNode node,
@Self final DruidNode node,
final ZkPathsConfig zkPaths,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
@ -78,7 +83,7 @@ public class TaskMasterLifecycle
this.taskActionClientFactory = taskActionClientFactory;
this.leaderSelector = new LeaderSelector(
curator, indexerCoordinatorConfig.getIndexerLeaderLatchPath(), new LeaderSelectorListener()
curator, zkPaths.getIndexerLeaderLatchPath(), new LeaderSelectorListener()
{
@Override
public void takeLeadership(CuratorFramework client) throws Exception
@ -101,6 +106,11 @@ public class TaskMasterLifecycle
// Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle();
if (leaderLifecycleRef.getAndSet(leaderLifecycle) != null) {
log.makeAlert("TaskMaster set a new Lifecycle without the old one being cleared! Race condition")
.emit();
}
leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
@ -122,10 +132,7 @@ public class TaskMasterLifecycle
Initialization.announceDefaultService(node, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);
if ("remote".equalsIgnoreCase(indexerCoordinatorConfig.getRunnerImpl())) {
if (!(taskRunner instanceof RemoteTaskRunner)) {
throw new ISE("WTF?! We configured a remote runner and got %s", taskRunner.getClass());
}
if (taskRunner instanceof RemoteTaskRunner) {
resourceManagementScheduler = managementSchedulerFactory.build((RemoteTaskRunner) taskRunner);
leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
@ -144,7 +151,6 @@ public class TaskMasterLifecycle
finally {
log.info("Bowing out!");
stopLeading();
leaderLifecycle.stop();
}
}
catch (Exception e) {
@ -167,7 +173,7 @@ public class TaskMasterLifecycle
}
);
leaderSelector.setId(indexerCoordinatorConfig.getServerName());
leaderSelector.setId(node.getHost());
leaderSelector.autoRequeue();
}
@ -216,6 +222,10 @@ public class TaskMasterLifecycle
if (leading) {
leading = false;
mayBeStopped.signalAll();
final Lifecycle leaderLifecycle = leaderLifecycleRef.getAndSet(null);
if (leaderLifecycle != null) {
leaderLifecycle.stop();
}
}
}
finally {

View File

@ -27,6 +27,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.indexing.common.TaskLock;
@ -68,6 +69,7 @@ public class TaskQueue
private static final EmittingLogger log = new EmittingLogger(TaskQueue.class);
@Inject
public TaskQueue(TaskStorage taskStorage, TaskLockbox taskLockbox)
{
this.taskStorage = Preconditions.checkNotNull(taskStorage, "taskStorage");

View File

@ -25,6 +25,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexing.common.TaskStatus;
@ -44,6 +45,7 @@ public class TaskStorageQueryAdapter
{
private final TaskStorage storage;
@Inject
public TaskStorageQueryAdapter(TaskStorage storage)
{
this.storage = storage;

View File

@ -1,47 +1,85 @@
package com.metamx.druid.indexing.coordinator.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import org.skife.config.Config;
import org.skife.config.Default;
import java.io.File;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.List;
public abstract class ForkingTaskRunnerConfig
public class ForkingTaskRunnerConfig
{
@Config("druid.indexer.taskDir")
@Default("/tmp/persistent")
public abstract File getBaseTaskDir();
@JsonProperty
@Min(1)
private int maxForks = 1;
@Config("druid.indexer.fork.java")
@Default("java")
public abstract String getJavaCommand();
@JsonProperty
@NotNull
private String taskDir = "/tmp/persistent";
@Config("druid.indexer.fork.opts")
@Default("")
public abstract String getJavaOptions();
@JsonProperty
@NotNull
private String javaCommand = "java";
@Config("druid.indexer.fork.classpath")
public String getJavaClasspath() {
return System.getProperty("java.class.path");
@JsonProperty
@NotNull
private String javaOpts = "";
@JsonProperty
@NotNull
private String classpath = System.getProperty("java.class.path");
@JsonProperty
@NotNull
private String mainClass = ExecutorMain.class.getName();
@JsonProperty
@Min(1024) @Max(65535)
private int startPort = 8080;
@JsonProperty
@NotNull
List<String> allowedPrefixes = Lists.newArrayList("com.metamx", "druid", "io.druid");
public int maxForks()
{
return maxForks;
}
public String getTaskDir()
{
return taskDir;
}
public String getJavaCommand()
{
return javaCommand;
}
public String getJavaOpts()
{
return javaOpts;
}
public String getClasspath()
{
return classpath;
}
@Config("druid.indexer.fork.main")
public String getMainClass()
{
return ExecutorMain.class.getName();
return mainClass;
}
@Config("druid.indexer.fork.hostpattern")
public abstract String getHostPattern();
public int getStartPort()
{
return startPort;
}
@Config("druid.indexer.fork.startport")
public abstract int getStartPort();
@Config("druid.indexer.properties.prefixes")
public List<String> getAllowedPrefixes()
{
return Lists.newArrayList("com.metamx", "druid");
return allowedPrefixes;
}
}

View File

@ -36,10 +36,6 @@ public abstract class IndexerCoordinatorConfig extends ZkPathsConfig
@Config("druid.host")
public abstract String getServerName();
@Config("druid.indexer.threads")
@Default("1")
public abstract int getNumLocalThreads();
@Config("druid.indexer.runner")
@Default("local")
public abstract String getRunnerImpl();

View File

@ -19,25 +19,47 @@
package com.metamx.druid.indexing.coordinator.config;
import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
/**
*/
public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
public class RemoteTaskRunnerConfig
{
@Config("druid.indexer.taskAssignmentTimeoutDuration")
@Default("PT5M")
public abstract Duration getTaskAssignmentTimeoutDuration();
@JsonProperty
@NotNull
private Duration taskAssignmentTimeoutDuration = new Duration("PT5M");
@Config("druid.curator.compress")
@Default("false")
public abstract boolean enableCompression();
@JsonProperty
private boolean compressZnodes = false;
@Config("druid.indexer.worker.version")
@DefaultNull
public abstract String getWorkerVersion();
@JsonProperty
private String workerVersion = null;
@JsonProperty
@Min(10 * 1024)
private long maxZnodeBytes = 512 * 1024;
public Duration getTaskAssignmentTimeoutDuration()
{
return taskAssignmentTimeoutDuration;
}
public boolean isCompressZnodes()
{
return compressZnodes;
}
public String getWorkerVersion()
{
return workerVersion;
}
public long getMaxZnodeBytes()
{
return maxZnodeBytes;
}
}

View File

@ -24,12 +24,9 @@ import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
@ -47,15 +44,14 @@ import com.metamx.druid.QueryableNode;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.db.DbTablesConfig;
import com.metamx.druid.guava.DSuppliers;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
@ -66,22 +62,22 @@ import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.indexing.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.indexing.common.tasklogs.S3TaskLogs;
import com.metamx.druid.indexing.common.tasklogs.SwitchingTaskLogProvider;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.common.tasklogs.TaskLogs;
import com.metamx.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import com.metamx.druid.indexing.coordinator.DbTaskStorage;
import com.metamx.druid.indexing.coordinator.ForkingTaskRunner;
import com.metamx.druid.indexing.coordinator.ForkingTaskRunnerFactory;
import com.metamx.druid.indexing.coordinator.HeapMemoryTaskStorage;
import com.metamx.druid.indexing.coordinator.MergerDBCoordinator;
import com.metamx.druid.indexing.coordinator.IndexerDBCoordinator;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunner;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerFactory;
import com.metamx.druid.indexing.coordinator.TaskLockbox;
import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.TaskQueue;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import com.metamx.druid.indexing.coordinator.TaskRunnerFactory;
import com.metamx.druid.indexing.coordinator.TaskStorage;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.indexing.coordinator.config.ForkingTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.config.IndexerDbConnectorConfig;
@ -93,8 +89,8 @@ import com.metamx.druid.indexing.coordinator.scaling.NoopResourceManagementSched
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerConfig;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementConfig;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagementStrategy;
import com.metamx.druid.indexing.coordinator.scaling.SimpleResourceManagmentConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.initialization.CuratorDiscoveryConfig;
import com.metamx.druid.initialization.DruidNode;
@ -113,7 +109,6 @@ import com.metamx.metrics.Monitor;
import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.DefaultServlet;
@ -127,12 +122,8 @@ import org.jets3t.service.security.AWSCredentials;
import org.joda.time.Duration;
import org.skife.config.ConfigurationObjectFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
@ -155,7 +146,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private DbConnector dbi = null;
private Supplier<DbTablesConfig> dbTables = null;
private IndexerCoordinatorConfig config = null;
private MergerDBCoordinator mergerDBCoordinator = null;
private IndexerDBCoordinator indexerDBCoordinator = null;
private ServiceDiscovery<Void> serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private TaskStorage taskStorage = null;
@ -166,9 +157,9 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private ResourceManagementSchedulerFactory resourceManagementSchedulerFactory = null;
private HttpClient httpClient = null;
private TaskActionClientFactory taskActionClientFactory = null;
private TaskMasterLifecycle taskMasterLifecycle = null;
private TaskMaster taskMaster = null;
private TaskLogs persistentTaskLogs = null;
private TaskLogProvider taskLogProvider = null;
private TaskLogStreamer taskLogStreamer = null;
private Server server = null;
private boolean initialized = false;
@ -190,9 +181,9 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
return this;
}
public IndexerCoordinatorNode setMergerDBCoordinator(MergerDBCoordinator mergerDBCoordinator)
public IndexerCoordinatorNode setIndexerDBCoordinator(IndexerDBCoordinator indexerDBCoordinator)
{
this.mergerDBCoordinator = mergerDBCoordinator;
this.indexerDBCoordinator = indexerDBCoordinator;
return this;
}
@ -214,9 +205,9 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
return this;
}
public IndexerCoordinatorNode setMergeDbCoordinator(MergerDBCoordinator mergeDbCoordinator)
public IndexerCoordinatorNode setMergeDbCoordinator(IndexerDBCoordinator mergeDbCoordinator)
{
this.mergerDBCoordinator = mergeDbCoordinator;
this.indexerDBCoordinator = mergeDbCoordinator;
return this;
}
@ -300,9 +291,9 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
getJsonMapper(),
config,
emitter,
taskMasterLifecycle,
taskMaster,
new TaskStorageQueryAdapter(taskStorage),
taskLogProvider,
taskLogStreamer,
configManager
)
);
@ -325,37 +316,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
root.addServlet(new ServletHolder(new DefaultServlet()), "/druid/*");
root.addServlet(new ServletHolder(new DefaultServlet()), "/mmx/*"); // backwards compatibility
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(
new FilterHolder(
new RedirectFilter(
new RedirectInfo()
{
@Override
public boolean doLocal()
{
return taskMasterLifecycle.isLeading();
}
@Override
public URL getRedirectURL(String queryString, String requestURI)
{
try {
return new URL(
String.format(
"http://%s%s",
taskMasterLifecycle.getLeader(),
requestURI
)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
)
), "/*", null
);
root.addFilter(new FilterHolder(new RedirectFilter(new OverlordRedirectInfo(taskMaster))), "/*", null);
root.addFilter(GuiceFilter.class, "/druid/indexer/v1/*", null);
root.addFilter(GuiceFilter.class, "/mmx/merger/v1/*", null); //backwards compatibility, soon to be removed
@ -367,27 +328,27 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
if (taskActionClientFactory == null) {
taskActionClientFactory = new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(taskQueue, taskLockbox, mergerDBCoordinator, emitter)
new TaskActionToolbox(taskQueue, taskLockbox, indexerDBCoordinator, emitter)
);
}
}
private void initializeTaskMasterLifecycle()
{
if (taskMasterLifecycle == null) {
if (taskMaster == null) {
final DruidNode nodeConfig = getConfigFactory().build(DruidNode.class);
taskMasterLifecycle = new TaskMasterLifecycle(
taskMaster = new TaskMaster(
taskQueue,
taskActionClientFactory,
config,
nodeConfig,
getZkPaths(),
taskRunnerFactory,
resourceManagementSchedulerFactory,
getCuratorFramework(),
serviceAnnouncer,
emitter
);
getLifecycle().addManagedInstance(taskMasterLifecycle);
getLifecycle().addManagedInstance(taskMaster);
}
}
@ -398,8 +359,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
if (taskLogConfig.getLogType().equalsIgnoreCase("s3")) {
initializeS3Service();
persistentTaskLogs = new S3TaskLogs(
taskLogConfig.getLogStorageBucket(),
taskLogConfig.getLogStoragePrefix(),
null, // TODO: eliminate
s3Service
);
} else if (taskLogConfig.getLogType().equalsIgnoreCase("noop")) {
@ -412,30 +372,16 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private void initializeTaskLogProvider()
{
if (taskLogProvider == null) {
final List<TaskLogProvider> providers = Lists.newArrayList();
if (taskLogStreamer == null) {
final List<TaskLogStreamer> providers = Lists.newArrayList();
// Use our TaskRunner if it is also a TaskLogProvider
providers.add(
new TaskLogProvider()
{
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(String taskid, long offset) throws IOException
{
final TaskRunner runner = taskMasterLifecycle.getTaskRunner().orNull();
if (runner instanceof TaskLogProvider) {
return ((TaskLogProvider) runner).streamTaskLog(taskid, offset);
} else {
return Optional.absent();
}
}
}
);
// Use our TaskRunner if it is also a TaskLogStreamer
providers.add(new TaskRunnerTaskLogStreamer(IndexerCoordinatorNode.this.taskMaster));
// Use our persistent log storage
providers.add(persistentTaskLogs);
taskLogProvider = new SwitchingTaskLogProvider(providers);
taskLogStreamer = new SwitchingTaskLogStreamer(providers);
}
}
@ -569,8 +515,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
public void initializeMergeDBCoordinator()
{
if (mergerDBCoordinator == null) {
mergerDBCoordinator = new MergerDBCoordinator(
if (indexerDBCoordinator == null) {
indexerDBCoordinator = new IndexerDBCoordinator(
getJsonMapper(),
dbConnectorConfig,
getDbTables().get(),
@ -595,7 +541,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
public void initializeTaskQueue()
{
if (taskQueue == null) {
// Don't start it here. The TaskMasterLifecycle will handle that when it feels like it.
// Don't start it here. The TaskMaster will handle that when it feels like it.
taskQueue = new TaskQueue(taskStorage, taskLockbox);
}
}
@ -623,7 +569,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
final IndexerDbConnectorConfig dbConnectorConfig = getConfigFactory().build(IndexerDbConnectorConfig.class);
dbi.createTaskTables();
taskStorage = new DbTaskStorage(getJsonMapper(), dbConnectorConfig, dbi.getDBI());
taskStorage = new DbTaskStorage(getJsonMapper(), null, dbi.getDBI()); // TODO: eliminate
} else {
throw new ISE("Invalid storage implementation: %s", config.getStorageImpl());
}
@ -634,41 +580,23 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
{
if (taskRunnerFactory == null) {
if (config.getRunnerImpl().equals("remote")) {
taskRunnerFactory = new TaskRunnerFactory()
{
@Override
public TaskRunner build()
{
final CuratorFramework curator = getCuratorFramework();
final RemoteTaskRunnerConfig remoteTaskRunnerConfig = getConfigFactory().build(RemoteTaskRunnerConfig.class);
return new RemoteTaskRunner(
taskRunnerFactory = new RemoteTaskRunnerFactory(
getCuratorFramework(),
getConfigFactory().build(RemoteTaskRunnerConfig.class),
getZkPaths(),
getJsonMapper(),
remoteTaskRunnerConfig,
curator,
new SimplePathChildrenCacheFactory.Builder().withCompressed(remoteTaskRunnerConfig.enableCompression())
.build(),
configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class),
DSuppliers.of(configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class)),
httpClient
);
}
};
} else if (config.getRunnerImpl().equals("local")) {
taskRunnerFactory = new TaskRunnerFactory()
{
@Override
public TaskRunner build()
{
final ExecutorService runnerExec = Executors.newFixedThreadPool(config.getNumLocalThreads());
return new ForkingTaskRunner(
taskRunnerFactory = new ForkingTaskRunnerFactory(
getConfigFactory().build(ForkingTaskRunnerConfig.class),
getProps(),
getJsonMapper(),
persistentTaskLogs,
runnerExec,
getJsonMapper()
null // TODO: eliminate
);
}
};
} else {
throw new ISE("Invalid runner implementation: %s", config.getRunnerImpl());
}
@ -679,62 +607,9 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
{
if (resourceManagementSchedulerFactory == null) {
if (!config.isAutoScalingEnabled()) {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(RemoteTaskRunner runner)
{
return new NoopResourceManagementScheduler();
}
};
resourceManagementSchedulerFactory = new NoopResourceManagementSchedulerFactory();
} else {
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{
@Override
public ResourceManagementScheduler build(RemoteTaskRunner runner)
{
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ScalingExec--%d")
.build()
);
final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
);
AutoScalingStrategy strategy;
if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
strategy = new EC2AutoScalingStrategy(
getJsonMapper(),
new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
)
),
getConfigFactory().build(EC2AutoScalingStrategyConfig.class),
workerSetupData
);
} else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
strategy = new NoopAutoScalingStrategy();
} else {
throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl());
}
return new ResourceManagementScheduler(
runner,
new SimpleResourceManagementStrategy(
strategy,
getConfigFactory().build(SimpleResourceManagmentConfig.class),
workerSetupData
),
getConfigFactory().build(ResourceManagementSchedulerConfig.class),
scalingScheduledExec
);
}
};
resourceManagementSchedulerFactory = new WithOpResourceManagementSchedulerFactory(configManager);
}
}
}
@ -800,4 +675,68 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
return new IndexerCoordinatorNode(props, lifecycle, jsonMapper, smileMapper, configFactory);
}
}
private static class NoopResourceManagementSchedulerFactory implements ResourceManagementSchedulerFactory
{
@Override
public ResourceManagementScheduler build(RemoteTaskRunner runner)
{
return new NoopResourceManagementScheduler();
}
}
private class WithOpResourceManagementSchedulerFactory implements ResourceManagementSchedulerFactory
{
private final JacksonConfigManager configManager;
public WithOpResourceManagementSchedulerFactory(JacksonConfigManager configManager)
{
this.configManager = configManager;
}
@Override
public ResourceManagementScheduler build(RemoteTaskRunner runner)
{
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("ScalingExec--%d")
.build()
);
final AtomicReference<WorkerSetupData> workerSetupData = configManager.watch(
WorkerSetupData.CONFIG_KEY, WorkerSetupData.class
);
AutoScalingStrategy strategy;
if (config.getAutoScalingImpl().equalsIgnoreCase("ec2")) {
strategy = new EC2AutoScalingStrategy(
getJsonMapper(),
new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(getProps(), "com.metamx.aws.accessKey"),
PropUtils.getProperty(getProps(), "com.metamx.aws.secretKey")
)
),
null, // TODO: eliminate
DSuppliers.of(workerSetupData)
);
} else if (config.getAutoScalingImpl().equalsIgnoreCase("noop")) {
strategy = new NoopAutoScalingStrategy();
} else {
throw new ISE("Invalid strategy implementation: %s", config.getAutoScalingImpl());
}
return new ResourceManagementScheduler(
runner,
new SimpleResourceManagementStrategy(
strategy,
getConfigFactory().build(SimpleResourceManagementConfig.class),
DSuppliers.of(workerSetupData)
),
getConfigFactory().build(ResourceManagementSchedulerConfig.class),
scalingScheduledExec
);
}
}
}

View File

@ -35,8 +35,8 @@ import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionHolder;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.TaskQueue;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem;
@ -87,9 +87,9 @@ public class IndexerCoordinatorResource
}
};
private final TaskMasterLifecycle taskMasterLifecycle;
private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogProvider taskLogProvider;
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
private final ObjectMapper jsonMapper;
@ -97,16 +97,16 @@ public class IndexerCoordinatorResource
@Inject
public IndexerCoordinatorResource(
TaskMasterLifecycle taskMasterLifecycle,
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager,
ObjectMapper jsonMapper
) throws Exception
{
this.taskMasterLifecycle = taskMasterLifecycle;
this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogProvider = taskLogProvider;
this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager;
this.jsonMapper = jsonMapper;
}
@ -137,7 +137,7 @@ public class IndexerCoordinatorResource
public Response taskPost(final Task task)
{
return asLeaderWith(
taskMasterLifecycle.getTaskQueue(),
taskMaster.getTaskQueue(),
new Function<TaskQueue, Response>()
{
@Override
@ -173,7 +173,7 @@ public class IndexerCoordinatorResource
public Response doShutdown(@PathParam("taskid") final String taskid)
{
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
@ -241,7 +241,7 @@ public class IndexerCoordinatorResource
public <T> Response doAction(final TaskActionHolder<T> holder)
{
return asLeaderWith(
taskMasterLifecycle.getTaskActionClient(holder.getTask()),
taskMaster.getTaskActionClient(holder.getTask()),
new Function<TaskActionClient, Response>()
{
@Override
@ -278,7 +278,7 @@ public class IndexerCoordinatorResource
{
if (full != null) {
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
@ -291,7 +291,7 @@ public class IndexerCoordinatorResource
}
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
@ -317,7 +317,7 @@ public class IndexerCoordinatorResource
{
if (full != null) {
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
@ -330,7 +330,7 @@ public class IndexerCoordinatorResource
}
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
@ -353,7 +353,7 @@ public class IndexerCoordinatorResource
public Response getWorkers()
{
return asLeaderWith(
taskMasterLifecycle.getTaskRunner(),
taskMaster.getTaskRunner(),
new Function<TaskRunner, Response>()
{
@Override
@ -371,7 +371,7 @@ public class IndexerCoordinatorResource
public Response getScalingState()
{
return asLeaderWith(
taskMasterLifecycle.getResourceManagementScheduler(),
taskMaster.getResourceManagementScheduler(),
new Function<ResourceManagementScheduler, Response>()
{
@Override
@ -392,7 +392,7 @@ public class IndexerCoordinatorResource
)
{
try {
final Optional<InputSupplier<InputStream>> stream = taskLogProvider.streamTaskLog(taskid, offset);
final Optional<InputSupplier<InputStream>> stream = taskLogStreamer.streamTaskLog(taskid, offset);
if (stream.isPresent()) {
return Response.ok(stream.get().getInput()).build();
} else {

View File

@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.emitter.service.ServiceEmitter;
@ -40,27 +40,27 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
private final ObjectMapper jsonMapper;
private final IndexerCoordinatorConfig indexerCoordinatorConfig;
private final ServiceEmitter emitter;
private final TaskMasterLifecycle taskMasterLifecycle;
private final TaskMaster taskMaster;
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogProvider taskLogProvider;
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
public IndexerCoordinatorServletModule(
ObjectMapper jsonMapper,
IndexerCoordinatorConfig indexerCoordinatorConfig,
ServiceEmitter emitter,
TaskMasterLifecycle taskMasterLifecycle,
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager
)
{
this.jsonMapper = jsonMapper;
this.indexerCoordinatorConfig = indexerCoordinatorConfig;
this.emitter = emitter;
this.taskMasterLifecycle = taskMasterLifecycle;
this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogProvider = taskLogProvider;
this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager;
}
@ -72,9 +72,9 @@ public class IndexerCoordinatorServletModule extends JerseyServletModule
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(IndexerCoordinatorConfig.class).toInstance(indexerCoordinatorConfig);
bind(ServiceEmitter.class).toInstance(emitter);
bind(TaskMasterLifecycle.class).toInstance(taskMasterLifecycle);
bind(TaskMaster.class).toInstance(taskMaster);
bind(TaskStorageQueryAdapter.class).toInstance(taskStorageQueryAdapter);
bind(TaskLogProvider.class).toInstance(taskLogProvider);
bind(TaskLogStreamer.class).toInstance(taskLogStreamer);
bind(JacksonConfigManager.class).toInstance(configManager);
serve("/*").with(GuiceContainer.class);

View File

@ -3,8 +3,8 @@ package com.metamx.druid.indexing.coordinator.http;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
import com.metamx.druid.indexing.coordinator.TaskMasterLifecycle;
import com.metamx.druid.indexing.common.tasklogs.TaskLogStreamer;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.TaskStorageQueryAdapter;
import javax.ws.rs.Path;
@ -17,13 +17,13 @@ public class OldIndexerCoordinatorResource extends IndexerCoordinatorResource
{
@Inject
public OldIndexerCoordinatorResource(
TaskMasterLifecycle taskMasterLifecycle,
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogProvider taskLogProvider,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager,
ObjectMapper jsonMapper
) throws Exception
{
super(taskMasterLifecycle, taskStorageQueryAdapter, taskLogProvider, configManager, jsonMapper);
super(taskMaster, taskStorageQueryAdapter, taskLogStreamer, configManager, jsonMapper);
}
}

View File

@ -0,0 +1,57 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator.http;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import java.net.URL;
/**
*/
public class OverlordRedirectInfo implements RedirectInfo
{
private final TaskMaster taskMaster;
@Inject
public OverlordRedirectInfo(TaskMaster taskMaster)
{
this.taskMaster = taskMaster;
}
@Override
public boolean doLocal()
{
return taskMaster.isLeading();
}
@Override
public URL getRedirectURL(String queryString, String requestURI)
{
try {
return new URL(String.format("http://%s%s", taskMaster.getLeader(), requestURI));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -25,12 +25,12 @@ import java.util.List;
/**
*/
public class AutoScalingData<T>
public class AutoScalingData
{
private final List<String> nodeIds;
private final List<T> nodes;
private final List nodes;
public AutoScalingData(List<String> nodeIds, List<T> nodes)
public AutoScalingData(List<String> nodeIds, List nodes)
{
this.nodeIds = nodeIds;
this.nodes = nodes;
@ -42,7 +42,7 @@ public class AutoScalingData<T>
return nodeIds;
}
public List<T> getNodes()
public List getNodes()
{
return nodes;
}

View File

@ -24,11 +24,11 @@ import java.util.List;
/**
* The AutoScalingStrategy has the actual methods to provision and terminate worker nodes.
*/
public interface AutoScalingStrategy<T>
public interface AutoScalingStrategy
{
public AutoScalingData<T> provision();
public AutoScalingData provision();
public AutoScalingData<T> terminate(List<String> ips);
public AutoScalingData terminate(List<String> ips);
/**
* Provides a lookup of ip addresses to node ids

View File

@ -19,7 +19,7 @@
package com.metamx.druid.indexing.coordinator.scaling;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
import com.amazonaws.services.ec2.model.DescribeInstancesResult;
import com.amazonaws.services.ec2.model.Filter;
@ -30,8 +30,9 @@ import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig;
import com.google.inject.Inject;
import com.metamx.druid.indexing.coordinator.setup.EC2NodeData;
import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
@ -40,24 +41,24 @@ import org.apache.commons.codec.binary.Base64;
import javax.annotation.Nullable;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
public class EC2AutoScalingStrategy implements AutoScalingStrategy
{
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
private final ObjectMapper jsonMapper;
private final AmazonEC2Client amazonEC2Client;
private final EC2AutoScalingStrategyConfig config;
private final AtomicReference<WorkerSetupData> workerSetupDataRef;
private final AmazonEC2 amazonEC2Client;
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupDataRef;
@Inject
public EC2AutoScalingStrategy(
ObjectMapper jsonMapper,
AmazonEC2Client amazonEC2Client,
EC2AutoScalingStrategyConfig config,
AtomicReference<WorkerSetupData> workerSetupDataRef
AmazonEC2 amazonEC2Client,
SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupDataRef
)
{
this.jsonMapper = jsonMapper;
@ -67,7 +68,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
}
@Override
public AutoScalingData<Instance> provision()
public AutoScalingData provision()
{
try {
WorkerSetupData setupData = workerSetupDataRef.get();
@ -110,7 +111,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
log.info("Created instances: %s", instanceIds);
return new AutoScalingData<Instance>(
return new AutoScalingData(
Lists.transform(
result.getReservation().getInstances(),
new Function<Instance, String>()
@ -133,10 +134,10 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
}
@Override
public AutoScalingData<Instance> terminate(List<String> ips)
public AutoScalingData terminate(List<String> ips)
{
if (ips.isEmpty()) {
return new AutoScalingData<Instance>(Lists.<String>newArrayList(), Lists.<Instance>newArrayList());
return new AutoScalingData(Lists.<String>newArrayList(), Lists.<Instance>newArrayList());
}
DescribeInstancesResult result = amazonEC2Client.describeInstances(
@ -169,7 +170,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
)
);
return new AutoScalingData<Instance>(
return new AutoScalingData(
Lists.transform(
ips,
new Function<String, String>()

View File

@ -26,19 +26,19 @@ import java.util.List;
/**
* This class just logs when scaling should occur.
*/
public class NoopAutoScalingStrategy implements AutoScalingStrategy<String>
public class NoopAutoScalingStrategy implements AutoScalingStrategy
{
private static final EmittingLogger log = new EmittingLogger(NoopAutoScalingStrategy.class);
@Override
public AutoScalingData<String> provision()
public AutoScalingData provision()
{
log.info("If I were a real strategy I'd create something now");
return null;
}
@Override
public AutoScalingData<String> terminate(List<String> ips)
public AutoScalingData terminate(List<String> ips)
{
log.info("If I were a real strategy I'd terminate %s now", ips);
return null;

View File

@ -76,41 +76,32 @@ public class ResourceManagementScheduler
ScheduledExecutors.scheduleAtFixedRate(
exec,
config.getProvisionResourcesDuration(),
config.getProvisionPeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doProvision(
taskRunner.getPendingTasks(),
taskRunner.getWorkers()
);
resourceManagementStrategy.doProvision(taskRunner.getPendingTasks(), taskRunner.getWorkers());
}
}
);
// Schedule termination of worker nodes periodically
Period period = new Period(config.getTerminateResourcesDuration());
PeriodGranularity granularity = new PeriodGranularity(period, config.getTerminateResourcesOriginDateTime(), null);
Period period = config.getTerminatePeriod();
PeriodGranularity granularity = new PeriodGranularity(period, config.getOriginTime(), null);
final long startTime = granularity.next(granularity.truncate(new DateTime().getMillis()));
ScheduledExecutors.scheduleAtFixedRate(
exec,
new Duration(
System.currentTimeMillis(),
startTime
),
config.getTerminateResourcesDuration(),
new Duration(System.currentTimeMillis(), startTime),
config.getTerminatePeriod().toStandardDuration(),
new Runnable()
{
@Override
public void run()
{
resourceManagementStrategy.doTerminate(
taskRunner.getPendingTasks(),
taskRunner.getWorkers()
);
resourceManagementStrategy.doTerminate(taskRunner.getPendingTasks(), taskRunner.getWorkers());
}
}
);

View File

@ -19,24 +19,43 @@
package com.metamx.druid.indexing.coordinator.scaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
import org.joda.time.Period;
/**
*/
public abstract class ResourceManagementSchedulerConfig
public class ResourceManagementSchedulerConfig
{
@Config("druid.indexer.provisionResources.duration")
@Default("PT1M")
public abstract Duration getProvisionResourcesDuration();
@JsonProperty
private boolean doAutoscale = false;
@Config("druid.indexer.terminateResources.duration")
@Default("PT1H")
public abstract Duration getTerminateResourcesDuration();
@JsonProperty
private Period provisionPeriod = new Period("PT1M");
@Config("druid.indexer.terminateResources.originDateTime")
@Default("2012-01-01T00:55:00.000Z")
public abstract DateTime getTerminateResourcesOriginDateTime();
@JsonProperty
private Period terminatePeriod = new Period("PT1H");
@JsonProperty
private DateTime originTime = new DateTime("2012-01-01T00:55:00.000Z");
public boolean isDoAutoscale()
{
return doAutoscale;
}
public Period getProvisionPeriod()
{
return provisionPeriod;
}
public Period getTerminatePeriod()
{
return terminatePeriod;
}
public DateTime getOriginTime()
{
return originTime;
}
}

View File

@ -0,0 +1,56 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator.scaling;
import com.google.inject.Inject;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunner;
/**
*/
public class ResourceManagementSchedulerFactoryImpl implements ResourceManagementSchedulerFactory
{
private final ResourceManagementSchedulerConfig config;
private final ResourceManagementStrategy strategy;
private final ScheduledExecutorFactory executorFactory;
@Inject
public ResourceManagementSchedulerFactoryImpl(
ResourceManagementStrategy strategy,
ResourceManagementSchedulerConfig config,
ScheduledExecutorFactory executorFactory
)
{
this.config = config;
this.strategy = strategy;
this.executorFactory = executorFactory;
}
@Override
public ResourceManagementScheduler build(RemoteTaskRunner runner)
{
if (config.isDoAutoscale()) {
return new ResourceManagementScheduler(runner, strategy, config, executorFactory.create(1, "ScalingExec--%d"));
}
else {
return new NoopResourceManagementScheduler();
}
}
}

View File

@ -0,0 +1,112 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator.scaling;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
/**
*/
public class SimpleResourceManagementConfig
{
@JsonProperty
private Period workerIdleTimeout = new Period("PT10m");
@JsonProperty
private Period maxScalingDuration = new Period("PT15M");
@JsonProperty
private int numEventsToTrack = 50;
@JsonProperty
private Period pendingTaskTimeout = new Period("PT30s");
@JsonProperty
private String workerVersion = null;
@JsonProperty
private int workerPort = 8080;
public Period getWorkerIdleTimeout()
{
return workerIdleTimeout;
}
public SimpleResourceManagementConfig setWorkerIdleTimeout(Period workerIdleTimeout)
{
this.workerIdleTimeout = workerIdleTimeout;
return this;
}
public Period getMaxScalingDuration()
{
return maxScalingDuration;
}
public SimpleResourceManagementConfig setMaxScalingDuration(Period maxScalingDuration)
{
this.maxScalingDuration = maxScalingDuration;
return this;
}
public int getNumEventsToTrack()
{
return numEventsToTrack;
}
public SimpleResourceManagementConfig setNumEventsToTrack(int numEventsToTrack)
{
this.numEventsToTrack = numEventsToTrack;
return this;
}
public Period getPendingTaskTimeout()
{
return pendingTaskTimeout;
}
public SimpleResourceManagementConfig setPendingTaskTimeout(Period pendingTaskTimeout)
{
this.pendingTaskTimeout = pendingTaskTimeout;
return this;
}
public String getWorkerVersion()
{
return workerVersion;
}
public SimpleResourceManagementConfig setWorkerVersion(String workerVersion)
{
this.workerVersion = workerVersion;
return this;
}
public int getWorkerPort()
{
return workerPort;
}
public SimpleResourceManagementConfig setWorkerPort(int workerPort)
{
this.workerPort = workerPort;
return this;
}
}

View File

@ -21,9 +21,11 @@ package com.metamx.druid.indexing.coordinator.scaling;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem;
@ -37,7 +39,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicReference;
/**
*/
@ -46,8 +47,8 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private static final EmittingLogger log = new EmittingLogger(SimpleResourceManagementStrategy.class);
private final AutoScalingStrategy autoScalingStrategy;
private final SimpleResourceManagmentConfig config;
private final AtomicReference<WorkerSetupData> workerSetupdDataRef;
private final SimpleResourceManagementConfig config;
private final Supplier<WorkerSetupData> workerSetupdDataRef;
private final ScalingStats scalingStats;
private final ConcurrentSkipListSet<String> currentlyProvisioning = new ConcurrentSkipListSet<String>();
@ -56,10 +57,11 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
private volatile DateTime lastProvisionTime = new DateTime();
private volatile DateTime lastTerminateTime = new DateTime();
@Inject
public SimpleResourceManagementStrategy(
AutoScalingStrategy autoScalingStrategy,
SimpleResourceManagmentConfig config,
AtomicReference<WorkerSetupData> workerSetupdDataRef
SimpleResourceManagementConfig config,
Supplier<WorkerSetupData> workerSetupdDataRef
)
{
this.autoScalingStrategy = autoScalingStrategy;
@ -96,7 +98,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
List<String> workerNodeIds = autoScalingStrategy.ipToIdLookup(
Lists.newArrayList(
Iterables.transform(
Iterables.<ZkWorker, String>transform(
zkWorkers,
new Function<ZkWorker, String>()
{
@ -134,7 +136,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
durSinceLastProvision
);
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) {
if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node provisioning taking too long!")
.addData("millisSinceLastProvision", durSinceLastProvision.getMillis())
.addData("provisioningCount", currentlyProvisioning.size())
@ -198,7 +200,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{
return input.getRunningTasks().isEmpty()
&& System.currentTimeMillis() - input.getLastCompletedTaskTime().getMillis()
>= config.getMaxWorkerIdleTimeMillisBeforeDeletion();
>= config.getWorkerIdleTimeout().getMillis();
}
}
)
@ -240,7 +242,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
currentlyTerminating
);
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) {
if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration().toStandardDuration())) {
log.makeAlert("Worker node termination taking too long!")
.addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis())
.addData("terminatingCount", currentlyTerminating.size())
@ -263,11 +265,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
{
long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) {
if (new Duration(pendingTask.getQueueInsertionTime().getMillis(), now).isEqual(config.getMaxPendingTaskDuration())
||
new Duration(
pendingTask.getQueueInsertionTime().getMillis(), now
).isLongerThan(config.getMaxPendingTaskDuration())) {
final Duration durationSinceInsertion = new Duration(pendingTask.getQueueInsertionTime().getMillis(), now);
final Duration timeoutDuration = config.getPendingTaskTimeout().toStandardDuration();
if (durationSinceInsertion.isEqual(timeoutDuration) || durationSinceInsertion.isLongerThan(timeoutDuration)) {
return true;
}
}

View File

@ -1,50 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator.scaling;
import org.joda.time.Duration;
import org.skife.config.Config;
import org.skife.config.Default;
import org.skife.config.DefaultNull;
/**
*/
public abstract class SimpleResourceManagmentConfig
{
@Config("druid.indexer.maxWorkerIdleTimeMillisBeforeDeletion")
@Default("600000")
public abstract int getMaxWorkerIdleTimeMillisBeforeDeletion();
@Config("druid.indexer.maxScalingDuration")
@Default("PT15M")
public abstract Duration getMaxScalingDuration();
@Config("druid.indexer.numEventsToTrack")
@Default("50")
public abstract int getNumEventsToTrack();
@Config("druid.indexer.maxPendingTaskDuration")
@Default("PT30S")
public abstract Duration getMaxPendingTaskDuration();
@Config("druid.indexer.worker.version")
@DefaultNull
public abstract String getWorkerVersion();
}

View File

@ -366,8 +366,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
if (taskLogConfig.getLogStorageBucket() != null) {
initializeS3Service();
persistentTaskLogs = new S3TaskLogs(
taskLogConfig.getLogStorageBucket(),
taskLogConfig.getLogStoragePrefix(),
null, // TODO: eliminate
s3Service
);
} else {
@ -383,8 +382,8 @@ public class WorkerNode extends QueryableNode<WorkerNode>
getConfigFactory().build(ForkingTaskRunnerConfig.class),
getProps(),
persistentTaskLogs,
Executors.newFixedThreadPool(workerConfig.getCapacity()),
getJsonMapper()
getJsonMapper(),
null // todo: eliminate
);
}
}

View File

@ -15,6 +15,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.metamx.druid.guava.DSuppliers;
import com.metamx.druid.indexing.TestTask;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolboxFactory;
@ -27,6 +28,7 @@ import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.indexing.worker.WorkerCuratorCoordinator;
import com.metamx.druid.indexing.worker.WorkerTaskMonitor;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@ -354,9 +356,17 @@ public class RemoteTaskRunnerTest
remoteTaskRunner = new RemoteTaskRunner(
jsonMapper,
new TestRemoteTaskRunnerConfig(),
new ZkPathsConfig()
{
@Override
public String getZkBasePath()
{
return basePath;
}
},
cf,
new SimplePathChildrenCacheFactory.Builder().build(),
new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null)),
DSuppliers.of(new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null))),
null
);
@ -381,17 +391,11 @@ public class RemoteTaskRunnerTest
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{
@Override
public boolean enableCompression()
public boolean isCompressZnodes()
{
return false;
}
@Override
public String getZkBasePath()
{
return basePath;
}
@Override
public Duration getTaskAssignmentTimeoutDuration()
{
@ -399,7 +403,7 @@ public class RemoteTaskRunnerTest
}
@Override
public long getMaxNumBytes()
public long getMaxZnodeBytes()
{
return 1000;
}

View File

@ -87,7 +87,7 @@ public class TaskLifecycleTest
private TaskLockbox tl = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockMergerDBCoordinator mdc = null;
private MockIndexerDBCoordinator mdc = null;
private TaskActionClientFactory tac = null;
private TaskToolboxFactory tb = null;
private TaskConsumer tc = null;
@ -410,12 +410,12 @@ public class TaskLifecycleTest
return status;
}
private static class MockMergerDBCoordinator extends MergerDBCoordinator
private static class MockIndexerDBCoordinator extends IndexerDBCoordinator
{
final private Set<DataSegment> published = Sets.newHashSet();
final private Set<DataSegment> nuked = Sets.newHashSet();
private MockMergerDBCoordinator()
private MockIndexerDBCoordinator()
{
super(null, null, null, null);
}
@ -462,9 +462,9 @@ public class TaskLifecycleTest
}
}
private static MockMergerDBCoordinator newMockMDC()
private static MockIndexerDBCoordinator newMockMDC()
{
return new MockMergerDBCoordinator();
return new MockIndexerDBCoordinator();
}
private static ServiceEmitter newMockEmitter()

View File

@ -28,7 +28,7 @@ import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists;
import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.guava.DSuppliers;
import com.metamx.druid.indexing.coordinator.setup.EC2NodeData;
import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
@ -77,21 +77,8 @@ public class EC2AutoScalingStrategyTest
strategy = new EC2AutoScalingStrategy(
new DefaultObjectMapper(),
amazonEC2Client,
new EC2AutoScalingStrategyConfig()
{
@Override
public String getWorkerPort()
{
return "8080";
}
@Override
public String getWorkerVersion()
{
return "";
}
},
workerSetupData
new SimpleResourceManagementConfig().setWorkerPort(8080).setWorkerVersion(""),
DSuppliers.of(workerSetupData)
);
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.guava.DSuppliers;
import com.metamx.druid.indexing.TestTask;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
@ -38,8 +39,8 @@ import com.metamx.emitter.service.ServiceEventBuilder;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Before;
import org.junit.Test;
@ -88,39 +89,13 @@ public class SimpleResourceManagementStrategyTest
);
simpleResourceManagementStrategy = new SimpleResourceManagementStrategy(
autoScalingStrategy,
new SimpleResourceManagmentConfig()
{
@Override
public int getMaxWorkerIdleTimeMillisBeforeDeletion()
{
return 0;
}
@Override
public Duration getMaxScalingDuration()
{
return new Duration(1000);
}
@Override
public int getNumEventsToTrack()
{
return 1;
}
@Override
public Duration getMaxPendingTaskDuration()
{
return new Duration(0);
}
@Override
public String getWorkerVersion()
{
return "";
}
},
workerSetupData
new SimpleResourceManagementConfig()
.setWorkerIdleTimeout(new Period(0))
.setMaxScalingDuration(new Period(1000))
.setNumEventsToTrack(1)
.setPendingTaskTimeout(new Period(0))
.setWorkerVersion(""),
DSuppliers.of(workerSetupData)
);
}

View File

@ -23,16 +23,16 @@ import java.util.List;
/**
*/
public class TestAutoScalingStrategy<T> implements AutoScalingStrategy<T>
public class TestAutoScalingStrategy<T> implements AutoScalingStrategy
{
@Override
public AutoScalingData<T> provision()
public AutoScalingData provision()
{
return null;
}
@Override
public AutoScalingData<T> terminate(List<String> ips)
public AutoScalingData terminate(List<String> ips)
{
return null;
}

19
pom.xml
View File

@ -113,6 +113,25 @@
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.3.27</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>

View File

@ -68,7 +68,6 @@ import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.sun.istack.internal.NotNull;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -99,32 +98,16 @@ public class RealtimePlumberSchool implements PlumberSchool
private final IndexGranularity segmentGranularity;
private final Object handoffCondition = new Object();
@JacksonInject
@NotNull
private ServiceEmitter emitter;
private volatile ServiceEmitter emitter;
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
private volatile DataSegmentPusher dataSegmentPusher = null;
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
private volatile SegmentPublisher segmentPublisher = null;
private volatile ServerView serverView = null;
private volatile VersioningPolicy versioningPolicy = null;
private volatile RejectionPolicyFactory rejectionPolicyFactory = null;
@JacksonInject
@NotNull
private volatile QueryRunnerFactoryConglomerate conglomerate = null;
@JacksonInject
@NotNull
private volatile DataSegmentPusher dataSegmentPusher = null;
@JacksonInject
@NotNull
private volatile DataSegmentAnnouncer segmentAnnouncer = null;
@JacksonInject
@NotNull
private volatile SegmentPublisher segmentPublisher = null;
@JacksonInject
@NotNull
private volatile ServerView serverView = null;
@JsonCreator
public RealtimePlumberSchool(
@ -156,6 +139,42 @@ public class RealtimePlumberSchool implements PlumberSchool
this.rejectionPolicyFactory = factory;
}
@JacksonInject
public void setEmitter(ServiceEmitter emitter)
{
this.emitter = emitter;
}
@JacksonInject
public void setConglomerate(QueryRunnerFactoryConglomerate conglomerate)
{
this.conglomerate = conglomerate;
}
@JacksonInject
public void setDataSegmentPusher(DataSegmentPusher dataSegmentPusher)
{
this.dataSegmentPusher = dataSegmentPusher;
}
@JacksonInject
public void setSegmentAnnouncer(DataSegmentAnnouncer segmentAnnouncer)
{
this.segmentAnnouncer = segmentAnnouncer;
}
@JacksonInject
public void setSegmentPublisher(SegmentPublisher segmentPublisher)
{
this.segmentPublisher = segmentPublisher;
}
@JacksonInject
public void setServerView(ServerView serverView)
{
this.serverView = serverView;
}
@Override
public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics)
{

View File

@ -72,6 +72,10 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>

View File

@ -19,14 +19,17 @@
package com.metamx.druid.guice;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
import com.metamx.druid.loading.S3CredentialsConfig;
import com.metamx.druid.loading.AWSCredentialsConfig;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
/**
*/
@ -35,18 +38,39 @@ public class S3Module implements Module
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.s3", S3CredentialsConfig.class);
JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class);
}
@Provides
@LazySingleton
public RestS3Service getRestS3Service(S3CredentialsConfig config)
public AWSCredentials getAWSCredentials(AWSCredentialsConfig config)
{
return new BasicAWSCredentials(config.getAccessKey(), config.getSecretKey());
}
@Provides
@LazySingleton
public org.jets3t.service.security.AWSCredentials getJets3tAWSCredentials(AWSCredentialsConfig config)
{
return new org.jets3t.service.security.AWSCredentials(config.getAccessKey(), config.getSecretKey());
}
@Provides
@LazySingleton
public RestS3Service getRestS3Service(org.jets3t.service.security.AWSCredentials credentials)
{
try {
return new RestS3Service(new AWSCredentials(config.getAccessKey(), config.getSecretKey()));
return new RestS3Service(credentials);
}
catch (S3ServiceException e) {
throw new ProvisionException("Unable to create a RestS3Service", e);
}
}
@Provides
@LazySingleton
public AmazonEC2 getEc2Client(AWSCredentials credentials)
{
return new AmazonEC2Client(credentials);
}
}

View File

@ -4,7 +4,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class S3CredentialsConfig
public class AWSCredentialsConfig
{
@JsonProperty
private String accessKey = "";

View File

@ -47,6 +47,11 @@
<artifactId>druid-examples</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-indexing-service</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>

View File

@ -0,0 +1,118 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.cli;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorModule;
import com.metamx.druid.curator.discovery.DiscoveryModule;
import com.metamx.druid.guice.DbConnectorModule;
import com.metamx.druid.guice.HttpClientModule;
import com.metamx.druid.guice.JacksonConfigManagerModule;
import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.OverlordModule;
import com.metamx.druid.guice.S3Module;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.indexing.coordinator.TaskMaster;
import com.metamx.druid.indexing.coordinator.http.IndexerCoordinatorResource;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerInitializer;
import com.metamx.druid.initialization.JettyServerModule;
import com.metamx.druid.metrics.MetricsModule;
import io.airlift.command.Command;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlets.GzipFilter;
import org.eclipse.jetty.util.resource.ResourceCollection;
/**
*/
@Command(
name = "overlord",
description = "Runs an Overlord node, see https://github.com/metamx/druid/wiki/Indexing-Service for a description"
)
public class CliOverlord extends ServerRunnable
{
private static Logger log = new Logger(CliOverlord.class);
public CliOverlord()
{
super(log);
}
@Override
protected Injector getInjector()
{
return Initialization.makeInjector(
new LifecycleModule(),
EmitterModule.class,
HttpClientModule.global(),
CuratorModule.class,
new MetricsModule(),
ServerModule.class,
new S3Module(),
new DbConnectorModule(),
new JacksonConfigManagerModule(),
new JettyServerModule(new OverlordJettyServerInitializer())
.addResource(IndexerCoordinatorResource.class),
new DiscoveryModule(),
new OverlordModule()
);
}
private static class OverlordJettyServerInitializer implements JettyServerInitializer
{
@Override
public void initialize(Server server, Injector injector)
{
ResourceHandler resourceHandler = new ResourceHandler();
resourceHandler.setBaseResource(
new ResourceCollection(
new String[]{
TaskMaster.class.getClassLoader().getResource("static").toExternalForm(),
TaskMaster.class.getClassLoader().getResource("indexer_static").toExternalForm()
}
)
);
final ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS);
root.setContextPath("/");
HandlerList handlerList = new HandlerList();
handlerList.setHandlers(new Handler[]{resourceHandler, root, new DefaultHandler()});
server.setHandler(handlerList);
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addFilter(GzipFilter.class, "/*", null);
root.addFilter(new FilterHolder(injector.getInstance(RedirectFilter.class)), "/*", null);
root.addFilter(GuiceFilter.class, "/*", null);
}
}
}

View File

@ -21,6 +21,7 @@ package io.druid.cli;
import io.airlift.command.Cli;
import io.airlift.command.Help;
import io.airlift.command.ParseException;
/**
*/
@ -38,8 +39,22 @@ public class Main
builder.withGroup("server")
.withDescription("Run one of the Druid server types.")
.withDefaultCommand(Help.class)
.withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliRealtimeExample.class);
.withCommands(CliCoordinator.class, CliHistorical.class, CliBroker.class, CliRealtime.class, CliOverlord.class);
builder.build().parse(args).run();
builder.withGroup("example")
.withDescription("Run an example")
.withDefaultCommand(Help.class)
.withCommands(CliRealtimeExample.class);
final Cli<Runnable> cli = builder.build();
try {
cli.parse(args).run();
}
catch (ParseException e) {
System.out.println("ERROR!!!!");
System.out.println(e.getMessage());
System.out.println("===");
cli.parse(new String[]{"help"}).run();
}
}
}