diff --git a/client/src/main/java/com/metamx/druid/client/DruidServer.java b/client/src/main/java/com/metamx/druid/client/DruidServer.java index e86efdad816..a3acc3c00d8 100644 --- a/client/src/main/java/com/metamx/druid/client/DruidServer.java +++ b/client/src/main/java/com/metamx/druid/client/DruidServer.java @@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap; */ public class DruidServer implements Comparable { + public static final String DEFAULT_TIER = "_default_tier"; private static final Logger log = new Logger(DruidServer.class); private final Object lock = new Object(); diff --git a/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java b/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java index 89028021c18..d66ef146db4 100644 --- a/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java +++ b/client/src/main/java/com/metamx/druid/client/DruidServerConfig.java @@ -33,9 +33,10 @@ public abstract class DruidServerConfig public abstract String getHost(); @Config("druid.server.maxSize") + @Default("0") public abstract long getMaxSize(); @Config("druid.server.tier") - @Default("_default_tier") + @Default(DruidServer.DEFAULT_TIER) public abstract String getTier(); } diff --git a/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java b/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java index 2808ce4d7b8..fb7a3044ee4 100644 --- a/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java +++ b/client/src/main/java/com/metamx/druid/curator/cache/SimplePathChildrenCacheFactory.java @@ -21,8 +21,11 @@ package com.metamx.druid.curator.cache; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.utils.ThreadUtils; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; /** */ @@ -48,4 +51,43 @@ public class SimplePathChildrenCacheFactory implements PathChildrenCacheFactory { return new PathChildrenCache(curator, path, cacheData, compressed, exec); } + + public static class Builder + { + private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache"); + + private boolean cacheData; + private boolean compressed; + private ExecutorService exec; + + public Builder() + { + cacheData = true; + compressed = false; + exec = Executors.newSingleThreadExecutor(defaultThreadFactory); + } + + public Builder withCacheData(boolean cacheData) + { + this.cacheData = cacheData; + return this; + } + + public Builder withCompressed(boolean compressed) + { + this.compressed = compressed; + return this; + } + + public Builder withExecutorService(ExecutorService exec) + { + this.exec = exec; + return this; + } + + public SimplePathChildrenCacheFactory build() + { + return new SimplePathChildrenCacheFactory(cacheData, compressed, exec); + } + } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java index 8a798d2a961..d83b7d0de40 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskStatus.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import com.metamx.druid.indexing.common.task.TaskResource; /** * Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be @@ -42,33 +43,36 @@ public class TaskStatus public static TaskStatus running(String taskId) { - return new TaskStatus(taskId, Status.RUNNING, -1); + return new TaskStatus(taskId, Status.RUNNING, -1, null); } public static TaskStatus success(String taskId) { - return new TaskStatus(taskId, Status.SUCCESS, -1); + return new TaskStatus(taskId, Status.SUCCESS, -1, null); } public static TaskStatus failure(String taskId) { - return new TaskStatus(taskId, Status.FAILED, -1); + return new TaskStatus(taskId, Status.FAILED, -1, null); } private final String id; private final Status status; private final long duration; + private final TaskResource resource; @JsonCreator private TaskStatus( @JsonProperty("id") String id, @JsonProperty("status") Status status, - @JsonProperty("duration") long duration + @JsonProperty("duration") long duration, + @JsonProperty("resource") TaskResource resource ) { this.id = id; this.status = status; this.duration = duration; + this.resource = resource == null ? new TaskResource(id, 1) : resource; // Check class invariants. Preconditions.checkNotNull(id, "id"); @@ -93,6 +97,12 @@ public class TaskStatus return duration; } + @JsonProperty("resource") + public TaskResource getResource() + { + return resource; + } + /** * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable, * isSuccess, or isFailure will be true at any one time. @@ -134,7 +144,7 @@ public class TaskStatus public TaskStatus withDuration(long _duration) { - return new TaskStatus(id, status, _duration); + return new TaskStatus(id, status, _duration, resource); } @Override @@ -144,6 +154,7 @@ public class TaskStatus .add("id", id) .add("status", status) .add("duration", duration) + .add("resource", resource) .toString(); } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java index 9de8bcc9302..5e9789e9660 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskConfig.java @@ -19,6 +19,7 @@ package com.metamx.druid.indexing.common.config; +import com.google.common.base.Joiner; import org.skife.config.Config; import org.skife.config.Default; @@ -26,13 +27,30 @@ import java.io.File; public abstract class TaskConfig { + private static Joiner joiner = Joiner.on("/"); + + @Config("druid.indexer.baseDir") + @Default("/tmp/") + public abstract String getBaseDir(); + @Config("druid.indexer.taskDir") - public abstract File getBaseTaskDir(); + public File getBaseTaskDir() + { + return new File(defaultPath("persistent/task")); + } + + @Config("druid.indexer.hadoopWorkingPath") + public String getHadoopWorkingPath() + { + return defaultPath("druid-indexing"); + } @Config("druid.indexer.rowFlushBoundary") @Default("500000") public abstract int getDefaultRowFlushBoundary(); - @Config("druid.indexer.hadoopWorkingPath") - public abstract String getHadoopWorkingPath(); -} + private String defaultPath(String subPath) + { + return joiner.join(getBaseDir(), subPath); + } +} \ No newline at end of file diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskLogConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskLogConfig.java index a80fdcd76c8..59030c99807 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskLogConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/config/TaskLogConfig.java @@ -1,10 +1,15 @@ package com.metamx.druid.indexing.common.config; import org.skife.config.Config; +import org.skife.config.Default; import org.skife.config.DefaultNull; public abstract class TaskLogConfig { + @Config("druid.indexer.logs.type") + @Default("noop") + public abstract String getLogType(); + @Config("druid.indexer.logs.s3bucket") @DefaultNull public abstract String getLogStorageBucket(); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandler.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandler.java index 3458c931de4..95c66028e52 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandler.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandler.java @@ -1,7 +1,26 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.indexing.common.index; /** - * Objects that can be registered with a {@link ChatHandlerProvider} and provide http endpoints for indexing-related + * Objects that can be registered with a {@link EventReceivingChatHandlerProvider} and provide http endpoints for indexing-related * objects. This interface is empty because it only exists to signal intent. The actual http endpoints are provided * through JAX-RS annotations on the {@link ChatHandler} objects. */ diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java index 6d67a7a5fb1..1662abc7d9b 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/ChatHandlerProvider.java @@ -1,83 +1,33 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.indexing.common.index; import com.google.common.base.Optional; -import com.google.common.collect.Maps; -import com.metamx.common.ISE; -import com.metamx.common.logger.Logger; -import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; - -import java.util.concurrent.ConcurrentMap; /** - * Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method - * allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded - * {@link ServiceAnnouncer} will be used to advertise handlers on this host. */ -public class ChatHandlerProvider +public interface ChatHandlerProvider { - private static final Logger log = new Logger(ChatHandlerProvider.class); + public void register(final String key, ChatHandler handler); - private final ChatHandlerProviderConfig config; - private final ServiceAnnouncer serviceAnnouncer; - private final ConcurrentMap handlers; + public void unregister(final String key); - public ChatHandlerProvider( - ChatHandlerProviderConfig config, - ServiceAnnouncer serviceAnnouncer - ) - { - this.config = config; - this.serviceAnnouncer = serviceAnnouncer; - this.handlers = Maps.newConcurrentMap(); - } - - public void register(final String key, ChatHandler handler) - { - final String service = serviceName(key); - log.info("Registering Eventhandler: %s", key); - - if (handlers.putIfAbsent(key, handler) != null) { - throw new ISE("handler already registered for key: %s", key); - } - - try { - serviceAnnouncer.announce(service); - } - catch (Exception e) { - log.warn(e, "Failed to register service: %s", service); - handlers.remove(key, handler); - } - } - - public void unregister(final String key) - { - final String service = serviceName(key); - - log.info("Unregistering chat handler: %s", key); - - final ChatHandler handler = handlers.get(key); - if (handler == null) { - log.warn("handler not currently registered, ignoring: %s", key); - } - - try { - serviceAnnouncer.unannounce(service); - } - catch (Exception e) { - log.warn(e, "Failed to unregister service: %s", service); - } - - handlers.remove(key, handler); - } - - public Optional get(final String key) - { - return Optional.fromNullable(handlers.get(key)); - } - - private String serviceName(String key) - { - return String.format(config.getServiceFormat(), key); - } + public Optional get(final String key); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceiverFirehoseFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceiverFirehoseFactory.java index 5c437eaa356..1c235cf43c9 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceiverFirehoseFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceiverFirehoseFactory.java @@ -1,3 +1,22 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + package com.metamx.druid.indexing.common.index; import com.fasterxml.jackson.annotation.JacksonInject; @@ -30,7 +49,7 @@ import java.util.concurrent.TimeUnit; /** * Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these - * firehoses with an {@link ChatHandlerProvider}. + * firehoses with an {@link EventReceivingChatHandlerProvider}. */ @JsonTypeName("receiver") public class EventReceiverFirehoseFactory implements FirehoseFactory @@ -41,14 +60,14 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory private final String firehoseId; private final int bufferSize; private final MapInputRowParser parser; - private final Optional chatHandlerProvider; + private final Optional chatHandlerProvider; @JsonCreator public EventReceiverFirehoseFactory( @JsonProperty("firehoseId") String firehoseId, @JsonProperty("bufferSize") Integer bufferSize, @JsonProperty("parser") MapInputRowParser parser, - @JacksonInject("chatHandlerProvider") ChatHandlerProvider chatHandlerProvider + @JacksonInject("chatHandlerProvider") EventReceivingChatHandlerProvider chatHandlerProvider ) { this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId"); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java new file mode 100644 index 00000000000..f18a79c241c --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/EventReceivingChatHandlerProvider.java @@ -0,0 +1,105 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.common.index; + +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; +import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; + +import java.util.concurrent.ConcurrentMap; + +/** + * Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method + * allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded + * {@link ServiceAnnouncer} will be used to advertise handlers on this host. + */ +public class EventReceivingChatHandlerProvider implements ChatHandlerProvider +{ + private static final Logger log = new Logger(EventReceivingChatHandlerProvider.class); + + private final ChatHandlerProviderConfig config; + private final ServiceAnnouncer serviceAnnouncer; + private final ConcurrentMap handlers; + + public EventReceivingChatHandlerProvider( + ChatHandlerProviderConfig config, + ServiceAnnouncer serviceAnnouncer + ) + { + this.config = config; + this.serviceAnnouncer = serviceAnnouncer; + this.handlers = Maps.newConcurrentMap(); + } + + @Override + public void register(final String key, ChatHandler handler) + { + final String service = serviceName(key); + log.info("Registering Eventhandler: %s", key); + + if (handlers.putIfAbsent(key, handler) != null) { + throw new ISE("handler already registered for key: %s", key); + } + + try { + serviceAnnouncer.announce(service); + } + catch (Exception e) { + log.warn(e, "Failed to register service: %s", service); + handlers.remove(key, handler); + } + } + + @Override + public void unregister(final String key) + { + final String service = serviceName(key); + + log.info("Unregistering chat handler: %s", key); + + final ChatHandler handler = handlers.get(key); + if (handler == null) { + log.warn("handler not currently registered, ignoring: %s", key); + } + + try { + serviceAnnouncer.unannounce(service); + } + catch (Exception e) { + log.warn(e, "Failed to unregister service: %s", service); + } + + handlers.remove(key, handler); + } + + @Override + public Optional get(final String key) + { + return Optional.fromNullable(handlers.get(key)); + } + + private String serviceName(String key) + { + return String.format(config.getServiceFormat(), key); + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/NoopChatHandlerProvider.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/NoopChatHandlerProvider.java new file mode 100644 index 00000000000..bf9a36afdb5 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/index/NoopChatHandlerProvider.java @@ -0,0 +1,45 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.common.index; + +import com.google.common.base.Optional; + +/** + */ +public class NoopChatHandlerProvider implements ChatHandlerProvider +{ + @Override + public void register(String key, ChatHandler handler) + { + // do nothing + } + + @Override + public void unregister(String key) + { + // do nothing + } + + @Override + public Optional get(String key) + { + return Optional.absent(); + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java index de40abe90fc..e263c1288b1 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/AbstractTask.java @@ -43,7 +43,7 @@ public abstract class AbstractTask implements Task private final String groupId; @JsonIgnore - private final String availabilityGroup; + private final TaskResource taskResource; @JsonIgnore private final String dataSource; @@ -53,23 +53,23 @@ public abstract class AbstractTask implements Task protected AbstractTask(String id, String dataSource, Interval interval) { - this(id, id, id, dataSource, interval); + this(id, id, new TaskResource(id, 1), dataSource, interval); } protected AbstractTask(String id, String groupId, String dataSource, Interval interval) { this.id = Preconditions.checkNotNull(id, "id"); this.groupId = Preconditions.checkNotNull(groupId, "groupId"); - this.availabilityGroup = id; + this.taskResource = new TaskResource(id, 1); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.interval = Optional.fromNullable(interval); } - protected AbstractTask(String id, String groupId, String availabilityGroup, String dataSource, Interval interval) + protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval) { this.id = Preconditions.checkNotNull(id, "id"); this.groupId = Preconditions.checkNotNull(groupId, "groupId"); - this.availabilityGroup = Preconditions.checkNotNull(availabilityGroup, "availabilityGroup"); + this.taskResource = Preconditions.checkNotNull(taskResource, "taskResource"); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); this.interval = Optional.fromNullable(interval); } @@ -90,9 +90,9 @@ public abstract class AbstractTask implements Task @JsonProperty @Override - public String getAvailabilityGroup() + public TaskResource getTaskResource() { - return availabilityGroup; + return taskResource; } @Override @@ -166,19 +166,16 @@ public abstract class AbstractTask implements Task AbstractTask that = (AbstractTask) o; - if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) { - return false; - } - if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) { - return false; - } - if (id != null ? !id.equals(that.id) : that.id != null) { - return false; - } - if (interval != null ? !interval.equals(that.interval) : that.interval != null) { + if (!id.equals(that.id)) { return false; } return true; } + + @Override + public int hashCode() + { + return id.hashCode(); + } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java index 7c7d8707a37..8388c9653e3 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/IndexDeterminePartitionsTask.java @@ -88,7 +88,6 @@ public class IndexDeterminePartitionsTask extends AbstractTask super( id != null ? id : makeTaskId(groupId, interval.getStart(), interval.getEnd()), groupId, - makeTaskId(groupId, interval.getStart(), interval.getEnd()), schema.getDataSource(), Preconditions.checkNotNull(interval, "interval") ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java index e2cb18ccb05..dc3ad87a9cb 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java @@ -46,13 +46,11 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.realtime.FireDepartment; import com.metamx.druid.realtime.FireDepartmentConfig; -import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.RealtimeMetricsMonitor; import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; -import com.metamx.druid.realtime.plumber.NoopRejectionPolicyFactory; import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; import com.metamx.druid.realtime.plumber.RejectionPolicyFactory; @@ -105,7 +103,7 @@ public class RealtimeIndexTask extends AbstractTask @JsonCreator public RealtimeIndexTask( @JsonProperty("id") String id, - @JsonProperty("availabilityGroup") String availabilityGroup, + @JsonProperty("resource") TaskResource taskResource, @JsonProperty("schema") Schema schema, @JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, @@ -115,16 +113,22 @@ public class RealtimeIndexTask extends AbstractTask ) { super( - id != null - ? id - : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()), + id == null + ? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()) + :id, String.format( "index_realtime_%s", schema.getDataSource() ), - availabilityGroup != null - ? availabilityGroup - : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()), + taskResource == null + ? new TaskResource( + makeTaskId( + schema.getDataSource(), + schema.getShardSpec().getPartitionNum(), + new DateTime().toString() + ), 1 + ) + : taskResource, schema.getDataSource(), null ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java index aace40f7bfd..25f8b6425cd 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/Task.java @@ -72,11 +72,10 @@ public interface Task public String getGroupId(); /** - * Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same - * worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the - * task ID. + * Returns a {@link com.metamx.druid.indexing.common.task.TaskResource} for this task. Task resources define specific + * worker requirements a task may require. */ - public String getAvailabilityGroup(); + public TaskResource getTaskResource(); /** * Returns a descriptive label for this task type. Used for metrics emission and logging. diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/TaskResource.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/TaskResource.java new file mode 100644 index 00000000000..81d23a942b9 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/TaskResource.java @@ -0,0 +1,52 @@ +package com.metamx.druid.indexing.common.task; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + */ +public class TaskResource +{ + private final String availabilityGroup; + private final int requiredCapacity; + + @JsonCreator + public TaskResource( + @JsonProperty("availabilityGroup") String availabilityGroup, + @JsonProperty("requiredCapacity") int requiredCapacity + ) + { + this.availabilityGroup = availabilityGroup; + this.requiredCapacity = requiredCapacity; + } + + /** + * Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same + * worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the + * task ID. + */ + @JsonProperty + public String getAvailabilityGroup() + { + return availabilityGroup; + } + + + /** + * Returns the number of worker slots this task will take. + */ + @JsonProperty + public int getRequiredCapacity() + { + return requiredCapacity; + } + + @Override + public String toString() + { + return "TaskResource{" + + "availabilityGroup='" + availabilityGroup + '\'' + + ", requiredCapacity=" + requiredCapacity + + '}'; + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java index 27e46f9ce42..4722222bf52 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/VersionConverterTask.java @@ -104,7 +104,7 @@ public class VersionConverterTask extends AbstractTask DataSegment segment ) { - super(id, groupId, id, dataSource, interval); + super(id, groupId, dataSource, interval); this.segment = segment; } @@ -205,13 +205,6 @@ public class VersionConverterTask extends AbstractTask segment.getShardSpec().getPartitionNum() ), groupId, - joinId( - groupId, - "sub", - segment.getInterval().getStart(), - segment.getInterval().getEnd(), - segment.getShardSpec().getPartitionNum() - ), segment.getDataSource(), segment.getInterval() ); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java index 4ac78c96c40..0eb0094c6c0 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java @@ -96,6 +96,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider this.jsonMapper = jsonMapper; } + @Override + public void bootstrap(List tasks) + { + // do nothing + } + @Override public ListenableFuture run(final Task task) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java index 08157d2ab43..969c14b3008 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunner.java @@ -24,22 +24,18 @@ import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import com.google.common.collect.MinMaxPriorityQueue; import com.google.common.collect.Sets; import com.google.common.io.InputSupplier; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.metamx.common.ISE; -import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.druid.indexing.common.RetryPolicy; -import com.metamx.druid.indexing.common.RetryPolicyFactory; +import com.metamx.druid.curator.cache.PathChildrenCacheFactory; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider; @@ -49,15 +45,17 @@ import com.metamx.druid.indexing.worker.Worker; import com.metamx.emitter.EmittingLogger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.InputStreamResponseHandler; -import com.metamx.http.client.response.ToStringResponseHandler; +import com.metamx.http.client.response.StatusResponseHandler; +import com.metamx.http.client.response.StatusResponseHolder; import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.utils.ZKPaths; import org.apache.zookeeper.CreateMode; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.joda.time.DateTime; -import org.joda.time.Duration; import java.io.IOException; import java.io.InputStream; @@ -68,54 +66,50 @@ import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** - * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure - * scenarios. The RemoteTaskRunner keeps track of which workers are running which tasks and manages coordinator and - * worker interactions over Zookeeper. The RemoteTaskRunner is event driven and updates state according to ephemeral - * node changes in ZK. + * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes. + * The RemoteTaskRunner uses Zookeeper to keep track of which workers are running which tasks. Tasks are assigned by + * creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running. + * Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup. + * The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK. *

* The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will * fail. The RemoteTaskRunner depends on another component to create additional worker resources. * For example, {@link com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler} can take care of these duties. *

- * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks - * that were associated with the node. + * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker. *

* The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages. */ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider { private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class); - private static final ToStringResponseHandler STRING_RESPONSE_HANDLER = new ToStringResponseHandler(Charsets.UTF_8); + private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8); private static final Joiner JOINER = Joiner.on("/"); private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; private final CuratorFramework cf; + private final PathChildrenCacheFactory pathChildrenCacheFactory; private final PathChildrenCache workerPathCache; - private final ScheduledExecutorService scheduledExec; - private final RetryPolicyFactory retryPolicyFactory; private final AtomicReference workerSetupData; private final HttpClient httpClient; // all workers that exist in ZK private final Map zkWorkers = new ConcurrentHashMap(); // all tasks that have been assigned to a worker - private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue(); + private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue(); // tasks that have not yet run - private final TaskRunnerWorkQueue pendingTasks = new TaskRunnerWorkQueue(); - // idempotent task retry - private final Set tasksToRetry = new ConcurrentSkipListSet(); + private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue(); private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor(); @@ -127,9 +121,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ObjectMapper jsonMapper, RemoteTaskRunnerConfig config, CuratorFramework cf, - PathChildrenCache workerPathCache, - ScheduledExecutorService scheduledExec, - RetryPolicyFactory retryPolicyFactory, + PathChildrenCacheFactory pathChildrenCacheFactory, AtomicReference workerSetupData, HttpClient httpClient ) @@ -137,9 +129,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider this.jsonMapper = jsonMapper; this.config = config; this.cf = cf; - this.workerPathCache = workerPathCache; - this.scheduledExec = scheduledExec; - this.retryPolicyFactory = retryPolicyFactory; + this.pathChildrenCacheFactory = pathChildrenCacheFactory; + this.workerPathCache = pathChildrenCacheFactory.make(cf, config.getIndexerAnnouncementPath()); this.workerSetupData = workerSetupData; this.httpClient = httpClient; } @@ -159,25 +150,37 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider @Override public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception { - if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { - final Worker worker = jsonMapper.readValue( - event.getData().getData(), - Worker.class - ); - log.info("Worker[%s] reportin' for duty!", worker.getHost()); - addWorker(worker); - } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { - final Worker worker = jsonMapper.readValue( - event.getData().getData(), - Worker.class - ); - log.info("Kaboom! Worker[%s] removed!", worker.getHost()); - removeWorker(worker); + Worker worker; + switch (event.getType()) { + case CHILD_ADDED: + worker = jsonMapper.readValue( + event.getData().getData(), + Worker.class + ); + addWorker(worker, PathChildrenCache.StartMode.NORMAL); + break; + case CHILD_REMOVED: + worker = jsonMapper.readValue( + event.getData().getData(), + Worker.class + ); + removeWorker(worker); + break; + default: + break; } } } ); - workerPathCache.start(); + workerPathCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + + for (ChildData childData : workerPathCache.getCurrentData()) { + final Worker worker = jsonMapper.readValue( + childData.getData(), + Worker.class + ); + addWorker(worker, PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + } started = true; } @@ -197,6 +200,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider for (ZkWorker zkWorker : zkWorkers.values()) { zkWorker.close(); } + workerPathCache.close(); } catch (Exception e) { throw Throwables.propagate(e); @@ -213,13 +217,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } @Override - public Collection getRunningTasks() + public Collection getRunningTasks() { return runningTasks.values(); } @Override - public Collection getPendingTasks() + public Collection getPendingTasks() { return pendingTasks.values(); } @@ -227,18 +231,49 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider public ZkWorker findWorkerRunningTask(String taskId) { for (ZkWorker zkWorker : zkWorkers.values()) { - if (zkWorker.getRunningTasks().contains(taskId)) { + if (zkWorker.isRunningTask(taskId)) { return zkWorker; } } return null; } - public boolean isWorkerRunningTask(String workerHost, String taskId) + public boolean isWorkerRunningTask(Worker worker, Task task) { - ZkWorker zkWorker = zkWorkers.get(workerHost); + ZkWorker zkWorker = zkWorkers.get(worker.getHost()); - return (zkWorker != null && zkWorker.getRunningTasks().contains(taskId)); + return (zkWorker != null && zkWorker.isRunningTask(task.getId())); + } + + @Override + public void bootstrap(List tasks) + { + try { + if (!started) { + throw new ISE("Must start RTR first before calling bootstrap!"); + } + + Set existingTasks = Sets.newHashSet(); + for (ZkWorker zkWorker : zkWorkers.values()) { + existingTasks.addAll(zkWorker.getRunningTasks().keySet()); + } + + for (Task task : tasks) { + if (existingTasks.contains(task.getId())) { + log.info("Bootstrap found %s running.", task.getId()); + runningTasks.put( + task.getId(), + new RemoteTaskRunnerWorkItem(task, SettableFuture.create()) + ); + } else { + log.info("Bootstrap didn't find %s running. Running it again", task.getId()); + run(task); + } + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } } /** @@ -252,8 +287,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) { throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId()); } - TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem( - task, SettableFuture.create(), retryPolicyFactory.makeRetryPolicy(), new DateTime() + RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem( + task, + SettableFuture.create() ); addPendingTask(taskRunnerWorkItem); return taskRunnerWorkItem.getResult(); @@ -262,7 +298,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider /** * Finds the worker running the task and forwards the shutdown signal to the worker. * - * @param taskId + * @param taskId - task id to shutdown */ @Override public void shutdown(String taskId) @@ -275,40 +311,30 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider final ZkWorker zkWorker = findWorkerRunningTask(taskId); if (zkWorker == null) { - // Would be nice to have an ability to shut down pending tasks log.info("Can't shutdown! No worker running task %s", taskId); return; } - final RetryPolicy shutdownRetryPolicy = retryPolicyFactory.makeRetryPolicy(); - final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId)); + try { + final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId)); + final StatusResponseHolder response = httpClient.post(url) + .go(RESPONSE_HANDLER) + .get(); - while (!shutdownRetryPolicy.hasExceededRetryThreshold()) { - try { - final String response = httpClient.post(url) - .go(STRING_RESPONSE_HANDLER) - .get(); - log.info("Sent shutdown message to worker: %s, response: %s", zkWorker.getWorker().getHost(), response); + log.info( + "Sent shutdown message to worker: %s, status %s, response: %s", + zkWorker.getWorker().getHost(), + response.getStatus(), + response.getContent() + ); - return; - } - catch (Exception e) { - log.error(e, "Exception shutting down taskId: %s", taskId); - - if (shutdownRetryPolicy.hasExceededRetryThreshold()) { - throw Throwables.propagate(e); - } else { - try { - final long sleepTime = shutdownRetryPolicy.getAndIncrementRetryDelay().getMillis(); - log.info("Will try again in %s.", new Duration(sleepTime).toString()); - Thread.sleep(sleepTime); - } - catch (InterruptedException e2) { - throw Throwables.propagate(e2); - } - } + if (!response.getStatus().equals(HttpResponseStatus.ACCEPTED)) { + log.error("Shutdown failed for %s! Are you sure the task was running?", taskId); } } + catch (Exception e) { + throw Throwables.propagate(e); + } } @Override @@ -321,7 +347,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider return Optional.absent(); } else { // Worker is still running this task - final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset)); + final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset)); return Optional.>of( new InputSupplier() { @@ -347,7 +373,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } } - private URL workerURL(Worker worker, String path) + private URL makeWorkerURL(Worker worker, String path) { Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path); @@ -361,10 +387,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider /** * Adds a task to the pending queue - * - * @param taskRunnerWorkItem */ - private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem) + private void addPendingTask(final RemoteTaskRunnerWorkItem taskRunnerWorkItem) { log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId()); @@ -388,8 +412,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider try { // make a copy of the pending tasks because assignTask may delete tasks from pending and move them // into running status - List copy = Lists.newArrayList(pendingTasks.values()); - for (TaskRunnerWorkItem taskWrapper : copy) { + List copy = Lists.newArrayList(pendingTasks.values()); + for (RemoteTaskRunnerWorkItem taskWrapper : copy) { assignTask(taskWrapper); } } @@ -403,42 +427,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ); } - /** - * Retries a task by inserting it back into the pending queue after a given delay. - * - * @param taskRunnerWorkItem - the task to retry - */ - private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem) - { - final String taskId = taskRunnerWorkItem.getTask().getId(); - - if (tasksToRetry.contains(taskId)) { - return; - } - - tasksToRetry.add(taskId); - - if (!taskRunnerWorkItem.getRetryPolicy().hasExceededRetryThreshold()) { - log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId); - scheduledExec.schedule( - new Runnable() - { - @Override - public void run() - { - runningTasks.remove(taskId); - tasksToRetry.remove(taskId); - addPendingTask(taskRunnerWorkItem); - } - }, - taskRunnerWorkItem.getRetryPolicy().getAndIncrementRetryDelay().getMillis(), - TimeUnit.MILLISECONDS - ); - } else { - log.makeAlert("Task exceeded retry threshold").addData("task", taskId).emit(); - } - } - /** * Removes a task from the running queue and clears out the ZK status path of the task. * @@ -464,21 +452,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider * * @param taskRunnerWorkItem - the task to assign */ - private void assignTask(TaskRunnerWorkItem taskRunnerWorkItem) + private void assignTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem) { try { final String taskId = taskRunnerWorkItem.getTask().getId(); - ZkWorker zkWorker = findWorkerRunningTask(taskId); - // If a worker is already running this task, we don't need to announce it - if (zkWorker != null) { - final Worker worker = zkWorker.getWorker(); - log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskId); - runningTasks.put(taskId, pendingTasks.remove(taskId)); - log.info("Task %s switched from pending to running", taskId); + if (runningTasks.containsKey(taskId) || findWorkerRunningTask(taskId) != null) { + log.info("Task[%s] already running.", taskId); } else { // Nothing running this task, announce it in ZK for a worker to run it - zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); + ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); if (zkWorker != null) { announceTask(zkWorker.getWorker(), taskRunnerWorkItem); } @@ -496,7 +479,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider * @param theWorker The worker the task is assigned to * @param taskRunnerWorkItem The task to be assigned */ - private void announceTask(Worker theWorker, TaskRunnerWorkItem taskRunnerWorkItem) throws Exception + private void announceTask(Worker theWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception { final Task task = taskRunnerWorkItem.getTask(); @@ -525,7 +508,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider Stopwatch timeoutStopwatch = new Stopwatch(); timeoutStopwatch.start(); synchronized (statusLock) { - while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) { + while (!isWorkerRunningTask(theWorker, task)) { statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis()); if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeoutDuration().getMillis()) { log.error( @@ -534,7 +517,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider task.getId(), config.getTaskAssignmentTimeoutDuration() ); - retryTask(runningTasks.get(task.getId())); + + taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); break; } } @@ -548,11 +532,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider * * @param worker - contains metadata for a worker that has appeared in ZK */ - private void addWorker(final Worker worker) + private ZkWorker addWorker(final Worker worker, PathChildrenCache.StartMode startMode) { + log.info("Worker[%s] reportin' for duty!", worker.getHost()); + try { final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost()); - final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); + final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath); final ZkWorker zkWorker = new ZkWorker( worker, statusCache, @@ -560,72 +546,70 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider ); // Add status listener to the watcher for status changes - statusCache.getListenable().addListener( + zkWorker.addListener( new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + String taskId; + RemoteTaskRunnerWorkItem taskRunnerWorkItem; synchronized (statusLock) { try { - if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || - event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { - final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - final TaskStatus taskStatus = jsonMapper.readValue( - event.getData().getData(), TaskStatus.class - ); + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + final TaskStatus taskStatus = jsonMapper.readValue( + event.getData().getData(), TaskStatus.class + ); - // This can fail if a worker writes a bogus status. Retry if so. - if (!taskStatus.getId().equals(taskId)) { - retryTask(runningTasks.get(taskId)); - return; - } - - log.info( - "Worker[%s] wrote %s status for task: %s", - worker.getHost(), - taskStatus.getStatusCode(), - taskId - ); - - - // Synchronizing state with ZK - statusLock.notify(); - - final TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); - if (taskRunnerWorkItem == null) { - log.warn( - "WTF?! Worker[%s] announcing a status for a task I didn't know about: %s", - worker.getHost(), + log.info( + "Worker[%s] wrote %s status for task: %s", + zkWorker.getWorker().getHost(), + taskStatus.getStatusCode(), taskId ); - } - if (taskStatus.isComplete()) { - if (taskRunnerWorkItem != null) { - final ListenableFuture result = taskRunnerWorkItem.getResult(); - if (result != null) { - ((SettableFuture) result).set(taskStatus); - } + // Synchronizing state with ZK + statusLock.notify(); + + taskRunnerWorkItem = runningTasks.get(taskId); + if (taskRunnerWorkItem == null) { + log.warn( + "WTF?! Worker[%s] announcing a status for a task I didn't know about: %s", + zkWorker.getWorker().getHost(), + taskId + ); } - // Worker is done with this task - zkWorker.setLastCompletedTaskTime(new DateTime()); - cleanup(worker.getHost(), taskId); - runPendingTasks(); - } - } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { - final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); - TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); - if (taskRunnerWorkItem != null) { - log.info("Task %s just disappeared!", taskId); - retryTask(taskRunnerWorkItem); - } + if (taskStatus.isComplete()) { + if (taskRunnerWorkItem != null) { + final ListenableFuture result = taskRunnerWorkItem.getResult(); + if (result != null) { + ((SettableFuture) result).set(taskStatus); + } + } + + // Worker is done with this task + zkWorker.setLastCompletedTaskTime(new DateTime()); + cleanup(zkWorker.getWorker().getHost(), taskId); + runPendingTasks(); + } + break; + case CHILD_REMOVED: + taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); + taskRunnerWorkItem = runningTasks.get(taskId); + if (taskRunnerWorkItem != null) { + log.info("Task %s just disappeared!", taskId); + taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); + } + break; } } catch (Exception e) { log.makeAlert(e, "Failed to handle new worker status") - .addData("worker", worker.getHost()) + .addData("worker", zkWorker.getWorker().getHost()) .addData("znode", event.getData().getPath()) .emit(); } @@ -633,10 +617,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider } } ); - zkWorkers.put(worker.getHost(), zkWorker); - statusCache.start(); - runPendingTasks(); + zkWorker.start(startMode); + zkWorkers.put(worker.getHost(), zkWorker); + + return zkWorker; } catch (Exception e) { throw Throwables.propagate(e); @@ -652,38 +637,35 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider */ private void removeWorker(final Worker worker) { + log.info("Kaboom! Worker[%s] removed!", worker.getHost()); + ZkWorker zkWorker = zkWorkers.get(worker.getHost()); if (zkWorker != null) { try { - Set tasksToRetry = Sets.newHashSet( - cf.getChildren() - .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())) - ); - tasksToRetry.addAll( - cf.getChildren() - .forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost())) - ); - log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size()); - - for (String taskId : tasksToRetry) { - TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); + for (String assignedTask : cf.getChildren() + .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) { + RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask); if (taskRunnerWorkItem != null) { - String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), taskId); + String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask); if (cf.checkExists().forPath(taskPath) != null) { cf.delete().guaranteed().forPath(taskPath); } - retryTask(taskRunnerWorkItem); + taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId())); } else { - log.warn("RemoteTaskRunner has no knowledge of task %s", taskId); + log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask); } } - - zkWorker.close(); } catch (Exception e) { throw Throwables.propagate(e); } finally { + try { + zkWorker.close(); + } + catch (Exception e) { + log.error(e, "Exception closing worker %s!", worker.getHost()); + } zkWorkers.remove(worker.getHost()); } } @@ -691,48 +673,26 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider private ZkWorker findWorkerForTask(final Task task) { - try { - final MinMaxPriorityQueue workerQueue = MinMaxPriorityQueue.orderedBy( - new Comparator() - { - @Override - public int compare(ZkWorker w1, ZkWorker w2) - { - return -Ints.compare(w1.getRunningTasks().size(), w2.getRunningTasks().size()); - } - } - ).create( - FunctionalIterable.create(zkWorkers.values()).filter( - new Predicate() - { - @Override - public boolean apply(ZkWorker input) - { - for (String taskId : input.getRunningTasks()) { - TaskRunnerWorkItem workerTask = runningTasks.get(taskId); - if (workerTask != null && task.getAvailabilityGroup() - .equalsIgnoreCase(workerTask.getTask().getAvailabilityGroup())) { - return false; - } - } - return (!input.isAtCapacity() && - input.getWorker() - .getVersion() - .compareTo(workerSetupData.get().getMinVersion()) >= 0); - } - } + TreeSet sortedWorkers = Sets.newTreeSet( + new Comparator() + { + @Override + public int compare( + ZkWorker zkWorker, ZkWorker zkWorker2 ) - ); - - if (workerQueue.isEmpty()) { - log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); - return null; + { + return -Ints.compare(zkWorker.getCurrCapacity(), zkWorker2.getCurrCapacity()); + } + } + ); + sortedWorkers.addAll(zkWorkers.values()); + for (ZkWorker zkWorker : sortedWorkers) { + if (zkWorker.canRunTask(task) && + zkWorker.getWorker().getVersion().compareTo(workerSetupData.get().getMinVersion()) >= 0) { + return zkWorker; } - - return workerQueue.peek(); - } - catch (Exception e) { - throw Throwables.propagate(e); } + log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); + return null; } -} +} \ No newline at end of file diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java new file mode 100644 index 00000000000..72cb7155af8 --- /dev/null +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkItem.java @@ -0,0 +1,63 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.indexing.coordinator; + +import com.google.common.util.concurrent.SettableFuture; +import com.metamx.druid.indexing.common.TaskStatus; +import com.metamx.druid.indexing.common.task.Task; +import org.joda.time.DateTime; + +/** + */ +public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem +{ + private final SettableFuture result; + + public RemoteTaskRunnerWorkItem( + Task task, + SettableFuture result + ) + { + super(task, result); + this.result = result; + } + + public RemoteTaskRunnerWorkItem( + Task task, + SettableFuture result, + DateTime createdTime, + DateTime queueInsertionTime + ) + { + super(task, result, createdTime, queueInsertionTime); + this.result = result; + } + + public void setResult(TaskStatus status) + { + result.set(status); + } + + @Override + public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time) + { + return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time); + } +} diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkQueue.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkQueue.java similarity index 84% rename from indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkQueue.java rename to indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkQueue.java index 0562040c35b..338233c5dca 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkQueue.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerWorkQueue.java @@ -25,10 +25,10 @@ import java.util.concurrent.ConcurrentSkipListMap; /** */ -public class TaskRunnerWorkQueue extends ConcurrentSkipListMap +public class RemoteTaskRunnerWorkQueue extends ConcurrentSkipListMap { @Override - public TaskRunnerWorkItem put(String s, TaskRunnerWorkItem taskRunnerWorkItem) + public RemoteTaskRunnerWorkItem put(String s, RemoteTaskRunnerWorkItem taskRunnerWorkItem) { return super.put(s, taskRunnerWorkItem.withQueueInsertionTime(new DateTime())); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java index 8b4e3fba6e1..f0332aa3cdf 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskMasterLifecycle.java @@ -21,12 +21,11 @@ package com.metamx.druid.indexing.coordinator; import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.metamx.common.ISE; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.indexing.common.actions.TaskActionClient; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.task.Task; @@ -34,6 +33,8 @@ import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.indexing.coordinator.exec.TaskConsumer; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; +import com.metamx.druid.initialization.Initialization; +import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import org.apache.curator.framework.CuratorFramework; @@ -88,7 +89,6 @@ public class TaskMasterLifecycle log.info("By the power of Grayskull, I have the power!"); taskRunner = runnerFactory.build(); - resourceManagementScheduler = managementSchedulerFactory.build(taskRunner); final TaskConsumer taskConsumer = new TaskConsumer( taskQueue, taskRunner, @@ -101,12 +101,34 @@ public class TaskMasterLifecycle // Sensible order to start stuff: final Lifecycle leaderLifecycle = new Lifecycle(); - leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(taskRunner); + leaderLifecycle.addHandler( + new Lifecycle.Handler() + { + @Override + public void start() throws Exception + { + taskRunner.bootstrap(taskQueue.snapshot()); + } + + @Override + public void stop() + { + + } + } + ); + leaderLifecycle.addManagedInstance(taskQueue); Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle); leaderLifecycle.addManagedInstance(taskConsumer); - leaderLifecycle.addManagedInstance(resourceManagementScheduler); + if ("remote".equalsIgnoreCase(indexerCoordinatorConfig.getRunnerImpl())) { + if (!(taskRunner instanceof RemoteTaskRunner)) { + throw new ISE("WTF?! We configured a remote runner and got %s", taskRunner.getClass()); + } + resourceManagementScheduler = managementSchedulerFactory.build((RemoteTaskRunner) taskRunner); + leaderLifecycle.addManagedInstance(resourceManagementScheduler); + } try { leaderLifecycle.start(); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java index 630934c6d17..252728b32dc 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskQueue.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; @@ -165,6 +166,20 @@ public class TaskQueue } } + /** + * Returns an immutable snapshot of the current status of this queue. + */ + public List snapshot() + { + giant.lock(); + + try { + return ImmutableList.copyOf(queue); + } finally { + giant.unlock(); + } + } + /** * Starts this task queue. Allows {@link #add(Task)} to accept new tasks. This should not be called on * an already-started queue. diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunner.java index 00abd883d90..074a22d74bf 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunner.java @@ -24,6 +24,7 @@ import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; import java.util.Collection; +import java.util.List; /** * Interface for handing off tasks. Used by a {@link com.metamx.druid.indexing.coordinator.exec.TaskConsumer} to @@ -31,6 +32,15 @@ import java.util.Collection; */ public interface TaskRunner { + /** + * Provide a new task runner with a list of tasks that should already be running. Will be called once shortly + * after instantiation and before any calls to {@link #run}. Bootstrapping should not be construed as a command + * to run the tasks; they will be passed to {@link #run} one-by-one when this is desired. + * + * @param tasks the tasks + */ + public void bootstrap(List tasks); + /** * Run a task. The returned status should be some kind of completed status. * @@ -44,9 +54,9 @@ public interface TaskRunner */ public void shutdown(String taskid); - public Collection getRunningTasks(); + public Collection getRunningTasks(); - public Collection getPendingTasks(); + public Collection getPendingTasks(); public Collection getWorkers(); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java index 74321f5b1b6..10a4ff5d1a3 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/TaskRunnerWorkItem.java @@ -22,7 +22,6 @@ package com.metamx.druid.indexing.coordinator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ComparisonChain; import com.google.common.util.concurrent.ListenableFuture; -import com.metamx.druid.indexing.common.RetryPolicy; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; import org.joda.time.DateTime; @@ -35,22 +34,29 @@ public class TaskRunnerWorkItem implements Comparable { private final Task task; private final ListenableFuture result; - private final RetryPolicy retryPolicy; private final DateTime createdTime; private volatile DateTime queueInsertionTime; + public TaskRunnerWorkItem( + Task task, + ListenableFuture result + ) + { + this(task, result, new DateTime(), new DateTime()); + } + public TaskRunnerWorkItem( Task task, ListenableFuture result, - RetryPolicy retryPolicy, - DateTime createdTime + DateTime createdTime, + DateTime queueInsertionTime ) { this.task = task; this.result = result; - this.retryPolicy = retryPolicy; this.createdTime = createdTime; + this.queueInsertionTime = queueInsertionTime; } @JsonProperty @@ -64,11 +70,6 @@ public class TaskRunnerWorkItem implements Comparable return result; } - public RetryPolicy getRetryPolicy() - { - return retryPolicy; - } - @JsonProperty public DateTime getCreatedTime() { @@ -83,8 +84,7 @@ public class TaskRunnerWorkItem implements Comparable public TaskRunnerWorkItem withQueueInsertionTime(DateTime time) { - this.queueInsertionTime = time; - return this; + return new TaskRunnerWorkItem(task, result, createdTime, time); } @Override @@ -102,7 +102,6 @@ public class TaskRunnerWorkItem implements Comparable return "TaskRunnerWorkItem{" + "task=" + task + ", result=" + result + - ", retryPolicy=" + retryPolicy + ", createdTime=" + createdTime + '}'; } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java index a4853f83a84..ac2d680ebc0 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ThreadPoolTaskRunner.java @@ -77,13 +77,19 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker exec.shutdownNow(); } + @Override + public void bootstrap(List tasks) + { + // do nothing + } + @Override public ListenableFuture run(final Task task) { final TaskToolbox toolbox = toolboxFactory.build(task); final ListenableFuture statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox)); - final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, null, new DateTime()); + final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture); runningItems.add(taskRunnerWorkItem); Futures.addCallback( statusFuture, new FutureCallback() @@ -184,14 +190,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker private final Task task; private final TaskToolbox toolbox; - private final DateTime createdTime; - public ExecutorServiceTaskRunnerCallable(Task task, TaskToolbox toolbox) { this.task = task; this.toolbox = toolbox; - - this.createdTime = new DateTime(); } @Override @@ -243,12 +245,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker public TaskRunnerWorkItem getTaskRunnerWorkItem() { - return new TaskRunnerWorkItem( - task, - null, - null, - createdTime - ); + return new TaskRunnerWorkItem(task, null); } } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java index 16b0f57f5c5..a20e324ed1e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ZkWorker.java @@ -24,17 +24,19 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.metamx.druid.indexing.common.TaskStatus; +import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.worker.Worker; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; - +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.Map; import java.util.Set; /** @@ -44,7 +46,7 @@ public class ZkWorker implements Closeable { private final Worker worker; private final PathChildrenCache statusCache; - private final Function cacheConverter; + private final Function cacheConverter; private volatile DateTime lastCompletedTaskTime = new DateTime(); @@ -52,13 +54,13 @@ public class ZkWorker implements Closeable { this.worker = worker; this.statusCache = statusCache; - this.cacheConverter = new Function() + this.cacheConverter = new Function() { @Override - public String apply(@Nullable ChildData input) + public TaskStatus apply(ChildData input) { try { - return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); + return jsonMapper.readValue(input.getData(), TaskStatus.class); } catch (Exception e) { throw Throwables.propagate(e); @@ -67,6 +69,16 @@ public class ZkWorker implements Closeable }; } + public void start(PathChildrenCache.StartMode startMode) throws Exception + { + statusCache.start(startMode); + } + + public void addListener(PathChildrenCacheListener listener) + { + statusCache.getListenable().addListener(listener); + } + @JsonProperty public Worker getWorker() { @@ -74,14 +86,37 @@ public class ZkWorker implements Closeable } @JsonProperty - public Set getRunningTasks() + public Map getRunningTasks() { - return Sets.newHashSet( - Lists.transform( - statusCache.getCurrentData(), - cacheConverter - ) - ); + Map retVal = Maps.newHashMap(); + for (TaskStatus taskStatus : Lists.transform( + statusCache.getCurrentData(), + cacheConverter + )) { + retVal.put(taskStatus.getId(), taskStatus); + } + + return retVal; + } + + @JsonProperty("currCapacity") + public int getCurrCapacity() + { + int currCapacity = 0; + for (TaskStatus taskStatus : getRunningTasks().values()) { + currCapacity += taskStatus.getResource().getRequiredCapacity(); + } + return currCapacity; + } + + @JsonProperty("availabilityGroups") + public Set getAvailabilityGroups() + { + Set retVal = Sets.newHashSet(); + for (TaskStatus taskStatus : getRunningTasks().values()) { + retVal.add(taskStatus.getResource().getAvailabilityGroup()); + } + return retVal; } @JsonProperty @@ -90,10 +125,20 @@ public class ZkWorker implements Closeable return lastCompletedTaskTime; } - @JsonProperty + public boolean isRunningTask(String taskId) + { + return getRunningTasks().containsKey(taskId); + } + public boolean isAtCapacity() { - return statusCache.getCurrentData().size() >= worker.getCapacity(); + return getCurrCapacity() >= worker.getCapacity(); + } + + public boolean canRunTask(Task task) + { + return (worker.getCapacity() - getCurrCapacity() >= task.getTaskResource().getRequiredCapacity() + && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup())); } public void setLastCompletedTaskTime(DateTime completedTaskTime) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/EC2AutoScalingStrategyConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/EC2AutoScalingStrategyConfig.java index 77575b37cfd..da67e88c64e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/EC2AutoScalingStrategyConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/EC2AutoScalingStrategyConfig.java @@ -21,6 +21,7 @@ package com.metamx.druid.indexing.coordinator.config; import org.skife.config.Config; import org.skife.config.Default; +import org.skife.config.DefaultNull; /** */ @@ -29,4 +30,8 @@ public abstract class EC2AutoScalingStrategyConfig @Config("druid.indexer.worker.port") @Default("8080") public abstract String getWorkerPort(); + + @Config("druid.indexer.worker.version") + @DefaultNull + public abstract String getWorkerVersion(); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java index 4ff499dbd92..25233ed3300 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java @@ -13,6 +13,7 @@ import java.util.Set; public abstract class ForkingTaskRunnerConfig { @Config("druid.indexer.taskDir") + @Default("/tmp/persistent") public abstract File getBaseTaskDir(); @Config("druid.indexer.fork.java") diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java index 9c3baf06c24..f8d2cf8c3dc 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/IndexerCoordinatorConfig.java @@ -41,7 +41,7 @@ public abstract class IndexerCoordinatorConfig extends ZkPathsConfig public abstract int getNumLocalThreads(); @Config("druid.indexer.runner") - @Default("remote") + @Default("local") public abstract String getRunnerImpl(); @Config("druid.indexer.storage") diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java index 15970fc37b3..6023605ea7d 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/RemoteTaskRunnerConfig.java @@ -31,4 +31,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig @Config("druid.indexer.taskAssignmentTimeoutDuration") @Default("PT5M") public abstract Duration getTaskAssignmentTimeoutDuration(); + + @Config("druid.curator.compression.enable") + @Default("false") + public abstract boolean enableCompression(); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java index b07ae5f1792..42a82fc8ada 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java @@ -27,13 +27,13 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.base.Charsets; import com.google.common.base.Optional; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.InputSupplier; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.servlet.GuiceFilter; +import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; @@ -46,6 +46,7 @@ import com.metamx.druid.QueryableNode; import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.JacksonConfigManager; +import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceInstanceFactory; @@ -55,12 +56,10 @@ import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.RedirectFilter; import com.metamx.druid.http.RedirectInfo; import com.metamx.druid.http.StatusServlet; -import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.actions.TaskActionToolbox; import com.metamx.druid.indexing.common.config.IndexerZkConfig; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; import com.metamx.druid.indexing.common.config.TaskLogConfig; import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; @@ -114,7 +113,6 @@ import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorSchedulerConfig; import com.metamx.metrics.SysMonitor; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.x.discovery.ServiceDiscovery; import org.jets3t.service.S3ServiceException; import org.jets3t.service.impl.rest.httpclient.RestS3Service; @@ -386,15 +384,17 @@ public class IndexerCoordinatorNode extends QueryableNode WorkerSetupData setupData = workerSetupDataRef.get(); EC2NodeData workerConfig = setupData.getNodeData(); + GalaxyUserData userData = setupData.getUserData(); + if (config.getWorkerVersion() != null) { + userData = userData.withVersion(config.getWorkerVersion()); + } + RunInstancesResult result = amazonEC2Client.runInstances( new RunInstancesRequest( workerConfig.getAmiId(), @@ -84,7 +90,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy .withUserData( Base64.encodeBase64String( jsonMapper.writeValueAsBytes( - setupData.getUserData() + userData ) ) ) diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java index 5ac9d4384bc..0cfdc94c76a 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementScheduler.java @@ -24,6 +24,7 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.PeriodGranularity; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunner; import com.metamx.druid.indexing.coordinator.TaskRunner; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -42,7 +43,7 @@ public class ResourceManagementScheduler { private static final Logger log = new Logger(ResourceManagementScheduler.class); - private final TaskRunner taskRunner; + private final RemoteTaskRunner taskRunner; private final ResourceManagementStrategy resourceManagementStrategy; private final ResourceManagementSchedulerConfig config; private final ScheduledExecutorService exec; @@ -51,7 +52,7 @@ public class ResourceManagementScheduler private volatile boolean started = false; public ResourceManagementScheduler( - TaskRunner taskRunner, + RemoteTaskRunner taskRunner, ResourceManagementStrategy resourceManagementStrategy, ResourceManagementSchedulerConfig config, ScheduledExecutorService exec diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerFactory.java index 51c7833db9a..16ae50ccc2b 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementSchedulerFactory.java @@ -19,11 +19,11 @@ package com.metamx.druid.indexing.coordinator.scaling; -import com.metamx.druid.indexing.coordinator.TaskRunner; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunner; /** */ public interface ResourceManagementSchedulerFactory { - public ResourceManagementScheduler build(TaskRunner runner); + public ResourceManagementScheduler build(RemoteTaskRunner runner); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementStrategy.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementStrategy.java index b658b812ac8..2c61dc79285 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementStrategy.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/ResourceManagementStrategy.java @@ -19,7 +19,7 @@ package com.metamx.druid.indexing.coordinator.scaling; -import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.ZkWorker; import java.util.Collection; @@ -30,9 +30,9 @@ import java.util.Collection; */ public interface ResourceManagementStrategy { - public boolean doProvision(Collection runningTasks, Collection zkWorkers); + public boolean doProvision(Collection runningTasks, Collection zkWorkers); - public boolean doTerminate(Collection runningTasks, Collection zkWorkers); + public boolean doTerminate(Collection runningTasks, Collection zkWorkers); public ScalingStats getStats(); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java index 023091935f8..4451d68ff76 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.metamx.common.guava.FunctionalIterable; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.ZkWorker; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; @@ -68,7 +69,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } @Override - public boolean doProvision(Collection pendingTasks, Collection zkWorkers) + public boolean doProvision(Collection pendingTasks, Collection zkWorkers) { if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) { log.info( @@ -135,7 +136,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } @Override - public boolean doTerminate(Collection pendingTasks, Collection zkWorkers) + public boolean doTerminate(Collection pendingTasks, Collection zkWorkers) { Set workerNodeIds = Sets.newHashSet( autoScalingStrategy.ipToIdLookup( @@ -244,7 +245,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat return scalingStats; } - private boolean hasTaskPendingBeyondThreshold(Collection pendingTasks) + private boolean hasTaskPendingBeyondThreshold(Collection pendingTasks) { long now = System.currentTimeMillis(); for (TaskRunnerWorkItem pendingTask : pendingTasks) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/setup/GalaxyUserData.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/setup/GalaxyUserData.java index 60241f472ff..4a7490660cc 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/setup/GalaxyUserData.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/setup/GalaxyUserData.java @@ -60,6 +60,11 @@ public class GalaxyUserData return type; } + public GalaxyUserData withVersion(String ver) + { + return new GalaxyUserData(env, ver, type); + } + @Override public String toString() { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java index 77e70f52a5f..89d6c313487 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -130,6 +130,7 @@ public class WorkerCuratorCoordinator } curatorFramework.create() + .creatingParentsIfNeeded() .withMode(mode) .forPath(path, rawBytes); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java index c6b3bdcdc74..deca87c56f5 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/config/WorkerConfig.java @@ -29,10 +29,6 @@ public abstract class WorkerConfig @Config("druid.host") public abstract String getHost(); - @Config("druid.worker.threads") - @Default("1") - public abstract int getNumThreads(); - @Config("druid.worker.ip") public abstract String getIp(); diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java index 05d459d1d3a..7521c1adb22 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ChatHandlerResource.java @@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.inject.Inject; import com.metamx.druid.indexing.common.index.ChatHandler; -import com.metamx.druid.indexing.common.index.ChatHandlerProvider; +import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider; import javax.ws.rs.Path; import javax.ws.rs.PathParam; @@ -14,10 +14,10 @@ import javax.ws.rs.core.Response; public class ChatHandlerResource { private final ObjectMapper jsonMapper; - private final ChatHandlerProvider handlers; + private final EventReceivingChatHandlerProvider handlers; @Inject - public ChatHandlerResource(ObjectMapper jsonMapper, ChatHandlerProvider handlers) + public ChatHandlerResource(ObjectMapper jsonMapper, EventReceivingChatHandlerProvider handlers) { this.jsonMapper = jsonMapper; this.handlers = handlers; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index 20428fe269c..341a578d6de 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -37,7 +37,6 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.BaseServerNode; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; -import com.metamx.druid.curator.discovery.NoopServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.GuiceServletConfig; @@ -50,6 +49,8 @@ import com.metamx.druid.indexing.common.config.RetryPolicyConfig; import com.metamx.druid.indexing.common.config.TaskConfig; import com.metamx.druid.indexing.common.index.ChatHandlerProvider; import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; +import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider; +import com.metamx.druid.indexing.common.index.NoopChatHandlerProvider; import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; @@ -285,7 +286,7 @@ public class ExecutorNode extends BaseServerNode private void initializeS3Service() throws S3ServiceException { - if(s3Service == null) { + if (s3Service == null) { s3Service = new RestS3Service( new AWSCredentials( PropUtils.getProperty(props, "com.metamx.aws.accessKey"), @@ -385,17 +386,15 @@ public class ExecutorNode extends BaseServerNode { if (chatHandlerProvider == null) { final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class); - final ServiceAnnouncer myServiceAnnouncer; if (config.getServiceFormat() == null) { - log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!"); - myServiceAnnouncer = new NoopServiceAnnouncer(); + log.info("ChatHandlerProvider: Using NoopChatHandlerProvider. Good luck finding your firehoses!"); + this.chatHandlerProvider = new NoopChatHandlerProvider(); } else { - myServiceAnnouncer = serviceAnnouncer; + this.chatHandlerProvider = new EventReceivingChatHandlerProvider( + config, + serviceAnnouncer + ); } - this.chatHandlerProvider = new ChatHandlerProvider( - config, - myServiceAnnouncer - ); } } @@ -437,9 +436,12 @@ public class ExecutorNode extends BaseServerNode jsonMapper = new DefaultObjectMapper(); smileMapper = new DefaultObjectMapper(new SmileFactory()); smileMapper.getJsonFactory().setCodec(smileMapper); - } - else if (jsonMapper == null || smileMapper == null) { - throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); + } else if (jsonMapper == null || smileMapper == null) { + throw new ISE( + "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", + jsonMapper, + smileMapper + ); } if (lifecycle == null) { @@ -454,7 +456,15 @@ public class ExecutorNode extends BaseServerNode configFactory = Config.createFactory(props); } - return new ExecutorNode(nodeType, props, lifecycle, jsonMapper, smileMapper, configFactory, executorLifecycleFactory); + return new ExecutorNode( + nodeType, + props, + lifecycle, + jsonMapper, + smileMapper, + configFactory, + executorLifecycleFactory + ); } } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java index dda7afe7d78..5c38440a9a2 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java @@ -394,7 +394,7 @@ public class WorkerNode extends QueryableNode public void initializeWorkerTaskMonitor() { if (workerTaskMonitor == null) { - final ExecutorService workerExec = Executors.newFixedThreadPool(workerConfig.getNumThreads()); + final ExecutorService workerExec = Executors.newFixedThreadPool(workerConfig.getCapacity()); final CuratorFramework curatorFramework = getCuratorFramework(); final PathChildrenCache pathChildrenCache = new PathChildrenCache( diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RealtimeishTask.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RealtimeishTask.java index a43ec4fb2fc..4e1dc18ae5f 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RealtimeishTask.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RealtimeishTask.java @@ -30,6 +30,7 @@ import com.metamx.druid.indexing.common.actions.LockListAction; import com.metamx.druid.indexing.common.actions.LockReleaseAction; import com.metamx.druid.indexing.common.actions.SegmentInsertAction; import com.metamx.druid.indexing.common.task.AbstractTask; +import com.metamx.druid.indexing.common.task.TaskResource; import org.joda.time.Interval; import org.junit.Assert; @@ -41,12 +42,12 @@ public class RealtimeishTask extends AbstractTask { public RealtimeishTask() { - super("rt1", "rt", "rt1", "foo", null); + super("rt1", "rt", new TaskResource("rt1", 1), "foo", null); } - public RealtimeishTask(String id, String groupId, String availGroup, String dataSource, Interval interval) + public RealtimeishTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval) { - super(id, groupId, availGroup, dataSource, interval); + super(id, groupId, taskResource, dataSource, interval); } @Override diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java index 5e9505e061f..c53f3e1e58e 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/RemoteTaskRunnerTest.java @@ -2,22 +2,26 @@ package com.metamx.druid.indexing.coordinator; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.google.common.base.Function; +import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; -import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; +import com.google.common.collect.Sets; +import com.google.common.io.Files; +import com.google.common.util.concurrent.ListenableFuture; import com.metamx.common.ISE; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; +import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory; import com.metamx.druid.indexing.TestTask; -import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolboxFactory; import com.metamx.druid.indexing.common.config.IndexerZkConfig; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; import com.metamx.druid.indexing.common.config.TaskConfig; +import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.indexing.common.task.TaskResource; import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.worker.Worker; @@ -26,7 +30,6 @@ import com.metamx.druid.indexing.worker.WorkerTaskMonitor; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; -import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; @@ -42,35 +45,34 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import javax.annotation.Nullable; import java.io.File; +import java.util.Arrays; +import java.util.Set; import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static junit.framework.Assert.fail; - /** */ public class RemoteTaskRunnerTest { private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); - private static final String basePath = "/test/druid/indexer"; - private static final String announcementsPath = String.format("%s/announcements", basePath); - private static final String tasksPath = String.format("%s/tasks", basePath); - private static final String statusPath = String.format("%s/status", basePath); + private static final Joiner joiner = Joiner.on("/"); + private static final String basePath = "/test/druid"; + private static final String announcementsPath = String.format("%s/indexer/announcements/worker", basePath); + private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath); + private static final String statusPath = String.format("%s/indexer/status/worker", basePath); private TestingCluster testingCluster; private CuratorFramework cf; - private PathChildrenCache pathChildrenCache; private RemoteTaskRunner remoteTaskRunner; + private WorkerCuratorCoordinator workerCuratorCoordinator; private WorkerTaskMonitor workerTaskMonitor; - private ScheduledExecutorService scheduledExec; + private TestTask task; - private TestTask task1; - - private Worker worker1; + private Worker worker; @Before public void setUp() throws Exception @@ -84,24 +86,200 @@ public class RemoteTaskRunnerTest .compressionProvider(new PotentiallyGzippedCompressionProvider(false)) .build(); cf.start(); + cf.create().creatingParentsIfNeeded().forPath(basePath); - cf.create().creatingParentsIfNeeded().forPath(announcementsPath); - cf.create().forPath(tasksPath); - cf.create().forPath(String.format("%s/worker1", tasksPath)); - cf.create().forPath(statusPath); - cf.create().forPath(String.format("%s/worker1", statusPath)); + task = makeTask(TaskStatus.success("task")); + } - pathChildrenCache = new PathChildrenCache(cf, announcementsPath, true); + @After + public void tearDown() throws Exception + { + remoteTaskRunner.stop(); + workerCuratorCoordinator.stop(); + workerTaskMonitor.stop(); + cf.close(); + testingCluster.stop(); + } - worker1 = new Worker( - "worker1", - "localhost", - 3, - "0" + @Test + public void testRunNoExistingTask() throws Exception + { + doSetup(); + + remoteTaskRunner.run(task); + } + + @Test(expected = ISE.class) + public void testExceptionThrownWithExistingTask() throws Exception + { + doSetup(); + + remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); + remoteTaskRunner.run( + makeTask(TaskStatus.running("task")) + ); + } + + @Test + public void testRunTooMuchZKData() throws Exception + { + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + EasyMock.replay(emitter); + + doSetup(); + + remoteTaskRunner.run(makeTask(TaskStatus.success(new String(new char[5000])))); + + EasyMock.verify(emitter); + } + + @Test + public void testRunSameAvailabilityGroup() throws Exception + { + doSetup(); + + TestRealtimeTask theTask = new TestRealtimeTask( + "rt1", + new TaskResource("rt1", 1), + "foo", + TaskStatus.running("rt1") + ); + remoteTaskRunner.run(theTask); + remoteTaskRunner.run( + new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2")) + ); + remoteTaskRunner.run( + new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3")) ); - task1 = new TestTask( - "task1", + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + while (remoteTaskRunner.getRunningTasks().size() < 2) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Cannot find running task"); + } + } + + Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); + } + + @Test + public void testRunWithCapacity() throws Exception + { + doSetup(); + + TestRealtimeTask theTask = new TestRealtimeTask( + "rt1", + new TaskResource("rt1", 1), + "foo", + TaskStatus.running("rt1") + ); + remoteTaskRunner.run(theTask); + remoteTaskRunner.run( + new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2")) + ); + remoteTaskRunner.run( + new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3")) + ); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + while (remoteTaskRunner.getRunningTasks().size() < 2) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Cannot find running task"); + } + } + + Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1); + Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); + } + + @Test + public void testFailure() throws Exception + { + doSetup(); + + ListenableFuture future = remoteTaskRunner.run(makeTask(TaskStatus.running("task"))); + final String taskStatus = joiner.join(statusPath, "task"); + + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + while (cf.checkExists().forPath(taskStatus) == null) { + Thread.sleep(100); + if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { + throw new ISE("Cannot find running task"); + } + } + Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task")); + + cf.delete().forPath(taskStatus); + + TaskStatus status = future.get(); + + Assert.assertEquals(status.getStatusCode(), TaskStatus.Status.FAILED); + } + + @Test + public void testBootstrap() throws Exception + { + cf.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL) + .forPath(joiner.join(statusPath, "first"), jsonMapper.writeValueAsBytes(TaskStatus.running("first"))); + cf.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.EPHEMERAL) + .forPath(joiner.join(statusPath, "second"), jsonMapper.writeValueAsBytes(TaskStatus.running("second"))); + + doSetup(); + + Set existingTasks = Sets.newHashSet(); + for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) { + existingTasks.addAll(zkWorker.getRunningTasks().keySet()); + } + + Assert.assertTrue(existingTasks.size() == 2); + Assert.assertTrue(existingTasks.contains("first")); + Assert.assertTrue(existingTasks.contains("second")); + + remoteTaskRunner.bootstrap(Arrays.asList(makeTask(TaskStatus.running("second")))); + + Set runningTasks = Sets.newHashSet( + Iterables.transform( + remoteTaskRunner.getRunningTasks(), + new Function() + { + @Override + public String apply(RemoteTaskRunnerWorkItem input) + { + return input.getTask().getId(); + } + } + ) + ); + + Assert.assertTrue(runningTasks.size() == 1); + Assert.assertTrue(runningTasks.contains("second")); + Assert.assertFalse(runningTasks.contains("first")); + } + + private void doSetup() throws Exception + { + makeWorker(); + makeRemoteTaskRunner(); + makeTaskMonitor(); + } + + private TestTask makeTask(TaskStatus status) + { + return new TestTask( + status.getId(), "dummyDs", Lists.newArrayList( new DataSegment( @@ -117,194 +295,20 @@ public class RemoteTaskRunnerTest ) ), Lists.newArrayList(), - TaskStatus.success("task1") + status ); - - makeRemoteTaskRunner(); - makeTaskMonitor(); } - @After - public void tearDown() throws Exception - { - testingCluster.stop(); - remoteTaskRunner.stop(); - workerTaskMonitor.stop(); - } - - @Test - public void testRunNoExistingTask() throws Exception - { - remoteTaskRunner.run(task1); - } - - @Test - public void testExceptionThrownWithExistingTask() throws Exception - { - remoteTaskRunner.run( - new TestTask( - task1.getId(), - task1.getDataSource(), - task1.getSegments(), - Lists.newArrayList(), - TaskStatus.running(task1.getId()) - ) - ); - try { - remoteTaskRunner.run(task1); - fail("ISE expected"); - } - catch (ISE expected) { - } - } - - @Test - public void testRunTooMuchZKData() throws Exception - { - ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); - EmittingLogger.registerEmitter(emitter); - EasyMock.replay(emitter); - remoteTaskRunner.run( - new TestTask( - new String(new char[5000]), - "dummyDs", - Lists.newArrayList( - new DataSegment( - "dummyDs", - new Interval(new DateTime(), new DateTime()), - new DateTime().toString(), - null, - null, - null, - null, - 0, - 0 - ) - ), - Lists.newArrayList(), - TaskStatus.success("foo") - ) - ); - EasyMock.verify(emitter); - } - - @Test - public void testRunWithCallback() throws Exception - { - final MutableBoolean callbackCalled = new MutableBoolean(false); - - Futures.addCallback( - remoteTaskRunner.run( - new TestTask( - task1.getId(), - task1.getDataSource(), - task1.getSegments(), - Lists.newArrayList(), - TaskStatus.running(task1.getId()) - ) - ), new FutureCallback() - { - @Override - public void onSuccess(TaskStatus taskStatus) - { - callbackCalled.setValue(true); - } - - @Override - public void onFailure(Throwable throwable) - { - // neg - } - } - ); - - // Really don't like this way of waiting for the task to appear - int count = 0; - while (remoteTaskRunner.findWorkerRunningTask(task1.getId()) == null) { - Thread.sleep(500); - if (count > 10) { - throw new ISE("WTF?! Task still not announced in ZK?"); - } - count++; - } - - Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 1); - - // Complete the task - cf.setData().forPath( - String.format("%s/worker1/task1", statusPath), - jsonMapper.writeValueAsBytes(TaskStatus.success(task1.getId())) - ); - - // Really don't like this way of waiting for the task to disappear - count = 0; - while (remoteTaskRunner.findWorkerRunningTask(task1.getId()) != null) { - Thread.sleep(500); - if (count > 10) { - throw new ISE("WTF?! Task still exists in ZK?"); - } - count++; - } - - Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue()); - } - - - @Test - public void testRunSameAvailabilityGroup() throws Exception - { - TestRealtimeTask theTask = new TestRealtimeTask("rt1", "rt1", "foo", TaskStatus.running("rt1")); - remoteTaskRunner.run(theTask); - remoteTaskRunner.run( - new TestRealtimeTask("rt2", "rt1", "foo", TaskStatus.running("rt2")) - ); - remoteTaskRunner.run( - new TestRealtimeTask("rt3", "rt2", "foo", TaskStatus.running("rt3")) - ); - - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - while (remoteTaskRunner.getRunningTasks().isEmpty()) { - Thread.sleep(100); - if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { - throw new ISE("Cannot find running task"); - } - } - - Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2); - Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1); - Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); - } - - private void makeTaskMonitor() throws Exception { - WorkerCuratorCoordinator workerCuratorCoordinator = new WorkerCuratorCoordinator( + workerCuratorCoordinator = new WorkerCuratorCoordinator( jsonMapper, new IndexerZkConfig() { - @Override - public String getIndexerAnnouncementPath() - { - return announcementsPath; - } - - @Override - public String getIndexerTaskPath() - { - return tasksPath; - } - - @Override - public String getIndexerStatusPath() - { - return statusPath; - } - @Override public String getZkBasePath() { - throw new UnsupportedOperationException(); + return basePath; } @Override @@ -314,13 +318,14 @@ public class RemoteTaskRunnerTest } }, cf, - worker1 + worker ); workerCuratorCoordinator.start(); + // Start a task monitor workerTaskMonitor = new WorkerTaskMonitor( jsonMapper, - new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true), + new PathChildrenCache(cf, tasksPath, true), cf, workerCuratorCoordinator, new ThreadPoolTaskRunner( @@ -328,14 +333,11 @@ public class RemoteTaskRunnerTest new TaskConfig() { @Override - public File getBaseTaskDir() + public String getBaseDir() { - try { - return File.createTempFile("billy", "yay"); - } - catch (Exception e) { - throw Throwables.propagate(e); - } + File tmp = Files.createTempDir(); + tmp.deleteOnExit(); + return tmp.toString(); } @Override @@ -361,80 +363,45 @@ public class RemoteTaskRunnerTest private void makeRemoteTaskRunner() throws Exception { - scheduledExec = EasyMock.createMock(ScheduledExecutorService.class); - remoteTaskRunner = new RemoteTaskRunner( jsonMapper, new TestRemoteTaskRunnerConfig(), cf, - pathChildrenCache, - scheduledExec, - new RetryPolicyFactory(new TestRetryPolicyConfig()), + new SimplePathChildrenCacheFactory.Builder().build(), new AtomicReference(new WorkerSetupData("0", 0, 1, null, null)), null ); - // Create a single worker and wait for things for be ready remoteTaskRunner.start(); - cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( - String.format("%s/worker1", announcementsPath), - jsonMapper.writeValueAsBytes(worker1) - ); - int count = 0; - while (remoteTaskRunner.getWorkers().size() == 0) { - Thread.sleep(500); - if (count > 10) { - throw new ISE("WTF?! Still can't find worker!"); - } - count++; - } } - private static class TestRetryPolicyConfig extends RetryPolicyConfig + private void makeWorker() throws Exception { - @Override - public Duration getRetryMinDuration() - { - return null; - } + worker = new Worker( + "worker", + "localhost", + 3, + "0" + ); - @Override - public Duration getRetryMaxDuration() - { - return null; - } - - @Override - public long getMaxRetryCount() - { - return 0; - } + cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( + announcementsPath, + jsonMapper.writeValueAsBytes(worker) + ); } private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig { @Override - public String getIndexerAnnouncementPath() + public boolean enableCompression() { - return announcementsPath; - } - - @Override - public String getIndexerTaskPath() - { - return tasksPath; - } - - @Override - public String getIndexerStatusPath() - { - return statusPath; + return false; } @Override public String getZkBasePath() { - throw new UnsupportedOperationException(); + return basePath; } @Override diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java index ecd31c985d7..17e718a8983 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskLifecycleTest.java @@ -36,6 +36,7 @@ import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexer.granularity.UniformGranularitySpec; +import com.metamx.druid.indexing.common.task.TaskResource; import com.metamx.druid.input.InputRow; import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.jackson.DefaultObjectMapper; @@ -120,9 +121,9 @@ public class TaskLifecycleTest new TaskConfig() { @Override - public File getBaseTaskDir() + public String getBaseDir() { - return tmp; + return tmp.toString(); } @Override @@ -285,7 +286,7 @@ public class TaskLifecycleTest @Test public void testSimple() throws Exception { - final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractTask("id1", "id1", new TaskResource("id1", 1), "ds", new Interval("2012-01-01/P1D")) { @Override public String getType() @@ -322,7 +323,7 @@ public class TaskLifecycleTest @Test public void testBadInterval() throws Exception { - final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D")) { @Override public String getType() @@ -356,7 +357,7 @@ public class TaskLifecycleTest @Test public void testBadVersion() throws Exception { - final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) + final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D")) { @Override public String getType() diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java index 533ae3d2760..f421bee7f3c 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TaskQueueTest.java @@ -348,7 +348,7 @@ public class TaskQueueTest private static Task newTask(final String id, final String groupId, final String dataSource, final Interval interval) { - return new AbstractTask(id, groupId, id, dataSource, interval) + return new AbstractTask(id, groupId, dataSource, interval) { @Override public TaskStatus run(TaskToolbox toolbox) throws Exception @@ -372,7 +372,7 @@ public class TaskQueueTest final List nextTasks ) { - return new AbstractTask(id, groupId, id, dataSource, interval) + return new AbstractTask(id, groupId, dataSource, interval) { @Override public String getType() diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TestRealtimeTask.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TestRealtimeTask.java index ee7d077cdd7..9c30cf690d0 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TestRealtimeTask.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/TestRealtimeTask.java @@ -27,6 +27,7 @@ import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskToolbox; import com.metamx.druid.indexing.common.task.RealtimeIndexTask; +import com.metamx.druid.indexing.common.task.TaskResource; import com.metamx.druid.realtime.Schema; import com.metamx.druid.shard.NoneShardSpec; @@ -40,14 +41,14 @@ public class TestRealtimeTask extends RealtimeIndexTask @JsonCreator public TestRealtimeTask( @JsonProperty("id") String id, - @JsonProperty("availabilityGroup") String availGroup, + @JsonProperty("resource") TaskResource taskResource, @JsonProperty("dataSource") String dataSource, @JsonProperty("taskStatus") TaskStatus status ) { super( id, - availGroup, + taskResource, new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()), null, null, diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java index eba026ea620..8f512390f14 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/EC2AutoScalingStrategyTest.java @@ -84,6 +84,12 @@ public class EC2AutoScalingStrategyTest { return "8080"; } + + @Override + public String getWorkerVersion() + { + return ""; + } }, workerSetupData ); diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java index c44d555f798..7a4e4c736f7 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -19,17 +19,20 @@ package com.metamx.druid.indexing.coordinator.scaling; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexing.TestTask; import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.task.Task; -import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; +import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.ZkWorker; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.worker.Worker; +import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEventBuilder; @@ -42,7 +45,9 @@ import org.junit.Before; import org.junit.Test; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; @@ -127,8 +132,8 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(autoScalingStrategy); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -155,8 +160,8 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(autoScalingStrategy); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -171,8 +176,8 @@ public class SimpleResourceManagementStrategyTest ); provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -212,8 +217,8 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(autoScalingStrategy); boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -230,8 +235,8 @@ public class SimpleResourceManagementStrategyTest Thread.sleep(2000); provisionedSomething = simpleResourceManagementStrategy.doProvision( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(testTask) @@ -264,8 +269,8 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(autoScalingStrategy); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -294,8 +299,8 @@ public class SimpleResourceManagementStrategyTest EasyMock.replay(autoScalingStrategy); boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -309,8 +314,8 @@ public class SimpleResourceManagementStrategyTest ); terminatedSomething = simpleResourceManagementStrategy.doTerminate( - Arrays.asList( - new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + Arrays.asList( + new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime()) ), Arrays.asList( new TestZkWorker(null) @@ -334,18 +339,18 @@ public class SimpleResourceManagementStrategyTest Task testTask ) { - super(new Worker("host", "ip", 3, "version"), null, null); + super(new Worker("host", "ip", 3, "version"), null, new DefaultObjectMapper()); this.testTask = testTask; } @Override - public Set getRunningTasks() + public Map getRunningTasks() { if (testTask == null) { - return Sets.newHashSet(); + return Maps.newHashMap(); } - return Sets.newHashSet(testTask.getId()); + return ImmutableMap.of(testTask.getId(), TaskStatus.running(testTask.getId())); } } } diff --git a/server/src/test/java/com/metamx/druid/metrics/NoopServiceEmitter.java b/server/src/test/java/com/metamx/druid/metrics/NoopServiceEmitter.java index 54c158495f9..f980bc4885c 100644 --- a/server/src/test/java/com/metamx/druid/metrics/NoopServiceEmitter.java +++ b/server/src/test/java/com/metamx/druid/metrics/NoopServiceEmitter.java @@ -22,7 +22,8 @@ package com.metamx.druid.metrics; import com.metamx.emitter.core.Event; import com.metamx.emitter.service.ServiceEmitter; -public class NoopServiceEmitter extends ServiceEmitter +public class + NoopServiceEmitter extends ServiceEmitter { public NoopServiceEmitter() {