Merger: Fix VersionConverterTask version selection by making SegmentInsertAction more flexible

This commit is contained in:
Gian Merlino 2013-03-14 10:37:40 -07:00
parent a1c823402b
commit 055a98a89c
2 changed files with 27 additions and 3 deletions

View File

@ -1,5 +1,6 @@
package com.metamx.druid.merger.common.actions;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
@ -18,14 +19,25 @@ import java.util.Set;
public class SegmentInsertAction implements TaskAction<Void>
{
@JsonIgnore
private final Set<DataSegment> segments;
@JsonIgnore
private final boolean allowOlderVersions;
public SegmentInsertAction(Set<DataSegment> segments)
{
this(segments, false);
}
@JsonCreator
public SegmentInsertAction(
@JsonProperty("segments") Set<DataSegment> segments
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("allowOlderVersions") boolean allowOlderVersions
)
{
this.segments = ImmutableSet.copyOf(segments);
this.allowOlderVersions = allowOlderVersions;
}
@JsonProperty
@ -34,6 +46,17 @@ public class SegmentInsertAction implements TaskAction<Void>
return segments;
}
@JsonProperty
public boolean isAllowOlderVersions()
{
return allowOlderVersions;
}
public SegmentInsertAction withAllowOlderVersions(boolean _allowOlderVersions)
{
return new SegmentInsertAction(segments, _allowOlderVersions);
}
public TypeReference<Void> getReturnTypeReference()
{
return new TypeReference<Void>() {};
@ -42,7 +65,7 @@ public class SegmentInsertAction implements TaskAction<Void>
@Override
public Void perform(Task task, TaskActionToolbox toolbox)
{
if(!toolbox.taskLockCoversSegments(task, segments, false)) {
if(!toolbox.taskLockCoversSegments(task, segments, allowOlderVersions)) {
throw new ISE("Segments not covered by locks for task: %s", task.getId());
}

View File

@ -236,7 +236,8 @@ public class VersionConverterTask extends AbstractTask
DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
toolbox.getTaskActionClient().submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
toolbox.getTaskActionClient()
.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)).withAllowOlderVersions(true));
} else {
log.info("Conversion failed.");
}