MSQ: Fix task lock checking during publish, fix lock priority. (#13282)

* MSQ: Fix task lock checking during publish, fix lock priority.

Fixes two issues:

1) ControllerImpl did not properly check the return value of
   SegmentTransactionalInsertAction when doing a REPLACE. This could cause
   it to not realize that its locks were preempted.

2) Task lock priority was the default of 0. It should be the higher
   batch default of 50. The low priority made it possible for MSQ tasks
   to be preempted by compaction tasks, which is not desired.

* Restructuring, add docs.

* Add performSegmentPublish tests.

* Fix tests.
This commit is contained in:
Gian Merlino 2022-11-07 19:57:34 -08:00 committed by GitHub
parent f6aca21e82
commit 48528a0c98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 167 additions and 25 deletions

View File

@ -332,7 +332,7 @@ Each task type has a different default lock priority. The below table shows the
|task type|default priority|
|---------|----------------|
|Realtime index task|75|
|Batch index task|50|
|Batch index tasks, including [native batch](native-batch.md), [SQL](../multi-stage-query/index.md), and [Hadoop-based](hadoop.md)|50|
|Merge/Append/Compaction task|25|
|Other tasks|0|

View File

@ -59,8 +59,9 @@ import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.MarkSegmentsAsUnusedAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentInsertAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
@ -1090,30 +1091,17 @@ public class ControllerImpl implements Controller
.submit(new MarkSegmentsAsUnusedAction(task.getDataSource(), interval));
}
} else {
try {
context.taskActionClient()
.submit(SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments));
}
catch (Exception e) {
if (isTaskLockPreemptedException(e)) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
} else {
throw e;
}
}
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.overwriteAction(null, segmentsToDrop, segments)
);
}
} else if (!segments.isEmpty()) {
// Append mode.
try {
context.taskActionClient().submit(new SegmentInsertAction(segments));
}
catch (Exception e) {
if (isTaskLockPreemptedException(e)) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
} else {
throw e;
}
}
performSegmentPublish(
context.taskActionClient(),
SegmentTransactionalInsertAction.appendAction(segments, null, null)
);
}
}
@ -1863,6 +1851,32 @@ public class ControllerImpl implements Controller
return retVal;
}
/**
* Performs a particular {@link SegmentTransactionalInsertAction}, publishing segments.
*
* Throws {@link MSQException} with {@link InsertLockPreemptedFault} if the action fails due to lock preemption.
*/
static void performSegmentPublish(
final TaskActionClient client,
final SegmentTransactionalInsertAction action
) throws IOException
{
try {
final SegmentPublishResult result = client.submit(action);
if (!result.isSuccess()) {
throw new MSQException(InsertLockPreemptedFault.instance());
}
}
catch (Exception e) {
if (isTaskLockPreemptedException(e)) {
throw new MSQException(e, InsertLockPreemptedFault.instance());
} else {
throw e;
}
}
}
/**
* Method that determines whether an exception was raised due to the task lock for the controller task being
* preempted. Uses string comparison, because the relevant Overlord APIs do not have a more reliable way of
@ -1874,6 +1888,9 @@ public class ControllerImpl implements Controller
private static boolean isTaskLockPreemptedException(Exception e)
{
final String exceptionMsg = e.getMessage();
if (exceptionMsg == null) {
return false;
}
final List<String> validExceptionExcerpts = ImmutableList.of(
"are not covered by locks" /* From TaskLocks */,
"is preempted and no longer valid" /* From SegmentAllocateAction */

View File

@ -204,6 +204,12 @@ public class MSQControllerTask extends AbstractTask
}
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
private static String getDataSourceForTaskMetadata(final MSQSpec querySpec)
{
final MSQDestination destination = querySpec.getDestination();

View File

@ -30,6 +30,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.task.AbstractTask;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.msq.exec.MSQTasks;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.exec.WorkerContext;
@ -112,4 +113,10 @@ public class MSQWorkerTask extends AbstractTask
worker.stopGracefully();
}
}
@Override
public int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}
}

View File

@ -32,8 +32,8 @@ public class InsertLockPreemptedFault extends BaseMSQFault
{
super(
CODE,
"Insert lock preempted while trying to ingest the data."
+ " This can occur if there are higher priority jobs like real-time ingestion running on same time chunks."
"Lock preempted while trying to ingest the data. This can occur if there are higher priority tasks, such as "
+ "real-time ingestion, running on the same time chunks."
);
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.msq.exec;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Collections;
public class ControllerImplTest
{
@Test
public void test_performSegmentPublish_ok() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);
final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.ok(Collections.emptySet()));
EasyMock.replay(taskActionClient);
// All OK.
ControllerImpl.performSegmentPublish(taskActionClient, action);
}
@Test
public void test_performSegmentPublish_publishFail() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);
final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andReturn(SegmentPublishResult.fail("oops"));
EasyMock.replay(taskActionClient);
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> ControllerImpl.performSegmentPublish(taskActionClient, action)
);
Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault());
}
@Test
public void test_performSegmentPublish_publishException() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);
final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("oops"));
EasyMock.replay(taskActionClient);
final ISE e = Assert.assertThrows(
ISE.class,
() -> ControllerImpl.performSegmentPublish(taskActionClient, action)
);
Assert.assertEquals("oops", e.getMessage());
}
@Test
public void test_performSegmentPublish_publishLockPreemptedException() throws IOException
{
final SegmentTransactionalInsertAction action =
SegmentTransactionalInsertAction.appendAction(Collections.emptySet(), null, null);
final TaskActionClient taskActionClient = EasyMock.mock(TaskActionClient.class);
EasyMock.expect(taskActionClient.submit(action)).andThrow(new ISE("are not covered by locks"));
EasyMock.replay(taskActionClient);
final MSQException e = Assert.assertThrows(
MSQException.class,
() -> ControllerImpl.performSegmentPublish(taskActionClient, action)
);
Assert.assertEquals(InsertLockPreemptedFault.instance(), e.getFault());
}
}

View File

@ -27,16 +27,20 @@ import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.LockListAction;
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.common.actions.TaskAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.msq.indexing.error.InsertLockPreemptedFaultTest;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.joda.time.Interval;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -91,6 +95,10 @@ public class MSQTestTaskActionClient implements TaskActionClient
));
} else if (taskAction instanceof RetrieveUsedSegmentsAction) {
return (RetType) ImmutableSet.of();
} else if (taskAction instanceof SegmentTransactionalInsertAction) {
// Always OK.
final Set<DataSegment> segments = ((SegmentTransactionalInsertAction) taskAction).getSegments();
return (RetType) SegmentPublishResult.ok(segments);
} else {
return null;
}