bugfix: when building a materialized view, if taskCount > 1, a ConcurrentModificationException may be thrown (#6690)

* bugfix: when building a materialized view, if taskCount > 1, a ConcurrentModificationException may be thrown

* remove entries after iteration instead of using a ConcurrentMap, and add a unit test (a minimal sketch of this pattern follows below)

* small change

* modify unit test for coverage

* remove unused method
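A minimal, standalone sketch of the pattern the fix relies on (the class name, map contents, and the "FAILED"/"RUNNING" strings are illustrative placeholders, not Druid identifiers): a plain HashMap's entrySet() iterator fails fast with a ConcurrentModificationException when the map is structurally modified during the loop, so the keys to drop are collected first and removed once the iteration has finished.

import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RemoveAfterIterationSketch
{
  public static void main(String[] args)
  {
    // Broken variant: removing from the map while iterating its entrySet()
    // triggers the fail-fast check of HashMap's iterator.
    Map<String, String> tasks = sampleTasks();
    try {
      for (Map.Entry<String, String> entry : tasks.entrySet()) {
        if ("FAILED".equals(entry.getValue())) {
          tasks.remove(entry.getKey());
        }
      }
    }
    catch (ConcurrentModificationException e) {
      System.out.println("removing during iteration threw: " + e);
    }

    // Fixed variant (what the commit does): collect the keys during iteration,
    // then remove them from the map(s) after the loop has finished.
    Map<String, String> tasks2 = sampleTasks();
    List<String> keysToRemove = new ArrayList<>();
    for (Map.Entry<String, String> entry : tasks2.entrySet()) {
      if ("FAILED".equals(entry.getValue())) {
        keysToRemove.add(entry.getKey());
      }
    }
    for (String key : keysToRemove) {
      tasks2.remove(key);
    }
    System.out.println("remaining after cleanup: " + tasks2.keySet());
  }

  // Sample data standing in for runningTasks; values mimic task states.
  private static Map<String, String> sampleTasks()
  {
    Map<String, String> tasks = new HashMap<>();
    tasks.put("2015-01-01/2015-01-02", "FAILED");
    tasks.put("2015-01-02/2015-01-03", "RUNNING");
    tasks.put("2015-01-03/2015-01-04", "FAILED");
    return tasks;
  }
}

An Iterator.remove()-based loop would also avoid the exception for a single map; collecting the intervals first, as the commit does, lets both runningTasks and runningVersion be cleaned up for the same keys after the loop.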
Fangyuan Deng 2019-02-20 05:10:55 +08:00 committed by Jihoon Son
parent 258485a2fb
commit 7d1e8f353e
2 changed files with 96 additions and 2 deletions

MaterializedViewSupervisor.java

@@ -270,13 +270,18 @@ public class MaterializedViewSupervisor implements Supervisor
   void checkSegmentsAndSubmitTasks()
   {
     synchronized (taskLock) {
+      List<Interval> intervalsToRemove = new ArrayList<>();
       for (Map.Entry<Interval, HadoopIndexTask> entry : runningTasks.entrySet()) {
         Optional<TaskStatus> taskStatus = taskStorage.getStatus(entry.getValue().getId());
         if (!taskStatus.isPresent() || !taskStatus.get().isRunnable()) {
-          runningTasks.remove(entry.getKey());
-          runningVersion.remove(entry.getKey());
+          intervalsToRemove.add(entry.getKey());
         }
       }
+      for (Interval interval : intervalsToRemove) {
+        runningTasks.remove(interval);
+        runningVersion.remove(interval);
+      }
       if (runningTasks.size() == maxTaskCount) {
         //if the number of running tasks reaches the max task count, the supervisor won't submit new tasks.
         return;
@@ -289,6 +294,12 @@
     }
   }
+  @VisibleForTesting
+  Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> getRunningTasks()
+  {
+    return new Pair<>(runningTasks, runningVersion);
+  }
   /**
    * Find information about the intervals in which derived dataSource data should be rebuilt.
    * The information includes the version and DataSegments list of an interval.

MaterializedViewSupervisorTest.java

@@ -27,7 +27,11 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.indexer.HadoopIOConfig;
import org.apache.druid.indexer.HadoopIngestionSpec;
import org.apache.druid.indexer.HadoopTuningConfig;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.HadoopIndexTask;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
@@ -41,7 +45,9 @@ import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -176,6 +182,83 @@ public class MaterializedViewSupervisorTest
    Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
  }

  @Test
  public void testCheckSegmentsAndSubmitTasks() throws IOException
  {
    Set<DataSegment> baseSegments = Sets.newHashSet(
        new DataSegment(
            "base",
            Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
            "2015-01-03",
            ImmutableMap.of(),
            ImmutableList.of("dim1", "dim2"),
            ImmutableList.of("m1"),
            new HashBasedNumberedShardSpec(0, 1, null, null),
            9,
            1024
        )
    );
    indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
    expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
    expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
    expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
    // task1 is reported as failed, task2 as still running
    expect(taskStorage.getStatus("test_task1")).andReturn(Optional.of(TaskStatus.failure("test_task1"))).anyTimes();
    expect(taskStorage.getStatus("test_task2")).andReturn(Optional.of(TaskStatus.running("test_task2"))).anyTimes();
    EasyMock.replay(taskStorage);

    Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> runningTasksPair = supervisor.getRunningTasks();
    Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
    Map<Interval, String> runningVersion = runningTasksPair.rhs;

    DataSchema dataSchema = new DataSchema(
        "test_datasource",
        null,
        null,
        null,
        TransformSpec.NONE,
        objectMapper
    );
    HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null);
    HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null);
    HadoopIndexTask task1 = new HadoopIndexTask(
        "test_task1",
        spec,
        null,
        null,
        null,
        objectMapper,
        null,
        null,
        null
    );
    runningTasks.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), task1);
    runningVersion.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), "test_version1");

    HadoopIndexTask task2 = new HadoopIndexTask(
        "test_task2",
        spec,
        null,
        null,
        null,
        objectMapper,
        null,
        null,
        null
    );
    runningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
    runningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");

    // the failed task1 should be dropped from both maps; the running task2 should remain
    supervisor.checkSegmentsAndSubmitTasks();

    Map<Interval, HadoopIndexTask> expectedRunningTasks = new HashMap<>();
    Map<Interval, String> expectedRunningVersion = new HashMap<>();
    expectedRunningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
    expectedRunningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2");

    Assert.assertEquals(expectedRunningTasks, runningTasks);
    Assert.assertEquals(expectedRunningVersion, runningVersion);
  }

  @Test
  public void testSuspendedDoesntRun()