Finish the merging, wtf IntelliJ?

cheddar 2013-08-06 13:34:24 -07:00
parent eee1efdcb5
commit 4a64ce37ed
12 changed files with 30 additions and 106 deletions

View File

@@ -20,11 +20,6 @@
package com.metamx.druid.indexing.common.index;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
/**
*/
@@ -34,62 +29,5 @@ public interface ChatHandlerProvider
public void unregister(final String key);
public ChatHandlerProvider(
ChatHandlerProviderConfig config,
ServiceAnnouncer serviceAnnouncer
)
{
this.config = config;
this.serviceAnnouncer = serviceAnnouncer;
this.handlers = Maps.newConcurrentMap();
}
public void register(final String key, ChatHandler handler)
{
final String service = serviceName(key);
log.info("Registering Eventhandler: %s", key);
if (handlers.putIfAbsent(key, handler) != null) {
throw new ISE("handler already registered for key: %s", key);
}
try {
serviceAnnouncer.announce(service);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
handlers.remove(key, handler);
}
}
public void unregister(final String key)
{
final String service = serviceName(key);
log.info("Unregistering chat handler: %s", key);
final ChatHandler handler = handlers.get(key);
if (handler == null) {
log.warn("handler not currently registered, ignoring: %s", key);
}
try {
serviceAnnouncer.unannounce(service);
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
}
handlers.remove(key, handler);
}
public Optional<ChatHandler> get(final String key)
{
return Optional.fromNullable(handlers.get(key));
}
private String serviceName(String key)
{
return String.format(config.getServiceFormat(), key);
}
public Optional<ChatHandler> get(final String key);
}

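The hunk header above (62 lines collapsing to 5) shows this file shedding the implementation the merge had left behind: ChatHandlerProvider goes back to being a plain interface. A rough sketch of the file after this commit, assuming register() keeps the (key, handler) signature seen in the removed body:

package com.metamx.druid.indexing.common.index;

import com.google.common.base.Optional;

/**
 */
public interface ChatHandlerProvider
{
  public void register(final String key, ChatHandler handler);

  public void unregister(final String key);

  public Optional<ChatHandler> get(final String key);
}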
View File

@@ -25,6 +25,7 @@ import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
import com.metamx.druid.initialization.DruidNode;
import java.util.concurrent.ConcurrentMap;
@@ -52,44 +53,43 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
}
@Override
public void register(final String key, ChatHandler handler)
public void register(final String service, ChatHandler handler)
{
final String service = serviceName(key);
log.info("Registering Eventhandler: %s", key);
final DruidNode node = makeDruidNode(service);
log.info("Registering Eventhandler[%s]", service);
if (handlers.putIfAbsent(key, handler) != null) {
throw new ISE("handler already registered for key: %s", key);
if (handlers.putIfAbsent(service, handler) != null) {
throw new ISE("handler already registered for service[%s]", service);
}
try {
serviceAnnouncer.announce(service);
serviceAnnouncer.announce(node);
}
catch (Exception e) {
log.warn(e, "Failed to register service: %s", service);
handlers.remove(key, handler);
log.warn(e, "Failed to register service[%s]", service);
handlers.remove(service, handler);
}
}
@Override
public void unregister(final String key)
public void unregister(final String service)
{
final String service = serviceName(key);
log.info("Unregistering chat handler: %s", key);
log.info("Unregistering chat handler[%s]", service);
final ChatHandler handler = handlers.get(key);
final ChatHandler handler = handlers.get(service);
if (handler == null) {
log.warn("handler not currently registered, ignoring: %s", key);
log.warn("handler[%s] not currently registered, ignoring.", service);
}
try {
serviceAnnouncer.unannounce(service);
serviceAnnouncer.unannounce(makeDruidNode(service));
}
catch (Exception e) {
log.warn(e, "Failed to unregister service: %s", service);
log.warn(e, "Failed to unregister service[%s]", service);
}
handlers.remove(key, handler);
handlers.remove(service, handler);
}
@Override
@@ -98,8 +98,8 @@ public class EventReceivingChatHandlerProvider implements ChatHandlerProvider
return Optional.fromNullable(handlers.get(key));
}
private String serviceName(String key)
private DruidNode makeDruidNode(String key)
{
return String.format(config.getServiceFormat(), key);
return new DruidNode(key, config.getHost(), config.getPort());
}
}

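Reading only the post-change lines of the hunks above, EventReceivingChatHandlerProvider now keys handlers by the service name itself and announces a DruidNode built from the config's host and port, rather than a formatted service string. The register() method pieced together from those new lines (a sketch, not the verbatim file):

@Override
public void register(final String service, ChatHandler handler)
{
  final DruidNode node = makeDruidNode(service);
  log.info("Registering Eventhandler[%s]", service);

  if (handlers.putIfAbsent(service, handler) != null) {
    throw new ISE("handler already registered for service[%s]", service);
  }

  try {
    serviceAnnouncer.announce(node);
  }
  catch (Exception e) {
    log.warn(e, "Failed to register service[%s]", service);
    handlers.remove(service, handler);
  }
}

Here makeDruidNode(service) returns new DruidNode(service, config.getHost(), config.getPort()), as shown in the last hunk for this file.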
View File

@@ -31,13 +31,13 @@ import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultiset;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.actions.SpawnTasksAction;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import com.metamx.druid.shard.ShardSpec;
import com.metamx.druid.shard.SingleDimensionShardSpec;

View File

@@ -30,13 +30,13 @@ import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
import com.metamx.druid.indexing.common.actions.SegmentInsertAction;
import com.metamx.druid.indexing.common.actions.SegmentListUsedAction;
import com.metamx.druid.indexing.common.actions.SpawnTasksAction;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.loading.SegmentLoadingException;
import org.joda.time.DateTime;
import org.joda.time.Interval;

View File

@@ -26,8 +26,6 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.task.Task;
@@ -35,6 +33,8 @@ import com.metamx.druid.indexing.coordinator.config.IndexerCoordinatorConfig;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementScheduler;
import com.metamx.druid.indexing.coordinator.scaling.ResourceManagementSchedulerFactory;
import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import org.apache.curator.framework.CuratorFramework;
@@ -66,7 +66,7 @@ public class TaskMasterLifecycle
final TaskQueue taskQueue,
final TaskActionClientFactory taskActionClientFactory,
final IndexerCoordinatorConfig indexerCoordinatorConfig,
final DruidNode nodeConfig,
final DruidNode node,
final TaskRunnerFactory runnerFactory,
final ResourceManagementSchedulerFactory managementSchedulerFactory,
final CuratorFramework curator,
@@ -119,7 +119,7 @@ public class TaskMasterLifecycle
}
);
leaderLifecycle.addManagedInstance(taskQueue);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, leaderLifecycle);
Initialization.announceDefaultService(node, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);
if ("remote".equalsIgnoreCase(indexerCoordinatorConfig.getRunnerImpl())) {

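TaskMasterLifecycle follows the same pattern: the ServiceDiscoveryConfig import drops out, the constructor's DruidNode parameter is renamed from nodeConfig to node, and the default-service announcement uses that node directly. Since the hunks above interleave the old and new lines, the surviving statements after the change are, roughly:

leaderLifecycle.addManagedInstance(taskQueue);
Initialization.announceDefaultService(node, serviceAnnouncer, leaderLifecycle);
leaderLifecycle.addManagedInstance(taskConsumer);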
View File

@@ -40,7 +40,6 @@ import com.metamx.druid.query.segment.QuerySegmentWalker;
import com.metamx.druid.query.segment.SegmentDescriptor;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.File;

View File

@@ -20,7 +20,6 @@
package com.metamx.druid.indexing.worker.config;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/

View File

@@ -388,14 +388,12 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
{
if (chatHandlerProvider == null) {
final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
final ServiceAnnouncer myServiceAnnouncer;
if (config.isPublishDiscovery()) {
myServiceAnnouncer = serviceAnnouncer;
this.chatHandlerProvider = new EventReceivingChatHandlerProvider(config, serviceAnnouncer);
} else {
log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
myServiceAnnouncer = new NoopServiceAnnouncer();
this.chatHandlerProvider = new NoopChatHandlerProvider();
}
this.chatHandlerProvider = new ChatHandlerProvider(config, myServiceAnnouncer);
}
}

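In ExecutorNode, the intermediate ServiceAnnouncer plumbing goes away: instead of always building one ChatHandlerProvider and handing it either the real or a no-op announcer, the node now picks the provider implementation up front based on config.isPublishDiscovery(). The block after the change, assembled from the new lines in the hunk above (a sketch; the log message is kept as shown):

if (chatHandlerProvider == null) {
  final ChatHandlerProviderConfig config = configFactory.build(ChatHandlerProviderConfig.class);
  if (config.isPublishDiscovery()) {
    this.chatHandlerProvider = new EventReceivingChatHandlerProvider(config, serviceAnnouncer);
  } else {
    log.info("ChatHandlerProvider: Using NoopServiceAnnouncer. Good luck finding your firehoses!");
    this.chatHandlerProvider = new NoopChatHandlerProvider();
  }
}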
View File

@@ -45,7 +45,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Arrays;
import java.util.Set;

View File

@@ -36,13 +36,6 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.indexing.common.TaskLock;
import com.metamx.druid.indexing.common.TaskStatus;
import com.metamx.druid.indexing.common.TaskToolbox;
@@ -57,6 +50,7 @@ import com.metamx.druid.indexing.common.task.AbstractTask;
import com.metamx.druid.indexing.common.task.IndexTask;
import com.metamx.druid.indexing.common.task.KillTask;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.common.task.TaskResource;
import com.metamx.druid.indexing.coordinator.exec.TaskConsumer;
import com.metamx.druid.input.InputRow;
import com.metamx.druid.input.MapBasedInputRow;

View File

@@ -28,11 +28,11 @@ import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.google.common.collect.Lists;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.indexing.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.druid.indexing.coordinator.setup.EC2NodeData;
import com.metamx.druid.indexing.coordinator.setup.GalaxyUserData;
import com.metamx.druid.indexing.coordinator.setup.WorkerSetupData;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;

View File

@@ -22,7 +22,6 @@ package com.metamx.druid.indexing.coordinator.scaling;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.indexing.TestTask;
@@ -45,10 +44,8 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
/**