Allow reordered segment allocation in kafka indexing service (#5805)

* Allow reordered segment allocation in kafka indexing service

* address comments

* fix a bug
This commit is contained in:
Jihoon Son 2018-07-02 15:09:12 -07:00 committed by Gian Merlino
parent b76a056c14
commit b6c957b0d2
7 changed files with 576 additions and 278 deletions

View File

@ -450,12 +450,12 @@ public class IndexTaskTest
Assert.assertEquals("test", segments.get(0).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(0).getInterval());
Assert.assertTrue(segments.get(0).getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(NumberedShardSpec.class, segments.get(0).getShardSpec().getClass());
Assert.assertEquals(0, segments.get(0).getShardSpec().getPartitionNum());
Assert.assertEquals("test", segments.get(1).getDataSource());
Assert.assertEquals(Intervals.of("2014/P1D"), segments.get(1).getInterval());
Assert.assertTrue(segments.get(1).getShardSpec().getClass().equals(NumberedShardSpec.class));
Assert.assertEquals(NumberedShardSpec.class, segments.get(1).getShardSpec().getClass());
Assert.assertEquals(1, segments.get(1).getShardSpec().getPartitionNum());
}

View File

@ -19,6 +19,7 @@
package io.druid.java.util.common;
import javax.annotation.Nullable;
import java.util.Objects;
/**
@ -26,18 +27,20 @@ import java.util.Objects;
public class Pair<T1, T2>
{
public static <T1, T2> Pair<T1, T2> of(T1 lhs, T2 rhs)
public static <T1, T2> Pair<T1, T2> of(@Nullable T1 lhs, @Nullable T2 rhs)
{
return new Pair<>(lhs, rhs);
}
@Nullable
public final T1 lhs;
@Nullable
public final T2 rhs;
public Pair(
T1 lhs,
T2 rhs
@Nullable T1 lhs,
@Nullable T2 rhs
)
{
this.lhs = lhs;

View File

@ -19,6 +19,7 @@
package io.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
@ -66,6 +67,7 @@ import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.StringMapper;
import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
@ -131,7 +133,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws Exception
public List<DataSegment> withHandle(Handle handle)
{
final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalsWithHandle(
handle,
@ -378,7 +380,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
public SegmentIdentifier allocatePendingSegment(
final String dataSource,
final String sequenceName,
final String previousSegmentId,
@Nullable final String previousSegmentId,
final Interval interval,
final String maxVersion,
final boolean skipSegmentLineageCheck
@ -389,220 +391,22 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "maxVersion");
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
return connector.retryTransaction(
new TransactionCallback<SegmentIdentifier>()
{
@Override
public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final List<byte[]> existingBytes;
if (!skipSegmentLineageCheck) {
existingBytes = handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
).bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull)
.map(ByteArrayMapper.FIRST)
.list();
} else {
existingBytes = handle
.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "start = :start AND "
+ "%2$send%2$s = :end",
dbTables.getPendingSegmentsTable(), connector.getQuoteString()
)
).bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.list();
}
if (!existingBytes.isEmpty()) {
final SegmentIdentifier existingIdentifier = jsonMapper.readValue(
Iterables.getOnlyElement(existingBytes),
SegmentIdentifier.class
);
if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
&& existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull
);
return existingIdentifier;
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull,
interval
);
return null;
}
}
// Make up a pending segment based on existing segments and pending segments in the DB. This works
// assuming that all tasks inserting segments at a particular point in time are going through the
// allocatePendingSegment flow. This should be assured through some other mechanism (like task locks).
final SegmentIdentifier newIdentifier;
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
ImmutableList.of(interval)
).lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.",
dataSource,
interval,
maxVersion,
existingChunks.size()
);
return null;
} else {
SegmentIdentifier max = null;
if (!existingChunks.isEmpty()) {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) {
if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
.getShardSpec()
.getPartitionNum()) {
max = SegmentIdentifier.fromDataSegment(existing.getObject());
}
}
}
final List<SegmentIdentifier> pendings = getPendingSegmentsForIntervalWithHandle(
handle,
dataSource,
interval
);
for (SegmentIdentifier pending : pendings) {
if (max == null ||
pending.getVersion().compareTo(max.getVersion()) > 0 ||
(pending.getVersion().equals(max.getVersion())
&& pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) {
max = pending;
}
}
if (max == null) {
newIdentifier = new SegmentIdentifier(
dataSource,
interval,
maxVersion,
new NumberedShardSpec(0, 0)
);
} else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
maxVersion,
max.getIdentifierAsString()
);
return null;
} else if (max.getShardSpec() instanceof LinearShardSpec) {
newIdentifier = new SegmentIdentifier(
dataSource,
max.getInterval(),
max.getVersion(),
new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1)
);
} else if (max.getShardSpec() instanceof NumberedShardSpec) {
newIdentifier = new SegmentIdentifier(
dataSource,
max.getInterval(),
max.getVersion(),
new NumberedShardSpec(
max.getShardSpec().getPartitionNum() + 1,
((NumberedShardSpec) max.getShardSpec()).getPartitions()
)
);
} else {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].",
dataSource,
interval,
maxVersion,
max.getShardSpec().getClass(),
max.getIdentifierAsString()
);
return null;
}
}
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
// UNIQUE key for the row, ensuring sequences do not fork in two directions.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319)
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
.putBytes(StringUtils.toUtf8(sequenceName))
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
.hash()
.asBytes()
);
handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
)
.bind("id", newIdentifier.getIdentifierAsString())
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull)
.bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
.execute();
log.info(
"Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
newIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull
);
return newIdentifier;
return skipSegmentLineageCheck ?
allocatePendingSegment(handle, dataSource, sequenceName, interval, maxVersion) :
allocatePendingSegmentWithSegmentLineageCheck(
handle,
dataSource,
sequenceName,
previousSegmentId,
interval,
maxVersion
);
}
},
ALLOCATE_SEGMENT_QUIET_TRIES,
@ -610,6 +414,355 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
@Nullable
private SegmentIdentifier allocatePendingSegmentWithSegmentLineageCheck(
final Handle handle,
final String dataSource,
final String sequenceName,
@Nullable final String previousSegmentId,
final Interval interval,
final String maxVersion
) throws IOException
{
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
handle.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
),
interval,
sequenceName,
previousSegmentIdNotNull,
Pair.of("dataSource", dataSource),
Pair.of("sequence_name", sequenceName),
Pair.of("sequence_prev_id", previousSegmentIdNotNull)
);
if (result.found) {
// The found existing segment identifier can be null if its interval doesn't match with the given interval
return result.segmentIdentifier;
}
final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion);
if (newIdentifier == null) {
return null;
}
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
// UNIQUE key for the row, ensuring sequences do not fork in two directions.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319)
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
.putBytes(StringUtils.toUtf8(sequenceName))
.putByte((byte) 0xff)
.putBytes(StringUtils.toUtf8(previousSegmentIdNotNull))
.hash()
.asBytes()
);
insertToMetastore(
handle,
newIdentifier,
dataSource,
interval,
previousSegmentIdNotNull,
sequenceName,
sequenceNamePrevIdSha1
);
return newIdentifier;
}
@Nullable
private SegmentIdentifier allocatePendingSegment(
final Handle handle,
final String dataSource,
final String sequenceName,
final Interval interval,
final String maxVersion
) throws IOException
{
final CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
handle.createQuery(
StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "start = :start AND "
+ "%2$send%2$s = :end",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
),
interval,
sequenceName,
null,
Pair.of("dataSource", dataSource),
Pair.of("sequence_name", sequenceName),
Pair.of("start", interval.getStart().toString()),
Pair.of("end", interval.getEnd().toString())
);
if (result.found) {
// The found existing segment identifier can be null if its interval doesn't match with the given interval
return result.segmentIdentifier;
}
final SegmentIdentifier newIdentifier = createNewSegment(handle, dataSource, interval, maxVersion);
if (newIdentifier == null) {
return null;
}
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
// UNIQUE key for the row, ensuring we don't have more than one segment per sequence per interval.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/druid-io/druid/issues/2319)
final String sequenceNamePrevIdSha1 = BaseEncoding.base16().encode(
Hashing.sha1()
.newHasher()
.putBytes(StringUtils.toUtf8(sequenceName))
.putByte((byte) 0xff)
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis())
.hash()
.asBytes()
);
// always insert empty previous sequence id
insertToMetastore(handle, newIdentifier, dataSource, interval, "", sequenceName, sequenceNamePrevIdSha1);
log.info(
"Allocated pending segment [%s] for sequence[%s] in DB",
newIdentifier.getIdentifierAsString(),
sequenceName
);
return newIdentifier;
}
private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(
final Query<Map<String, Object>> query,
final Interval interval,
final String sequenceName,
final @Nullable String previousSegmentId,
final Pair<String, String>... queryVars
) throws IOException
{
Query<Map<String, Object>> boundQuery = query;
for (Pair<String, String> var : queryVars) {
boundQuery = boundQuery.bind(var.lhs, var.rhs);
}
final List<byte[]> existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list();
if (!existingBytes.isEmpty()) {
final SegmentIdentifier existingIdentifier = jsonMapper.readValue(
Iterables.getOnlyElement(existingBytes),
SegmentIdentifier.class
);
if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
&& existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
if (previousSegmentId == null) {
log.info(
"Found existing pending segment [%s] for sequence[%s] in DB",
existingIdentifier.getIdentifierAsString(),
sequenceName
);
} else {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentId
);
}
return new CheckExistingSegmentIdResult(true, existingIdentifier);
} else {
if (previousSegmentId == null) {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] in DB, "
+ "does not match requested interval[%s]",
existingIdentifier.getIdentifierAsString(),
sequenceName,
interval
);
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentId,
interval
);
}
return new CheckExistingSegmentIdResult(true, null);
}
}
return new CheckExistingSegmentIdResult(false, null);
}
private static class CheckExistingSegmentIdResult
{
private final boolean found;
@Nullable
private final SegmentIdentifier segmentIdentifier;
CheckExistingSegmentIdResult(boolean found, @Nullable SegmentIdentifier segmentIdentifier)
{
this.found = found;
this.segmentIdentifier = segmentIdentifier;
}
}
private void insertToMetastore(
Handle handle,
SegmentIdentifier newIdentifier,
String dataSource,
Interval interval,
String previousSegmentId,
String sequenceName,
String sequenceNamePrevIdSha1
) throws JsonProcessingException
{
handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, sequence_name_prev_id_sha1, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :sequence_name_prev_id_sha1, :payload)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
)
.bind("id", newIdentifier.getIdentifierAsString())
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentId)
.bind("sequence_name_prev_id_sha1", sequenceNamePrevIdSha1)
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
.execute();
}
@Nullable
private SegmentIdentifier createNewSegment(
final Handle handle,
final String dataSource,
final Interval interval,
final String maxVersion
) throws IOException
{
// Make up a pending segment based on existing segments and pending segments in the DB. This works
// assuming that all tasks inserting segments at a particular point in time are going through the
// allocatePendingSegment flow. This should be assured through some other mechanism (like task locks).
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
handle,
dataSource,
ImmutableList.of(interval)
).lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.",
dataSource,
interval,
maxVersion,
existingChunks.size()
);
return null;
} else {
SegmentIdentifier max = null;
if (!existingChunks.isEmpty()) {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) {
if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
.getShardSpec()
.getPartitionNum()) {
max = SegmentIdentifier.fromDataSegment(existing.getObject());
}
}
}
final List<SegmentIdentifier> pendings = getPendingSegmentsForIntervalWithHandle(
handle,
dataSource,
interval
);
for (SegmentIdentifier pending : pendings) {
if (max == null ||
pending.getVersion().compareTo(max.getVersion()) > 0 ||
(pending.getVersion().equals(max.getVersion())
&& pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) {
max = pending;
}
}
if (max == null) {
return new SegmentIdentifier(
dataSource,
interval,
maxVersion,
new NumberedShardSpec(0, 0)
);
} else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
maxVersion,
max.getIdentifierAsString()
);
return null;
} else if (max.getShardSpec() instanceof LinearShardSpec) {
return new SegmentIdentifier(
dataSource,
max.getInterval(),
max.getVersion(),
new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1)
);
} else if (max.getShardSpec() instanceof NumberedShardSpec) {
return new SegmentIdentifier(
dataSource,
max.getInterval(),
max.getVersion(),
new NumberedShardSpec(
max.getShardSpec().getPartitionNum() + 1,
((NumberedShardSpec) max.getShardSpec()).getPartitions()
)
);
} else {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].",
dataSource,
interval,
maxVersion,
max.getShardSpec().getClass(),
max.getIdentifierAsString()
);
return null;
}
}
}
@Override
public int deletePendingSegments(String dataSource, Interval deleteInterval)
{

View File

@ -42,12 +42,13 @@ import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.realtime.appenderator.SegmentWithState.SegmentState;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -55,6 +56,7 @@ import java.util.NavigableMap;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -75,6 +77,94 @@ import java.util.stream.Stream;
*/
public abstract class BaseAppenderatorDriver implements Closeable
{
/**
* Segments allocated for an intervval.
* There should be at most a single active (appending) segment at any time.
*/
static class SegmentsOfInterval
{
private final Interval interval;
private final List<SegmentWithState> appendFinishedSegments = new ArrayList<>();
@Nullable
private SegmentWithState appendingSegment;
SegmentsOfInterval(Interval interval)
{
this.interval = interval;
}
SegmentsOfInterval(
Interval interval,
@Nullable SegmentWithState appendingSegment,
List<SegmentWithState> appendFinishedSegments
)
{
this.interval = interval;
this.appendingSegment = appendingSegment;
this.appendFinishedSegments.addAll(appendFinishedSegments);
if (appendingSegment != null) {
Preconditions.checkArgument(
appendingSegment.getState() == SegmentState.APPENDING,
"appendingSegment[%s] is not in the APPENDING state",
appendingSegment.getSegmentIdentifier()
);
}
if (appendFinishedSegments
.stream()
.anyMatch(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING)) {
throw new ISE("Some appendFinishedSegments[%s] is in the APPENDING state", appendFinishedSegments);
}
}
void setAppendingSegment(SegmentWithState appendingSegment)
{
Preconditions.checkArgument(
appendingSegment.getState() == SegmentState.APPENDING,
"segment[%s] is not in the APPENDING state",
appendingSegment.getSegmentIdentifier()
);
// There should be only one appending segment at any time
Preconditions.checkState(
this.appendingSegment == null,
"WTF?! Current appendingSegment[%s] is not null. "
+ "Its state must be changed before setting a new appendingSegment[%s]",
this.appendingSegment,
appendingSegment
);
this.appendingSegment = appendingSegment;
}
void finishAppendingToCurrentActiveSegment(Consumer<SegmentWithState> stateTransitionFn)
{
Preconditions.checkNotNull(appendingSegment, "appendingSegment");
stateTransitionFn.accept(appendingSegment);
appendFinishedSegments.add(appendingSegment);
appendingSegment = null;
}
Interval getInterval()
{
return interval;
}
SegmentWithState getAppendingSegment()
{
return appendingSegment;
}
List<SegmentWithState> getAllSegments()
{
final List<SegmentWithState> allSegments = new ArrayList<>(appendFinishedSegments.size() + 1);
if (appendingSegment != null) {
allSegments.add(appendingSegment);
}
allSegments.addAll(appendFinishedSegments);
return allSegments;
}
}
/**
* Allocated segments for a sequence
*/
@ -83,7 +173,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
// Interval Start millis -> List of Segments for this interval
// there might be multiple segments for a start interval, for example one segment
// can be in APPENDING state and others might be in PUBLISHING state
private final NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates;
private final NavigableMap<Long, SegmentsOfInterval> intervalToSegmentStates;
// most recently allocated segment
private String lastSegmentId;
@ -94,7 +184,7 @@ public abstract class BaseAppenderatorDriver implements Closeable
}
SegmentsForSequence(
NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates,
NavigableMap<Long, SegmentsOfInterval> intervalToSegmentStates,
String lastSegmentId
)
{
@ -104,24 +194,34 @@ public abstract class BaseAppenderatorDriver implements Closeable
void add(SegmentIdentifier identifier)
{
intervalToSegmentStates.computeIfAbsent(identifier.getInterval().getStartMillis(), k -> new LinkedList<>())
.addFirst(SegmentWithState.newSegment(identifier));
intervalToSegmentStates.computeIfAbsent(
identifier.getInterval().getStartMillis(),
k -> new SegmentsOfInterval(identifier.getInterval())
).setAppendingSegment(SegmentWithState.newSegment(identifier));
lastSegmentId = identifier.getIdentifierAsString();
}
Entry<Long, LinkedList<SegmentWithState>> floor(long timestamp)
Entry<Long, SegmentsOfInterval> floor(long timestamp)
{
return intervalToSegmentStates.floorEntry(timestamp);
}
LinkedList<SegmentWithState> get(long timestamp)
SegmentsOfInterval get(long timestamp)
{
return intervalToSegmentStates.get(timestamp);
}
Stream<SegmentWithState> segmentStateStream()
Stream<SegmentWithState> allSegmentStateStream()
{
return intervalToSegmentStates.values().stream().flatMap(Collection::stream);
return intervalToSegmentStates
.values()
.stream()
.flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream());
}
Stream<SegmentsOfInterval> getAllSegmentsOfInterval()
{
return intervalToSegmentStates.values().stream();
}
}
@ -183,13 +283,19 @@ public abstract class BaseAppenderatorDriver implements Closeable
return null;
}
final Map.Entry<Long, LinkedList<SegmentWithState>> candidateEntry = segmentsForSequence.floor(
final Map.Entry<Long, SegmentsOfInterval> candidateEntry = segmentsForSequence.floor(
timestamp.getMillis()
);
if (candidateEntry != null
&& candidateEntry.getValue().getFirst().getSegmentIdentifier().getInterval().contains(timestamp)
&& candidateEntry.getValue().getFirst().getState() == SegmentState.APPENDING) {
return candidateEntry.getValue().getFirst().getSegmentIdentifier();
if (candidateEntry != null) {
final SegmentsOfInterval segmentsOfInterval = candidateEntry.getValue();
if (segmentsOfInterval.interval.contains(timestamp)) {
return segmentsOfInterval.appendingSegment == null ?
null :
segmentsOfInterval.appendingSegment.getSegmentIdentifier();
} else {
return null;
}
} else {
return null;
}
@ -327,7 +433,20 @@ public abstract class BaseAppenderatorDriver implements Closeable
.map(segments::get)
.filter(Objects::nonNull)
.flatMap(segmentsForSequence -> segmentsForSequence.intervalToSegmentStates.values().stream())
.flatMap(Collection::stream);
.flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream());
}
}
Stream<SegmentWithState> getAppendingSegments(Collection<String> sequenceNames)
{
synchronized (segments) {
return sequenceNames
.stream()
.map(segments::get)
.filter(Objects::nonNull)
.flatMap(segmentsForSequence -> segmentsForSequence.intervalToSegmentStates.values().stream())
.map(segmentsOfInterval -> segmentsOfInterval.appendingSegment)
.filter(Objects::nonNull);
}
}
@ -538,10 +657,11 @@ public abstract class BaseAppenderatorDriver implements Closeable
Maps.transformValues(
snapshot,
(Function<SegmentsForSequence, List<SegmentWithState>>) input -> ImmutableList.copyOf(
input.intervalToSegmentStates.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList())
input.intervalToSegmentStates
.values()
.stream()
.flatMap(segmentsOfInterval -> segmentsOfInterval.getAllSegments().stream())
.collect(Collectors.toList())
)
)
),

View File

@ -33,11 +33,11 @@ import io.druid.timeline.DataSegment;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
@ -129,13 +129,11 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
long pushAndClearTimeoutMs
) throws InterruptedException, ExecutionException, TimeoutException
{
final List<SegmentIdentifier> segmentIdentifierList = getSegmentWithStates(sequenceNames)
.filter(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING)
.map(SegmentWithState::getSegmentIdentifier)
.collect(Collectors.toList());
final Map<SegmentIdentifier, SegmentWithState> requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames)
.collect(Collectors.toMap(SegmentWithState::getSegmentIdentifier, Function.identity()));
final ListenableFuture<SegmentsAndMetadata> future = ListenableFutures.transformAsync(
pushInBackground(null, segmentIdentifierList, false),
pushInBackground(null, requestedSegmentIdsForSequences.keySet(), false),
this::dropInBackground
);
@ -147,17 +145,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
final Map<SegmentIdentifier, DataSegment> pushedSegmentIdToSegmentMap = segmentsAndMetadata
.getSegments()
.stream()
.collect(Collectors.toMap(
SegmentIdentifier::fromDataSegment,
dataSegment -> dataSegment
));
final Map<SegmentIdentifier, SegmentWithState> requestedSegmentIdsForSequences = getSegmentWithStates(sequenceNames)
.filter(segmentWithState -> segmentWithState.getState() == SegmentState.APPENDING)
.collect(Collectors.toMap(
SegmentWithState::getSegmentIdentifier,
segmentWithState -> segmentWithState
));
.collect(Collectors.toMap(SegmentIdentifier::fromDataSegment, Function.identity()));
if (!pushedSegmentIdToSegmentMap.keySet().equals(requestedSegmentIdsForSequences.keySet())) {
throw new ISE(
@ -167,12 +155,28 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
);
}
// State transition
requestedSegmentIdsForSequences.forEach(
(segmentId, segmentWithState) -> {
segmentWithState.pushAndDrop(pushedSegmentIdToSegmentMap.get(segmentId));
synchronized (segments) {
for (String sequenceName : sequenceNames) {
final SegmentsForSequence segmentsForSequence = segments.get(sequenceName);
if (segmentsForSequence == null) {
throw new ISE("Can't find segmentsForSequence for sequence[%s]", sequenceName);
}
);
segmentsForSequence.getAllSegmentsOfInterval().forEach(segmentsOfInterval -> {
final SegmentWithState appendingSegment = segmentsOfInterval.getAppendingSegment();
if (appendingSegment != null) {
final DataSegment pushedSegment = pushedSegmentIdToSegmentMap.get(appendingSegment.getSegmentIdentifier());
if (pushedSegment == null) {
throw new ISE("Can't find pushedSegments for segment[%s]", appendingSegment.getSegmentIdentifier());
}
segmentsOfInterval.finishAppendingToCurrentActiveSegment(
segmentWithState -> segmentWithState.pushAndDrop(pushedSegment)
);
}
});
}
}
return segmentsAndMetadata;
}
@ -196,7 +200,7 @@ public class BatchAppenderatorDriver extends BaseAppenderatorDriver
snapshot
.values()
.stream()
.flatMap(SegmentsForSequence::segmentStateStream)
.flatMap(SegmentsForSequence::allSegmentStateStream)
.map(segmentWithState -> Preconditions
.checkNotNull(
segmentWithState.getDataSegment(),

View File

@ -32,7 +32,9 @@ import com.google.common.util.concurrent.SettableFuture;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.concurrent.ListenableFutures;
import io.druid.java.util.common.guava.Comparators;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.SegmentDescriptor;
import io.druid.segment.loading.DataSegmentKiller;
@ -43,8 +45,9 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -194,18 +197,13 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
for (final SegmentIdentifier identifier : identifiers) {
log.info("Moving segment[%s] out of active list.", identifier);
final long key = identifier.getInterval().getStartMillis();
if (activeSegmentsForSequence.get(key) == null || activeSegmentsForSequence.get(key).stream().noneMatch(
segmentWithState -> {
if (segmentWithState.getSegmentIdentifier().equals(identifier)) {
segmentWithState.finishAppending();
return true;
} else {
return false;
}
}
)) {
final SegmentsOfInterval segmentsOfInterval = activeSegmentsForSequence.get(key);
if (segmentsOfInterval == null ||
segmentsOfInterval.getAppendingSegment() == null ||
!segmentsOfInterval.getAppendingSegment().getSegmentIdentifier().equals(identifier)) {
throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
}
segmentsOfInterval.finishAppendingToCurrentActiveSegment(SegmentWithState::finishAppending);
}
}
}
@ -396,33 +394,53 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
private static class SegmentsForSequenceBuilder
{
private final NavigableMap<Long, LinkedList<SegmentWithState>> intervalToSegmentStates;
// segmentId -> (appendingSegment, appendFinishedSegments)
private final NavigableMap<SegmentIdentifier, Pair<SegmentWithState, List<SegmentWithState>>> intervalToSegments =
new TreeMap<>(Comparator.comparing(SegmentIdentifier::getInterval, Comparators.intervalsByStartThenEnd()));
private final String lastSegmentId;
SegmentsForSequenceBuilder(String lastSegmentId)
{
this.intervalToSegmentStates = new TreeMap<>();
this.lastSegmentId = lastSegmentId;
}
void add(SegmentWithState segmentWithState)
{
final SegmentIdentifier identifier = segmentWithState.getSegmentIdentifier();
final LinkedList<SegmentWithState> segmentsInInterval = intervalToSegmentStates.computeIfAbsent(
identifier.getInterval().getStartMillis(),
k -> new LinkedList<>()
);
final Pair<SegmentWithState, List<SegmentWithState>> pair = intervalToSegments.get(identifier);
final List<SegmentWithState> appendFinishedSegments = pair == null || pair.rhs == null ?
new ArrayList<>() :
pair.rhs;
// always keep APPENDING segments for an interval start millis in the front
if (segmentWithState.getState() == SegmentState.APPENDING) {
segmentsInInterval.addFirst(segmentWithState);
if (pair != null && pair.lhs != null) {
throw new ISE(
"WTF?! there was already an appendingSegment[%s] before adding an appendingSegment[%s]",
pair.lhs,
segmentWithState
);
}
intervalToSegments.put(identifier, Pair.of(segmentWithState, appendFinishedSegments));
} else {
segmentsInInterval.addLast(segmentWithState);
final SegmentWithState appendingSegment = pair == null ? null : pair.lhs;
appendFinishedSegments.add(segmentWithState);
intervalToSegments.put(identifier, Pair.of(appendingSegment, appendFinishedSegments));
}
}
SegmentsForSequence build()
{
return new SegmentsForSequence(intervalToSegmentStates, lastSegmentId);
final NavigableMap<Long, SegmentsOfInterval> map = new TreeMap<>();
for (Entry<SegmentIdentifier, Pair<SegmentWithState, List<SegmentWithState>>> entry :
intervalToSegments.entrySet()) {
map.put(
entry.getKey().getInterval().getStartMillis(),
new SegmentsOfInterval(entry.getKey().getInterval(), entry.getValue().lhs, entry.getValue().rhs)
);
}
return new SegmentsForSequence(map, lastSegmentId);
}
}
}

View File

@ -185,7 +185,7 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport
final SegmentsForSequence segmentsForSequence = driver.getSegments().get("dummy");
Assert.assertNotNull(segmentsForSequence);
final List<SegmentWithState> segmentWithStates = segmentsForSequence
.segmentStateStream()
.allSegmentStateStream()
.filter(segmentWithState -> segmentWithState.getState() == expectedState)
.collect(Collectors.toList());