Fix indefinite WAITING batch task when lock is revoked (#11788)

* Fix indefinite WAITING batch task when lock is revoked

* Use revoked property on TaskLock

* Update TimeChunkLockAcquireAction to return TaskLock for revoked locks
This commit is contained in:
Jonathan Wei 2021-10-27 17:49:15 -05:00 committed by GitHub
parent 9ca8f1ec97
commit a96aed021e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 145 additions and 18 deletions

View File

@ -95,7 +95,7 @@ public class TimeChunkLockAcquireAction implements TaskAction<TaskLock>
.lock(task, new TimeChunkLockRequest(type, task, interval, null)) .lock(task, new TimeChunkLockRequest(type, task, interval, null))
: toolbox.getTaskLockbox() : toolbox.getTaskLockbox()
.lock(task, new TimeChunkLockRequest(type, task, interval, null), timeoutMs); .lock(task, new TimeChunkLockRequest(type, task, interval, null), timeoutMs);
return result.isOk() ? result.getTaskLock() : null; return result.getTaskLock();
} }
catch (InterruptedException e) { catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -81,7 +81,8 @@ public class TimeChunkLockTryAcquireAction implements TaskAction<TaskLock>
task, task,
new TimeChunkLockRequest(type, task, interval, null) new TimeChunkLockRequest(type, task, interval, null)
); );
return result.isOk() ? result.getTaskLock() : null;
return result.getTaskLock();
} }
@Override @Override

View File

@ -45,6 +45,7 @@ import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.overlord.Segments; import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.granularity.GranularityType;
@ -404,6 +405,9 @@ public abstract class AbstractBatchIndexTask extends AbstractTask
if (lock == null) { if (lock == null) {
return false; return false;
} }
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", cur));
}
} }
return true; return true;
} }

View File

@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.io.IOException; import java.io.IOException;
@ -78,7 +79,19 @@ public abstract class AbstractFixedIntervalTask extends AbstractTask
@Override @Override
public boolean isReady(TaskActionClient taskActionClient) throws Exception public boolean isReady(TaskActionClient taskActionClient) throws Exception
{ {
return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null; final TaskLock lock = taskActionClient.submit(
new TimeChunkLockTryAcquireAction(
TaskLockType.EXCLUSIVE,
interval
)
);
if (lock == null) {
return false;
}
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
return true;
} }
@JsonProperty @JsonProperty

View File

@ -45,6 +45,7 @@ import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReport;
@ -299,13 +300,20 @@ public class AppenderatorDriverRealtimeIndexTask extends AbstractTask implements
) )
).isOk(); ).isOk();
} else { } else {
return toolbox.getTaskActionClient().submit( final TaskLock lock = toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction( new TimeChunkLockAcquireAction(
TaskLockType.EXCLUSIVE, TaskLockType.EXCLUSIVE,
segmentId.getInterval(), segmentId.getInterval(),
1000L 1000L
) )
) != null; );
if (lock == null) {
return false;
}
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segmentId.getInterval()));
}
return true;
} }
} }
catch (IOException e) { catch (IOException e) {

View File

@ -54,6 +54,7 @@ import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever; import org.apache.druid.indexing.hadoop.OverlordActionBasedUsedSegmentsRetriever;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.JodaUtils; import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.Granularity;
@ -198,7 +199,19 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
Interval interval = JodaUtils.umbrellaInterval( Interval interval = JodaUtils.umbrellaInterval(
JodaUtils.condenseIntervals(intervals) JodaUtils.condenseIntervals(intervals)
); );
return taskActionClient.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, interval)) != null; final TaskLock lock = taskActionClient.submit(
new TimeChunkLockTryAcquireAction(
TaskLockType.EXCLUSIVE,
interval
)
);
if (lock == null) {
return false;
}
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
return true;
} else { } else {
return true; return true;
} }
@ -393,6 +406,9 @@ public class HadoopIndexTask extends HadoopTask implements ChatHandler
), ),
"Cannot acquire a lock for interval[%s]", interval "Cannot acquire a lock for interval[%s]", interval
); );
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
version = lock.getVersion(); version = lock.getVersion();
} else { } else {
Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient()); Iterable<TaskLock> locks = getTaskLocks(toolbox.getTaskActionClient());

View File

@ -42,6 +42,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.query.NoopQueryRunner; import org.apache.druid.query.NoopQueryRunner;
@ -238,13 +239,16 @@ public class RealtimeIndexTask extends AbstractTask
public void announceSegment(final DataSegment segment) throws IOException public void announceSegment(final DataSegment segment) throws IOException
{ {
// Side effect: Calling announceSegment causes a lock to be acquired // Side effect: Calling announceSegment causes a lock to be acquired
Preconditions.checkNotNull( final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit( toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segment.getInterval(), lockTimeoutMs) new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segment.getInterval(), lockTimeoutMs)
), ),
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
segment.getInterval() segment.getInterval()
); );
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval()));
}
toolbox.getSegmentAnnouncer().announceSegment(segment); toolbox.getSegmentAnnouncer().announceSegment(segment);
} }
@ -264,13 +268,16 @@ public class RealtimeIndexTask extends AbstractTask
{ {
// Side effect: Calling announceSegments causes locks to be acquired // Side effect: Calling announceSegments causes locks to be acquired
for (DataSegment segment : segments) { for (DataSegment segment : segments) {
Preconditions.checkNotNull( final TaskLock lock = Preconditions.checkNotNull(
toolbox.getTaskActionClient().submit( toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segment.getInterval(), lockTimeoutMs) new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segment.getInterval(), lockTimeoutMs)
), ),
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
segment.getInterval() segment.getInterval()
); );
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segment.getInterval()));
}
} }
toolbox.getSegmentAnnouncer().announceSegments(segments); toolbox.getSegmentAnnouncer().announceSegments(segments);
} }
@ -312,7 +319,9 @@ public class RealtimeIndexTask extends AbstractTask
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
interval interval
); );
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
return lock.getVersion(); return lock.getVersion();
} }
catch (IOException e) { catch (IOException e) {

View File

@ -39,6 +39,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskC
import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair; import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.SegmentId;
@ -350,6 +351,9 @@ public class SinglePhaseParallelIndexTaskRunner extends ParallelIndexPhaseRunner
"Cannot acquire a lock for interval[%s]", "Cannot acquire a lock for interval[%s]",
interval interval
); );
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", interval));
}
version = lock.getVersion(); version = lock.getVersion();
} }
} }

View File

@ -48,9 +48,14 @@ public class LockResult
return new LockResult(taskLock, newSegmentId, false); return new LockResult(taskLock, newSegmentId, false);
} }
public static LockResult fail(boolean revoked) public static LockResult fail()
{ {
return new LockResult(null, null, revoked); return new LockResult(null, null, false);
}
public static LockResult revoked(TaskLock taskLock)
{
return new LockResult(taskLock, null, true);
} }
@JsonCreator @JsonCreator
@ -87,6 +92,6 @@ public class LockResult
public boolean isOk() public boolean isOk()
{ {
return taskLock != null; return taskLock != null && !revoked;
} }
} }

View File

@ -351,7 +351,7 @@ public class TaskLockbox
if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) { if (lockRequestForNewSegment.getGranularity() == LockGranularity.SEGMENT) {
newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion()); newSegmentId = allocateSegmentId(lockRequestForNewSegment, request.getVersion());
if (newSegmentId == null) { if (newSegmentId == null) {
return LockResult.fail(false); return LockResult.fail();
} }
convertedRequest = new SpecificSegmentLockRequest(lockRequestForNewSegment, newSegmentId); convertedRequest = new SpecificSegmentLockRequest(lockRequestForNewSegment, newSegmentId);
} else { } else {
@ -400,7 +400,7 @@ public class TaskLockbox
? ((SegmentLock) posseToUse.taskLock).getPartitionId() ? ((SegmentLock) posseToUse.taskLock).getPartitionId()
: null : null
); );
return LockResult.fail(false); return LockResult.fail();
} }
} else { } else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId()); log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
@ -408,7 +408,10 @@ public class TaskLockbox
} }
} else { } else {
final boolean lockRevoked = posseToUse != null && posseToUse.getTaskLock().isRevoked(); final boolean lockRevoked = posseToUse != null && posseToUse.getTaskLock().isRevoked();
return LockResult.fail(lockRevoked); if (lockRevoked) {
return LockResult.revoked(posseToUse.getTaskLock());
}
return LockResult.fail();
} }
} }
finally { finally {
@ -608,7 +611,8 @@ public class TaskLockbox
* @param taskId an id of the task holding the lock * @param taskId an id of the task holding the lock
* @param lock lock to be revoked * @param lock lock to be revoked
*/ */
private void revokeLock(String taskId, TaskLock lock) @VisibleForTesting
protected void revokeLock(String taskId, TaskLock lock)
{ {
giant.lock(); giant.lock();

View File

@ -51,6 +51,7 @@ import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport; import org.apache.druid.indexing.common.TaskReport;
@ -436,13 +437,20 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
) )
).isOk(); ).isOk();
} else { } else {
return toolbox.getTaskActionClient().submit( final TaskLock lock = toolbox.getTaskActionClient().submit(
new TimeChunkLockAcquireAction( new TimeChunkLockAcquireAction(
TaskLockType.EXCLUSIVE, TaskLockType.EXCLUSIVE,
segmentId.getInterval(), segmentId.getInterval(),
1000L 1000L
) )
) != null; );
if (lock == null) {
return false;
}
if (lock.isRevoked()) {
throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", segmentId.getInterval()));
}
return true;
} }
} }
catch (IOException e) { catch (IOException e) {

View File

@ -1426,6 +1426,61 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
} }
@Test
public void testLockRevoked() throws Exception
{
final Task task = new AbstractFixedIntervalTask(
"id1",
"id1",
new TaskResource("id1", 1),
"ds",
Intervals.of("2012-01-01/P1D"),
null
)
{
@Override
public String getType()
{
return "test";
}
@Override
public void stopGracefully(TaskConfig taskConfig)
{
}
@Override
public TaskStatus run(TaskToolbox toolbox) throws Exception
{
final Interval interval = Intervals.of("2012-01-01/P1D");
final TimeChunkLockTryAcquireAction action = new TimeChunkLockTryAcquireAction(
TaskLockType.EXCLUSIVE,
interval
);
final TaskLock lock = toolbox.getTaskActionClient().submit(action);
if (lock == null) {
throw new ISE("Failed to get a lock");
}
final TaskLock lockBeforeRevoke = toolbox.getTaskActionClient().submit(action);
Assert.assertFalse(lockBeforeRevoke.isRevoked());
taskLockbox.revokeLock(getId(), lock);
final TaskLock lockAfterRevoke = toolbox.getTaskActionClient().submit(action);
Assert.assertTrue(lockAfterRevoke.isRevoked());
return TaskStatus.failure(getId(), "lock revoked test");
}
};
final TaskStatus status = runTask(task);
Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
Assert.assertEquals("segments published", 0, mdc.getPublished().size());
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
private TaskStatus runTask(final Task task) throws Exception private TaskStatus runTask(final Task task) throws Exception
{ {
final Task dummyTask = new DefaultObjectMapper().readValue( final Task dummyTask = new DefaultObjectMapper().readValue(