mirror of https://github.com/apache/druid.git
Fix ParallelIndexTask when publishing empty segments (#6807)
* Fix ParallelIndexTask when publishing empty segments * unused import
This commit is contained in:
parent
ef80c4e036
commit
c4716d1639
|
@ -390,10 +390,11 @@ public class SinglePhaseParallelIndexTaskRunner implements ParallelIndexTaskRunn
|
|||
.stream()
|
||||
.flatMap(report -> report.getSegments().stream())
|
||||
.collect(Collectors.toSet());
|
||||
final boolean published = publisher.publishSegments(segmentsToPublish, null).isSuccess();
|
||||
final boolean published = segmentsToPublish.isEmpty()
|
||||
|| publisher.publishSegments(segmentsToPublish, null).isSuccess();
|
||||
|
||||
if (published) {
|
||||
log.info("Published segments");
|
||||
log.info("Published [%d] segments", segmentsToPublish.size());
|
||||
} else {
|
||||
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
|
||||
final Set<SegmentIdentifier> segmentsIdentifiers = segmentsMap
|
||||
|
|
|
@ -187,6 +187,24 @@ public class ParallelIndexSupervisorTaskTest extends AbstractParallelIndexSuperv
|
|||
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPublishEmptySegments() throws Exception
|
||||
{
|
||||
final ParallelIndexSupervisorTask task = newTask(
|
||||
Intervals.of("2020/2021"),
|
||||
new ParallelIndexIOConfig(
|
||||
new LocalFirehoseFactory(inputDir, "test_*", null),
|
||||
false
|
||||
)
|
||||
);
|
||||
actionClient = createActionClient(task);
|
||||
toolbox = createTaskToolbox(task);
|
||||
|
||||
prepareTaskForLocking(task);
|
||||
Assert.assertTrue(task.isReady(actionClient));
|
||||
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
|
||||
}
|
||||
|
||||
private ParallelIndexSupervisorTask newTask(
|
||||
Interval interval,
|
||||
ParallelIndexIOConfig ioConfig
|
||||
|
|
Loading…
Reference in New Issue