Add SegmentAllocationQueue to batch SegmentAllocateActions (#13369)

In a cluster with a large number of streaming tasks (~1000), SegmentAllocateActions 
on the overlord can often take very long intervals of time to finish thus causing spikes 
in the `task/action/run/time`. This may result in lag building up while a task waits for a
segment to get allocated.

The root causes are:
- large number of metadata calls made to the segments and pending segments tables
- `giant` lock held in `TaskLockbox.tryLock()` to acquire task locks and allocate segments

Since the contention typically arises when several tasks of the same datasource try
to allocate segments for the same interval/granularity, the allocation run times can be
improved by batching the requests together.

Changes
- Add flags
   - `druid.indexer.tasklock.batchSegmentAllocation` (default `false`)
   - `druid.indexer.tasklock.batchAllocationMaxWaitTime` (in millis) (default `1000`)
- Add methods `canPerformAsync` and `performAsync` to `TaskAction`
- Submit each allocate action to a `SegmentAllocationQueue`, and add to correct batch
- Process batch after `batchAllocationMaxWaitTime`
- Acquire `giant` lock just once per batch in `TaskLockbox`
- Reduce metadata calls by batching statements together and updating query filters
- Except for batching, retain the whole behaviour (order of steps, retries, etc.)
- Respond to leadership changes and fail items in queue when not leader
- Emit batch and request level metrics
This commit is contained in:
Kashif Faraz 2022-12-05 14:00:07 +05:30 committed by GitHub
parent 9177419628
commit 45a8fa280c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 2752 additions and 296 deletions

View File

@ -20,10 +20,13 @@
package org.apache.druid.java.util.common;
import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
import javax.annotation.Nullable;
public final class Intervals
{
public static final Interval ETERNITY = utc(JodaUtils.MIN_INSTANT, JodaUtils.MAX_INSTANT);
@ -68,6 +71,32 @@ public final class Intervals
return ETERNITY.equals(interval);
}
/**
* Finds an interval from the given set of sortedIntervals which overlaps with
* the searchInterval. If multiple candidate intervals overlap with the
* searchInterval, the "smallest" interval based on the
* {@link Comparators#intervalsByStartThenEnd()} is returned.
*
* @param searchInterval Interval which should overlap with the result
* @param sortedIntervals Candidate overlapping intervals, sorted in ascending
* order, using {@link Comparators#intervalsByStartThenEnd()}.
* @return The first overlapping interval, if one exists, otherwise null.
*/
@Nullable
public static Interval findOverlappingInterval(Interval searchInterval, Interval[] sortedIntervals)
{
for (Interval interval : sortedIntervals) {
if (interval.overlaps(searchInterval)) {
return interval;
} else if (interval.getStart().isAfter(searchInterval.getEnd())) {
// Intervals after this cannot have an overlap
return null;
}
}
return null;
}
private Intervals()
{
}

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class IntervalsTest
{
@Test
public void testFindOverlappingInterval()
{
final Interval[] sortedIntervals = new Interval[]{
Intervals.of("2019/2020"),
Intervals.of("2021/2022"),
Intervals.of("2021-04-01/2021-05-01"),
Intervals.of("2022/2023")
};
Arrays.sort(sortedIntervals, Comparators.intervalsByStartThenEnd());
// Search interval outside the bounds of the sorted intervals
Assert.assertNull(
Intervals.findOverlappingInterval(Intervals.of("2018/2019"), sortedIntervals)
);
Assert.assertNull(
Intervals.findOverlappingInterval(Intervals.of("2023/2024"), sortedIntervals)
);
// Search interval within bounds, overlap exists
// Fully overlapping interval
Assert.assertEquals(
Intervals.of("2021/2022"),
Intervals.findOverlappingInterval(Intervals.of("2021/2022"), sortedIntervals)
);
// Partially overlapping interval
Assert.assertEquals(
Intervals.of("2022/2023"),
Intervals.findOverlappingInterval(Intervals.of("2022-01-01/2022-01-02"), sortedIntervals)
);
Assert.assertEquals(
Intervals.of("2021/2022"),
Intervals.findOverlappingInterval(Intervals.of("2021-06-01/2021-07-01"), sortedIntervals)
);
// Overlap with multiple intervals, "smallest" one is returned
Assert.assertEquals(
Intervals.of("2021/2022"),
Intervals.findOverlappingInterval(Intervals.of("2021-03-01/2021-04-01"), sortedIntervals)
);
// Search interval within bounds, no overlap
Assert.assertNull(
Intervals.findOverlappingInterval(Intervals.of("2020-01-02/2020-03-03"), sortedIntervals)
);
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
@ -329,14 +328,7 @@ public class TaskToolbox
// Request segment pushes for each set
final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
segments,
new Function<DataSegment, Interval>()
{
@Override
public Interval apply(DataSegment segment)
{
return segment.getInterval();
}
}
DataSegment::getInterval
);
for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));

View File

@ -30,6 +30,7 @@ import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class LocalTaskActionClient implements TaskActionClient
{
@ -76,11 +77,28 @@ public class LocalTaskActionClient implements TaskActionClient
}
final long performStartTime = System.currentTimeMillis();
final RetType result = taskAction.perform(task, toolbox);
final RetType result = performAction(taskAction);
emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime);
return result;
}
private <R> R performAction(TaskAction<R> taskAction)
{
try {
final R result;
if (taskAction.canPerformAsync(task, toolbox)) {
result = taskAction.performAsync(task, toolbox).get(5, TimeUnit.MINUTES);
} else {
result = taskAction.perform(task, toolbox);
}
return result;
}
catch (Throwable t) {
throw new RuntimeException(t);
}
}
private void emitTimerMetric(final String metric, final TaskAction<?> action, final long time)
{
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();

View File

@ -47,6 +47,7 @@ import javax.annotation.Nullable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@ -180,6 +181,23 @@ public class SegmentAllocateAction implements TaskAction<SegmentIdWithShardSpec>
};
}
@Override
public boolean canPerformAsync(Task task, TaskActionToolbox toolbox)
{
return toolbox.canBatchSegmentAllocation();
}
@Override
public Future<SegmentIdWithShardSpec> performAsync(Task task, TaskActionToolbox toolbox)
{
if (!toolbox.canBatchSegmentAllocation()) {
throw new ISE("Batched segment allocation is disabled");
}
return toolbox.getSegmentAllocationQueue().add(
new SegmentAllocateRequest(task, this, MAX_ATTEMPTS)
);
}
@Override
public SegmentIdWithShardSpec perform(
final Task task,

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import org.apache.druid.indexing.common.task.Task;
import org.joda.time.Interval;
import org.joda.time.chrono.ISOChronology;
/**
* Request received by the overlord for segment allocation.
*/
public class SegmentAllocateRequest
{
private final Task task;
private final SegmentAllocateAction action;
private final int maxAttempts;
private final Interval rowInterval;
private int attempts;
public SegmentAllocateRequest(Task task, SegmentAllocateAction action, int maxAttempts)
{
this.task = task;
this.action = action;
this.maxAttempts = maxAttempts;
this.rowInterval = action.getQueryGranularity()
.bucket(action.getTimestamp())
.withChronology(ISOChronology.getInstanceUTC());
}
public Task getTask()
{
return task;
}
public SegmentAllocateAction getAction()
{
return action;
}
public void incrementAttempts()
{
++attempts;
}
public boolean canRetry()
{
return attempts < maxAttempts;
}
public int getAttempts()
{
return attempts;
}
public Interval getRowInterval()
{
return rowInterval;
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
public class SegmentAllocateResult
{
private final SegmentIdWithShardSpec segmentId;
private final String errorMessage;
public SegmentAllocateResult(SegmentIdWithShardSpec segmentId, String errorMessage)
{
this.segmentId = segmentId;
this.errorMessage = errorMessage;
}
public SegmentIdWithShardSpec getSegmentId()
{
return segmentId;
}
public String getErrorMessage()
{
return errorMessage;
}
public boolean isSuccess()
{
return segmentId != null;
}
}

View File

@ -0,0 +1,716 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import com.google.inject.Inject;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Queue for {@link SegmentAllocateRequest}s.
*/
@ManageLifecycle
public class SegmentAllocationQueue
{
private static final Logger log = new Logger(SegmentAllocationQueue.class);
private static final int MAX_QUEUE_SIZE = 2000;
private final long maxWaitTimeMillis;
private final TaskLockbox taskLockbox;
private final ScheduledExecutorService executor;
private final IndexerMetadataStorageCoordinator metadataStorage;
private final AtomicBoolean isLeader = new AtomicBoolean(false);
private final ServiceEmitter emitter;
private final ConcurrentHashMap<AllocateRequestKey, AllocateRequestBatch> keyToBatch = new ConcurrentHashMap<>();
private final BlockingDeque<AllocateRequestKey> processingQueue = new LinkedBlockingDeque<>(MAX_QUEUE_SIZE);
@Inject
public SegmentAllocationQueue(
TaskLockbox taskLockbox,
TaskLockConfig taskLockConfig,
IndexerMetadataStorageCoordinator metadataStorage,
ServiceEmitter emitter,
ScheduledExecutorFactory executorFactory
)
{
this.emitter = emitter;
this.taskLockbox = taskLockbox;
this.metadataStorage = metadataStorage;
this.maxWaitTimeMillis = taskLockConfig.getBatchAllocationMaxWaitTime();
this.executor = taskLockConfig.isBatchSegmentAllocation()
? executorFactory.create(1, "SegmentAllocQueue-%s") : null;
}
@LifecycleStart
public void start()
{
if (isEnabled()) {
log.info("Initializing segment allocation queue.");
scheduleQueuePoll(maxWaitTimeMillis);
}
}
@LifecycleStop
public void stop()
{
if (isEnabled()) {
log.info("Tearing down segment allocation queue.");
executor.shutdownNow();
}
}
public void becomeLeader()
{
if (!isLeader.compareAndSet(false, true)) {
log.info("Already the leader. Queue processing has started.");
} else if (isEnabled()) {
log.info("Elected leader. Starting queue processing.");
} else {
log.info(
"Elected leader but batched segment allocation is disabled. "
+ "Segment allocation queue will not be used."
);
}
}
public void stopBeingLeader()
{
if (!isLeader.compareAndSet(true, false)) {
log.info("Already surrendered leadership. Queue processing is stopped.");
} else if (isEnabled()) {
log.info("Not leader anymore. Stopping queue processing.");
} else {
log.info("Not leader anymore. Segment allocation queue is already disabled.");
}
}
public boolean isEnabled()
{
return executor != null && !executor.isShutdown();
}
private void scheduleQueuePoll(long delay)
{
executor.schedule(this::processBatchesDue, delay, TimeUnit.MILLISECONDS);
}
/**
* Gets the number of batches currently in the queue.
*/
public int size()
{
return processingQueue.size();
}
/**
* Queues a SegmentAllocateRequest. The returned future may complete successfully
* with a non-null value or with a non-null value.
*/
public Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
{
if (!isLeader.get()) {
throw new ISE("Cannot allocate segment if not leader.");
} else if (!isEnabled()) {
throw new ISE("Batched segment allocation is disabled.");
}
final AllocateRequestKey requestKey = new AllocateRequestKey(request, maxWaitTimeMillis);
final AtomicReference<Future<SegmentIdWithShardSpec>> futureReference = new AtomicReference<>();
// Possible race condition:
// t1 -> new batch is added to queue or batch already exists in queue
// t2 -> executor pops batch, processes all requests in it
// t1 -> new request is added to dangling batch and is never picked up
// Solution: For existing batch, call keyToBatch.remove() on the key to
// wait on keyToBatch.compute() to finish before proceeding with processBatch().
// For new batch, keyToBatch.remove() would not wait as key is not in map yet
// but a new batch is unlikely to be due immediately, so it won't get popped right away.
keyToBatch.compute(requestKey, (key, existingBatch) -> {
if (existingBatch == null) {
AllocateRequestBatch newBatch = new AllocateRequestBatch(key);
futureReference.set(newBatch.add(request));
return addBatchToQueue(newBatch) ? newBatch : null;
} else {
futureReference.set(existingBatch.add(request));
return existingBatch;
}
});
return futureReference.get();
}
/**
* Tries to add the given batch to the processing queue. Fails all the pending
* requests in the batch if we are not leader or if the queue is full.
*/
private boolean addBatchToQueue(AllocateRequestBatch batch)
{
batch.key.resetQueueTime();
if (!isLeader.get()) {
batch.failPendingRequests("Cannot allocate segment if not leader");
return false;
} else if (processingQueue.offer(batch.key)) {
log.debug("Added a new batch [%s] to queue.", batch.key);
return true;
} else {
batch.failPendingRequests(
"Segment allocation queue is full. Check the metric `task/action/batch/runTime` "
+ "to determine if metadata operations are slow."
);
return false;
}
}
/**
* Tries to add the given batch to the processing queue. If a batch already
* exists for this key, transfers all the requests from this batch to the
* existing one.
*/
private void requeueBatch(AllocateRequestBatch batch)
{
log.info("Requeueing [%d] failed requests in batch [%s].", batch.size(), batch.key);
keyToBatch.compute(batch.key, (key, existingBatch) -> {
if (existingBatch == null) {
return addBatchToQueue(batch) ? batch : null;
} else {
// Merge requests from this batch to existing one
existingBatch.transferRequestsFrom(batch);
return existingBatch;
}
});
}
private void processBatchesDue()
{
clearQueueIfNotLeader();
// Process all batches which are due
log.debug("Processing batches which are due. Queue size [%d].", processingQueue.size());
int numProcessedBatches = 0;
AllocateRequestKey nextKey = processingQueue.peekFirst();
while (nextKey != null && nextKey.isDue()) {
processingQueue.pollFirst();
AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
boolean processed;
try {
processed = processBatch(nextBatch);
}
catch (Throwable t) {
nextBatch.failPendingRequests(t);
processed = true;
log.error(t, "Error while processing batch [%s]", nextKey);
}
// Requeue if not fully processed yet
if (processed) {
++numProcessedBatches;
} else {
requeueBatch(nextBatch);
}
nextKey = processingQueue.peek();
}
// Schedule the next round of processing
final long nextScheduleDelay;
if (processingQueue.isEmpty()) {
nextScheduleDelay = maxWaitTimeMillis;
} else {
nextKey = processingQueue.peek();
long timeElapsed = System.currentTimeMillis() - nextKey.getQueueTime();
nextScheduleDelay = Math.max(0, maxWaitTimeMillis - timeElapsed);
}
scheduleQueuePoll(nextScheduleDelay);
log.info("Processed [%d] batches, next execution in [%d ms]", numProcessedBatches, nextScheduleDelay);
}
/**
* Removes items from the queue as long as we are not leader.
*/
private void clearQueueIfNotLeader()
{
int failedBatches = 0;
AllocateRequestKey nextKey = processingQueue.peekFirst();
while (nextKey != null && !isLeader.get()) {
processingQueue.pollFirst();
AllocateRequestBatch nextBatch = keyToBatch.remove(nextKey);
nextBatch.failPendingRequests("Cannot allocate segment if not leader");
++failedBatches;
nextKey = processingQueue.peekFirst();
}
if (failedBatches > 0) {
log.info("Not leader. Failed [%d] batches, remaining in queue [%d].", failedBatches, processingQueue.size());
}
}
/**
* Processes the given batch. Returns true if the batch was completely processed
* and should not be requeued.
*/
private boolean processBatch(AllocateRequestBatch requestBatch)
{
final AllocateRequestKey requestKey = requestBatch.key;
if (requestBatch.isEmpty()) {
return true;
} else if (!isLeader.get()) {
requestBatch.failPendingRequests("Cannot allocate segment if not leader");
return true;
}
log.debug(
"Processing [%d] requests for batch [%s], queue time [%s].",
requestBatch.size(),
requestKey,
requestKey.getQueueTime()
);
final long startTimeMillis = System.currentTimeMillis();
final int batchSize = requestBatch.size();
emitBatchMetric("task/action/batch/size", batchSize, requestKey);
emitBatchMetric("task/action/batch/queueTime", (startTimeMillis - requestKey.getQueueTime()), requestKey);
final Set<DataSegment> usedSegments = retrieveUsedSegments(requestKey);
final int successCount = allocateSegmentsForBatch(requestBatch, usedSegments);
emitBatchMetric("task/action/batch/attempts", 1L, requestKey);
emitBatchMetric("task/action/batch/runTime", (System.currentTimeMillis() - startTimeMillis), requestKey);
log.info("Successfully processed [%d / %d] requests in batch [%s].", successCount, batchSize, requestKey);
if (requestBatch.isEmpty()) {
log.debug("All requests in batch [%s] have been processed.", requestKey);
return true;
}
// Requeue the batch only if used segments have changed
log.debug("There are [%d] failed requests in batch [%s].", requestBatch.size(), requestKey);
final Set<DataSegment> updatedUsedSegments = retrieveUsedSegments(requestKey);
if (updatedUsedSegments.equals(usedSegments)) {
requestBatch.failPendingRequests("Allocation failed probably due to conflicting segments.");
return true;
} else {
log.debug("Used segments have changed. Requeuing failed requests.");
return false;
}
}
private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key)
{
return new HashSet<>(
metadataStorage.retrieveUsedSegmentsForInterval(
key.dataSource,
key.preferredAllocationInterval,
Segments.ONLY_VISIBLE
)
);
}
private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set<DataSegment> usedSegments)
{
int successCount = 0;
// Find requests whose row interval overlaps with an existing used segment
final Set<SegmentAllocateRequest> allRequests = requestBatch.getRequests();
final Set<SegmentAllocateRequest> requestsWithNoOverlappingSegment = new HashSet<>();
if (usedSegments.isEmpty()) {
requestsWithNoOverlappingSegment.addAll(allRequests);
} else {
final Interval[] sortedUsedSegmentIntervals = getSortedIntervals(usedSegments);
final Map<Interval, List<SegmentAllocateRequest>> overlapIntervalToRequests = new HashMap<>();
for (SegmentAllocateRequest request : allRequests) {
// If there is an overlapping used segment, the interval of the used segment
// is the only candidate for allocation for this request
final Interval overlappingInterval = Intervals.findOverlappingInterval(
request.getRowInterval(),
sortedUsedSegmentIntervals
);
if (overlappingInterval == null) {
requestsWithNoOverlappingSegment.add(request);
} else if (overlappingInterval.contains(request.getRowInterval())) {
// Found an enclosing interval, use this for allocation
overlapIntervalToRequests.computeIfAbsent(overlappingInterval, i -> new ArrayList<>())
.add(request);
} else {
// There is no valid allocation interval for this request due to a
// partially overlapping used segment. Need not do anything right now.
// The request will be retried upon requeueing the batch.
}
}
// Try to allocate segments for the identified used segment intervals.
// Do not retry the failed requests with other intervals unless the batch is requeued.
for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : overlapIntervalToRequests.entrySet()) {
successCount += allocateSegmentsForInterval(
entry.getKey(),
entry.getValue(),
requestBatch
);
}
}
// For requests that do not overlap with a used segment, first try to allocate
// using the preferred granularity, then smaller granularities
final Set<SegmentAllocateRequest> pendingRequests = new HashSet<>(requestsWithNoOverlappingSegment);
for (Granularity granularity :
Granularity.granularitiesFinerThan(requestBatch.key.preferredSegmentGranularity)) {
Map<Interval, List<SegmentAllocateRequest>> requestsByInterval =
getRequestsByInterval(pendingRequests, granularity);
for (Map.Entry<Interval, List<SegmentAllocateRequest>> entry : requestsByInterval.entrySet()) {
successCount += allocateSegmentsForInterval(
entry.getKey(),
entry.getValue(),
requestBatch
);
pendingRequests.retainAll(requestBatch.getRequests());
}
}
return successCount;
}
private Interval[] getSortedIntervals(Set<DataSegment> usedSegments)
{
TreeSet<Interval> sortedSet = new TreeSet<>(Comparators.intervalsByStartThenEnd());
usedSegments.forEach(segment -> sortedSet.add(segment.getInterval()));
return sortedSet.toArray(new Interval[0]);
}
/**
* Tries to allocate segments for the given requests over the specified interval.
* Returns the number of requests for which segments were successfully allocated.
*/
private int allocateSegmentsForInterval(
Interval tryInterval,
List<SegmentAllocateRequest> requests,
AllocateRequestBatch requestBatch
)
{
if (requests.isEmpty()) {
return 0;
}
final AllocateRequestKey requestKey = requestBatch.key;
log.debug(
"Trying allocation for [%d] requests, interval [%s] in batch [%s]",
requests.size(),
tryInterval,
requestKey
);
final List<SegmentAllocateResult> results = taskLockbox.allocateSegments(
requests,
requestKey.dataSource,
tryInterval,
requestKey.skipSegmentLineageCheck,
requestKey.lockGranularity
);
int successfulRequests = 0;
for (int i = 0; i < requests.size(); ++i) {
SegmentAllocateRequest request = requests.get(i);
SegmentAllocateResult result = results.get(i);
if (result.isSuccess()) {
++successfulRequests;
}
requestBatch.handleResult(result, request);
}
return successfulRequests;
}
private Map<Interval, List<SegmentAllocateRequest>> getRequestsByInterval(
Set<SegmentAllocateRequest> requests,
Granularity tryGranularity
)
{
final Map<Interval, List<SegmentAllocateRequest>> tryIntervalToRequests = new HashMap<>();
for (SegmentAllocateRequest request : requests) {
Interval tryInterval = tryGranularity.bucket(request.getAction().getTimestamp());
if (tryInterval.contains(request.getRowInterval())) {
tryIntervalToRequests.computeIfAbsent(tryInterval, i -> new ArrayList<>()).add(request);
}
}
return tryIntervalToRequests;
}
private void emitTaskMetric(String metric, long value, SegmentAllocateRequest request)
{
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, request.getTask());
metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
emitter.emit(metricBuilder.build(metric, value));
}
private void emitBatchMetric(String metric, long value, AllocateRequestKey key)
{
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
metricBuilder.setDimension(DruidMetrics.DATASOURCE, key.dataSource);
metricBuilder.setDimension(DruidMetrics.INTERVAL, key.preferredAllocationInterval.toString());
emitter.emit(metricBuilder.build(metric, value));
}
/**
* A batch of segment allocation requests.
*/
private class AllocateRequestBatch
{
private final AllocateRequestKey key;
/**
* Map from allocate requests (represents a single SegmentAllocateAction)
* to the future of allocated segment id.
* <p>
* This must be accessed through methods synchronized on this batch.
* It is to avoid races between a new request being added just when the batch
* is being processed.
*/
private final Map<SegmentAllocateRequest, CompletableFuture<SegmentIdWithShardSpec>>
requestToFuture = new HashMap<>();
AllocateRequestBatch(AllocateRequestKey key)
{
this.key = key;
}
synchronized Future<SegmentIdWithShardSpec> add(SegmentAllocateRequest request)
{
log.debug("Adding request to batch [%s]: %s", key, request.getAction());
return requestToFuture.computeIfAbsent(request, req -> new CompletableFuture<>());
}
synchronized void transferRequestsFrom(AllocateRequestBatch batch)
{
requestToFuture.putAll(batch.requestToFuture);
batch.requestToFuture.clear();
}
synchronized Set<SegmentAllocateRequest> getRequests()
{
return new HashSet<>(requestToFuture.keySet());
}
synchronized void failPendingRequests(String reason)
{
failPendingRequests(new ISE(reason));
}
synchronized void failPendingRequests(Throwable cause)
{
if (!requestToFuture.isEmpty()) {
log.warn("Failing [%d] requests in batch due to [%s]. Batch key: %s", size(), cause.getMessage(), key);
requestToFuture.values().forEach(future -> future.completeExceptionally(cause));
requestToFuture.keySet().forEach(
request -> emitTaskMetric("task/action/failed/count", 1L, request)
);
requestToFuture.clear();
}
}
synchronized void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
{
request.incrementAttempts();
if (result.isSuccess()) {
emitTaskMetric("task/action/success/count", 1L, request);
requestToFuture.remove(request).complete(result.getSegmentId());
} else if (request.canRetry()) {
log.info(
"Allocation failed in attempt [%d] due to error [%s]. Can still retry. Action: %s",
request.getAttempts(),
result.getErrorMessage(),
request.getAction()
);
} else {
emitTaskMetric("task/action/failed/count", 1L, request);
log.error(
"Failing allocate action after [%d] attempts. Latest error [%s]. Action: %s",
request.getAttempts(),
result.getErrorMessage(),
request.getAction()
);
requestToFuture.remove(request).completeExceptionally(new ISE(result.getErrorMessage()));
}
}
synchronized boolean isEmpty()
{
return requestToFuture.isEmpty();
}
synchronized int size()
{
return requestToFuture.size();
}
}
/**
* Key to identify a batch of allocation requests.
*/
private static class AllocateRequestKey
{
private long queueTimeMillis;
private final long maxWaitTimeMillis;
private final String dataSource;
private final String groupId;
private final Interval preferredAllocationInterval;
private final Granularity preferredSegmentGranularity;
private final boolean skipSegmentLineageCheck;
private final LockGranularity lockGranularity;
private final boolean useNonRootGenPartitionSpace;
private final int hash;
/**
* Creates a new key for the given request. The batch for a unique key will
* always contain a single request.
*/
AllocateRequestKey(SegmentAllocateRequest request, long maxWaitTimeMillis)
{
final SegmentAllocateAction action = request.getAction();
final Task task = request.getTask();
this.dataSource = action.getDataSource();
this.groupId = task.getGroupId();
this.skipSegmentLineageCheck = action.isSkipSegmentLineageCheck();
this.lockGranularity = action.getLockGranularity();
this.useNonRootGenPartitionSpace = action.getPartialShardSpec()
.useNonRootGenerationPartitionSpace();
this.preferredSegmentGranularity = action.getPreferredSegmentGranularity();
this.preferredAllocationInterval = action.getPreferredSegmentGranularity()
.bucket(action.getTimestamp());
this.hash = Objects.hash(
skipSegmentLineageCheck,
useNonRootGenPartitionSpace,
dataSource,
groupId,
preferredAllocationInterval,
lockGranularity
);
this.maxWaitTimeMillis = maxWaitTimeMillis;
}
void resetQueueTime()
{
queueTimeMillis = System.currentTimeMillis();
}
long getQueueTime()
{
return queueTimeMillis;
}
boolean isDue()
{
return System.currentTimeMillis() - queueTimeMillis >= maxWaitTimeMillis;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AllocateRequestKey that = (AllocateRequestKey) o;
return skipSegmentLineageCheck == that.skipSegmentLineageCheck
&& useNonRootGenPartitionSpace == that.useNonRootGenPartitionSpace
&& dataSource.equals(that.dataSource)
&& groupId.equals(that.groupId)
&& preferredAllocationInterval.equals(that.preferredAllocationInterval)
&& lockGranularity == that.lockGranularity;
}
@Override
public int hashCode()
{
return hash;
}
@Override
public String toString()
{
return "{" +
"ds='" + dataSource + '\'' +
", gr='" + groupId + '\'' +
", lock=" + lockGranularity +
", invl=" + preferredAllocationInterval +
", slc=" + skipSegmentLineageCheck +
'}';
}
}
}

View File

@ -24,6 +24,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.druid.indexing.common.task.Task;
import java.util.concurrent.Future;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = TaskAction.TYPE_FIELD)
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "lockAcquire", value = TimeChunkLockAcquireAction.class),
@ -58,6 +60,16 @@ public interface TaskAction<RetType>
boolean isAudited();
default boolean canPerformAsync(Task task, TaskActionToolbox toolbox)
{
return false;
}
default Future<RetType> performAsync(Task task, TaskActionToolbox toolbox)
{
throw new UnsupportedOperationException();
}
@Override
String toString();
}

View File

@ -35,6 +35,7 @@ public class TaskActionToolbox
{
private final TaskLockbox taskLockbox;
private final TaskStorage taskStorage;
private final SegmentAllocationQueue segmentAllocationQueue;
private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private final ServiceEmitter emitter;
private final SupervisorManager supervisorManager;
@ -46,6 +47,7 @@ public class TaskActionToolbox
TaskLockbox taskLockbox,
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
SegmentAllocationQueue segmentAllocationQueue,
ServiceEmitter emitter,
SupervisorManager supervisorManager,
@Json ObjectMapper jsonMapper
@ -57,6 +59,27 @@ public class TaskActionToolbox
this.emitter = emitter;
this.supervisorManager = supervisorManager;
this.jsonMapper = jsonMapper;
this.segmentAllocationQueue = segmentAllocationQueue;
}
public TaskActionToolbox(
TaskLockbox taskLockbox,
TaskStorage taskStorage,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
ServiceEmitter emitter,
SupervisorManager supervisorManager,
@Json ObjectMapper jsonMapper
)
{
this(
taskLockbox,
taskStorage,
indexerMetadataStorageCoordinator,
null,
emitter,
supervisorManager,
jsonMapper
);
}
public TaskLockbox getTaskLockbox()
@ -103,4 +126,13 @@ public class TaskActionToolbox
return Optional.absent();
}
public SegmentAllocationQueue getSegmentAllocationQueue()
{
return segmentAllocationQueue;
}
public boolean canBatchSegmentAllocation()
{
return segmentAllocationQueue != null && segmentAllocationQueue.isEnabled();
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing.overlord;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
@ -34,9 +33,13 @@ import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.SegmentAllocateResult;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
@ -446,6 +449,140 @@ public class TaskLockbox
}
}
/**
* Attempts to allocate segments for the given requests. Each request contains
* a {@link Task} and a {@link SegmentAllocateAction}. This method tries to
* acquire the task locks on the required intervals/segments and then performs
* a batch allocation of segments. It is possible that some requests succeed
* successfully and others failed. In that case, only the failed ones should be
* retried.
*
* @param requests List of allocation requests
* @param dataSource Datasource for which segment is to be allocated.
* @param interval Interval for which segment is to be allocated.
* @param skipSegmentLineageCheck Whether lineage check is to be skipped
* (this is true for streaming ingestion)
* @param lockGranularity Granularity of task lock
* @return List of allocation results in the same order as the requests.
*/
public List<SegmentAllocateResult> allocateSegments(
List<SegmentAllocateRequest> requests,
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
LockGranularity lockGranularity
)
{
log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval);
final boolean isTimeChunkLock = lockGranularity == LockGranularity.TIME_CHUNK;
final AllocationHolderList holderList = new AllocationHolderList(requests, interval);
holderList.getPending().forEach(this::verifyTaskIsActive);
giant.lock();
try {
if (isTimeChunkLock) {
// For time-chunk locking, segment must be allocated only after acquiring the lock
holderList.getPending().forEach(holder -> acquireTaskLock(holder, true));
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
} else {
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck, holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder, false));
}
holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder, isTimeChunkLock));
}
finally {
giant.unlock();
}
return holderList.getResults();
}
/**
* Marks the segment allocation as failed if the underlying task is not active.
*/
private void verifyTaskIsActive(SegmentAllocationHolder holder)
{
final String taskId = holder.task.getId();
if (!activeTasks.contains(taskId)) {
holder.markFailed("Unable to grant lock to inactive Task [%s]", taskId);
}
}
/**
* Creates a task lock request and creates or finds the lock for that request.
* Marks the segment allocation as failed if the lock could not be acquired or
* was revoked.
*/
private void acquireTaskLock(SegmentAllocationHolder holder, boolean isTimeChunkLock)
{
final LockRequest lockRequest;
if (isTimeChunkLock) {
lockRequest = new TimeChunkLockRequest(holder.lockRequest);
} else {
lockRequest = new SpecificSegmentLockRequest(holder.lockRequest, holder.allocatedSegment);
}
// Create or find the task lock for the created lock request
final TaskLockPosse posseToUse = createOrFindLockPosse(lockRequest);
final TaskLock acquiredLock = posseToUse == null ? null : posseToUse.getTaskLock();
if (posseToUse == null) {
holder.markFailed("Could not find or create lock posse.");
} else if (acquiredLock.isRevoked()) {
holder.markFailed("Lock was revoked.");
} else {
holder.setAcquiredLock(posseToUse, lockRequest.getInterval());
}
}
/**
* Adds the task to the found lock posse if not already added and updates
* in the metadata store. Marks the segment allocation as failed if the update
* did not succeed.
*/
private void addTaskAndPersistLocks(SegmentAllocationHolder holder, boolean isTimeChunkLock)
{
final Task task = holder.task;
final TaskLock acquiredLock = holder.acquiredLock;
if (holder.taskLockPosse.addTask(task)) {
log.info("Added task [%s] to TaskLock [%s]", task.getId(), acquiredLock);
// This can also be batched later
boolean success = updateLockInStorage(task, acquiredLock);
if (success) {
holder.markSucceeded();
} else {
final Integer partitionId = isTimeChunkLock
? null : ((SegmentLock) acquiredLock).getPartitionId();
unlock(task, holder.lockRequestInterval, partitionId);
holder.markFailed("Could not update task lock in metadata store.");
}
} else {
log.info("Task [%s] already present in TaskLock [%s]", task.getId(), acquiredLock.getGroupId());
holder.markSucceeded();
}
}
private boolean updateLockInStorage(Task task, TaskLock taskLock)
{
try {
taskStorage.addLock(task.getId(), taskLock);
return true;
}
catch (Exception e) {
log.makeAlert("Failed to persist lock in storage")
.addData("task", task.getId())
.addData("dataSource", taskLock.getDataSource())
.addData("interval", taskLock.getInterval())
.addData("version", taskLock.getVersion())
.emit();
return false;
}
}
private TaskLockPosse createOrFindLockPosse(LockRequest request)
{
Preconditions.checkState(!(request instanceof LockRequestForNewSegment), "Can't handle LockRequestForNewSegment");
@ -541,7 +678,6 @@ public class TaskLockbox
* monotonicity and that callers specifying {@code preferredVersion} are doing the right thing.
*
* @param request request to lock
*
* @return a new {@link TaskLockPosse}
*/
private TaskLockPosse createNewTaskLockPosse(LockRequest request)
@ -550,7 +686,10 @@ public class TaskLockbox
try {
final TaskLockPosse posseToUse = new TaskLockPosse(request.toLock());
running.computeIfAbsent(request.getDataSource(), k -> new TreeMap<>())
.computeIfAbsent(request.getInterval().getStart(), k -> new TreeMap<>(Comparators.intervalsByStartThenEnd()))
.computeIfAbsent(
request.getInterval().getStart(),
k -> new TreeMap<>(Comparators.intervalsByStartThenEnd())
)
.computeIfAbsent(request.getInterval(), k -> new ArrayList<>())
.add(posseToUse);
@ -561,6 +700,45 @@ public class TaskLockbox
}
}
/**
* Makes a call to the {@link #metadataStorageCoordinator} to allocate segments
* for the given requests. Updates the holder with the allocated segment if
* the allocation succeeds, otherwise marks it as failed.
*/
private void allocateSegmentIds(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
Collection<SegmentAllocationHolder> holders
)
{
if (holders.isEmpty()) {
return;
}
final List<SegmentCreateRequest> createRequests =
holders.stream()
.map(SegmentAllocationHolder::getSegmentRequest)
.collect(Collectors.toList());
Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatedSegments =
metadataStorageCoordinator.allocatePendingSegments(
dataSource,
interval,
skipSegmentLineageCheck,
createRequests
);
for (SegmentAllocationHolder holder : holders) {
SegmentIdWithShardSpec segmentId = allocatedSegments.get(holder.getSegmentRequest());
if (segmentId == null) {
holder.markFailed("Storage coordinator could not allocate segment.");
} else {
holder.setAllocatedSegment(segmentId);
}
}
}
private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment request, String version)
{
return metadataStorageCoordinator.allocatePendingSegment(
@ -577,7 +755,7 @@ public class TaskLockbox
/**
* Perform the given action with a guarantee that the locks of the task are not revoked in the middle of action. This
* method first checks that all locks for the given task and intervals are valid and perform the right action.
*
* <p>
* The given action should be finished as soon as possible because all other methods in this class are blocked until
* this method is finished.
*
@ -611,7 +789,7 @@ public class TaskLockbox
.allMatch(interval -> {
final List<TaskLockPosse> lockPosses = getOnlyTaskLockPosseContainingInterval(task, interval);
return lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch(
lock -> lock.isRevoked()
TaskLock::isRevoked
);
});
}
@ -664,7 +842,9 @@ public class TaskLockbox
final TaskLock revokedLock = lock.revokedCopy();
taskStorage.replaceLock(taskId, lock, revokedLock);
final List<TaskLockPosse> possesHolder = running.get(task.getDataSource()).get(lock.getInterval().getStart()).get(lock.getInterval());
final List<TaskLockPosse> possesHolder = running.get(task.getDataSource())
.get(lock.getInterval().getStart())
.get(lock.getInterval());
final TaskLockPosse foundPosse = possesHolder.stream()
.filter(posse -> posse.getTaskLock().equals(lock))
.findFirst()
@ -692,16 +872,7 @@ public class TaskLockbox
giant.lock();
try {
return Lists.transform(
findLockPossesForTask(task), new Function<TaskLockPosse, TaskLock>()
{
@Override
public TaskLock apply(TaskLockPosse taskLockPosse)
{
return taskLockPosse.getTaskLock();
}
}
);
return Lists.transform(findLockPossesForTask(task), TaskLockPosse::getTaskLock);
}
finally {
giant.unlock();
@ -778,7 +949,7 @@ public class TaskLockbox
* Release lock held for a task on a particular interval. Does nothing if the task does not currently
* hold the mentioned lock.
*
* @param task task to unlock
* @param task task to unlock
* @param interval interval to unlock
*/
public void unlock(final Task task, final Interval interval, @Nullable Integer partitionId)
@ -1233,7 +1404,7 @@ public class TaskLockbox
TaskLockPosse that = (TaskLockPosse) o;
return java.util.Objects.equals(taskLock, that.taskLock) &&
java.util.Objects.equals(taskIds, that.taskIds);
java.util.Objects.equals(taskIds, that.taskIds);
}
@Override
@ -1251,4 +1422,121 @@ public class TaskLockbox
.toString();
}
}
/**
* Maintains a list of pending allocation holders.
*/
private static class AllocationHolderList
{
final List<SegmentAllocationHolder> all = new ArrayList<>();
final Set<SegmentAllocationHolder> pending = new HashSet<>();
final Set<SegmentAllocationHolder> recentlyCompleted = new HashSet<>();
AllocationHolderList(List<SegmentAllocateRequest> requests, Interval interval)
{
for (SegmentAllocateRequest request : requests) {
SegmentAllocationHolder holder = new SegmentAllocationHolder(request, interval, this);
all.add(holder);
pending.add(holder);
}
}
void markCompleted(SegmentAllocationHolder holder)
{
recentlyCompleted.add(holder);
}
Set<SegmentAllocationHolder> getPending()
{
pending.removeAll(recentlyCompleted);
recentlyCompleted.clear();
return pending;
}
List<SegmentAllocateResult> getResults()
{
return all.stream().map(holder -> holder.result).collect(Collectors.toList());
}
}
/**
* Contains the task, request, lock and final result for a segment allocation.
*/
private static class SegmentAllocationHolder
{
final AllocationHolderList list;
final Task task;
final Interval allocateInterval;
final SegmentAllocateAction action;
final LockRequestForNewSegment lockRequest;
SegmentCreateRequest segmentRequest;
TaskLock acquiredLock;
TaskLockPosse taskLockPosse;
Interval lockRequestInterval;
SegmentIdWithShardSpec allocatedSegment;
SegmentAllocateResult result;
SegmentAllocationHolder(SegmentAllocateRequest request, Interval allocateInterval, AllocationHolderList list)
{
this.list = list;
this.allocateInterval = allocateInterval;
this.task = request.getTask();
this.action = request.getAction();
this.lockRequest = new LockRequestForNewSegment(
action.getLockGranularity(),
action.getTaskLockType(),
task.getGroupId(),
action.getDataSource(),
allocateInterval,
action.getPartialShardSpec(),
task.getPriority(),
action.getSequenceName(),
action.getPreviousSegmentId(),
action.isSkipSegmentLineageCheck()
);
}
SegmentCreateRequest getSegmentRequest()
{
// Initialize the first time this is requested
if (segmentRequest == null) {
segmentRequest = new SegmentCreateRequest(
action.getSequenceName(),
action.getPreviousSegmentId(),
acquiredLock == null ? lockRequest.getVersion() : acquiredLock.getVersion(),
action.getPartialShardSpec()
);
}
return segmentRequest;
}
void markFailed(String msgFormat, Object... args)
{
list.markCompleted(this);
result = new SegmentAllocateResult(null, StringUtils.format(msgFormat, args));
}
void markSucceeded()
{
list.markCompleted(this);
result = new SegmentAllocateResult(allocatedSegment, null);
}
void setAllocatedSegment(SegmentIdWithShardSpec segmentId)
{
this.allocatedSegment = segmentId;
}
void setAcquiredLock(TaskLockPosse lockPosse, Interval lockRequestInterval)
{
this.taskLockPosse = lockPosse;
this.acquiredLock = lockPosse == null ? null : lockPosse.getTaskLock();
this.lockRequestInterval = lockRequestInterval;
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.curator.discovery.ServiceAnnouncer;
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.discovery.DruidLeaderSelector.Listener;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.Task;
@ -91,7 +92,8 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
final ServiceEmitter emitter,
final SupervisorManager supervisorManager,
final OverlordHelperManager overlordHelperManager,
@IndexingService final DruidLeaderSelector overlordLeaderSelector
@IndexingService final DruidLeaderSelector overlordLeaderSelector,
final SegmentAllocationQueue segmentAllocationQueue
)
{
this.supervisorManager = supervisorManager;
@ -136,6 +138,22 @@ public class TaskMaster implements TaskCountStatsProvider, TaskSlotCountStatsPro
leaderLifecycle.addManagedInstance(taskQueue);
leaderLifecycle.addManagedInstance(supervisorManager);
leaderLifecycle.addManagedInstance(overlordHelperManager);
leaderLifecycle.addHandler(
new Lifecycle.Handler()
{
@Override
public void start()
{
segmentAllocationQueue.becomeLeader();
}
@Override
public void stop()
{
segmentAllocationQueue.stopBeingLeader();
}
}
);
leaderLifecycle.addHandler(
new Lifecycle.Handler()

View File

@ -30,8 +30,24 @@ public class TaskLockConfig
@JsonProperty
private boolean forceTimeChunkLock = true;
@JsonProperty
private boolean batchSegmentAllocation = false;
@JsonProperty
private long batchAllocationMaxWaitTime = 500L;
public boolean isForceTimeChunkLock()
{
return forceTimeChunkLock;
}
public boolean isBatchSegmentAllocation()
{
return batchSegmentAllocation;
}
public long getBatchAllocationMaxWaitTime()
{
return batchAllocationMaxWaitTime;
}
}

View File

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.joda.time.DateTime;
public class SegmentAllocateActionBuilder
{
private String dataSource;
private DateTime timestamp;
private Granularity queryGranularity;
private Granularity preferredSegmentGranularity;
private String sequenceName;
private String previousSegmentId;
private boolean skipSegmentLineageCheck;
private PartialShardSpec partialShardSpec;
private LockGranularity lockGranularity;
private TaskLockType taskLockType;
private Task task;
public SegmentAllocateActionBuilder forDatasource(String dataSource)
{
this.dataSource = dataSource;
return this;
}
public SegmentAllocateActionBuilder forTimestamp(DateTime timestamp)
{
this.timestamp = timestamp;
return this;
}
public SegmentAllocateActionBuilder forTimestamp(String instant)
{
this.timestamp = DateTimes.of(instant);
return this;
}
public SegmentAllocateActionBuilder withQueryGranularity(Granularity queryGranularity)
{
this.queryGranularity = queryGranularity;
return this;
}
public SegmentAllocateActionBuilder withSegmentGranularity(Granularity segmentGranularity)
{
this.preferredSegmentGranularity = segmentGranularity;
return this;
}
public SegmentAllocateActionBuilder withSequenceName(String sequenceName)
{
this.sequenceName = sequenceName;
return this;
}
public SegmentAllocateActionBuilder withPreviousSegmentId(String previousSegmentId)
{
this.previousSegmentId = previousSegmentId;
return this;
}
public SegmentAllocateActionBuilder withSkipLineageCheck(boolean skipLineageCheck)
{
this.skipSegmentLineageCheck = skipLineageCheck;
return this;
}
public SegmentAllocateActionBuilder withPartialShardSpec(PartialShardSpec partialShardSpec)
{
this.partialShardSpec = partialShardSpec;
return this;
}
public SegmentAllocateActionBuilder withLockGranularity(LockGranularity lockGranularity)
{
this.lockGranularity = lockGranularity;
return this;
}
public SegmentAllocateActionBuilder withTaskLockType(TaskLockType taskLockType)
{
this.taskLockType = taskLockType;
return this;
}
public SegmentAllocateActionBuilder forTask(Task task)
{
this.dataSource = task.getDataSource();
this.sequenceName = task.getId();
this.task = task;
return this;
}
public SegmentAllocateRequest build()
{
return new SegmentAllocateRequest(task, buildAction(), 1);
}
public SegmentAllocateAction buildAction()
{
return new SegmentAllocateAction(
dataSource,
timestamp,
queryGranularity,
preferredSegmentGranularity,
sequenceName,
previousSegmentId,
skipSegmentLineageCheck,
partialShardSpec,
lockGranularity,
taskLockType
);
}
}

View File

@ -20,7 +20,6 @@
package org.apache.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@ -51,6 +50,7 @@ import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
@ -64,6 +64,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@RunWith(Parameterized.class)
@ -79,20 +81,26 @@ public class SegmentAllocateActionTest
private static final DateTime PARTY_TIME = DateTimes.of("1999");
private static final DateTime THE_DISTANT_FUTURE = DateTimes.of("3000");
private final boolean useBatch;
private final LockGranularity lockGranularity;
@Parameterized.Parameters(name = "{0}")
private SegmentAllocationQueue allocationQueue;
@Parameterized.Parameters(name = "granularity = {0}, useBatch = {1}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
new Object[]{LockGranularity.SEGMENT},
new Object[]{LockGranularity.TIME_CHUNK}
new Object[]{LockGranularity.SEGMENT, true},
new Object[]{LockGranularity.SEGMENT, false},
new Object[]{LockGranularity.TIME_CHUNK, true},
new Object[]{LockGranularity.TIME_CHUNK, false}
);
}
public SegmentAllocateActionTest(LockGranularity lockGranularity)
public SegmentAllocateActionTest(LockGranularity lockGranularity, boolean useBatch)
{
this.lockGranularity = lockGranularity;
this.useBatch = useBatch;
}
@Before
@ -101,6 +109,19 @@ public class SegmentAllocateActionTest
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
EmittingLogger.registerEmitter(emitter);
EasyMock.replay(emitter);
allocationQueue = taskActionTestKit.getTaskActionToolbox().getSegmentAllocationQueue();
if (allocationQueue != null) {
allocationQueue.start();
allocationQueue.becomeLeader();
}
}
@After
public void tearDown()
{
if (allocationQueue != null) {
allocationQueue.stop();
}
}
@Test
@ -288,29 +309,11 @@ public class SegmentAllocateActionTest
if (lockGranularity == LockGranularity.TIME_CHUNK) {
final TaskLock partyLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(PARTY_TIME);
}
}
)
.filter(input -> input.getInterval().contains(PARTY_TIME))
);
final TaskLock futureLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(THE_DISTANT_FUTURE);
}
}
)
.filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE))
);
assertSameIdentifier(
@ -446,29 +449,11 @@ public class SegmentAllocateActionTest
if (lockGranularity == LockGranularity.TIME_CHUNK) {
final TaskLock partyLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(PARTY_TIME);
}
}
)
.filter(input -> input.getInterval().contains(PARTY_TIME))
);
final TaskLock futureLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(THE_DISTANT_FUTURE);
}
}
)
.filter(input -> input.getInterval().contains(THE_DISTANT_FUTURE))
);
assertSameIdentifier(
@ -990,21 +975,26 @@ public class SegmentAllocateActionTest
lockGranularity,
null
);
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
try {
if (useBatch) {
return action.performAsync(task, taskActionTestKit.getTaskActionToolbox())
.get(5, TimeUnit.SECONDS);
} else {
return action.perform(task, taskActionTestKit.getTaskActionToolbox());
}
}
catch (ExecutionException e) {
return null;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private void assertSameIdentifier(final SegmentIdWithShardSpec expected, final SegmentIdWithShardSpec actual)
{
Assert.assertEquals(expected, actual);
Assert.assertEquals(expected.getShardSpec().getPartitionNum(), actual.getShardSpec().getPartitionNum());
Assert.assertEquals(expected.getShardSpec().getClass(), actual.getShardSpec().getClass());
if (expected.getShardSpec().getClass() == NumberedShardSpec.class
&& actual.getShardSpec().getClass() == NumberedShardSpec.class) {
Assert.assertEquals(expected.getShardSpec().getNumCorePartitions(), actual.getShardSpec().getNumCorePartitions());
} else if (expected.getShardSpec().getClass() == LinearShardSpec.class
&& actual.getShardSpec().getClass() == LinearShardSpec.class) {
// do nothing
}
Assert.assertEquals(expected.getShardSpec(), actual.getShardSpec());
}
}

View File

@ -0,0 +1,349 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common.actions;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.NoopTask;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SegmentAllocationQueueTest
{
@Rule
public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
private static final String DS_WIKI = "wiki";
private static final String DS_KOALA = "koala";
private SegmentAllocationQueue allocationQueue;
private StubServiceEmitter emitter;
private BlockingExecutorService executor;
@Before
public void setUp()
{
executor = new BlockingExecutorService("alloc-test-exec");
emitter = new StubServiceEmitter("overlord", "alloc-test");
final TaskLockConfig lockConfig = new TaskLockConfig()
{
@Override
public boolean isBatchSegmentAllocation()
{
return true;
}
@Override
public long getBatchAllocationMaxWaitTime()
{
return 0;
}
};
allocationQueue = new SegmentAllocationQueue(
taskActionTestKit.getTaskLockbox(),
lockConfig,
taskActionTestKit.getMetadataStorageCoordinator(),
emitter,
(corePoolSize, nameFormat)
-> new WrappingScheduledExecutorService(nameFormat, executor, false)
);
allocationQueue.start();
allocationQueue.becomeLeader();
}
@After
public void tearDown()
{
if (allocationQueue != null) {
allocationQueue.stop();
}
if (executor != null) {
executor.shutdownNow();
}
emitter.flush();
}
@Test
public void testBatchWithMultipleTimestamps()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.forTimestamp("2022-01-01T01:00:00")
.withSegmentGranularity(Granularities.DAY)
.withQueryGranularity(Granularities.SECOND)
.withLockGranularity(LockGranularity.TIME_CHUNK)
.withSequenceName("seq_1")
.build(),
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.forTimestamp("2022-01-01T02:00:00")
.withSegmentGranularity(Granularities.DAY)
.withQueryGranularity(Granularities.SECOND)
.withLockGranularity(LockGranularity.TIME_CHUNK)
.withSequenceName("seq_2")
.build(),
true
);
}
@Test
public void testBatchWithExclusiveLocks()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withTaskLockType(TaskLockType.EXCLUSIVE).build(),
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withTaskLockType(TaskLockType.EXCLUSIVE).build(),
true
);
}
@Test
public void testBatchWithSharedLocks()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withTaskLockType(TaskLockType.SHARED).build(),
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withTaskLockType(TaskLockType.SHARED).build(),
true
);
}
@Test
public void testBatchWithMultipleQueryGranularities()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withQueryGranularity(Granularities.SECOND).build(),
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withQueryGranularity(Granularities.MINUTE).build(),
true
);
}
@Test
public void testMultipleDatasourcesCannotBatch()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_1")).build(),
allocateRequest().forTask(createTask(DS_KOALA, "group_1")).build(),
false
);
}
@Test
public void testMultipleGroupIdsCannotBatch()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_2")).build(),
allocateRequest().forTask(createTask(DS_WIKI, "group_3")).build(),
false
);
}
@Test
public void testMultipleLockGranularitiesCannotBatch()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withLockGranularity(LockGranularity.TIME_CHUNK).build(),
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withLockGranularity(LockGranularity.SEGMENT).build(),
false
);
}
@Test
public void testMultipleAllocateIntervalsCannotBatch()
{
verifyAllocationWithBatching(
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.forTimestamp("2022-01-01")
.withSegmentGranularity(Granularities.DAY).build(),
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.forTimestamp("2022-01-02")
.withSegmentGranularity(Granularities.DAY).build(),
false
);
}
@Test
public void testConflictingPendingSegment()
{
SegmentAllocateRequest hourSegmentRequest =
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withSegmentGranularity(Granularities.HOUR)
.build();
Future<SegmentIdWithShardSpec> hourSegmentFuture = allocationQueue.add(hourSegmentRequest);
SegmentAllocateRequest halfHourSegmentRequest =
allocateRequest().forTask(createTask(DS_WIKI, "group_1"))
.withSegmentGranularity(Granularities.THIRTY_MINUTE)
.build();
Future<SegmentIdWithShardSpec> halfHourSegmentFuture = allocationQueue.add(halfHourSegmentRequest);
executor.finishNextPendingTask();
Assert.assertNotNull(getSegmentId(hourSegmentFuture));
Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(halfHourSegmentFuture));
Assert.assertEquals("Storage coordinator could not allocate segment.", t.getMessage());
}
@Test
public void testFullAllocationQueue()
{
for (int i = 0; i < 2000; ++i) {
SegmentAllocateRequest request =
allocateRequest().forTask(createTask(DS_WIKI, "group_" + i)).build();
allocationQueue.add(request);
}
SegmentAllocateRequest request =
allocateRequest().forTask(createTask(DS_WIKI, "next_group")).build();
Future<SegmentIdWithShardSpec> future = allocationQueue.add(request);
// Verify that the future is already complete and segment allocation has failed
Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future));
Assert.assertEquals(
"Segment allocation queue is full. Check the metric `task/action/batch/runTime` "
+ "to determine if metadata operations are slow.",
t.getMessage()
);
}
@Test
public void testMultipleRequestsForSameSegment()
{
final List<Future<SegmentIdWithShardSpec>> segmentFutures = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
SegmentAllocateRequest request =
allocateRequest().forTask(createTask(DS_WIKI, "group_" + i))
.withSequenceName("sequence_1")
.withPreviousSegmentId("segment_1")
.build();
segmentFutures.add(allocationQueue.add(request));
}
executor.finishNextPendingTask();
SegmentIdWithShardSpec segmentId1 = getSegmentId(segmentFutures.get(0));
for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
Assert.assertEquals(getSegmentId(future), segmentId1);
}
}
@Test
public void testMaxWaitTime()
{
// Verify that the batch is due yet
}
@Test
public void testRequestsFailOnLeaderChange()
{
final List<Future<SegmentIdWithShardSpec>> segmentFutures = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
SegmentAllocateRequest request =
allocateRequest().forTask(createTask(DS_WIKI, "group_" + i)).build();
segmentFutures.add(allocationQueue.add(request));
}
allocationQueue.stopBeingLeader();
executor.finishNextPendingTask();
for (Future<SegmentIdWithShardSpec> future : segmentFutures) {
Throwable t = Assert.assertThrows(ISE.class, () -> getSegmentId(future));
Assert.assertEquals("Cannot allocate segment if not leader", t.getMessage());
}
}
private void verifyAllocationWithBatching(
SegmentAllocateRequest a,
SegmentAllocateRequest b,
boolean canBatch
)
{
Assert.assertEquals(0, allocationQueue.size());
final Future<SegmentIdWithShardSpec> futureA = allocationQueue.add(a);
final Future<SegmentIdWithShardSpec> futureB = allocationQueue.add(b);
final int expectedCount = canBatch ? 1 : 2;
Assert.assertEquals(expectedCount, allocationQueue.size());
executor.finishNextPendingTask();
emitter.verifyEmitted("task/action/batch/size", expectedCount);
Assert.assertNotNull(getSegmentId(futureA));
Assert.assertNotNull(getSegmentId(futureB));
}
private SegmentIdWithShardSpec getSegmentId(Future<SegmentIdWithShardSpec> future)
{
try {
return future.get(5, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
throw new ISE(e.getCause().getMessage());
}
catch (InterruptedException | TimeoutException e) {
throw new RuntimeException(e);
}
}
private SegmentAllocateActionBuilder allocateRequest()
{
return new SegmentAllocateActionBuilder()
.forDatasource(DS_WIKI)
.forTimestamp("2022-01-01")
.withLockGranularity(LockGranularity.TIME_CHUNK)
.withTaskLockType(TaskLockType.SHARED)
.withQueryGranularity(Granularities.SECOND)
.withSegmentGranularity(Granularities.HOUR);
}
private Task createTask(String datasource, String groupId)
{
Task task = new NoopTask(null, groupId, datasource, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
return task;
}
}

View File

@ -27,7 +27,10 @@ import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskLockbox;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
@ -99,11 +102,34 @@ public class TaskActionTestKit extends ExternalResource
Suppliers.ofInstance(metadataStorageTablesConfig),
testDerbyConnector
);
final ServiceEmitter noopEmitter = new NoopServiceEmitter();
final TaskLockConfig taskLockConfig = new TaskLockConfig()
{
@Override
public boolean isBatchSegmentAllocation()
{
return true;
}
@Override
public long getBatchAllocationMaxWaitTime()
{
return 10L;
}
};
taskActionToolbox = new TaskActionToolbox(
taskLockbox,
taskStorage,
metadataStorageCoordinator,
new NoopServiceEmitter(),
new SegmentAllocationQueue(
taskLockbox,
taskLockConfig,
metadataStorageCoordinator,
noopEmitter,
ScheduledExecutors::fixed
),
noopEmitter,
EasyMock.createMock(SupervisorManager.class),
objectMapper
);

View File

@ -40,6 +40,7 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TimeChunkLock;
import org.apache.druid.indexing.common.actions.SegmentAllocationQueue;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.indexing.common.task.NoopTask;
@ -226,7 +227,8 @@ public class OverlordTest
serviceEmitter,
supervisorManager,
EasyMock.createNiceMock(OverlordHelperManager.class),
new TestDruidLeaderSelector()
new TestDruidLeaderSelector(),
EasyMock.createNiceMock(SegmentAllocationQueue.class)
);
EmittingLogger.registerEmitter(serviceEmitter);
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.jackson.DefaultObjectMapper;
@ -36,8 +37,10 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
@ -124,6 +127,17 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return ImmutableSet.copyOf(added);
}
@Override
public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<SegmentCreateRequest> requests
)
{
return Collections.emptyMap();
}
@Override
public SegmentPublishResult announceHistoricalSegments(
Set<DataSegment> segments,

View File

@ -31,6 +31,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
@ -154,6 +155,27 @@ public interface IndexerMetadataStorageCoordinator
*/
Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException;
/**
* Allocates pending segments for the given requests in the pending segments table.
* The segment id allocated for a request will not be given out again unless a
* request is made with the same {@link SegmentCreateRequest}.
*
* @param dataSource dataSource for which to allocate a segment
* @param interval interval for which to allocate a segment
* @param skipSegmentLineageCheck if true, perform lineage validation using previousSegmentId for this sequence.
* Should be set to false if replica tasks would index events in same order
* @param requests Requests for which to allocate segments. All
* the requests must share the same partition space.
* @return Map from request to allocated segment id. The map does not contain
* entries for failed requests.
*/
Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<SegmentCreateRequest> requests
);
/**
* Allocate a new pending segment in the pending segments table. This segment identifier will never be given out
* again, <em>unless</em> another call is made with the same dataSource, sequenceName, and previousSegmentId.

View File

@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import org.apache.druid.timeline.partition.PartialShardSpec;
/**
* Contains information used by {@link IndexerMetadataStorageCoordinator} for
* creating a new segment.
* <p>
* The {@code sequenceName} and {@code previousSegmentId} fields are meant to
* make it easy for two independent ingestion tasks to produce the same series
* of segments.
*/
public class SegmentCreateRequest
{
// DO NOT IMPLEMENT equals or hashCode for this class as each request must be
// treated as unique even if it is for the same parameters
private final String version;
private final String sequenceName;
private final String previousSegmentId;
private final PartialShardSpec partialShardSpec;
public SegmentCreateRequest(
String sequenceName,
String previousSegmentId,
String version,
PartialShardSpec partialShardSpec
)
{
this.sequenceName = sequenceName;
this.previousSegmentId = previousSegmentId == null ? "" : previousSegmentId;
this.version = version;
this.partialShardSpec = partialShardSpec;
}
public String getSequenceName()
{
return sequenceName;
}
/**
* Non-null previous segment id. This can be used for persisting to the
* pending segments table in the metadata store.
*/
public String getPreviousSegmentId()
{
return previousSegmentId;
}
public String getVersion()
{
return version;
}
public PartialShardSpec getPartialShardSpec()
{
return partialShardSpec;
}
}

View File

@ -28,12 +28,14 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
import com.google.inject.Inject;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
@ -75,10 +77,13 @@ import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -221,6 +226,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return numSegmentsMarkedUnused;
}
/**
* Fetches all the pending segments, whose interval overlaps with the given
* search interval from the metadata store.
*/
private Set<SegmentIdWithShardSpec> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
@ -481,6 +490,23 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return SQLMetadataConnector.DEFAULT_MAX_TRIES;
}
@Override
public Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
String dataSource,
Interval allocateInterval,
boolean skipSegmentLineageCheck,
List<SegmentCreateRequest> requests
)
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(allocateInterval, "interval");
final Interval interval = allocateInterval.withChronology(ISOChronology.getInstanceUTC());
return connector.retryWithHandle(
handle -> allocatePendingSegments(handle, dataSource, interval, skipSegmentLineageCheck, requests)
);
}
@Override
public SegmentIdWithShardSpec allocatePendingSegment(
final String dataSource,
@ -599,6 +625,81 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return newIdentifier;
}
private Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatePendingSegments(
final Handle handle,
final String dataSource,
final Interval interval,
final boolean skipSegmentLineageCheck,
final List<SegmentCreateRequest> requests
) throws IOException
{
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> existingSegmentIds;
if (skipSegmentLineageCheck) {
existingSegmentIds = getExistingSegmentIdsSkipLineageCheck(handle, dataSource, interval, requests);
} else {
existingSegmentIds = getExistingSegmentIdsWithLineageCheck(handle, dataSource, interval, requests);
}
// For every request see if a segment id already exists
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> allocatedSegmentIds = new HashMap<>();
final List<SegmentCreateRequest> requestsForNewSegments = new ArrayList<>();
for (SegmentCreateRequest request : requests) {
CheckExistingSegmentIdResult existingSegmentId = existingSegmentIds.get(request);
if (existingSegmentId == null || !existingSegmentId.found) {
requestsForNewSegments.add(request);
} else if (existingSegmentId.segmentIdentifier != null) {
log.info("Found valid existing segment [%s] for request.", existingSegmentId.segmentIdentifier);
allocatedSegmentIds.put(request, existingSegmentId.segmentIdentifier);
} else {
log.info("Found clashing existing segment [%s] for request.", existingSegmentId);
}
}
// For each of the remaining requests, create a new segment
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments =
createNewSegments(handle, dataSource, interval, skipSegmentLineageCheck, requestsForNewSegments);
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
// UNIQUE key for the row, ensuring we don't have more than one segment per sequence per interval.
// Using a single column instead of (sequence_name, sequence_prev_id) as some MySQL storage engines
// have difficulty with large unique keys (see https://github.com/apache/druid/issues/2319)
insertPendingSegmentsIntoMetastore(
handle,
createdSegments,
dataSource,
interval,
skipSegmentLineageCheck
);
allocatedSegmentIds.putAll(createdSegments);
return allocatedSegmentIds;
}
@SuppressWarnings("UnstableApiUsage")
private String getSequenceNameAndPrevIdSha(
SegmentCreateRequest request,
Interval interval,
boolean skipSegmentLineageCheck
)
{
final Hasher hasher = Hashing.sha1().newHasher()
.putBytes(StringUtils.toUtf8(request.getSequenceName()))
.putByte((byte) 0xff);
if (skipSegmentLineageCheck) {
hasher
.putLong(interval.getStartMillis())
.putLong(interval.getEndMillis());
} else {
hasher
.putBytes(StringUtils.toUtf8(request.getPreviousSegmentId()));
}
return BaseEncoding.base16().encode(hasher.hash().asBytes());
}
@Nullable
private SegmentIdWithShardSpec allocatePendingSegment(
final Handle handle,
@ -631,7 +732,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
if (result.found) {
// The found existing segment identifier can be null if its interval doesn't match with the given interval
return result.segmentIdentifier;
}
@ -672,6 +772,95 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return newIdentifier;
}
/**
* Returns a map from sequenceName to segment id.
*/
private Map<SegmentCreateRequest, CheckExistingSegmentIdResult> getExistingSegmentIdsSkipLineageCheck(
Handle handle,
String dataSource,
Interval interval,
List<SegmentCreateRequest> requests
) throws IOException
{
final Query<Map<String, Object>> query = handle
.createQuery(
StringUtils.format(
"SELECT sequence_name, payload "
+ "FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "start = :start AND "
+ "%2$send%2$s = :end",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString());
final ResultIterator<PendingSegmentsRecord> dbSegments = query
.map((index, r, ctx) -> PendingSegmentsRecord.fromResultSet(r))
.iterator();
// Map from sequenceName to segment id
final Map<String, SegmentIdWithShardSpec> sequenceToSegmentId = new HashMap<>();
while (dbSegments.hasNext()) {
final PendingSegmentsRecord record = dbSegments.next();
final SegmentIdWithShardSpec segmentId =
jsonMapper.readValue(record.getPayload(), SegmentIdWithShardSpec.class);
sequenceToSegmentId.put(record.getSequenceName(), segmentId);
}
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<>();
for (SegmentCreateRequest request : requests) {
SegmentIdWithShardSpec segmentId = sequenceToSegmentId.get(request.getSequenceName());
requestToResult.put(request, new CheckExistingSegmentIdResult(segmentId != null, segmentId));
}
return requestToResult;
}
/**
* Returns a map from sequenceName to segment id.
*/
private Map<SegmentCreateRequest, CheckExistingSegmentIdResult> getExistingSegmentIdsWithLineageCheck(
Handle handle,
String dataSource,
Interval interval,
List<SegmentCreateRequest> requests
) throws IOException
{
// This cannot be batched because there doesn't seem to be a clean option:
// 1. WHERE must have sequence_name and sequence_prev_id but not start or end.
// (sequence columns are used to find the matching segment whereas start and
// end are used to determine if the found segment is valid or not)
// 2. IN filters on sequence_name and sequence_prev_id might perform worse than individual SELECTs?
// 3. IN filter on sequence_name alone might be a feasible option worth evaluating
final String sql = StringUtils.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
);
final Map<SegmentCreateRequest, CheckExistingSegmentIdResult> requestToResult = new HashMap<>();
for (SegmentCreateRequest request : requests) {
CheckExistingSegmentIdResult result = checkAndGetExistingSegmentId(
handle.createQuery(sql)
.bind("dataSource", dataSource)
.bind("sequence_name", request.getSequenceName())
.bind("sequence_prev_id", request.getPreviousSegmentId()),
interval,
request.getSequenceName(),
request.getPreviousSegmentId()
);
requestToResult.put(request, result);
}
return requestToResult;
}
private CheckExistingSegmentIdResult checkAndGetExistingSegmentId(
final Query<Map<String, Object>> query,
final Interval interval,
@ -686,50 +875,36 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
final List<byte[]> existingBytes = boundQuery.map(ByteArrayMapper.FIRST).list();
if (!existingBytes.isEmpty()) {
if (existingBytes.isEmpty()) {
return new CheckExistingSegmentIdResult(false, null);
} else {
final SegmentIdWithShardSpec existingIdentifier = jsonMapper.readValue(
Iterables.getOnlyElement(existingBytes),
SegmentIdWithShardSpec.class
);
if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
&& existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
if (previousSegmentId == null) {
log.info("Found existing pending segment [%s] for sequence[%s] in DB", existingIdentifier, sequenceName);
} else {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier,
sequenceName,
previousSegmentId
);
}
if (existingIdentifier.getInterval().isEqual(interval)) {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier,
sequenceName,
previousSegmentId
);
return new CheckExistingSegmentIdResult(true, existingIdentifier);
} else {
if (previousSegmentId == null) {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] in DB, "
+ "does not match requested interval[%s]",
existingIdentifier,
sequenceName,
interval
);
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier,
sequenceName,
previousSegmentId,
interval
);
}
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier,
sequenceName,
previousSegmentId,
interval
);
return new CheckExistingSegmentIdResult(true, null);
}
}
return new CheckExistingSegmentIdResult(false, null);
}
private static class CheckExistingSegmentIdResult
@ -745,6 +920,48 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
private void insertPendingSegmentsIntoMetastore(
Handle handle,
Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments,
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck
) throws JsonProcessingException
{
final PreparedBatch insertBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, sequence_name, sequence_prev_id, "
+ "sequence_name_prev_id_sha1, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, "
+ ":sequence_name_prev_id_sha1, :payload)",
dbTables.getPendingSegmentsTable(),
connector.getQuoteString()
));
// Deduplicate the segment ids by inverting the map
Map<SegmentIdWithShardSpec, SegmentCreateRequest> segmentIdToRequest = new HashMap<>();
createdSegments.forEach((request, segmentId) -> segmentIdToRequest.put(segmentId, request));
for (Map.Entry<SegmentIdWithShardSpec, SegmentCreateRequest> entry : segmentIdToRequest.entrySet()) {
final SegmentCreateRequest request = entry.getValue();
final SegmentIdWithShardSpec segmentId = entry.getKey();
insertBatch.add()
.bind("id", segmentId.toString())
.bind("dataSource", dataSource)
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("sequence_name", request.getSequenceName())
.bind("sequence_prev_id", request.getPreviousSegmentId())
.bind(
"sequence_name_prev_id_sha1",
getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck)
)
.bind("payload", jsonMapper.writeValueAsBytes(segmentId));
}
insertBatch.execute();
}
private void insertPendingSegmentIntoMetastore(
Handle handle,
SegmentIdWithShardSpec newIdentifier,
@ -777,6 +994,204 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.execute();
}
private Map<SegmentCreateRequest, SegmentIdWithShardSpec> createNewSegments(
Handle handle,
String dataSource,
Interval interval,
boolean skipSegmentLineageCheck,
List<SegmentCreateRequest> requests
) throws IOException
{
if (requests.isEmpty()) {
return Collections.emptyMap();
}
// Get the time chunk and associated data segments for the given interval, if any
final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
.lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segments for dataSource[%s], interval[%s]: already have [%,d] chunks.",
dataSource,
interval,
existingChunks.size()
);
return Collections.emptyMap();
}
// Shard spec of any of the requests (as they are all compatible) can be used to
// identify existing shard specs that share partition space with the requested ones.
final PartialShardSpec partialShardSpec = requests.get(0).getPartialShardSpec();
// max partitionId of published data segments which share the same partition space.
SegmentIdWithShardSpec committedMaxId = null;
@Nullable
final String versionOfExistingChunk;
if (existingChunks.isEmpty()) {
versionOfExistingChunk = null;
} else {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
versionOfExistingChunk = existingHolder.getVersion();
// Don't use the stream API for performance.
for (DataSegment segment : FluentIterable
.from(existingHolder.getObject())
.transform(PartitionChunk::getObject)
// Here we check only the segments of the shardSpec which shares the same partition space with the given
// partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others.
// See PartitionIds.
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
if (committedMaxId == null
|| committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
}
}
}
// Fetch the pending segments for this interval to determine max partitionId
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendingSegments =
getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval);
final Map<SegmentCreateRequest, SegmentIdWithShardSpec> createdSegments = new HashMap<>();
final Map<String, SegmentIdWithShardSpec> sequenceHashToSegment = new HashMap<>();
for (SegmentCreateRequest request : requests) {
// Check if the required segment has already been created in this batch
final String sequenceHash = getSequenceNameAndPrevIdSha(request, interval, skipSegmentLineageCheck);
final SegmentIdWithShardSpec createdSegment;
if (sequenceHashToSegment.containsKey(sequenceHash)) {
createdSegment = sequenceHashToSegment.get(sequenceHash);
} else {
createdSegment = createNewSegment(
request,
dataSource,
interval,
versionOfExistingChunk,
committedMaxId,
pendingSegments
);
// Add to pendingSegments to consider for partitionId
if (createdSegment != null) {
pendingSegments.add(createdSegment);
sequenceHashToSegment.put(sequenceHash, createdSegment);
log.info("Created new segment [%s]", createdSegment);
}
}
if (createdSegment != null) {
createdSegments.put(request, createdSegment);
}
}
log.info("Created [%d] new segments for [%d] allocate requests.", sequenceHashToSegment.size(), requests.size());
return createdSegments;
}
private SegmentIdWithShardSpec createNewSegment(
SegmentCreateRequest request,
String dataSource,
Interval interval,
String versionOfExistingChunk,
SegmentIdWithShardSpec committedMaxId,
Set<SegmentIdWithShardSpec> pendingSegments
)
{
final PartialShardSpec partialShardSpec = request.getPartialShardSpec();
final String existingVersion = request.getVersion();
// Include the committedMaxId while computing the overallMaxId
if (committedMaxId != null) {
pendingSegments.add(committedMaxId);
}
// If there is an existing chunk, find the max id with the same version as the existing chunk.
// There may still be a pending segment with a higher version (but no corresponding used segments)
// which may generate a clash with an existing segment once the new id is generated
final SegmentIdWithShardSpec overallMaxId =
pendingSegments.stream()
.filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
.filter(id -> versionOfExistingChunk == null
|| id.getVersion().equals(versionOfExistingChunk))
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
.thenComparing(id -> id.getShardSpec().getPartitionNum()))
.orElse(null);
// Determine the version of the new segment
final String newSegmentVersion;
if (versionOfExistingChunk != null) {
newSegmentVersion = versionOfExistingChunk;
} else if (overallMaxId != null) {
newSegmentVersion = overallMaxId.getVersion();
} else {
// this is the first segment for this interval
newSegmentVersion = null;
}
if (overallMaxId == null) {
// When appending segments, null overallMaxId means that we are allocating the very initial
// segment for this time chunk.
// This code is executed when the Overlord coordinates segment allocation, which is either you append segments
// or you use segment lock. Since the core partitions set is not determined for appended segments, we set
// it 0. When you use segment lock, the core partitions set doesn't work with it. We simply set it 0 so that the
// OvershadowableManager handles the atomic segment update.
final int newPartitionId = partialShardSpec.useNonRootGenerationPartitionSpace()
? PartitionIds.NON_ROOT_GEN_START_PARTITION_ID
: PartitionIds.ROOT_GEN_START_PARTITION_ID;
String version = newSegmentVersion == null ? existingVersion : newSegmentVersion;
return new SegmentIdWithShardSpec(
dataSource,
interval,
version,
partialShardSpec.complete(jsonMapper, newPartitionId, 0)
);
} else if (!overallMaxId.getInterval().equals(interval)
|| overallMaxId.getVersion().compareTo(existingVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], existingVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
existingVersion,
overallMaxId
);
return null;
} else if (committedMaxId != null
&& committedMaxId.getShardSpec().getNumCorePartitions()
== SingleDimensionShardSpec.UNKNOWN_NUM_CORE_PARTITIONS) {
log.warn(
"Cannot allocate new segment because of unknown core partition size of segment[%s], shardSpec[%s]",
committedMaxId,
committedMaxId.getShardSpec()
);
return null;
} else {
// The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and queries results are incorrect.
return new SegmentIdWithShardSpec(
dataSource,
interval,
Preconditions.checkNotNull(newSegmentVersion, "newSegmentVersion"),
partialShardSpec.complete(
jsonMapper,
overallMaxId.getShardSpec().getPartitionNum() + 1,
committedMaxId == null ? 0 : committedMaxId.getShardSpec().getNumCorePartitions()
)
);
}
}
/**
* This function creates a new segment for the given datasource/interval/etc. A critical
* aspect of the creation is to make sure that the new version & new partition number will make
@ -818,15 +1233,18 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return null;
} else {
// max partitionId of the committed shardSpecs which share the same partition space.
// max partitionId of published data segments which share the same partition space.
SegmentIdWithShardSpec committedMaxId = null;
// max partitionId of the all shardSpecs including the pending ones which share the same partition space.
SegmentIdWithShardSpec overallMaxId;
if (!existingChunks.isEmpty()) {
@Nullable
final String versionOfExistingChunk;
if (existingChunks.isEmpty()) {
versionOfExistingChunk = null;
} else {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
versionOfExistingChunk = existingHolder.getVersion();
//noinspection ConstantConditions
// Don't use the stream API for performance.
for (DataSegment segment : FluentIterable
.from(existingHolder.getObject())
.transform(PartitionChunk::getObject)
@ -834,8 +1252,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
// partialShardSpec. Note that OverwriteShardSpec doesn't share the partition space with others.
// See PartitionIds.
.filter(segment -> segment.getShardSpec().sharePartitionSpace(partialShardSpec))) {
// Don't use the stream API for performance.
// Note that this will compute the max id of existing, visible, data segments in the time chunk:
if (committedMaxId == null
|| committedMaxId.getShardSpec().getPartitionNum() < segment.getShardSpec().getPartitionNum()) {
committedMaxId = SegmentIdWithShardSpec.fromDataSegment(segment);
@ -843,63 +1259,41 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
// Get the version of the existing chunk, we might need it in some of the cases below
// to compute the new identifier's version
@Nullable
final String versionOfExistingChunk;
if (!existingChunks.isEmpty()) {
// remember only one chunk possible for given interval so get the first & only one
versionOfExistingChunk = existingChunks.get(0).getVersion();
} else {
versionOfExistingChunk = null;
}
// next, we need to enrich the overallMaxId computed with committed segments with the information of the pending segments
// it is possible that a pending segment has a higher id in which case we need that, it will work,
// and it will avoid clashes when inserting the new pending segment later in the caller of this method
// Fetch the pending segments for this interval to determine max partitionId
// across all shard specs (published + pending).
// A pending segment having a higher partitionId must also be considered
// to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendings = getPendingSegmentsForIntervalWithHandle(
handle,
dataSource,
interval
);
// Make sure we add the committed max id we obtained from the segments table:
if (committedMaxId != null) {
pendings.add(committedMaxId);
}
// Now compute the overallMaxId with all the information: pendings + segments:
// The versionOfExistingChunks filter is ensure that we pick the max id with the version of the existing chunk
// in the case that there may be a pending segment with a higher version but no corresponding used segments
// If there is an existing chunk, find the max id with the same version as the existing chunk.
// There may still be a pending segment with a higher version (but no corresponding used segments)
// which may generate a clash with an existing segment once the new id is generated
final SegmentIdWithShardSpec overallMaxId;
overallMaxId = pendings.stream()
.filter(id -> id.getShardSpec().sharePartitionSpace(partialShardSpec))
.filter(id -> versionOfExistingChunk == null || id.getVersion()
.equals(versionOfExistingChunk))
.max((id1, id2) -> {
final int versionCompare = id1.getVersion().compareTo(id2.getVersion());
if (versionCompare != 0) {
return versionCompare;
} else {
return Integer.compare(
id1.getShardSpec().getPartitionNum(),
id2.getShardSpec().getPartitionNum()
);
}
})
.filter(id -> versionOfExistingChunk == null
|| id.getVersion().equals(versionOfExistingChunk))
.max(Comparator.comparing(SegmentIdWithShardSpec::getVersion)
.thenComparing(id -> id.getShardSpec().getPartitionNum()))
.orElse(null);
// The following code attempts to compute the new version, if this
// new version is not null at the end of next block then it will be
// used as the new version in the case for initial or appended segment
// Determine the version of the new segment
final String newSegmentVersion;
if (versionOfExistingChunk != null) {
// segment version overrides, so pick that now that we know it exists
newSegmentVersion = versionOfExistingChunk;
} else if (!pendings.isEmpty() && overallMaxId != null) {
// there is no visible segments in the time chunk, so pick the max id of pendings, as computed above
} else if (overallMaxId != null) {
newSegmentVersion = overallMaxId.getVersion();
} else {
// no segments, no pendings, so this must be the very first segment created for this interval
// this is the first segment for this interval
newSegmentVersion = null;
}
@ -940,7 +1334,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
return null;
} else {
// The number of core partitions must always be chosen from the set of used segments in the VersionedIntervalTimeline.
// The number of core partitions must always be chosen from the set of used segments in the SegmentTimeline.
// When the core partitions have been dropped, using pending segments may lead to an incorrect state
// where the chunk is believed to have core partitions and queries results are incorrect.
@ -1472,4 +1866,46 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
private static class PendingSegmentsRecord
{
private final String sequenceName;
private final byte[] payload;
/**
* The columns expected in the result set are:
* <ol>
* <li>sequence_name</li>
* <li>payload</li>
* </ol>
*/
static PendingSegmentsRecord fromResultSet(ResultSet resultSet)
{
try {
return new PendingSegmentsRecord(
resultSet.getString(1),
resultSet.getBytes(2)
);
}
catch (SQLException e) {
throw new RuntimeException(e);
}
}
PendingSegmentsRecord(String sequenceName, byte[] payload)
{
this.payload = payload;
this.sequenceName = sequenceName;
}
public byte[] getPayload()
{
return payload;
}
public String getSequenceName()
{
return sequenceName;
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord;
import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.junit.Assert;
import org.junit.Test;
public class SegmentCreateRequestTest
{
@Test
public void testNullPreviousSegmentId()
{
final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
SegmentCreateRequest request = new SegmentCreateRequest(
"sequence",
null,
"version",
partialShardSpec
);
Assert.assertEquals("sequence", request.getSequenceName());
Assert.assertEquals("", request.getPreviousSegmentId());
Assert.assertEquals("version", request.getVersion());
Assert.assertEquals(partialShardSpec, request.getPartialShardSpec());
}
}

View File

@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.ObjectMetadata;
import org.apache.druid.indexing.overlord.SegmentCreateRequest;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.java.util.common.DateTimes;
@ -59,7 +60,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.StringMapper;
import java.io.IOException;
@ -371,40 +371,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(
1,
(int) derbyConnector.getDBI().<Integer>withHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle)
{
String request = StringUtils.format(
"UPDATE %s SET used = false WHERE id = :id",
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
);
return handle.createStatement(request).bind("id", segment.getId().toString()).execute();
}
}
)
);
}
}
private void markAllSegmentsUsed(Set<DataSegment> segments)
{
for (final DataSegment segment : segments) {
Assert.assertEquals(
1,
(int) derbyConnector.getDBI().<Integer>withHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle)
{
String request = StringUtils.format(
"UPDATE %s SET used = true WHERE id = :id",
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
);
return handle.createStatement(request).bind("id", segment.getId().toString()).execute();
}
handle -> {
String request = StringUtils.format(
"UPDATE %s SET used = false WHERE id = :id",
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable()
);
return handle.createStatement(request).bind("id", segment.getId().toString()).execute();
}
)
);
@ -415,32 +387,19 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getPendingSegmentsTable();
return derbyConnector.retryWithHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle)
{
return handle.createQuery("SELECT id FROM " + table + " ORDER BY id")
.map(StringMapper.FIRST)
.list();
}
}
handle -> handle.createQuery("SELECT id FROM " + table + " ORDER BY id")
.map(StringMapper.FIRST)
.list()
);
}
private List<String> retrieveUsedSegmentIds()
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
return derbyConnector.retryWithHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle)
{
return handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id")
.map(StringMapper.FIRST)
.list();
}
}
handle -> handle.createQuery("SELECT id FROM " + table + " WHERE used = true ORDER BY id")
.map(StringMapper.FIRST)
.list()
);
}
@ -448,16 +407,9 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
return derbyConnector.retryWithHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle)
{
return handle.createQuery("SELECT id FROM " + table + " WHERE used = false ORDER BY id")
.map(StringMapper.FIRST)
.list();
}
}
handle -> handle.createQuery("SELECT id FROM " + table + " WHERE used = false ORDER BY id")
.map(StringMapper.FIRST)
.list()
);
}
@ -466,39 +418,34 @@ public class IndexerSQLMetadataStorageCoordinatorTest
{
final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
return derbyConnector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
table,
derbyConnector.getQuoteString()
)
);
for (DataSegment segment : dataSegments) {
preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", mapper.writeValueAsBytes(segment));
}
final int[] affectedRows = preparedBatch.execute();
final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
if (!succeeded) {
throw new ISE("Failed to publish segments to DB");
}
return true;
handle -> {
PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
table,
derbyConnector.getQuoteString()
)
);
for (DataSegment segment : dataSegments) {
preparedBatch.add()
.bind("id", segment.getId().toString())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", mapper.writeValueAsBytes(segment));
}
final int[] affectedRows = preparedBatch.execute();
final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
if (!succeeded) {
throw new ISE("Failed to publish segments to DB");
}
return true;
}
);
}
@ -561,12 +508,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
List<String> segmentIds = segments.stream().map(segment -> segment.getId().toString()).collect(Collectors.toList());
segmentIds.sort(Comparator.naturalOrder());
Assert.assertEquals(
segmentIds,
retrieveUsedSegmentIds()
);
List<String> segmentIds = segments.stream()
.map(segment -> segment.getId().toString())
.sorted(Comparator.naturalOrder())
.collect(Collectors.toList());
Assert.assertEquals(segmentIds, retrieveUsedSegmentIds());
// Should not update dataSource metadata.
Assert.assertEquals(0, metadataUpdateCounter.get());
@ -823,10 +770,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest
retrieveUsedSegmentIds()
);
DataSegment nonExistingSegment = defaultSegment4;
Set<DataSegment> dropSegments = ImmutableSet.of(existingSegment1, nonExistingSegment);
Set<DataSegment> dropSegments = ImmutableSet.of(existingSegment1, defaultSegment4);
final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
SEGMENTS,
dropSegments,
@ -1576,17 +1520,19 @@ public class IndexerSQLMetadataStorageCoordinatorTest
}
/**
* This test simulates an issue detected on the field consisting of the following sequence of events:
* - A kafka stream segment was created on a given interval
* - Later, after the above was published, another segment on same interval was created by the stream
* - Later, after the above was published, another segment on same interval was created by the stream
* - Later a compaction was issued for the three segments above
* - Later, after the above was published, another segment on same interval was created by the stream
* - Later, the compacted segment got dropped due to a drop rule
* This test verifies the behaviour in the following sequence of events:
* - create segment1 for an interval and publish
* - create segment2 for same interval and publish
* - create segment3 for same interval and publish
* - compact all segments above and publish new segments
* - create segment4 for the same interval
* - drop the compacted segment
* - create segment5 for the same interval
* - verify that the id for segment5 is correct
* - Later, after the above was dropped, another segment on same interval was created by the stream but this
* time there was an integrity violation in the pending segments table because the
* {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
* method returned an segment id that already existed in the pending segments table
* time there was an integrity violation in the pending segments table because the
* {@link IndexerSQLMetadataStorageCoordinator#createNewSegment(Handle, String, Interval, PartialShardSpec, String)}
* method returned an segment id that already existed in the pending segments table
*/
@Test
public void testAllocatePendingSegmentAfterDroppingExistingSegment()
@ -1690,13 +1636,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
/**
* Slightly different that the above test but that involves reverted compaction
1) used segments of version = A, id = 0, 1, 2
2) overwrote segments of version = B, id = 0 <= compaction
3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing
4) pending segment of version = B, id = 1 <= appending new data, aborted
5) reverted compaction, mark segments used for version = A, id = 0, 1, 2, and mark compacted segments unused
6) used segments of version = A, id = 0, 1, 2
7) pending segment of version = B, id = 1
* 1) used segments of version = A, id = 0, 1, 2
* 2) overwrote segments of version = B, id = 0 <= compaction
* 3) marked segments unused for version = A, id = 0, 1, 2 <= overshadowing
* 4) pending segment of version = B, id = 1 <= appending new data, aborted
* 5) reverted compaction, mark segments used for version = A, id = 0, 1, 2, and mark compacted segments unused
* 6) used segments of version = A, id = 0, 1, 2
* 7) pending segment of version = B, id = 1
*/
@Test
public void testAnotherAllocatePendingSegmentAfterRevertingCompaction()
@ -1842,13 +1788,13 @@ public class IndexerSQLMetadataStorageCoordinatorTest
// used segment: version = A, id = 0,1,2
// unused segment: version = B, id = 0
List<String> pendings = retrievePendingSegmentIds();
Assert.assertTrue(pendings.size() == 4);
Assert.assertEquals(4, pendings.size());
List<String> used = retrieveUsedSegmentIds();
Assert.assertTrue(used.size() == 3);
Assert.assertEquals(3, used.size());
List<String> unused = retrieveUnusedSegmentIds();
Assert.assertTrue(unused.size() == 1);
Assert.assertEquals(1, unused.size());
// Simulate one more append load
final SegmentIdWithShardSpec identifier4 = coordinator.allocatePendingSegment(
@ -1886,7 +1832,71 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_3", ids.get(3));
}
@Test
public void testAllocatePendingSegments()
{
final PartialShardSpec partialShardSpec = NumberedPartialShardSpec.instance();
final String dataSource = "ds";
final Interval interval = Intervals.of("2017-01-01/2017-02-01");
final String sequenceName = "seq";
final SegmentCreateRequest request = new SegmentCreateRequest(sequenceName, null, "v1", partialShardSpec);
final SegmentIdWithShardSpec segmentId0 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request)
).get(request);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1", segmentId0.toString());
final SegmentCreateRequest request1 =
new SegmentCreateRequest(sequenceName, segmentId0.toString(), segmentId0.getVersion(), partialShardSpec);
final SegmentIdWithShardSpec segmentId1 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request1)
).get(request1);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_1", segmentId1.toString());
final SegmentCreateRequest request2 =
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
final SegmentIdWithShardSpec segmentId2 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request2)
).get(request2);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId2.toString());
final SegmentCreateRequest request3 =
new SegmentCreateRequest(sequenceName, segmentId1.toString(), segmentId1.getVersion(), partialShardSpec);
final SegmentIdWithShardSpec segmentId3 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request3)
).get(request3);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_2", segmentId3.toString());
Assert.assertEquals(segmentId2, segmentId3);
final SegmentCreateRequest request4 =
new SegmentCreateRequest("seq1", null, "v1", partialShardSpec);
final SegmentIdWithShardSpec segmentId4 = coordinator.allocatePendingSegments(
dataSource,
interval,
false,
Collections.singletonList(request4)
).get(request4);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_v1_3", segmentId4.toString());
}
@Test
public void testNoPendingSegmentsAndOneUsedSegment()
{
@ -1923,9 +1933,8 @@ public class IndexerSQLMetadataStorageCoordinatorTest
true
);
Assert.assertEquals("ds_2017-01-01T00:00:00.000Z_2017-02-01T00:00:00.000Z_A_1", identifier.toString());
}
}
@Test