From 16835a1f84543f311419c94b9e9a42c66ef7439b Mon Sep 17 00:00:00 2001
From: Gian Merlino
Date: Mon, 19 Nov 2012 14:42:57 -0800
Subject: [PATCH 1/2] Master: Add segment limit to merge selection algo

If we attempt to merge too many segments at once, we risk creating an
oversized indexing task.
---
 .../com/metamx/druid/master/DruidMaster.java  |  3 +-
 .../druid/master/DruidMasterConfig.java       |  8 +++-
 .../master/DruidMasterRuntimeParams.java      | 46 +++++++++++++------
 .../master/DruidMasterSegmentMerger.java      | 14 +++---
 .../master/DruidMasterSegmentMergerTest.java  | 41 +++++++++++++++--
 .../metamx/druid/master/DruidMasterTest.java  |  6 ---
 6 files changed, 88 insertions(+), 30 deletions(-)

diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
index e0cf7acc723..2fd954954ef 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
@@ -563,7 +563,8 @@ public class DruidMaster
                 .withLoadManagementPeons(loadManagementPeons)
                 .withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting())
                 .withEmitter(emitter)
-                .withMergeThreshold(config.getMergeThreshold())
+                .withMergeBytesLimit(config.getMergeBytesLimit())
+                .withMergeSegmentsLimit(config.getMergeSegmentsLimit())
                 .build();
 
     for (DruidMasterHelper helper : helpers) {
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
index e97c1588e48..87df78d4f5e 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java
@@ -70,8 +70,14 @@ public abstract class DruidMasterConfig
   }
 
   @Config("druid.master.merge.threshold")
-  public long getMergeThreshold()
+  public long getMergeBytesLimit()
   {
     return 100000000L;
   }
+
+  @Config("druid.master.merge.maxSegments")
+  public int getMergeSegmentsLimit()
+  {
+    return Integer.MAX_VALUE;
+  }
 }
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
index 2a4cd255bb4..3277888f05b 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterRuntimeParams.java
@@ -56,7 +56,8 @@ public class DruidMasterRuntimeParams
   private final int movedCount;
   private final int createdReplicantCount;
   private final int destroyedReplicantCount;
-  private final long mergeThreshold;
+  private final long mergeBytesLimit;
+  private final int mergeSegmentsLimit;
   private final int mergedSegmentCount;
 
   public DruidMasterRuntimeParams(
@@ -78,7 +79,8 @@ public class DruidMasterRuntimeParams
       int movedCount,
       int createdReplicantCount,
       int destroyedReplicantCount,
-      long mergeThreshold,
+      long mergeBytesLimit,
+      int mergeSegmentsLimit,
       int mergedSegmentCount
   )
   {
@@ -100,7 +102,8 @@ public class DruidMasterRuntimeParams
     this.movedCount = movedCount;
     this.createdReplicantCount = createdReplicantCount;
     this.destroyedReplicantCount = destroyedReplicantCount;
-    this.mergeThreshold = mergeThreshold;
+    this.mergeBytesLimit = mergeBytesLimit;
+    this.mergeSegmentsLimit = mergeSegmentsLimit;
     this.mergedSegmentCount = mergedSegmentCount;
   }
 
@@ -194,9 +197,14 @@ public class DruidMasterRuntimeParams
     return destroyedReplicantCount;
   }
 
-  public long getMergeThreshold()
+  public long getMergeBytesLimit()
   {
-    return mergeThreshold;
+    return mergeBytesLimit;
+  }
+
+  public int getMergeSegmentsLimit()
+  {
+    return mergeSegmentsLimit;
   }
 
   public int getMergedSegmentCount()
@@ -230,7 +238,8 @@ public class DruidMasterRuntimeParams
         movedCount,
         createdReplicantCount,
         destroyedReplicantCount,
-        mergeThreshold,
+        mergeBytesLimit,
+        mergeSegmentsLimit,
         mergedSegmentCount
     );
   }
@@ -255,7 +264,8 @@ public class DruidMasterRuntimeParams
     private int movedCount;
     private int createdReplicantCount;
     private int destroyedReplicantCount;
-    private long mergeThreshold;
+    private long mergeBytesLimit;
+    private int mergeSegmentsLimit;
     private int mergedSegmentCount;
 
     Builder()
@@ -278,7 +288,8 @@ public class DruidMasterRuntimeParams
       this.movedCount = 0;
       this.createdReplicantCount = 0;
       this.destroyedReplicantCount = 0;
-      this.mergeThreshold = 0;
+      this.mergeBytesLimit = 0;
+      this.mergeSegmentsLimit = 0;
       this.mergedSegmentCount = 0;
     }
 
@@ -301,7 +312,8 @@ public class DruidMasterRuntimeParams
         int movedCount,
         int createdReplicantCount,
         int destroyedReplicantCount,
-        long mergeThreshold,
+        long mergeBytesLimit,
+        int mergeSegmentsLimit,
         int mergedSegmentCount
     )
     {
@@ -323,7 +335,8 @@ public class DruidMasterRuntimeParams
       this.movedCount = movedCount;
      this.createdReplicantCount = createdReplicantCount;
       this.destroyedReplicantCount = destroyedReplicantCount;
-      this.mergeThreshold = mergeThreshold;
+      this.mergeBytesLimit = mergeBytesLimit;
+      this.mergeSegmentsLimit = mergeSegmentsLimit;
       this.mergedSegmentCount = mergedSegmentCount;
     }
 
@@ -348,7 +361,8 @@ public class DruidMasterRuntimeParams
           movedCount,
           createdReplicantCount,
           destroyedReplicantCount,
-          mergeThreshold,
+          mergeBytesLimit,
+          mergeSegmentsLimit,
           mergedSegmentCount
       );
     }
@@ -467,9 +481,15 @@ public class DruidMasterRuntimeParams
       return this;
     }
 
-    public Builder withMergeThreshold(long mergeThreshold)
+    public Builder withMergeBytesLimit(long mergeBytesLimit)
     {
-      this.mergeThreshold = mergeThreshold;
+      this.mergeBytesLimit = mergeBytesLimit;
+      return this;
+    }
+
+    public Builder withMergeSegmentsLimit(int mergeSegmentsLimit)
+    {
+      this.mergeSegmentsLimit = mergeSegmentsLimit;
       return this;
     }
 
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java
index 3044b6dccbf..2c34134c9f0 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMasterSegmentMerger.java
@@ -101,21 +101,23 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
       List<TimelineObjectHolder<String, DataSegment>> timelineObjects =
           timeline.lookup(new Interval(new DateTime(0), new DateTime("3000-01-01")));
 
-      // Accumulate timelineObjects greedily until we surpass our size threshold, then backtrack to the maximum complete set
+      // Accumulate timelineObjects greedily until we reach our limits, then backtrack to the maximum complete set
       SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
 
       for(int i = 0 ; i < timelineObjects.size() ; i++) {
         segmentsToMerge.add(timelineObjects.get(i));
-        if(segmentsToMerge.getMergedSize() > params.getMergeThreshold()) {
-          i -= segmentsToMerge.backtrack(params.getMergeThreshold());
+        if (segmentsToMerge.getMergedSize() > params.getMergeBytesLimit()
+            || segmentsToMerge.size() >= params.getMergeSegmentsLimit())
+        {
+          i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
 
-          if(segmentsToMerge.size() > 1) {
+          if (segmentsToMerge.size() > 1) {
             count += mergeSegments(segmentsToMerge, entry.getKey());
           }
 
-          if(segmentsToMerge.size() == 0) {
+          if (segmentsToMerge.size() == 0) {
             // Backtracked all the way to zero. Increment by one so we continue to make progress.
             i++;
           }
@@ -125,7 +127,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
       }
 
       // Finish any timelineObjects to merge that may have not hit threshold
-      segmentsToMerge.backtrack(params.getMergeThreshold());
+      segmentsToMerge.backtrack(params.getMergeBytesLimit());
       if (segmentsToMerge.size() > 1) {
         count += mergeSegments(segmentsToMerge, entry.getKey());
       }
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java
index ddb120a9857..c47ffab8fb5 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterSegmentMergerTest.java
@@ -32,7 +32,8 @@ import java.util.List;
 
 public class DruidMasterSegmentMergerTest
 {
-  private static final long mergeThreshold = 100;
+  private static final long mergeBytesLimit = 100;
+  private static final int mergeSegmentsLimit = 8;
 
   @Test
   public void testNoMerges()
@@ -101,7 +102,7 @@ public class DruidMasterSegmentMergerTest
   }
 
   @Test
-  public void testMergeSeries()
+  public void testMergeSeriesByteLimited()
   {
     final List<DataSegment> segments = ImmutableList.of(
         DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(40).build(),
@@ -121,6 +122,39 @@ public class DruidMasterSegmentMergerTest
     );
   }
 
+  @Test
+  public void testMergeSeriesSegmentLimited()
+  {
+    final List<DataSegment> segments = ImmutableList.of(
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-07/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-08/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("2").size(1).build(),
+        DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-10/P1D")).version("2").size(1).build()
+    );
+
+    Assert.assertEquals(
+        ImmutableList.of(
+            ImmutableList.of(
+                segments.get(0),
+                segments.get(1),
+                segments.get(2),
+                segments.get(3),
+                segments.get(4),
+                segments.get(5),
+                segments.get(6),
+                segments.get(7)
+            ),
+            ImmutableList.of(segments.get(8), segments.get(9))
+        ), merge(segments)
+    );
+  }
+
   @Test
   public void testOverlappingMergeWithBacktracking()
   {
@@ -308,7 +342,8 @@ public class DruidMasterSegmentMergerTest
     final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(mergerClient);
     final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder()
                                                                     .withAvailableSegments(ImmutableSet.copyOf(segments))
-                                                                    .withMergeThreshold(mergeThreshold)
+                                                                    .withMergeBytesLimit(mergeBytesLimit)
+                                                                    .withMergeSegmentsLimit(mergeSegmentsLimit)
                                                                     .build();
 
     merger.run(params);
diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
index 19458f1a7e0..e2eaef9a4d0 100644
--- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
+++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java
@@ -126,12 +126,6 @@ public class DruidMasterTest
           {
             return "";
           }
-
-          @Override
-          public long getMergeThreshold()
-          {
-            return super.getMergeThreshold();
-          }
         },
         null,
         null,

From b228c053e55fdb2c62f543d2d06b9d647352b149 Mon Sep 17 00:00:00 2001
From: Fangjin Yang
Date: Mon, 19 Nov 2012 15:06:46 -0800
Subject: [PATCH 2/2] bug fix for too much data to indexer

---
 .../druid/query/search/SearchQuery.java       |  2 +-
 .../merger/common/config/IndexerZkConfig.java |  5 ++
 .../merger/coordinator/RemoteTaskRunner.java  |  6 ++
 .../config/EC2AutoScalingStrategyConfig.java  |  3 +
 .../http/IndexerCoordinatorNode.java          |  3 +-
 .../scaling/EC2AutoScalingStrategy.java       |  6 ++
 .../worker/WorkerCuratorCoordinator.java      | 25 ++++--
 .../coordinator/RemoteTaskRunnerTest.java     | 78 +++++++++++++++----
 .../scaling/EC2AutoScalingStrategyTest.java   |  8 ++
 9 files changed, 112 insertions(+), 24 deletions(-)

diff --git a/client/src/main/java/com/metamx/druid/query/search/SearchQuery.java b/client/src/main/java/com/metamx/druid/query/search/SearchQuery.java
index 2a721431b89..e3a8a849bf3 100644
--- a/client/src/main/java/com/metamx/druid/query/search/SearchQuery.java
+++ b/client/src/main/java/com/metamx/druid/query/search/SearchQuery.java
@@ -168,7 +168,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
   @Override
   public String toString()
   {
-    return "SearchResultValue{" +
+    return "SearchQuery{" +
            "dataSource='" + getDataSource() + '\'' +
            ", dimFilter=" + dimFilter +
           ", granularity='" + granularity + '\'' +
diff --git a/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java b/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java
index bfaaeedc040..92cc8393f15 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/config/IndexerZkConfig.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.common.config;
 
 import org.skife.config.Config;
+import org.skife.config.Default;
 
 /**
  */
@@ -33,4 +34,8 @@ public abstract class IndexerZkConfig
 
   @Config("druid.zk.paths.indexer.statusPath")
   public abstract String getStatusPath();
+
+  @Config("druid.zk.maxNumBytes")
+  @Default("512000")
+  public abstract long getMaxNumBytes();
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index fce83b8618a..18225ec4a38 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -552,6 +552,12 @@ public class RemoteTaskRunner implements TaskRunner
 
       tasks.put(task.getId(), taskWrapper);
 
+      byte[] rawBytes = jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext));
+
+      if (rawBytes.length > config.getMaxNumBytes()) {
+        throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
+      }
+
       cf.create()
         .withMode(CreateMode.EPHEMERAL)
         .forPath(
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java
index 3f7b9a0171f..c364070e313 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/config/EC2AutoScalingStrategyConfig.java
@@ -43,4 +43,7 @@ public abstract class EC2AutoScalingStrategyConfig
   @Config("druid.indexer.maxNumInstancesToProvision")
   @Default("1")
   public abstract int getMaxNumInstancesToProvision();
+
+  @Config("druid.indexer.userDataFile")
+  public abstract String getUserDataFile();
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
index 72238c9e4ef..1c11c62cb7b 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
@@ -469,6 +469,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
     ScalingStrategy strategy;
     if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
       strategy = new EC2AutoScalingStrategy(
+          jsonMapper,
           new AmazonEC2Client(
               new BasicAWSCredentials(
                   PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
@@ -480,7 +481,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
     } else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
       strategy = new NoopScalingStrategy();
     } else {
-      throw new ISE("Invalid strategy implementation: %s",config.getStrategyImpl());
+      throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
     }
 
     return new RemoteTaskRunner(
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
index cd94b70d3ce..265fe62287c 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategy.java
@@ -33,8 +33,10 @@ import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
 import com.metamx.emitter.EmittingLogger;
+import org.codehaus.jackson.map.ObjectMapper;
 
 import javax.annotation.Nullable;
+import java.io.File;
 import java.util.List;
 
 /**
@@ -43,14 +45,17 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
 {
   private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
 
+  private final ObjectMapper jsonMapper;
   private final AmazonEC2Client amazonEC2Client;
   private final EC2AutoScalingStrategyConfig config;
 
   public EC2AutoScalingStrategy(
+      ObjectMapper jsonMapper,
       AmazonEC2Client amazonEC2Client,
       EC2AutoScalingStrategyConfig config
   )
   {
+    this.jsonMapper = jsonMapper;
     this.amazonEC2Client = amazonEC2Client;
     this.config = config;
   }
@@ -67,6 +72,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
                              config.getMaxNumInstancesToProvision()
                          )
                          .withInstanceType(InstanceType.fromValue(config.getInstanceType()))
+                         .withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile())))
       );
 
       List<String> instanceIds = Lists.transform(
diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java
index 82dbd75adf8..d4237da7a9e 100644
--- a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java
+++ b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java
@@ -22,6 +22,7 @@ package com.metamx.druid.merger.worker;
 import com.google.common.base.Joiner;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
+import com.metamx.common.ISE;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.logger.Logger;
@@ -49,6 +50,7 @@ public class WorkerCuratorCoordinator
   private final ObjectMapper jsonMapper;
   private final CuratorFramework curatorFramework;
   private final Worker worker;
+  private final IndexerZkConfig config;
 
   private final String baseAnnouncementsPath;
   private final String baseTaskPath;
@@ -66,6 +68,7 @@ public class WorkerCuratorCoordinator
     this.jsonMapper = jsonMapper;
     this.curatorFramework = curatorFramework;
     this.worker = worker;
+    this.config = config;
 
     this.baseAnnouncementsPath = getPath(Arrays.asList(config.getAnnouncementPath(), worker.getHost()));
     this.baseTaskPath = getPath(Arrays.asList(config.getTaskPath(), worker.getHost()));
@@ -144,9 +147,14 @@ public class WorkerCuratorCoordinator
   {
     if (curatorFramework.checkExists().forPath(path) == null) {
       try {
+        byte[] rawBytes = jsonMapper.writeValueAsBytes(data);
+        if (rawBytes.length > config.getMaxNumBytes()) {
+          throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
+        }
+
         curatorFramework.create()
                         .withMode(mode)
-                        .forPath(path, jsonMapper.writeValueAsBytes(data));
+                        .forPath(path, rawBytes);
       }
       catch (Exception e) {
         log.warn(e, "Could not create path[%s], perhaps it already exists?", path);
@@ -212,11 +220,15 @@ public class WorkerCuratorCoordinator
     }
 
     try {
+      byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
+      if (rawBytes.length > config.getMaxNumBytes()) {
+        throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
+      }
+
       curatorFramework.create()
                       .withMode(CreateMode.EPHEMERAL)
                       .forPath(
-                          getStatusPathForId(status.getId()),
-                          jsonMapper.writeValueAsBytes(status)
+                          getStatusPathForId(status.getId()), rawBytes
                       );
     }
     catch (Exception e) {
@@ -237,11 +249,14 @@ public class WorkerCuratorCoordinator
         announceStatus(status);
         return;
       }
+      byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
+      if (rawBytes.length > config.getMaxNumBytes()) {
+        throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
+      }
 
       curatorFramework.setData()
                       .forPath(
-                          getStatusPathForId(status.getId()),
-                          jsonMapper.writeValueAsBytes(status)
+                          getStatusPathForId(status.getId()), rawBytes
                       );
     }
     catch (Exception e) {
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
index 47373b0e6c1..aca8eef5114 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
@@ -94,6 +94,23 @@ public class RemoteTaskRunnerTest
         "0"
     );
 
+    task1 = new TestTask(
+        "task1",
+        "dummyDs",
+        Lists.newArrayList(
+            new DataSegment(
+                "dummyDs",
+                new Interval(new DateTime(), new DateTime()),
+                new DateTime().toString(),
+                null,
+                null,
+                null,
+                null,
+                0
+            )
+        ), Lists.newArrayList()
+    );
+
     makeRemoteTaskRunner();
     makeTaskMonitor();
   }
@@ -116,6 +133,38 @@ public class RemoteTaskRunnerTest
     );
   }
 
+  @Test
+  public void testRunTooMuchZKData() throws Exception
+  {
+    boolean exceptionOccurred = false;
+    try {
+      remoteTaskRunner.run(
+          new TestTask(
+              new String(new char[5000]),
+              "dummyDs",
+              Lists.newArrayList(
+                  new DataSegment(
+                      "dummyDs",
+                      new Interval(new DateTime(), new DateTime()),
+                      new DateTime().toString(),
+                      null,
+                      null,
+                      null,
+                      null,
+                      0
+                  )
+              ), Lists.newArrayList()
+          ),
+          new TaskContext(new DateTime().toString(), Sets.newHashSet()),
+          null
+      );
+    }
+    catch (IllegalStateException e) {
+      exceptionOccurred = true;
+    }
+    Assert.assertTrue(exceptionOccurred);
+  }
+
   @Test
   public void testRunWithExistingCompletedTask() throws Exception
   {
@@ -174,6 +223,12 @@ public class RemoteTaskRunnerTest
         {
           return statusPath;
         }
+
+        @Override
+        public long getMaxNumBytes()
+        {
+          return 1000;
+        }
       },
       cf,
       worker1
@@ -274,23 +329,6 @@ public class RemoteTaskRunnerTest
         new TestScalingStrategy()
     );
 
-    task1 = new TestTask(
-        "task1",
-        "dummyDs",
-        Lists.newArrayList(
-            new DataSegment(
-                "dummyDs",
-                new Interval(new DateTime(), new DateTime()),
-                new DateTime().toString(),
-                null,
-                null,
-                null,
-                null,
-                0
-            )
-        ), Lists.newArrayList()
-    );
-
     // Create a single worker and wait for things for be ready
     remoteTaskRunner.start();
     cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
@@ -399,6 +437,12 @@ public class RemoteTaskRunnerTest
     {
       return new Duration(60000);
     }
+
+    @Override
+    public long getMaxNumBytes()
+    {
+      return 1000;
+    }
   }
 
   @JsonTypeName("test")
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java
index 95039978476..958a2c1d836 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/EC2AutoScalingStrategyTest.java
@@ -27,6 +27,7 @@ import com.amazonaws.services.ec2.model.Reservation;
 import com.amazonaws.services.ec2.model.RunInstancesRequest;
 import com.amazonaws.services.ec2.model.RunInstancesResult;
 import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
+import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -67,6 +68,7 @@ public class EC2AutoScalingStrategyTest
         .withPrivateIpAddress(IP);
     strategy = new EC2AutoScalingStrategy(
+        new DefaultObjectMapper(),
         amazonEC2Client,
         new EC2AutoScalingStrategyConfig()
         {
           @Override
@@ -98,6 +100,12 @@ public class EC2AutoScalingStrategyTest
         {
           return 1;
         }
+
+        @Override
+        public String getUserDataFile()
+        {
+          return "";
+        }
       }
     );
   }
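
Editor's note (not part of the patch series): PATCH 1 turns the master's single byte threshold into two limits, a byte limit and a segment-count limit, so one merge task can never grow unboundedly in either dimension. What follows is a minimal, self-contained Java sketch of that selection idea, not the actual DruidMasterSegmentMerger code: the Segment class and the batching are simplified stand-ins, and the real code walks a versioned timeline and backtracks partial chunks rather than cutting batches eagerly.

import java.util.ArrayList;
import java.util.List;

// Sketch of greedy merge-batch selection under a byte limit and a segment limit.
public class MergeSelectionSketch
{
  static class Segment
  {
    final long sizeBytes;
    Segment(long sizeBytes) { this.sizeBytes = sizeBytes; }
  }

  // Batch consecutive segments, cutting a batch as soon as adding one more
  // segment would exceed either the byte limit or the segment-count limit.
  static List<List<Segment>> selectBatches(List<Segment> segments, long mergeBytesLimit, int mergeSegmentsLimit)
  {
    List<List<Segment>> batches = new ArrayList<>();
    List<Segment> current = new ArrayList<>();
    long currentBytes = 0;

    for (Segment segment : segments) {
      boolean tooManyBytes = currentBytes + segment.sizeBytes > mergeBytesLimit;
      boolean tooManySegments = current.size() + 1 > mergeSegmentsLimit;
      if (!current.isEmpty() && (tooManyBytes || tooManySegments)) {
        // Emit the largest complete batch seen so far, then start a new one.
        if (current.size() > 1) {
          batches.add(current);
        }
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(segment);
      currentBytes += segment.sizeBytes;
    }
    if (current.size() > 1) {
      batches.add(current);  // finish the tail batch that never hit a limit
    }
    return batches;
  }

  public static void main(String[] args)
  {
    List<Segment> segments = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      segments.add(new Segment(1));
    }
    // With a segment limit of 8, ten one-byte segments split into batches of
    // 8 and 2, mirroring testMergeSeriesSegmentLimited in the patch above.
    System.out.println(selectBatches(segments, 100, 8).size());  // prints 2
  }
}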
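Editor's note (not part of the patch series): PATCH 2 applies the same guard at every ZooKeeper write site, on the coordinator in RemoteTaskRunner and on the worker in WorkerCuratorCoordinator: serialize first, compare the byte length against the configured druid.zk.maxNumBytes, and fail fast with an ISE instead of letting the write reach ZooKeeper, whose default znode payload cap (jute.maxbuffer) is roughly 1 MB, so the 512000-byte default leaves headroom. Below is a minimal sketch of that guard with the Jackson serialization and Curator call stubbed out; the constant and the string payload are illustrative assumptions.

import java.nio.charset.StandardCharsets;

// Sketch of the size guard applied before every ZooKeeper write in PATCH 2.
public class ZkPayloadGuardSketch
{
  static final long MAX_NUM_BYTES = 512000L;  // mirrors the druid.zk.maxNumBytes default

  static byte[] checkedPayload(String json)
  {
    byte[] rawBytes = json.getBytes(StandardCharsets.UTF_8);
    if (rawBytes.length > MAX_NUM_BYTES) {
      // The patch throws com.metamx.common.ISE, a subclass of IllegalStateException,
      // which is why testRunTooMuchZKData catches IllegalStateException.
      throw new IllegalStateException(
          String.format("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, MAX_NUM_BYTES)
      );
    }
    return rawBytes;  // safe to hand to curatorFramework.create().forPath(path, rawBytes)
  }

  public static void main(String[] args)
  {
    System.out.println(checkedPayload("{\"id\":\"task1\"}").length);  // small payload passes
    try {
      checkedPayload(new String(new char[600000]));  // oversized payload fails fast
    }
    catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}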