diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
index 741bc59d3d9..be46aea41f6 100644
--- a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java
@@ -33,7 +33,7 @@ public abstract class CuratorConfig
   @Default("30000")
   public abstract int getZkSessionTimeoutMs();
 
-  @Config("druid.curator.compression.enable")
+  @Config("druid.curator.compress")
   @Default("false")
   public abstract boolean enableCompression();
 }
diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
index 04776d6545a..7d0c20cd7ef 100644
--- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
+++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java
@@ -36,4 +36,11 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig
 
   @Config("druid.zk.paths.discoveryPath")
   public abstract String getDiscoveryPath();
+
+  @Override
+  @Config("druid.curator.discovery.compress")
+  public boolean enableCompression()
+  {
+    return false;
+  }
 }
diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
index 6d0ea322313..0fe72cf0e44 100644
--- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
+++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java
@@ -57,7 +57,9 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.shard.ShardSpec;
 import com.metamx.druid.utils.JodaUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Job;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -656,22 +658,33 @@ public class HadoopDruidIndexerConfig
     return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
   }
 
-  public Path makeSegmentOutputPath(Bucket bucket)
-  {
-    final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
-
-    return new Path(
-        String.format(
-            "%s/%s/%s_%s/%s/%s",
-            getSegmentOutputDir(),
-            dataSource,
-            bucketInterval.getStart().toString(),
-            bucketInterval.getEnd().toString(),
-            getVersion(),
-            bucket.partitionNum
-        )
-    );
-  }
+  public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
+  {
+    final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
+    if (fileSystem instanceof DistributedFileSystem)
+    {
+      return new Path(
+          String.format(
+              "%s/%s/%s_%s/%s/%s",
+              getSegmentOutputDir().replace(":", "_"),
+              dataSource.replace(":", "_"),
+              bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
+              bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
+              getVersion().replace(":", "_"),
+              bucket.partitionNum
+          ));
+    }
+    return new Path(
+        String.format(
+            "%s/%s/%s_%s/%s/%s",
+            getSegmentOutputDir(),
+            dataSource,
+            bucketInterval.getStart().toString(),
+            bucketInterval.getEnd().toString(),
+            getVersion(),
+            bucket.partitionNum
+        ));
+  }
 
   public Job addInputPaths(Job job) throws IOException
   {
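The new DistributedFileSystem branch exists because HDFS rejects ':' in path components (HDFS-13), and both the default Joda interval rendering and user-supplied values such as dataSource and version may contain colons. A standalone sketch of the two renderings — class name hypothetical, only joda-time required:

    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.Interval;
    import org.joda.time.format.ISODateTimeFormat;

    // Contrast the default interval rendering (contains ':') with the basic
    // ISO-8601 rendering used for HDFS paths (colon-free).
    public class HdfsPathFormatDemo
    {
      public static void main(String[] args)
      {
        Interval interval = new Interval(
            new DateTime(2012, 7, 10, 5, 0, 0, 0, DateTimeZone.UTC),
            new DateTime(2012, 7, 10, 6, 0, 0, 0, DateTimeZone.UTC)
        );

        // Default toString(): 2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z
        System.out.println(interval.getStart().toString() + "_" + interval.getEnd().toString());

        // Basic format: 20120710T050000.000Z_20120710T060000.000Z
        System.out.println(
            interval.getStart().toString(ISODateTimeFormat.basicDateTime())
            + "_"
            + interval.getEnd().toString(ISODateTimeFormat.basicDateTime())
        );

        // The remaining user-controlled components get a plain character swap:
        System.out.println("the:data:source".replace(":", "_")); // the_data_source
      }
    }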
diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
index 45cbded7f1a..583cafcae4f 100644
--- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
+++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java
@@ -375,12 +375,14 @@ public class IndexGeneratorJob implements Jobby
       Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
       int attemptNumber = context.getTaskAttemptID().getId();
 
-      Path indexBasePath = config.makeSegmentOutputPath(bucket);
-      Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
-      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
-      final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
-      outputFS.mkdirs(indexBasePath);
+      FileSystem fileSystem = FileSystem.get(context.getConfiguration());
+      Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
+      Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
+      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
+      final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
+
+      outputFS.mkdirs(indexBasePath);
 
       Exception caughtException = null;
       ZipOutputStream out = null;
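The reducer now asks Hadoop which concrete FileSystem backs the job's configuration and passes it to makeSegmentOutputPath, which keys the naming flavor off an instanceof check. A minimal sketch of that dispatch, assuming hadoop-core on the classpath; with a stock Configuration it resolves to LocalFileSystem, and the commented property is what would steer it to HDFS:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    public class SegmentPathDispatchDemo
    {
      public static void main(String[] args) throws Exception
      {
        Configuration conf = new Configuration();
        // conf.set("fs.default.name", "hdfs://namenode:9000"); // would yield a DistributedFileSystem

        FileSystem fs = FileSystem.get(conf); // LocalFileSystem for the default file:// URI
        boolean colonFreePaths = fs instanceof DistributedFileSystem;
        System.out.println("use HDFS-compliant segment paths: " + colonFreePaths);
      }
    }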
\"type\":\"uniform\"," + + " \"gran\":\"hour\"," + + " \"intervals\":[\"2012-07-10/P1D\"]" + + " }," + + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\"" + + "}", + HadoopDruidIndexerConfig.class + ); + } catch(Exception e) { + throw Throwables.propagate(e); + } + + cfg.setVersion("some:brand:new:version"); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket); + Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString()); + + } + private T jsonReadWriteRead(String s, Class klass) { try { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index d12600c54af..341a578d6de 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -38,7 +37,6 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.BaseServerNode; import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer; -import com.metamx.druid.curator.discovery.NoopServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceAnnouncer; import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.GuiceServletConfig; @@ -72,11 +70,10 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientInit; -import com.metamx.metrics.JvmMonitor; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorSchedulerConfig; -import com.metamx.metrics.SysMonitor; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceProvider; import org.jets3t.service.S3ServiceException; @@ -88,7 +85,6 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; -import java.util.List; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -352,8 +348,9 @@ public class ExecutorNode extends BaseServerNode { final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class); if (serviceDiscovery == null) { + final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle); this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - getCuratorFramework(), config, lifecycle + serviceDiscoveryCuratorFramework, config, lifecycle ); } if (serviceAnnouncer == null) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java 
diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java
index 4bd9cb8cb51..5c38440a9a2 100644
--- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java
+++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java
@@ -331,11 +331,13 @@ public class WorkerNode extends QueryableNode<WorkerNode>
   {
     if (serviceDiscovery == null) {
       final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
-      this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
-          getCuratorFramework(),
+      final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(
           config,
           getLifecycle()
       );
+      this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
+          serviceDiscoveryCuratorFramework, config, getLifecycle()
+      );
     }
     if (coordinatorServiceProvider == null) {
       this.coordinatorServiceProvider = Initialization.makeServiceProvider(
diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java
index 63521717e79..a1db1bf0888 100644
--- a/server/src/main/java/com/metamx/druid/http/MasterMain.java
+++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java
@@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager;
 import com.metamx.druid.db.DatabaseSegmentManagerConfig;
 import com.metamx.druid.db.DbConnector;
 import com.metamx.druid.db.DbConnectorConfig;
+import com.metamx.druid.initialization.CuratorConfig;
 import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServerConfig;
 import com.metamx.druid.initialization.ServiceDiscoveryConfig;
@@ -124,10 +125,15 @@ public class MasterMain
     final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
 
     final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
-    CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
+    CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(
         serviceDiscoveryConfig,
         lifecycle
     );
+    final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
+    CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
+        curatorConfig,
+        lifecycle
+    );
 
     final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class);
 
@@ -201,7 +207,7 @@
     final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class);
 
     final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
-        curatorFramework,
+        serviceDiscoveryCuratorFramework,
         serviceDiscoveryConfig,
         lifecycle
     );
@@ -227,7 +233,10 @@
     );
 
     final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster(
-        curatorFramework, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d")
+        curatorFramework,
+        jsonMapper,
+        scheduledExecutorFactory.create(1, "Master-PeonExec--%d"),
+        druidMasterConfig
     );
 
     final DruidMaster master = new DruidMaster(
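Swapping Execs.singleThreaded(...) for scheduledExecutorFactory.create(1, "Master-PeonExec--%d") hands the peons an executor that can also run delayed tasks, which the load-timeout check in LoadQueuePeon below relies on. Roughly the plain-JDK equivalent — a sketch; Druid's factory may configure threads differently:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PeonExecSketch
    {
      public static void main(String[] args) throws Exception
      {
        final AtomicInteger threadNum = new AtomicInteger(0);
        ThreadFactory factory = new ThreadFactory()
        {
          @Override
          public Thread newThread(Runnable r)
          {
            Thread t = new Thread(r, String.format("Master-PeonExec--%d", threadNum.getAndIncrement()));
            t.setDaemon(true);
            return t;
          }
        };

        // Single-threaded, like the original peon executor, but schedulable.
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        exec.schedule(
            new Runnable()
            {
              @Override
              public void run()
              {
                System.out.println("delayed task ran on " + Thread.currentThread().getName());
              }
            },
            100, TimeUnit.MILLISECONDS
        );

        Thread.sleep(200); // let the delayed task fire before shutting down
        exec.shutdownNow();
      }
    }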
diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
index e72bd787bb3..349194836d9 100644
--- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
+++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java
@@ -19,6 +19,8 @@
 package com.metamx.druid.loading;
 
+import org.joda.time.format.ISODateTimeFormat;
+
 import com.google.common.base.Joiner;
 import com.metamx.druid.client.DataSegment;
 
@@ -26,19 +28,37 @@ import com.metamx.druid.client.DataSegment;
  */
 public class DataSegmentPusherUtil
 {
-	private static final Joiner JOINER = Joiner.on("/").skipNulls();
+  private static final Joiner JOINER = Joiner.on("/").skipNulls();
 
-	public static String getStorageDir(DataSegment segment)
-	{
-		return JOINER.join(
-				segment.getDataSource(),
-				String.format(
-						"%s_%s",
-						segment.getInterval().getStart(),
-						segment.getInterval().getEnd()
-				),
-				segment.getVersion(),
-				segment.getShardSpec().getPartitionNum()
-		);
-	}
+  public static String getStorageDir(DataSegment segment)
+  {
+    return JOINER.join(
+        segment.getDataSource(),
+        String.format(
+            "%s_%s",
+            segment.getInterval().getStart(),
+            segment.getInterval().getEnd()
+        ),
+        segment.getVersion(),
+        segment.getShardSpec().getPartitionNum()
+    );
+  }
+
+  /**
+   * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
+   * path names. So we format paths differently for HDFS.
+   */
+  public static String getHdfsStorageDir(DataSegment segment)
+  {
+    return JOINER.join(
+        segment.getDataSource(),
+        String.format(
+            "%s_%s",
+            segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+            segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+        ),
+        segment.getVersion().replaceAll(":", "_"),
+        segment.getShardSpec().getPartitionNum()
+    );
+  }
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java
index 52ac15129d4..e232bbb8f08 100644
--- a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java
+++ b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java
@@ -42,7 +42,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
   @Override
   public DataSegment push(File inDir, DataSegment segment) throws IOException
   {
-    final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
+    final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
     Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
     FileSystem fs = outFile.getFileSystem(hadoopConfig);
 
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
index 662569e8485..d514b4d5c4f 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
@@ -89,4 +89,10 @@
   @Config("druid.master.replicant.throttleLimit")
   @Default("10")
   public abstract int getReplicantThrottleLimit();
+
+  @Config("druid.master.load.timeout")
+  public Duration getLoadTimeoutDelay()
+  {
+    return new Duration(15 * 60 * 1000);
+  }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java
index 382d03966b6..1b226933a52 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java
@@ -185,6 +185,12 @@ public class DruidMasterLogger implements DruidMasterHelper
               "master/loadQueue/size", queuePeon.getLoadQueueSize()
           )
       );
+      emitter.emit(
+          new ServiceMetricEvent.Builder()
+              .setUser1(serverName).build(
+                  "master/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
+              )
+      );
       emitter.emit(
           new ServiceMetricEvent.Builder()
              .setUser1(serverName).build(
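getAndResetFailedAssignCount(), emitted above as master/loadQueue/failed and defined in LoadQueuePeon below, is a reset-on-read counter: each emission reports failures since the previous logging pass rather than a lifetime total. A compact illustration of the pattern (class name hypothetical):

    import java.util.concurrent.atomic.AtomicInteger;

    // getAndSet(0) atomically reads the window's count and starts a new window.
    public class ResetOnReadCounterDemo
    {
      private final AtomicInteger failedAssignCount = new AtomicInteger(0);

      public void onFailure()
      {
        failedAssignCount.getAndIncrement();
      }

      public int getAndResetFailedAssignCount()
      {
        return failedAssignCount.getAndSet(0);
      }

      public static void main(String[] args)
      {
        ResetOnReadCounterDemo counter = new ResetOnReadCounterDemo();
        counter.onFailure();
        counter.onFailure();
        System.out.println(counter.getAndResetFailedAssignCount()); // 2 -> emitted as the metric
        System.out.println(counter.getAndResetFailedAssignCount()); // 0 -> window was reset
      }
    }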
diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java
index c4ce6ac5a8c..2b5c96cb187 100644
--- a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java
+++ b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.Comparators;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.coordination.DataSegmentChangeRequest;
@@ -43,7 +44,9 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -54,15 +57,6 @@ public class LoadQueuePeon
   private static final int DROP = 0;
   private static final int LOAD = 1;
 
-  private final Object lock = new Object();
-
-  private final CuratorFramework curator;
-  private final String basePath;
-  private final ObjectMapper jsonMapper;
-  private final ExecutorService zkWritingExecutor;
-
-  private final AtomicLong queuedSize = new AtomicLong(0);
-
   private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>()
   {
     private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());
@@ -74,6 +68,15 @@
     }
   };
 
+  private final CuratorFramework curator;
+  private final String basePath;
+  private final ObjectMapper jsonMapper;
+  private final ScheduledExecutorService zkWritingExecutor;
+  private final DruidMasterConfig config;
+
+  private final AtomicLong queuedSize = new AtomicLong(0);
+  private final AtomicInteger failedAssignCount = new AtomicInteger(0);
+
   private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>(
       segmentHolderComparator
   );
@@ -81,19 +84,23 @@
       segmentHolderComparator
   );
 
+  private final Object lock = new Object();
+
   private volatile SegmentHolder currentlyLoading = null;
 
   LoadQueuePeon(
       CuratorFramework curator,
       String basePath,
       ObjectMapper jsonMapper,
-      ExecutorService zkWritingExecutor
+      ScheduledExecutorService zkWritingExecutor,
+      DruidMasterConfig config
   )
   {
     this.curator = curator;
     this.basePath = basePath;
     this.jsonMapper = jsonMapper;
     this.zkWritingExecutor = zkWritingExecutor;
+    this.config = config;
   }
 
   public Set<DataSegment> getSegmentsToLoad()
@@ -135,6 +142,11 @@
     return queuedSize.get();
   }
 
+  public int getAndResetFailedAssignCount()
+  {
+    return failedAssignCount.getAndSet(0);
+  }
+
   public void loadSegment(
       DataSegment segment,
       LoadPeonCallback callback
@@ -232,6 +244,26 @@
             final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest());
             curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
 
+            zkWritingExecutor.schedule(
+                new Runnable()
+                {
+                  @Override
+                  public void run()
+                  {
+                    try {
+                      if (curator.checkExists().forPath(path) != null) {
+                        failAssign(new ISE("%s was never removed! Failing this assign!", path));
+                      }
+                    }
+                    catch (Exception e) {
+                      failAssign(e);
+                    }
+                  }
+                },
+                config.getLoadTimeoutDelay().getMillis(),
+                TimeUnit.MILLISECONDS
+            );
+
             final Stat stat = curator.checkExists().usingWatcher(
                 new CuratorWatcher()
                 {
@@ -268,10 +300,7 @@
               }
             }
             catch (Exception e) {
-              log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
-              // Act like it was completed so that the master gives it to someone else
-              actionCompleted();
-              doNext();
+              failAssign(e);
             }
           }
         }
@@ -327,6 +356,7 @@
 
       segmentsToLoad.clear();
       queuedSize.set(0L);
+      failedAssignCount.set(0);
     }
   }
 
@@ -351,6 +381,17 @@
     doNext();
   }
 
+  private void failAssign(Exception e)
+  {
+    synchronized (lock) {
+      log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
+      failedAssignCount.getAndIncrement();
+      // Act like it was completed so that the master gives it to someone else
+      actionCompleted();
+      doNext();
+    }
+  }
+
   private class SegmentHolder
   {
     private final DataSegment segment;
diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java
index 9ef5b61e5a0..2547127bc5a 100644
--- a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Provides LoadQueuePeons
@@ -31,21 +32,24 @@ public class LoadQueueTaskMaster
 {
   private final CuratorFramework curator;
   private final ObjectMapper jsonMapper;
-  private final ExecutorService peonExec;
+  private final ScheduledExecutorService peonExec;
+  private final DruidMasterConfig config;
 
   public LoadQueueTaskMaster(
       CuratorFramework curator,
       ObjectMapper jsonMapper,
-      ExecutorService peonExec
+      ScheduledExecutorService peonExec,
+      DruidMasterConfig config
   )
   {
     this.curator = curator;
     this.jsonMapper = jsonMapper;
     this.peonExec = peonExec;
+    this.config = config;
   }
 
   public LoadQueuePeon giveMePeon(String basePath)
   {
-    return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec);
+    return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
   }
 }
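The scheduled check added to LoadQueuePeon is a watchdog: the peon writes an ephemeral request znode, and if the target server has not removed it within druid.master.load.timeout (15 minutes by default, per DruidMasterConfig), failAssign frees the peon so the master can reassign the segment. The same pattern with ZooKeeper abstracted behind a Callable — names hypothetical:

    import java.util.concurrent.Callable;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class LoadTimeoutWatchdogDemo
    {
      public static void watch(
          final ScheduledExecutorService exec,
          final Callable<Boolean> requestStillPending, // stands in for curator.checkExists().forPath(path) != null
          final Runnable failAssign,
          long timeoutMillis
      )
      {
        exec.schedule(
            new Runnable()
            {
              @Override
              public void run()
              {
                try {
                  if (requestStillPending.call()) {
                    failAssign.run(); // request was never consumed -> fail this assign
                  }
                }
                catch (Exception e) {
                  failAssign.run(); // the check itself failing also counts as a failure
                }
              }
            },
            timeoutMillis,
            TimeUnit.MILLISECONDS
        );
      }

      public static void main(String[] args) throws Exception
      {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        watch(
            exec,
            new Callable<Boolean>() { @Override public Boolean call() { return true; } },
            new Runnable() { @Override public void run() { System.out.println("assign failed, peon freed"); } },
            100
        );
        Thread.sleep(200);
        exec.shutdownNow();
      }
    }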
"dim2"), + Arrays.asList("met1", "met2"), + new NoneShardSpec(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); + Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0", storageDir); + + } +} diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index a49dc85a582..e47afd8d109 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -36,6 +36,7 @@ import org.junit.Before; import org.junit.Test; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; /** */ @@ -46,7 +47,7 @@ public class DruidMasterTest private LoadQueueTaskMaster taskMaster; private DatabaseSegmentManager databaseSegmentManager; private SingleServerInventoryView serverInventoryView; - private ScheduledExecutorFactory scheduledExecutorFactory; + private ScheduledExecutorService scheduledExecutorService; private DruidServer druidServer; private DataSegment segment; private ConcurrentMap loadManagementPeons; @@ -64,8 +65,8 @@ public class DruidMasterTest databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class); EasyMock.replay(databaseSegmentManager); - scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); - EasyMock.replay(scheduledExecutorFactory); + scheduledExecutorService = EasyMock.createNiceMock(ScheduledExecutorService.class); + EasyMock.replay(scheduledExecutorService); master = new DruidMaster( new DruidMasterConfig() @@ -138,7 +139,7 @@ public class DruidMasterTest null, curator, new NoopServiceEmitter(), - scheduledExecutorFactory, + scheduledExecutorService, null, taskMaster, loadManagementPeons diff --git a/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java index b0e000d20ad..3594c660c09 100644 --- a/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java +++ b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java @@ -10,7 +10,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon public LoadQueuePeonTester() { - super(null, null, null, null); + super(null, null, null, null, null, null); } @Override