From b6c957b0d2f81648cf2a01aa08ba955dd90a693c Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Mon, 2 Jul 2018 15:09:12 -0700
Subject: [PATCH] Allow reordered segment allocation in kafka indexing service (#5805)

* Allow reordered segment allocation in kafka indexing service

* address comments

* fix a bug
---
 .../indexing/common/task/IndexTaskTest.java    |   4 +-
 .../java/io/druid/java/util/common/Pair.java   |   9 +-
 .../IndexerSQLMetadataStorageCoordinator.java  | 573 +++++++++++-------
 .../appenderator/BaseAppenderatorDriver.java   | 158 ++++-
 .../appenderator/BatchAppenderatorDriver.java  |  50 +-
 .../StreamAppenderatorDriver.java              |  58 +-
 .../BatchAppenderatorDriverTest.java           |   2 +-
 7 files changed, 576 insertions(+), 278 deletions(-)

diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index 746315cfb25..b833aaaf095 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -450,12 +450,12 @@ public class IndexTaskTest
     Assert.assertEquals("test", segments.get(0).getDataSource());
     Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
-    Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class));
+    Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
     Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
 
     Assert.assertEquals("test", segments.get(1).getDataSource());
     Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval());
-    Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class));
+    Assert.assertEquals(NumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
     Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
   }
diff --git a/java-util/src/main/java/io/druid/java/util/common/Pair.java b/java-util/src/main/java/io/druid/java/util/common/Pair.java
index 2d954b68072..c6fed009b39 100644
--- a/java-util/src/main/java/io/druid/java/util/common/Pair.java
+++ b/java-util/src/main/java/io/druid/java/util/common/Pair.java
@@ -19,6 +19,7 @@
 
 package io.druid.java.util.common;
 
+import javax.annotation.Nullable;
 import java.util.Objects;
 
 /**
@@ -26,18 +27,20 @@ import java.util.Objects;
 public class Pair<T1, T2>
 {
-  public static <T1, T2> Pair<T1, T2> of(T1 lhs, T2 rhs)
+  public static <T1, T2> Pair<T1, T2> of(@Nullable T1 lhs, @Nullable T2 rhs)
   {
     return new Pair<>(lhs, rhs);
   }
 
+  @Nullable
   public final T1 lhs;
+  @Nullable
   public final T2 rhs;
 
   public Pair(
-      T1 lhs,
-      T2 rhs
+      @Nullable T1 lhs,
+      @Nullable T2 rhs
   )
   {
     this.lhs = lhs;
diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 1dec5c60613..7e2ad65f92a 100644
--- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -19,6 +19,7 @@
 
 package io.druid.metadata;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
@@ -66,6 +67,7 @@ import org.skife.jdbi.v2.tweak.ResultSetMapper;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
 import org.skife.jdbi.v2.util.StringMapper;
 
+import
javax.annotation.Nullable; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; @@ -131,7 +133,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor new HandleCallback>() { @Override - public List withHandle(Handle handle) throws Exception + public List withHandle(Handle handle) { final VersionedIntervalTimeline timeline = getTimelineForIntervalsWithHandle( handle, @@ -378,7 +380,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor public SegmentIdentifier allocatePendingSegment( final String dataSource, final String sequenceName, - final String previousSegmentId, + @Nullable final String previousSegmentId, final Interval interval, final String maxVersion, final boolean skipSegmentLineageCheck @@ -389,220 +391,22 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor Preconditions.checkNotNull(interval, "interval"); Preconditions.checkNotNull(maxVersion, "maxVersion"); - final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId; - return connector.retryTransaction( new TransactionCallback() { @Override public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception { - final List existingBytes; - if (!skipSegmentLineageCheck) { - existingBytes = handle - .createQuery( - StringUtils.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "sequence_prev_id = :sequence_prev_id", - dbTables.getPendingSegmentsTable() - ) - ).bind("dataSource", dataSource) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .map(ByteArrayMapper.FIRST) - .list(); - } else { - existingBytes = handle - .createQuery( - StringUtils.format( - "SELECT payload FROM %s WHERE " - + "dataSource = :dataSource AND " - + "sequence_name = :sequence_name AND " - + "start = :start AND " - + "%2$send%2$s = :end", - dbTables.getPendingSegmentsTable(), connector.getQuoteString() - ) - ).bind("dataSource", dataSource) - .bind("sequence_name", sequenceName) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .map(ByteArrayMapper.FIRST) - .list(); - } - - if (!existingBytes.isEmpty()) { - final SegmentIdentifier existingIdentifier = jsonMapper.readValue( - Iterables.getOnlyElement(existingBytes), - SegmentIdentifier.class - ); - - if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() - && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { - log.info( - "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return existingIdentifier; - } else { - log.warn( - "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " - + "does not match requested interval[%s]", - existingIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull, - interval - ); - - return null; - } - } - - // Make up a pending segment based on existing segments and pending segments in the DB. This works - // assuming that all tasks inserting segments at a particular point in time are going through the - // allocatePendingSegment flow. This should be assured through some other mechanism (like task locks). 
- - final SegmentIdentifier newIdentifier; - - final List> existingChunks = getTimelineForIntervalsWithHandle( - handle, - dataSource, - ImmutableList.of(interval) - ).lookup(interval); - - if (existingChunks.size() > 1) { - // Not possible to expand more than one chunk with a single segment. - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", - dataSource, - interval, - maxVersion, - existingChunks.size() - ); - return null; - } else { - SegmentIdentifier max = null; - - if (!existingChunks.isEmpty()) { - TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); - for (PartitionChunk existing : existingHolder.getObject()) { - if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() - .getShardSpec() - .getPartitionNum()) { - max = SegmentIdentifier.fromDataSegment(existing.getObject()); - } - } - } - - final List pendings = getPendingSegmentsForIntervalWithHandle( - handle, - dataSource, - interval - ); - - for (SegmentIdentifier pending : pendings) { - if (max == null || - pending.getVersion().compareTo(max.getVersion()) > 0 || - (pending.getVersion().equals(max.getVersion()) - && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { - max = pending; - } - } - - if (max == null) { - newIdentifier = new SegmentIdentifier( - dataSource, - interval, - maxVersion, - new NumberedShardSpec(0, 0) - ); - } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", - dataSource, - interval, - maxVersion, - max.getIdentifierAsString() - ); - return null; - } else if (max.getShardSpec() instanceof LinearShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) - ); - } else if (max.getShardSpec() instanceof NumberedShardSpec) { - newIdentifier = new SegmentIdentifier( - dataSource, - max.getInterval(), - max.getVersion(), - new NumberedShardSpec( - max.getShardSpec().getPartitionNum() + 1, - ((NumberedShardSpec) max.getShardSpec()).getPartitions() - ) - ); - } else { - log.warn( - "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", - dataSource, - interval, - maxVersion, - max.getShardSpec().getClass(), - max.getIdentifierAsString() - ); - return null; - } - } - - // SELECT -> INSERT can fail due to races; callers must be prepared to retry. - // Avoiding ON DUPLICATE KEY since it's not portable. - // Avoiding try/catch since it may cause inadvertent transaction-splitting. - - // UNIQUE key for the row, ensuring sequences do not fork in two directions. 
- // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines - // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) - final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( - Hashing.sha1() - .newHasher() - .putBytes(StringUtils.toUtf8(sequenceName)) - .putByte((byte) 0xff) - .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) - .hash() - .asBytes() - ); - - handle.createStatement( - StringUtils.format( - "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " - + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", - dbTables.getPendingSegmentsTable(), - connector.getQuoteString() - ) - ) - .bind("id", newIdentifier.getIdentifierAsString()) - .bind("dataSource", dataSource) - .bind("created_date", DateTimes.nowUtc().toString()) - .bind("start", interval.getStart().toString()) - .bind("end", interval.getEnd().toString()) - .bind("sequence_name", sequenceName) - .bind("sequence_prev_id", previousSegmentIdNotNull) - .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) - .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) - .execute(); - - log.info( - "Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB", - newIdentifier.getIdentifierAsString(), - sequenceName, - previousSegmentIdNotNull - ); - - return newIdentifier; + return skipSegmentLineageCheck ? + allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) : + allocatePendingSegmentWithSegmentLineageCheck( + handle, + dataSource, + sequenceName, + previousSegmentId, + interval, + maxVersion + ); } }, ALLOCATE_SEGMENT_QUIET_TRIES, @@ -610,6 +414,355 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor ); } + @Nullable + private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck( + final Handle handle, + final String dataSource, + final String sequenceName, + @Nullable final String previousSegmentId, + final Interval interval, + final String maxVersion + ) throws IOException + { + final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId; + final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( + handle.createQuery( + StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "sequence_prev_id = :sequence_prev_id", + dbTables.getPendingSegmentsTable() + ) + ), + interval, + sequenceName, + previousSegmentIdNotNull, + Pair.of("dataSource", dataSource), + Pair.of("sequence_name", sequenceName), + Pair.of("sequence_prev_id", previousSegmentIdNotNull) + ); + + if (result.found) { + // The found existing segment identifier can be null if its interval doesn't match with the given interval + return result.segmentIdentifier; + } + + final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion); + if (newIdentifier == null) { + return null; + } + + // SELECT -> INSERT can fail due to races; callers must be prepared to retry. + // Avoiding ON DUPLICATE KEY since it's not portable. + // Avoiding try/catch since it may cause inadvertent transaction-splitting. + + // UNIQUE key for the row, ensuring sequences do not fork in two directions. 
+ // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines + // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) + final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( + Hashing.sha1() + .newHasher() + .putBytes(StringUtils.toUtf8(sequenceName)) + .putByte((byte) 0xff) + .putBytes(StringUtils.toUtf8(previousSegmentIdNotNull)) + .hash() + .asBytes() + ); + + insertToMetastore( + handle, + newIdentifier, + dataSource, + interval, + previousSegmentIdNotNull, + sequenceName, + sequenceNamePrevIdSha1 + ); + return newIdentifier; + } + + @Nullable + private SegmentIdentifier allocatePendingSegment( + final Handle handle, + final String dataSource, + final String sequenceName, + final Interval interval, + final String maxVersion + ) throws IOException + { + final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId( + handle.createQuery( + StringUtils.format( + "SELECT payload FROM %s WHERE " + + "dataSource = :dataSource AND " + + "sequence_name = :sequence_name AND " + + "start = :start AND " + + "%2$send%2$s = :end", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ) + ), + interval, + sequenceName, + null, + Pair.of("dataSource", dataSource), + Pair.of("sequence_name", sequenceName), + Pair.of("start", interval.getStart().toString()), + Pair.of("end", interval.getEnd().toString()) + ); + + if (result.found) { + // The found existing segment identifier can be null if its interval doesn't match with the given interval + return result.segmentIdentifier; + } + + final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion); + if (newIdentifier == null) { + return null; + } + + // SELECT -> INSERT can fail due to races; callers must be prepared to retry. + // Avoiding ON DUPLICATE KEY since it's not portable. + // Avoiding try/catch since it may cause inadvertent transaction-splitting. + + // UNIQUE key for the row, ensuring we don't have more than one segment per sequence per interval. + // Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines + // have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319) + final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode( + Hashing.sha1() + .newHasher() + .putBytes(StringUtils.toUtf8(sequenceName)) + .putByte((byte) 0xff) + .putLong(interval.getStartMillis()) + .putLong(interval.getEndMillis()) + .hash() + .asBytes() + ); + + // always insert empty previous sequence id + insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1); + + log.info( + "Allocated pending segment [%s] for sequence[%s] in DB", + newIdentifier.getIdentifierAsString(), + sequenceName + ); + + return newIdentifier; + } + + private CheckExistingSegmentIdResult checkAndGetExistingSegmentId( + final Query> query, + final Interval interval, + final String sequenceName, + final @Nullable String previousSegmentId, + final Pair... 
queryVars + ) throws IOException + { + Query> boundQuery = query; + for (Pair var : queryVars) { + boundQuery = boundQuery.bind(var.lhs, var.rhs); + } + final List existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list(); + + if (!existingBytes.isEmpty()) { + final SegmentIdentifier existingIdentifier = jsonMapper.readValue( + Iterables.getOnlyElement(existingBytes), + SegmentIdentifier.class + ); + + if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis() + && existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) { + if (previousSegmentId == null) { + log.info( + "Found existing pending segment [%s] for sequence[%s] in DB", + existingIdentifier.getIdentifierAsString(), + sequenceName + ); + } else { + log.info( + "Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentId + ); + } + + return new CheckExistingSegmentIdResult(true, existingIdentifier); + } else { + if (previousSegmentId == null) { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s] in DB, " + + "does not match requested interval[%s]", + existingIdentifier.getIdentifierAsString(), + sequenceName, + interval + ); + } else { + log.warn( + "Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, " + + "does not match requested interval[%s]", + existingIdentifier.getIdentifierAsString(), + sequenceName, + previousSegmentId, + interval + ); + } + + return new CheckExistingSegmentIdResult(true, null); + } + } + return new CheckExistingSegmentIdResult(false, null); + } + + private static class CheckExistingSegmentIdResult + { + private final boolean found; + @Nullable + private final SegmentIdentifier segmentIdentifier; + + CheckExistingSegmentIdResult(boolean found, @Nullable SegmentIdentifier segmentIdentifier) + { + this.found = found; + this.segmentIdentifier = segmentIdentifier; + } + } + + private void insertToMetastore( + Handle handle, + SegmentIdentifier newIdentifier, + String dataSource, + Interval interval, + String previousSegmentId, + String sequenceName, + String sequenceNamePrevIdSha1 + ) throws JsonProcessingException + { + handle.createStatement( + StringUtils.format( + "INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) " + + "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)", + dbTables.getPendingSegmentsTable(), + connector.getQuoteString() + ) + ) + .bind("id", newIdentifier.getIdentifierAsString()) + .bind("dataSource", dataSource) + .bind("created_date", DateTimes.nowUtc().toString()) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .bind("sequence_name", sequenceName) + .bind("sequence_prev_id", previousSegmentId) + .bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1) + .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier)) + .execute(); + } + + @Nullable + private SegmentIdentifier createNewSegment( + final Handle handle, + final String dataSource, + final Interval interval, + final String maxVersion + ) throws IOException + { + // Make up a pending segment based on existing segments and pending segments in the DB. This works + // assuming that all tasks inserting segments at a particular point in time are going through the + // allocatePendingSegment flow. 
This should be assured through some other mechanism (like task locks). + + final List> existingChunks = getTimelineForIntervalsWithHandle( + handle, + dataSource, + ImmutableList.of(interval) + ).lookup(interval); + + if (existingChunks.size() > 1) { + // Not possible to expand more than one chunk with a single segment. + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.", + dataSource, + interval, + maxVersion, + existingChunks.size() + ); + return null; + } else { + SegmentIdentifier max = null; + + if (!existingChunks.isEmpty()) { + TimelineObjectHolder existingHolder = Iterables.getOnlyElement(existingChunks); + for (PartitionChunk existing : existingHolder.getObject()) { + if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject() + .getShardSpec() + .getPartitionNum()) { + max = SegmentIdentifier.fromDataSegment(existing.getObject()); + } + } + } + + final List pendings = getPendingSegmentsForIntervalWithHandle( + handle, + dataSource, + interval + ); + + for (SegmentIdentifier pending : pendings) { + if (max == null || + pending.getVersion().compareTo(max.getVersion()) > 0 || + (pending.getVersion().equals(max.getVersion()) + && pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) { + max = pending; + } + } + + if (max == null) { + return new SegmentIdentifier( + dataSource, + interval, + maxVersion, + new NumberedShardSpec(0, 0) + ); + } else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].", + dataSource, + interval, + maxVersion, + max.getIdentifierAsString() + ); + return null; + } else if (max.getShardSpec() instanceof LinearShardSpec) { + return new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1) + ); + } else if (max.getShardSpec() instanceof NumberedShardSpec) { + return new SegmentIdentifier( + dataSource, + max.getInterval(), + max.getVersion(), + new NumberedShardSpec( + max.getShardSpec().getPartitionNum() + 1, + ((NumberedShardSpec) max.getShardSpec()).getPartitions() + ) + ); + } else { + log.warn( + "Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].", + dataSource, + interval, + maxVersion, + max.getShardSpec().getClass(), + max.getIdentifierAsString() + ); + return null; + } + } + } + @Override public int deletePendingSegments(String dataSource, Interval deleteInterval) { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index aee8778f9a9..081bf05a12a 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -42,12 +42,13 @@ import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState; import org.joda.time.DateTime; +import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; import java.util.List; import 
java.util.Map;
 import java.util.Map.Entry;
@@ -55,6 +56,7 @@ import java.util.NavigableMap;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -75,6 +77,94 @@ import java.util.stream.Stream;
  */
 public abstract class BaseAppenderatorDriver implements Closeable
 {
+  /**
+   * Segments allocated for an interval.
+   * There should be at most a single active (appending) segment at any time.
+   */
+  static class SegmentsOfInterval
+  {
+    private final Interval interval;
+    private final List<SegmentWithState> appendFinishedSegments = new ArrayList<>();
+
+    @Nullable
+    private SegmentWithState appendingSegment;
+
+    SegmentsOfInterval(Interval interval)
+    {
+      this.interval = interval;
+    }
+
+    SegmentsOfInterval(
+        Interval interval,
+        @Nullable SegmentWithState appendingSegment,
+        List<SegmentWithState> appendFinishedSegments
+    )
+    {
+      this.interval = interval;
+      this.appendingSegment = appendingSegment;
+      this.appendFinishedSegments.addAll(appendFinishedSegments);
+
+      if (appendingSegment != null) {
+        Preconditions.checkArgument(
+            appendingSegment.getState() == SegmentState.APPENDING,
+            "appendingSegment[%s] is not in the APPENDING state",
+            appendingSegment.getSegmentIdentifier()
+        );
+      }
+      if (appendFinishedSegments
+          .stream()
+          .anyMatch(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING)) {
+        throw new ISE("Some appendFinishedSegments[%s] is in the APPENDING state", appendFinishedSegments);
+      }
+    }
+
+    void setAppendingSegment(SegmentWithState appendingSegment)
+    {
+      Preconditions.checkArgument(
+          appendingSegment.getState() == SegmentState.APPENDING,
+          "segment[%s] is not in the APPENDING state",
+          appendingSegment.getSegmentIdentifier()
+      );
+      // There should be only one appending segment at any time
+      Preconditions.checkState(
+          this.appendingSegment == null,
+          "WTF?! Current appendingSegment[%s] is not null.
" + + "Its state must be changed before setting a new appendingSegment[%s]", + this.appendingSegment, + appendingSegment + ); + this.appendingSegment = appendingSegment; + } + + void finishAppendingToCurrentActiveSegment(Consumer stateTransitionFn) + { + Preconditions.checkNotNull(appendingSegment, "appendingSegment"); + stateTransitionFn.accept(appendingSegment); + appendFinishedSegments.add(appendingSegment); + appendingSegment = null; + } + + Interval getInterval() + { + return interval; + } + + SegmentWithState getAppendingSegment() + { + return appendingSegment; + } + + List getAllSegments() + { + final List allSegments = new ArrayList<>(appendFinishedSegments.size() + 1); + if (appendingSegment != null) { + allSegments.add(appendingSegment); + } + allSegments.addAll(appendFinishedSegments); + return allSegments; + } + } + /** * Allocated segments for a sequence */ @@ -83,7 +173,7 @@ public abstract class BaseAppenderatorDriver implements Closeable // Interval Start millis -> List of Segments for this interval // there might be multiple segments for a start interval, for example one segment // can be in APPENDING state and others might be in PUBLISHING state - private final NavigableMap> intervalToSegmentStates; + private final NavigableMap intervalToSegmentStates; // most recently allocated segment private String lastSegmentId; @@ -94,7 +184,7 @@ public abstract class BaseAppenderatorDriver implements Closeable } SegmentsForSequence( - NavigableMap> intervalToSegmentStates, + NavigableMap intervalToSegmentStates, String lastSegmentId ) { @@ -104,24 +194,34 @@ public abstract class BaseAppenderatorDriver implements Closeable void add(SegmentIdentifier identifier) { - intervalToSegmentStates.computeIfAbsent(identifier.getInterval().getStartMillis(), k -> new LinkedList<>()) - .addFirst(SegmentWithState.newSegment(identifier)); + intervalToSegmentStates.computeIfAbsent( + identifier.getInterval().getStartMillis(), + k -> new SegmentsOfInterval(identifier.getInterval()) + ).setAppendingSegment(SegmentWithState.newSegment(identifier)); lastSegmentId = identifier.getIdentifierAsString(); } - Entry> floor(long timestamp) + Entry floor(long timestamp) { return intervalToSegmentStates.floorEntry(timestamp); } - LinkedList get(long timestamp) + SegmentsOfInterval get(long timestamp) { return intervalToSegmentStates.get(timestamp); } - Stream segmentStateStream() + Stream allSegmentStateStream() { - return intervalToSegmentStates.values().stream().flatMap(Collection::stream); + return intervalToSegmentStates + .values() + .stream() + .flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream()); + } + + Stream getAllSegmentsOfInterval() + { + return intervalToSegmentStates.values().stream(); } } @@ -183,13 +283,19 @@ public abstract class BaseAppenderatorDriver implements Closeable return null; } - final Map.Entry> candidateEntry = segmentsForSequence.floor( + final Map.Entry candidateEntry = segmentsForSequence.floor( timestamp.getMillis() ); - if (candidateEntry != null - && candidateEntry.getValue().getFirst().getSegmentIdentifier().getInterval().contains(timestamp) - && candidateEntry.getValue().getFirst().getState() == SegmentState.APPENDING) { - return candidateEntry.getValue().getFirst().getSegmentIdentifier(); + + if (candidateEntry != null) { + final SegmentsOfInterval segmentsOfInterval = candidateEntry.getValue(); + if (segmentsOfInterval.interval.contains(timestamp)) { + return segmentsOfInterval.appendingSegment == null ? 
+ null : + segmentsOfInterval.appendingSegment.getSegmentIdentifier(); + } else { + return null; + } } else { return null; } @@ -327,7 +433,20 @@ public abstract class BaseAppenderatorDriver implements Closeable .map(segments::get) .filter(Objects::nonNull) .flatMap(segmentsForSequence -> segmentsForSequence.intervalToSegmentStates.values().stream()) - .flatMap(Collection::stream); + .flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream()); + } + } + + Stream getAppendingSegments(Collection sequenceNames) + { + synchronized (segments) { + return sequenceNames + .stream() + .map(segments::get) + .filter(Objects::nonNull) + .flatMap(segmentsForSequence -> segmentsForSequence.intervalToSegmentStates.values().stream()) + .map(segmentsOfInterval -> segmentsOfInterval.appendingSegment) + .filter(Objects::nonNull); } } @@ -538,10 +657,11 @@ public abstract class BaseAppenderatorDriver implements Closeable Maps.transformValues( snapshot, (Function>) input -> ImmutableList.copyOf( - input.intervalToSegmentStates.values() - .stream() - .flatMap(Collection::stream) - .collect(Collectors.toList()) + input.intervalToSegmentStates + .values() + .stream() + .flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream()) + .collect(Collectors.toList()) ) ) ), diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index c5c35a0df44..b2337e8be1c 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -33,11 +33,11 @@ import io.druid.timeline.DataSegment; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -129,13 +129,11 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver long pushAndClearTimeoutMs ) throws InterruptedException, ExecutionException, TimeoutException { - final List segmentIdentifierList = getSegmentWithStates(sequenceNames) - .filter(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING) - .map(SegmentWithState::getSegmentIdentifier) - .collect(Collectors.toList()); + final Map requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames) + .collect(Collectors.toMap(SegmentWithState::getSegmentIdentifier, Function.identity())); final ListenableFuture future = ListenableFutures.transformAsync( - pushInBackground(null, segmentIdentifierList, false), + pushInBackground(null, requestedSegmentIdsForSequences.keySet(), false), this::dropInBackground ); @@ -147,17 +145,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver final Map pushedSegmentIdToSegmentMap = segmentsAndMetadata .getSegments() .stream() - .collect(Collectors.toMap( - SegmentIdentifier::fromDataSegment, - dataSegment -> dataSegment - )); - - final Map requestedSegmentIdsForSequences = getSegmentWithStates(sequenceNames) - .filter(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING) - .collect(Collectors.toMap( - SegmentWithState::getSegmentIdentifier, - segmentWithState -> segmentWithState - )); + 
.collect(Collectors.toMap(SegmentIdentifier::fromDataSegment, Function.identity())); if (!pushedSegmentIdToSegmentMap.keySet().equals(requestedSegmentIdsForSequences.keySet())) { throw new ISE( @@ -167,12 +155,28 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver ); } - // State transition - requestedSegmentIdsForSequences.forEach( - (segmentId, segmentWithState) -> { - segmentWithState.pushAndDrop(pushedSegmentIdToSegmentMap.get(segmentId)); + synchronized (segments) { + for (String sequenceName : sequenceNames) { + final SegmentsForSequence segmentsForSequence = segments.get(sequenceName); + if (segmentsForSequence == null) { + throw new ISE("Can't find segmentsForSequence for sequence[%s]", sequenceName); } - ); + + segmentsForSequence.getAllSegmentsOfInterval().forEach(segmentsOfInterval -> { + final SegmentWithState appendingSegment = segmentsOfInterval.getAppendingSegment(); + if (appendingSegment != null) { + final DataSegment pushedSegment = pushedSegmentIdToSegmentMap.get(appendingSegment.getSegmentIdentifier()); + if (pushedSegment == null) { + throw new ISE("Can't find pushedSegments for segment[%s]", appendingSegment.getSegmentIdentifier()); + } + + segmentsOfInterval.finishAppendingToCurrentActiveSegment( + segmentWithState -> segmentWithState.pushAndDrop(pushedSegment) + ); + } + }); + } + } return segmentsAndMetadata; } @@ -196,7 +200,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver snapshot .values() .stream() - .flatMap(SegmentsForSequence::segmentStateStream) + .flatMap(SegmentsForSequence::allSegmentStateStream) .map(segmentWithState -> Preconditions .checkNotNull( segmentWithState.getDataSegment(), diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index ddf5593169f..185419cd3b4 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -32,7 +32,9 @@ import com.google.common.util.concurrent.SettableFuture; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Pair; import io.druid.java.util.common.concurrent.ListenableFutures; +import io.druid.java.util.common.guava.Comparators; import io.druid.java.util.common.logger.Logger; import io.druid.query.SegmentDescriptor; import io.druid.segment.loading.DataSegmentKiller; @@ -43,8 +45,9 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -194,18 +197,13 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver for (final SegmentIdentifier identifier : identifiers) { log.info("Moving segment[%s] out of active list.", identifier); final long key = identifier.getInterval().getStartMillis(); - if (activeSegmentsForSequence.get(key) == null || activeSegmentsForSequence.get(key).stream().noneMatch( - segmentWithState -> { - if (segmentWithState.getSegmentIdentifier().equals(identifier)) { - segmentWithState.finishAppending(); - return true; - } else { - return false; - } - } - )) { + final SegmentsOfInterval segmentsOfInterval 
= activeSegmentsForSequence.get(key); + if (segmentsOfInterval == null || + segmentsOfInterval.getAppendingSegment() == null || + !segmentsOfInterval.getAppendingSegment().getSegmentIdentifier().equals(identifier)) { throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier); } + segmentsOfInterval.finishAppendingToCurrentActiveSegment(SegmentWithState::finishAppending); } } } @@ -396,33 +394,53 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver private static class SegmentsForSequenceBuilder { - private final NavigableMap> intervalToSegmentStates; + // segmentId -> (appendingSegment, appendFinishedSegments) + private final NavigableMap>> intervalToSegments = + new TreeMap<>(Comparator.comparing(SegmentIdentifier::getInterval, Comparators.intervalsByStartThenEnd())); private final String lastSegmentId; SegmentsForSequenceBuilder(String lastSegmentId) { - this.intervalToSegmentStates = new TreeMap<>(); this.lastSegmentId = lastSegmentId; } void add(SegmentWithState segmentWithState) { final SegmentIdentifier identifier = segmentWithState.getSegmentIdentifier(); - final LinkedList segmentsInInterval = intervalToSegmentStates.computeIfAbsent( - identifier.getInterval().getStartMillis(), - k -> new LinkedList<>() - ); + final Pair> pair = intervalToSegments.get(identifier); + final List appendFinishedSegments = pair == null || pair.rhs == null ? + new ArrayList<>() : + pair.rhs; + // always keep APPENDING segments for an interval start millis in the front if (segmentWithState.getState() == SegmentState.APPENDING) { - segmentsInInterval.addFirst(segmentWithState); + if (pair != null && pair.lhs != null) { + throw new ISE( + "WTF?! there was already an appendingSegment[%s] before adding an appendingSegment[%s]", + pair.lhs, + segmentWithState + ); + } + + intervalToSegments.put(identifier, Pair.of(segmentWithState, appendFinishedSegments)); } else { - segmentsInInterval.addLast(segmentWithState); + final SegmentWithState appendingSegment = pair == null ? null : pair.lhs; + appendFinishedSegments.add(segmentWithState); + intervalToSegments.put(identifier, Pair.of(appendingSegment, appendFinishedSegments)); } } SegmentsForSequence build() { - return new SegmentsForSequence(intervalToSegmentStates, lastSegmentId); + final NavigableMap map = new TreeMap<>(); + for (Entry>> entry : + intervalToSegments.entrySet()) { + map.put( + entry.getKey().getInterval().getStartMillis(), + new SegmentsOfInterval(entry.getKey().getInterval(), entry.getValue().lhs, entry.getValue().rhs) + ); + } + return new SegmentsForSequence(map, lastSegmentId); } } } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java index 5ca72100a94..2fd0f087d38 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java @@ -185,7 +185,7 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport final SegmentsForSequence segmentsForSequence = driver.getSegments().get("dummy"); Assert.assertNotNull(segmentsForSequence); final List segmentWithStates = segmentsForSequence - .segmentStateStream() + .allSegmentStateStream() .filter(segmentWithState -> segmentWithState.getState() == expectedState) .collect(Collectors.toList());
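
The two allocation paths added in IndexerSQLMetadataStorageCoordinator differ only in how the pending segment's unique key (sequence_name_prev_id_sha1) is derived: allocatePendingSegmentWithSegmentLineageCheck hashes the sequence name together with the previous segment id, while the skipSegmentLineageCheck path hashes the sequence name together with the interval's start and end millis. Because the second key no longer depends on the previously allocated segment, Kafka indexing tasks can allocate segments for a sequence out of order. The standalone sketch below (not part of the patch) reproduces both derivations with the same Guava calls the patch uses; String.getBytes(StandardCharsets.UTF_8) stands in for Druid's StringUtils.toUtf8, and the sequence and segment ids are made-up examples.

import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import java.nio.charset.StandardCharsets;

public class PendingSegmentKeySketch
{
  // Lineage-checked path: the key binds the new pending segment to its previous segment id.
  static String keyWithLineageCheck(String sequenceName, String previousSegmentId)
  {
    final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
    return BaseEncoding.base16().encode(
        Hashing.sha1()
               .newHasher()
               .putBytes(sequenceName.getBytes(StandardCharsets.UTF_8))
               .putByte((byte) 0xff)
               .putBytes(previousSegmentIdNotNull.getBytes(StandardCharsets.UTF_8))
               .hash()
               .asBytes()
    );
  }

  // skipSegmentLineageCheck path: the key binds the pending segment only to (sequence name, interval),
  // so allocation order within the sequence no longer matters.
  static String keySkippingLineageCheck(String sequenceName, long intervalStartMillis, long intervalEndMillis)
  {
    return BaseEncoding.base16().encode(
        Hashing.sha1()
               .newHasher()
               .putBytes(sequenceName.getBytes(StandardCharsets.UTF_8))
               .putByte((byte) 0xff)
               .putLong(intervalStartMillis)
               .putLong(intervalEndMillis)
               .hash()
               .asBytes()
    );
  }

  public static void main(String[] args)
  {
    // Hypothetical sequence and previous-segment ids; the interval is 2014-01-01/2014-01-02 in UTC millis.
    System.out.println(keyWithLineageCheck("index_kafka_wiki_0", "wiki_2014-01-01_2014-01-02_v1_0"));
    System.out.println(keySkippingLineageCheck("index_kafka_wiki_0", 1388534400000L, 1388620800000L));
  }
}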