Merge pull request #134 from metamx/event-push

Event push
This commit is contained in:
cheddar 2013-05-06 15:10:44 -07:00
commit 01c1257cee
50 changed files with 1116 additions and 186 deletions

View File

@ -0,0 +1,27 @@
package com.metamx.druid.curator.discovery;
import com.google.common.base.Throwables;
import org.apache.curator.x.discovery.ServiceInstance;
public class AddressPortServiceInstanceFactory implements ServiceInstanceFactory<Void>
{
private final String address;
private final int port;
public AddressPortServiceInstanceFactory(String address, int port)
{
this.address = address;
this.port = port;
}
@Override
public ServiceInstance<Void> create(String service)
{
try {
return ServiceInstance.<Void>builder().name(service).address(address).port(port).build();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -0,0 +1,81 @@
package com.metamx.druid.curator.discovery;
import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceInstance;
import java.util.Map;
/**
* Uses the Curator Service Discovery recipe to announce services.
*/
public class CuratorServiceAnnouncer<T> implements ServiceAnnouncer
{
private static final Logger log = new Logger(CuratorServiceAnnouncer.class);
private final ServiceDiscovery<T> discovery;
private final ServiceInstanceFactory<T> instanceFactory;
private final Map<String, ServiceInstance<T>> instanceMap = Maps.newHashMap();
private final Object monitor = new Object();
public CuratorServiceAnnouncer(
ServiceDiscovery<T> discovery,
ServiceInstanceFactory<T> instanceFactory
)
{
this.discovery = discovery;
this.instanceFactory = instanceFactory;
}
@Override
public void announce(String service) throws Exception
{
final ServiceInstance<T> instance;
synchronized (monitor) {
if (instanceMap.containsKey(service)) {
log.warn("Ignoring request to announce service[%s]", service);
return;
} else {
instance = instanceFactory.create(service);
instanceMap.put(service, instance);
}
}
try {
log.info("Announcing service[%s]", service);
discovery.registerService(instance);
} catch (Exception e) {
log.warn("Failed to announce service[%s]", service);
synchronized (monitor) {
instanceMap.remove(service);
}
}
}
@Override
public void unannounce(String service) throws Exception
{
final ServiceInstance<T> instance;
synchronized (monitor) {
instance = instanceMap.get(service);
if (instance == null) {
log.warn("Ignoring request to unannounce service[%s]", service);
return;
}
}
log.info("Unannouncing service[%s]", service);
try {
discovery.unregisterService(instance);
} catch (Exception e) {
log.warn(e, "Failed to unannounce service[%s]", service);
} finally {
synchronized (monitor) {
instanceMap.remove(service);
}
}
}
}

View File

@ -0,0 +1,19 @@
package com.metamx.druid.curator.discovery;
/**
* Does nothing.
*/
public class NoopServiceAnnouncer implements ServiceAnnouncer
{
@Override
public void unannounce(String service)
{
}
@Override
public void announce(String service)
{
}
}

View File

@ -0,0 +1,11 @@
package com.metamx.druid.curator.discovery;
/**
* Announces our ability to serve a particular function. Multiple users may announce the same service, in which
* case they are treated as interchangeable instances of that service.
*/
public interface ServiceAnnouncer
{
public void announce(String service) throws Exception;
public void unannounce(String service) throws Exception;
}

View File

@ -0,0 +1,8 @@
package com.metamx.druid.curator.discovery;
import org.apache.curator.x.discovery.ServiceInstance;
public interface ServiceInstanceFactory<T>
{
public ServiceInstance<T> create(String service);
}

View File

@ -42,6 +42,8 @@ import com.metamx.druid.client.cache.MapCache;
import com.metamx.druid.client.cache.MapCacheConfig; import com.metamx.druid.client.cache.MapCacheConfig;
import com.metamx.druid.client.cache.MemcachedCache; import com.metamx.druid.client.cache.MemcachedCache;
import com.metamx.druid.client.cache.MemcachedCacheConfig; import com.metamx.druid.client.cache.MemcachedCacheConfig;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
@ -225,15 +227,17 @@ public class BrokerNode extends QueryableNode<BrokerNode>
{ {
if (useDiscovery) { if (useDiscovery) {
final Lifecycle lifecycle = getLifecycle(); final Lifecycle lifecycle = getLifecycle();
final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class); final ServiceDiscoveryConfig serviceDiscoveryConfig = getConfigFactory().build(ServiceDiscoveryConfig.class);
CuratorFramework curatorFramework = Initialization.makeCuratorFramework( final CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
serviceDiscoveryConfig, lifecycle serviceDiscoveryConfig, lifecycle
); );
final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
curatorFramework, serviceDiscoveryConfig, lifecycle curatorFramework, serviceDiscoveryConfig, lifecycle
); );
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
serviceDiscoveryConfig, serviceDiscovery
);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
} }
} }

View File

@ -28,6 +28,10 @@ import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider; import com.metamx.druid.curator.PotentiallyGzippedCompressionProvider;
import com.metamx.druid.curator.discovery.AddressPortServiceInstanceFactory;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.EmittingRequestLogger; import com.metamx.druid.http.EmittingRequestLogger;
import com.metamx.druid.http.FileRequestLogger; import com.metamx.druid.http.FileRequestLogger;
import com.metamx.druid.http.RequestLogger; import com.metamx.druid.http.RequestLogger;
@ -224,17 +228,10 @@ public class Initialization
) )
throws Exception throws Exception
{ {
final ServiceInstance serviceInstance =
ServiceInstance.builder()
.name(config.getServiceName().replace('/', ':'))
.address(addressFromHost(config.getHost()))
.port(config.getPort())
.build();
final ServiceDiscovery serviceDiscovery = final ServiceDiscovery serviceDiscovery =
ServiceDiscoveryBuilder.builder(Void.class) ServiceDiscoveryBuilder.builder(Void.class)
.basePath(config.getDiscoveryPath()) .basePath(config.getDiscoveryPath())
.client(discoveryClient) .client(discoveryClient)
.thisInstance(serviceInstance)
.build(); .build();
lifecycle.addHandler( lifecycle.addHandler(
@ -262,6 +259,46 @@ public class Initialization
return serviceDiscovery; return serviceDiscovery;
} }
public static ServiceAnnouncer makeServiceAnnouncer(
ServiceDiscoveryConfig config,
ServiceDiscovery serviceDiscovery
)
{
final ServiceInstanceFactory serviceInstanceFactory = makeServiceInstanceFactory(config);
return new CuratorServiceAnnouncer(serviceDiscovery, serviceInstanceFactory);
}
public static void announceDefaultService(
final ServiceDiscoveryConfig config,
final ServiceAnnouncer serviceAnnouncer,
final Lifecycle lifecycle
) throws Exception
{
final String service = config.getServiceName().replace('/', ':');
lifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
serviceAnnouncer.announce(service);
}
@Override
public void stop()
{
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unannouce default service[%s]", service);
}
}
}
);
}
public static ServiceProvider makeServiceProvider( public static ServiceProvider makeServiceProvider(
String serviceName, String serviceName,
ServiceDiscovery serviceDiscovery, ServiceDiscovery serviceDiscovery,
@ -320,13 +357,17 @@ public class Initialization
); );
} }
public static String addressFromHost(final String host) public static ServiceInstanceFactory<Void> makeServiceInstanceFactory(ServiceDiscoveryConfig config)
{ {
final String host = config.getHost();
final String address;
final int colon = host.indexOf(':'); final int colon = host.indexOf(':');
if (colon < 0) { if (colon < 0) {
return host; address = host;
} else { } else {
return host.substring(0, colon); address = host.substring(0, colon);
} }
return new AddressPortServiceInstanceFactory(address, config.getPort());
} }
} }

View File

@ -19,6 +19,7 @@
package com.metamx.druid.initialization; package com.metamx.druid.initialization;
import org.apache.curator.utils.ZKPaths;
import org.skife.config.Config; import org.skife.config.Config;
public abstract class ZkPathsConfig public abstract class ZkPathsConfig
@ -56,7 +57,32 @@ public abstract class ZkPathsConfig
return defaultPath("master"); return defaultPath("master");
} }
private String defaultPath(final String subPath) { @Config("druid.zk.paths.indexer.announcementsPath")
return String.format("%s/%s", getZkBasePath(), subPath); public String getIndexerAnnouncementPath()
{
return defaultPath("indexer/announcements");
}
@Config("druid.zk.paths.indexer.tasksPath")
public String getIndexerTaskPath()
{
return defaultPath("indexer/tasks");
}
@Config("druid.zk.paths.indexer.statusPath")
public String getIndexerStatusPath()
{
return defaultPath("indexer/status");
}
@Config("druid.zk.paths.indexer.leaderLatchPath")
public String getIndexerLeaderLatchPath()
{
return defaultPath("indexer/leaderLatchPath");
}
private String defaultPath(final String subPath)
{
return ZKPaths.makePath(getZkBasePath(), subPath);
} }
} }

View File

@ -27,8 +27,8 @@ import com.google.common.collect.Iterators;
import com.google.common.io.Closeables; import com.google.common.io.Closeables;
import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;

View File

@ -7,8 +7,8 @@ import com.google.common.collect.Maps;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import java.io.IOException; import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;

View File

@ -7,8 +7,8 @@ import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import twitter4j.ConnectionLifeCycleListener; import twitter4j.ConnectionLifeCycleListener;
import twitter4j.HashtagEntity; import twitter4j.HashtagEntity;
import twitter4j.Status; import twitter4j.Status;

View File

@ -0,0 +1,9 @@
package com.metamx.druid.indexer.data;
import com.metamx.druid.input.InputRow;
public interface InputRowParser<T>
{
public InputRow parse(T input);
public void addDimensionExclusion(String dimension);
}

View File

@ -0,0 +1,83 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{
private final TimestampSpec timestampSpec;
private final DataSpec dataSpec;
private final Set<String> dimensionExclusions;
@JsonCreator
public MapInputRowParser(
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("data") DataSpec dataSpec,
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions
)
{
this.timestampSpec = timestampSpec;
this.dataSpec = dataSpec;
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
for (String dimensionExclusion : dimensionExclusions) {
this.dimensionExclusions.add(dimensionExclusion.toLowerCase());
}
}
this.dimensionExclusions.add(timestampSpec.getTimestampColumn().toLowerCase());
}
@Override
public InputRow parse(Map<String, Object> theMap)
{
final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions()
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp = timestampSpec.extractTimestamp(theMap);
if (timestamp == null) {
final String input = theMap.toString();
throw new NullPointerException(
String.format(
"Null timestamp in input: %s",
input.length() < 100 ? input : input.substring(0, 100) + "..."
)
);
}
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
}
@Override
public void addDimensionExclusion(String dimension)
{
dimensionExclusions.add(dimension);
}
@JsonProperty
public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}
@JsonProperty("data")
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public Set<String> getDimensionExclusions()
{
return dimensionExclusions;
}
}

View File

@ -21,27 +21,20 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists; import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.collect.Sets;
import com.metamx.common.exception.FormattedException; import com.metamx.common.exception.FormattedException;
import com.metamx.common.parsers.Parser; import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser; import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import org.joda.time.DateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
*/ */
public class StringInputRowParser public class StringInputRowParser implements InputRowParser<String>
{ {
private final TimestampSpec timestampSpec; private final InputRowParser<Map<String, Object>> inputRowCreator;
private final DataSpec dataSpec;
private final Set<String> dimensionExclusions;
private final Parser<String, Object> parser; private final Parser<String, Object> parser;
@JsonCreator @JsonCreator
@ -51,62 +44,24 @@ public class StringInputRowParser
@JsonProperty("dimensionExclusions") List<String> dimensionExclusions @JsonProperty("dimensionExclusions") List<String> dimensionExclusions
) )
{ {
this.timestampSpec = timestampSpec; this.inputRowCreator = new MapInputRowParser(timestampSpec, dataSpec, dimensionExclusions);
this.dataSpec = dataSpec;
this.dimensionExclusions = Sets.newHashSet();
if (dimensionExclusions != null) {
for (String dimensionExclusion : dimensionExclusions) {
this.dimensionExclusions.add(dimensionExclusion.toLowerCase());
}
}
this.dimensionExclusions.add(timestampSpec.getTimestampColumn());
this.parser = new ToLowerCaseParser(dataSpec.getParser()); this.parser = new ToLowerCaseParser(dataSpec.getParser());
} }
public StringInputRowParser addDimensionExclusion(String dimension) public void addDimensionExclusion(String dimension)
{ {
dimensionExclusions.add(dimension); inputRowCreator.addDimensionExclusion(dimension);
return this;
} }
@Override
public InputRow parse(String input) throws FormattedException public InputRow parse(String input) throws FormattedException
{ {
Map<String, Object> theMap = parser.parse(input); return inputRowCreator.parse(parser.parse(input));
final List<String> dimensions = dataSpec.hasCustomDimensions()
? dataSpec.getDimensions()
: Lists.newArrayList(Sets.difference(theMap.keySet(), dimensionExclusions));
final DateTime timestamp = timestampSpec.extractTimestamp(theMap);
if (timestamp == null) {
throw new NullPointerException(
String.format(
"Null timestamp in input string: %s",
input.length() < 100 ? input : input.substring(0, 100) + "..."
)
);
}
return new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap);
} }
@JsonProperty @JsonValue
public TimestampSpec getTimestampSpec() public InputRowParser<Map<String, Object>> getInputRowCreator()
{ {
return timestampSpec; return inputRowCreator;
}
@JsonProperty("data")
public DataSpec getDataSpec()
{
return dataSpec;
}
@JsonProperty
public Set<String> getDimensionExclusions()
{
return dimensionExclusions;
} }
} }

View File

@ -19,22 +19,14 @@
package com.metamx.druid.merger.common.config; package com.metamx.druid.merger.common.config;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.skife.config.Config; import org.skife.config.Config;
import org.skife.config.Default; import org.skife.config.Default;
/** /**
*/ */
public abstract class IndexerZkConfig public abstract class IndexerZkConfig extends ZkPathsConfig
{ {
@Config("druid.zk.paths.indexer.announcementsPath")
public abstract String getAnnouncementPath();
@Config("druid.zk.paths.indexer.tasksPath")
public abstract String getTaskPath();
@Config("druid.zk.paths.indexer.statusPath")
public abstract String getStatusPath();
@Config("druid.zk.maxNumBytes") @Config("druid.zk.maxNumBytes")
@Default("512000") @Default("512000")
public abstract long getMaxNumBytes(); public abstract long getMaxNumBytes();

View File

@ -0,0 +1,10 @@
package com.metamx.druid.merger.common.index;
/**
* Objects that can be registered with a {@link ChatHandlerProvider} and provide http endpoints for indexing-related
* objects. This interface is empty because it only exists to signal intent. The actual http endpoints are provided
* through JAX-RS annotations on the {@link ChatHandler} objects.
*/
public interface ChatHandler
{
}

View File

@ -0,0 +1,83 @@
package com.metamx.druid.merger.common.index;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
import java.util.concurrent.ConcurrentMap;
/**
* Provides a way for the outside world to talk to objects in the indexing service. The {@link #get(String)} method
* allows anyone with a reference to this object to obtain a particular {@link ChatHandler}. An embedded
* {@link ServiceAnnouncer} will be used to advertise handlers on this host.
*/
public class ChatHandlerProvider
{
private static final Logger log = new Logger(ChatHandlerProvider.class);
private final ChatHandlerProviderConfig config;
private final ServiceAnnouncer serviceAnnouncer;
private final ConcurrentMap<String, ChatHandler> handlers;
public ChatHandlerProvider(
ChatHandlerProviderConfig config,
ServiceAnnouncer serviceAnnouncer
)
{
this.config = config;
this.serviceAnnouncer = serviceAnnouncer;
this.handlers = Maps.newConcurrentMap();
}
public void register(final String key, ChatHandler handler)
{
final String service = serviceName(key);
log.info("Registering Eventhandler: %s", key);
if (handlers.putIfAbsent(key, handler) != null) {
throw new ISE("handler already registered for key: %s", key);
}
try {
serviceAnnouncer.announce(service);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
handlers.remove(key, handler);
}
}
public void unregister(final String key)
{
final String service = serviceName(key);
log.info("Unregistering chat handler: %s", key);
final ChatHandler handler = handlers.get(key);
if (handler == null) {
log.warn("handler not currently registered, ignoring: %s", key);
}
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
}
handlers.remove(key, handler);
}
public Optional<ChatHandler> get(final String key)
{
return Optional.fromNullable(handlers.get(key));
}
private String serviceName(String key)
{
return String.format(config.getServiceFormat(), key);
}
}

View File

@ -0,0 +1,9 @@
package com.metamx.druid.merger.common.index;
import java.util.Collection;
import java.util.Map;
public interface EventReceiver
{
public void addAll(Collection<Map<String, Object>> events);
}

View File

@ -0,0 +1,193 @@
package com.metamx.druid.merger.common.index;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.druid.indexer.data.MapInputRowParser;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* Builds firehoses that accept events through the {@link EventReceiver} interface. Can also register these
* firehoses with an {@link ChatHandlerProvider}.
*/
@JsonTypeName("receiver")
public class EventReceiverFirehoseFactory implements FirehoseFactory
{
private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
private static final int DEFAULT_BUFFER_SIZE = 100000;
private final String firehoseId;
private final int bufferSize;
private final MapInputRowParser parser;
private final Optional<ChatHandlerProvider> chatHandlerProvider;
@JsonCreator
public EventReceiverFirehoseFactory(
@JsonProperty("firehoseId") String firehoseId,
@JsonProperty("bufferSize") Integer bufferSize,
@JsonProperty("parser") MapInputRowParser parser,
@JacksonInject("chatHandlerProvider") ChatHandlerProvider chatHandlerProvider
)
{
this.firehoseId = Preconditions.checkNotNull(firehoseId, "firehoseId");
this.bufferSize = bufferSize == null || bufferSize <= 0 ? DEFAULT_BUFFER_SIZE : bufferSize;
this.parser = Preconditions.checkNotNull(parser, "parser");
this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
}
@Override
public Firehose connect() throws IOException
{
log.info("Connecting firehose: %s", firehoseId);
final EventReceiverFirehose firehose = new EventReceiverFirehose();
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().register(firehoseId, firehose);
}
return firehose;
}
@JsonProperty
public String getFirehoseId()
{
return firehoseId;
}
@JsonProperty
public int getBufferSize()
{
return bufferSize;
}
@JsonProperty
public MapInputRowParser getParser()
{
return parser;
}
public class EventReceiverFirehose implements ChatHandler, Firehose
{
private final BlockingQueue<InputRow> buffer;
private final Object readLock = new Object();
private volatile InputRow nextRow = null;
private volatile boolean closed = false;
public EventReceiverFirehose()
{
this.buffer = new ArrayBlockingQueue<InputRow>(bufferSize);
}
@POST
@Path("/push-events")
@Produces("application/json")
public Response addAll(Collection<Map<String, Object>> events)
{
log.debug("Adding %,d events to firehose: %s", events.size(), firehoseId);
final List<InputRow> rows = Lists.newArrayList();
for (final Map<String, Object> event : events) {
// Might throw an exception. We'd like that to happen now, instead of while adding to the row buffer.
rows.add(parser.parse(event));
}
try {
for (final InputRow row : rows) {
boolean added = false;
while (!closed && !added) {
added = buffer.offer(row, 500, TimeUnit.MILLISECONDS);
}
if (!added) {
throw new IllegalStateException("Cannot add events to closed firehose!");
}
}
return Response.ok().entity(ImmutableMap.of("eventCount", events.size())).build();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
}
@Override
public boolean hasMore()
{
synchronized (readLock) {
try {
while (!closed && nextRow == null) {
nextRow = buffer.poll(500, TimeUnit.MILLISECONDS);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
return nextRow != null;
}
}
@Override
public InputRow nextRow()
{
synchronized (readLock) {
final InputRow row = nextRow;
if (row == null) {
throw new NoSuchElementException();
} else {
nextRow = null;
return row;
}
}
}
@Override
public Runnable commit()
{
return new Runnable()
{
@Override
public void run()
{
// Nothing
}
};
}
@Override
public void close() throws IOException
{
log.info("Firehose closing.");
closed = true;
if (chatHandlerProvider.isPresent()) {
chatHandlerProvider.get().unregister(firehoseId);
}
}
}
}

View File

@ -31,8 +31,8 @@ import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.indexer.data.StringInputRowParser; import com.metamx.druid.indexer.data.StringInputRowParser;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator; import org.apache.commons.io.LineIterator;

View File

@ -35,8 +35,8 @@ import com.metamx.druid.input.InputRow;
import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction; import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.shard.ShardSpec;

View File

@ -37,8 +37,8 @@ import com.metamx.druid.merger.common.actions.LockListAction;
import com.metamx.druid.merger.common.actions.SegmentInsertAction; import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.index.YeOldePlumberSchool; import com.metamx.druid.merger.common.index.YeOldePlumberSchool;
import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.plumber.Sink; import com.metamx.druid.realtime.plumber.Sink;

View File

@ -32,7 +32,7 @@ import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SpawnTasksAction; import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.merger.common.actions.TaskActionClient; import com.metamx.druid.merger.common.actions.TaskActionClient;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec; import com.metamx.druid.shard.NoneShardSpec;
import org.joda.time.DateTime; import org.joda.time.DateTime;

View File

@ -44,12 +44,10 @@ import com.metamx.druid.query.QueryRunnerFactory;
import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartmentConfig; import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.Firehose;
import com.metamx.druid.realtime.FirehoseFactory;
import com.metamx.druid.realtime.GracefulShutdownFirehose;
import com.metamx.druid.realtime.MinTimeFirehose;
import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.firehose.GracefulShutdownFirehose;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.Sink; import com.metamx.druid.realtime.plumber.Sink;
@ -81,9 +79,6 @@ public class RealtimeIndexTask extends AbstractTask
@JsonIgnore @JsonIgnore
private final IndexGranularity segmentGranularity; private final IndexGranularity segmentGranularity;
@JsonIgnore
private final DateTime minTime;
@JsonIgnore @JsonIgnore
private volatile Plumber plumber = null; private volatile Plumber plumber = null;
@ -106,8 +101,7 @@ public class RealtimeIndexTask extends AbstractTask
@JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig,
@JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity
@JsonProperty("minTime") DateTime minTime
) )
{ {
super( super(
@ -128,7 +122,6 @@ public class RealtimeIndexTask extends AbstractTask
this.fireDepartmentConfig = fireDepartmentConfig; this.fireDepartmentConfig = fireDepartmentConfig;
this.windowPeriod = windowPeriod; this.windowPeriod = windowPeriod;
this.segmentGranularity = segmentGranularity; this.segmentGranularity = segmentGranularity;
this.minTime = minTime;
} }
@Override @Override
@ -179,18 +172,12 @@ public class RealtimeIndexTask extends AbstractTask
return TaskStatus.success(getId()); return TaskStatus.success(getId());
} }
Firehose wrappedFirehose = firehoseFactory.connect();
if (minTime != null) {
log.info("Wrapping firehose in MinTimeFirehose with minTime[%s]", minTime);
wrappedFirehose = new MinTimeFirehose(wrappedFirehose, minTime);
}
log.info( log.info(
"Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]", "Wrapping firehose in GracefulShutdownFirehose with segmentGranularity[%s] and windowPeriod[%s]",
segmentGranularity, segmentGranularity,
windowPeriod windowPeriod
); );
firehose = new GracefulShutdownFirehose(wrappedFirehose, segmentGranularity, windowPeriod); firehose = new GracefulShutdownFirehose(firehoseFactory.connect(), segmentGranularity, windowPeriod);
} }
// It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for // It would be nice to get the PlumberSchool in the constructor. Although that will need jackson injectables for
@ -381,12 +368,6 @@ public class RealtimeIndexTask extends AbstractTask
return segmentGranularity; return segmentGranularity;
} }
@JsonProperty
public DateTime getMinTime()
{
return minTime;
}
public static class TaskActionSegmentPublisher implements SegmentPublisher public static class TaskActionSegmentPublisher implements SegmentPublisher
{ {
final Task task; final Task task;

View File

@ -430,7 +430,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private void cleanup(final String workerId, final String taskId) private void cleanup(final String workerId, final String taskId)
{ {
runningTasks.remove(taskId); runningTasks.remove(taskId);
final String statusPath = JOINER.join(config.getStatusPath(), workerId, taskId); final String statusPath = JOINER.join(config.getIndexerStatusPath(), workerId, taskId);
try { try {
cf.delete().guaranteed().forPath(statusPath); cf.delete().guaranteed().forPath(statusPath);
} }
@ -493,7 +493,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
.withMode(CreateMode.EPHEMERAL) .withMode(CreateMode.EPHEMERAL)
.forPath( .forPath(
JOINER.join( JOINER.join(
config.getTaskPath(), config.getIndexerTaskPath(),
theWorker.getHost(), theWorker.getHost(),
task.getId() task.getId()
), ),
@ -522,7 +522,7 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
private void addWorker(final Worker worker) private void addWorker(final Worker worker)
{ {
try { try {
final String workerStatusPath = JOINER.join(config.getStatusPath(), worker.getHost()); final String workerStatusPath = JOINER.join(config.getIndexerStatusPath(), worker.getHost());
final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true); final PathChildrenCache statusCache = new PathChildrenCache(cf, workerStatusPath, true);
final ZkWorker zkWorker = new ZkWorker( final ZkWorker zkWorker = new ZkWorker(
worker, worker,
@ -626,18 +626,18 @@ public class RemoteTaskRunner implements TaskRunner, TaskLogProvider
try { try {
Set<String> tasksToRetry = Sets.newHashSet( Set<String> tasksToRetry = Sets.newHashSet(
cf.getChildren() cf.getChildren()
.forPath(JOINER.join(config.getTaskPath(), worker.getHost())) .forPath(JOINER.join(config.getIndexerTaskPath(), worker.getHost()))
); );
tasksToRetry.addAll( tasksToRetry.addAll(
cf.getChildren() cf.getChildren()
.forPath(JOINER.join(config.getStatusPath(), worker.getHost())) .forPath(JOINER.join(config.getIndexerStatusPath(), worker.getHost()))
); );
log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size()); log.info("%s has %d tasks to retry", worker.getHost(), tasksToRetry.size());
for (String taskId : tasksToRetry) { for (String taskId : tasksToRetry) {
TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId); TaskRunnerWorkItem taskRunnerWorkItem = runningTasks.get(taskId);
if (taskRunnerWorkItem != null) { if (taskRunnerWorkItem != null) {
String taskPath = JOINER.join(config.getTaskPath(), worker.getHost(), taskId); String taskPath = JOINER.join(config.getIndexerTaskPath(), worker.getHost(), taskId);
if (cf.checkExists().forPath(taskPath) != null) { if (cf.checkExists().forPath(taskPath) != null) {
cf.delete().guaranteed().forPath(taskPath); cf.delete().guaranteed().forPath(taskPath);
} }

View File

@ -24,6 +24,7 @@ import com.google.common.base.Throwables;
import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.merger.common.actions.TaskActionClient; import com.metamx.druid.merger.common.actions.TaskActionClient;
@ -68,6 +69,7 @@ public class TaskMasterLifecycle
final TaskRunnerFactory runnerFactory, final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory, final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator, final CuratorFramework curator,
final ServiceAnnouncer serviceAnnouncer,
final ServiceEmitter emitter final ServiceEmitter emitter
) )
{ {
@ -75,7 +77,7 @@ public class TaskMasterLifecycle
this.taskActionClientFactory = taskActionClientFactory; this.taskActionClientFactory = taskActionClientFactory;
this.leaderSelector = new LeaderSelector( this.leaderSelector = new LeaderSelector(
curator, indexerCoordinatorConfig.getLeaderLatchPath(), new LeaderSelectorListener() curator, indexerCoordinatorConfig.getIndexerLeaderLatchPath(), new LeaderSelectorListener()
{ {
@Override @Override
public void takeLeadership(CuratorFramework client) throws Exception public void takeLeadership(CuratorFramework client) throws Exception
@ -101,7 +103,7 @@ public class TaskMasterLifecycle
final Lifecycle leaderLifecycle = new Lifecycle(); final Lifecycle leaderLifecycle = new Lifecycle();
leaderLifecycle.addManagedInstance(taskQueue); leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(taskRunner); leaderLifecycle.addManagedInstance(taskRunner);
Initialization.makeServiceDiscoveryClient(curator, serviceDiscoveryConfig, leaderLifecycle); Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer); leaderLifecycle.addManagedInstance(taskConsumer);
leaderLifecycle.addManagedInstance(resourceManagementScheduler); leaderLifecycle.addManagedInstance(resourceManagementScheduler);

View File

@ -109,11 +109,14 @@ public class TaskStorageQueryAdapter
int nSuccesses = 0; int nSuccesses = 0;
int nFailures = 0; int nFailures = 0;
int nTotal = 0; int nTotal = 0;
int nPresent = 0;
for(final Optional<TaskStatus> statusOption : statuses.values()) { for(final Optional<TaskStatus> statusOption : statuses.values()) {
nTotal ++; nTotal ++;
if(statusOption.isPresent()) { if(statusOption.isPresent()) {
nPresent ++;
final TaskStatus status = statusOption.get(); final TaskStatus status = statusOption.get();
if(status.isSuccess()) { if(status.isSuccess()) {
@ -126,7 +129,7 @@ public class TaskStorageQueryAdapter
final Optional<TaskStatus> status; final Optional<TaskStatus> status;
if(nTotal == 0) { if(nPresent == 0) {
status = Optional.absent(); status = Optional.absent();
} else if(nSuccesses == nTotal) { } else if(nSuccesses == nTotal) {
status = Optional.of(TaskStatus.success(taskid)); status = Optional.of(TaskStatus.success(taskid));

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.config;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.metamx.druid.initialization.ZkPathsConfig;
import org.skife.config.Config; import org.skife.config.Config;
import org.skife.config.Default; import org.skife.config.Default;
import org.skife.config.DefaultNull; import org.skife.config.DefaultNull;
@ -29,16 +30,13 @@ import java.util.Set;
/** /**
*/ */
public abstract class IndexerCoordinatorConfig public abstract class IndexerCoordinatorConfig extends ZkPathsConfig
{ {
private volatile Set<String> whitelistDatasources = null; private volatile Set<String> whitelistDatasources = null;
@Config("druid.host") @Config("druid.host")
public abstract String getServerName(); public abstract String getServerName();
@Config("druid.zk.paths.indexer.leaderLatchPath")
public abstract String getLeaderLatchPath();
@Config("druid.merger.threads") @Config("druid.merger.threads")
@Default("1") @Default("1")
public abstract int getNumLocalThreads(); public abstract int getNumLocalThreads();

View File

@ -21,6 +21,7 @@ package com.metamx.druid.merger.coordinator.http;
import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client; import com.amazonaws.services.ec2.AmazonEC2Client;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
@ -45,6 +46,9 @@ import com.metamx.druid.QueryableNode;
import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.GuiceServletConfig;
@ -62,6 +66,7 @@ import com.metamx.druid.merger.common.actions.TaskActionToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig; import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs; import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs; import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;
@ -109,6 +114,7 @@ import com.metamx.metrics.MonitorSchedulerConfig;
import com.metamx.metrics.SysMonitor; import com.metamx.metrics.SysMonitor;
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.jets3t.service.S3ServiceException; import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials; import org.jets3t.service.security.AWSCredentials;
@ -150,6 +156,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
private DBI dbi = null; private DBI dbi = null;
private IndexerCoordinatorConfig config = null; private IndexerCoordinatorConfig config = null;
private MergerDBCoordinator mergerDBCoordinator = null; private MergerDBCoordinator mergerDBCoordinator = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private TaskStorage taskStorage = null; private TaskStorage taskStorage = null;
private TaskQueue taskQueue = null; private TaskQueue taskQueue = null;
private TaskLockbox taskLockbox = null; private TaskLockbox taskLockbox = null;
@ -253,6 +261,8 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
initializeIndexerCoordinatorConfig(); initializeIndexerCoordinatorConfig();
initializeMergeDBCoordinator(); initializeMergeDBCoordinator();
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeJacksonInjections();
initializeServiceDiscovery();
initializeTaskStorage(); initializeTaskStorage();
initializeTaskLockbox(); initializeTaskLockbox();
initializeTaskQueue(); initializeTaskQueue();
@ -362,6 +372,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
taskRunnerFactory, taskRunnerFactory,
resourceManagementSchedulerFactory, resourceManagementSchedulerFactory,
getCuratorFramework(), getCuratorFramework(),
serviceAnnouncer,
emitter emitter
); );
getLifecycle().addManagedInstance(taskMasterLifecycle); getLifecycle().addManagedInstance(taskMasterLifecycle);
@ -461,9 +472,21 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
} }
} }
private void initializeJacksonInjections()
{
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", null)
.addValue("segmentPusher", null)
.addValue("chatHandlerProvider", null);
getJsonMapper().setInjectableValues(injectables);
}
private void initializeJacksonSubtypes() private void initializeJacksonSubtypes()
{ {
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class); getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
} }
private void initializeHttpClient() private void initializeHttpClient()
@ -541,6 +564,20 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
} }
} }
public void initializeServiceDiscovery() throws Exception
{
final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
if (serviceDiscovery == null) {
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(), config, getLifecycle()
);
}
if (serviceAnnouncer == null) {
final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
}
}
public void initializeTaskQueue() public void initializeTaskQueue()
{ {
if (taskQueue == null) { if (taskQueue == null) {
@ -609,7 +646,7 @@ public class IndexerCoordinatorNode extends QueryableNode<IndexerCoordinatorNode
getJsonMapper(), getJsonMapper(),
getConfigFactory().build(RemoteTaskRunnerConfig.class), getConfigFactory().build(RemoteTaskRunnerConfig.class),
curator, curator,
new PathChildrenCache(curator, indexerZkConfig.getAnnouncementPath(), true), new PathChildrenCache(curator, indexerZkConfig.getIndexerAnnouncementPath(), true),
retryScheduledExec, retryScheduledExec,
new RetryPolicyFactory( new RetryPolicyFactory(
getConfigFactory().buildWithReplacements( getConfigFactory().buildWithReplacements(

View File

@ -140,12 +140,7 @@ public class IndexerCoordinatorResource
@Produces("application/json") @Produces("application/json")
public Response getTaskStatus(@PathParam("taskid") String taskid) public Response getTaskStatus(@PathParam("taskid") String taskid)
{ {
final Optional<TaskStatus> status = taskStorageQueryAdapter.getSameGroupMergedStatus(taskid); return optionalTaskResponse(taskid, "status", taskStorageQueryAdapter.getSameGroupMergedStatus(taskid));
if (!status.isPresent()) {
return Response.status(Response.Status.NOT_FOUND).build();
} else {
return Response.ok().entity(status.get()).build();
}
} }
@GET @GET
@ -351,6 +346,17 @@ public class IndexerCoordinatorResource
} }
} }
public <T> Response optionalTaskResponse(String taskid, String objectType, Optional<T> x) {
final Map<String, Object> results = Maps.newHashMap();
results.put("task", taskid);
if (x.isPresent()) {
results.put(objectType, x.get());
return Response.status(Response.Status.OK).entity(results).build();
} else {
return Response.status(Response.Status.NOT_FOUND).entity(results).build();
}
}
public <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f) public <T> Response asLeaderWith(Optional<T> x, Function<T, Response> f)
{ {
if (x.isPresent()) { if (x.isPresent()) {

View File

@ -71,9 +71,9 @@ public class WorkerCuratorCoordinator
this.worker = worker; this.worker = worker;
this.config = config; this.config = config;
this.baseAnnouncementsPath = getPath(Arrays.asList(config.getAnnouncementPath(), worker.getHost())); this.baseAnnouncementsPath = getPath(Arrays.asList(config.getIndexerAnnouncementPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(config.getTaskPath(), worker.getHost())); this.baseTaskPath = getPath(Arrays.asList(config.getIndexerTaskPath(), worker.getHost()));
this.baseStatusPath = getPath(Arrays.asList(config.getStatusPath(), worker.getHost())); this.baseStatusPath = getPath(Arrays.asList(config.getIndexerStatusPath(), worker.getHost()));
} }
@LifecycleStart @LifecycleStart

View File

@ -0,0 +1,17 @@
package com.metamx.druid.merger.worker.config;
import org.skife.config.Config;
import org.skife.config.DefaultNull;
public abstract class ChatHandlerProviderConfig
{
@Config("druid.indexer.chathandler.service")
@DefaultNull
public abstract String getServiceFormat();
@Config("druid.host")
public abstract String getHost();
@Config("druid.port")
public abstract int getPort();
}

View File

@ -0,0 +1,39 @@
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.metamx.druid.merger.common.index.ChatHandler;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.core.Response;
@Path("/mmx/worker/v1")
public class ChatHandlerResource
{
private final ObjectMapper jsonMapper;
private final ChatHandlerProvider handlers;
@Inject
public ChatHandlerResource(ObjectMapper jsonMapper, ChatHandlerProvider handlers)
{
this.jsonMapper = jsonMapper;
this.handlers = handlers;
}
@Path("/chat/{id}")
public Object doTaskChat(
@PathParam("id") String handlerId
)
{
final Optional<ChatHandler> handler = handlers.get(handlerId);
if (handler.isPresent()) {
return handler.get();
} else {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
}

View File

@ -37,7 +37,7 @@ public class ExecutorMain
{ {
LogLevelAdjuster.register(); LogLevelAdjuster.register();
if (args.length != 3) { if (args.length != 2) {
log.info("Usage: ExecutorMain <task.json> <status.json>"); log.info("Usage: ExecutorMain <task.json> <status.json>");
System.exit(2); System.exit(2);
} }

View File

@ -25,6 +25,9 @@ import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.ISE; import com.metamx.common.ISE;
import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
@ -33,6 +36,11 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode; import com.metamx.druid.BaseServerNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.NoopServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.QueryServlet;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
@ -48,8 +56,11 @@ import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory; import com.metamx.druid.merger.common.actions.RemoteTaskActionClientFactory;
import com.metamx.druid.merger.common.config.RetryPolicyConfig; import com.metamx.druid.merger.common.config.RetryPolicyConfig;
import com.metamx.druid.merger.common.config.TaskConfig; import com.metamx.druid.merger.common.config.TaskConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner; import com.metamx.druid.merger.coordinator.ExecutorServiceTaskRunner;
import com.metamx.druid.merger.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.merger.worker.config.WorkerConfig; import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.utils.PropUtils; import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
@ -70,6 +81,7 @@ import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials; import org.jets3t.service.security.AWSCredentials;
import org.mortbay.jetty.Server; import org.mortbay.jetty.Server;
import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
@ -103,10 +115,12 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private DataSegmentPusher segmentPusher = null; private DataSegmentPusher segmentPusher = null;
private TaskToolboxFactory taskToolboxFactory = null; private TaskToolboxFactory taskToolboxFactory = null;
private ServiceDiscovery serviceDiscovery = null; private ServiceDiscovery serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null; private ServiceProvider coordinatorServiceProvider = null;
private Server server = null; private Server server = null;
private ExecutorServiceTaskRunner taskRunner = null; private ExecutorServiceTaskRunner taskRunner = null;
private ExecutorLifecycle executorLifecycle = null; private ExecutorLifecycle executorLifecycle = null;
private ChatHandlerProvider chatHandlerProvider = null;
public ExecutorNode( public ExecutorNode(
String nodeType, String nodeType,
@ -177,10 +191,10 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
initializeMonitors(); initializeMonitors();
initializeMergerConfig(); initializeMergerConfig();
initializeServiceDiscovery(); initializeServiceDiscovery();
initializeCoordinatorServiceProvider();
initializeDataSegmentPusher(); initializeDataSegmentPusher();
initializeTaskToolbox(); initializeTaskToolbox();
initializeTaskRunner(); initializeTaskRunner();
initializeChatHandlerProvider();
initializeJacksonInjections(); initializeJacksonInjections();
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeServer(); initializeServer();
@ -195,9 +209,18 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper()); executorLifecycle = executorLifecycleFactory.build(taskRunner, getJsonMapper());
lifecycle.addManagedInstance(executorLifecycle); lifecycle.addManagedInstance(executorLifecycle);
final Injector injector = Guice.createInjector(
new ExecutorServletModule(
getJsonMapper(),
chatHandlerProvider
)
);
final Context root = new Context(server, "/", Context.SESSIONS); final Context root = new Context(server, "/", Context.SESSIONS);
root.addServlet(new ServletHolder(new StatusServlet()), "/status"); root.addServlet(new ServletHolder(new StatusServlet()), "/status");
root.addServlet(new ServletHolder(new DefaultServlet()), "/*");
root.addEventListener(new GuiceServletConfig(injector));
root.addFilter(GuiceFilter.class, "/mmx/worker/v1/*", 0);
root.addServlet( root.addServlet(
new ServletHolder( new ServletHolder(
new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger()) new QueryServlet(getJsonMapper(), getSmileMapper(), taskRunner, emitter, getRequestLogger())
@ -265,7 +288,8 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
InjectableValues.Std injectables = new InjectableValues.Std(); InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", s3Service) injectables.addValue("s3Client", s3Service)
.addValue("segmentPusher", segmentPusher); .addValue("segmentPusher", segmentPusher)
.addValue("chatHandlerProvider", chatHandlerProvider);
getJsonMapper().setInjectableValues(injectables); getJsonMapper().setInjectableValues(injectables);
} }
@ -273,6 +297,7 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
private void initializeJacksonSubtypes() private void initializeJacksonSubtypes()
{ {
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class); getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
} }
private void initializeHttpClient() private void initializeHttpClient()
@ -366,16 +391,16 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
public void initializeServiceDiscovery() throws Exception public void initializeServiceDiscovery() throws Exception
{ {
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
if (serviceDiscovery == null) { if (serviceDiscovery == null) {
final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(), config, lifecycle getCuratorFramework(), config, lifecycle
); );
} }
} if (serviceAnnouncer == null) {
final ServiceInstanceFactory instanceFactory = Initialization.makeServiceInstanceFactory(config);
public void initializeCoordinatorServiceProvider() this.serviceAnnouncer = new CuratorServiceAnnouncer(serviceDiscovery, instanceFactory);
{ }
if (coordinatorServiceProvider == null) { if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider( this.coordinatorServiceProvider = Initialization.makeServiceProvider(
workerConfig.getMasterService(), workerConfig.getMasterService(),
@ -401,6 +426,24 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
} }
} }
public void initializeChatHandlerProvider()
{
if (chatHandlerProvider == null) {
final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
final ServiceAnnouncer myServiceAnnouncer;
if (config.getServiceFormat() == null) {
log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
myServiceAnnouncer = new NoopServiceAnnouncer();
} else {
myServiceAnnouncer = serviceAnnouncer;
}
this.chatHandlerProvider = new ChatHandlerProvider(
config,
myServiceAnnouncer
);
}
}
public static class Builder public static class Builder
{ {
private ObjectMapper jsonMapper = null; private ObjectMapper jsonMapper = null;

View File

@ -0,0 +1,44 @@
package com.metamx.druid.merger.worker.executor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import com.google.inject.Provides;
import com.metamx.druid.merger.common.index.ChatHandlerProvider;
import com.sun.jersey.guice.JerseyServletModule;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import javax.inject.Singleton;
public class ExecutorServletModule extends JerseyServletModule
{
private final ObjectMapper jsonMapper;
private final ChatHandlerProvider receivers;
public ExecutorServletModule(
ObjectMapper jsonMapper,
ChatHandlerProvider receivers
)
{
this.jsonMapper = jsonMapper;
this.receivers = receivers;
}
@Override
protected void configureServlets()
{
bind(ChatHandlerResource.class);
bind(ObjectMapper.class).toInstance(jsonMapper);
bind(ChatHandlerProvider.class).toInstance(receivers);
serve("/*").with(GuiceContainer.class);
}
@Provides
@Singleton
public JacksonJsonProvider getJacksonJsonProvider()
{
final JacksonJsonProvider provider = new JacksonJsonProvider();
provider.setMapper(jsonMapper);
return provider;
}
}

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.worker.http; package com.metamx.druid.merger.worker.http;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -33,6 +34,9 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.QueryableNode; import com.metamx.druid.QueryableNode;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
@ -41,6 +45,7 @@ import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.config.TaskLogConfig; import com.metamx.druid.merger.common.config.TaskLogConfig;
import com.metamx.druid.merger.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs; import com.metamx.druid.merger.common.tasklogs.NoopTaskLogs;
import com.metamx.druid.merger.common.tasklogs.S3TaskLogs; import com.metamx.druid.merger.common.tasklogs.S3TaskLogs;
@ -100,6 +105,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
private ServiceEmitter emitter = null; private ServiceEmitter emitter = null;
private WorkerConfig workerConfig = null; private WorkerConfig workerConfig = null;
private ServiceDiscovery serviceDiscovery = null; private ServiceDiscovery serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null; private ServiceProvider coordinatorServiceProvider = null;
private WorkerCuratorCoordinator workerCuratorCoordinator = null; private WorkerCuratorCoordinator workerCuratorCoordinator = null;
private WorkerTaskMonitor workerTaskMonitor = null; private WorkerTaskMonitor workerTaskMonitor = null;
@ -175,7 +181,7 @@ public class WorkerNode extends QueryableNode<WorkerNode>
initializeMonitors(); initializeMonitors();
initializeMergerConfig(); initializeMergerConfig();
initializeServiceDiscovery(); initializeServiceDiscovery();
initializeCoordinatorServiceProvider(); initializeJacksonInjections();
initializeJacksonSubtypes(); initializeJacksonSubtypes();
initializeCuratorCoordinator(); initializeCuratorCoordinator();
initializePersistentTaskLogs(); initializePersistentTaskLogs();
@ -255,9 +261,21 @@ public class WorkerNode extends QueryableNode<WorkerNode>
} }
} }
private void initializeJacksonInjections()
{
InjectableValues.Std injectables = new InjectableValues.Std();
injectables.addValue("s3Client", null)
.addValue("segmentPusher", null)
.addValue("chatHandlerProvider", null);
getJsonMapper().setInjectableValues(injectables);
}
private void initializeJacksonSubtypes() private void initializeJacksonSubtypes()
{ {
getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class); getJsonMapper().registerSubtypes(StaticS3FirehoseFactory.class);
getJsonMapper().registerSubtypes(EventReceiverFirehoseFactory.class);
} }
private void initializeHttpClient() private void initializeHttpClient()
@ -321,10 +339,6 @@ public class WorkerNode extends QueryableNode<WorkerNode>
getLifecycle() getLifecycle()
); );
} }
}
public void initializeCoordinatorServiceProvider()
{
if (coordinatorServiceProvider == null) { if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider( this.coordinatorServiceProvider = Initialization.makeServiceProvider(
workerConfig.getMasterService(), workerConfig.getMasterService(),

View File

@ -205,8 +205,7 @@ public class TaskSerdeTest
null, null,
null, null,
new Period("PT10M"), new Period("PT10M"),
IndexGranularity.HOUR, IndexGranularity.HOUR
null
); );
final ObjectMapper jsonMapper = new DefaultObjectMapper(); final ObjectMapper jsonMapper = new DefaultObjectMapper();

View File

@ -252,23 +252,29 @@ public class RemoteTaskRunnerTest
new IndexerZkConfig() new IndexerZkConfig()
{ {
@Override @Override
public String getAnnouncementPath() public String getIndexerAnnouncementPath()
{ {
return announcementsPath; return announcementsPath;
} }
@Override @Override
public String getTaskPath() public String getIndexerTaskPath()
{ {
return tasksPath; return tasksPath;
} }
@Override @Override
public String getStatusPath() public String getIndexerStatusPath()
{ {
return statusPath; return statusPath;
} }
@Override
public String getZkBasePath()
{
throw new UnsupportedOperationException();
}
@Override @Override
public long getMaxNumBytes() public long getMaxNumBytes()
{ {
@ -375,23 +381,29 @@ public class RemoteTaskRunnerTest
private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig private static class TestRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig
{ {
@Override @Override
public String getAnnouncementPath() public String getIndexerAnnouncementPath()
{ {
return announcementsPath; return announcementsPath;
} }
@Override @Override
public String getTaskPath() public String getIndexerTaskPath()
{ {
return tasksPath; return tasksPath;
} }
@Override @Override
public String getStatusPath() public String getIndexerStatusPath()
{ {
return statusPath; return statusPath;
} }
@Override
public String getZkBasePath()
{
throw new UnsupportedOperationException();
}
@Override @Override
public Duration getTaskAssignmentTimeoutDuration() public Duration getTaskAssignmentTimeoutDuration()
{ {

View File

@ -19,6 +19,7 @@
package com.metamx.druid.merger.coordinator; package com.metamx.druid.merger.coordinator;
import com.google.common.base.Optional;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
@ -58,8 +59,8 @@ import com.metamx.druid.merger.common.task.IndexTask;
import com.metamx.druid.merger.common.task.KillTask; import com.metamx.druid.merger.common.task.KillTask;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer; import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Event; import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
@ -206,6 +207,9 @@ public class TaskLifecycleTest
-1 -1
); );
final Optional<TaskStatus> preRunTaskStatus = tsqa.getSameGroupMergedStatus(indexTask.getId());
Assert.assertTrue("pre run task status not present", !preRunTaskStatus.isPresent());
final TaskStatus mergedStatus = runTask(indexTask); final TaskStatus mergedStatus = runTask(indexTask);
final TaskStatus status = ts.getStatus(indexTask.getId()).get(); final TaskStatus status = ts.getStatus(indexTask.getId()).get();
final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished()); final List<DataSegment> publishedSegments = byIntervalOrdering.sortedCopy(mdc.getPublished());

View File

@ -21,6 +21,8 @@ package com.metamx.druid.realtime;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.PlumberSchool; import com.metamx.druid.realtime.plumber.PlumberSchool;

View File

@ -36,6 +36,7 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.segment.QuerySegmentWalker; import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.druid.query.segment.SegmentDescriptor; import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.Sink; import com.metamx.druid.realtime.plumber.Sink;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;

View File

@ -0,0 +1,56 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Predicate;
import com.metamx.druid.input.InputRow;
import org.joda.time.Interval;
import java.io.IOException;
/**
* Creates firehoses clipped to a particular time interval. Useful for enforcing min time, max time, and time windows.
*/
public class ClippedFirehoseFactory implements FirehoseFactory
{
private final FirehoseFactory delegate;
private final Interval interval;
@JsonCreator
public ClippedFirehoseFactory(
@JsonProperty("delegate") FirehoseFactory delegate,
@JsonProperty("interval") Interval interval
)
{
this.delegate = delegate;
this.interval = interval;
}
@JsonProperty
public FirehoseFactory getDelegate()
{
return delegate;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
@Override
public Firehose connect() throws IOException
{
return new PredicateFirehose(
delegate.connect(),
new Predicate<InputRow>()
{
@Override
public boolean apply(InputRow input)
{
return interval.contains(input.getTimestampFromEpoch());
}
}
);
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
@ -28,7 +28,7 @@ import java.io.Closeable;
* abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement * abstraction. In order to add a new type of source for realtime data ingestion, all you need to do is implement
* one of these and register it with the RealtimeMain. * one of these and register it with the RealtimeMain.
* *
* This object acts a lot like an Iterator, but it doesn't not extend the Iterator interface because it extends * This object acts a lot like an Iterator, but it doesn't extend the Iterator interface because it extends
* Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this * Closeable and it is very important that the close() method doesn't get forgotten, which is easy to do if this
* gets passed around as an Iterator. * gets passed around as an Iterator.
* <p> * <p>

View File

@ -17,25 +17,25 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.io.IOException; import java.io.IOException;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({ @JsonSubTypes({
@JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class) @JsonSubTypes.Type(name = "kafka-0.7.2", value = KafkaFirehoseFactory.class),
@JsonSubTypes.Type(name = "clipped", value = ClippedFirehoseFactory.class),
@JsonSubTypes.Type(name = "timed", value = TimedShutoffFirehoseFactory.class)
}) })
public interface FirehoseFactory public interface FirehoseFactory
{ {
/** /**
* Initialization method that connects up the fire hose. If this method returns successfully it should be safe to * Initialization method that connects up the fire hose. If this method returns successfully it should be safe to
* call hasMore() on the returned Firehose (which might subsequently block). * call hasMore() on the returned Firehose (which might subsequently block).
* * <p/>
* If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return * If this method returns null, then any attempt to call hasMore(), nextRow(), commit() and close() on the return
* value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on * value will throw a surprising NPE. Throwing IOException on connection failure or runtime exception on
* invalid configuration is preferred over returning null. * invalid configuration is preferred over returning null.

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/ */
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;

View File

@ -1,26 +1,25 @@
package com.metamx.druid.realtime; package com.metamx.druid.realtime.firehose;
import com.google.common.base.Predicate;
import com.metamx.druid.input.InputRow; import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
import java.io.IOException; import java.io.IOException;
import java.util.NoSuchElementException;
/** /**
* Provides a view on a firehose that only returns rows at or after a certain minimum timestamp. * Provides a view on a firehose that only returns rows that match a certain predicate.
* Not thread-safe. * Not thread-safe.
*/ */
public class MinTimeFirehose implements Firehose public class PredicateFirehose implements Firehose
{ {
private final Firehose firehose; private final Firehose firehose;
private final DateTime minTime; private final Predicate<InputRow> predicate;
private InputRow savedInputRow = null; private InputRow savedInputRow = null;
public MinTimeFirehose(Firehose firehose, DateTime minTime) public PredicateFirehose(Firehose firehose, Predicate<InputRow> predicate)
{ {
this.firehose = firehose; this.firehose = firehose;
this.minTime = minTime; this.predicate = predicate;
} }
@Override @Override
@ -32,7 +31,7 @@ public class MinTimeFirehose implements Firehose
while (firehose.hasMore()) { while (firehose.hasMore()) {
final InputRow row = firehose.nextRow(); final InputRow row = firehose.nextRow();
if (acceptable(row)) { if (predicate.apply(row)) {
savedInputRow = row; savedInputRow = row;
return true; return true;
} }
@ -60,9 +59,4 @@ public class MinTimeFirehose implements Firehose
{ {
firehose.close(); firehose.close();
} }
private boolean acceptable(InputRow row)
{
return row.getTimestampFromEpoch() >= minTime.getMillis();
}
} }

View File

@ -0,0 +1,122 @@
package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.druid.input.InputRow;
import com.metamx.emitter.EmittingLogger;
import org.joda.time.DateTime;
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* Creates firehoses that shut off at a particular time. Useful for limiting the lifespan of a realtime job.
*/
public class TimedShutoffFirehoseFactory implements FirehoseFactory
{
private static final EmittingLogger log = new EmittingLogger(FirehoseFactory.class);
private final FirehoseFactory delegateFactory;
private final DateTime shutoffTime;
@JsonCreator
public TimedShutoffFirehoseFactory(
@JsonProperty("delegate") FirehoseFactory delegateFactory,
@JsonProperty("shutoffTime") DateTime shutoffTime
)
{
this.delegateFactory = delegateFactory;
this.shutoffTime = shutoffTime;
}
@Override
public Firehose connect() throws IOException
{
return new TimedShutoffFirehose();
}
public class TimedShutoffFirehose implements Firehose
{
private final Firehose firehose;
private final ScheduledExecutorService exec;
private final Object shutdownLock = new Object();
private volatile boolean shutdown = false;
public TimedShutoffFirehose() throws IOException
{
firehose = delegateFactory.connect();
exec = Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("timed-shutoff-firehose-%d")
.build()
);
exec.schedule(
new Runnable()
{
@Override
public void run()
{
log.info("Closing delegate firehose.");
shutdown = true;
try {
firehose.close();
} catch (IOException e) {
log.warn(e, "Failed to close delegate firehose, ignoring.");
}
}
},
shutoffTime.getMillis() - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
);
log.info("Firehose created, will shut down at: %s", shutoffTime);
}
@Override
public boolean hasMore()
{
return firehose.hasMore();
}
@Override
public InputRow nextRow()
{
return firehose.nextRow();
}
@Override
public Runnable commit()
{
return firehose.commit();
}
@Override
public void close() throws IOException
{
synchronized (shutdownLock) {
if (!shutdown) {
shutdown = true;
firehose.close();
}
}
}
}
@JsonProperty("delegate")
public FirehoseFactory getDelegateFactory()
{
return delegateFactory;
}
@JsonProperty("shutoffTime")
public DateTime getShutoffTime()
{
return shutoffTime;
}
}

View File

@ -39,6 +39,7 @@ import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager; import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig; import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager; import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig; import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManager;
@ -174,6 +175,10 @@ public class MasterMain
serviceDiscoveryConfig, serviceDiscoveryConfig,
lifecycle lifecycle
); );
final ServiceAnnouncer serviceAnnouncer = Initialization.makeServiceAnnouncer(
serviceDiscoveryConfig, serviceDiscovery
);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
IndexingServiceClient indexingServiceClient = null; IndexingServiceClient indexingServiceClient = null;
if (druidMasterConfig.getMergerServiceName() != null) { if (druidMasterConfig.getMergerServiceName() != null) {