Fix create segment phase of batch ingestion to take segment identifiers that have a non UTC interval… (#11635)

* Fix create segment phase of batch ingestion to take segment identifiers with non UTC time zones

* Fix comment and LGTM forbidden error
Agustin Gonzalez 2021-08-30 23:19:07 -07:00 committed by GitHub
parent adeae3960f
commit 2405a9f25e
2 changed files with 97 additions and 28 deletions

BatchAppenderator.java

@@ -84,7 +84,7 @@ import java.util.stream.Collectors;
* reasons, the code for creating segments was all handled by the same code path in that class. The code
* was correct but inefficient for batch ingestion from a memory perspective. If the input file being processed
* by batch ingestion had enough sinks & hydrants produced then it may run out of memory either in the
- * hydrant creation phase (append) of this class or in the merge hydrants phase. Therefore a new class,
+ * hydrant creation phase (append) of this class or in the merge hydrants phase. Therefore, a new class,
* {@code BatchAppenderator}, this class, was created to specialize in batch ingestion and the old class
* for stream ingestion was renamed to {@link StreamAppenderator}.
* <p>
@@ -321,10 +321,10 @@ public class BatchAppenderator implements Appenderator
return new AppenderatorAddResult(identifier, sinksMetadata.get(identifier).numRowsInSegment, false);
}
- @Override
/**
* Returns all active segments regardless of whether they are in memory or persisted
*/
+ @Override
public List<SegmentIdWithShardSpec> getSegments()
{
return ImmutableList.copyOf(sinksMetadata.keySet());
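A hedged usage sketch, not part of the patch: the identifiers returned by getSegments() are what a batch caller hands back to push(), whether the corresponding sinks are still in memory or already persisted to disk. The appenderator variable and the null committer below are assumptions mirroring the new test added further down.

// Hypothetical caller of the batch Appenderator API described above.
final List<SegmentIdWithShardSpec> activeSegments = appenderator.getSegments();
final SegmentsAndCommitMetadata pushed = appenderator.push(activeSegments, null, false).get();
// pushed.getSegments() then holds one DataSegment per pushed identifier.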
@@ -568,9 +568,9 @@ public class BatchAppenderator implements Appenderator
log.info("Preparing to push...");
- // get the dirs for the identfiers:
- List<File> identifiersDirs = new ArrayList<>();
+ // Traverse identifiers, load their sink, and push it:
int totalHydrantsMerged = 0;
+ final List<DataSegment> dataSegments = new ArrayList<>();
for (SegmentIdWithShardSpec identifier : identifiers) {
SinkMetadata sm = sinksMetadata.get(identifier);
if (sm == null) {
@@ -580,34 +580,28 @@ public class BatchAppenderator implements Appenderator
if (persistedDir == null) {
throw new ISE("Sink for identifier[%s] not found in local file system", identifier);
}
- identifiersDirs.add(persistedDir);
totalHydrantsMerged += sm.getNumHydrants();
- }
- // push all sinks for identifiers:
- final List<DataSegment> dataSegments = new ArrayList<>();
- for (File identifier : identifiersDirs) {
// retrieve sink from disk:
- Pair<SegmentIdWithShardSpec, Sink> identifiersAndSinks;
+ Sink sinkForIdentifier;
try {
- identifiersAndSinks = getIdentifierAndSinkForPersistedFile(identifier);
+ sinkForIdentifier = getSinkForIdentifierPath(identifier, persistedDir);
}
catch (IOException e) {
- throw new ISE(e, "Failed to retrieve sinks for identifier[%s]", identifier);
+ throw new ISE(e, "Failed to retrieve sinks for identifier[%s] in path[%s]", identifier, persistedDir);
}
- // push it:
+ // push sink:
final DataSegment dataSegment = mergeAndPush(
- identifiersAndSinks.lhs,
- identifiersAndSinks.rhs
+ identifier,
+ sinkForIdentifier
);
// record it:
if (dataSegment != null) {
dataSegments.add(dataSegment);
} else {
- log.warn("mergeAndPush[%s] returned null, skipping.", identifiersAndSinks.lhs);
+ log.warn("mergeAndPush[%s] returned null, skipping.", identifier);
}
}
@@ -862,15 +856,9 @@ public class BatchAppenderator implements Appenderator
return retVal;
}
- private Pair<SegmentIdWithShardSpec, Sink> getIdentifierAndSinkForPersistedFile(File identifierPath)
+ private Sink getSinkForIdentifierPath(SegmentIdWithShardSpec identifier, File identifierPath)
throws IOException
{
- final SegmentIdWithShardSpec identifier = objectMapper.readValue(
- new File(identifierPath, IDENTIFIER_FILE_NAME),
- SegmentIdWithShardSpec.class
- );
// To avoid reading and listing of "merged" dir and other special files
final File[] sinkFiles = identifierPath.listFiles(
(dir, fileName) -> !(Ints.tryParse(fileName) == null)
@@ -901,7 +889,7 @@ public class BatchAppenderator implements Appenderator
);
}
- Sink currSink = new Sink(
+ Sink retVal = new Sink(
identifier.getInterval(),
schema,
identifier.getShardSpec(),
@@ -912,8 +900,8 @@ public class BatchAppenderator implements Appenderator
null,
hydrants
);
- currSink.finishWriting(); // this sink is not writable
- return new Pair<>(identifier, currSink);
+ retVal.finishWriting(); // this sink is not writable
+ return retVal;
}
// This function does not remove the sink from its tracking Map (sinks), the caller is responsible for that
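For context on the refactor above, which passes the caller's identifier into getSinkForIdentifierPath() instead of re-reading it from the on-disk identifier file: an interval deserialized back from its ISO string typically comes back on the UTC chronology, and Joda-Time's Interval.equals() compares chronologies as well as start and end instants, so a round-tripped identifier can stop matching the non-UTC identifier the caller supplied. A minimal Joda-only sketch of that equality behavior, using the interval values from the new test below:

// Two intervals covering the same instants, differing only in chronology.
ISOChronology seoulChronology = ISOChronology.getInstance(DateTimes.inferTzFromString("Asia/Seoul"));
Interval nonUtcInterval = new Interval("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00", seoulChronology);
Interval utcInterval = new Interval("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00", ISOChronology.getInstanceUTC());
// The underlying millis match...
assert nonUtcInterval.getStartMillis() == utcInterval.getStartMillis();
// ...but equals() also compares chronologies, so the two intervals are not equal.
assert !nonUtcInterval.equals(utcInterval);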

BatchAppenderatorTest.java

@@ -31,6 +31,9 @@ import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.Test;
@@ -125,6 +128,72 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest
}
}
/**
* Test the case when a segment identifier contains non-UTC timestamps in its interval. This can happen
* when a custom segment granularity produces an interval with a non-UTC Chronology via
* {@link org.apache.druid.java.util.common.granularity.PeriodGranularity#bucketStart(DateTime)}.
*/
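As a hedged illustration of the scenario this javadoc describes (not part of the patch, and assuming the PeriodGranularity constructor that takes a period, an origin, and a time zone): a day granularity pinned to Asia/Seoul returns bucket starts whose chronology carries the +09:00 zone rather than UTC, which is how a segment identifier ends up with a non-UTC interval.

// Day buckets computed in the Asia/Seoul time zone; origin left as null.
PeriodGranularity seoulDayGranularity = new PeriodGranularity(
    Period.days(1),
    null,
    DateTimes.inferTzFromString("Asia/Seoul")
);
DateTime bucketStart = seoulDayGranularity.bucketStart(DateTimes.of("2021-06-27T00:01:11.080Z"));
// bucketStart is 2021-06-27T00:00:00.000+09:00 and its chronology is not UTC.
Interval nonUtcInterval = new Interval(bucketStart, seoulDayGranularity.increment(bucketStart));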
@Test
public void testPeriodGranularityNonUTCIngestion() throws Exception
{
try (final BatchAppenderatorTester tester = new BatchAppenderatorTester(1, true)) {
final Appenderator appenderator = tester.getAppenderator();
// startJob
Assert.assertNull(appenderator.startJob());
// getDataSource
Assert.assertEquals(BatchAppenderatorTester.DATASOURCE, appenderator.getDataSource());
// Create a segment identifier with a non-UTC interval
SegmentIdWithShardSpec segmentIdWithNonUTCTime =
createNonUTCSegmentId("2021-06-27T00:00:00.000+09:00/2021-06-28T00:00:00.000+09:00",
"A", 0); // should be in seg_0
Assert.assertEquals(
1,
appenderator.add(segmentIdWithNonUTCTime, createInputRow("2021-06-27T00:01:11.080Z", "foo", 1), null)
.getNumRowsInSegment()
);
// getSegments
Assert.assertEquals(
Collections.singletonList(segmentIdWithNonUTCTime),
appenderator.getSegments().stream().sorted().collect(Collectors.toList())
);
// since we just added one row and the max rows in memory is one, all the segments (sinks etc)
// above should be cleared now
Assert.assertEquals(
Collections.emptyList(),
((BatchAppenderator) appenderator).getInMemorySegments().stream().sorted().collect(Collectors.toList())
);
// push all
final SegmentsAndCommitMetadata segmentsAndCommitMetadata = appenderator.push(
appenderator.getSegments(),
null,
false
).get();
Assert.assertEquals(
Collections.singletonList(segmentIdWithNonUTCTime),
Lists.transform(
segmentsAndCommitMetadata.getSegments(),
SegmentIdWithShardSpec::fromDataSegment
).stream().sorted().collect(Collectors.toList())
);
Assert.assertEquals(
tester.getPushedSegments().stream().sorted().collect(Collectors.toList()),
segmentsAndCommitMetadata.getSegments().stream().sorted().collect(Collectors.toList())
);
appenderator.close();
Assert.assertTrue(appenderator.getSegments().isEmpty());
}
}
@Test
public void testSimpleIngestionWithFallbackCodePath() throws Exception
{
@@ -887,7 +956,19 @@ public class BatchAppenderatorTest extends InitializedNullHandlingTest
}
}
private static SegmentIdWithShardSpec createNonUTCSegmentId(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(
BatchAppenderatorTester.DATASOURCE,
new Interval(interval, ISOChronology.getInstance(DateTimes.inferTzFromString("Asia/Seoul"))),
version,
new LinearShardSpec(partitionNum)
);
}
private static SegmentIdWithShardSpec createSegmentId(String interval, String version, int partitionNum)
{
return new SegmentIdWithShardSpec(