Passing lockTimeout as a parameter for TaskLockbox.lock() (#4549)

* Passing lockTimeout as a parameter for TaskLockbox.lock()

* Remove TIME_UNIT

* Fix tc fail

* Add taskLockTimeout to TaskContext

* Add caution
This commit is contained in:
Jihoon Son 2017-08-09 10:21:07 +09:00 committed by Gian Merlino
parent f5d4171459
commit d5606bc558
20 changed files with 165 additions and 133 deletions

View File

@ -89,7 +89,8 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
|--------|-----------|---------|
|type|The task type, this should always be "index".|yes|
|id|The task ID. If this is not explicitly specified, Druid generates the task ID using the name of the task file and date-time stamp. |no|
|spec|The ingestion spec. See below for more details. |yes|
|spec|The ingestion spec including the data schema, IOConfig, and TuningConfig. See below for more details. |yes|
|context|Context containing various task configuration parameters. See below for more details.|no|
#### DataSchema
@ -160,6 +161,19 @@ On the contrary, in the incremental publishing mode, segments are incrementally
To enable bulk publishing mode, `forceGuaranteedRollup` should be set in the TuningConfig. Note that this option cannot be used with either `forceExtendableShardSpecs` of TuningConfig or `appendToExisting` of IOConfig.
### Task Context
The task context is used for various task configuration parameters. The following parameters apply to all tasks.
|property|default|description|
|--------|-------|-----------|
|taskLockTimeout|300000|task lock timeout in millisecond. For more details, see [the below Locking section](#locking).|
<div class="note caution">
When a task acquires a lock, it sends a request via HTTP and awaits until it receives a response containing the lock acquisition result.
As a result, an HTTP timeout error can occur if `taskLockTimeout` is greater than `druid.server.http.maxIdleTime` of overlords.
</div>
Segment Merging Tasks
---------------------

View File

@ -1478,7 +1478,7 @@ public class KafkaIndexTaskTest
derby.metadataTablesConfigSupplier().get(),
derbyConnector
);
taskLockbox = new TaskLockbox(taskStorage, 3000);
taskLockbox = new TaskLockbox(taskStorage);
final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,

View File

@ -246,6 +246,13 @@ public class HadoopTuningConfig implements TuningConfig
return useExplicitVersion;
}
@JsonProperty("allowedHadoopPrefix")
public List<String> getUserAllowedHadoopPrefix()
{
// Just the user-specified list. More are added in HadoopDruidIndexerConfig.
return allowedHadoopPrefix;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -320,11 +327,4 @@ public class HadoopTuningConfig implements TuningConfig
allowedHadoopPrefix
);
}
@JsonProperty("allowedHadoopPrefix")
public List<String> getUserAllowedHadoopPrefix()
{
// Just the user-specified list. More are added in HadoopDruidIndexerConfig.
return allowedHadoopPrefix;
}
}

View File

@ -33,12 +33,17 @@ public class LockAcquireAction implements TaskAction<TaskLock>
@JsonIgnore
private final Interval interval;
@JsonIgnore
private final long timeoutMs;
@JsonCreator
public LockAcquireAction(
@JsonProperty("interval") Interval interval
@JsonProperty("interval") Interval interval,
@JsonProperty("timeoutMs") long timeoutMs
)
{
this.interval = interval;
this.timeoutMs = timeoutMs;
}
@JsonProperty
@ -47,6 +52,12 @@ public class LockAcquireAction implements TaskAction<TaskLock>
return interval;
}
@JsonProperty
public long getTimeoutMs()
{
return timeoutMs;
}
@Override
public TypeReference<TaskLock> getReturnTypeReference()
{
@ -59,7 +70,11 @@ public class LockAcquireAction implements TaskAction<TaskLock>
public TaskLock perform(Task task, TaskActionToolbox toolbox)
{
try {
return toolbox.getTaskLockbox().lock(task, interval);
if (timeoutMs == 0) {
return toolbox.getTaskLockbox().lock(task, interval);
} else {
return toolbox.getTaskLockbox().lock(task, interval, timeoutMs);
}
}
catch (InterruptedException e) {
throw Throwables.propagate(e);
@ -77,6 +92,7 @@ public class LockAcquireAction implements TaskAction<TaskLock>
{
return "LockAcquireAction{" +
"interval=" + interval +
"timeoutMs=" + timeoutMs +
'}';
}
}

View File

@ -29,7 +29,6 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.druid.common.utils.JodaUtils;
import io.druid.indexer.HadoopDruidDetermineConfigurationJob;
import io.druid.indexer.HadoopDruidIndexerConfig;
@ -198,7 +197,9 @@ public class HadoopIndexTask extends HadoopTask
indexerSchema.getDataSchema().getGranularitySpec().bucketIntervals().get()
)
);
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs));
version = lock.getVersion();
} else {
Iterable<TaskLock> locks = getTaskLocks(toolbox);

View File

@ -207,7 +207,9 @@ public class IndexTask extends AbstractTask
final DataSchema dataSchema;
if (determineIntervals) {
Interval interval = JodaUtils.umbrellaInterval(shardSpecs.getIntervals());
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval));
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, the below line can incur http timeout error.
TaskLock lock = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval, lockTimeoutMs));
version = lock.getVersion();
dataSchema = ingestionSchema.getDataSchema().withGranularitySpec(
ingestionSchema.getDataSchema()

View File

@ -207,13 +207,16 @@ public class RealtimeIndexTask extends AbstractTask
// which will typically be either the main data processing loop or the persist thread.
// Wrap default DataSegmentAnnouncer such that we unlock intervals as we unannounce segments
final long lockTimeoutMs = getContextValue(Tasks.LOCK_TIMEOUT_KEY, Tasks.DEFAULT_LOCK_TIMEOUT);
// Note: if lockTimeoutMs is larger than ServerConfig.maxIdleTime, http timeout error can occur while waiting for a
// lock to be acquired.
final DataSegmentAnnouncer lockingSegmentAnnouncer = new DataSegmentAnnouncer()
{
@Override
public void announceSegment(final DataSegment segment) throws IOException
{
// Side effect: Calling announceSegment causes a lock to be acquired
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs));
toolbox.getSegmentAnnouncer().announceSegment(segment);
}
@ -233,7 +236,7 @@ public class RealtimeIndexTask extends AbstractTask
{
// Side effect: Calling announceSegments causes locks to be acquired
for (DataSegment segment : segments) {
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval()));
toolbox.getTaskActionClient().submit(new LockAcquireAction(segment.getInterval(), lockTimeoutMs));
}
toolbox.getSegmentAnnouncer().announceSegments(segments);
}
@ -266,7 +269,7 @@ public class RealtimeIndexTask extends AbstractTask
try {
// Side effect: Calling getVersion causes a lock to be acquired
final TaskLock myLock = toolbox.getTaskActionClient()
.submit(new LockAcquireAction(interval));
.submit(new LockAcquireAction(interval, lockTimeoutMs));
return myLock.getVersion();
}

View File

@ -27,6 +27,7 @@ import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import javax.annotation.Nullable;
import java.util.Map;
/**
@ -166,6 +167,15 @@ public interface Task
public Map<String, Object> getContext();
public Object getContextValue(String key);
@Nullable
default <ContextValueType> ContextValueType getContextValue(String key)
{
return getContext() == null ? null : (ContextValueType) getContext().get(key);
}
default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
{
final ContextValueType value = getContextValue(key);
return value == null ? defaultValue : value;
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.task;
public class Tasks
{
public static String LOCK_TIMEOUT_KEY = "taskLockTimeout";
public static long DEFAULT_LOCK_TIMEOUT = 5 * 60 * 1000; // 5 min
}

View File

@ -41,7 +41,6 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.guava.FunctionalIterable;
import io.druid.server.initialization.ServerConfig;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@ -69,7 +68,6 @@ public class TaskLockbox
private final TaskStorage taskStorage;
private final ReentrantLock giant = new ReentrantLock(true);
private final Condition lockReleaseCondition = giant.newCondition();
protected final long lockTimeoutMillis;
private static final EmittingLogger log = new EmittingLogger(TaskLockbox.class);
@ -79,21 +77,10 @@ public class TaskLockbox
@Inject
public TaskLockbox(
TaskStorage taskStorage,
ServerConfig serverConfig
TaskStorage taskStorage
)
{
this.taskStorage = taskStorage;
this.lockTimeoutMillis = serverConfig.getMaxIdleTime().getMillis();
}
public TaskLockbox(
TaskStorage taskStorage,
long lockTimeoutMillis
)
{
this.taskStorage = taskStorage;
this.lockTimeoutMillis = lockTimeoutMillis;
}
/**
@ -140,7 +127,7 @@ public class TaskLockbox
continue;
}
final TaskLockPosse taskLockPosse = tryAddTaskToLockPosse(
final TaskLockPosse taskLockPosse = createOrFindLockPosse(
task,
savedTaskLock.getInterval(),
Optional.of(savedTaskLock.getVersion())
@ -190,44 +177,52 @@ public class TaskLockbox
}
/**
* Acquires a lock on behalf of a task. Blocks until the lock is acquired. Throws an exception if the lock
* cannot be acquired.
* Acquires a lock on behalf of a task. Blocks until the lock is acquired.
*
* @param task task to acquire lock for
* @param interval interval to lock
* @return acquired TaskLock
*
* @throws InterruptedException if the lock cannot be acquired
* @throws InterruptedException if the current thread is interrupted
*/
public TaskLock lock(final Task task, final Interval interval) throws InterruptedException
{
long timeout = lockTimeoutMillis;
giant.lock();
giant.lockInterruptibly();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
long startTime = System.currentTimeMillis();
lockReleaseCondition.await(timeout, TimeUnit.MILLISECONDS);
long timeDelta = System.currentTimeMillis() - startTime;
if (timeDelta >= timeout) {
log.error(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
);
throw new InterruptedException(String.format(
"Task [%s] can not acquire lock for interval [%s] within [%s] ms",
task.getId(),
interval,
lockTimeoutMillis
));
} else {
timeout -= timeDelta;
}
lockReleaseCondition.await();
}
return taskLock.get();
}
finally {
giant.unlock();
}
}
/**
* Acquires a lock on behalf of a task, waiting up to the specified wait time if necessary.
*
* @param task task to acquire a lock for
* @param interval interval to lock
* @param timeoutMs maximum time to wait
*
* @return acquired lock
*
* @throws InterruptedException if the current thread is interrupted
*/
public TaskLock lock(final Task task, final Interval interval, long timeoutMs) throws InterruptedException
{
long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
giant.lockInterruptibly();
try {
Optional<TaskLock> taskLock;
while (!(taskLock = tryLock(task, interval)).isPresent()) {
if (nanos <= 0) {
return null;
}
nanos = lockReleaseCondition.awaitNanos(nanos);
}
return taskLock.get();
}
finally {
@ -274,7 +269,7 @@ public class TaskLockbox
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");
final TaskLockPosse posseToUse = tryAddTaskToLockPosse(task, interval, preferredVersion);
final TaskLockPosse posseToUse = createOrFindLockPosse(task, interval, preferredVersion);
if (posseToUse != null) {
// Add to existing TaskLockPosse, if necessary
if (posseToUse.getTaskIds().add(task.getId())) {
@ -310,7 +305,7 @@ public class TaskLockbox
}
private TaskLockPosse tryAddTaskToLockPosse(
private TaskLockPosse createOrFindLockPosse(
final Task task,
final Interval interval,
final Optional<String> preferredVersion

View File

@ -90,7 +90,7 @@ public class SegmentInsertActionTest
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000);
action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertEquals(
@ -108,7 +108,7 @@ public class SegmentInsertActionTest
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));

View File

@ -89,7 +89,7 @@ public class SegmentTransactionalInsertActionTest
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000);
SegmentPublishResult result1 = new SegmentTransactionalInsertAction(
ImmutableSet.of(SEGMENT1),
@ -130,7 +130,7 @@ public class SegmentTransactionalInsertActionTest
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000);
SegmentPublishResult result = new SegmentTransactionalInsertAction(
ImmutableSet.of(SEGMENT1),
@ -150,7 +150,7 @@ public class SegmentTransactionalInsertActionTest
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentTransactionalInsertAction action = new SegmentTransactionalInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL), 5000);
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));

View File

@ -80,7 +80,7 @@ public class TaskActionTestKit extends ExternalResource
public void before()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
taskLockbox = new TaskLockbox(taskStorage, 300);
taskLockbox = new TaskLockbox(taskStorage);
testDerbyConnector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig)

View File

@ -933,7 +933,7 @@ public class RealtimeIndexTaskTest
)
{
final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, 300);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
try {
taskStorage.insert(task, TaskStatus.running(task.getId()));
}

View File

@ -30,6 +30,9 @@ import io.druid.guice.FirehoseModule;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
@ -172,7 +175,7 @@ public class TaskSerdeTest
final IndexTask task = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new IndexIngestionSpec(
new DataSchema(
"foo",
null,
@ -184,8 +187,8 @@ public class TaskSerdeTest
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
new IndexTask.IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null)
new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
new IndexTuningConfig(10000, 10, null, 9999, null, indexSpec, 3, true, true, false, null, null)
),
null
);
@ -234,7 +237,7 @@ public class TaskSerdeTest
final IndexTask task = new IndexTask(
null,
new TaskResource("rofl", 2),
new IndexTask.IndexIngestionSpec(
new IndexIngestionSpec(
new DataSchema(
"foo",
null,
@ -246,8 +249,8 @@ public class TaskSerdeTest
),
jsonMapper
),
new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
new IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true),
new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
),
null
);

View File

@ -151,7 +151,7 @@ public class IngestSegmentFirehoseFactoryTest
}
INDEX_MERGER_V9.persist(index, persistDir, indexSpec);
final TaskLockbox tl = new TaskLockbox(ts, 300);
final TaskLockbox tl = new TaskLockbox(ts);
final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
{
final private Set<DataSegment> published = Sets.newHashSet();

View File

@ -72,7 +72,7 @@ public class RealtimeishTask extends AbstractTask
// Sort of similar to what realtime tasks do:
// Acquire lock for first interval
final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1));
final TaskLock lock1 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval1, 5000));
final List<TaskLock> locks1 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)
@ -80,7 +80,7 @@ public class RealtimeishTask extends AbstractTask
Assert.assertEquals("locks1", ImmutableList.of(lock1), locks1);
// Acquire lock for second interval
final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2));
final TaskLock lock2 = toolbox.getTaskActionClient().submit(new LockAcquireAction(interval2, 5000));
final List<TaskLock> locks2 = toolbox.getTaskActionClient().submit(new LockListAction());
// (Confirm lock sanity)

View File

@ -60,6 +60,9 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.AbstractFixedIntervalTask;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.IndexTask.IndexIOConfig;
import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import io.druid.indexing.common.task.KillTask;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.Task;
@ -514,7 +517,7 @@ public class TaskLifecycleTest
Preconditions.checkNotNull(taskStorage);
Preconditions.checkNotNull(emitter);
taskLockbox = new TaskLockbox(taskStorage, 300);
taskLockbox = new TaskLockbox(taskStorage);
tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock(
SupervisorManager.class)));
File tmpDir = temporaryFolder.newFolder();
@ -642,7 +645,7 @@ public class TaskLifecycleTest
final Task indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new IndexIngestionSpec(
new DataSchema(
"foo",
null,
@ -654,8 +657,8 @@ public class TaskLifecycleTest
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false),
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
new IndexIOConfig(new MockFirehoseFactory(false), false),
new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
),
null
);
@ -699,7 +702,7 @@ public class TaskLifecycleTest
final Task indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new IndexIngestionSpec(
new DataSchema(
"foo",
null,
@ -711,8 +714,8 @@ public class TaskLifecycleTest
),
mapper
),
new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false),
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
new IndexIOConfig(new MockExceptionalFirehoseFactory(), false),
new IndexTuningConfig(10000, 10, null, null, null, indexSpec, 3, true, true, false, null, null)
),
null
);
@ -1063,7 +1066,7 @@ public class TaskLifecycleTest
final Task indexTask = new IndexTask(
null,
null,
new IndexTask.IndexIngestionSpec(
new IndexIngestionSpec(
new DataSchema(
"foo",
null,
@ -1075,8 +1078,8 @@ public class TaskLifecycleTest
),
mapper
),
new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false),
new IndexTask.IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null)
new IndexIOConfig(new MockFirehoseFactory(false), false),
new IndexTuningConfig(10000, 10, null, null, null, indexSpec, null, false, null, null, null, null)
),
null
);

View File

@ -19,13 +19,11 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig;
@ -37,10 +35,8 @@ import io.druid.java.util.common.StringUtils;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import io.druid.server.initialization.ServerConfig;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -48,7 +44,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TaskLockboxTest
@ -57,7 +52,6 @@ public class TaskLockboxTest
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();
private final ObjectMapper objectMapper = new DefaultObjectMapper();
private ServerConfig serverConfig;
private TaskStorage taskStorage;
private TaskLockbox lockbox;
@ -78,15 +72,11 @@ public class TaskLockboxTest
objectMapper
)
);
serverConfig = EasyMock.niceMock(ServerConfig.class);
EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)).anyTimes();
EasyMock.replay(serverConfig);
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
EasyMock.replay(emitter);
lockbox = new TaskLockbox(taskStorage, serverConfig);
lockbox = new TaskLockbox(taskStorage);
}
@Test
@ -179,20 +169,18 @@ public class TaskLockboxTest
public void testTimeoutForLock() throws InterruptedException
{
Task task1 = NoopTask.create();
Task task2 = new SomeTask(null, 0, 0, null, null, null);
Task task2 = NoopTask.create();
lockbox.add(task1);
lockbox.add(task2);
exception.expect(InterruptedException.class);
exception.expectMessage("can not acquire lock for interval");
lockbox.lock(task1, new Interval("2015-01-01/2015-01-02"));
lockbox.lock(task2, new Interval("2015-01-01/2015-01-15"));
Assert.assertNotNull(lockbox.lock(task1, new Interval("2015-01-01/2015-01-02"), 5000));
Assert.assertNull(lockbox.lock(task2, new Interval("2015-01-01/2015-01-15"), 1000));
}
@Test
public void testSyncFromStorage() throws EntryExistsException
{
final TaskLockbox originalBox = new TaskLockbox(taskStorage, serverConfig);
final TaskLockbox originalBox = new TaskLockbox(taskStorage);
for (int i = 0; i < 5; i++) {
final Task task = NoopTask.create();
taskStorage.insert(task, TaskStatus.running(task.getId()));
@ -207,7 +195,7 @@ public class TaskLockboxTest
.flatMap(task -> taskStorage.getLocks(task.getId()).stream())
.collect(Collectors.toList());
final TaskLockbox newBox = new TaskLockbox(taskStorage, serverConfig);
final TaskLockbox newBox = new TaskLockbox(taskStorage);
newBox.syncFromStorage();
Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks());
@ -219,32 +207,4 @@ public class TaskLockboxTest
Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage);
}
public static class SomeTask extends NoopTask
{
public SomeTask(
@JsonProperty("id") String id,
@JsonProperty("runTime") long runTime,
@JsonProperty("isReadyTime") long isReadyTime,
@JsonProperty("isReadyResult") String isReadyResult,
@JsonProperty("firehose") FirehoseFactory firehoseFactory,
@JsonProperty("context") Map<String, Object> context
)
{
super(id, runTime, isReadyTime, isReadyResult, firehoseFactory, context);
}
@Override
public String getType()
{
return "someTask";
}
@Override
public String getGroupId()
{
return "someGroupId";
}
}
}

View File

@ -28,7 +28,6 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;