Make it all compile again...

This commit is contained in:
cheddar 2013-08-02 10:14:46 -07:00
parent 9e78bb38f5
commit 2361e0112a
45 changed files with 94 additions and 225 deletions

2
.gitignore vendored
View File

@ -10,7 +10,5 @@ target
.idea
.project
.settings/
examples/rand/RealtimeNode.out
examples/twitter/RealtimeNode.out
*.log
*.DS_Store

View File

@ -46,15 +46,13 @@ import com.metamx.druid.coordination.AbstractDataSegmentAnnouncer;
import com.metamx.druid.coordination.BatchDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.coordination.SingleDataSegmentAnnouncer;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.guice.JsonConfigurator;
import com.metamx.druid.http.log.NoopRequestLogger;
import com.metamx.druid.http.log.RequestLogger;
import com.metamx.druid.http.NoopRequestLogger;
import com.metamx.druid.http.RequestLogger;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ZkDataSegmentAnnouncerConfig;
@ -82,6 +80,8 @@ import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
@ -376,18 +376,14 @@ public abstract class QueryableNode<T extends QueryableNode> extends Registering
if ("legacy".equalsIgnoreCase(announcerType)) {
serverInventoryView = new SingleServerInventoryView(
serverInventoryViewConfig,
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
);
} else if ("batch".equalsIgnoreCase(announcerType)) {
serverInventoryView = new BatchServerInventoryView(
serverInventoryViewConfig,
getZkPaths(),
getCuratorFramework(),
exec,
getJsonMapper()
);
} else {

View File

@ -23,55 +23,38 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.MapMaker;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
/**
*/
@ManageLifecycle
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
{
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
final ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
@Inject
public BatchServerInventoryView(
final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ExecutorService exec,
final ObjectMapper jsonMapper
)
{
super(
config,
log,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getLiveSegmentsPath();
}
},
zkPaths,
curator,
exec,
jsonMapper,
new TypeReference<Set<DataSegment>>()
{
}
new TypeReference<Set<DataSegment>>(){}
);
}

View File

@ -32,13 +32,11 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.LazySequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.druid.Query;
import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
@ -49,7 +47,6 @@ import com.metamx.druid.client.selector.ServerSelector;
import com.metamx.druid.partition.PartitionChunk;
import com.metamx.druid.query.CacheStrategy;
import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.Queries;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.QueryToolChestWarehouse;

View File

@ -25,15 +25,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.MapMaker;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.concurrent.Execs;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.inventory.CuratorInventoryManager;
import com.metamx.druid.curator.inventory.CuratorInventoryManagerStrategy;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
@ -49,8 +46,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public abstract class ServerInventoryView<InventoryType> implements ServerView, InventoryView
{
private final ServerInventoryViewConfig config;
private final Logger log;
private final EmittingLogger log;
private final CuratorInventoryManager<DruidServer, InventoryType> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);
@ -59,18 +56,14 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
private final Map<String, Integer> removedSegments = new MapMaker().makeMap();
@Inject
public ServerInventoryView(
final ServerInventoryViewConfig config,
final Logger log,
final InventoryManagerConfig inventoryManagerConfig,
final EmittingLogger log,
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ExecutorService exec,
final ObjectMapper jsonMapper,
final TypeReference<InventoryType> typeReference
)
{
this.config = config;
this.log = log;
this.inventoryManager = new CuratorInventoryManager<DruidServer, InventoryType>(
curator,
@ -89,7 +82,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
}
},
Execs.singleThreaded("ServerInventoryView-%s"),
new CuratorInventoryManagerStrategy<DruidServer, DataSegment>()
new CuratorInventoryManagerStrategy<DruidServer, InventoryType>()
{
@Override
public DruidServer deserializeContainer(byte[] bytes)
@ -299,6 +292,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
final DataSegment inventory
)
{
/* TODO
log.info("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier());
if (container.getSegment(inventory.getIdentifier()) != null) {
@ -323,10 +317,12 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
}
}
);
*/
}
protected void removeSingleInventory(final DruidServer container, String inventoryKey)
{
/* TODO
log.info("Server[%s] removed segment[%s]", container.getName(), inventoryKey);
final DataSegment segment = container.getSegment(inventoryKey);
@ -354,6 +350,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
);
removedSegments.put(inventoryKey, config.getRemovedSegmentLifetime());
*/
}
protected abstract DruidServer addInnerInventory(

View File

@ -21,50 +21,32 @@ package com.metamx.druid.client;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.curator.inventory.InventoryManagerConfig;
import com.google.inject.Inject;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.druid.initialization.ZkPathsConfig;
import com.metamx.emitter.EmittingLogger;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
/**
*/
@ManageLifecycle
public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
{
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
@Inject
public SingleServerInventoryView(
final ServerInventoryViewConfig config,
final ZkPathsConfig zkPaths,
final CuratorFramework curator,
final ExecutorService exec,
final ObjectMapper jsonMapper
)
{
super(
config,
log,
new InventoryManagerConfig()
{
@Override
public String getContainerPath()
{
return zkPaths.getAnnouncementsPath();
}
@Override
public String getInventoryPath()
{
return zkPaths.getServedSegmentsPath();
}
},
zkPaths,
curator,
exec,
jsonMapper,
new TypeReference<DataSegment>()
{
}
new TypeReference<DataSegment>(){}
);
}

View File

@ -32,7 +32,6 @@ import net.spy.memcached.FailureMode;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.MemcachedClientIF;
import net.spy.memcached.internal.BulkFuture;
import net.spy.memcached.transcoders.SerializingTranscoder;
import org.apache.commons.codec.digest.DigestUtils;
import javax.annotation.Nullable;

View File

@ -19,12 +19,9 @@
package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
import com.metamx.druid.initialization.ZkPathsConfig;
import java.io.IOException;

View File

@ -20,10 +20,7 @@
package com.metamx.druid.coordination;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.curator.announcement.Announcer;
@ -43,7 +40,6 @@ public class SingleDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer
private volatile boolean started = false;
@Inject
public CuratorDataSegmentAnnouncer(
public SingleDataSegmentAnnouncer(
DruidServerMetadata server,
ZkPathsConfig config,

View File

@ -21,7 +21,6 @@ package com.metamx.druid.initialization;
import org.apache.curator.utils.ZKPaths;
import org.skife.config.Config;
import org.skife.config.Default;
public abstract class ZkPathsConfig
{

View File

@ -19,22 +19,11 @@
package com.metamx.druid.query;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.guice.ManageLifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class MetricsEmittingExecutorService extends DelegatingExecutorService
{

View File

@ -27,7 +27,6 @@ import com.metamx.common.lifecycle.Lifecycle;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RunnableFuture;

View File

@ -40,13 +40,11 @@ import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.ResultMergeQueryRunner;
import com.metamx.druid.utils.JodaUtils;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

View File

@ -45,7 +45,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
@ -130,20 +129,6 @@ public class BatchServerInventoryViewTest
}
batchServerInventoryView = new BatchServerInventoryView(
new ServerInventoryViewConfig()
{
@Override
public int getRemovedSegmentLifetime()
{
return 0;
}
@Override
public String getAnnouncerType()
{
return "batch";
}
},
new ZkPathsConfig()
{
@Override
@ -153,7 +138,6 @@ public class BatchServerInventoryViewTest
}
},
cf,
Executors.newSingleThreadExecutor(),
jsonMapper
);

View File

@ -3,10 +3,21 @@ package druid.examples.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.cli.*;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
/**
*

View File

@ -11,13 +11,13 @@ import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import twitter4j.ConnectionLifeCycleListener;
import twitter4j.HashtagEntity;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.User;
import twitter4j.StallWarning;
import java.io.IOException;
import java.util.Arrays;

View File

@ -23,7 +23,6 @@ import com.metamx.druid.index.column.ColumnSelector;
import com.metamx.druid.kv.Indexed;
import org.joda.time.Interval;
import java.io.Closeable;
import java.io.IOException;
/**

View File

@ -30,9 +30,7 @@ import com.google.common.collect.Sets;
import com.google.common.primitives.Floats;
import com.metamx.common.ISE;
import com.metamx.druid.input.InputRow;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

View File

@ -27,9 +27,11 @@ import com.metamx.druid.indexer.granularity.UniformGranularitySpec;
import com.metamx.druid.indexer.partitions.PartitionsSpec;
import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
import com.metamx.druid.jackson.DefaultObjectMapper;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Assert;
import org.junit.Test;

View File

@ -35,10 +35,6 @@ import com.metamx.druid.loading.S3DataSegmentPuller;
import com.metamx.druid.loading.SegmentLoaderConfig;
import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.loading.SingleSegmentLoader;
import com.metamx.druid.indexing.common.actions.TaskActionClient;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;
@ -154,7 +150,7 @@ public class TaskToolbox
new SegmentLoaderConfig()
{
@Override
public File getSegmentLocations()
public String getLocations()
{
return new File(getTaskWorkDir(), "fetched_segments").toString();
}

View File

@ -22,11 +22,11 @@ package com.metamx.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.client.ServerView;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.indexing.common.actions.TaskActionClientFactory;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.MonitorScheduler;

View File

@ -1,6 +1,7 @@
package com.metamx.druid.indexing.common.actions;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Throwables;
import com.metamx.common.ISE;
@ -10,7 +11,6 @@ import com.metamx.druid.indexing.common.RetryPolicyFactory;
import com.metamx.druid.indexing.common.task.Task;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.ToStringResponseHandler;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.joda.time.Duration;

View File

@ -46,13 +46,11 @@ import com.metamx.druid.query.QueryRunnerFactoryConglomerate;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.realtime.FireDepartment;
import com.metamx.druid.realtime.FireDepartmentConfig;
import com.metamx.druid.realtime.FireDepartmentMetrics;
import com.metamx.druid.realtime.RealtimeMetricsMonitor;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.realtime.SegmentPublisher;
import com.metamx.druid.realtime.firehose.Firehose;
import com.metamx.druid.realtime.firehose.FirehoseFactory;
import com.metamx.druid.realtime.plumber.NoopRejectionPolicyFactory;
import com.metamx.druid.realtime.plumber.Plumber;
import com.metamx.druid.realtime.plumber.RealtimePlumberSchool;
import com.metamx.druid.realtime.plumber.RejectionPolicyFactory;

View File

@ -26,12 +26,10 @@ import com.google.common.base.Optional;
import com.google.common.base.Splitter;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Closer;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
@ -49,7 +47,6 @@ import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import com.metamx.emitter.EmittingLogger;
import org.apache.commons.io.FileUtils;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@ -63,7 +60,6 @@ import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Runs tasks in separate processes using {@link ExecutorMain}.

View File

@ -1,14 +1,12 @@
package com.metamx.druid.indexing.coordinator.config;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.druid.indexing.worker.executor.ExecutorMain;
import org.skife.config.Config;
import org.skife.config.Default;
import java.io.File;
import java.util.List;
import java.util.Set;
public abstract class ForkingTaskRunnerConfig
{

View File

@ -102,10 +102,6 @@ import com.metamx.druid.initialization.DruidNode;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;

View File

@ -1,13 +1,11 @@
package com.metamx.druid.indexing.worker.executor;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.indexing.common.TaskStatus;
@ -15,12 +13,9 @@ import com.metamx.druid.indexing.common.task.Task;
import com.metamx.druid.indexing.coordinator.TaskRunner;
import com.metamx.emitter.EmittingLogger;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

View File

@ -36,6 +36,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.BaseServerNode;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
import com.metamx.druid.curator.discovery.NoopServiceAnnouncer;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
@ -49,7 +50,6 @@ import com.metamx.druid.indexing.common.config.RetryPolicyConfig;
import com.metamx.druid.indexing.common.config.TaskConfig;
import com.metamx.druid.indexing.common.index.ChatHandlerProvider;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory;
import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory;
import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner;
import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig;
@ -62,14 +62,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServerInit;
import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.DataSegmentKiller;
import com.metamx.druid.loading.DataSegmentPusher;
import com.metamx.druid.loading.S3DataSegmentKiller;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@ -353,11 +345,14 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
public void initializeServiceDiscovery() throws Exception
{
final CuratorDiscoveryConfig config = configFactory.build(CuratorDiscoveryConfig.class);
final CuratorConfig config = configFactory.build(CuratorConfig.class);
if (serviceDiscovery == null) {
final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle);
CuratorDiscoveryConfig discoveryConfig = getJsonConfigurator()
.configurate(getProps(), "druid.discovery.curator", CuratorDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
serviceDiscoveryCuratorFramework, config, lifecycle
serviceDiscoveryCuratorFramework, discoveryConfig, lifecycle
);
}
if (serviceAnnouncer == null) {

View File

@ -34,7 +34,7 @@ import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.druid.QueryableNode;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.curator.CuratorConfig;
import com.metamx.druid.http.GuiceServletConfig;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.indexing.common.config.IndexerZkConfig;
@ -103,7 +103,6 @@ public class WorkerNode extends QueryableNode<WorkerNode>
private ServiceEmitter emitter = null;
private WorkerConfig workerConfig = null;
private ServiceDiscovery serviceDiscovery = null;
private ServiceAnnouncer serviceAnnouncer = null;
private ServiceProvider coordinatorServiceProvider = null;
private WorkerCuratorCoordinator workerCuratorCoordinator = null;
private WorkerTaskMonitor workerTaskMonitor = null;
@ -330,15 +329,13 @@ public class WorkerNode extends QueryableNode<WorkerNode>
public void initializeServiceDiscovery() throws Exception
{
if (serviceDiscovery == null) {
final CuratorDiscoveryConfig config = getConfigFactory().build(CuratorDiscoveryConfig.class);
final CuratorDiscoveryConfig config = getJsonConfigurator()
.configurate(getProps(), "druid.discovery.curator", CuratorDiscoveryConfig.class);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
getCuratorFramework(),
Initialization.makeCuratorFramework(getConfigFactory().build(CuratorConfig.class), getLifecycle()),
config,
getLifecycle()
);
this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
serviceDiscoveryCuratorFramework, config, getLifecycle()
);
}
if (coordinatorServiceProvider == null) {
this.coordinatorServiceProvider = Initialization.makeServiceProvider(

View File

@ -1,5 +1,6 @@
package com.metamx.druid.indexing.common.task;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.metamx.common.Granularity;
@ -18,7 +19,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.realtime.Schema;
import com.metamx.druid.shard.NoneShardSpec;
import junit.framework.Assert;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Test;

View File

@ -3,7 +3,6 @@ package com.metamx.druid.realtime.firehose;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.rabbitmq.client.ConnectionFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

View File

@ -25,7 +25,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

View File

@ -52,7 +52,6 @@ public class ZkCoordinator implements DataSegmentChangeHandler
private final ObjectMapper jsonMapper;
private final SegmentLoaderConfig config;
private final ZkCoordinatorConfig config;
private final ZkPathsConfig zkPaths;
private final DruidServerMetadata me;
private final DataSegmentAnnouncer announcer;
@ -110,9 +109,12 @@ public class ZkCoordinator implements DataSegmentChangeHandler
curator.newNamespaceAwareEnsurePath(servedSegmentsLocation).ensure(curator.getZookeeperClient());
curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient());
/* TODO
if (config.isLoadFromSegmentCacheEnabled()) {
loadCache();
}
*/
loadCache();
loadQueueCache.getListenable().addListener(
new PathChildrenCacheListener()
@ -234,7 +236,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
try {
serverManager.loadSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
@ -269,7 +271,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
for (DataSegment segment : segments) {
serverManager.loadSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.exists()) {
try {
jsonMapper.writeValue(segmentInfoCacheFile, segment);
@ -325,7 +327,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler
for (DataSegment segment : segments) {
serverManager.dropSegment(segment);
File segmentInfoCacheFile = new File(config.getSegmentInfoCacheDirectory(), segment.getIdentifier());
File segmentInfoCacheFile = new File(config.getInfoDir(), segment.getIdentifier());
if (!segmentInfoCacheFile.delete()) {
log.warn("Unable to delete segmentInfoCacheFile[%s]", segmentInfoCacheFile);
}

View File

@ -20,7 +20,6 @@
package com.metamx.druid.db;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

View File

@ -5,6 +5,7 @@ import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.druid.client.InventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
@ -13,7 +14,6 @@ import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.client.indexing.IndexingServiceSelector;
import com.metamx.druid.client.selector.DiscoverySelector;
import com.metamx.druid.client.selector.Server;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseRuleManagerProvider;
@ -101,8 +101,10 @@ public class CoordinatorModule implements Module
}
@Provides @LazySingleton
public LoadQueueTaskMaster getLoadQueueTaskMaster(CuratorFramework curator, ObjectMapper jsonMapper)
public LoadQueueTaskMaster getLoadQueueTaskMaster(
CuratorFramework curator, ObjectMapper jsonMapper, ScheduledExecutorFactory factory, DruidMasterConfig config
)
{
return new LoadQueueTaskMaster(curator, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d"));
return new LoadQueueTaskMaster(curator, jsonMapper, factory.create(1, "Master-PeonExec--%d"), config);
}
}

View File

@ -15,9 +15,9 @@ import com.metamx.druid.Query;
import com.metamx.druid.client.DruidServerConfig;
import com.metamx.druid.collect.StupidPool;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.coordination.CuratorDataSegmentAnnouncer;
import com.metamx.druid.coordination.DataSegmentAnnouncer;
import com.metamx.druid.coordination.DruidServerMetadata;
import com.metamx.druid.coordination.MultipleDataSegmentAnnouncerDataSegmentAnnouncer;
import com.metamx.druid.coordination.ServerManager;
import com.metamx.druid.coordination.ZkCoordinator;
import com.metamx.druid.curator.announcement.Announcer;
@ -120,7 +120,9 @@ public class HistoricalModule implements Module
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(DataSegmentAnnouncer.class).to(CuratorDataSegmentAnnouncer.class).in(ManageLifecycleLast.class);
binder.bind(DataSegmentAnnouncer.class)
.to(MultipleDataSegmentAnnouncerDataSegmentAnnouncer.class)
.in(ManageLifecycleLast.class);
}
private void bindDeepStorageS3(Binder binder)

View File

@ -47,8 +47,6 @@ import com.metamx.metrics.Monitor;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.jets3t.service.S3ServiceException;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import java.util.List;

View File

@ -21,10 +21,6 @@ package com.metamx.druid.http;
import com.google.inject.Injector;
import com.google.inject.servlet.GuiceFilter;
import com.metamx.common.IAE;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.config.Config;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.common.logger.Logger;
import com.metamx.druid.curator.CuratorModule;
@ -37,23 +33,6 @@ import com.metamx.druid.guice.LifecycleModule;
import com.metamx.druid.guice.ServerModule;
import com.metamx.druid.guice.annotations.Self;
import com.metamx.druid.initialization.EmitterModule;
import com.metamx.druid.client.BatchServerInventoryView;
import com.metamx.druid.client.ServerInventoryView;
import com.metamx.druid.client.ServerInventoryViewConfig;
import com.metamx.druid.client.SingleServerInventoryView;
import com.metamx.druid.client.indexing.IndexingServiceClient;
import com.metamx.druid.concurrent.Execs;
import com.metamx.druid.config.ConfigManager;
import com.metamx.druid.config.ConfigManagerConfig;
import com.metamx.druid.config.JacksonConfigManager;
import com.metamx.druid.curator.discovery.ServiceAnnouncer;
import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseRuleManagerConfig;
import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.db.DatabaseSegmentManagerConfig;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.JettyServerInitializer;
import com.metamx.druid.initialization.JettyServerModule;

View File

@ -19,10 +19,9 @@
package com.metamx.druid.loading;
import org.joda.time.format.ISODateTimeFormat;
import com.google.common.base.Joiner;
import com.metamx.druid.client.DataSegment;
import org.joda.time.format.ISODateTimeFormat;
/**
*/

View File

@ -22,7 +22,6 @@ package com.metamx.druid.loading;
import com.metamx.druid.index.QueryableIndex;
import java.io.File;
import java.io.IOException;
/**
*/

View File

@ -40,7 +40,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;

View File

@ -34,8 +34,10 @@ import com.metamx.druid.index.Segment;
import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
*/
@ -60,13 +62,14 @@ public class SingleSegmentLoader implements SegmentLoader
final ImmutableList.Builder<StorageLocation> locBuilder = ImmutableList.builder();
// TODO
// This is a really, really stupid way of getting this information. Splitting on commas and bars is error-prone
// We should instead switch it up to be a JSON Array of JSON Object or something and cool stuff like that
// But, that'll have to wait for some other day.
for (String dirSpec : config.getCacheDirectory().split(",")) {
for (String dirSpec : config.getLocations().split(",")) {
String[] dirSplit = dirSpec.split("\\|");
if (dirSplit.length == 1) {
locBuilder.add(new StorageLocation(new File(dirSplit[0]), config.getServerMaxSize()));
locBuilder.add(new StorageLocation(new File(dirSplit[0]), Integer.MAX_VALUE));
}
else if (dirSplit.length == 2) {
final Long maxSize = Longs.tryParse(dirSplit[1]);
@ -78,7 +81,7 @@ public class SingleSegmentLoader implements SegmentLoader
else {
throw new ISE(
"Unknown segment storage location[%s]=>[%s], config[%s].",
dirSplit.length, dirSpec, config.getCacheDirectory()
dirSplit.length, dirSpec, config.getLocations()
);
}
}
@ -91,16 +94,18 @@ public class SingleSegmentLoader implements SegmentLoader
@Override
public boolean isSegmentLoaded(final DataSegment segment)
{
File localStorageDir = new File(config.getLocations(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return true;
return findStorageLocationIfLoaded(segment) != null;
}
final File legacyStorageDir = new File(
config.getLocations(),
DataSegmentPusherUtil.getLegacyStorageDir(segment)
);
return legacyStorageDir.exists();
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : locations) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
if (localStorageDir.exists()) {
return location;
}
}
return null;
}
@Override
@ -114,12 +119,8 @@ public class SingleSegmentLoader implements SegmentLoader
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
File localStorageDir = new File(config.getLocations(), DataSegmentPusherUtil.getStorageDir(segment));
StorageLocation loc = findStorageLocationIfLoaded(segment);
final String legacyDir = DataSegmentPusherUtil.getLegacyStorageDir(segment);
if (legacyDir != null) {
File legacyStorageDir = new File(config.getLocations(), legacyDir);
final File retVal;
if (loc == null) {
@ -160,7 +161,6 @@ public class SingleSegmentLoader implements SegmentLoader
{
StorageLocation loc = findStorageLocationIfLoaded(segment);
return new File(config.getLocations(), outputKey);
if (loc == null) {
log.info("Asked to cleanup something[%s] that didn't exist. Skipping.", segment);
return;

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
/**

View File

@ -25,7 +25,6 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.DruidMasterRuntimeParams;
import com.metamx.druid.master.MasterStats;
import org.joda.time.DateTime;
/**

View File

@ -36,7 +36,6 @@ import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
/**
*/