Merge branch 'master' of github.com:metamx/druid into worker-resource

Conflicts:
	indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java
fjy 2013-07-31 15:23:13 -07:00
commit e2b5cd6067
17 changed files with 282 additions and 71 deletions

CuratorConfig.java

@@ -33,7 +33,7 @@ public abstract class CuratorConfig
   @Default("30000")
   public abstract int getZkSessionTimeoutMs();
 
-  @Config("druid.curator.compression.enable")
+  @Config("druid.curator.compress")
   @Default("false")
   public abstract boolean enableCompression();
 }

ServiceDiscoveryConfig.java

@@ -36,4 +36,11 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig
   @Config("druid.zk.paths.discoveryPath")
   public abstract String getDiscoveryPath();
 
+  @Override
+  @Config("druid.curator.discovery.compress")
+  public boolean enableCompression()
+  {
+    return false;
+  }
 }

HadoopDruidIndexerConfig.java

@@ -57,7 +57,9 @@ import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.shard.ShardSpec;
 import com.metamx.druid.utils.JodaUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.mapreduce.Job;
 import org.joda.time.DateTime;
@@ -656,22 +658,33 @@ public class HadoopDruidIndexerConfig
     return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", "")));
   }
 
-  public Path makeSegmentOutputPath(Bucket bucket)
+  public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket)
   {
     final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get();
-
-    return new Path(
-        String.format(
-            "%s/%s/%s_%s/%s/%s",
-            getSegmentOutputDir(),
-            dataSource,
-            bucketInterval.getStart().toString(),
-            bucketInterval.getEnd().toString(),
-            getVersion(),
-            bucket.partitionNum
-        )
-    );
-  }
+    if (fileSystem instanceof DistributedFileSystem)
+    {
+      return new Path(
+          String.format(
+              "%s/%s/%s_%s/%s/%s",
+              getSegmentOutputDir().replace(":", "_"),
+              dataSource.replace(":", "_"),
+              bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()),
+              bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
+              getVersion().replace(":", "_"),
+              bucket.partitionNum
+          ));
+    }
+    return new Path(
+        String.format(
+            "%s/%s/%s_%s/%s/%s",
+            getSegmentOutputDir(),
+            dataSource,
+            bucketInterval.getStart().toString(),
+            bucketInterval.getEnd().toString(),
+            getVersion(),
+            bucket.partitionNum
+        ));
+  }
 
   public Job addInputPaths(Job job) throws IOException
   {
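
A minimal caller sketch for the new two-argument signature, mirroring the IndexGeneratorJob change in the next file; "context" is assumed to be a Hadoop task context and "bucket" a Bucket already in scope. The FileSystem argument is what selects the HDFS-safe layout:

    // Obtain the FileSystem from the job configuration and let the config pick the layout.
    FileSystem fileSystem = FileSystem.get(context.getConfiguration());
    Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);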

IndexGeneratorJob.java

@@ -375,12 +375,14 @@ public class IndexGeneratorJob implements Jobby
       Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
       int attemptNumber = context.getTaskAttemptID().getId();
 
-      Path indexBasePath = config.makeSegmentOutputPath(bucket);
-      Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
-      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
-      final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
-      outputFS.mkdirs(indexBasePath);
+      FileSystem fileSystem = FileSystem.get(context.getConfiguration());
+      Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket);
+      Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber));
+      final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration());
+      final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration());
+      outputFS.mkdirs(indexBasePath);
 
       Exception caughtException = null;
       ZipOutputStream out = null;

HadoopDruidIndexerConfigTest.java

@@ -27,7 +27,12 @@ import com.metamx.druid.indexer.partitions.PartitionsSpec;
 import com.metamx.druid.indexer.updater.DbUpdaterJobSpec;
 import com.metamx.druid.jackson.DefaultObjectMapper;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.joda.time.DateTime;
 import org.joda.time.Interval;
+import org.joda.time.format.ISODateTimeFormat;
 import org.junit.Assert;
 import org.junit.Test;
@@ -427,6 +432,65 @@ public class HadoopDruidIndexerConfigTest
     );
   }
 
+  @Test
+  public void shouldMakeHDFSCompliantSegmentOutputPath() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonReadWriteRead(
+          "{"
+          + "\"dataSource\": \"the:data:source\","
+          + " \"granularitySpec\":{"
+          + " \"type\":\"uniform\","
+          + " \"gran\":\"hour\","
+          + " \"intervals\":[\"2012-07-10/P1D\"]"
+          + " },"
+          + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    cfg.setVersion("some:brand:new:version");
+
+    Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
+    Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket);
+    Assert.assertEquals("/tmp/dru_id/data_test/the_data_source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712", path.toString());
+  }
+
+  @Test
+  public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() {
+    final HadoopDruidIndexerConfig cfg;
+
+    try {
+      cfg = jsonReadWriteRead(
+          "{"
+          + "\"dataSource\": \"the:data:source\","
+          + " \"granularitySpec\":{"
+          + " \"type\":\"uniform\","
+          + " \"gran\":\"hour\","
+          + " \"intervals\":[\"2012-07-10/P1D\"]"
+          + " },"
+          + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\""
+          + "}",
+          HadoopDruidIndexerConfig.class
+      );
+    } catch(Exception e) {
+      throw Throwables.propagate(e);
+    }
+
+    cfg.setVersion("some:brand:new:version");
+
+    Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
+    Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket);
+    Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString());
+  }
+
   private <T> T jsonReadWriteRead(String s, Class<T> klass)
   {
     try {

ExecutorNode.java

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
@@ -38,7 +37,6 @@ import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.druid.BaseServerNode;
 import com.metamx.druid.curator.discovery.CuratorServiceAnnouncer;
-import com.metamx.druid.curator.discovery.NoopServiceAnnouncer;
 import com.metamx.druid.curator.discovery.ServiceAnnouncer;
 import com.metamx.druid.curator.discovery.ServiceInstanceFactory;
 import com.metamx.druid.http.GuiceServletConfig;
@@ -72,11 +70,10 @@ import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.http.client.HttpClient;
 import com.metamx.http.client.HttpClientConfig;
 import com.metamx.http.client.HttpClientInit;
-import com.metamx.metrics.JvmMonitor;
 import com.metamx.metrics.Monitor;
 import com.metamx.metrics.MonitorScheduler;
 import com.metamx.metrics.MonitorSchedulerConfig;
-import com.metamx.metrics.SysMonitor;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.x.discovery.ServiceDiscovery;
 import org.apache.curator.x.discovery.ServiceProvider;
 import org.jets3t.service.S3ServiceException;
@@ -88,7 +85,6 @@ import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.skife.config.ConfigurationObjectFactory;
 
-import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -352,8 +348,9 @@ public class ExecutorNode extends BaseServerNode<ExecutorNode>
   {
     final ServiceDiscoveryConfig config = configFactory.build(ServiceDiscoveryConfig.class);
     if (serviceDiscovery == null) {
+      final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle);
       this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
-          getCuratorFramework(), config, lifecycle
+          serviceDiscoveryCuratorFramework, config, lifecycle
       );
     }
     if (serviceAnnouncer == null) {

WorkerNode.java

@@ -331,11 +331,13 @@ public class WorkerNode extends QueryableNode<WorkerNode>
   {
     if (serviceDiscovery == null) {
       final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class);
-      this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
-          getCuratorFramework(),
+      final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(
           config,
           getLifecycle()
       );
+      this.serviceDiscovery = Initialization.makeServiceDiscoveryClient(
+          serviceDiscoveryCuratorFramework, config, getLifecycle()
+      );
     }
     if (coordinatorServiceProvider == null) {
       this.coordinatorServiceProvider = Initialization.makeServiceProvider(

MasterMain.java

@@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager;
 import com.metamx.druid.db.DatabaseSegmentManagerConfig;
 import com.metamx.druid.db.DbConnector;
 import com.metamx.druid.db.DbConnectorConfig;
+import com.metamx.druid.initialization.CuratorConfig;
 import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServerConfig;
 import com.metamx.druid.initialization.ServiceDiscoveryConfig;
@@ -124,10 +125,15 @@ public class MasterMain
     final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle);
 
     final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class);
-    CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
+    CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(
         serviceDiscoveryConfig,
         lifecycle
     );
+    final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class);
+    CuratorFramework curatorFramework = Initialization.makeCuratorFramework(
+        curatorConfig,
+        lifecycle
+    );
 
     final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class);
@@ -201,7 +207,7 @@ public class MasterMain
     final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class);
 
     final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient(
-        curatorFramework,
+        serviceDiscoveryCuratorFramework,
         serviceDiscoveryConfig,
         lifecycle
     );
@@ -227,7 +233,10 @@ public class MasterMain
     );
 
     final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster(
-        curatorFramework, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d")
+        curatorFramework,
+        jsonMapper,
+        scheduledExecutorFactory.create(1, "Master-PeonExec--%d"),
+        druidMasterConfig
     );
 
     final DruidMaster master = new DruidMaster(

DataSegmentPusherUtil.java

@@ -19,6 +19,8 @@
 package com.metamx.druid.loading;
 
+import org.joda.time.format.ISODateTimeFormat;
+
 import com.google.common.base.Joiner;
 import com.metamx.druid.client.DataSegment;
@@ -26,19 +28,37 @@ import com.metamx.druid.client.DataSegment;
  */
 public class DataSegmentPusherUtil
 {
   private static final Joiner JOINER = Joiner.on("/").skipNulls();
 
   public static String getStorageDir(DataSegment segment)
   {
     return JOINER.join(
         segment.getDataSource(),
         String.format(
             "%s_%s",
             segment.getInterval().getStart(),
             segment.getInterval().getEnd()
         ),
         segment.getVersion(),
         segment.getShardSpec().getPartitionNum()
     );
   }
+
+  /**
+   * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
+   * path names. So we format paths differently for HDFS.
+   */
+  public static String getHdfsStorageDir(DataSegment segment)
+  {
+    return JOINER.join(
+        segment.getDataSource(),
+        String.format(
+            "%s_%s",
+            segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
+            segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
+        ),
+        segment.getVersion().replaceAll(":", "_"),
+        segment.getShardSpec().getPartitionNum()
+    );
+  }
 }
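
A short usage sketch of the new helper; the segment values and the expected directory come from the DataSegmentPusherUtilTest added later in this commit (the DataSegment construction is elided here):

    // Segment: dataSource "something", interval 2011-10-01/2011-10-02,
    // version "brand:new:version", shard partition 0.
    String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
    // -> "something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0"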

HdfsDataSegmentPusher.java

@@ -42,7 +42,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
   @Override
   public DataSegment push(File inDir, DataSegment segment) throws IOException
   {
-    final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
+    final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
     Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir));
     FileSystem fs = outFile.getFileSystem(hadoopConfig);

DruidMasterConfig.java

@@ -89,4 +89,10 @@ public abstract class DruidMasterConfig
   @Config("druid.master.replicant.throttleLimit")
   @Default("10")
   public abstract int getReplicantThrottleLimit();
+
+  @Config("druid.master.load.timeout")
+  public Duration getLoadTimeoutDelay()
+  {
+    return new Duration(15 * 60 * 1000);
+  }
 }

DruidMasterLogger.java

@@ -185,6 +185,12 @@ public class DruidMasterLogger implements DruidMasterHelper
               "master/loadQueue/size", queuePeon.getLoadQueueSize()
           )
       );
+      emitter.emit(
+          new ServiceMetricEvent.Builder()
+              .setUser1(serverName).build(
+                  "master/loadQueue/failed", queuePeon.getAndResetFailedAssignCount()
+              )
+      );
       emitter.emit(
           new ServiceMetricEvent.Builder()
               .setUser1(serverName).build(

LoadQueuePeon.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
+import com.metamx.common.ISE;
 import com.metamx.common.guava.Comparators;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.coordination.DataSegmentChangeRequest;
@@ -43,7 +44,9 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -54,15 +57,6 @@ public class LoadQueuePeon
   private static final int DROP = 0;
   private static final int LOAD = 1;
 
-  private final Object lock = new Object();
-
-  private final CuratorFramework curator;
-  private final String basePath;
-  private final ObjectMapper jsonMapper;
-  private final ExecutorService zkWritingExecutor;
-
-  private final AtomicLong queuedSize = new AtomicLong(0);
-
   private static Comparator<SegmentHolder> segmentHolderComparator = new Comparator<SegmentHolder>()
   {
     private Comparator<DataSegment> comparator = Comparators.inverse(DataSegment.bucketMonthComparator());
@@ -74,6 +68,15 @@ public class LoadQueuePeon
     }
   };
 
+  private final CuratorFramework curator;
+  private final String basePath;
+  private final ObjectMapper jsonMapper;
+  private final ScheduledExecutorService zkWritingExecutor;
+  private final DruidMasterConfig config;
+
+  private final AtomicLong queuedSize = new AtomicLong(0);
+  private final AtomicInteger failedAssignCount = new AtomicInteger(0);
+
   private final ConcurrentSkipListSet<SegmentHolder> segmentsToLoad = new ConcurrentSkipListSet<SegmentHolder>(
       segmentHolderComparator
   );
@@ -81,19 +84,23 @@ public class LoadQueuePeon
       segmentHolderComparator
   );
 
+  private final Object lock = new Object();
+
   private volatile SegmentHolder currentlyLoading = null;
 
   LoadQueuePeon(
       CuratorFramework curator,
       String basePath,
       ObjectMapper jsonMapper,
-      ExecutorService zkWritingExecutor
+      ScheduledExecutorService zkWritingExecutor,
+      DruidMasterConfig config
   )
   {
     this.curator = curator;
     this.basePath = basePath;
     this.jsonMapper = jsonMapper;
     this.zkWritingExecutor = zkWritingExecutor;
+    this.config = config;
   }
 
   public Set<DataSegment> getSegmentsToLoad()
@@ -135,6 +142,11 @@ public class LoadQueuePeon
     return queuedSize.get();
   }
 
+  public int getAndResetFailedAssignCount()
+  {
+    return failedAssignCount.getAndSet(0);
+  }
+
   public void loadSegment(
       DataSegment segment,
       LoadPeonCallback callback
@@ -232,6 +244,26 @@ public class LoadQueuePeon
           final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest());
           curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
 
+          zkWritingExecutor.schedule(
+              new Runnable()
+              {
+                @Override
+                public void run()
+                {
+                  try {
+                    if (curator.checkExists().forPath(path) != null) {
+                      failAssign(new ISE("%s was never removed! Failing this assign!", path));
+                    }
+                  }
+                  catch (Exception e) {
+                    failAssign(e);
+                  }
+                }
+              },
+              config.getLoadTimeoutDelay().getMillis(),
+              TimeUnit.MILLISECONDS
+          );
+
           final Stat stat = curator.checkExists().usingWatcher(
               new CuratorWatcher()
               {
@@ -268,10 +300,7 @@ public class LoadQueuePeon
             }
           }
           catch (Exception e) {
-            log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
-            // Act like it was completed so that the master gives it to someone else
-            actionCompleted();
-            doNext();
+            failAssign(e);
           }
         }
       }
@@ -327,6 +356,7 @@ public class LoadQueuePeon
       segmentsToLoad.clear();
       queuedSize.set(0L);
+      failedAssignCount.set(0);
     }
   }
@@ -351,6 +381,17 @@ public class LoadQueuePeon
     doNext();
   }
 
+  private void failAssign(Exception e)
+  {
+    synchronized (lock) {
+      log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading);
+      failedAssignCount.getAndIncrement();
+      // Act like it was completed so that the master gives it to someone else
+      actionCompleted();
+      doNext();
+    }
+  }
+
   private class SegmentHolder
   {
     private final DataSegment segment;

LoadQueueTaskMaster.java

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
  * Provides LoadQueuePeons
@@ -31,21 +32,24 @@ public class LoadQueueTaskMaster
 {
   private final CuratorFramework curator;
   private final ObjectMapper jsonMapper;
-  private final ExecutorService peonExec;
+  private final ScheduledExecutorService peonExec;
+  private final DruidMasterConfig config;
 
   public LoadQueueTaskMaster(
       CuratorFramework curator,
       ObjectMapper jsonMapper,
-      ExecutorService peonExec
+      ScheduledExecutorService peonExec,
+      DruidMasterConfig config
   )
   {
     this.curator = curator;
     this.jsonMapper = jsonMapper;
     this.peonExec = peonExec;
+    this.config = config;
   }
 
   public LoadQueuePeon giveMePeon(String basePath)
   {
-    return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec);
+    return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config);
   }
 }

DataSegmentPusherUtilTest.java (new file)

@@ -0,0 +1,39 @@
+package com.metamx.druid.loading;
+
+import com.google.common.collect.ImmutableMap;
+import com.metamx.druid.client.DataSegment;
+import com.metamx.druid.index.v1.IndexIO;
+import com.metamx.druid.shard.NoneShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * @author jan.rudert
+ */
+public class DataSegmentPusherUtilTest {
+  @Test
+  public void shouldNotHaveColonsInHdfsStorageDir() throws Exception {
+
+    Interval interval = new Interval("2011-10-01/2011-10-02");
+    ImmutableMap<String, Object> loadSpec = ImmutableMap.<String, Object>of("something", "or_other");
+
+    DataSegment segment = new DataSegment(
+        "something",
+        interval,
+        "brand:new:version",
+        loadSpec,
+        Arrays.asList("dim1", "dim2"),
+        Arrays.asList("met1", "met2"),
+        new NoneShardSpec(),
+        IndexIO.CURRENT_VERSION_ID,
+        1
+    );
+
+    String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
+    Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0", storageDir);
+  }
+}

DruidMasterTest.java

@@ -36,6 +36,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
 
 /**
 */
@@ -46,7 +47,7 @@ public class DruidMasterTest
   private LoadQueueTaskMaster taskMaster;
   private DatabaseSegmentManager databaseSegmentManager;
   private SingleServerInventoryView serverInventoryView;
-  private ScheduledExecutorFactory scheduledExecutorFactory;
+  private ScheduledExecutorService scheduledExecutorService;
   private DruidServer druidServer;
   private DataSegment segment;
   private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
@@ -64,8 +65,8 @@ public class DruidMasterTest
     databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class);
     EasyMock.replay(databaseSegmentManager);
 
-    scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
-    EasyMock.replay(scheduledExecutorFactory);
+    scheduledExecutorService = EasyMock.createNiceMock(ScheduledExecutorService.class);
+    EasyMock.replay(scheduledExecutorService);
 
     master = new DruidMaster(
         new DruidMasterConfig()
@@ -138,7 +139,7 @@ public class DruidMasterTest
         null,
         curator,
         new NoopServiceEmitter(),
-        scheduledExecutorFactory,
+        scheduledExecutorService,
         null,
         taskMaster,
         loadManagementPeons

LoadQueuePeonTester.java

@@ -10,7 +10,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon
   public LoadQueuePeonTester()
   {
-    super(null, null, null, null);
+    super(null, null, null, null, null, null);
   }
 
   @Override