diff --git a/.gitignore b/.gitignore index 845a2ddc007..dc61b625f65 100644 --- a/.gitignore +++ b/.gitignore @@ -10,7 +10,5 @@ target .idea .project .settings/ -examples/rand/RealtimeNode.out -examples/twitter/RealtimeNode.out *.log *.DS_Store diff --git a/client/src/main/java/com/metamx/druid/QueryableNode.java b/client/src/main/java/com/metamx/druid/QueryableNode.java index 0b8f4b6e5f0..9d2ef45c0a1 100644 --- a/client/src/main/java/com/metamx/druid/QueryableNode.java +++ b/client/src/main/java/com/metamx/druid/QueryableNode.java @@ -46,15 +46,13 @@ import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer; import com.metamx.druid.coordination.BatchDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DruidServerMetadata; -import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; import com.metamx.druid.coordination.SingleDataSegmentAnnouncer; +import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.curator.announcement.Announcer; import com.metamx.druid.guice.JsonConfigurator; +import com.metamx.druid.http.log.NoopRequestLogger; import com.metamx.druid.http.log.RequestLogger; -import com.metamx.druid.http.NoopRequestLogger; -import com.metamx.druid.http.RequestLogger; -import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig; @@ -82,6 +80,8 @@ import java.lang.reflect.Field; import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; /** @@ -376,18 +376,14 @@ public abstract class QueryableNode extends Registering if ("legacy".equalsIgnoreCase(announcerType)) { serverInventoryView = new SingleServerInventoryView( - serverInventoryViewConfig, getZkPaths(), getCuratorFramework(), - exec, getJsonMapper() ); } else if ("batch".equalsIgnoreCase(announcerType)) { serverInventoryView = new BatchServerInventoryView( - serverInventoryViewConfig, getZkPaths(), getCuratorFramework(), - exec, getJsonMapper() ); } else { diff --git a/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java index 58497e293db..ad155bac3b7 100644 --- a/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/BatchServerInventoryView.java @@ -23,55 +23,38 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; +import com.google.inject.Inject; import com.metamx.common.ISE; -import com.metamx.druid.curator.inventory.InventoryManagerConfig; +import com.metamx.druid.guice.ManageLifecycle; import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import org.apache.curator.framework.CuratorFramework; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; /** */ +@ManageLifecycle public class BatchServerInventoryView extends ServerInventoryView> { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); final ConcurrentMap> zNodes = new MapMaker().makeMap(); + @Inject public BatchServerInventoryView( - final ServerInventoryViewConfig config, final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ExecutorService exec, final ObjectMapper jsonMapper ) { super( - config, log, - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return zkPaths.getAnnouncementsPath(); - } - - @Override - public String getInventoryPath() - { - return zkPaths.getLiveSegmentsPath(); - } - }, + zkPaths, curator, - exec, jsonMapper, - new TypeReference>() - { - } + new TypeReference>(){} ); } diff --git a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java index 72519cbeb8c..b0f460f050b 100644 --- a/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java +++ b/client/src/main/java/com/metamx/druid/client/CachingClusteredClient.java @@ -32,13 +32,11 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.metamx.common.ISE; import com.metamx.common.Pair; import com.metamx.common.guava.BaseSequence; import com.metamx.common.guava.LazySequence; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; -import com.metamx.common.logger.Logger; import com.metamx.druid.Query; import com.metamx.druid.TimelineObjectHolder; import com.metamx.druid.VersionedIntervalTimeline; @@ -49,7 +47,6 @@ import com.metamx.druid.client.selector.ServerSelector; import com.metamx.druid.partition.PartitionChunk; import com.metamx.druid.query.CacheStrategy; import com.metamx.druid.query.MetricManipulationFn; -import com.metamx.druid.query.Queries; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.QueryToolChestWarehouse; diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java index ef2dfb19257..d17b4ea1f22 100644 --- a/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryView.java @@ -25,15 +25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.MapMaker; -import com.google.inject.Inject; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.concurrent.Execs; -import com.metamx.common.logger.Logger; import com.metamx.druid.curator.inventory.CuratorInventoryManager; import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy; import com.metamx.druid.curator.inventory.InventoryManagerConfig; -import com.metamx.druid.guice.ManageLifecycle; import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import org.apache.curator.framework.CuratorFramework; @@ -49,8 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public abstract class ServerInventoryView implements ServerView, InventoryView { - private final ServerInventoryViewConfig config; - private final Logger log; + + private final EmittingLogger log; private final CuratorInventoryManager inventoryManager; private final AtomicBoolean started = new AtomicBoolean(false); @@ -59,18 +56,14 @@ public abstract class ServerInventoryView implements ServerView, private final Map removedSegments = new MapMaker().makeMap(); - @Inject public ServerInventoryView( - final ServerInventoryViewConfig config, - final Logger log, - final InventoryManagerConfig inventoryManagerConfig, + final EmittingLogger log, + final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ExecutorService exec, final ObjectMapper jsonMapper, final TypeReference typeReference ) { - this.config = config; this.log = log; this.inventoryManager = new CuratorInventoryManager( curator, @@ -89,7 +82,7 @@ public abstract class ServerInventoryView implements ServerView, } }, Execs.singleThreaded("ServerInventoryView-%s"), - new CuratorInventoryManagerStrategy() + new CuratorInventoryManagerStrategy() { @Override public DruidServer deserializeContainer(byte[] bytes) @@ -299,6 +292,7 @@ public abstract class ServerInventoryView implements ServerView, final DataSegment inventory ) { +/* TODO log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier()); if (container.getSegment(inventory.getIdentifier()) != null) { @@ -323,10 +317,12 @@ public abstract class ServerInventoryView implements ServerView, } } ); +*/ } protected void removeSingleInventory(final DruidServer container, String inventoryKey) { +/* TODO log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey); final DataSegment segment = container.getSegment(inventoryKey); @@ -354,6 +350,7 @@ public abstract class ServerInventoryView implements ServerView, ); removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime()); +*/ } protected abstract DruidServer addInnerInventory( diff --git a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java index 4b345dc5a29..5845c465b1a 100644 --- a/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java +++ b/client/src/main/java/com/metamx/druid/client/SingleServerInventoryView.java @@ -21,50 +21,32 @@ package com.metamx.druid.client; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.metamx.druid.curator.inventory.InventoryManagerConfig; +import com.google.inject.Inject; +import com.metamx.druid.guice.ManageLifecycle; import com.metamx.druid.initialization.ZkPathsConfig; import com.metamx.emitter.EmittingLogger; import org.apache.curator.framework.CuratorFramework; -import java.util.concurrent.ExecutorService; - /** */ +@ManageLifecycle public class SingleServerInventoryView extends ServerInventoryView { private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); + @Inject public SingleServerInventoryView( - final ServerInventoryViewConfig config, final ZkPathsConfig zkPaths, final CuratorFramework curator, - final ExecutorService exec, final ObjectMapper jsonMapper ) { super( - config, log, - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return zkPaths.getAnnouncementsPath(); - } - - @Override - public String getInventoryPath() - { - return zkPaths.getServedSegmentsPath(); - } - }, + zkPaths, curator, - exec, jsonMapper, - new TypeReference() - { - } + new TypeReference(){} ); } diff --git a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java index fb6fa72ce46..befac85326e 100644 --- a/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java +++ b/client/src/main/java/com/metamx/druid/client/cache/MemcachedCache.java @@ -32,7 +32,6 @@ import net.spy.memcached.FailureMode; import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedClientIF; import net.spy.memcached.internal.BulkFuture; -import net.spy.memcached.transcoders.SerializingTranscoder; import org.apache.commons.codec.digest.DigestUtils; import javax.annotation.Nullable; diff --git a/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java index 053a3d85fac..b9f42d5b2bd 100644 --- a/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/MultipleDataSegmentAnnouncerDataSegmentAnnouncer.java @@ -19,12 +19,9 @@ package com.metamx.druid.coordination; -import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.client.DataSegment; -import com.metamx.druid.curator.announcement.Announcer; -import com.metamx.druid.initialization.ZkPathsConfig; import java.io.IOException; diff --git a/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java index e238e3b91a3..5c91a0c41a2 100644 --- a/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java +++ b/client/src/main/java/com/metamx/druid/coordination/SingleDataSegmentAnnouncer.java @@ -20,10 +20,7 @@ package com.metamx.druid.coordination; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Throwables; import com.google.inject.Inject; -import com.metamx.common.lifecycle.LifecycleStart; -import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.curator.announcement.Announcer; @@ -43,7 +40,6 @@ public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer private volatile boolean started = false; @Inject - public CuratorDataSegmentAnnouncer( public SingleDataSegmentAnnouncer( DruidServerMetadata server, ZkPathsConfig config, diff --git a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java index 065829fd9ea..6f484c1bc63 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ZkPathsConfig.java @@ -21,7 +21,6 @@ package com.metamx.druid.initialization; import org.apache.curator.utils.ZKPaths; import org.skife.config.Config; -import org.skife.config.Default; public abstract class ZkPathsConfig { diff --git a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java index b981fde0975..65f6f2833e9 100644 --- a/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java +++ b/client/src/main/java/com/metamx/druid/query/MetricsEmittingExecutorService.java @@ -19,22 +19,11 @@ package com.metamx.druid.query; -import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.druid.guice.ManageLifecycle; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.AbstractExecutorService; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; public class MetricsEmittingExecutorService extends DelegatingExecutorService { diff --git a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java index f943a0c112f..b3892477a3d 100644 --- a/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java +++ b/client/src/main/java/com/metamx/druid/query/PrioritizedExecutorService.java @@ -27,7 +27,6 @@ import com.metamx.common.lifecycle.Lifecycle; import java.util.List; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.FutureTask; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RunnableFuture; diff --git a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java index 1cfceac4db8..9b210c5aa30 100644 --- a/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java @@ -40,13 +40,11 @@ import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.ResultMergeQueryRunner; import com.metamx.druid.utils.JodaUtils; import com.metamx.emitter.service.ServiceMetricEvent; - import org.joda.time.Interval; import org.joda.time.Minutes; import javax.annotation.Nullable; import java.nio.ByteBuffer; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Set; diff --git a/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java b/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java index 7690b0ab06b..1a54651423d 100644 --- a/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java +++ b/client/src/test/java/com/metamx/druid/client/BatchServerInventoryViewTest.java @@ -45,7 +45,6 @@ import org.junit.Before; import org.junit.Test; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** @@ -130,20 +129,6 @@ public class BatchServerInventoryViewTest } batchServerInventoryView = new BatchServerInventoryView( - new ServerInventoryViewConfig() - { - @Override - public int getRemovedSegmentLifetime() - { - return 0; - } - - @Override - public String getAnnouncerType() - { - return "batch"; - } - }, new ZkPathsConfig() { @Override @@ -153,7 +138,6 @@ public class BatchServerInventoryViewTest } }, cf, - Executors.newSingleThreadExecutor(), jsonMapper ); diff --git a/examples/src/main/java/druid/examples/rabbitmq/RabbitMQProducerMain.java b/examples/src/main/java/druid/examples/rabbitmq/RabbitMQProducerMain.java index a070bfff929..0e9bd2be4dd 100644 --- a/examples/src/main/java/druid/examples/rabbitmq/RabbitMQProducerMain.java +++ b/examples/src/main/java/druid/examples/rabbitmq/RabbitMQProducerMain.java @@ -3,10 +3,21 @@ package druid.examples.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; -import org.apache.commons.cli.*; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.OptionBuilder; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Comparator; +import java.util.Date; +import java.util.List; +import java.util.Random; /** * diff --git a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java index e87b5e44de2..36843522d13 100644 --- a/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java +++ b/examples/src/main/java/druid/examples/twitter/TwitterSpritzerFirehoseFactory.java @@ -11,13 +11,13 @@ import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; import twitter4j.ConnectionLifeCycleListener; import twitter4j.HashtagEntity; +import twitter4j.StallWarning; import twitter4j.Status; import twitter4j.StatusDeletionNotice; import twitter4j.StatusListener; import twitter4j.TwitterStream; import twitter4j.TwitterStreamFactory; import twitter4j.User; -import twitter4j.StallWarning; import java.io.IOException; import java.util.Arrays; diff --git a/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java b/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java index 8e429d45879..963c62dc964 100644 --- a/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java +++ b/indexing-common/src/main/java/com/metamx/druid/index/QueryableIndex.java @@ -23,7 +23,6 @@ import com.metamx.druid.index.column.ColumnSelector; import com.metamx.druid.kv.Indexed; import org.joda.time.Interval; -import java.io.Closeable; import java.io.IOException; /** diff --git a/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java b/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java index 864a0087831..22c0c5c7edf 100644 --- a/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java +++ b/indexing-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java @@ -30,9 +30,7 @@ import com.google.common.collect.Sets; import com.google.common.primitives.Floats; import com.metamx.common.ISE; import com.metamx.druid.input.InputRow; -import org.joda.time.DateTime; -import javax.annotation.Nullable; import java.util.Arrays; import java.util.List; import java.util.Map; diff --git a/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java index 70b5ddfa903..6a0110e0b2a 100644 --- a/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -27,9 +27,11 @@ import com.metamx.druid.indexer.granularity.UniformGranularitySpec; import com.metamx.druid.indexer.partitions.PartitionsSpec; import com.metamx.druid.indexer.updater.DbUpdaterJobSpec; import com.metamx.druid.jackson.DefaultObjectMapper; - +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.joda.time.DateTime; import org.joda.time.Interval; -import org.joda.time.format.ISODateTimeFormat; import org.junit.Assert; import org.junit.Test; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java index e716cc3c369..5304228e648 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolbox.java @@ -35,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller; import com.metamx.druid.loading.SegmentLoaderConfig; import com.metamx.druid.loading.SegmentLoadingException; import com.metamx.druid.loading.SingleSegmentLoader; -import com.metamx.druid.indexing.common.actions.TaskActionClient; -import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; -import com.metamx.druid.indexing.common.config.TaskConfig; -import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; @@ -154,7 +150,7 @@ public class TaskToolbox new SegmentLoaderConfig() { @Override - public File getSegmentLocations() + public String getLocations() { return new File(getTaskWorkDir(), "fetched_segments").toString(); } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java index 91f5c99a333..5acd075bee7 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/TaskToolboxFactory.java @@ -22,11 +22,11 @@ package com.metamx.druid.indexing.common; import com.fasterxml.jackson.databind.ObjectMapper; import com.metamx.druid.client.ServerView; import com.metamx.druid.coordination.DataSegmentAnnouncer; -import com.metamx.druid.loading.DataSegmentKiller; -import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.indexing.common.actions.TaskActionClientFactory; import com.metamx.druid.indexing.common.config.TaskConfig; import com.metamx.druid.indexing.common.task.Task; +import com.metamx.druid.loading.DataSegmentKiller; +import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.metrics.MonitorScheduler; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java index 2b698f1b234..289fcf005d9 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -1,6 +1,7 @@ package com.metamx.druid.indexing.common.actions; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.metamx.common.ISE; @@ -10,7 +11,6 @@ import com.metamx.druid.indexing.common.RetryPolicyFactory; import com.metamx.druid.indexing.common.task.Task; import com.metamx.http.client.HttpClient; import com.metamx.http.client.response.ToStringResponseHandler; -import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.x.discovery.ServiceInstance; import org.apache.curator.x.discovery.ServiceProvider; import org.joda.time.Duration; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java index e2cb18ccb05..cf390eb68ef 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/common/task/RealtimeIndexTask.java @@ -46,13 +46,11 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.realtime.FireDepartment; import com.metamx.druid.realtime.FireDepartmentConfig; -import com.metamx.druid.realtime.FireDepartmentMetrics; import com.metamx.druid.realtime.RealtimeMetricsMonitor; import com.metamx.druid.realtime.Schema; import com.metamx.druid.realtime.SegmentPublisher; import com.metamx.druid.realtime.firehose.Firehose; import com.metamx.druid.realtime.firehose.FirehoseFactory; -import com.metamx.druid.realtime.plumber.NoopRejectionPolicyFactory; import com.metamx.druid.realtime.plumber.Plumber; import com.metamx.druid.realtime.plumber.RealtimePlumberSchool; import com.metamx.druid.realtime.plumber.RejectionPolicyFactory; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java index 4ac78c96c40..f993bcf3a1e 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/ForkingTaskRunner.java @@ -26,12 +26,10 @@ import com.google.common.base.Optional; import com.google.common.base.Splitter; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.ByteStreams; -import com.google.common.io.Closeables; import com.google.common.io.Closer; import com.google.common.io.Files; import com.google.common.io.InputSupplier; @@ -49,7 +47,6 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain; import com.metamx.emitter.EmittingLogger; import org.apache.commons.io.FileUtils; -import java.io.Closeable; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -63,7 +60,6 @@ import java.util.Properties; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicInteger; /** * Runs tasks in separate processes using {@link ExecutorMain}. diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java index 4ff499dbd92..31efa5331ec 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/config/ForkingTaskRunnerConfig.java @@ -1,14 +1,12 @@ package com.metamx.druid.indexing.coordinator.config; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import com.metamx.druid.indexing.worker.executor.ExecutorMain; import org.skife.config.Config; import org.skife.config.Default; import java.io.File; import java.util.List; -import java.util.Set; public abstract class ForkingTaskRunnerConfig { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java index a2183e98d60..5c7d600c542 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/coordinator/http/IndexerCoordinatorNode.java @@ -102,10 +102,6 @@ import com.metamx.druid.initialization.DruidNode; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; -import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorLifecycle.java index d4f47a19771..5d5543b9de7 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -1,13 +1,11 @@ package com.metamx.druid.indexing.worker.executor; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.metamx.common.ISE; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.indexing.common.TaskStatus; @@ -15,12 +13,9 @@ import com.metamx.druid.indexing.common.task.Task; import com.metamx.druid.indexing.coordinator.TaskRunner; import com.metamx.emitter.EmittingLogger; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index 820306a4df9..3e8aaed5512 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -36,6 +36,7 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.BaseServerNode; +import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; import com.metamx.druid.curator.discovery.NoopServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer; @@ -49,7 +50,6 @@ import com.metamx.druid.indexing.common.config.RetryPolicyConfig; import com.metamx.druid.indexing.common.config.TaskConfig; import com.metamx.druid.indexing.common.index.ChatHandlerProvider; import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; -import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; @@ -62,14 +62,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.S3DataSegmentKiller; -import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.initialization.ServerConfig; -import com.metamx.druid.initialization.ServerInit; -import com.metamx.druid.initialization.ServiceDiscoveryConfig; -import com.metamx.druid.jackson.DefaultObjectMapper; -import com.metamx.druid.loading.DataSegmentKiller; -import com.metamx.druid.loading.DataSegmentPusher; -import com.metamx.druid.loading.S3DataSegmentKiller; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; @@ -353,11 +345,14 @@ public class ExecutorNode extends BaseServerNode public void initializeServiceDiscovery() throws Exception { - final CuratorDiscoveryConfig config = configFactory.build(CuratorDiscoveryConfig.class); + final CuratorConfig config = configFactory.build(CuratorConfig.class); if (serviceDiscovery == null) { final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle); + CuratorDiscoveryConfig discoveryConfig = getJsonConfigurator() + .configurate(getProps(), "druid.discovery.curator", CuratorDiscoveryConfig.class); + this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - serviceDiscoveryCuratorFramework, config, lifecycle + serviceDiscoveryCuratorFramework, discoveryConfig, lifecycle ); } if (serviceAnnouncer == null) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java index d490d631796..ec39b6e93a3 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java @@ -34,7 +34,7 @@ import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.QueryableNode; -import com.metamx.druid.curator.discovery.ServiceAnnouncer; +import com.metamx.druid.curator.CuratorConfig; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.indexing.common.config.IndexerZkConfig; @@ -103,7 +103,6 @@ public class WorkerNode extends QueryableNode private ServiceEmitter emitter = null; private WorkerConfig workerConfig = null; private ServiceDiscovery serviceDiscovery = null; - private ServiceAnnouncer serviceAnnouncer = null; private ServiceProvider coordinatorServiceProvider = null; private WorkerCuratorCoordinator workerCuratorCoordinator = null; private WorkerTaskMonitor workerTaskMonitor = null; @@ -330,15 +329,13 @@ public class WorkerNode extends QueryableNode public void initializeServiceDiscovery() throws Exception { if (serviceDiscovery == null) { - final CuratorDiscoveryConfig config = getConfigFactory().build(CuratorDiscoveryConfig.class); + final CuratorDiscoveryConfig config = getJsonConfigurator() + .configurate(getProps(), "druid.discovery.curator", CuratorDiscoveryConfig.class); this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - getCuratorFramework(), + Initialization.makeCuratorFramework(getConfigFactory().build(CuratorConfig.class), getLifecycle()), config, getLifecycle() ); - this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - serviceDiscoveryCuratorFramework, config, getLifecycle() - ); } if (coordinatorServiceProvider == null) { this.coordinatorServiceProvider = Initialization.makeServiceProvider( diff --git a/indexing-service/src/test/java/com/metamx/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/com/metamx/druid/indexing/common/task/TaskSerdeTest.java index d4d2e99d4f2..0c54c296973 100644 --- a/indexing-service/src/test/java/com/metamx/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/com/metamx/druid/indexing/common/task/TaskSerdeTest.java @@ -1,5 +1,6 @@ package com.metamx.druid.indexing.common.task; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.metamx.common.Granularity; @@ -18,7 +19,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.realtime.Schema; import com.metamx.druid.shard.NoneShardSpec; import junit.framework.Assert; -import com.fasterxml.jackson.databind.ObjectMapper; import org.joda.time.Interval; import org.joda.time.Period; import org.junit.Test; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java b/realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java index 7da92c6a8e7..b304191faf4 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/firehose/JacksonifiedConnectionFactory.java @@ -3,7 +3,6 @@ package com.metamx.druid.realtime.firehose; import com.fasterxml.jackson.annotation.JsonProperty; import com.rabbitmq.client.ConnectionFactory; -import java.net.URI; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; diff --git a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java index 08383bc6456..391fd305e21 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/plumber/RealtimePlumberSchool.java @@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Predicates; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; diff --git a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java index ebc5bc2c1fb..f9737571d66 100644 --- a/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java +++ b/server/src/main/java/com/metamx/druid/coordination/ZkCoordinator.java @@ -52,7 +52,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final ObjectMapper jsonMapper; private final SegmentLoaderConfig config; - private final ZkCoordinatorConfig config; private final ZkPathsConfig zkPaths; private final DruidServerMetadata me; private final DataSegmentAnnouncer announcer; @@ -110,9 +109,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient()); curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); +/* TODO if (config.isLoadFromSegmentCacheEnabled()) { loadCache(); } +*/ + loadCache(); loadQueueCache.getListenable().addListener( new PathChildrenCacheListener() @@ -234,7 +236,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler try { serverManager.loadSegment(segment); - File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (!segmentInfoCacheFile.exists()) { try { jsonMapper.writeValue(segmentInfoCacheFile, segment); @@ -269,7 +271,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler for (DataSegment segment : segments) { serverManager.loadSegment(segment); - File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (!segmentInfoCacheFile.exists()) { try { jsonMapper.writeValue(segmentInfoCacheFile, segment); @@ -325,7 +327,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler for (DataSegment segment : segments) { serverManager.dropSegment(segment); - File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier()); + File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier()); if (!segmentInfoCacheFile.delete()) { log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile); } diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java index 70733da0f66..06ef0dbccf3 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java @@ -20,7 +20,6 @@ package com.metamx.druid.db; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; diff --git a/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java b/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java index 412fa9a61ff..985770bf527 100644 --- a/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java +++ b/server/src/main/java/com/metamx/druid/guice/CoordinatorModule.java @@ -5,6 +5,7 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.TypeLiteral; +import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.druid.client.InventoryView; import com.metamx.druid.client.ServerInventoryView; import com.metamx.druid.client.ServerInventoryViewConfig; @@ -13,7 +14,6 @@ import com.metamx.druid.client.indexing.IndexingServiceClient; import com.metamx.druid.client.indexing.IndexingServiceSelector; import com.metamx.druid.client.selector.DiscoverySelector; import com.metamx.druid.client.selector.Server; -import com.metamx.druid.concurrent.Execs; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManagerConfig; import com.metamx.druid.db.DatabaseRuleManagerProvider; @@ -101,8 +101,10 @@ public class CoordinatorModule implements Module } @Provides @LazySingleton - public LoadQueueTaskMaster getLoadQueueTaskMaster(CuratorFramework curator, ObjectMapper jsonMapper) + public LoadQueueTaskMaster getLoadQueueTaskMaster( + CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config + ) { - return new LoadQueueTaskMaster(curator, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d")); + return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config); } } diff --git a/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java b/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java index dc31e07c073..71525bc8d8c 100644 --- a/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java +++ b/server/src/main/java/com/metamx/druid/guice/HistoricalModule.java @@ -15,9 +15,9 @@ import com.metamx.druid.Query; import com.metamx.druid.client.DruidServerConfig; import com.metamx.druid.collect.StupidPool; import com.metamx.druid.concurrent.Execs; -import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer; import com.metamx.druid.coordination.DataSegmentAnnouncer; import com.metamx.druid.coordination.DruidServerMetadata; +import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer; import com.metamx.druid.coordination.ServerManager; import com.metamx.druid.coordination.ZkCoordinator; import com.metamx.druid.curator.announcement.Announcer; @@ -120,7 +120,9 @@ public class HistoricalModule implements Module binder.bind(ZkCoordinator.class).in(ManageLifecycle.class); - binder.bind(DataSegmentAnnouncer.class).to(CuratorDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); + binder.bind(DataSegmentAnnouncer.class) + .to(MultipleDataSegmentAnnouncerDataSegmentAnnouncer.class) + .in(ManageLifecycleLast.class); } private void bindDeepStorageS3(Binder binder) diff --git a/server/src/main/java/com/metamx/druid/http/ComputeNode.java b/server/src/main/java/com/metamx/druid/http/ComputeNode.java index 2b2db2790e6..de91530d34d 100644 --- a/server/src/main/java/com/metamx/druid/http/ComputeNode.java +++ b/server/src/main/java/com/metamx/druid/http/ComputeNode.java @@ -47,8 +47,6 @@ import com.metamx.metrics.Monitor; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.jets3t.service.S3ServiceException; -import org.mortbay.jetty.servlet.Context; -import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; import java.util.List; diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index bf404147e93..5cad41a63ef 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -21,10 +21,6 @@ package com.metamx.druid.http; import com.google.inject.Injector; import com.google.inject.servlet.GuiceFilter; -import com.metamx.common.IAE; -import com.metamx.common.concurrent.ScheduledExecutorFactory; -import com.metamx.common.concurrent.ScheduledExecutors; -import com.metamx.common.config.Config; import com.metamx.common.lifecycle.Lifecycle; import com.metamx.common.logger.Logger; import com.metamx.druid.curator.CuratorModule; @@ -37,23 +33,6 @@ import com.metamx.druid.guice.LifecycleModule; import com.metamx.druid.guice.ServerModule; import com.metamx.druid.guice.annotations.Self; import com.metamx.druid.initialization.EmitterModule; -import com.metamx.druid.client.BatchServerInventoryView; -import com.metamx.druid.client.ServerInventoryView; -import com.metamx.druid.client.ServerInventoryViewConfig; -import com.metamx.druid.client.SingleServerInventoryView; -import com.metamx.druid.client.indexing.IndexingServiceClient; -import com.metamx.druid.concurrent.Execs; -import com.metamx.druid.config.ConfigManager; -import com.metamx.druid.config.ConfigManagerConfig; -import com.metamx.druid.config.JacksonConfigManager; -import com.metamx.druid.curator.discovery.ServiceAnnouncer; -import com.metamx.druid.db.DatabaseRuleManager; -import com.metamx.druid.db.DatabaseRuleManagerConfig; -import com.metamx.druid.db.DatabaseSegmentManager; -import com.metamx.druid.db.DatabaseSegmentManagerConfig; -import com.metamx.druid.db.DbConnector; -import com.metamx.druid.db.DbConnectorConfig; -import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.JettyServerInitializer; import com.metamx.druid.initialization.JettyServerModule; diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java index 349194836d9..c71832d9f63 100644 --- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java +++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java @@ -19,10 +19,9 @@ package com.metamx.druid.loading; -import org.joda.time.format.ISODateTimeFormat; - import com.google.common.base.Joiner; import com.metamx.druid.client.DataSegment; +import org.joda.time.format.ISODateTimeFormat; /** */ diff --git a/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java b/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java index 535649baea4..276bbc2028a 100644 --- a/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java +++ b/server/src/main/java/com/metamx/druid/loading/QueryableIndexFactory.java @@ -22,7 +22,6 @@ package com.metamx.druid.loading; import com.metamx.druid.index.QueryableIndex; import java.io.File; -import java.io.IOException; /** */ diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java index 75d212886da..8729ce9364c 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java +++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java @@ -40,7 +40,6 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.util.Map; -import java.util.Random; import java.util.concurrent.Callable; import java.util.zip.GZIPInputStream; diff --git a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java index d3518f7d53e..db20e8edbb3 100644 --- a/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java +++ b/server/src/main/java/com/metamx/druid/loading/SingleSegmentLoader.java @@ -34,8 +34,10 @@ import com.metamx.druid.index.Segment; import org.apache.commons.io.FileUtils; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Set; /** */ @@ -60,13 +62,14 @@ public class SingleSegmentLoader implements SegmentLoader final ImmutableList.Builder locBuilder = ImmutableList.builder(); + // TODO // This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone // We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that // But, that'll have to wait for some other day. - for (String dirSpec : config.getCacheDirectory().split(",")) { + for (String dirSpec : config.getLocations().split(",")) { String[] dirSplit = dirSpec.split("\\|"); if (dirSplit.length == 1) { - locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize())); + locBuilder.add(new StorageLocation(new File(dirSplit[0]), Integer.MAX_VALUE)); } else if (dirSplit.length == 2) { final Long maxSize = Longs.tryParse(dirSplit[1]); @@ -78,7 +81,7 @@ public class SingleSegmentLoader implements SegmentLoader else { throw new ISE( "Unknown segment storage location[%s]=>[%s], config[%s].", - dirSplit.length, dirSpec, config.getCacheDirectory() + dirSplit.length, dirSpec, config.getLocations() ); } } @@ -89,19 +92,21 @@ public class SingleSegmentLoader implements SegmentLoader } @Override - public boolean isSegmentLoaded(final DataSegment segment) - { - File localStorageDir = new File(config.getLocations(), DataSegmentPusherUtil.getStorageDir(segment)); - if (localStorageDir.exists()) { - return true; + public boolean isSegmentLoaded(final DataSegment segment) + { + return findStorageLocationIfLoaded(segment) != null; } - final File legacyStorageDir = new File( - config.getLocations(), - DataSegmentPusherUtil.getLegacyStorageDir(segment) - ); - return legacyStorageDir.exists(); - } + public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) + { + for (StorageLocation location : locations) { + File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment)); + if (localStorageDir.exists()) { + return location; + } + } + return null; + } @Override public Segment getSegment(DataSegment segment) throws SegmentLoadingException @@ -114,12 +119,8 @@ public class SingleSegmentLoader implements SegmentLoader public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException { - File localStorageDir = new File(config.getLocations(), DataSegmentPusherUtil.getStorageDir(segment)); StorageLocation loc = findStorageLocationIfLoaded(segment); - final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment); - if (legacyDir != null) { - File legacyStorageDir = new File(config.getLocations(), legacyDir); final File retVal; if (loc == null) { @@ -160,7 +161,6 @@ public class SingleSegmentLoader implements SegmentLoader { StorageLocation loc = findStorageLocationIfLoaded(segment); - return new File(config.getLocations(), outputKey); if (loc == null) { log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment); return; diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java index a5d9d053cf6..b870708d73b 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.inject.Inject; import org.apache.curator.framework.CuratorFramework; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; /** diff --git a/server/src/main/java/com/metamx/druid/master/rules/Rule.java b/server/src/main/java/com/metamx/druid/master/rules/Rule.java index 1c77a0ebc8f..82e710cfbfa 100644 --- a/server/src/main/java/com/metamx/druid/master/rules/Rule.java +++ b/server/src/main/java/com/metamx/druid/master/rules/Rule.java @@ -25,7 +25,6 @@ import com.metamx.druid.client.DataSegment; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMasterRuntimeParams; import com.metamx.druid.master.MasterStats; - import org.joda.time.DateTime; /** diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index 213e350f76b..a49dc85a582 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -36,7 +36,6 @@ import org.junit.Before; import org.junit.Test; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ScheduledExecutorService; /** */