Merge pull request #203 from metamx/worker-resource

Remove retry logic from the RemoteTaskRunner and introduce Task resources

commit e1d65e736c by cheddar, 2013-08-05 11:11:21 -07:00
51 changed files with 1155 additions and 781 deletions

View File: DruidServer.java

@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentMap;
  */
 public class DruidServer implements Comparable
 {
+  public static final String DEFAULT_TIER = "_default_tier";
   private static final Logger log = new Logger(DruidServer.class);
   private final Object lock = new Object();

View File: DruidServerConfig.java

@@ -33,9 +33,10 @@ public abstract class DruidServerConfig
   public abstract String getHost();

   @Config("druid.server.maxSize")
+  @Default("0")
   public abstract long getMaxSize();

   @Config("druid.server.tier")
-  @Default("_default_tier")
+  @Default(DruidServer.DEFAULT_TIER)
   public abstract String getTier();
 }

View File: SimplePathChildrenCacheFactory.java

@@ -21,8 +21,11 @@ package com.metamx.druid.curator.cache;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.utils.ThreadUtils;

 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;

 /**
  */
@@ -48,4 +51,43 @@ public class SimplePathChildrenCacheFactory implements PathChildrenCacheFactory
   {
     return new PathChildrenCache(curator, path, cacheData, compressed, exec);
   }
+
+  public static class Builder
+  {
+    private static final ThreadFactory defaultThreadFactory = ThreadUtils.newThreadFactory("PathChildrenCache");
+
+    private boolean cacheData;
+    private boolean compressed;
+    private ExecutorService exec;
+
+    public Builder()
+    {
+      cacheData = true;
+      compressed = false;
+      exec = Executors.newSingleThreadExecutor(defaultThreadFactory);
+    }
+
+    public Builder withCacheData(boolean cacheData)
+    {
+      this.cacheData = cacheData;
+      return this;
+    }
+
+    public Builder withCompressed(boolean compressed)
+    {
+      this.compressed = compressed;
+      return this;
+    }
+
+    public Builder withExecutorService(ExecutorService exec)
+    {
+      this.exec = exec;
+      return this;
+    }
+
+    public SimplePathChildrenCacheFactory build()
+    {
+      return new SimplePathChildrenCacheFactory(cacheData, compressed, exec);
+    }
+  }
 }
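
A minimal usage sketch for the new builder (the ZK path and compression choice here are illustrative, not taken from this patch):

    // Builder defaults: cacheData = true, compressed = false, single-threaded executor.
    PathChildrenCacheFactory cacheFactory = new SimplePathChildrenCacheFactory.Builder()
        .withCompressed(true)
        .build();
    // The same make(CuratorFramework, String) call the RemoteTaskRunner uses later in this diff.
    PathChildrenCache workerPathCache = cacheFactory.make(curator, "/druid/announcements");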

View File: TaskStatus.java

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.metamx.druid.indexing.common.task.TaskResource;

 /**
  * Represents the status of a task. The task may be ongoing ({@link #isComplete()} false) or it may be
@@ -42,33 +43,36 @@ public class TaskStatus
   public static TaskStatus running(String taskId)
   {
-    return new TaskStatus(taskId, Status.RUNNING, -1);
+    return new TaskStatus(taskId, Status.RUNNING, -1, null);
   }

   public static TaskStatus success(String taskId)
   {
-    return new TaskStatus(taskId, Status.SUCCESS, -1);
+    return new TaskStatus(taskId, Status.SUCCESS, -1, null);
   }

   public static TaskStatus failure(String taskId)
   {
-    return new TaskStatus(taskId, Status.FAILED, -1);
+    return new TaskStatus(taskId, Status.FAILED, -1, null);
   }

   private final String id;
   private final Status status;
   private final long duration;
+  private final TaskResource resource;

   @JsonCreator
   private TaskStatus(
       @JsonProperty("id") String id,
       @JsonProperty("status") Status status,
-      @JsonProperty("duration") long duration
+      @JsonProperty("duration") long duration,
+      @JsonProperty("resource") TaskResource resource
   )
   {
     this.id = id;
     this.status = status;
     this.duration = duration;
+    this.resource = resource == null ? new TaskResource(id, 1) : resource;

     // Check class invariants.
     Preconditions.checkNotNull(id, "id");
@@ -93,6 +97,12 @@ public class TaskStatus
     return duration;
   }

+  @JsonProperty("resource")
+  public TaskResource getResource()
+  {
+    return resource;
+  }
+
   /**
    * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
    * isSuccess, or isFailure will be true at any one time.
@@ -134,7 +144,7 @@ public class TaskStatus
   public TaskStatus withDuration(long _duration)
   {
-    return new TaskStatus(id, status, _duration);
+    return new TaskStatus(id, status, _duration, resource);
   }

   @Override
@@ -144,6 +154,7 @@ public class TaskStatus
         .add("id", id)
         .add("status", status)
         .add("duration", duration)
+        .add("resource", resource)
         .toString();
   }
 }
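
A hedged sketch of the backward-compatibility path: a status blob written before this change carries no "resource" field, so the @JsonCreator constructor falls back to a single-slot TaskResource keyed on the task id (the task id and mapper are illustrative, and default Jackson enum serialization by name is assumed):

    TaskStatus status = jsonMapper.readValue(
        "{\"id\": \"some_task\", \"status\": \"RUNNING\", \"duration\": -1}",
        TaskStatus.class
    );
    // Falls back to new TaskResource("some_task", 1):
    status.getResource().getAvailabilityGroup();  // "some_task"
    status.getResource().getRequiredCapacity();   // 1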

View File: TaskConfig.java

@@ -19,6 +19,7 @@
 package com.metamx.druid.indexing.common.config;

+import com.google.common.base.Joiner;
 import org.skife.config.Config;
 import org.skife.config.Default;

@@ -26,13 +27,30 @@ import java.io.File;
 public abstract class TaskConfig
 {
+  private static Joiner joiner = Joiner.on("/");
+
+  @Config("druid.indexer.baseDir")
+  @Default("/tmp/")
+  public abstract String getBaseDir();
+
   @Config("druid.indexer.taskDir")
-  public abstract File getBaseTaskDir();
+  public File getBaseTaskDir()
+  {
+    return new File(defaultPath("persistent/task"));
+  }
+
+  @Config("druid.indexer.hadoopWorkingPath")
+  public String getHadoopWorkingPath()
+  {
+    return defaultPath("druid-indexing");
+  }

   @Config("druid.indexer.rowFlushBoundary")
   @Default("500000")
   public abstract int getDefaultRowFlushBoundary();

-  @Config("druid.indexer.hadoopWorkingPath")
-  public abstract String getHadoopWorkingPath();
+  private String defaultPath(String subPath)
+  {
+    return joiner.join(getBaseDir(), subPath);
+  }
 }
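
A quick illustration of the new path derivation, assuming druid.indexer.baseDir is set to "/mnt/persistent" (the shipped default is "/tmp/"):

    Joiner joiner = Joiner.on("/");
    String baseDir = "/mnt/persistent";
    joiner.join(baseDir, "persistent/task");  // getBaseTaskDir()       -> /mnt/persistent/persistent/task
    joiner.join(baseDir, "druid-indexing");   // getHadoopWorkingPath() -> /mnt/persistent/druid-indexing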

View File: TaskLogConfig.java

@@ -1,10 +1,15 @@
 package com.metamx.druid.indexing.common.config;

 import org.skife.config.Config;
+import org.skife.config.Default;
 import org.skife.config.DefaultNull;

 public abstract class TaskLogConfig
 {
+  @Config("druid.indexer.logs.type")
+  @Default("noop")
+  public abstract String getLogType();
+
   @Config("druid.indexer.logs.s3bucket")
   @DefaultNull
   public abstract String getLogStorageBucket();

View File: ChatHandler.java

@@ -1,7 +1,26 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.indexing.common.index;

 /**
- * Objects that can be registered with a {@link ChatHandlerProvider} and provide http endpoints for indexing-related
+ * Objects that can be registered with a {@link EventReceivingChatHandlerProvider} and provide http endpoints for indexing-related
  * objects. This interface is empty because it only exists to signal intent. The actual http endpoints are provided
  * through JAX-RS annotations on the {@link ChatHandler} objects.
  */

View File: ChatHandlerProvider.java

@@ -1,83 +1,33 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.indexing.common.index;

 import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.metamx.common.ISE;
-import com.metamx.common.logger.Logger;
-import com.metamx.druid.curator.discovery.ServiceAnnouncer;
-import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
-
-import java.util.concurrent.ConcurrentMap;

 /**
- * Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method
- * allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded
- * {@link ServiceAnnouncer} will be used to advertise handlers on this host.
  */
-public class ChatHandlerProvider
+public interface ChatHandlerProvider
 {
-  private static final Logger log = new Logger(ChatHandlerProvider.class);
-
-  private final ChatHandlerProviderConfig config;
-  private final ServiceAnnouncer serviceAnnouncer;
-  private final ConcurrentMap<String, ChatHandler> handlers;
-
-  public ChatHandlerProvider(
-      ChatHandlerProviderConfig config,
-      ServiceAnnouncer serviceAnnouncer
-  )
-  {
-    this.config = config;
-    this.serviceAnnouncer = serviceAnnouncer;
-    this.handlers = Maps.newConcurrentMap();
-  }
-
-  public void register(final String key, ChatHandler handler)
-  {
-    final String service = serviceName(key);
-    log.info("Registering Eventhandler: %s", key);
-
-    if (handlers.putIfAbsent(key, handler) != null) {
-      throw new ISE("handler already registered for key: %s", key);
-    }
-
-    try {
-      serviceAnnouncer.announce(service);
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to register service: %s", service);
-      handlers.remove(key, handler);
-    }
-  }
-
-  public void unregister(final String key)
-  {
-    final String service = serviceName(key);
-
-    log.info("Unregistering chat handler: %s", key);
-
-    final ChatHandler handler = handlers.get(key);
-    if (handler == null) {
-      log.warn("handler not currently registered, ignoring: %s", key);
-    }
-
-    try {
-      serviceAnnouncer.unannounce(service);
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to unregister service: %s", service);
-    }
-
-    handlers.remove(key, handler);
-  }
-
-  public Optional<ChatHandler> get(final String key)
-  {
-    return Optional.fromNullable(handlers.get(key));
-  }
-
-  private String serviceName(String key)
-  {
-    return String.format(config.getServiceFormat(), key);
-  }
+  public void register(final String key, ChatHandler handler);
+
+  public void unregister(final String key);
+
+  public Optional<ChatHandler> get(final String key);
 }

View File: EventReceiverFirehoseFactory.java

@@ -1,3 +1,22 @@
+/*
+ * Druid - a distributed column store.
+ * Copyright (C) 2012  Metamarkets Group Inc.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
+ */
+
 package com.metamx.druid.indexing.common.index;

 import com.fasterxml.jackson.annotation.JacksonInject;
@@ -30,7 +49,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these
- * firehoses with an {@link ChatHandlerProvider}.
+ * firehoses with an {@link EventReceivingChatHandlerProvider}.
  */
 @JsonTypeName("receiver")
 public class EventReceiverFirehoseFactory implements FirehoseFactory
@@ -41,14 +60,14 @@ public class EventReceiverFirehoseFactory implements FirehoseFactory
   private final String firehoseId;
   private final int bufferSize;
   private final MapInputRowParser parser;
-  private final Optional<ChatHandlerProvider> chatHandlerProvider;
+  private final Optional<EventReceivingChatHandlerProvider> chatHandlerProvider;

   @JsonCreator
   public EventReceiverFirehoseFactory(
       @JsonProperty("firehoseId") String firehoseId,
       @JsonProperty("bufferSize") Integer bufferSize,
       @JsonProperty("parser") MapInputRowParser parser,
-      @JacksonInject("chatHandlerProvider") ChatHandlerProvider chatHandlerProvider
+      @JacksonInject("chatHandlerProvider") EventReceivingChatHandlerProvider chatHandlerProvider
   )
   {
     this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId");

View File: EventReceivingChatHandlerProvider.java (new file)

@@ -0,0 +1,105 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.common.index;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import java.util.concurrent.ConcurrentMap;
/**
* Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method
* allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded
* {@link ServiceAnnouncer} will be used to advertise handlers on this host.
*/
public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
{
private static final Logger log = new Logger(EventReceivingChatHandlerProvider.class);
private final ChatHandlerProviderConfig config;
private final ServiceAnnouncer serviceAnnouncer;
private final ConcurrentMap<String, ChatHandler> handlers;
public EventReceivingChatHandlerProvider(
ChatHandlerProviderConfig config,
ServiceAnnouncer serviceAnnouncer
)
{
this.config = config;
this.serviceAnnouncer = serviceAnnouncer;
this.handlers = Maps.newConcurrentMap();
}
@Override
public void register(final String key, ChatHandler handler)
{
final String service = serviceName(key);
log.info("Registering Eventhandler: %s", key);
if (handlers.putIfAbsent(key, handler) != null) {
throw new ISE("handler already registered for key: %s", key);
}
try {
serviceAnnouncer.announce(service);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
handlers.remove(key, handler);
}
}
@Override
public void unregister(final String key)
{
final String service = serviceName(key);
log.info("Unregistering chat handler: %s", key);
final ChatHandler handler = handlers.get(key);
if (handler == null) {
log.warn("handler not currently registered, ignoring: %s", key);
}
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
}
handlers.remove(key, handler);
}
@Override
public Optional<ChatHandler> get(final String key)
{
return Optional.fromNullable(handlers.get(key));
}
private String serviceName(String key)
{
return String.format(config.getServiceFormat(), key);
}
}
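
A minimal sketch of the register/get/unregister lifecycle against the new interface; the key is made up, and the announced service name is derived from config.getServiceFormat():

    ChatHandlerProvider provider = new EventReceivingChatHandlerProvider(config, serviceAnnouncer);
    provider.register("firehose-wikipedia", handler);
    Optional<ChatHandler> found = provider.get("firehose-wikipedia");  // Optional.of(handler)
    provider.unregister("firehose-wikipedia");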

View File: NoopChatHandlerProvider.java (new file)

@@ -0,0 +1,45 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.common.index;
import com.google.common.base.Optional;
/**
*/
public class NoopChatHandlerProvider implements ChatHandlerProvider
{
@Override
public void register(String key, ChatHandler handler)
{
// do nothing
}
@Override
public void unregister(String key)
{
// do nothing
}
@Override
public Optional<ChatHandler> get(String key)
{
return Optional.absent();
}
}

View File: AbstractTask.java

@@ -43,7 +43,7 @@ public abstract class AbstractTask implements Task
   private final String groupId;

   @JsonIgnore
-  private final String availabilityGroup;
+  private final TaskResource taskResource;

   @JsonIgnore
   private final String dataSource;
@@ -53,23 +53,23 @@ public abstract class AbstractTask implements Task
   protected AbstractTask(String id, String dataSource, Interval interval)
   {
-    this(id, id, id, dataSource, interval);
+    this(id, id, new TaskResource(id, 1), dataSource, interval);
   }

   protected AbstractTask(String id, String groupId, String dataSource, Interval interval)
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.groupId = Preconditions.checkNotNull(groupId, "groupId");
-    this.availabilityGroup = id;
+    this.taskResource = new TaskResource(id, 1);
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.interval = Optional.fromNullable(interval);
   }

-  protected AbstractTask(String id, String groupId, String availabilityGroup, String dataSource, Interval interval)
+  protected AbstractTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
   {
     this.id = Preconditions.checkNotNull(id, "id");
     this.groupId = Preconditions.checkNotNull(groupId, "groupId");
-    this.availabilityGroup = Preconditions.checkNotNull(availabilityGroup, "availabilityGroup");
+    this.taskResource = Preconditions.checkNotNull(taskResource, "taskResource");
     this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
     this.interval = Optional.fromNullable(interval);
   }
@@ -90,9 +90,9 @@ public abstract class AbstractTask implements Task
   @JsonProperty
   @Override
-  public String getAvailabilityGroup()
+  public TaskResource getTaskResource()
   {
-    return availabilityGroup;
+    return taskResource;
   }

   @Override
@@ -166,19 +166,16 @@ public abstract class AbstractTask implements Task
     AbstractTask that = (AbstractTask) o;

-    if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) {
-      return false;
-    }
-    if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) {
-      return false;
-    }
-    if (id != null ? !id.equals(that.id) : that.id != null) {
-      return false;
-    }
-    if (interval != null ? !interval.equals(that.interval) : that.interval != null) {
+    if (!id.equals(that.id)) {
       return false;
     }

     return true;
   }
+
+  @Override
+  public int hashCode()
+  {
+    return id.hashCode();
+  }
 }

View File: IndexDeterminePartitionsTask.java

@@ -88,7 +88,6 @@ public class IndexDeterminePartitionsTask extends AbstractTask
     super(
         id != null ? id : makeTaskId(groupId, interval.getStart(), interval.getEnd()),
         groupId,
-        makeTaskId(groupId, interval.getStart(), interval.getEnd()),
         schema.getDataSource(),
         Preconditions.checkNotNull(interval, "interval")
     );

View File: RealtimeIndexTask.java

@@ -46,13 +46,11 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
 import com.metamx.druid.query.QueryToolChest;
 import com.metamx.druid.realtime.FireDepartment;
 import com.metamx.druid.realtime.FireDepartmentConfig;
-import com.metamx.druid.realtime.FireDepartmentMetrics;
 import com.metamx.druid.realtime.RealtimeMetricsMonitor;
 import com.metamx.druid.realtime.Schema;
 import com.metamx.druid.realtime.SegmentPublisher;
 import com.metamx.druid.realtime.firehose.Firehose;
 import com.metamx.druid.realtime.firehose.FirehoseFactory;
-import com.metamx.druid.realtime.plumber.NoopRejectionPolicyFactory;
 import com.metamx.druid.realtime.plumber.Plumber;
 import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
 import com.metamx.druid.realtime.plumber.RejectionPolicyFactory;
@@ -105,7 +103,7 @@ public class RealtimeIndexTask extends AbstractTask
   @JsonCreator
   public RealtimeIndexTask(
       @JsonProperty("id") String id,
-      @JsonProperty("availabilityGroup") String availabilityGroup,
+      @JsonProperty("resource") TaskResource taskResource,
       @JsonProperty("schema") Schema schema,
       @JsonProperty("firehose") FirehoseFactory firehoseFactory,
       @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@@ -115,16 +113,22 @@ public class RealtimeIndexTask extends AbstractTask
   )
   {
     super(
-        id != null
-        ? id
-        : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()),
+        id == null
+        ? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString())
+        : id,
         String.format(
             "index_realtime_%s",
            schema.getDataSource()
         ),
-        availabilityGroup != null
-        ? availabilityGroup
-        : makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()),
+        taskResource == null
+        ? new TaskResource(
+            makeTaskId(
+                schema.getDataSource(),
+                schema.getShardSpec().getPartitionNum(),
+                new DateTime().toString()
+            ), 1
+        )
+        : taskResource,
         schema.getDataSource(),
         null
     );
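
Under the new constructor, a submitted realtime task spec carries a "resource" object where it previously carried a top-level "availabilityGroup" string. A hypothetical fragment (field names follow the @JsonProperty annotations above; the omitted fields are unchanged and elided):

    {
      "resource": {"availabilityGroup": "wikipedia-replicas", "requiredCapacity": 1},
      "schema": { ... },
      "firehose": { ... }
    }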

View File: Task.java

@@ -72,11 +72,10 @@ public interface Task
   public String getGroupId();

   /**
-   * Returns availability group ID of this task. Tasks the same availability group cannot be assigned to the same
-   * worker. If tasks do not have this restriction, a common convention is to set the availability group ID to the
-   * task ID.
+   * Returns a {@link com.metamx.druid.indexing.common.task.TaskResource} for this task. Task resources define specific
+   * worker requirements a task may require.
    */
-  public String getAvailabilityGroup();
+  public TaskResource getTaskResource();

   /**
    * Returns a descriptive label for this task type. Used for metrics emission and logging.

View File: TaskResource.java (new file)

@@ -0,0 +1,52 @@
package com.metamx.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class TaskResource
{
private final String availabilityGroup;
private final int requiredCapacity;
@JsonCreator
public TaskResource(
@JsonProperty("availabilityGroup") String availabilityGroup,
@JsonProperty("requiredCapacity") int requiredCapacity
)
{
this.availabilityGroup = availabilityGroup;
this.requiredCapacity = requiredCapacity;
}
/**
* Returns the availability group ID of this task. Tasks in the same availability group cannot be assigned to the
* same worker. If tasks do not have this restriction, a common convention is to set the availability group ID to
* the task ID.
*/
@JsonProperty
public String getAvailabilityGroup()
{
return availabilityGroup;
}
/**
* Returns the number of worker slots this task will take.
*/
@JsonProperty
public int getRequiredCapacity()
{
return requiredCapacity;
}
@Override
public String toString()
{
return "TaskResource{" +
"availabilityGroup='" + availabilityGroup + '\'' +
", requiredCapacity=" + requiredCapacity +
'}';
}
}
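
A small round-trip sketch of the wire format this class implies (the values and mapper are made up):

    TaskResource resource = jsonMapper.readValue(
        "{\"availabilityGroup\": \"some-group\", \"requiredCapacity\": 2}",
        TaskResource.class
    );
    resource.getAvailabilityGroup();  // "some-group"
    resource.getRequiredCapacity();   // 2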

View File: VersionConverterTask.java

@@ -104,7 +104,7 @@ public class VersionConverterTask extends AbstractTask
       DataSegment segment
   )
   {
-    super(id, groupId, id, dataSource, interval);
+    super(id, groupId, dataSource, interval);
     this.segment = segment;
   }
@@ -205,13 +205,6 @@ public class VersionConverterTask extends AbstractTask
             segment.getShardSpec().getPartitionNum()
         ),
         groupId,
-        joinId(
-            groupId,
-            "sub",
-            segment.getInterval().getStart(),
-            segment.getInterval().getEnd(),
-            segment.getShardSpec().getPartitionNum()
-        ),
         segment.getDataSource(),
         segment.getInterval()
     );

View File: ForkingTaskRunner.java

@@ -96,6 +96,12 @@ public class ForkingTaskRunner implements TaskRunner, TaskLogProvider
     this.jsonMapper = jsonMapper;
   }

+  @Override
+  public void bootstrap(List<Task> tasks)
+  {
+    // do nothing
+  }
+
   @Override
   public ListenableFuture<TaskStatus> run(final Task task)
   {

View File: RemoteTaskRunner.java

@@ -24,22 +24,18 @@ import com.google.common.base.Charsets;
 import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.MinMaxPriorityQueue;
 import com.google.common.collect.Sets;
 import com.google.common.io.InputSupplier;
 import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.metamx.common.ISE;
-import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
-import com.metamx.druid.indexing.common.RetryPolicy;
-import com.metamx.druid.indexing.common.RetryPolicyFactory;
+import com.metamx.druid.curator.cache.PathChildrenCacheFactory;
 import com.metamx.druid.indexing.common.TaskStatus;
 import com.metamx.druid.indexing.common.task.Task;
 import com.metamx.druid.indexing.common.tasklogs.TaskLogProvider;
@@ -49,15 +45,17 @@ import com.metamx.druid.indexing.worker.Worker;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.response.InputStreamResponseHandler;
-import com.metamx.http.client.response.ToStringResponseHandler;
+import com.metamx.http.client.response.StatusResponseHandler;
+import com.metamx.http.client.response.StatusResponseHolder;
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
 import org.joda.time.DateTime;
-import org.joda.time.Duration;

 import java.io.IOException;
 import java.io.InputStream;
@@ -68,54 +66,50 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;

 /**
- * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes and manage retries in failure
- * scenarios. The RemoteTaskRunner keeps track of which workers are running which tasks and manages coordinator and
- * worker interactions over Zookeeper. The RemoteTaskRunner is event driven and updates state according to ephemeral
- * node changes in ZK.
+ * The RemoteTaskRunner's primary responsibility is to assign tasks to worker nodes.
+ * The RemoteTaskRunner uses Zookeeper to keep track of which workers are running which tasks. Tasks are assigned by
+ * creating ephemeral nodes in ZK that workers must remove. Workers announce the statuses of the tasks they are running.
+ * Once a task completes, it is up to the RTR to remove the task status and run any necessary cleanup.
+ * The RemoteTaskRunner is event driven and updates state according to ephemeral node changes in ZK.
  * <p/>
  * The RemoteTaskRunner will assign tasks to a node until the node hits capacity. At that point, task assignment will
  * fail. The RemoteTaskRunner depends on another component to create additional worker resources.
  * For example, {@link com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler} can take care of these duties.
  * <p/>
- * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will automatically retry any tasks
- * that were associated with the node.
+ * If a worker node becomes inexplicably disconnected from Zk, the RemoteTaskRunner will fail any tasks associated with the worker.
  * <p/>
  * The RemoteTaskRunner uses ZK for job management and assignment and http for IPC messages.
  */
 public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
 {
   private static final EmittingLogger log = new EmittingLogger(RemoteTaskRunner.class);
-  private static final ToStringResponseHandler STRING_RESPONSE_HANDLER = new ToStringResponseHandler(Charsets.UTF_8);
+  private static final StatusResponseHandler RESPONSE_HANDLER = new StatusResponseHandler(Charsets.UTF_8);
   private static final Joiner JOINER = Joiner.on("/");

   private final ObjectMapper jsonMapper;
   private final RemoteTaskRunnerConfig config;
   private final CuratorFramework cf;
+  private final PathChildrenCacheFactory pathChildrenCacheFactory;
   private final PathChildrenCache workerPathCache;
-  private final ScheduledExecutorService scheduledExec;
-  private final RetryPolicyFactory retryPolicyFactory;
   private final AtomicReference<WorkerSetupData> workerSetupData;
   private final HttpClient httpClient;

   // all workers that exist in ZK
   private final Map<String, ZkWorker> zkWorkers = new ConcurrentHashMap<String, ZkWorker>();
   // all tasks that have been assigned to a worker
-  private final TaskRunnerWorkQueue runningTasks = new TaskRunnerWorkQueue();
+  private final RemoteTaskRunnerWorkQueue runningTasks = new RemoteTaskRunnerWorkQueue();
   // tasks that have not yet run
-  private final TaskRunnerWorkQueue pendingTasks = new TaskRunnerWorkQueue();
-  // idempotent task retry
-  private final Set<String> tasksToRetry = new ConcurrentSkipListSet<String>();
+  private final RemoteTaskRunnerWorkQueue pendingTasks = new RemoteTaskRunnerWorkQueue();

   private final ExecutorService runPendingTasksExec = Executors.newSingleThreadExecutor();
@@ -127,9 +121,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
       ObjectMapper jsonMapper,
       RemoteTaskRunnerConfig config,
       CuratorFramework cf,
-      PathChildrenCache workerPathCache,
-      ScheduledExecutorService scheduledExec,
-      RetryPolicyFactory retryPolicyFactory,
+      PathChildrenCacheFactory pathChildrenCacheFactory,
       AtomicReference<WorkerSetupData> workerSetupData,
       HttpClient httpClient
   )
@@ -137,9 +129,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
     this.jsonMapper = jsonMapper;
     this.config = config;
     this.cf = cf;
-    this.workerPathCache = workerPathCache;
-    this.scheduledExec = scheduledExec;
-    this.retryPolicyFactory = retryPolicyFactory;
+    this.pathChildrenCacheFactory = pathChildrenCacheFactory;
+    this.workerPathCache = pathChildrenCacheFactory.make(cf, config.getIndexerAnnouncementPath());
     this.workerSetupData = workerSetupData;
     this.httpClient = httpClient;
   }
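
A hedged sketch of the new wiring: the runner now builds its worker-path cache from a factory instead of receiving a started PathChildrenCache, and the retry-policy and scheduled-executor arguments are gone (all variable names here are illustrative):

    RemoteTaskRunner runner = new RemoteTaskRunner(
        jsonMapper,
        config,
        curator,
        new SimplePathChildrenCacheFactory.Builder().withCompressed(true).build(),
        workerSetupDataRef,
        httpClient
    );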
@@ -159,25 +150,37 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
           @Override
           public void childEvent(CuratorFramework client, final PathChildrenCacheEvent event) throws Exception
           {
-            if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
-              final Worker worker = jsonMapper.readValue(
-                  event.getData().getData(),
-                  Worker.class
-              );
-              log.info("Worker[%s] reportin' for duty!", worker.getHost());
-              addWorker(worker);
-            } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
-              final Worker worker = jsonMapper.readValue(
-                  event.getData().getData(),
-                  Worker.class
-              );
-              log.info("Kaboom! Worker[%s] removed!", worker.getHost());
-              removeWorker(worker);
+            Worker worker;
+            switch (event.getType()) {
+              case CHILD_ADDED:
+                worker = jsonMapper.readValue(
+                    event.getData().getData(),
+                    Worker.class
+                );
+                addWorker(worker, PathChildrenCache.StartMode.NORMAL);
+                break;
+              case CHILD_REMOVED:
+                worker = jsonMapper.readValue(
+                    event.getData().getData(),
+                    Worker.class
+                );
+                removeWorker(worker);
+                break;
+              default:
+                break;
             }
           }
         }
     );
-    workerPathCache.start();
+    workerPathCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+
+    for (ChildData childData : workerPathCache.getCurrentData()) {
+      final Worker worker = jsonMapper.readValue(
+          childData.getData(),
+          Worker.class
+      );
+      addWorker(worker, PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+    }

     started = true;
   }
@@ -197,6 +200,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
       for (ZkWorker zkWorker : zkWorkers.values()) {
         zkWorker.close();
       }
+      workerPathCache.close();
     }
     catch (Exception e) {
       throw Throwables.propagate(e);
@@ -213,13 +217,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
   }

   @Override
-  public Collection<TaskRunnerWorkItem> getRunningTasks()
+  public Collection<RemoteTaskRunnerWorkItem> getRunningTasks()
   {
     return runningTasks.values();
   }

   @Override
-  public Collection<TaskRunnerWorkItem> getPendingTasks()
+  public Collection<RemoteTaskRunnerWorkItem> getPendingTasks()
   {
     return pendingTasks.values();
   }
@@ -227,18 +231,49 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
   public ZkWorker findWorkerRunningTask(String taskId)
   {
     for (ZkWorker zkWorker : zkWorkers.values()) {
-      if (zkWorker.getRunningTasks().contains(taskId)) {
+      if (zkWorker.isRunningTask(taskId)) {
         return zkWorker;
       }
     }
     return null;
   }

-  public boolean isWorkerRunningTask(String workerHost, String taskId)
+  public boolean isWorkerRunningTask(Worker worker, Task task)
   {
-    ZkWorker zkWorker = zkWorkers.get(workerHost);
-    return (zkWorker != null && zkWorker.getRunningTasks().contains(taskId));
+    ZkWorker zkWorker = zkWorkers.get(worker.getHost());
+    return (zkWorker != null && zkWorker.isRunningTask(task.getId()));
+  }
+
+  @Override
+  public void bootstrap(List<Task> tasks)
+  {
+    try {
+      if (!started) {
+        throw new ISE("Must start RTR first before calling bootstrap!");
+      }
+
+      Set<String> existingTasks = Sets.newHashSet();
+      for (ZkWorker zkWorker : zkWorkers.values()) {
+        existingTasks.addAll(zkWorker.getRunningTasks().keySet());
+      }
+
+      for (Task task : tasks) {
+        if (existingTasks.contains(task.getId())) {
+          log.info("Bootstrap found %s running.", task.getId());
+          runningTasks.put(
+              task.getId(),
+              new RemoteTaskRunnerWorkItem(task, SettableFuture.<TaskStatus>create())
+          );
+        } else {
+          log.info("Bootstrap didn't find %s running. Running it again", task.getId());
+          run(task);
+        }
+      }
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
   }
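
A sketch of the startup sequence implied by the check above; where the task list comes from is outside this diff, so the variable names are illustrative:

    runner.start();                // must happen first, or bootstrap() throws ISE
    runner.bootstrap(storedTasks);
    // Tasks a worker already reports are adopted into runningTasks;
    // anything else is re-announced via run().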
@ -252,8 +287,9 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) { if (runningTasks.containsKey(task.getId()) || pendingTasks.containsKey(task.getId())) {
throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId()); throw new ISE("Assigned a task[%s] that is already running or pending, WTF is happening?!", task.getId());
} }
TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem( RemoteTaskRunnerWorkItem taskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
task, SettableFuture.<TaskStatus>create(), retryPolicyFactory.makeRetryPolicy(), new DateTime() task,
SettableFuture.<TaskStatus>create()
); );
addPendingTask(taskRunnerWorkItem); addPendingTask(taskRunnerWorkItem);
return taskRunnerWorkItem.getResult(); return taskRunnerWorkItem.getResult();
@ -262,7 +298,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
/** /**
* Finds the worker running the task and forwards the shutdown signal to the worker. * Finds the worker running the task and forwards the shutdown signal to the worker.
* *
* @param taskId * @param taskId - task id to shutdown
*/ */
@Override @Override
public void shutdown(String taskId) public void shutdown(String taskId)
@ -275,39 +311,29 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
final ZkWorker zkWorker = findWorkerRunningTask(taskId); final ZkWorker zkWorker = findWorkerRunningTask(taskId);
if (zkWorker == null) { if (zkWorker == null) {
// Would be nice to have an ability to shut down pending tasks
log.info("Can't shutdown! No worker running task %s", taskId); log.info("Can't shutdown! No worker running task %s", taskId);
return; return;
} }
final RetryPolicy shutdownRetryPolicy = retryPolicyFactory.makeRetryPolicy();
final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
while (!shutdownRetryPolicy.hasExceededRetryThreshold()) {
try { try {
final String response = httpClient.post(url) final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/shutdown", taskId));
.go(STRING_RESPONSE_HANDLER) final StatusResponseHolder response = httpClient.post(url)
.go(RESPONSE_HANDLER)
.get(); .get();
log.info("Sent shutdown message to worker: %s, response: %s", zkWorker.getWorker().getHost(), response);
return; log.info(
"Sent shutdown message to worker: %s, status %s, response: %s",
zkWorker.getWorker().getHost(),
response.getStatus(),
response.getContent()
);
if (!response.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
log.error("Shutdown failed for %s! Are you sure the task was running?", taskId);
}
} }
catch (Exception e) { catch (Exception e) {
log.error(e, "Exception shutting down taskId: %s", taskId);
if (shutdownRetryPolicy.hasExceededRetryThreshold()) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} else {
try {
final long sleepTime = shutdownRetryPolicy.getAndIncrementRetryDelay().getMillis();
log.info("Will try again in %s.", new Duration(sleepTime).toString());
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
throw Throwables.propagate(e2);
}
}
}
} }
} }
@ -321,7 +347,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
return Optional.absent(); return Optional.absent();
} else { } else {
// Worker is still running this task // Worker is still running this task
final URL url = workerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset)); final URL url = makeWorkerURL(zkWorker.getWorker(), String.format("/task/%s/log?offset=%d", taskId, offset));
return Optional.<InputSupplier<InputStream>>of( return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>() new InputSupplier<InputStream>()
{ {
@ -347,7 +373,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
} }
} }
private URL workerURL(Worker worker, String path) private URL makeWorkerURL(Worker worker, String path)
{ {
Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path); Preconditions.checkArgument(path.startsWith("/"), "path must start with '/': %s", path);
@ -361,10 +387,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
/** /**
* Adds a task to the pending queue * Adds a task to the pending queue
*
* @param taskRunnerWorkItem
*/ */
private void addPendingTask(final TaskRunnerWorkItem taskRunnerWorkItem) private void addPendingTask(final RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{ {
log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId()); log.info("Added pending task %s", taskRunnerWorkItem.getTask().getId());
@ -388,8 +412,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
try { try {
// make a copy of the pending tasks because assignTask may delete tasks from pending and move them // make a copy of the pending tasks because assignTask may delete tasks from pending and move them
// into running status // into running status
List<TaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values()); List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
for (TaskRunnerWorkItem taskWrapper : copy) { for (RemoteTaskRunnerWorkItem taskWrapper : copy) {
assignTask(taskWrapper); assignTask(taskWrapper);
} }
} }
@ -403,42 +427,6 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
); );
} }
/**
* Retries a task by inserting it back into the pending queue after a given delay.
*
* @param taskRunnerWorkItem - the task to retry
*/
private void retryTask(final TaskRunnerWorkItem taskRunnerWorkItem)
{
final String taskId = taskRunnerWorkItem.getTask().getId();
if (tasksToRetry.contains(taskId)) {
return;
}
tasksToRetry.add(taskId);
if (!taskRunnerWorkItem.getRetryPolicy().hasExceededRetryThreshold()) {
log.info("Retry scheduled in %s for %s", taskRunnerWorkItem.getRetryPolicy().getRetryDelay(), taskId);
scheduledExec.schedule(
new Runnable()
{
@Override
public void run()
{
runningTasks.remove(taskId);
tasksToRetry.remove(taskId);
addPendingTask(taskRunnerWorkItem);
}
},
taskRunnerWorkItem.getRetryPolicy().getAndIncrementRetryDelay().getMillis(),
TimeUnit.MILLISECONDS
);
} else {
log.makeAlert("Task exceeded retry threshold").addData("task", taskId).emit();
}
}
/** /**
* Removes a task from the running queue and clears out the ZK status path of the task. * Removes a task from the running queue and clears out the ZK status path of the task.
* *
@ -464,21 +452,16 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
* *
* @param taskRunnerWorkItem - the task to assign * @param taskRunnerWorkItem - the task to assign
*/ */
private void assignTask(TaskRunnerWorkItem taskRunnerWorkItem) private void assignTask(RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{ {
try { try {
final String taskId = taskRunnerWorkItem.getTask().getId(); final String taskId = taskRunnerWorkItem.getTask().getId();
ZkWorker zkWorker = findWorkerRunningTask(taskId);
// If a worker is already running this task, we don't need to announce it if (runningTasks.containsKey(taskId) || findWorkerRunningTask(taskId) != null) {
if (zkWorker != null) { log.info("Task[%s] already running.", taskId);
final Worker worker = zkWorker.getWorker();
log.info("Worker[%s] is already running task[%s].", worker.getHost(), taskId);
runningTasks.put(taskId, pendingTasks.remove(taskId));
log.info("Task %s switched from pending to running", taskId);
} else { } else {
// Nothing running this task, announce it in ZK for a worker to run it // Nothing running this task, announce it in ZK for a worker to run it
zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask()); ZkWorker zkWorker = findWorkerForTask(taskRunnerWorkItem.getTask());
if (zkWorker != null) { if (zkWorker != null) {
announceTask(zkWorker.getWorker(), taskRunnerWorkItem); announceTask(zkWorker.getWorker(), taskRunnerWorkItem);
} }
@ -496,7 +479,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
* @param theWorker The worker the task is assigned to * @param theWorker The worker the task is assigned to
* @param taskRunnerWorkItem The task to be assigned * @param taskRunnerWorkItem The task to be assigned
*/ */
private void announceTask(Worker theWorker, TaskRunnerWorkItem taskRunnerWorkItem) throws Exception private void announceTask(Worker theWorker, RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
{ {
final Task task = taskRunnerWorkItem.getTask(); final Task task = taskRunnerWorkItem.getTask();
@ -525,7 +508,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
Stopwatch timeoutStopwatch = new Stopwatch(); Stopwatch timeoutStopwatch = new Stopwatch();
timeoutStopwatch.start(); timeoutStopwatch.start();
synchronized (statusLock) { synchronized (statusLock) {
while (!isWorkerRunningTask(theWorker.getHost(), task.getId())) { while (!isWorkerRunningTask(theWorker, task)) {
statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis()); statusLock.wait(config.getTaskAssignmentTimeoutDuration().getMillis());
if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeoutDuration().getMillis()) { if (timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS) >= config.getTaskAssignmentTimeoutDuration().getMillis()) {
log.error( log.error(
@ -534,7 +517,8 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
task.getId(), task.getId(),
config.getTaskAssignmentTimeoutDuration() config.getTaskAssignmentTimeoutDuration()
); );
retryTask(runningTasks.get(task.getId()));
taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
break; break;
} }
} }
@ -548,11 +532,13 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
* *
* @param worker - contains metadata for a worker that has appeared in ZK * @param worker - contains metadata for a worker that has appeared in ZK
*/ */
private void addWorker(final Worker worker) private ZkWorker addWorker(final Worker worker, PathChildrenCache.StartMode startMode)
{ {
log.info("Worker[%s] reportin' for duty!", worker.getHost());
try { try {
final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost()); final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); final PathChildrenCache statusCache = pathChildrenCacheFactory.make(cf, workerStatusPath);
final ZkWorker zkWorker = new ZkWorker( final ZkWorker zkWorker = new ZkWorker(
worker, worker,
statusCache, statusCache,
@ -560,43 +546,39 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
); );
// Add status listener to the watcher for status changes // Add status listener to the watcher for status changes
statusCache.getListenable().addListener( zkWorker.addListener(
new PathChildrenCacheListener() new PathChildrenCacheListener()
{ {
@Override @Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception
{ {
String taskId;
RemoteTaskRunnerWorkItem taskRunnerWorkItem;
synchronized (statusLock) { synchronized (statusLock) {
try { try {
if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED) || switch (event.getType()) {
event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { case CHILD_ADDED:
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); case CHILD_UPDATED:
taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
final TaskStatus taskStatus = jsonMapper.readValue( final TaskStatus taskStatus = jsonMapper.readValue(
event.getData().getData(), TaskStatus.class event.getData().getData(), TaskStatus.class
); );
// This can fail if a worker writes a bogus status. Retry if so.
if (!taskStatus.getId().equals(taskId)) {
retryTask(runningTasks.get(taskId));
return;
}
log.info( log.info(
"Worker[%s] wrote %s status for task: %s", "Worker[%s] wrote %s status for task: %s",
worker.getHost(), zkWorker.getWorker().getHost(),
taskStatus.getStatusCode(), taskStatus.getStatusCode(),
taskId taskId
); );
// Synchronizing state with ZK // Synchronizing state with ZK
statusLock.notify(); statusLock.notify();
final TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem == null) { if (taskRunnerWorkItem == null) {
log.warn( log.warn(
"WTF?! Worker[%s] announcing a status for a task I didn't know about: %s", "WTF?! Worker[%s] announcing a status for a task I didn't know about: %s",
worker.getHost(), zkWorker.getWorker().getHost(),
taskId taskId
); );
} }
@ -611,21 +593,23 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
// Worker is done with this task // Worker is done with this task
zkWorker.setLastCompletedTaskTime(new DateTime()); zkWorker.setLastCompletedTaskTime(new DateTime());
cleanup(worker.getHost(), taskId); cleanup(zkWorker.getWorker().getHost(), taskId);
runPendingTasks(); runPendingTasks();
} }
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { break;
final String taskId = ZKPaths.getNodeFromPath(event.getData().getPath()); case CHILD_REMOVED:
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); taskId = ZKPaths.getNodeFromPath(event.getData().getPath());
taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem != null) {
log.info("Task %s just disappeared!", taskId); log.info("Task %s just disappeared!", taskId);
retryTask(taskRunnerWorkItem); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} }
break;
} }
} }
catch (Exception e) { catch (Exception e) {
log.makeAlert(e, "Failed to handle new worker status") log.makeAlert(e, "Failed to handle new worker status")
.addData("worker", worker.getHost()) .addData("worker", zkWorker.getWorker().getHost())
.addData("znode", event.getData().getPath()) .addData("znode", event.getData().getPath())
.emit(); .emit();
} }
@ -633,10 +617,11 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
} }
} }
); );
zkWorkers.put(worker.getHost(), zkWorker);
statusCache.start();
runPendingTasks(); zkWorker.start(startMode);
zkWorkers.put(worker.getHost(), zkWorker);
return zkWorker;
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
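The status listener above is built on Curator's PathChildrenCache recipe. Below is a minimal sketch of the same watch pattern, assuming a started CuratorFramework and an existing status path; the printlns stand in for the real status handling:

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.utils.ZKPaths;

    class StatusWatchSketch
    {
      // Watches one worker's status znodes. Assumes `curator` is already
      // started and `statusPath` exists, e.g. /druid/indexer/status/<host>.
      static PathChildrenCache watch(CuratorFramework curator, String statusPath) throws Exception
      {
        final PathChildrenCache cache = new PathChildrenCache(curator, statusPath, true);
        cache.getListenable().addListener(
            new PathChildrenCacheListener()
            {
              @Override
              public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
              {
                switch (event.getType()) {
                  case CHILD_ADDED:
                  case CHILD_UPDATED:
                    // A worker wrote or refreshed a task status.
                    System.out.println("status written: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                    break;
                  case CHILD_REMOVED:
                    // The status znode vanished, e.g. the worker cleaned it up or died.
                    System.out.println("status gone: " + ZKPaths.getNodeFromPath(event.getData().getPath()));
                    break;
                  default:
                    break; // connection events carry no child data
                }
              }
            }
        );
        cache.start(PathChildrenCache.StartMode.NORMAL);
        return cache;
      }
    }

Passing an explicit StartMode mirrors the new zkWorker.start(startMode) call, presumably so the bootstrap path can choose a different mode than steady-state operation.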
@ -652,38 +637,35 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
*/ */
private void removeWorker(final Worker worker) private void removeWorker(final Worker worker)
{ {
log.info("Kaboom! Worker[%s] removed!", worker.getHost());
ZkWorker zkWorker = zkWorkers.get(worker.getHost()); ZkWorker zkWorker = zkWorkers.get(worker.getHost());
if (zkWorker != null) { if (zkWorker != null) {
try { try {
Set<String> tasksToRetry = Sets.newHashSet( for (String assignedTask : cf.getChildren()
cf.getChildren() .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))) {
.forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost())) RemoteTaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(assignedTask);
);
tasksToRetry.addAll(
cf.getChildren()
.forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
);
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), taskId); String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), assignedTask);
if (cf.checkExists().forPath(taskPath) != null) { if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath); cf.delete().guaranteed().forPath(taskPath);
} }
retryTask(taskRunnerWorkItem); taskRunnerWorkItem.setResult(TaskStatus.failure(taskRunnerWorkItem.getTask().getId()));
} else { } else {
log.warn("RemoteTaskRunner has no knowledge of task %s", taskId); log.warn("RemoteTaskRunner has no knowledge of task %s", assignedTask);
} }
} }
zkWorker.close();
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
} }
finally { finally {
try {
zkWorker.close();
}
catch (Exception e) {
log.error(e, "Exception closing worker %s!", worker.getHost());
}
zkWorkers.remove(worker.getHost()); zkWorkers.remove(worker.getHost());
} }
} }
@ -691,48 +673,26 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private ZkWorker findWorkerForTask(final Task task) private ZkWorker findWorkerForTask(final Task task)
{ {
try { TreeSet<ZkWorker> sortedWorkers = Sets.newTreeSet(
final MinMaxPriorityQueue<ZkWorker> workerQueue = MinMaxPriorityQueue.<ZkWorker>orderedBy(
new Comparator<ZkWorker>() new Comparator<ZkWorker>()
{ {
@Override @Override
public int compare(ZkWorker w1, ZkWorker w2) public int compare(
{ ZkWorker zkWorker, ZkWorker zkWorker2
return -Ints.compare(w1.getRunningTasks().size(), w2.getRunningTasks().size());
}
}
).create(
FunctionalIterable.create(zkWorkers.values()).filter(
new Predicate<ZkWorker>()
{
@Override
public boolean apply(ZkWorker input)
{
for (String taskId : input.getRunningTasks()) {
TaskRunnerWorkItem workerTask = runningTasks.get(taskId);
if (workerTask != null && task.getAvailabilityGroup()
.equalsIgnoreCase(workerTask.getTask().getAvailabilityGroup())) {
return false;
}
}
return (!input.isAtCapacity() &&
input.getWorker()
.getVersion()
.compareTo(workerSetupData.get().getMinVersion()) >= 0);
}
}
) )
{
return -Ints.compare(zkWorker.getCurrCapacity(), zkWorker2.getCurrCapacity());
}
}
); );
sortedWorkers.addAll(zkWorkers.values());
if (workerQueue.isEmpty()) { for (ZkWorker zkWorker : sortedWorkers) {
if (zkWorker.canRunTask(task) &&
zkWorker.getWorker().getVersion().compareTo(workerSetupData.get().getMinVersion()) >= 0) {
return zkWorker;
}
}
log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values()); log.debug("Worker nodes %s do not have capacity to run any more tasks!", zkWorkers.values());
return null; return null;
} }
return workerQueue.peek();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
} }
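The rewritten findWorkerForTask sorts workers most-loaded-first and takes the first one that can actually fit the task. A standalone sketch of that selection policy follows; WorkerNode and its int capacities are hypothetical stand-ins for ZkWorker, not Druid classes:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.TreeSet;

    class WorkerSelectionSketch
    {
      // Hypothetical stand-in for ZkWorker; capacities are plain ints.
      static class WorkerNode
      {
        final String host;
        final int totalCapacity;
        final int usedCapacity;

        WorkerNode(String host, int totalCapacity, int usedCapacity)
        {
          this.host = host;
          this.totalCapacity = totalCapacity;
          this.usedCapacity = usedCapacity;
        }
      }

      // Most-loaded workers first; the first one with enough free slots wins.
      static WorkerNode findWorkerForTask(List<WorkerNode> workers, int requiredCapacity)
      {
        TreeSet<WorkerNode> sorted = new TreeSet<>(
            Comparator.comparingInt((WorkerNode w) -> w.usedCapacity)
                      .reversed()
                      .thenComparing(w -> w.host) // tie-break: a TreeSet drops "equal" elements
        );
        sorted.addAll(workers);
        for (WorkerNode worker : sorted) {
          if (worker.totalCapacity - worker.usedCapacity >= requiredCapacity) {
            return worker;
          }
        }
        return null; // nobody has room
      }

      public static void main(String[] args)
      {
        List<WorkerNode> workers = Arrays.asList(
            new WorkerNode("busy", 3, 2),
            new WorkerNode("quiet", 3, 1)
        );
        // "busy" is checked first but has only one free slot; "quiet" wins.
        System.out.println(findWorkerForTask(workers, 2).host);
      }
    }

Sorting most-loaded-first packs tasks onto already-busy workers and leaves the rest fully idle, which dovetails with the terminate side of the autoscaling strategy.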


@ -0,0 +1,63 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.indexing.coordinator;
import com.google.common.util.concurrent.SettableFuture;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
/**
*/
public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final SettableFuture<TaskStatus> result;
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result
)
{
super(task, result);
this.result = result;
}
public RemoteTaskRunnerWorkItem(
Task task,
SettableFuture<TaskStatus> result,
DateTime createdTime,
DateTime queueInsertionTime
)
{
super(task, result, createdTime, queueInsertionTime);
this.result = result;
}
public void setResult(TaskStatus status)
{
result.set(status);
}
@Override
public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new RemoteTaskRunnerWorkItem(getTask(), result, getCreatedTime(), time);
}
}
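With retries gone, task completion is signalled by resolving this SettableFuture from the ZK status listener (setResult above). A minimal sketch of the pattern with a recent Guava; the String status is a stand-in for TaskStatus:

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.SettableFuture;

    class SettableFutureSketch
    {
      public static void main(String[] args)
      {
        // run() would hand this future to the caller...
        final SettableFuture<String> result = SettableFuture.create();

        Futures.addCallback(
            result,
            new FutureCallback<String>()
            {
              @Override
              public void onSuccess(String status)
              {
                System.out.println("task finished: " + status);
              }

              @Override
              public void onFailure(Throwable t)
              {
                t.printStackTrace();
              }
            },
            MoreExecutors.directExecutor()
        );

        // ...and the ZK status listener completes it later, e.g. on
        // CHILD_REMOVED, the equivalent of setResult(TaskStatus.failure(...)).
        result.set("FAILED");
      }
    }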


@ -25,10 +25,10 @@ import java.util.concurrent.ConcurrentSkipListMap;
/** /**
*/ */
public class TaskRunnerWorkQueue extends ConcurrentSkipListMap<String, TaskRunnerWorkItem> public class RemoteTaskRunnerWorkQueue extends ConcurrentSkipListMap<String, RemoteTaskRunnerWorkItem>
{ {
@Override @Override
public TaskRunnerWorkItem put(String s, TaskRunnerWorkItem taskRunnerWorkItem) public RemoteTaskRunnerWorkItem put(String s, RemoteTaskRunnerWorkItem taskRunnerWorkItem)
{ {
return super.put(s, taskRunnerWorkItem.withQueueInsertionTime(new DateTime())); return super.put(s, taskRunnerWorkItem.withQueueInsertionTime(new DateTime()));
} }


@ -21,12 +21,11 @@ package com.metamx.druid.indexing.coordinator;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.indexing.common.actions.TaskActionClient; import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.common.task.Task;
@ -34,6 +33,8 @@ import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer; import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory; import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
@ -88,7 +89,6 @@ public class TaskMasterLifecycle
log.info("By the power of Grayskull, I have the power!"); log.info("By the power of Grayskull, I have the power!");
taskRunner = runnerFactory.build(); taskRunner = runnerFactory.build();
resourceManagementScheduler = managementSchedulerFactory.build(taskRunner);
final TaskConsumer taskConsumer = new TaskConsumer( final TaskConsumer taskConsumer = new TaskConsumer(
taskQueue, taskQueue,
taskRunner, taskRunner,
@ -101,12 +101,34 @@ public class TaskMasterLifecycle
// Sensible order to start stuff: // Sensible order to start stuff:
final Lifecycle leaderLifecycle = new Lifecycle(); final Lifecycle leaderLifecycle = new Lifecycle();
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(taskRunner); leaderLifecycle.addManagedInstance(taskRunner);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
taskRunner.bootstrap(taskQueue.snapshot());
}
@Override
public void stop()
{
}
}
);
leaderLifecycle.addManagedInstance(taskQueue);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle); Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer); leaderLifecycle.addManagedInstance(taskConsumer);
if ("remote".equalsIgnoreCase(indexerCoordinatorConfig.getRunnerImpl())) {
if (!(taskRunner instanceof RemoteTaskRunner)) {
throw new ISE("WTF?! We configured a remote runner and got %s", taskRunner.getClass());
}
resourceManagementScheduler = managementSchedulerFactory.build((RemoteTaskRunner) taskRunner);
leaderLifecycle.addManagedInstance(resourceManagementScheduler); leaderLifecycle.addManagedInstance(resourceManagementScheduler);
}
try { try {
leaderLifecycle.start(); leaderLifecycle.start();
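The handler wiring matters because the lifecycle starts its managed pieces in registration order: runner first, then the bootstrap handler, then the queue, so the runner learns about already-running tasks before the queue can hand out new ones. A toy sketch of that ordering guarantee in plain Java (not the metamx Lifecycle API):

    import java.util.ArrayList;
    import java.util.List;

    class LeaderLifecycleSketch
    {
      interface Handler
      {
        void start() throws Exception;
      }

      private final List<Handler> handlers = new ArrayList<>();

      void add(Handler handler)
      {
        handlers.add(handler);
      }

      // Handlers start strictly in registration order.
      void start() throws Exception
      {
        for (Handler handler : handlers) {
          handler.start();
        }
      }

      public static void main(String[] args) throws Exception
      {
        LeaderLifecycleSketch lifecycle = new LeaderLifecycleSketch();
        lifecycle.add(() -> System.out.println("1. start task runner"));
        lifecycle.add(() -> System.out.println("2. bootstrap runner with queue snapshot"));
        lifecycle.add(() -> System.out.println("3. start task queue"));
        lifecycle.start();
      }
    }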


@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
@ -165,6 +166,20 @@ public class TaskQueue
} }
} }
/**
* Returns an immutable snapshot of the tasks currently in this queue.
*/
public List<Task> snapshot()
{
giant.lock();
try {
return ImmutableList.copyOf(queue);
} finally {
giant.unlock();
}
}
/** /**
* Starts this task queue. Allows {@link #add(Task)} to accept new tasks. This should not be called on * Starts this task queue. Allows {@link #add(Task)} to accept new tasks. This should not be called on
* an already-started queue. * an already-started queue.


@ -24,6 +24,7 @@ import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.common.task.Task;
import java.util.Collection; import java.util.Collection;
import java.util.List;
/** /**
* Interface for handing off tasks. Used by a {@link com.metamx.druid.indexing.coordinator.exec.TaskConsumer} to * Interface for handing off tasks. Used by a {@link com.metamx.druid.indexing.coordinator.exec.TaskConsumer} to
@ -31,6 +32,15 @@ import java.util.Collection;
*/ */
public interface TaskRunner public interface TaskRunner
{ {
/**
* Provide a new task runner with a list of tasks that should already be running. Will be called once shortly
* after instantiation and before any calls to {@link #run}. Bootstrapping should not be construed as a command
* to run the tasks; they will be passed to {@link #run} one-by-one when this is desired.
*
* @param tasks the tasks
*/
public void bootstrap(List<Task> tasks);
/** /**
* Run a task. The returned status should be some kind of completed status. * Run a task. The returned status should be some kind of completed status.
* *
@ -44,9 +54,9 @@ public interface TaskRunner
*/ */
public void shutdown(String taskid); public void shutdown(String taskid);
public Collection<TaskRunnerWorkItem> getRunningTasks(); public Collection<? extends TaskRunnerWorkItem> getRunningTasks();
public Collection<TaskRunnerWorkItem> getPendingTasks(); public Collection<? extends TaskRunnerWorkItem> getPendingTasks();
public Collection<ZkWorker> getWorkers(); public Collection<ZkWorker> getWorkers();
} }


@ -22,7 +22,6 @@ package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain; import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.druid.indexing.common.RetryPolicy;
import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.common.task.Task;
import org.joda.time.DateTime; import org.joda.time.DateTime;
@ -35,22 +34,29 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
{ {
private final Task task; private final Task task;
private final ListenableFuture<TaskStatus> result; private final ListenableFuture<TaskStatus> result;
private final RetryPolicy retryPolicy;
private final DateTime createdTime; private final DateTime createdTime;
private volatile DateTime queueInsertionTime; private volatile DateTime queueInsertionTime;
public TaskRunnerWorkItem(
Task task,
ListenableFuture<TaskStatus> result
)
{
this(task, result, new DateTime(), new DateTime());
}
public TaskRunnerWorkItem( public TaskRunnerWorkItem(
Task task, Task task,
ListenableFuture<TaskStatus> result, ListenableFuture<TaskStatus> result,
RetryPolicy retryPolicy, DateTime createdTime,
DateTime createdTime DateTime queueInsertionTime
) )
{ {
this.task = task; this.task = task;
this.result = result; this.result = result;
this.retryPolicy = retryPolicy;
this.createdTime = createdTime; this.createdTime = createdTime;
this.queueInsertionTime = queueInsertionTime;
} }
@JsonProperty @JsonProperty
@ -64,11 +70,6 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return result; return result;
} }
public RetryPolicy getRetryPolicy()
{
return retryPolicy;
}
@JsonProperty @JsonProperty
public DateTime getCreatedTime() public DateTime getCreatedTime()
{ {
@ -83,8 +84,7 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
public TaskRunnerWorkItem withQueueInsertionTime(DateTime time) public TaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{ {
this.queueInsertionTime = time; return new TaskRunnerWorkItem(task, result, createdTime, time);
return this;
} }
@Override @Override
@ -102,7 +102,6 @@ public class TaskRunnerWorkItem implements Comparable<TaskRunnerWorkItem>
return "TaskRunnerWorkItem{" + return "TaskRunnerWorkItem{" +
"task=" + task + "task=" + task +
", result=" + result + ", result=" + result +
", retryPolicy=" + retryPolicy +
", createdTime=" + createdTime + ", createdTime=" + createdTime +
'}'; '}';
} }
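withQueueInsertionTime previously mutated the item and returned this; it now returns a fresh copy, making TaskRunnerWorkItem effectively immutable from the caller's side. A generic sketch of this "wither" pattern:

    final class WorkItemSketch
    {
      private final String taskId;
      private final long queueInsertionTime;

      WorkItemSketch(String taskId, long queueInsertionTime)
      {
        this.taskId = taskId;
        this.queueInsertionTime = queueInsertionTime;
      }

      // "Wither": returns a fresh copy instead of mutating this instance,
      // so an item already handed to callers can never change underneath them.
      WorkItemSketch withQueueInsertionTime(long time)
      {
        return new WorkItemSketch(taskId, time);
      }
    }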


@ -77,13 +77,19 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
exec.shutdownNow(); exec.shutdownNow();
} }
@Override
public void bootstrap(List<Task> tasks)
{
// do nothing
}
@Override @Override
public ListenableFuture<TaskStatus> run(final Task task) public ListenableFuture<TaskStatus> run(final Task task)
{ {
final TaskToolbox toolbox = toolboxFactory.build(task); final TaskToolbox toolbox = toolboxFactory.build(task);
final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox)); final ListenableFuture<TaskStatus> statusFuture = exec.submit(new ExecutorServiceTaskRunnerCallable(task, toolbox));
final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture, null, new DateTime()); final TaskRunnerWorkItem taskRunnerWorkItem = new TaskRunnerWorkItem(task, statusFuture);
runningItems.add(taskRunnerWorkItem); runningItems.add(taskRunnerWorkItem);
Futures.addCallback( Futures.addCallback(
statusFuture, new FutureCallback<TaskStatus>() statusFuture, new FutureCallback<TaskStatus>()
@ -184,14 +190,10 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private final Task task; private final Task task;
private final TaskToolbox toolbox; private final TaskToolbox toolbox;
private final DateTime createdTime;
public ExecutorServiceTaskRunnerCallable(Task task, TaskToolbox toolbox) public ExecutorServiceTaskRunnerCallable(Task task, TaskToolbox toolbox)
{ {
this.task = task; this.task = task;
this.toolbox = toolbox; this.toolbox = toolbox;
this.createdTime = new DateTime();
} }
@Override @Override
@ -243,12 +245,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
public TaskRunnerWorkItem getTaskRunnerWorkItem() public TaskRunnerWorkItem getTaskRunnerWorkItem()
{ {
return new TaskRunnerWorkItem( return new TaskRunnerWorkItem(task, null);
task,
null,
null,
createdTime
);
} }
} }
} }


@ -24,17 +24,19 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.worker.Worker; import com.metamx.druid.indexing.worker.Worker;
import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import java.util.Set; import java.util.Set;
/** /**
@ -44,7 +46,7 @@ public class ZkWorker implements Closeable
{ {
private final Worker worker; private final Worker worker;
private final PathChildrenCache statusCache; private final PathChildrenCache statusCache;
private final Function<ChildData, String> cacheConverter; private final Function<ChildData, TaskStatus> cacheConverter;
private volatile DateTime lastCompletedTaskTime = new DateTime(); private volatile DateTime lastCompletedTaskTime = new DateTime();
@ -52,13 +54,13 @@ public class ZkWorker implements Closeable
{ {
this.worker = worker; this.worker = worker;
this.statusCache = statusCache; this.statusCache = statusCache;
this.cacheConverter = new Function<ChildData, String>() this.cacheConverter = new Function<ChildData, TaskStatus>()
{ {
@Override @Override
public String apply(@Nullable ChildData input) public TaskStatus apply(ChildData input)
{ {
try { try {
return jsonMapper.readValue(input.getData(), TaskStatus.class).getId(); return jsonMapper.readValue(input.getData(), TaskStatus.class);
} }
catch (Exception e) { catch (Exception e) {
throw Throwables.propagate(e); throw Throwables.propagate(e);
@ -67,6 +69,16 @@ public class ZkWorker implements Closeable
}; };
} }
public void start(PathChildrenCache.StartMode startMode) throws Exception
{
statusCache.start(startMode);
}
public void addListener(PathChildrenCacheListener listener)
{
statusCache.getListenable().addListener(listener);
}
@JsonProperty @JsonProperty
public Worker getWorker() public Worker getWorker()
{ {
@ -74,14 +86,37 @@ public class ZkWorker implements Closeable
} }
@JsonProperty @JsonProperty
public Set<String> getRunningTasks() public Map<String, TaskStatus> getRunningTasks()
{ {
return Sets.newHashSet( Map<String, TaskStatus> retVal = Maps.newHashMap();
Lists.transform( for (TaskStatus taskStatus : Lists.transform(
statusCache.getCurrentData(), statusCache.getCurrentData(),
cacheConverter cacheConverter
) )) {
); retVal.put(taskStatus.getId(), taskStatus);
}
return retVal;
}
@JsonProperty("currCapacity")
public int getCurrCapacity()
{
int currCapacity = 0;
for (TaskStatus taskStatus : getRunningTasks().values()) {
currCapacity += taskStatus.getResource().getRequiredCapacity();
}
return currCapacity;
}
@JsonProperty("availabilityGroups")
public Set<String> getAvailabilityGroups()
{
Set<String> retVal = Sets.newHashSet();
for (TaskStatus taskStatus : getRunningTasks().values()) {
retVal.add(taskStatus.getResource().getAvailabilityGroup());
}
return retVal;
} }
@JsonProperty @JsonProperty
@ -90,10 +125,20 @@ public class ZkWorker implements Closeable
return lastCompletedTaskTime; return lastCompletedTaskTime;
} }
@JsonProperty public boolean isRunningTask(String taskId)
{
return getRunningTasks().containsKey(taskId);
}
public boolean isAtCapacity() public boolean isAtCapacity()
{ {
return statusCache.getCurrentData().size() >= worker.getCapacity(); return getCurrCapacity() >= worker.getCapacity();
}
public boolean canRunTask(Task task)
{
return (worker.getCapacity() - getCurrCapacity() >= task.getTaskResource().getRequiredCapacity()
&& !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
} }
public void setLastCompletedTaskTime(DateTime completedTaskTime) public void setLastCompletedTaskTime(DateTime completedTaskTime)
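Capacity is now resource-based rather than a count of status znodes: getCurrCapacity sums each running task's required capacity, and canRunTask also refuses tasks whose availability group is already occupied. A self-contained sketch of that check; TaskRes is a hypothetical stand-in for TaskResource:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class CapacitySketch
    {
      // Hypothetical stand-in for TaskResource.
      static class TaskRes
      {
        final String availabilityGroup;
        final int requiredCapacity;

        TaskRes(String availabilityGroup, int requiredCapacity)
        {
          this.availabilityGroup = availabilityGroup;
          this.requiredCapacity = requiredCapacity;
        }
      }

      final int workerCapacity;
      final Map<String, TaskRes> running = new HashMap<>();

      CapacitySketch(int workerCapacity)
      {
        this.workerCapacity = workerCapacity;
      }

      // Sum of resources claimed by running tasks, not a znode count.
      int currCapacity()
      {
        int used = 0;
        for (TaskRes res : running.values()) {
          used += res.requiredCapacity;
        }
        return used;
      }

      boolean canRun(TaskRes task)
      {
        Set<String> groups = new HashSet<>();
        for (TaskRes res : running.values()) {
          groups.add(res.availabilityGroup);
        }
        // Enough free slots, and the availability group is not already taken.
        return workerCapacity - currCapacity() >= task.requiredCapacity
               && !groups.contains(task.availabilityGroup);
      }

      public static void main(String[] args)
      {
        CapacitySketch worker = new CapacitySketch(3);
        worker.running.put("rt1", new TaskRes("rt1", 1));
        System.out.println(worker.canRun(new TaskRes("rt1", 1))); // false: same group
        System.out.println(worker.canRun(new TaskRes("rt2", 1))); // true
      }
    }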


@ -21,6 +21,7 @@ package com.metamx.druid.indexing.coordinator.config;
import org.skife.config.Config; import org.skife.config.Config;
import org.skife.config.Default; import org.skife.config.Default;
import org.skife.config.DefaultNull;
/** /**
*/ */
@ -29,4 +30,8 @@ public abstract class EC2AutoScalingStrategyConfig
@Config("druid.indexer.worker.port") @Config("druid.indexer.worker.port")
@Default("8080") @Default("8080")
public abstract String getWorkerPort(); public abstract String getWorkerPort();
@Config("druid.indexer.worker.version")
@DefaultNull
public abstract String getWorkerVersion();
} }


@ -13,6 +13,7 @@ import java.util.Set;
public abstract class ForkingTaskRunnerConfig public abstract class ForkingTaskRunnerConfig
{ {
@Config("druid.indexer.taskDir") @Config("druid.indexer.taskDir")
@Default("/tmp/persistent")
public abstract File getBaseTaskDir(); public abstract File getBaseTaskDir();
@Config("druid.indexer.fork.java") @Config("druid.indexer.fork.java")


@ -41,7 +41,7 @@ public abstract class IndexerCoordinatorConfig extends ZkPathsConfig
public abstract int getNumLocalThreads(); public abstract int getNumLocalThreads();
@Config("druid.indexer.runner") @Config("druid.indexer.runner")
@Default("remote") @Default("local")
public abstract String getRunnerImpl(); public abstract String getRunnerImpl();
@Config("druid.indexer.storage") @Config("druid.indexer.storage")


@ -31,4 +31,8 @@ public abstract class RemoteTaskRunnerConfig extends IndexerZkConfig
@Config("druid.indexer.taskAssignmentTimeoutDuration") @Config("druid.indexer.taskAssignmentTimeoutDuration")
@Default("PT5M") @Default("PT5M")
public abstract Duration getTaskAssignmentTimeoutDuration(); public abstract Duration getTaskAssignmentTimeoutDuration();
@Config("druid.curator.compression.enable")
@Default("false")
public abstract boolean enableCompression();
} }


@ -27,13 +27,13 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.io.InputSupplier; import com.google.common.io.InputSupplier;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter; import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.IAE;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -46,6 +46,7 @@ import com.metamx.druid.QueryableNode;
import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
@ -55,12 +56,10 @@ import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.RedirectFilter; import com.metamx.druid.http.RedirectFilter;
import com.metamx.druid.http.RedirectInfo; import com.metamx.druid.http.RedirectInfo;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory; import com.metamx.druid.indexing.common.actions.LocalTaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.actions.TaskActionToolbox; import com.metamx.druid.indexing.common.actions.TaskActionToolbox;
import com.metamx.druid.indexing.common.config.IndexerZkConfig; import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskLogConfig; import com.metamx.druid.indexing.common.config.TaskLogConfig;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
@ -114,7 +113,6 @@ import com.metamx.metrics.MonitorScheduler;
import com.metamx.metrics.MonitorSchedulerConfig; import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor; import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceDiscovery;
import org.jets3t.service.S3ServiceException; import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -386,15 +384,17 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
{ {
if (persistentTaskLogs == null) { if (persistentTaskLogs == null) {
final TaskLogConfig taskLogConfig = getConfigFactory().build(TaskLogConfig.class); final TaskLogConfig taskLogConfig = getConfigFactory().build(TaskLogConfig.class);
if (taskLogConfig.getLogStorageBucket() != null) { if (taskLogConfig.getLogType().equalsIgnoreCase("s3")) {
initializeS3Service(); initializeS3Service();
persistentTaskLogs = new S3TaskLogs( persistentTaskLogs = new S3TaskLogs(
taskLogConfig.getLogStorageBucket(), taskLogConfig.getLogStorageBucket(),
taskLogConfig.getLogStoragePrefix(), taskLogConfig.getLogStoragePrefix(),
s3Service s3Service
); );
} else { } else if (taskLogConfig.getLogType().equalsIgnoreCase("noop")) {
persistentTaskLogs = new NoopTaskLogs(); persistentTaskLogs = new NoopTaskLogs();
} else {
throw new IAE("Unknown log type %s", taskLogConfig.getLogType());
} }
} }
} }
@ -634,29 +634,14 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
@Override @Override
public TaskRunner build() public TaskRunner build()
{ {
// Don't use scheduledExecutorFactory, since it's linked to the wrong lifecycle (global lifecycle instead
// of leadership lifecycle)
final ScheduledExecutorService retryScheduledExec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("RemoteRunnerRetryExec--%d")
.build()
);
final CuratorFramework curator = getCuratorFramework(); final CuratorFramework curator = getCuratorFramework();
final RemoteTaskRunnerConfig remoteTaskRunnerConfig = getConfigFactory().build(RemoteTaskRunnerConfig.class);
return new RemoteTaskRunner( return new RemoteTaskRunner(
getJsonMapper(), getJsonMapper(),
getConfigFactory().build(RemoteTaskRunnerConfig.class), remoteTaskRunnerConfig,
curator, curator,
new PathChildrenCache(curator, indexerZkConfig.getIndexerAnnouncementPath(), true), new SimplePathChildrenCacheFactory.Builder().withCompressed(remoteTaskRunnerConfig.enableCompression())
retryScheduledExec, .build(),
new RetryPolicyFactory(
getConfigFactory().buildWithReplacements(
RetryPolicyConfig.class,
ImmutableMap.of("base_path", "druid.indexing")
)
),
configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class), configManager.watch(WorkerSetupData.CONFIG_KEY, WorkerSetupData.class),
httpClient httpClient
); );
@ -692,7 +677,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory() resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{ {
@Override @Override
public ResourceManagementScheduler build(TaskRunner runner) public ResourceManagementScheduler build(RemoteTaskRunner runner)
{ {
return new NoopResourceManagementScheduler(); return new NoopResourceManagementScheduler();
} }
@ -701,7 +686,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory() resourceManagementSchedulerFactory = new ResourceManagementSchedulerFactory()
{ {
@Override @Override
public ResourceManagementScheduler build(TaskRunner runner) public ResourceManagementScheduler build(RemoteTaskRunner runner)
{ {
final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool( final ScheduledExecutorService scalingScheduledExec = Executors.newScheduledThreadPool(
1, 1,


@ -33,6 +33,7 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig; import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.indexing.coordinator.setup.EC2NodeData; import com.metamx.druid.indexing.coordinator.setup.EC2NodeData;
import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.binary.Base64;
@ -72,6 +73,11 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
WorkerSetupData setupData = workerSetupDataRef.get(); WorkerSetupData setupData = workerSetupDataRef.get();
EC2NodeData workerConfig = setupData.getNodeData(); EC2NodeData workerConfig = setupData.getNodeData();
GalaxyUserData userData = setupData.getUserData();
if (config.getWorkerVersion() != null) {
userData = userData.withVersion(config.getWorkerVersion());
}
RunInstancesResult result = amazonEC2Client.runInstances( RunInstancesResult result = amazonEC2Client.runInstances(
new RunInstancesRequest( new RunInstancesRequest(
workerConfig.getAmiId(), workerConfig.getAmiId(),
@ -84,7 +90,7 @@ public class EC2AutoScalingStrategy implements AutoScalingStrategy<Instance>
.withUserData( .withUserData(
Base64.encodeBase64String( Base64.encodeBase64String(
jsonMapper.writeValueAsBytes( jsonMapper.writeValueAsBytes(
setupData.getUserData() userData
) )
) )
) )


@ -24,6 +24,7 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.PeriodGranularity; import com.metamx.druid.PeriodGranularity;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunner;
import com.metamx.druid.indexing.coordinator.TaskRunner; import com.metamx.druid.indexing.coordinator.TaskRunner;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Duration; import org.joda.time.Duration;
@ -42,7 +43,7 @@ public class ResourceManagementScheduler
{ {
private static final Logger log = new Logger(ResourceManagementScheduler.class); private static final Logger log = new Logger(ResourceManagementScheduler.class);
private final TaskRunner taskRunner; private final RemoteTaskRunner taskRunner;
private final ResourceManagementStrategy resourceManagementStrategy; private final ResourceManagementStrategy resourceManagementStrategy;
private final ResourceManagementSchedulerConfig config; private final ResourceManagementSchedulerConfig config;
private final ScheduledExecutorService exec; private final ScheduledExecutorService exec;
@ -51,7 +52,7 @@ public class ResourceManagementScheduler
private volatile boolean started = false; private volatile boolean started = false;
public ResourceManagementScheduler( public ResourceManagementScheduler(
TaskRunner taskRunner, RemoteTaskRunner taskRunner,
ResourceManagementStrategy resourceManagementStrategy, ResourceManagementStrategy resourceManagementStrategy,
ResourceManagementSchedulerConfig config, ResourceManagementSchedulerConfig config,
ScheduledExecutorService exec ScheduledExecutorService exec


@ -19,11 +19,11 @@
package com.metamx.druid.indexing.coordinator.scaling; package com.metamx.druid.indexing.coordinator.scaling;
import com.metamx.druid.indexing.coordinator.TaskRunner; import com.metamx.druid.indexing.coordinator.RemoteTaskRunner;
/** /**
*/ */
public interface ResourceManagementSchedulerFactory public interface ResourceManagementSchedulerFactory
{ {
public ResourceManagementScheduler build(TaskRunner runner); public ResourceManagementScheduler build(RemoteTaskRunner runner);
} }


@ -19,7 +19,7 @@
package com.metamx.druid.indexing.coordinator.scaling; package com.metamx.druid.indexing.coordinator.scaling;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker; import com.metamx.druid.indexing.coordinator.ZkWorker;
import java.util.Collection; import java.util.Collection;
@ -30,9 +30,9 @@ import java.util.Collection;
*/ */
public interface ResourceManagementStrategy public interface ResourceManagementStrategy
{ {
public boolean doProvision(Collection<TaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers); public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public boolean doTerminate(Collection<TaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers); public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> runningTasks, Collection<ZkWorker> zkWorkers);
public ScalingStats getStats(); public ScalingStats getStats();
} }


@ -25,6 +25,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker; import com.metamx.druid.indexing.coordinator.ZkWorker;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
@ -68,7 +69,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
} }
@Override @Override
public boolean doProvision(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers) public boolean doProvision(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{ {
if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) { if (zkWorkers.size() >= workerSetupdDataRef.get().getMaxNumWorkers()) {
log.info( log.info(
@ -135,7 +136,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
} }
@Override @Override
public boolean doTerminate(Collection<TaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers) public boolean doTerminate(Collection<RemoteTaskRunnerWorkItem> pendingTasks, Collection<ZkWorker> zkWorkers)
{ {
Set<String> workerNodeIds = Sets.newHashSet( Set<String> workerNodeIds = Sets.newHashSet(
autoScalingStrategy.ipToIdLookup( autoScalingStrategy.ipToIdLookup(
@ -244,7 +245,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat
return scalingStats; return scalingStats;
} }
private boolean hasTaskPendingBeyondThreshold(Collection<TaskRunnerWorkItem> pendingTasks) private boolean hasTaskPendingBeyondThreshold(Collection<RemoteTaskRunnerWorkItem> pendingTasks)
{ {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
for (TaskRunnerWorkItem pendingTask : pendingTasks) { for (TaskRunnerWorkItem pendingTask : pendingTasks) {

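hasTaskPendingBeyondThreshold gates provisioning on how long tasks have been waiting, not just on whether any are pending. A sketch of the check with plain millisecond timestamps (the threshold is assumed to come from configuration):

    import java.util.Collection;

    class PendingThresholdSketch
    {
      // True if any pending item has already waited at least thresholdMillis.
      static boolean hasPendingBeyondThreshold(Collection<Long> queueInsertionTimes, long thresholdMillis)
      {
        long now = System.currentTimeMillis();
        for (long insertedAt : queueInsertionTimes) {
          if (now - insertedAt >= thresholdMillis) {
            return true;
          }
        }
        return false;
      }
    }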

@ -60,6 +60,11 @@ public class GalaxyUserData
return type; return type;
} }
public GalaxyUserData withVersion(String ver)
{
return new GalaxyUserData(env, ver, type);
}
@Override @Override
public String toString() public String toString()
{ {


@ -130,6 +130,7 @@ public class WorkerCuratorCoordinator
} }
curatorFramework.create() curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(mode) .withMode(mode)
.forPath(path, rawBytes); .forPath(path, rawBytes);
} }


@ -29,10 +29,6 @@ public abstract class WorkerConfig
@Config("druid.host") @Config("druid.host")
public abstract String getHost(); public abstract String getHost();
@Config("druid.worker.threads")
@Default("1")
public abstract int getNumThreads();
@Config("druid.worker.ip") @Config("druid.worker.ip")
public abstract String getIp(); public abstract String getIp();


@ -4,7 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.druid.indexing.common.index.ChatHandler; import com.metamx.druid.indexing.common.index.ChatHandler;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider; import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.PathParam; import javax.ws.rs.PathParam;
@ -14,10 +14,10 @@ import javax.ws.rs.core.Response;
public class ChatHandlerResource public class ChatHandlerResource
{ {
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ChatHandlerProvider handlers; private final EventReceivingChatHandlerProvider handlers;
@Inject @Inject
public ChatHandlerResource(ObjectMapper jsonMapper, ChatHandlerProvider handlers) public ChatHandlerResource(ObjectMapper jsonMapper, EventReceivingChatHandlerProvider handlers)
{ {
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.handlers = handlers; this.handlers = handlers;


@ -37,7 +37,6 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode; import com.metamx.druid.BaseServerNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.NoopServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.GuiceServletConfig;
@ -50,6 +49,8 @@ import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig; import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider; import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.EventReceivingChatHandlerProvider;
import com.metamx.druid.indexing.common.index.NoopChatHandlerProvider;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
@ -385,19 +386,17 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
{ {
if (chatHandlerProvider == null) { if (chatHandlerProvider == null) {
final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class); final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
final ServiceAnnouncer myServiceAnnouncer;
if (config.getServiceFormat() == null) { if (config.getServiceFormat() == null) {
log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!"); log.info("ChatHandlerProvider: Using NoopChatHandlerProvider. Good luck finding your firehoses!");
myServiceAnnouncer = new NoopServiceAnnouncer(); this.chatHandlerProvider = new NoopChatHandlerProvider();
} else { } else {
myServiceAnnouncer = serviceAnnouncer; this.chatHandlerProvider = new EventReceivingChatHandlerProvider(
}
this.chatHandlerProvider = new ChatHandlerProvider(
config, config,
myServiceAnnouncer serviceAnnouncer
); );
} }
} }
}
public static class Builder public static class Builder
{ {
@ -437,9 +436,12 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
jsonMapper = new DefaultObjectMapper(); jsonMapper = new DefaultObjectMapper();
smileMapper = new DefaultObjectMapper(new SmileFactory()); smileMapper = new DefaultObjectMapper(new SmileFactory());
smileMapper.getJsonFactory().setCodec(smileMapper); smileMapper.getJsonFactory().setCodec(smileMapper);
} } else if (jsonMapper == null || smileMapper == null) {
else if (jsonMapper == null || smileMapper == null) { throw new ISE(
throw new ISE("Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.", jsonMapper, smileMapper); "Only jsonMapper[%s] or smileMapper[%s] was set, must set neither or both.",
jsonMapper,
smileMapper
);
} }
if (lifecycle == null) { if (lifecycle == null) {
@ -454,7 +456,15 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
configFactory = Config.createFactory(props); configFactory = Config.createFactory(props);
} }
return new ExecutorNode(nodeType, props, lifecycle, jsonMapper, smileMapper, configFactory, executorLifecycleFactory); return new ExecutorNode(
nodeType,
props,
lifecycle,
jsonMapper,
smileMapper,
configFactory,
executorLifecycleFactory
);
} }
} }
} }


@ -394,7 +394,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
public void initializeWorkerTaskMonitor() public void initializeWorkerTaskMonitor()
{ {
if (workerTaskMonitor == null) { if (workerTaskMonitor == null) {
final ExecutorService workerExec = Executors.newFixedThreadPool(workerConfig.getNumThreads()); final ExecutorService workerExec = Executors.newFixedThreadPool(workerConfig.getCapacity());
final CuratorFramework curatorFramework = getCuratorFramework(); final CuratorFramework curatorFramework = getCuratorFramework();
final PathChildrenCache pathChildrenCache = new PathChildrenCache( final PathChildrenCache pathChildrenCache = new PathChildrenCache(


@ -30,6 +30,7 @@ import com.metamx.druid.indexing.common.actions.LockListAction;
import com.metamx.druid.indexing.common.actions.LockReleaseAction; import com.metamx.druid.indexing.common.actions.LockReleaseAction;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction; import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.indexing.common.task.AbstractTask; import com.metamx.druid.indexing.common.task.AbstractTask;
import com.metamx.druid.indexing.common.task.TaskResource;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert; import org.junit.Assert;
@ -41,12 +42,12 @@ public class RealtimeishTask extends AbstractTask
{ {
public RealtimeishTask() public RealtimeishTask()
{ {
super("rt1", "rt", "rt1", "foo", null); super("rt1", "rt", new TaskResource("rt1", 1), "foo", null);
} }
public RealtimeishTask(String id, String groupId, String availGroup, String dataSource, Interval interval) public RealtimeishTask(String id, String groupId, TaskResource taskResource, String dataSource, Interval interval)
{ {
super(id, groupId, availGroup, dataSource, interval); super(id, groupId, taskResource, dataSource, interval);
} }
@Override @Override


@ -2,22 +2,26 @@ package com.metamx.druid.indexing.coordinator;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables; import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures; import com.google.common.io.Files;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.cache.SimplePathChildrenCacheFactory;
import com.metamx.druid.indexing.TestTask; import com.metamx.druid.indexing.TestTask;
import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolboxFactory; import com.metamx.druid.indexing.common.TaskToolboxFactory;
import com.metamx.druid.indexing.common.config.IndexerZkConfig; import com.metamx.druid.indexing.common.config.IndexerZkConfig;
import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig; import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig; import com.metamx.druid.indexing.coordinator.config.RemoteTaskRunnerConfig;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker; import com.metamx.druid.indexing.worker.Worker;
@ -26,7 +30,6 @@ import com.metamx.druid.indexing.worker.WorkerTaskMonitor;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
@ -42,35 +45,34 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static junit.framework.Assert.fail;
/** /**
*/ */
public class RemoteTaskRunnerTest public class RemoteTaskRunnerTest
{ {
private static final ObjectMapper jsonMapper = new DefaultObjectMapper(); private static final ObjectMapper jsonMapper = new DefaultObjectMapper();
private static final String basePath = "/test/druid/indexer"; private static final Joiner joiner = Joiner.on("/");
private static final String announcementsPath = String.format("%s/announcements", basePath); private static final String basePath = "/test/druid";
private static final String tasksPath = String.format("%s/tasks", basePath); private static final String announcementsPath = String.format("%s/indexer/announcements/worker", basePath);
private static final String statusPath = String.format("%s/status", basePath); private static final String tasksPath = String.format("%s/indexer/tasks/worker", basePath);
private static final String statusPath = String.format("%s/indexer/status/worker", basePath);
private TestingCluster testingCluster; private TestingCluster testingCluster;
private CuratorFramework cf; private CuratorFramework cf;
private PathChildrenCache pathChildrenCache;
private RemoteTaskRunner remoteTaskRunner; private RemoteTaskRunner remoteTaskRunner;
private WorkerCuratorCoordinator workerCuratorCoordinator;
private WorkerTaskMonitor workerTaskMonitor; private WorkerTaskMonitor workerTaskMonitor;
private ScheduledExecutorService scheduledExec; private TestTask task;
private TestTask task1; private Worker worker;
private Worker worker1;
@Before @Before
public void setUp() throws Exception public void setUp() throws Exception
@ -84,78 +86,38 @@ public class RemoteTaskRunnerTest
.compressionProvider(new PotentiallyGzippedCompressionProvider(false)) .compressionProvider(new PotentiallyGzippedCompressionProvider(false))
.build(); .build();
cf.start(); cf.start();
cf.create().creatingParentsIfNeeded().forPath(basePath);
cf.create().creatingParentsIfNeeded().forPath(announcementsPath); task = makeTask(TaskStatus.success("task"));
cf.create().forPath(tasksPath);
cf.create().forPath(String.format("%s/worker1", tasksPath));
cf.create().forPath(statusPath);
cf.create().forPath(String.format("%s/worker1", statusPath));
pathChildrenCache = new PathChildrenCache(cf, announcementsPath, true);
worker1 = new Worker(
"worker1",
"localhost",
3,
"0"
);
task1 = new TestTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.success("task1")
);
makeRemoteTaskRunner();
makeTaskMonitor();
} }
@After @After
public void tearDown() throws Exception public void tearDown() throws Exception
{ {
testingCluster.stop();
remoteTaskRunner.stop(); remoteTaskRunner.stop();
workerCuratorCoordinator.stop();
workerTaskMonitor.stop(); workerTaskMonitor.stop();
cf.close();
testingCluster.stop();
} }
@Test @Test
public void testRunNoExistingTask() throws Exception public void testRunNoExistingTask() throws Exception
{ {
remoteTaskRunner.run(task1); doSetup();
remoteTaskRunner.run(task);
} }
@Test @Test(expected = ISE.class)
public void testExceptionThrownWithExistingTask() throws Exception public void testExceptionThrownWithExistingTask() throws Exception
{ {
doSetup();
remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
remoteTaskRunner.run( remoteTaskRunner.run(
new TestTask( makeTask(TaskStatus.running("task"))
task1.getId(),
task1.getDataSource(),
task1.getSegments(),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.running(task1.getId())
)
); );
try {
remoteTaskRunner.run(task1);
fail("ISE expected");
}
catch (ISE expected) {
}
} }
@Test @Test
@ -164,107 +126,36 @@ public class RemoteTaskRunnerTest
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter); EmittingLogger.registerEmitter(emitter);
EasyMock.replay(emitter); EasyMock.replay(emitter);
remoteTaskRunner.run(
new TestTask( doSetup();
new String(new char[5000]),
"dummyDs", remoteTaskRunner.run(makeTask(TaskStatus.success(new String(new char[5000]))));
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.success("foo")
)
);
EasyMock.verify(emitter); EasyMock.verify(emitter);
} }
@Test
public void testRunWithCallback() throws Exception
{
final MutableBoolean callbackCalled = new MutableBoolean(false);
Futures.addCallback(
remoteTaskRunner.run(
new TestTask(
task1.getId(),
task1.getDataSource(),
task1.getSegments(),
Lists.<AggregatorFactory>newArrayList(),
TaskStatus.running(task1.getId())
)
), new FutureCallback<TaskStatus>()
{
@Override
public void onSuccess(TaskStatus taskStatus)
{
callbackCalled.setValue(true);
}
@Override
public void onFailure(Throwable throwable)
{
// neg
}
}
);
// Really don't like this way of waiting for the task to appear
int count = 0;
while (remoteTaskRunner.findWorkerRunningTask(task1.getId()) == null) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Task still not announced in ZK?");
}
count++;
}
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 1);
// Complete the task
cf.setData().forPath(
String.format("%s/worker1/task1", statusPath),
jsonMapper.writeValueAsBytes(TaskStatus.success(task1.getId()))
);
// Really don't like this way of waiting for the task to disappear
count = 0;
while (remoteTaskRunner.findWorkerRunningTask(task1.getId()) != null) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Task still exists in ZK?");
}
count++;
}
Assert.assertTrue("TaskCallback was not called!", callbackCalled.booleanValue());
}
@Test @Test
public void testRunSameAvailabilityGroup() throws Exception public void testRunSameAvailabilityGroup() throws Exception
{ {
TestRealtimeTask theTask = new TestRealtimeTask("rt1", "rt1", "foo", TaskStatus.running("rt1")); doSetup();
TestRealtimeTask theTask = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1")
);
remoteTaskRunner.run(theTask); remoteTaskRunner.run(theTask);
remoteTaskRunner.run( remoteTaskRunner.run(
new TestRealtimeTask("rt2", "rt1", "foo", TaskStatus.running("rt2")) new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2"))
); );
remoteTaskRunner.run( remoteTaskRunner.run(
new TestRealtimeTask("rt3", "rt2", "foo", TaskStatus.running("rt3")) new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3"))
); );
Stopwatch stopwatch = new Stopwatch(); Stopwatch stopwatch = new Stopwatch();
stopwatch.start(); stopwatch.start();
while (remoteTaskRunner.getRunningTasks().isEmpty()) { while (remoteTaskRunner.getRunningTasks().size() < 2) {
Thread.sleep(100); Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) { if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task"); throw new ISE("Cannot find running task");
@ -276,35 +167,148 @@ public class RemoteTaskRunnerTest
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2")); Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
} }
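Note on the new scheduling model: TaskResource pairs an availability group with a required capacity, and the runner declines to place two tasks from the same availability group on one worker. Under that assumption (consistent with the assertions above), the three submissions resolve as sketched:

    // rt1 and rt2 both declare availability group "rt1"; rt3 declares group "rt2".
    // With a single worker, rt1 and rt3 are assigned while rt2 stays pending:
    // a second task from group "rt1" may not share rt1's worker.
    remoteTaskRunner.run(new TestRealtimeTask("rt1", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt1")));
    remoteTaskRunner.run(new TestRealtimeTask("rt2", new TaskResource("rt1", 1), "foo", TaskStatus.running("rt2")));
    remoteTaskRunner.run(new TestRealtimeTask("rt3", new TaskResource("rt2", 1), "foo", TaskStatus.running("rt3")));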
@Test
public void testRunWithCapacity() throws Exception
{
doSetup();
TestRealtimeTask theTask = new TestRealtimeTask(
"rt1",
new TaskResource("rt1", 1),
"foo",
TaskStatus.running("rt1")
);
remoteTaskRunner.run(theTask);
remoteTaskRunner.run(
new TestRealtimeTask("rt2", new TaskResource("rt2", 3), "foo", TaskStatus.running("rt2"))
);
remoteTaskRunner.run(
new TestRealtimeTask("rt3", new TaskResource("rt3", 2), "foo", TaskStatus.running("rt3"))
);
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (remoteTaskRunner.getRunningTasks().size() < 2) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(remoteTaskRunner.getRunningTasks().size() == 2);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().size() == 1);
Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTask().getId().equals("rt2"));
}
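A worked trace of the capacity arithmetic, assuming the worker built in makeWorker() advertises 3 capacity slots and assignments are attempted in submission order:

    int remaining = 3;                  // worker capacity, per makeWorker()
    remaining -= 1;                     // rt1 requires 1 -> assigned (2 slots left)
    boolean rt2Fits = 3 <= remaining;   // false: rt2 requires 3 -> stays pending
    remaining -= 2;                     // rt3 requires 2 -> assigned (0 slots left)
    // running = {rt1, rt3}, pending = {rt2}, matching the assertions above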
@Test
public void testFailure() throws Exception
{
doSetup();
ListenableFuture<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
final String taskStatus = joiner.join(statusPath, "task");
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
while (cf.checkExists().forPath(taskStatus) == null) {
Thread.sleep(100);
if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > 1000) {
throw new ISE("Cannot find running task");
}
}
Assert.assertTrue(remoteTaskRunner.getRunningTasks().iterator().next().getTask().getId().equals("task"));
cf.delete().forPath(taskStatus);
TaskStatus status = future.get();
Assert.assertEquals(status.getStatusCode(), TaskStatus.Status.FAILED);
}
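testFailure also pins down the caller-facing contract: run() hands back a Guava ListenableFuture&lt;TaskStatus&gt; that resolves once the worker's status node reaches a terminal state or disappears. A minimal caller sketch under that contract, reusing names from the test:

    ListenableFuture<TaskStatus> future = remoteTaskRunner.run(makeTask(TaskStatus.running("task")));
    TaskStatus status = future.get();   // blocks until a terminal status arrives
    if (status.getStatusCode() == TaskStatus.Status.FAILED) {
      // the worker died or its status znode was deleted, as simulated above
    }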
@Test
public void testBootstrap() throws Exception
{
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(joiner.join(statusPath, "first"), jsonMapper.writeValueAsBytes(TaskStatus.running("first")));
cf.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath(joiner.join(statusPath, "second"), jsonMapper.writeValueAsBytes(TaskStatus.running("second")));
doSetup();
Set<String> existingTasks = Sets.newHashSet();
for (ZkWorker zkWorker : remoteTaskRunner.getWorkers()) {
existingTasks.addAll(zkWorker.getRunningTasks().keySet());
}
Assert.assertTrue(existingTasks.size() == 2);
Assert.assertTrue(existingTasks.contains("first"));
Assert.assertTrue(existingTasks.contains("second"));
remoteTaskRunner.bootstrap(Arrays.<Task>asList(makeTask(TaskStatus.running("second"))));
Set<String> runningTasks = Sets.newHashSet(
Iterables.transform(
remoteTaskRunner.getRunningTasks(),
new Function<RemoteTaskRunnerWorkItem, String>()
{
@Override
public String apply(RemoteTaskRunnerWorkItem input)
{
return input.getTask().getId();
}
}
)
);
Assert.assertTrue(runningTasks.size() == 1);
Assert.assertTrue(runningTasks.contains("second"));
Assert.assertFalse(runningTasks.contains("first"));
}
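The pre-seeded state above uses Curator ephemeral nodes, so a worker's status entries vanish with its ZooKeeper session. A standalone sketch of the same call pattern (the path and task id are hypothetical; cf is a started CuratorFramework and jsonMapper a Jackson ObjectMapper):

    cf.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)   // removed automatically when the session ends
      .forPath(
          "/druid/indexer/status/worker/some-task",
          jsonMapper.writeValueAsBytes(TaskStatus.running("some-task"))
      );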
private void doSetup() throws Exception
{
makeWorker();
makeRemoteTaskRunner();
makeTaskMonitor();
}
private TestTask makeTask(TaskStatus status)
{
return new TestTask(
status.getId(),
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0,
0
)
),
Lists.<AggregatorFactory>newArrayList(),
status
);
}
private void makeTaskMonitor() throws Exception private void makeTaskMonitor() throws Exception
{ {
WorkerCuratorCoordinator workerCuratorCoordinator = new WorkerCuratorCoordinator( workerCuratorCoordinator = new WorkerCuratorCoordinator(
jsonMapper, jsonMapper,
new IndexerZkConfig() new IndexerZkConfig()
{ {
@Override
public String getIndexerAnnouncementPath()
{
return announcementsPath;
}
@Override
public String getIndexerTaskPath()
{
return tasksPath;
}
@Override
public String getIndexerStatusPath()
{
return statusPath;
}
@Override @Override
public String getZkBasePath() public String getZkBasePath()
{ {
throw new UnsupportedOperationException(); return basePath;
} }
@Override @Override
@ -314,13 +318,14 @@ public class RemoteTaskRunnerTest
} }
}, },
cf, cf,
worker1 worker
); );
workerCuratorCoordinator.start(); workerCuratorCoordinator.start();
// Start a task monitor
workerTaskMonitor = new WorkerTaskMonitor( workerTaskMonitor = new WorkerTaskMonitor(
jsonMapper, jsonMapper,
new PathChildrenCache(cf, String.format("%s/worker1", tasksPath), true), new PathChildrenCache(cf, tasksPath, true),
cf, cf,
workerCuratorCoordinator, workerCuratorCoordinator,
new ThreadPoolTaskRunner( new ThreadPoolTaskRunner(
@ -328,14 +333,11 @@ public class RemoteTaskRunnerTest
new TaskConfig() new TaskConfig()
{ {
@Override @Override
public File getBaseTaskDir() public String getBaseDir()
{ {
try { File tmp = Files.createTempDir();
return File.createTempFile("billy", "yay"); tmp.deleteOnExit();
} return tmp.toString();
catch (Exception e) {
throw Throwables.propagate(e);
}
} }
@Override @Override
@ -361,80 +363,45 @@ public class RemoteTaskRunnerTest
private void makeRemoteTaskRunner() throws Exception private void makeRemoteTaskRunner() throws Exception
{ {
scheduledExec = EasyMock.createMock(ScheduledExecutorService.class);
remoteTaskRunner = new RemoteTaskRunner( remoteTaskRunner = new RemoteTaskRunner(
jsonMapper, jsonMapper,
new TestRemoteTaskRunnerConfig(), new TestRemoteTaskRunnerConfig(),
cf, cf,
pathChildrenCache, new SimplePathChildrenCacheFactory.Builder().build(),
scheduledExec,
new RetryPolicyFactory(new TestRetryPolicyConfig()),
new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null)), new AtomicReference<WorkerSetupData>(new WorkerSetupData("0", 0, 1, null, null)),
null null
); );
// Create a single worker and wait for things to be ready
remoteTaskRunner.start(); remoteTaskRunner.start();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath( }
String.format("%s/worker1", announcementsPath),
jsonMapper.writeValueAsBytes(worker1) private void makeWorker() throws Exception
{
worker = new Worker(
"worker",
"localhost",
3,
"0"
); );
int count = 0;
while (remoteTaskRunner.getWorkers().size() == 0) {
Thread.sleep(500);
if (count > 10) {
throw new ISE("WTF?! Still can't find worker!");
}
count++;
}
}
private static class TestRetryPolicyConfig extends RetryPolicyConfig cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
{ announcementsPath,
@Override jsonMapper.writeValueAsBytes(worker)
public Duration getRetryMinDuration() );
{
return null;
}
@Override
public Duration getRetryMaxDuration()
{
return null;
}
@Override
public long getMaxRetryCount()
{
return 0;
}
} }
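The constructor change in makeRemoteTaskRunner() is the visible end of the refactor: instead of an injected PathChildrenCache plus retry machinery, the runner now receives a PathChildrenCacheFactory and builds per-worker caches itself. A usage sketch of the builder (the compressed variant is hypothetical; this test runs uncompressed):

    // Builder defaults: cacheData = true, compressed = false, single-thread executor.
    PathChildrenCacheFactory plain = new SimplePathChildrenCacheFactory.Builder().build();

    // Hypothetical variant for a runner whose config enables ZK payload compression.
    PathChildrenCacheFactory compressed =
        new SimplePathChildrenCacheFactory.Builder().withCompressed(true).build();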
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{ {
@Override @Override
public String getIndexerAnnouncementPath() public boolean enableCompression()
{ {
return announcementsPath; return false;
}
@Override
public String getIndexerTaskPath()
{
return tasksPath;
}
@Override
public String getIndexerStatusPath()
{
return statusPath;
} }
@Override @Override
public String getZkBasePath() public String getZkBasePath()
{ {
throw new UnsupportedOperationException(); return basePath;
} }
@Override @Override

View File

@ -36,6 +36,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec; import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
@ -120,9 +121,9 @@ public class TaskLifecycleTest
new TaskConfig() new TaskConfig()
{ {
@Override @Override
public File getBaseTaskDir() public String getBaseDir()
{ {
return tmp; return tmp.toString();
} }
@Override @Override
@ -285,7 +286,7 @@ public class TaskLifecycleTest
@Test @Test
public void testSimple() throws Exception public void testSimple() throws Exception
{ {
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) final Task task = new AbstractTask("id1", "id1", new TaskResource("id1", 1), "ds", new Interval("2012-01-01/P1D"))
{ {
@Override @Override
public String getType() public String getType()
@ -322,7 +323,7 @@ public class TaskLifecycleTest
@Test @Test
public void testBadInterval() throws Exception public void testBadInterval() throws Exception
{ {
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{ {
@Override @Override
public String getType() public String getType()
@ -356,7 +357,7 @@ public class TaskLifecycleTest
@Test @Test
public void testBadVersion() throws Exception public void testBadVersion() throws Exception
{ {
final Task task = new AbstractTask("id1", "id1", "id1", "ds", new Interval("2012-01-01/P1D")) final Task task = new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{ {
@Override @Override
public String getType() public String getType()

View File

@ -348,7 +348,7 @@ public class TaskQueueTest
private static Task newTask(final String id, final String groupId, final String dataSource, final Interval interval) private static Task newTask(final String id, final String groupId, final String dataSource, final Interval interval)
{ {
return new AbstractTask(id, groupId, id, dataSource, interval) return new AbstractTask(id, groupId, dataSource, interval)
{ {
@Override @Override
public TaskStatus run(TaskToolbox toolbox) throws Exception public TaskStatus run(TaskToolbox toolbox) throws Exception
@ -372,7 +372,7 @@ public class TaskQueueTest
final List<Task> nextTasks final List<Task> nextTasks
) )
{ {
return new AbstractTask(id, groupId, id, dataSource, interval) return new AbstractTask(id, groupId, dataSource, interval)
{ {
@Override @Override
public String getType() public String getType()

View File

@ -27,6 +27,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox; import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.task.RealtimeIndexTask; import com.metamx.druid.indexing.common.task.RealtimeIndexTask;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
@ -40,14 +41,14 @@ public class TestRealtimeTask extends RealtimeIndexTask
@JsonCreator @JsonCreator
public TestRealtimeTask( public TestRealtimeTask(
@JsonProperty("id") String id, @JsonProperty("id") String id,
@JsonProperty("availabilityGroup") String availGroup, @JsonProperty("resource") TaskResource taskResource,
@JsonProperty("dataSource") String dataSource, @JsonProperty("dataSource") String dataSource,
@JsonProperty("taskStatus") TaskStatus status @JsonProperty("taskStatus") TaskStatus status
) )
{ {
super( super(
id, id,
availGroup, taskResource,
new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()), new Schema(dataSource, null, new AggregatorFactory[]{}, QueryGranularity.NONE, new NoneShardSpec()),
null, null,
null, null,

View File

@ -84,6 +84,12 @@ public class EC2AutoScalingStrategyTest
{ {
return "8080"; return "8080";
} }
@Override
public String getWorkerVersion()
{
return "";
}
}, },
workerSetupData workerSetupData
); );

View File

@ -19,17 +19,20 @@
package com.metamx.druid.indexing.coordinator.scaling; package com.metamx.druid.indexing.coordinator.scaling;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexing.TestTask; import com.metamx.druid.indexing.TestTask;
import com.metamx.druid.indexing.common.TaskStatus; import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.TaskRunnerWorkItem; import com.metamx.druid.indexing.coordinator.RemoteTaskRunnerWorkItem;
import com.metamx.druid.indexing.coordinator.ZkWorker; import com.metamx.druid.indexing.coordinator.ZkWorker;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData; import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.indexing.worker.Worker; import com.metamx.druid.indexing.worker.Worker;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder; import com.metamx.emitter.service.ServiceEventBuilder;
@ -42,7 +45,9 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -127,8 +132,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -155,8 +160,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -171,8 +176,8 @@ public class SimpleResourceManagementStrategyTest
); );
provisionedSomething = simpleResourceManagementStrategy.doProvision( provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -212,8 +217,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( boolean provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -230,8 +235,8 @@ public class SimpleResourceManagementStrategyTest
Thread.sleep(2000); Thread.sleep(2000);
provisionedSomething = simpleResourceManagementStrategy.doProvision( provisionedSomething = simpleResourceManagementStrategy.doProvision(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(testTask) new TestZkWorker(testTask)
@ -264,8 +269,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -294,8 +299,8 @@ public class SimpleResourceManagementStrategyTest
EasyMock.replay(autoScalingStrategy); EasyMock.replay(autoScalingStrategy);
boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate( boolean terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -309,8 +314,8 @@ public class SimpleResourceManagementStrategyTest
); );
terminatedSomething = simpleResourceManagementStrategy.doTerminate( terminatedSomething = simpleResourceManagementStrategy.doTerminate(
Arrays.<TaskRunnerWorkItem>asList( Arrays.<RemoteTaskRunnerWorkItem>asList(
new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) new RemoteTaskRunnerWorkItem(testTask, null).withQueueInsertionTime(new DateTime())
), ),
Arrays.<ZkWorker>asList( Arrays.<ZkWorker>asList(
new TestZkWorker(null) new TestZkWorker(null)
@ -334,18 +339,18 @@ public class SimpleResourceManagementStrategyTest
Task testTask Task testTask
) )
{ {
super(new Worker("host", "ip", 3, "version"), null, null); super(new Worker("host", "ip", 3, "version"), null, new DefaultObjectMapper());
this.testTask = testTask; this.testTask = testTask;
} }
@Override @Override
public Set<String> getRunningTasks() public Map<String, TaskStatus> getRunningTasks()
{ {
if (testTask == null) { if (testTask == null) {
return Sets.newHashSet(); return Maps.newHashMap();
} }
return Sets.newHashSet(testTask.getId()); return ImmutableMap.of(testTask.getId(), TaskStatus.running(testTask.getId()));
} }
} }
} }
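The TestZkWorker override tracks an interface change: ZkWorker.getRunningTasks() now returns a Map from task id to TaskStatus rather than a bare Set of ids. A caller-side sketch under the new signature (zkWorker stands in for any ZkWorker instance):

    for (Map.Entry<String, TaskStatus> entry : zkWorker.getRunningTasks().entrySet()) {
      if (!entry.getValue().isComplete()) {
        // entry.getKey() is still running on this worker
      }
    }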

View File

@ -22,7 +22,8 @@ package com.metamx.druid.metrics;
import com.metamx.emitter.core.Event; import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
public class NoopServiceEmitter extends ServiceEmitter public class
NoopServiceEmitter extends ServiceEmitter
{ {
public NoopServiceEmitter() public NoopServiceEmitter()
{ {