mirror of https://github.com/apache/druid.git
Merge pull request #1997 from gianm/fix-nullable-actions
Switch TaskActions from Optionals to nullable.
This commit is contained in:
commit
f3185938a8
|
@ -21,12 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.google.common.base.Optional;
|
|
||||||
import io.druid.indexing.common.TaskLock;
|
import io.druid.indexing.common.TaskLock;
|
||||||
import io.druid.indexing.common.task.Task;
|
import io.druid.indexing.common.task.Task;
|
||||||
import org.joda.time.Interval;
|
import org.joda.time.Interval;
|
||||||
|
|
||||||
public class LockTryAcquireAction implements TaskAction<Optional<TaskLock>>
|
public class LockTryAcquireAction implements TaskAction<TaskLock>
|
||||||
{
|
{
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
private final Interval interval;
|
private final Interval interval;
|
||||||
|
@ -45,17 +44,17 @@ public class LockTryAcquireAction implements TaskAction<Optional<TaskLock>>
|
||||||
return interval;
|
return interval;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TypeReference<Optional<TaskLock>> getReturnTypeReference()
|
public TypeReference<TaskLock> getReturnTypeReference()
|
||||||
{
|
{
|
||||||
return new TypeReference<Optional<TaskLock>>()
|
return new TypeReference<TaskLock>()
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<TaskLock> perform(Task task, TaskActionToolbox toolbox)
|
public TaskLock perform(Task task, TaskActionToolbox toolbox)
|
||||||
{
|
{
|
||||||
return toolbox.getTaskLockbox().tryLock(task, interval);
|
return toolbox.getTaskLockbox().tryLock(task, interval).orNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||||
import com.google.api.client.util.Lists;
|
import com.google.api.client.util.Lists;
|
||||||
import com.google.common.base.Optional;
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.primitives.Longs;
|
import com.google.common.primitives.Longs;
|
||||||
import com.metamx.common.Granularity;
|
import com.metamx.common.Granularity;
|
||||||
|
@ -53,10 +52,9 @@ import java.util.Set;
|
||||||
* This action implicitly acquires locks when it allocates segments. You do not have to acquire them beforehand,
|
* This action implicitly acquires locks when it allocates segments. You do not have to acquire them beforehand,
|
||||||
* although you *do* have to release them yourself.
|
* although you *do* have to release them yourself.
|
||||||
* <p/>
|
* <p/>
|
||||||
* If this action cannot acquire an appropriate lock, or if it cannot expand an existing segment set, it will return
|
* If this action cannot acquire an appropriate lock, or if it cannot expand an existing segment set, it returns null.
|
||||||
* a missing Optional.
|
|
||||||
*/
|
*/
|
||||||
public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentifier>>
|
public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
|
||||||
{
|
{
|
||||||
private static final Logger log = new Logger(SegmentAllocateAction.class);
|
private static final Logger log = new Logger(SegmentAllocateAction.class);
|
||||||
|
|
||||||
|
@ -150,15 +148,15 @@ public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentif
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TypeReference<Optional<SegmentIdentifier>> getReturnTypeReference()
|
public TypeReference<SegmentIdentifier> getReturnTypeReference()
|
||||||
{
|
{
|
||||||
return new TypeReference<Optional<SegmentIdentifier>>()
|
return new TypeReference<SegmentIdentifier>()
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Optional<SegmentIdentifier> perform(
|
public SegmentIdentifier perform(
|
||||||
final Task task,
|
final Task task,
|
||||||
final TaskActionToolbox toolbox
|
final TaskActionToolbox toolbox
|
||||||
) throws IOException
|
) throws IOException
|
||||||
|
@ -215,7 +213,7 @@ public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentif
|
||||||
tryLock.getVersion()
|
tryLock.getVersion()
|
||||||
);
|
);
|
||||||
if (identifier != null) {
|
if (identifier != null) {
|
||||||
return Optional.of(identifier);
|
return identifier;
|
||||||
} else {
|
} else {
|
||||||
log.debug(
|
log.debug(
|
||||||
"Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
|
"Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
|
||||||
|
@ -255,10 +253,10 @@ public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentif
|
||||||
rowInterval,
|
rowInterval,
|
||||||
attempt
|
attempt
|
||||||
);
|
);
|
||||||
return Optional.absent();
|
return null;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Optional.absent();
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,7 +87,7 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
|
||||||
@Override
|
@Override
|
||||||
public boolean isReady(TaskActionClient taskActionClient) throws Exception
|
public boolean isReady(TaskActionClient taskActionClient) throws Exception
|
||||||
{
|
{
|
||||||
return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
|
return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
|
|
|
@ -132,7 +132,7 @@ public class HadoopIndexTask extends HadoopTask
|
||||||
intervals.get()
|
intervals.get()
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
|
return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
|
||||||
} else {
|
} else {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -710,7 +710,7 @@ public class SegmentAllocateActionTest
|
||||||
sequenceName,
|
sequenceName,
|
||||||
sequencePreviousId
|
sequencePreviousId
|
||||||
);
|
);
|
||||||
return action.perform(task, taskActionTestKit.getTaskActionToolbox()).orNull();
|
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertSameIdentifier(final SegmentIdentifier one, final SegmentIdentifier other)
|
private void assertSameIdentifier(final SegmentIdentifier one, final SegmentIdentifier other)
|
||||||
|
|
Loading…
Reference in New Issue