Merge pull request #32 from metamx/fjy

Fixes to the indexing service to handle large merges better. The changes cap the size of task payloads written to ZooKeeper via a new druid.zk.maxNumBytes setting (enforced in RemoteTaskRunner and WorkerCuratorCoordinator), let the EC2 autoscaling strategy attach user data from a configured file when provisioning workers, and split the master's single merge threshold into separate mergeBytesLimit and mergeSegmentsLimit controls for the segment merger.
cheddar 2012-11-19 15:51:07 -08:00
commit 6107bb7b80
15 changed files with 200 additions and 54 deletions

View File: SearchQuery.java

@ -168,7 +168,7 @@ public class SearchQuery extends BaseQuery<Result<SearchResultValue>>
@Override
public String toString()
{
return "SearchResultValue{" +
return "SearchQuery{" +
"dataSource='" + getDataSource() + '\'' +
", dimFilter=" + dimFilter +
", granularity='" + granularity + '\'' +

View File: IndexerZkConfig.java

@ -20,6 +20,7 @@
package com.metamx.druid.merger.common.config;
import org.skife.config.Config;
import org.skife.config.Default;
/**
*/
@ -33,4 +34,8 @@ public abstract class IndexerZkConfig
@Config("druid.zk.paths.indexer.statusPath")
public abstract String getStatusPath();
@Config("druid.zk.maxNumBytes")
@Default("512000")
public abstract long getMaxNumBytes();
}
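
The new druid.zk.maxNumBytes property caps how large a serialized payload the indexing service will write to ZooKeeper, defaulting to 512000 bytes. A minimal sketch of exercising the binding, assuming the usual skife-config ConfigurationObjectFactory wiring; the property name and default come from this diff, the surrounding setup is illustrative.

```java
import java.util.Properties;

import com.metamx.druid.merger.common.config.IndexerZkConfig;
import org.skife.config.ConfigurationObjectFactory;

// Illustrative only: bind IndexerZkConfig from properties and override the ZooKeeper payload cap.
public class IndexerZkConfigExample
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.setProperty("druid.zk.maxNumBytes", "1048576"); // raise the cap to 1 MB

    IndexerZkConfig zkConfig = new ConfigurationObjectFactory(props).build(IndexerZkConfig.class);
    System.out.println(zkConfig.getMaxNumBytes()); // falls back to the @Default of 512000 when unset
  }
}
```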

View File: RemoteTaskRunner.java

@ -552,6 +552,12 @@ public class RemoteTaskRunner implements TaskRunner
tasks.put(task.getId(), taskWrapper);
byte[] rawBytes = jsonMapper.writeValueAsBytes(new TaskHolder(task, taskContext));
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
cf.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
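
The check above serializes the task first and refuses to write it when the payload exceeds druid.zk.maxNumBytes, failing fast with an ISE instead of letting ZooKeeper reject the znode. A condensed, self-contained sketch of the same serialize-then-check pattern; the class and method names here are hypothetical, only the ISE message format mirrors the diff.

```java
import com.metamx.common.ISE;
import org.codehaus.jackson.map.ObjectMapper;

// Hypothetical helper illustrating the guard pattern added in this commit: serialize,
// enforce the byte cap, and only then hand the bytes to the ZooKeeper client.
public class ZkPayloadGuard
{
  private final ObjectMapper jsonMapper = new ObjectMapper();

  public byte[] serializeWithLimit(Object payload, long maxNumBytes) throws Exception
  {
    final byte[] rawBytes = jsonMapper.writeValueAsBytes(payload);
    if (rawBytes.length > maxNumBytes) {
      throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, maxNumBytes);
    }
    return rawBytes; // safe to pass to cf.create().withMode(CreateMode.EPHEMERAL).forPath(path, rawBytes)
  }
}
```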

View File: EC2AutoScalingStrategyConfig.java

@ -43,4 +43,7 @@ public abstract class EC2AutoScalingStrategyConfig
@Config("druid.indexer.maxNumInstancesToProvision")
@Default("1")
public abstract int getMaxNumInstancesToProvision();
@Config("druid.indexer.userDataFile")
public abstract String getUserDataFile();
}

View File: IndexerCoordinatorNode.java

@ -469,6 +469,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
ScalingStrategy strategy;
if (config.getStrategyImpl().equalsIgnoreCase("ec2")) {
strategy = new EC2AutoScalingStrategy(
jsonMapper,
new AmazonEC2Client(
new BasicAWSCredentials(
PropUtils.getProperty(props, "com.metamx.aws.accessKey"),
@ -480,7 +481,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
} else if (config.getStrategyImpl().equalsIgnoreCase("noop")) {
strategy = new NoopScalingStrategy();
} else {
throw new ISE("Invalid strategy implementation: %s",config.getStrategyImpl());
throw new ISE("Invalid strategy implementation: %s", config.getStrategyImpl());
}
return new RemoteTaskRunner(
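
With this change EC2AutoScalingStrategy takes the ObjectMapper as its first constructor argument, so anything wiring it up by hand has to pass one in. A minimal construction sketch mirroring the coordinator and the updated unit test; the credential values and ec2Config parameter are placeholders, and the import of EC2AutoScalingStrategy itself is omitted because its package is not shown in this diff.

```java
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.AmazonEC2Client;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;

// Hypothetical wiring sketch; only the constructor shape comes from the diff.
public class ScalingStrategyWiring
{
  public static EC2AutoScalingStrategy buildEc2Strategy(
      String accessKey,
      String secretKey,
      EC2AutoScalingStrategyConfig ec2Config
  )
  {
    return new EC2AutoScalingStrategy(
        new DefaultObjectMapper(),                                          // now required as the first argument
        new AmazonEC2Client(new BasicAWSCredentials(accessKey, secretKey)),
        ec2Config
    );
  }
}
```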

View File: EC2AutoScalingStrategy.java

@ -33,8 +33,10 @@ import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import com.metamx.emitter.EmittingLogger;
import org.codehaus.jackson.map.ObjectMapper;
import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
/**
@ -43,14 +45,17 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
{
private static final EmittingLogger log = new EmittingLogger(EC2AutoScalingStrategy.class);
private final ObjectMapper jsonMapper;
private final AmazonEC2Client amazonEC2Client;
private final EC2AutoScalingStrategyConfig config;
public EC2AutoScalingStrategy(
ObjectMapper jsonMapper,
AmazonEC2Client amazonEC2Client,
EC2AutoScalingStrategyConfig config
)
{
this.jsonMapper = jsonMapper;
this.amazonEC2Client = amazonEC2Client;
this.config = config;
}
@ -67,6 +72,7 @@ public class EC2AutoScalingStrategy implements ScalingStrategy<Instance>
config.getMaxNumInstancesToProvision()
)
.withInstanceType(InstanceType.fromValue(config.getInstanceType()))
.withUserData(jsonMapper.writeValueAsString(new File(config.getUserDataFile())))
);
List<String> instanceIds = Lists.transform(
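
The strategy now attaches user data to the RunInstancesRequest by JSON-serializing the configured File object. For background, and as an assumption about intent rather than a description of this commit: the EC2 API expects the user-data payload to be base64-encoded, so a common pattern is to read the file's contents and encode them before calling withUserData, as in the sketch below (Guava and JAXB utilities assumed available).

```java
import java.io.File;
import javax.xml.bind.DatatypeConverter;

import com.google.common.io.Files;

// Alternative sketch, not what this commit does: load the configured user-data file and
// base64-encode its contents, which is the form the EC2 RunInstances API expects.
public class UserDataLoader
{
  public static String load(String userDataFile) throws Exception
  {
    byte[] contents = Files.toByteArray(new File(userDataFile));
    return DatatypeConverter.printBase64Binary(contents);
    // usage: runInstancesRequest.withUserData(load(config.getUserDataFile()))
  }
}
```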

View File: WorkerCuratorCoordinator.java

@ -22,6 +22,7 @@ package com.metamx.druid.merger.worker;
import com.google.common.base.Joiner;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.ISE;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
@ -49,6 +50,7 @@ public class WorkerCuratorCoordinator
private final ObjectMapper jsonMapper;
private final CuratorFramework curatorFramework;
private final Worker worker;
private final IndexerZkConfig config;
private final String baseAnnouncementsPath;
private final String baseTaskPath;
@ -66,6 +68,7 @@ public class WorkerCuratorCoordinator
this.jsonMapper = jsonMapper;
this.curatorFramework = curatorFramework;
this.worker = worker;
this.config = config;
this.baseAnnouncementsPath = getPath(Arrays.asList(config.getAnnouncementPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(config.getTaskPath(), worker.getHost()));
@ -144,9 +147,14 @@ public class WorkerCuratorCoordinator
{
if (curatorFramework.checkExists().forPath(path) == null) {
try {
byte[] rawBytes = jsonMapper.writeValueAsBytes(data);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
curatorFramework.create()
.withMode(mode)
.forPath(path, jsonMapper.writeValueAsBytes(data));
.forPath(path, rawBytes);
}
catch (Exception e) {
log.warn(e, "Could not create path[%s], perhaps it already exists?", path);
@ -212,11 +220,15 @@ public class WorkerCuratorCoordinator
}
try {
byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
curatorFramework.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(
getStatusPathForId(status.getId()),
jsonMapper.writeValueAsBytes(status)
getStatusPathForId(status.getId()), rawBytes
);
}
catch (Exception e) {
@ -237,11 +249,14 @@ public class WorkerCuratorCoordinator
announceStatus(status);
return;
}
byte[] rawBytes = jsonMapper.writeValueAsBytes(status);
if (rawBytes.length > config.getMaxNumBytes()) {
throw new ISE("Length of raw bytes for task too large[%,d > %,d]", rawBytes.length, config.getMaxNumBytes());
}
curatorFramework.setData()
.forPath(
getStatusPathForId(status.getId()),
jsonMapper.writeValueAsBytes(status)
getStatusPathForId(status.getId()), rawBytes
);
}
catch (Exception e) {
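
The same serialize-then-check guard now protects all three ZooKeeper writes in WorkerCuratorCoordinator: the initial path creation, announceStatus, and the status update. The likely motivation, stated here as background rather than something the diff spells out, is that ZooKeeper itself rejects znode payloads larger than its jute.maxbuffer setting (about 1 MB by default), and an oversized write tends to surface as an opaque connection error, so checking against druid.zk.maxNumBytes on the client side fails fast with a readable ISE. A tiny illustrative check:

```java
// Illustrative only: keep the client-side cap below ZooKeeper's server-side limit.
// jute.maxbuffer is configured as a system property on the ZooKeeper side; 1048576 is its usual default.
long juteMaxBuffer = Long.getLong("jute.maxbuffer", 1024 * 1024);
long maxNumBytes = 512000; // the @Default from IndexerZkConfig
if (maxNumBytes >= juteMaxBuffer) {
  throw new IllegalStateException("druid.zk.maxNumBytes should stay below jute.maxbuffer");
}
```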

View File: RemoteTaskRunnerTest.java

@ -94,6 +94,23 @@ public class RemoteTaskRunnerTest
"0"
);
task1 = new TestTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0
)
), Lists.<AggregatorFactory>newArrayList()
);
makeRemoteTaskRunner();
makeTaskMonitor();
}
@ -116,6 +133,38 @@ public class RemoteTaskRunnerTest
);
}
@Test
public void testRunTooMuchZKData() throws Exception
{
boolean exceptionOccurred = false;
try {
remoteTaskRunner.run(
new TestTask(
new String(new char[5000]),
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0
)
), Lists.<AggregatorFactory>newArrayList()
),
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()),
null
);
}
catch (IllegalStateException e) {
exceptionOccurred = true;
}
Assert.assertTrue(exceptionOccurred);
}
@Test
public void testRunWithExistingCompletedTask() throws Exception
{
@ -174,6 +223,12 @@ public class RemoteTaskRunnerTest
{
return statusPath;
}
@Override
public long getMaxNumBytes()
{
return 1000;
}
},
cf,
worker1
@ -274,23 +329,6 @@ public class RemoteTaskRunnerTest
new TestScalingStrategy()
);
task1 = new TestTask(
"task1",
"dummyDs",
Lists.<DataSegment>newArrayList(
new DataSegment(
"dummyDs",
new Interval(new DateTime(), new DateTime()),
new DateTime().toString(),
null,
null,
null,
null,
0
)
), Lists.<AggregatorFactory>newArrayList()
);
// Create a single worker and wait for things to be ready
remoteTaskRunner.start();
cf.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(
@ -399,6 +437,12 @@ public class RemoteTaskRunnerTest
{
return new Duration(60000);
}
@Override
public long getMaxNumBytes()
{
return 1000;
}
}
@JsonTypeName("test")
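
testRunTooMuchZKData drives the new size check by submitting a task whose id alone is 5000 characters, well past the 1000 bytes returned by the test config's getMaxNumBytes(), and then asserts that an IllegalStateException escaped. That works because ISE is an IllegalStateException subclass with printf-style formatting; a small illustrative use (the grouped-digit output assumes a locale that uses commas):

```java
import com.metamx.common.ISE;

// Illustrative only: ISE formats its message with String.format-style arguments,
// and can be caught as a plain IllegalStateException, as the test above does.
try {
  throw new ISE("Length of raw bytes for task too large[%,d > %,d]", 5000, 1000);
}
catch (IllegalStateException e) {
  System.out.println(e.getMessage()); // Length of raw bytes for task too large[5,000 > 1,000]
}
```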

View File: EC2AutoScalingStrategyTest.java

@ -27,6 +27,7 @@ import com.amazonaws.services.ec2.model.Reservation;
import com.amazonaws.services.ec2.model.RunInstancesRequest;
import com.amazonaws.services.ec2.model.RunInstancesResult;
import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.merger.coordinator.config.EC2AutoScalingStrategyConfig;
import org.easymock.EasyMock;
import org.junit.After;
@ -67,6 +68,7 @@ public class EC2AutoScalingStrategyTest
.withPrivateIpAddress(IP);
strategy = new EC2AutoScalingStrategy(
new DefaultObjectMapper(),
amazonEC2Client, new EC2AutoScalingStrategyConfig()
{
@Override
@ -98,6 +100,12 @@ public class EC2AutoScalingStrategyTest
{
return 1;
}
@Override
public String getUserDataFile()
{
return "";
}
}
);
}

View File: DruidMaster.java

@ -563,7 +563,8 @@ public class DruidMaster
.withLoadManagementPeons(loadManagementPeons)
.withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting())
.withEmitter(emitter)
.withMergeThreshold(config.getMergeThreshold())
.withMergeBytesLimit(config.getMergeBytesLimit())
.withMergeSegmentsLimit(config.getMergeSegmentsLimit())
.build();
for (DruidMasterHelper helper : helpers) {

View File: DruidMasterConfig.java

@ -70,8 +70,14 @@ public abstract class DruidMasterConfig
}
@Config("druid.master.merge.threshold")
public long getMergeThreshold()
public long getMergeBytesLimit()
{
return 100000000L;
}
@Config("druid.master.merge.maxSegments")
public int getMergeSegmentsLimit()
{
return Integer.MAX_VALUE;
}
}
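
Note that only the getter was renamed: the byte limit is still bound to the druid.master.merge.threshold property, while the new segment-count cap uses druid.master.merge.maxSegments and defaults to Integer.MAX_VALUE, which keeps the old threshold-only behavior when unset. A sketch of overriding both, again assuming the standard skife-config factory wiring; the property names come from the @Config annotations above.

```java
import java.util.Properties;

import org.skife.config.ConfigurationObjectFactory;

// Illustrative only: override both merge limits at runtime.
Properties props = new Properties();
props.setProperty("druid.master.merge.threshold", "50000000"); // read via getMergeBytesLimit()
props.setProperty("druid.master.merge.maxSegments", "16");     // read via getMergeSegmentsLimit()

DruidMasterConfig masterConfig = new ConfigurationObjectFactory(props).build(DruidMasterConfig.class);
```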

View File: DruidMasterRuntimeParams.java

@ -56,7 +56,8 @@ public class DruidMasterRuntimeParams
private final int movedCount;
private final int createdReplicantCount;
private final int destroyedReplicantCount;
private final long mergeThreshold;
private final long mergeBytesLimit;
private final int mergeSegmentsLimit;
private final int mergedSegmentCount;
public DruidMasterRuntimeParams(
@ -78,7 +79,8 @@ public class DruidMasterRuntimeParams
int movedCount,
int createdReplicantCount,
int destroyedReplicantCount,
long mergeThreshold,
long mergeBytesLimit,
int mergeSegmentsLimit,
int mergedSegmentCount
)
{
@ -100,7 +102,8 @@ public class DruidMasterRuntimeParams
this.movedCount = movedCount;
this.createdReplicantCount = createdReplicantCount;
this.destroyedReplicantCount = destroyedReplicantCount;
this.mergeThreshold = mergeThreshold;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergedSegmentCount = mergedSegmentCount;
}
@ -194,9 +197,14 @@ public class DruidMasterRuntimeParams
return destroyedReplicantCount;
}
public long getMergeThreshold()
public long getMergeBytesLimit()
{
return mergeThreshold;
return mergeBytesLimit;
}
public int getMergeSegmentsLimit()
{
return mergeSegmentsLimit;
}
public int getMergedSegmentCount()
@ -230,7 +238,8 @@ public class DruidMasterRuntimeParams
movedCount,
createdReplicantCount,
destroyedReplicantCount,
mergeThreshold,
mergeBytesLimit,
mergeSegmentsLimit,
mergedSegmentCount
);
}
@ -255,7 +264,8 @@ public class DruidMasterRuntimeParams
private int movedCount;
private int createdReplicantCount;
private int destroyedReplicantCount;
private long mergeThreshold;
private long mergeBytesLimit;
private int mergeSegmentsLimit;
private int mergedSegmentCount;
Builder()
@ -278,7 +288,8 @@ public class DruidMasterRuntimeParams
this.movedCount = 0;
this.createdReplicantCount = 0;
this.destroyedReplicantCount = 0;
this.mergeThreshold = 0;
this.mergeBytesLimit = 0;
this.mergeSegmentsLimit = 0;
this.mergedSegmentCount = 0;
}
@ -301,7 +312,8 @@ public class DruidMasterRuntimeParams
int movedCount,
int createdReplicantCount,
int destroyedReplicantCount,
long mergeThreshold,
long mergeBytesLimit,
int mergeSegmentsLimit,
int mergedSegmentCount
)
{
@ -323,7 +335,8 @@ public class DruidMasterRuntimeParams
this.movedCount = movedCount;
this.createdReplicantCount = createdReplicantCount;
this.destroyedReplicantCount = destroyedReplicantCount;
this.mergeThreshold = mergeThreshold;
this.mergeBytesLimit = mergeBytesLimit;
this.mergeSegmentsLimit = mergeSegmentsLimit;
this.mergedSegmentCount = mergedSegmentCount;
}
@ -348,7 +361,8 @@ public class DruidMasterRuntimeParams
movedCount,
createdReplicantCount,
destroyedReplicantCount,
mergeThreshold,
mergeBytesLimit,
mergeSegmentsLimit,
mergedSegmentCount
);
}
@ -467,9 +481,15 @@ public class DruidMasterRuntimeParams
return this;
}
public Builder withMergeThreshold(long mergeThreshold)
public Builder withMergeBytesLimit(long mergeBytesLimit)
{
this.mergeThreshold = mergeThreshold;
this.mergeBytesLimit = mergeBytesLimit;
return this;
}
public Builder withMergeSegmentsLimit(int mergeSegmentsLimit)
{
this.mergeSegmentsLimit = mergeSegmentsLimit;
return this;
}

View File: DruidMasterSegmentMerger.java

@ -101,21 +101,23 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
List<TimelineObjectHolder<String, DataSegment>> timelineObjects =
timeline.lookup(new Interval(new DateTime(0), new DateTime("3000-01-01")));
// Accumulate timelineObjects greedily until we surpass our size threshold, then backtrack to the maximum complete set
// Accumulate timelineObjects greedily until we reach our limits, then backtrack to the maximum complete set
SegmentsToMerge segmentsToMerge = new SegmentsToMerge();
for(int i = 0 ; i < timelineObjects.size() ; i++) {
segmentsToMerge.add(timelineObjects.get(i));
if(segmentsToMerge.getMergedSize() > params.getMergeThreshold()) {
i -= segmentsToMerge.backtrack(params.getMergeThreshold());
if (segmentsToMerge.getMergedSize() > params.getMergeBytesLimit()
|| segmentsToMerge.size() >= params.getMergeSegmentsLimit())
{
i -= segmentsToMerge.backtrack(params.getMergeBytesLimit());
if(segmentsToMerge.size() > 1) {
if (segmentsToMerge.size() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}
if(segmentsToMerge.size() == 0) {
if (segmentsToMerge.size() == 0) {
// Backtracked all the way to zero. Increment by one so we continue to make progress.
i++;
}
@ -125,7 +127,7 @@ public class DruidMasterSegmentMerger implements DruidMasterHelper
}
// Finish any timelineObjects to merge that may have not hit threshold
segmentsToMerge.backtrack(params.getMergeThreshold());
segmentsToMerge.backtrack(params.getMergeBytesLimit());
if (segmentsToMerge.size() > 1) {
count += mergeSegments(segmentsToMerge, entry.getKey());
}
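
The merger walks adjacent segments, greedily accumulating until either the byte limit would be exceeded or the segment-count limit is reached, then backtracks to the largest complete set and issues a merge. The sketch below is a simplified, self-contained version of that control flow over plain segment sizes: the real code adds first and backtracks via SegmentsToMerge over TimelineObjectHolders, while this sketch checks before adding, which yields the same groupings in simple cases.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of the accumulate-then-cut grouping added in this commit. The real code
// walks TimelineObjectHolders and backtracks via SegmentsToMerge; here we group plain sizes
// so the interaction of the byte and segment-count limits is easy to see.
public class GreedyMergeGrouping
{
  public static List<List<Long>> group(List<Long> sizes, long mergeBytesLimit, int mergeSegmentsLimit)
  {
    final List<List<Long>> groups = new ArrayList<List<Long>>();
    List<Long> current = new ArrayList<Long>();
    long currentBytes = 0;

    for (Long size : sizes) {
      if (currentBytes + size > mergeBytesLimit || current.size() >= mergeSegmentsLimit) {
        if (current.size() > 1) {
          groups.add(current); // singletons are left alone; only sets of 2+ are worth merging
        }
        current = new ArrayList<Long>();
        currentBytes = 0;
      }
      current.add(size);
      currentBytes += size;
    }
    if (current.size() > 1) {
      groups.add(current);
    }
    return groups;
  }
}
```

With ten size-1 segments and limits of 100 bytes and 8 segments, this yields groups of 8 and 2, matching the testMergeSeriesSegmentLimited expectation in the test diff that follows.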

View File: DruidMasterSegmentMergerTest.java

@ -32,7 +32,8 @@ import java.util.List;
public class DruidMasterSegmentMergerTest
{
private static final long mergeThreshold = 100;
private static final long mergeBytesLimit = 100;
private static final int mergeSegmentsLimit = 8;
@Test
public void testNoMerges()
@ -101,7 +102,7 @@ public class DruidMasterSegmentMergerTest
}
@Test
public void testMergeSeries()
public void testMergeSeriesByteLimited()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(40).build(),
@ -121,6 +122,39 @@ public class DruidMasterSegmentMergerTest
);
}
@Test
public void testMergeSeriesSegmentLimited()
{
final List<DataSegment> segments = ImmutableList.of(
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-07/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-08/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("2").size(1).build(),
DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-10/P1D")).version("2").size(1).build()
);
Assert.assertEquals(
ImmutableList.of(
ImmutableList.of(
segments.get(0),
segments.get(1),
segments.get(2),
segments.get(3),
segments.get(4),
segments.get(5),
segments.get(6),
segments.get(7)
),
ImmutableList.of(segments.get(8), segments.get(9))
), merge(segments)
);
}
@Test
public void testOverlappingMergeWithBacktracking()
{
@ -308,7 +342,8 @@ public class DruidMasterSegmentMergerTest
final DruidMasterSegmentMerger merger = new DruidMasterSegmentMerger(mergerClient);
final DruidMasterRuntimeParams params = DruidMasterRuntimeParams.newBuilder()
.withAvailableSegments(ImmutableSet.copyOf(segments))
.withMergeThreshold(mergeThreshold)
.withMergeBytesLimit(mergeBytesLimit)
.withMergeSegmentsLimit(mergeSegmentsLimit)
.build();
merger.run(params);

View File: DruidMasterTest.java

@ -126,12 +126,6 @@ public class DruidMasterTest
{
return "";
}
@Override
public long getMergeThreshold()
{
return super.getMergeThreshold();
}
},
null,
null,