diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index 550465e6ef6..e4e6fc89a6f 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -30,7 +30,7 @@ public class KafkaIOConfig implements IOConfig
 {
   private static final boolean DEFAULT_USE_TRANSACTION = true;
 
-  private final String sequenceName;
+  private final String baseSequenceName;
   private final KafkaPartitions startPartitions;
   private final KafkaPartitions endPartitions;
   private final Map<String, String> consumerProperties;
@@ -38,14 +38,14 @@ public class KafkaIOConfig implements IOConfig
 
   @JsonCreator
   public KafkaIOConfig(
-      @JsonProperty("sequenceName") String sequenceName,
+      @JsonProperty("baseSequenceName") String baseSequenceName,
       @JsonProperty("startPartitions") KafkaPartitions startPartitions,
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
       @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
       @JsonProperty("useTransaction") Boolean useTransaction
   )
   {
-    this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
+    this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
     this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
@@ -72,9 +72,9 @@ public class KafkaIOConfig implements IOConfig
   }
 
   @JsonProperty
-  public String getSequenceName()
+  public String getBaseSequenceName()
   {
-    return sequenceName;
+    return baseSequenceName;
   }
 
   @JsonProperty
@@ -105,7 +105,7 @@ public class KafkaIOConfig implements IOConfig
   public String toString()
   {
     return "KafkaIOConfig{" +
-           "sequenceName='" + sequenceName + '\'' +
+           "baseSequenceName='" + baseSequenceName + '\'' +
            ", startPartitions=" + startPartitions +
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 6474ca5cd24..0c28768092b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -236,6 +236,12 @@ public class KafkaIndexTask extends AbstractTask
       }
     }
 
+    // Set up sequenceNames.
+    final Map<Integer, String> sequenceNames = Maps.newHashMap();
+    for (Integer partitionNum : nextOffsets.keySet()) {
+      sequenceNames.put(partitionNum, String.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum));
+    }
+
     // Set up committer.
     final Supplier<Committer> committerSupplier = new Supplier<Committer>()
     {
@@ -295,7 +301,7 @@ public class KafkaIndexTask extends AbstractTask
     // Main loop.
     // Could eventually support early termination (triggered by a supervisor)
     // Could eventually support leader/follower mode (for keeping replicas more in sync)
-    boolean stillReading = true;
+    boolean stillReading = !assignment.isEmpty();
     while (stillReading) {
       if (stopping) {
         log.info("Stopping early.");
@@ -352,7 +358,11 @@ public class KafkaIndexTask extends AbstractTask
         try {
           final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");
 
-          final SegmentIdentifier identifier = driver.add(row, committerSupplier);
+          final SegmentIdentifier identifier = driver.add(
+              row,
+              sequenceNames.get(record.partition()),
+              committerSupplier
+          );
 
           if (identifier == null) {
             // Failure to allocate segment puts determinism at risk, bail out to be safe.
@@ -525,11 +535,7 @@ public class KafkaIndexTask extends AbstractTask
   {
     return new FiniteAppenderatorDriver(
         appenderator,
-        new ActionBasedSegmentAllocator(
-            toolbox.getTaskActionClient(),
-            dataSchema,
-            ioConfig.getSequenceName()
-        ),
+        new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
         toolbox.getSegmentHandoffNotifierFactory(),
         new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
         toolbox.getObjectMapper(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index ac51dfa0628..aab45e2cc67 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -170,7 +170,8 @@ public class KafkaIndexTaskTest
       new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "e", "y", 1.0f)),
       new ProducerRecord<byte[], byte[]>("topic0", 0, null, "unparseable".getBytes()),
       new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2013", "f", "y", 1.0f)),
-      new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f))
+      new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f)),
+      new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2011", "h", "y", 1.0f))
   );
 
   static {
@@ -377,6 +378,42 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
   }
 
+  @Test(timeout = 60_000L)
+  public void testRunOnNothing() throws Exception
+  {
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      for (ProducerRecord<byte[], byte[]> record : RECORDS) {
+        kafkaProducer.send(record).get();
+      }
+    }
+
+    final KafkaIndexTask task = createTask(
+        null,
+        new KafkaIOConfig(
+            "sequence0",
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)),
+            kafkaServer.consumerProperties(),
+            true
+        ),
+        null
+    );
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
+
+    // Check metrics
+    Assert.assertEquals(0, task.getFireDepartmentMetrics().processed());
+    Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
+    Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
+
+    // Check published metadata
+    Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
+  }
+
   @Test(timeout = 60_000L)
   public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
   {
@@ -714,7 +751,7 @@ public class KafkaIndexTaskTest
         new KafkaIOConfig(
             "sequence0",
             new KafkaPartitions("topic0", ImmutableMap.of(0, 2L, 1, 0L)),
-            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L)),
+            new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L)),
             kafkaServer.consumerProperties(),
            true
         ),
@@ -734,24 +771,30 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode());
 
     // Check metrics
-    Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
+    Assert.assertEquals(5, task.getFireDepartmentMetrics().processed());
     Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable());
     Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
 
     // Check published segments & metadata
     SegmentDescriptor desc1 = SD(task, "2010/P1D", 0);
     SegmentDescriptor desc2 = SD(task, "2011/P1D", 0);
-    SegmentDescriptor desc3 = SD(task, "2012/P1D", 0);
-    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
+    SegmentDescriptor desc3 = SD(task, "2011/P1D", 1);
+    SegmentDescriptor desc4 = SD(task, "2012/P1D", 0);
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 1L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 
     // Check segments in deep storage
     Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1));
-    Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2));
-    Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc3));
+    Assert.assertEquals(ImmutableList.of("g"), readSegmentDim1(desc4));
+
+    // Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
+    Assert.assertEquals(
+        ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
+        ImmutableSet.of(readSegmentDim1(desc2), readSegmentDim1(desc3))
+    );
   }
 
   @Test(timeout = 60_000L)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java
index 9a98d430359..956a1443f95 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/test/TestBroker.java
@@ -58,6 +58,8 @@ public class TestBroker implements Closeable
   {
     final Properties props = new Properties();
     props.setProperty("zookeeper.connect", zookeeperConnect);
+    props.setProperty("zookeeper.session.timeout.ms", "30000");
+    props.setProperty("zookeeper.connection.timeout.ms", "30000");
     props.setProperty("log.dirs", directory.toString());
     props.setProperty("broker.id", String.valueOf(id));
     props.setProperty("port", String.valueOf(new Random().nextInt(9999) + 10000));
diff --git a/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java b/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java
index 203363d79d4..de82223c516 100644
--- a/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java
+++ b/indexing-service/src/main/java/io/druid/indexing/appenderator/ActionBasedSegmentAllocator.java
@@ -32,22 +32,20 @@ public class ActionBasedSegmentAllocator implements SegmentAllocator
 {
   private final TaskActionClient taskActionClient;
   private final DataSchema dataSchema;
-  private final String sequenceName;
 
   public ActionBasedSegmentAllocator(
       TaskActionClient taskActionClient,
-      DataSchema dataSchema,
-      String sequenceName
+      DataSchema dataSchema
   )
   {
     this.taskActionClient = taskActionClient;
     this.dataSchema = dataSchema;
-    this.sequenceName = sequenceName;
   }
 
   @Override
   public SegmentIdentifier allocate(
       final DateTime timestamp,
+      final String sequenceName,
       final String previousSegmentId
   ) throws IOException
   {
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java
index bda0d09f217..6a6a886a38a 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriver.java
@@ -27,7 +27,9 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -75,10 +77,13 @@ public class FiniteAppenderatorDriver implements Closeable
   private final int maxRowsPerSegment;
   private final long handoffConditionTimeout;
 
-  // Key = Start of segment intervals. Value = Segment we're currently adding data to.
   // All access to "activeSegments" and "lastSegmentId" must be synchronized on "activeSegments".
-  private final NavigableMap<Long, SegmentIdentifier> activeSegments = new TreeMap<>();
-  private volatile String lastSegmentId = null;
+
+  // sequenceName -> start of segment interval -> segment we're currently adding data to
+  private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<>();
+
+  // sequenceName -> most recently allocated segment
+  private final Map<String, String> lastSegmentIds = Maps.newHashMap();
 
   // Notified when segments are dropped.
   private final Object handoffMonitor = new Object();
@@ -136,10 +141,17 @@ public class FiniteAppenderatorDriver implements Closeable
 
     if (metadata != null) {
       synchronized (activeSegments) {
-        for (SegmentIdentifier identifier : metadata.getActiveSegments()) {
-          activeSegments.put(identifier.getInterval().getStartMillis(), identifier);
+        for (Map.Entry<String, List<SegmentIdentifier>> entry : metadata.getActiveSegments().entrySet()) {
+          final String sequenceName = entry.getKey();
+          final TreeMap<Long, SegmentIdentifier> segmentMap = Maps.newTreeMap();
+
+          lastSegmentIds.put(sequenceName, metadata.getLastSegmentIds().get(sequenceName));
+          activeSegments.put(sequenceName, segmentMap);
+
+          for (SegmentIdentifier identifier : entry.getValue()) {
+            segmentMap.put(identifier.getInterval().getStartMillis(), identifier);
+          }
         }
-        lastSegmentId = metadata.getPreviousSegmentId();
       }
 
       return metadata.getCallerMetadata();
@@ -163,6 +175,7 @@ public class FiniteAppenderatorDriver implements Closeable
    * Add a row. Must not be called concurrently from multiple threads.
    *
    * @param row               the row to add
+   * @param sequenceName      sequenceName for this row's segment
    * @param committerSupplier supplier of a committer associated with all data that has been added, including this row
    *
    * @return segment to which this row was added, or null if segment allocator returned null for this row
@@ -171,10 +184,15 @@ public class FiniteAppenderatorDriver implements Closeable
    */
   public SegmentIdentifier add(
       final InputRow row,
+      final String sequenceName,
       final Supplier<Committer> committerSupplier
   ) throws IOException
   {
-    final SegmentIdentifier identifier = getSegment(row.getTimestamp());
+    Preconditions.checkNotNull(row, "row");
+    Preconditions.checkNotNull(sequenceName, "sequenceName");
+    Preconditions.checkNotNull(committerSupplier, "committerSupplier");
+
+    final SegmentIdentifier identifier = getSegment(row.getTimestamp(), sequenceName);
 
     if (identifier != null) {
       try {
@@ -191,24 +209,10 @@ public class FiniteAppenderatorDriver implements Closeable
     return identifier;
   }
 
-  public int getActiveSegmentCount()
-  {
-    synchronized (activeSegments) {
-      return activeSegments.size();
-    }
-  }
-
-  public List<SegmentIdentifier> getActiveSegments()
-  {
-    synchronized (activeSegments) {
-      return ImmutableList.copyOf(activeSegments.values());
-    }
-  }
-
   /**
    * Persist all data indexed through this driver so far. Blocks until complete.
    *
-   * Should be called after all data has been added through {@link #add(InputRow, Supplier)}.
+   * Should be called after all data has been added through {@link #add(InputRow, String, Supplier)}.
    *
    * @param committer committer representing all data that has been added so far
    *
@@ -235,7 +239,7 @@ public class FiniteAppenderatorDriver implements Closeable
    * Publish all data indexed through this driver so far, and waits for it to be handed off. Blocks until complete.
    * Retries forever on transient failures, but may exit early on permanent failures.
    *
-   * Should be called after all data has been added and persisted through {@link #add(InputRow, Supplier)} and
+   * Should be called after all data has been added and persisted through {@link #add(InputRow, String, Supplier)} and
    * {@link #persist(Committer)}.
    *
    * @param publisher publisher to use for this set of segments
@@ -256,7 +260,8 @@ public class FiniteAppenderatorDriver implements Closeable
           ? System.currentTimeMillis() + handoffConditionTimeout
           : 0;
 
-      log.info("Awaiting handoff...");
+      log.info("Awaiting handoff of segments: [%s]", Joiner.on(", ").join(appenderator.getSegments()));
+
       synchronized (handoffMonitor) {
         while (!appenderator.getSegments().isEmpty()) {
@@ -276,6 +281,8 @@ public class FiniteAppenderatorDriver implements Closeable
         }
       }
 
+      log.info("All segments handed off.");
+
       return new SegmentsAndMetadata(
           segmentsAndMetadata.getSegments(),
           ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
@@ -294,10 +301,16 @@ public class FiniteAppenderatorDriver implements Closeable
     handoffNotifier.close();
   }
 
-  private SegmentIdentifier getActiveSegment(final DateTime timestamp)
+  private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName)
   {
     synchronized (activeSegments) {
-      final Map.Entry<Long, SegmentIdentifier> candidateEntry = activeSegments.floorEntry(timestamp.getMillis());
+      final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
+
+      if (activeSegmentsForSequence == null) {
+        return null;
+      }
+
+      final Map.Entry<Long, SegmentIdentifier> candidateEntry = activeSegmentsForSequence.floorEntry(timestamp.getMillis());
       if (candidateEntry != null && candidateEntry.getValue().getInterval().contains(timestamp)) {
         return candidateEntry.getValue();
       } else {
@@ -309,39 +322,50 @@ public class FiniteAppenderatorDriver implements Closeable
   /**
    * Return a segment usable for "timestamp". May return null if no segment can be allocated.
    *
-   * @param timestamp data timestamp
+   * @param timestamp    data timestamp
+   * @param sequenceName sequenceName for potential segment allocation
    *
    * @return identifier, or null
    *
    * @throws IOException if an exception occurs while allocating a segment
    */
-  private SegmentIdentifier getSegment(final DateTime timestamp) throws IOException
+  private SegmentIdentifier getSegment(final DateTime timestamp, final String sequenceName) throws IOException
   {
     synchronized (activeSegments) {
-      final SegmentIdentifier existing = getActiveSegment(timestamp);
+      final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName);
       if (existing != null) {
         return existing;
       } else {
         // Allocate new segment.
-        final SegmentIdentifier newSegment = segmentAllocator.allocate(timestamp, lastSegmentId);
+        final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
+        final SegmentIdentifier newSegment = segmentAllocator.allocate(
+            timestamp,
+            sequenceName,
+            lastSegmentIds.get(sequenceName)
+        );
 
         if (newSegment != null) {
           final Long key = newSegment.getInterval().getStartMillis();
-          final SegmentIdentifier conflicting = activeSegments.get(key);
-          if (conflicting != null) {
-            throw new ISE(
-                "WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
-                newSegment,
-                conflicting
-            );
+
+          for (SegmentIdentifier identifier : appenderator.getSegments()) {
+            if (identifier.equals(newSegment)) {
+              throw new ISE(
+                  "WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
+                  newSegment,
+                  identifier
+              );
+            }
           }
 
-          log.info("New segment[%s].", newSegment);
-          activeSegments.put(key, newSegment);
-          lastSegmentId = newSegment.getIdentifierAsString();
+          log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName);
+          if (activeSegmentsForSequence == null) {
+            activeSegments.put(sequenceName, Maps.<Long, SegmentIdentifier>newTreeMap());
+          }
+          activeSegments.get(sequenceName).put(key, newSegment);
+          lastSegmentIds.put(sequenceName, newSegment.getIdentifierAsString());
         } else {
           // Well, we tried.
-          log.warn("Cannot allocate segment for timestamp[%s].", timestamp);
+          log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s].", timestamp, sequenceName);
         }
 
         return newSegment;
@@ -368,14 +392,15 @@ public class FiniteAppenderatorDriver implements Closeable
   /**
    * Push and publish all segments to the metadata store.
    *
-   * @param publisher segment publisher
-   * @param committer wrapped committer (from wrapCommitter)
+   * @param publisher        segment publisher
+   * @param wrappedCommitter wrapped committer (from wrapCommitter)
    *
-   * @return published segments and metadata
+   * @return published segments and metadata, or null if segments could not be published due to transaction failure
+   *         with commit metadata.
    */
   private SegmentsAndMetadata publishAll(
       final TransactionalSegmentPublisher publisher,
-      final Committer committer
+      final Committer wrappedCommitter
   ) throws InterruptedException
   {
     final List<SegmentIdentifier> theSegments = ImmutableList.copyOf(appenderator.getSegments());
@@ -384,7 +409,7 @@ public class FiniteAppenderatorDriver implements Closeable
     while (true) {
       try {
         log.info("Pushing segments: [%s]", Joiner.on(", ").join(theSegments));
-        final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(theSegments, committer).get();
+        final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(theSegments, wrappedCommitter).get();
 
         // Sanity check
         if (!segmentsToIdentifiers(segmentsAndMetadata.getSegments()).equals(Sets.newHashSet(theSegments))) {
@@ -401,21 +426,25 @@ public class FiniteAppenderatorDriver implements Closeable
             Joiner.on(", ").join(segmentsAndMetadata.getSegments())
         );
 
-        final boolean published = publisher.publishSegments(
-            ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
-            ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
-        );
-
-        if (published) {
-          log.info("Published segments, awaiting handoff.");
+        if (segmentsAndMetadata.getSegments().isEmpty()) {
+          log.info("Nothing to publish, skipping publish step.");
         } else {
-          log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
-          if (usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))
-                                .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
-            log.info("Our segments really do exist, awaiting handoff.");
+          final boolean published = publisher.publishSegments(
+              ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
+              ((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
+          );
+
+          if (published) {
+            log.info("Published segments, awaiting handoff.");
           } else {
-            log.warn("Our segments don't exist, giving up.");
-            return null;
+            log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
+            if (usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))
+                                  .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
+              log.info("Our segments really do exist, awaiting handoff.");
+            } else {
+              log.warn("Our segments don't exist, giving up.");
+              return null;
+            }
           }
         }
@@ -491,8 +520,20 @@ public class FiniteAppenderatorDriver implements Closeable
   {
     synchronized (activeSegments) {
       final FiniteAppenderatorDriverMetadata wrappedMetadata = new FiniteAppenderatorDriverMetadata(
-          ImmutableList.copyOf(activeSegments.values()),
-          lastSegmentId,
+          ImmutableMap.copyOf(
+              Maps.transformValues(
+                  activeSegments,
+                  new Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>>()
+                  {
+                    @Override
+                    public List<SegmentIdentifier> apply(NavigableMap<Long, SegmentIdentifier> input)
+                    {
+                      return ImmutableList.copyOf(input.values());
+                    }
+                  }
+              )
+          ),
+          ImmutableMap.copyOf(lastSegmentIds),
           committer.getMetadata()
       );
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java
index 9a1f469f5da..65c7e70c788 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverMetadata.java
@@ -23,35 +23,36 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
 import java.util.List;
+import java.util.Map;
 
 public class FiniteAppenderatorDriverMetadata
 {
-  private final List<SegmentIdentifier> activeSegments;
-  private final String previousSegmentId;
+  private final Map<String, List<SegmentIdentifier>> activeSegments;
+  private final Map<String, String> lastSegmentIds;
   private final Object callerMetadata;
 
   @JsonCreator
   public FiniteAppenderatorDriverMetadata(
-      @JsonProperty("activeSegments") List<SegmentIdentifier> activeSegments,
-      @JsonProperty("previousSegmentId") String previousSegmentId,
+      @JsonProperty("activeSegments") Map<String, List<SegmentIdentifier>> activeSegments,
+      @JsonProperty("lastSegmentIds") Map<String, String> lastSegmentIds,
       @JsonProperty("callerMetadata") Object callerMetadata
   )
   {
     this.activeSegments = activeSegments;
-    this.previousSegmentId = previousSegmentId;
+    this.lastSegmentIds = lastSegmentIds;
     this.callerMetadata = callerMetadata;
   }
 
   @JsonProperty
-  public List<SegmentIdentifier> getActiveSegments()
+  public Map<String, List<SegmentIdentifier>> getActiveSegments()
   {
     return activeSegments;
   }
 
   @JsonProperty
-  public String getPreviousSegmentId()
+  public Map<String, String> getLastSegmentIds()
   {
-    return previousSegmentId;
+    return lastSegmentIds;
   }
 
   @JsonProperty
@@ -65,7 +66,7 @@ public class FiniteAppenderatorDriverMetadata
   {
     return "FiniteAppenderatorDriverMetadata{" +
            "activeSegments=" + activeSegments +
-           ", previousSegmentId='" + previousSegmentId + '\'' +
+           ", lastSegmentIds=" + lastSegmentIds +
            ", callerMetadata=" + callerMetadata +
            '}';
   }
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java
index 65680c5539b..94b5ba245ff 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentAllocator.java
@@ -29,12 +29,14 @@ public interface SegmentAllocator
   /**
    * Allocates a new segment for a given timestamp.
    *
   * @param timestamp         timestamp of the event which triggered this allocation request
-   * @param previousSegmentId segment identifier returned on the previous call to allocate
+   * @param sequenceName      sequenceName for this allocation
+   * @param previousSegmentId segment identifier returned on the previous call to allocate for your sequenceName
    *
    * @return the pending segment identifier, or null if it was impossible to allocate a new segment
    */
  SegmentIdentifier allocate(
      DateTime timestamp,
+      String sequenceName,
      String previousSegmentId
  ) throws IOException;
 }
diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
index 8211d2520bd..2e108fc45a2 100644
--- a/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/appenderator/FiniteAppenderatorDriverTest.java
@@ -85,6 +85,7 @@ public class FiniteAppenderatorDriverTest
       )
   );
 
+  SegmentAllocator allocator;
   AppenderatorTester appenderatorTester;
   FiniteAppenderatorDriver driver;
 
@@ -92,9 +93,10 @@ public class FiniteAppenderatorDriverTest
   public void setUp()
   {
     appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY);
+    allocator = new TestSegmentAllocator(DATA_SOURCE, Granularity.HOUR);
     driver = new FiniteAppenderatorDriver(
         appenderatorTester.getAppenderator(),
-        new TestSegmentAllocator(DATA_SOURCE, Granularity.HOUR),
+        allocator,
         new TestSegmentHandoffNotifierFactory(),
         new TestUsedSegmentChecker(),
         OBJECT_MAPPER,
@@ -119,7 +121,7 @@ public class FiniteAppenderatorDriverTest
 
     for (int i = 0; i < ROWS.size(); i++) {
       committerSupplier.setMetadata(i + 1);
-      Assert.assertNotNull(driver.add(ROWS.get(i), committerSupplier));
+      Assert.assertNotNull(driver.add(ROWS.get(i), "dummy", committerSupplier));
    }
 
    final SegmentsAndMetadata segmentsAndMetadata = driver.finish(
@@ -212,6 +214,7 @@ public class FiniteAppenderatorDriverTest
     @Override
     public SegmentIdentifier allocate(
         final DateTime timestamp,
+        final String sequenceName,
         final String previousSegmentId
     ) throws IOException
     {
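Reviewer note: the essence of this patch is that FiniteAppenderatorDriver's bookkeeping becomes two-level, keyed first by sequenceName (one sequence per Kafka partition, named baseSequenceName + "_" + partitionNum) and then by segment-interval start, with lastSegmentIds remembering the most recent allocation per sequence. The self-contained sketch below illustrates that lookup/allocate flow using plain strings in place of Druid's SegmentIdentifier and SegmentAllocator types; the SequenceTracker class and its method names are hypothetical simplifications for illustration, not code from this patch.

import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for the per-sequence bookkeeping introduced in FiniteAppenderatorDriver:
// sequenceName -> (segment interval start millis -> segment id), plus the last segment id
// allocated for each sequence (what the driver now passes to the allocator as previousSegmentId).
class SequenceTracker
{
  private final Map<String, NavigableMap<Long, String>> activeSegments = new TreeMap<>();
  private final Map<String, String> lastSegmentIds = new HashMap<>();

  // Returns the segment already covering "timestamp" for this sequence, or allocates a new one.
  public String getSegment(final String sequenceName, final long timestamp, final long granularityMillis)
  {
    final NavigableMap<Long, String> segmentsForSequence =
        activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>());

    // Same shape as getActiveSegment(): find the latest interval starting at or before the timestamp.
    final Map.Entry<Long, String> candidate = segmentsForSequence.floorEntry(timestamp);
    if (candidate != null && timestamp < candidate.getKey() + granularityMillis) {
      return candidate.getValue();
    }

    // Allocate a new segment for the interval containing "timestamp", remembering it per sequence.
    final long intervalStart = timestamp - (timestamp % granularityMillis);
    final String newSegment = sequenceName + "_" + intervalStart + "_prev[" + lastSegmentIds.get(sequenceName) + "]";
    segmentsForSequence.put(intervalStart, newSegment);
    lastSegmentIds.put(sequenceName, newSegment);
    return newSegment;
  }

  public static void main(String[] args)
  {
    final SequenceTracker tracker = new SequenceTracker();
    // Two Kafka partitions get independent sequences, so their segment lineages never interleave.
    System.out.println(tracker.getSegment("index_kafka_0", 1_000L, 3_600_000L));
    System.out.println(tracker.getSegment("index_kafka_1", 1_500L, 3_600_000L));
    System.out.println(tracker.getSegment("index_kafka_0", 2_000L, 3_600_000L)); // reuses the first segment
  }
}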