mirror of https://github.com/apache/druid.git
Switch TaskActions from Optionals to nullable.
Deserialization of Optionals does not work quite right- they come back as actual nulls, rather than absent Optionals. So these probably only ever worked for the local task action client.
This commit is contained in:
parent
6305dfe1b9
commit
666d785787
|
@ -21,12 +21,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.google.common.base.Optional;
|
||||
import io.druid.indexing.common.TaskLock;
|
||||
import io.druid.indexing.common.task.Task;
|
||||
import org.joda.time.Interval;
|
||||
|
||||
public class LockTryAcquireAction implements TaskAction<Optional<TaskLock>>
|
||||
public class LockTryAcquireAction implements TaskAction<TaskLock>
|
||||
{
|
||||
@JsonIgnore
|
||||
private final Interval interval;
|
||||
|
@ -45,17 +44,17 @@ public class LockTryAcquireAction implements TaskAction<Optional<TaskLock>>
|
|||
return interval;
|
||||
}
|
||||
|
||||
public TypeReference<Optional<TaskLock>> getReturnTypeReference()
|
||||
public TypeReference<TaskLock> getReturnTypeReference()
|
||||
{
|
||||
return new TypeReference<Optional<TaskLock>>()
|
||||
return new TypeReference<TaskLock>()
|
||||
{
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<TaskLock> perform(Task task, TaskActionToolbox toolbox)
|
||||
public TaskLock perform(Task task, TaskActionToolbox toolbox)
|
||||
{
|
||||
return toolbox.getTaskLockbox().tryLock(task, interval);
|
||||
return toolbox.getTaskLockbox().tryLock(task, interval).orNull();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.fasterxml.jackson.core.type.TypeReference;
|
|||
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
|
||||
import com.google.api.client.repackaged.com.google.common.base.Throwables;
|
||||
import com.google.api.client.util.Lists;
|
||||
import com.google.common.base.Optional;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.primitives.Longs;
|
||||
import com.metamx.common.Granularity;
|
||||
|
@ -53,10 +52,9 @@ import java.util.Set;
|
|||
* This action implicitly acquires locks when it allocates segments. You do not have to acquire them beforehand,
|
||||
* although you *do* have to release them yourself.
|
||||
* <p/>
|
||||
* If this action cannot acquire an appropriate lock, or if it cannot expand an existing segment set, it will return
|
||||
* a missing Optional.
|
||||
* If this action cannot acquire an appropriate lock, or if it cannot expand an existing segment set, it returns null.
|
||||
*/
|
||||
public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentifier>>
|
||||
public class SegmentAllocateAction implements TaskAction<SegmentIdentifier>
|
||||
{
|
||||
private static final Logger log = new Logger(SegmentAllocateAction.class);
|
||||
|
||||
|
@ -150,15 +148,15 @@ public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentif
|
|||
}
|
||||
|
||||
@Override
|
||||
public TypeReference<Optional<SegmentIdentifier>> getReturnTypeReference()
|
||||
public TypeReference<SegmentIdentifier> getReturnTypeReference()
|
||||
{
|
||||
return new TypeReference<Optional<SegmentIdentifier>>()
|
||||
return new TypeReference<SegmentIdentifier>()
|
||||
{
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<SegmentIdentifier> perform(
|
||||
public SegmentIdentifier perform(
|
||||
final Task task,
|
||||
final TaskActionToolbox toolbox
|
||||
) throws IOException
|
||||
|
@ -215,7 +213,7 @@ public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentif
|
|||
tryLock.getVersion()
|
||||
);
|
||||
if (identifier != null) {
|
||||
return Optional.of(identifier);
|
||||
return identifier;
|
||||
} else {
|
||||
log.debug(
|
||||
"Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
|
||||
|
@ -255,10 +253,10 @@ public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentif
|
|||
rowInterval,
|
||||
attempt
|
||||
);
|
||||
return Optional.absent();
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return Optional.absent();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
|
|||
@Override
|
||||
public boolean isReady(TaskActionClient taskActionClient) throws Exception
|
||||
{
|
||||
return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
|
||||
return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
|
|
@ -132,7 +132,7 @@ public class HadoopIndexTask extends HadoopTask
|
|||
intervals.get()
|
||||
)
|
||||
);
|
||||
return taskActionClient.submit(new LockTryAcquireAction(interval)).isPresent();
|
||||
return taskActionClient.submit(new LockTryAcquireAction(interval)) != null;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -710,7 +710,7 @@ public class SegmentAllocateActionTest
|
|||
sequenceName,
|
||||
sequencePreviousId
|
||||
);
|
||||
return action.perform(task, taskActionTestKit.getTaskActionToolbox()).orNull();
|
||||
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
|
||||
}
|
||||
|
||||
private void assertSameIdentifier(final SegmentIdentifier one, final SegmentIdentifier other)
|
||||
|
|
Loading…
Reference in New Issue