SegmentAllocateAction (fixes #1515)

This is a feature meant to allow realtime tasks to work without being told upfront
what shardSpec they should use (so we can potentially publish a variable number
of segments per interval).

The idea is that there is a "pendingSegments" table in the metadata store that
tracks allocated segments. Each one has a segment id (the same segment id we know
and love) and is also part of a sequence.

The sequences are an idea from @cheddar that offers a way of doing replication.
If there are N tasks reading exactly the same data with exactly the same logic
(think Kafka tasks reading a fixed range of offsets) then you can place them
in the same sequence, and they will generate the same sequence of segments.
This commit is contained in:
Gian Merlino 2015-10-30 14:39:16 -07:00
parent 8e743b70c6
commit e4e5f0375b
17 changed files with 1651 additions and 99 deletions

View File

@ -21,7 +21,7 @@ package io.druid.metadata;
*/ */
public interface MetadataStorageConnector public interface MetadataStorageConnector
{ {
public Void insertOrUpdate( Void insertOrUpdate(
final String tableName, final String tableName,
final String keyColumn, final String keyColumn,
final String valueColumn, final String valueColumn,
@ -30,20 +30,22 @@ public interface MetadataStorageConnector
) throws Exception; ) throws Exception;
public byte[] lookup( byte[] lookup(
final String tableName, final String tableName,
final String keyColumn, final String keyColumn,
final String valueColumn, final String valueColumn,
final String key final String key
); );
public void createSegmentTable(); void createPendingSegmentsTable();
public void createRulesTable(); void createSegmentTable();
public void createConfigTable(); void createRulesTable();
public void createTaskTables(); void createConfigTable();
public void createAuditTable(); void createTaskTables();
void createAuditTable();
} }

View File

@ -29,7 +29,7 @@ public class MetadataStorageTablesConfig
{ {
public static MetadataStorageTablesConfig fromBase(String base) public static MetadataStorageTablesConfig fromBase(String base)
{ {
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null); return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null);
} }
public static final String TASK_ENTRY_TYPE = "task"; public static final String TASK_ENTRY_TYPE = "task";
@ -43,6 +43,9 @@ public class MetadataStorageTablesConfig
@JsonProperty("base") @JsonProperty("base")
private final String base; private final String base;
@JsonProperty("pendingSegments")
private final String pendingSegmentsTable;
@JsonProperty("segments") @JsonProperty("segments")
private final String segmentsTable; private final String segmentsTable;
@ -67,6 +70,7 @@ public class MetadataStorageTablesConfig
@JsonCreator @JsonCreator
public MetadataStorageTablesConfig( public MetadataStorageTablesConfig(
@JsonProperty("base") String base, @JsonProperty("base") String base,
@JsonProperty("pendingSegments") String pendingSegmentsTable,
@JsonProperty("segments") String segmentsTable, @JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable, @JsonProperty("rules") String rulesTable,
@JsonProperty("config") String configTable, @JsonProperty("config") String configTable,
@ -77,6 +81,7 @@ public class MetadataStorageTablesConfig
) )
{ {
this.base = (base == null) ? DEFAULT_BASE : base; this.base = (base == null) ? DEFAULT_BASE : base;
this.pendingSegmentsTable = makeTableName(pendingSegmentsTable, "pendingSegments");
this.segmentsTable = makeTableName(segmentsTable, "segments"); this.segmentsTable = makeTableName(segmentsTable, "segments");
this.rulesTable = makeTableName(rulesTable, "rules"); this.rulesTable = makeTableName(rulesTable, "rules");
this.configTable = makeTableName(configTable, "config"); this.configTable = makeTableName(configTable, "config");
@ -108,6 +113,11 @@ public class MetadataStorageTablesConfig
return base; return base;
} }
public String getPendingSegmentsTable()
{
return pendingSegmentsTable;
}
public String getSegmentsTable() public String getSegmentsTable()
{ {
return segmentsTable; return segmentsTable;

View File

@ -35,7 +35,7 @@ public class PostgreSQLConnectorTest
{ {
PostgreSQLConnector connector = new PostgreSQLConnector( PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null)) Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null))
); );
Assert.assertTrue(connector.isTransientException(new SQLException("bummer, connection problem", "08DIE"))); Assert.assertTrue(connector.isTransientException(new SQLException("bummer, connection problem", "08DIE")));

View File

@ -92,6 +92,7 @@ public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageCo
null, null,
null, null,
null, null,
null,
null null
); );
} }

View File

@ -0,0 +1,284 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.client.repackaged.com.google.common.base.Preconditions;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.api.client.util.Lists;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableSet;
import com.google.common.primitives.Longs;
import com.metamx.common.Granularity;
import com.metamx.common.IAE;
import com.metamx.common.logger.Logger;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
/**
* Allocates a pending segment for a given timestamp. The preferredSegmentGranularity is used if there are no prior
* segments for the given timestamp, or if the prior segments for the given timestamp are already at the
* preferredSegmentGranularity. Otherwise, the prior segments will take precedence.
* <p/>
* This action implicitly acquires locks when it allocates segments. You do not have to acquire them beforehand,
* although you *do* have to release them yourself.
* <p/>
* If this action cannot acquire an appropriate lock, or if it cannot expand an existing segment set, it will return
* a missing Optional.
*/
public class SegmentAllocateAction implements TaskAction<Optional<SegmentIdentifier>>
{
private static final Logger log = new Logger(SegmentAllocateAction.class);
// Prevent spinning forever in situations where the segment list just won't stop changing.
private static final int MAX_ATTEMPTS = 90;
private final String dataSource;
private final DateTime timestamp;
private final QueryGranularity queryGranularity;
private final Granularity preferredSegmentGranularity;
private final String sequenceName;
private final String previousSegmentId;
public static List<Granularity> granularitiesFinerThan(final Granularity gran0)
{
final DateTime epoch = new DateTime(0);
final List<Granularity> retVal = Lists.newArrayList();
for (Granularity gran : Granularity.values()) {
if (gran.bucket(epoch).toDurationMillis() <= gran0.bucket(epoch).toDurationMillis()) {
retVal.add(gran);
}
}
Collections.sort(
retVal,
new Comparator<Granularity>()
{
@Override
public int compare(Granularity g1, Granularity g2)
{
return Longs.compare(g2.bucket(epoch).toDurationMillis(), g1.bucket(epoch).toDurationMillis());
}
}
);
return retVal;
}
public SegmentAllocateAction(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("timestamp") DateTime timestamp,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity,
@JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity,
@JsonProperty("sequenceName") String sequenceName,
@JsonProperty("previousSegmentId") String previousSegmentId
)
{
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
this.queryGranularity = Preconditions.checkNotNull(queryGranularity, "queryGranularity");
this.preferredSegmentGranularity = Preconditions.checkNotNull(
preferredSegmentGranularity,
"preferredSegmentGranularity"
);
this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
this.previousSegmentId = previousSegmentId;
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public DateTime getTimestamp()
{
return timestamp;
}
@JsonProperty
public QueryGranularity getQueryGranularity()
{
return queryGranularity;
}
@JsonProperty
public Granularity getPreferredSegmentGranularity()
{
return preferredSegmentGranularity;
}
@JsonProperty
public String getSequenceName()
{
return sequenceName;
}
@JsonProperty
public String getPreviousSegmentId()
{
return previousSegmentId;
}
@Override
public TypeReference<Optional<SegmentIdentifier>> getReturnTypeReference()
{
return new TypeReference<Optional<SegmentIdentifier>>()
{
};
}
@Override
public Optional<SegmentIdentifier> perform(
final Task task,
final TaskActionToolbox toolbox
) throws IOException
{
int attempt = 0;
while (true) {
attempt++;
if (!task.getDataSource().equals(dataSource)) {
throw new IAE("Task dataSource must match action dataSource, [%s] != [%s].", task.getDataSource(), dataSource);
}
final IndexerMetadataStorageCoordinator msc = toolbox.getIndexerMetadataStorageCoordinator();
// 1) if something overlaps our timestamp, use that
// 2) otherwise try preferredSegmentGranularity & going progressively smaller
final List<Interval> tryIntervals = Lists.newArrayList();
final Interval rowInterval = new Interval(
queryGranularity.truncate(timestamp.getMillis()),
queryGranularity.next(queryGranularity.truncate(timestamp.getMillis()))
);
final Set<DataSegment> usedSegmentsForRow = ImmutableSet.copyOf(
msc.getUsedSegmentsForInterval(dataSource, rowInterval)
);
if (usedSegmentsForRow.isEmpty()) {
// No existing segments for this row, but there might still be nearby ones that conflict with our preferred
// segment granularity. Try that first, and then progressively smaller ones if it fails.
for (Granularity gran : granularitiesFinerThan(preferredSegmentGranularity)) {
tryIntervals.add(gran.bucket(timestamp));
}
} else {
// Existing segment(s) exist for this row; use the interval of the first one.
tryIntervals.add(usedSegmentsForRow.iterator().next().getInterval());
}
for (final Interval tryInterval : tryIntervals) {
if (tryInterval.contains(rowInterval)) {
log.debug(
"Trying to allocate pending segment for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
final TaskLock tryLock = toolbox.getTaskLockbox().tryLock(task, tryInterval).orNull();
if (tryLock != null) {
final SegmentIdentifier identifier = msc.allocatePendingSegment(
dataSource,
sequenceName,
previousSegmentId,
tryInterval,
tryLock.getVersion()
);
if (identifier != null) {
return Optional.of(identifier);
} else {
log.debug(
"Could not allocate pending segment for rowInterval[%s], segmentInterval[%s].",
rowInterval,
tryInterval
);
}
} else {
log.debug("Could not acquire lock for rowInterval[%s], segmentInterval[%s].", rowInterval, tryInterval);
}
}
}
// Could not allocate a pending segment. There's a chance that this is because someone else inserted a segment
// overlapping with this row between when we called "mdc.getUsedSegmentsForInterval" and now. Check it again,
// and if it's different, repeat.
if (!ImmutableSet.copyOf(msc.getUsedSegmentsForInterval(dataSource, rowInterval)).equals(usedSegmentsForRow)) {
if (attempt < MAX_ATTEMPTS) {
final long shortRandomSleep = 50 + (long) (Math.random() * 450);
log.debug(
"Used segment set changed for rowInterval[%s]. Retrying segment allocation in %,dms (attempt = %,d).",
rowInterval,
shortRandomSleep,
attempt
);
try {
Thread.sleep(shortRandomSleep);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw Throwables.propagate(e);
}
} else {
log.error(
"Used segment set changed for rowInterval[%s]. Not trying again (attempt = %,d).",
rowInterval,
attempt
);
return Optional.absent();
}
} else {
return Optional.absent();
}
}
}
@Override
public boolean isAudited()
{
return false;
}
@Override
public String toString()
{
return "SegmentAllocateAction{" +
"dataSource='" + dataSource + '\'' +
", timestamp=" + timestamp +
", queryGranularity=" + queryGranularity +
", preferredSegmentGranularity=" + preferredSegmentGranularity +
", sequenceName='" + sequenceName + '\'' +
", previousSegmentId='" + previousSegmentId + '\'' +
'}';
}
}

View File

@ -34,7 +34,8 @@ import java.io.IOException;
@JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class), @JsonSubTypes.Type(name = "segmentListUsed", value = SegmentListUsedAction.class),
@JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class), @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class),
@JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class), @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class),
@JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class) @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class),
@JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class)
}) })
public interface TaskAction<RetType> public interface TaskAction<RetType>
{ {

View File

@ -65,24 +65,6 @@ public class TaskActionToolbox
return emitter; return emitter;
} }
public boolean segmentsAreFromSamePartitionSet(
final Set<DataSegment> segments
)
{
// Verify that these segments are all in the same partition set
Preconditions.checkArgument(!segments.isEmpty(), "segments nonempty");
final DataSegment firstSegment = segments.iterator().next();
for (final DataSegment segment : segments) {
if (!segment.getDataSource().equals(firstSegment.getDataSource())
|| !segment.getInterval().equals(firstSegment.getInterval())
|| !segment.getVersion().equals(firstSegment.getVersion())) {
return false;
}
}
return true;
}
public void verifyTaskLocks( public void verifyTaskLocks(
final Task task, final Task task,
final Set<DataSegment> segments final Set<DataSegment> segments

View File

@ -0,0 +1,738 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.metamx.common.Granularity;
import com.metamx.common.ISE;
import io.druid.granularity.DurationGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
public class SegmentAllocateActionTest
{
@Rule
public ExpectedException thrown = ExpectedException.none();
@Rule
public TaskActionTestKit taskActionTestKit = new TaskActionTestKit();
private static final String DATA_SOURCE = "none";
private static final DateTime PARTY_TIME = new DateTime("1999");
private static final DateTime THE_DISTANT_FUTURE = new DateTime("3000");
@Test
public void testGranularitiesFinerThanDay() throws Exception
{
Assert.assertEquals(
ImmutableList.of(
Granularity.DAY,
Granularity.SIX_HOUR,
Granularity.HOUR,
Granularity.FIFTEEN_MINUTE,
Granularity.TEN_MINUTE,
Granularity.FIVE_MINUTE,
Granularity.MINUTE,
Granularity.SECOND
),
SegmentAllocateAction.granularitiesFinerThan(Granularity.DAY)
);
}
@Test
public void testGranularitiesFinerThanHour() throws Exception
{
Assert.assertEquals(
ImmutableList.of(
Granularity.HOUR,
Granularity.FIFTEEN_MINUTE,
Granularity.TEN_MINUTE,
Granularity.FIVE_MINUTE,
Granularity.MINUTE,
Granularity.SECOND
),
SegmentAllocateAction.granularitiesFinerThan(Granularity.HOUR)
);
}
@Test
public void testManySegmentsSameInterval() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
null
);
final SegmentIdentifier id2 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id1.getIdentifierAsString()
);
final SegmentIdentifier id3 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id2.getIdentifierAsString()
);
final TaskLock partyLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(PARTY_TIME);
}
}
)
);
assertSameIdentifier(
id1,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(0, 0)
)
);
assertSameIdentifier(
id2,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(1, 0)
)
);
assertSameIdentifier(
id3,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(2, 0)
)
);
}
@Test
public void testResumeSequence() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
null
);
final SegmentIdentifier id2 = allocate(
task,
THE_DISTANT_FUTURE,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id1.getIdentifierAsString()
);
final SegmentIdentifier id3 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id2.getIdentifierAsString()
);
final SegmentIdentifier id4 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id1.getIdentifierAsString()
);
final SegmentIdentifier id5 = allocate(
task,
THE_DISTANT_FUTURE,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id1.getIdentifierAsString()
);
final SegmentIdentifier id6 = allocate(
task,
THE_DISTANT_FUTURE,
QueryGranularity.NONE,
Granularity.MINUTE,
"s1",
id1.getIdentifierAsString()
);
final SegmentIdentifier id7 = allocate(
task,
THE_DISTANT_FUTURE,
QueryGranularity.NONE,
Granularity.DAY,
"s1",
id1.getIdentifierAsString()
);
final TaskLock partyLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(PARTY_TIME);
}
}
)
);
final TaskLock futureLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(THE_DISTANT_FUTURE);
}
}
)
);
assertSameIdentifier(
id1,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(0, 0)
)
);
assertSameIdentifier(
id2,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(THE_DISTANT_FUTURE),
futureLock.getVersion(),
new NumberedShardSpec(0, 0)
)
);
assertSameIdentifier(
id3,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(1, 0)
)
);
Assert.assertNull(id4);
assertSameIdentifier(id5, id2);
Assert.assertNull(id6);
assertSameIdentifier(id7, id2);
}
@Test
public void testMultipleSequences() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null);
final SegmentIdentifier id2 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s2", null);
final SegmentIdentifier id3 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id1.getIdentifierAsString()
);
final SegmentIdentifier id4 = allocate(
task,
THE_DISTANT_FUTURE,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id3.getIdentifierAsString()
);
final SegmentIdentifier id5 = allocate(
task,
THE_DISTANT_FUTURE,
QueryGranularity.NONE,
Granularity.HOUR,
"s2",
id2.getIdentifierAsString()
);
final SegmentIdentifier id6 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null);
final TaskLock partyLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(PARTY_TIME);
}
}
)
);
final TaskLock futureLock = Iterables.getOnlyElement(
FluentIterable.from(taskActionTestKit.getTaskLockbox().findLocksForTask(task))
.filter(
new Predicate<TaskLock>()
{
@Override
public boolean apply(TaskLock input)
{
return input.getInterval().contains(THE_DISTANT_FUTURE);
}
}
)
);
assertSameIdentifier(
id1,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(0, 0)
)
);
assertSameIdentifier(
id2,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(1, 0)
)
);
assertSameIdentifier(
id3,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
partyLock.getVersion(),
new NumberedShardSpec(2, 0)
)
);
assertSameIdentifier(
id4,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(THE_DISTANT_FUTURE),
futureLock.getVersion(),
new NumberedShardSpec(0, 0)
)
);
assertSameIdentifier(
id5,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(THE_DISTANT_FUTURE),
futureLock.getVersion(),
new NumberedShardSpec(1, 0)
)
);
assertSameIdentifier(
id6,
id1
);
}
@Test
public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new LinearShardSpec(0))
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new LinearShardSpec(1))
.build()
)
);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
null
);
final SegmentIdentifier id2 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id1.getIdentifierAsString()
);
assertSameIdentifier(
id1,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
PARTY_TIME.toString(),
new LinearShardSpec(2)
)
);
assertSameIdentifier(
id2,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
PARTY_TIME.toString(),
new LinearShardSpec(3)
)
);
}
@Test
public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(0, 2))
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(1, 2))
.build()
)
);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
null
);
final SegmentIdentifier id2 = allocate(
task,
PARTY_TIME,
QueryGranularity.NONE,
Granularity.HOUR,
"s1",
id1.getIdentifierAsString()
);
assertSameIdentifier(
id1,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
PARTY_TIME.toString(),
new NumberedShardSpec(2, 2)
)
);
assertSameIdentifier(
id2,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
PARTY_TIME.toString(),
new NumberedShardSpec(3, 2)
)
);
}
@Test
public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(0, 2))
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(1, 2))
.build()
)
);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.DAY, "s1", null);
assertSameIdentifier(
id1,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
PARTY_TIME.toString(),
new NumberedShardSpec(2, 2)
)
);
}
@Test
public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(0, 2))
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(1, 2))
.build()
)
);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.MINUTE, "s1", null);
assertSameIdentifier(
id1,
new SegmentIdentifier(
DATA_SOURCE,
Granularity.HOUR.bucket(PARTY_TIME),
PARTY_TIME.toString(),
new NumberedShardSpec(2, 2)
)
);
}
@Test
public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(0, 2))
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new NumberedShardSpec(1, 2))
.build()
)
);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.DAY, Granularity.DAY, "s1", null);
Assert.assertNull(id1);
}
@Test
public void testCannotDoAnythingWithSillyQueryGranularity() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.DAY, Granularity.HOUR, "s1", null);
Assert.assertNull(id1);
}
@Test
public void testCannotAddToExistingSingleDimensionShardSpecs() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new SingleDimensionShardSpec("foo", null, "bar", 0))
.build(),
DataSegment.builder()
.dataSource(DATA_SOURCE)
.interval(Granularity.HOUR.bucket(PARTY_TIME))
.version(PARTY_TIME.toString())
.shardSpec(new SingleDimensionShardSpec("foo", "bar", null, 1))
.build()
)
);
taskActionTestKit.getTaskLockbox().add(task);
final SegmentIdentifier id1 = allocate(task, PARTY_TIME, QueryGranularity.NONE, Granularity.HOUR, "s1", null);
Assert.assertNull(id1);
}
@Test
public void testSerde() throws Exception
{
final SegmentAllocateAction action = new SegmentAllocateAction(
DATA_SOURCE,
PARTY_TIME,
QueryGranularity.MINUTE,
Granularity.HOUR,
"s1",
"prev"
);
final ObjectMapper objectMapper = new DefaultObjectMapper();
final SegmentAllocateAction action2 = (SegmentAllocateAction) objectMapper.readValue(
objectMapper.writeValueAsBytes(action),
TaskAction.class
);
Assert.assertEquals(DATA_SOURCE, action2.getDataSource());
Assert.assertEquals(PARTY_TIME, action2.getTimestamp());
Assert.assertEquals(new DurationGranularity(60000, 0), action2.getQueryGranularity());
Assert.assertSame(Granularity.HOUR, action2.getPreferredSegmentGranularity());
Assert.assertEquals("s1", action2.getSequenceName());
Assert.assertEquals("prev", action2.getPreviousSegmentId());
}
private SegmentIdentifier allocate(
final Task task,
final DateTime timestamp,
final QueryGranularity queryGranularity,
final Granularity preferredSegmentGranularity,
final String sequenceName,
final String sequencePreviousId
) throws Exception
{
final SegmentAllocateAction action = new SegmentAllocateAction(
DATA_SOURCE,
timestamp,
queryGranularity,
preferredSegmentGranularity,
sequenceName,
sequencePreviousId
);
return action.perform(task, taskActionTestKit.getTaskActionToolbox()).orNull();
}
private void assertSameIdentifier(final SegmentIdentifier one, final SegmentIdentifier other)
{
Assert.assertEquals(one, other);
Assert.assertEquals(one.getShardSpec().getPartitionNum(), other.getShardSpec().getPartitionNum());
if (one.getShardSpec().getClass() == NumberedShardSpec.class
&& other.getShardSpec().getClass() == NumberedShardSpec.class) {
Assert.assertEquals(
((NumberedShardSpec) one.getShardSpec()).getPartitions(),
((NumberedShardSpec) other.getShardSpec()).getPartitions()
);
} else if (one.getShardSpec().getClass() == LinearShardSpec.class
&& other.getShardSpec().getClass() == LinearShardSpec.class) {
// do nothing
} else {
throw new ISE(
"Unexpected shardSpecs [%s] and [%s]",
one.getShardSpec().getClass(),
other.getShardSpec().getClass()
);
}
}
}

View File

@ -0,0 +1,114 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.google.common.base.Suppliers;
import io.druid.indexing.common.TestUtils;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskStorage;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.TestDerbyConnector;
import io.druid.server.metrics.NoopServiceEmitter;
import org.joda.time.Period;
import org.junit.rules.ExternalResource;
public class TaskActionTestKit extends ExternalResource
{
private final MetadataStorageTablesConfig metadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase("druid");
private TaskStorage taskStorage;
private TaskLockbox taskLockbox;
private TestDerbyConnector testDerbyConnector;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
private TaskActionToolbox taskActionToolbox;
public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
{
return metadataStorageTablesConfig;
}
public TaskStorage getTaskStorage()
{
return taskStorage;
}
public TaskLockbox getTaskLockbox()
{
return taskLockbox;
}
public TestDerbyConnector getTestDerbyConnector()
{
return testDerbyConnector;
}
public IndexerMetadataStorageCoordinator getMetadataStorageCoordinator()
{
return metadataStorageCoordinator;
}
public TaskActionToolbox getTaskActionToolbox()
{
return taskActionToolbox;
}
@Override
public void before()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(new Period("PT24H")));
taskLockbox = new TaskLockbox(taskStorage);
testDerbyConnector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(metadataStorageTablesConfig)
);
metadataStorageCoordinator = new IndexerSQLMetadataStorageCoordinator(
new TestUtils().getTestObjectMapper(),
metadataStorageTablesConfig,
testDerbyConnector
);
taskActionToolbox = new TaskActionToolbox(
taskLockbox,
metadataStorageCoordinator,
new NoopServiceEmitter()
);
testDerbyConnector.createPendingSegmentsTable();
testDerbyConnector.createSegmentTable();
testDerbyConnector.createRulesTable();
testDerbyConnector.createConfigTable();
testDerbyConnector.createTaskTables();
testDerbyConnector.createAuditTable();
}
@Override
public void after()
{
testDerbyConnector.tearDown();
taskStorage = null;
taskLockbox = null;
testDerbyConnector = null;
metadataStorageCoordinator = null;
taskActionToolbox = null;
}
}

View File

@ -17,15 +17,19 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.Task;
import junit.framework.Assert;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.util.List;
public class TaskLockboxTest public class TaskLockboxTest
{ {
private TaskStorage taskStorage; private TaskStorage taskStorage;
@ -33,7 +37,8 @@ public class TaskLockboxTest
private TaskLockbox lockbox; private TaskLockbox lockbox;
@Before @Before
public void setUp(){ public void setUp()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(taskStorage); lockbox = new TaskLockbox(taskStorage);
} }
@ -49,7 +54,7 @@ public class TaskLockboxTest
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testLockForInactiveTask() throws InterruptedException public void testLockForInactiveTask() throws InterruptedException
{ {
lockbox.lock(NoopTask.create(),new Interval("2015-01-01/2015-01-02")); lockbox.lock(NoopTask.create(), new Interval("2015-01-01/2015-01-02"));
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -78,7 +83,30 @@ public class TaskLockboxTest
// Now task2 should be able to get the lock // Now task2 should be able to get the lock
Assert.assertTrue(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent()); Assert.assertTrue(lockbox.tryLock(task2, new Interval("2015-01-01/2015-01-02")).isPresent());
}
@Test
public void testTrySmallerLock() throws InterruptedException
{
Task task = NoopTask.create();
lockbox.add(task);
Optional<TaskLock> lock1 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-03"));
Assert.assertTrue(lock1.isPresent());
Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock1.get().getInterval());
// same task tries to take partially overlapping interval; should fail
Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-02/2015-01-04")).isPresent());
// same task tries to take contained interval; should succeed and should match the original lock
Optional<TaskLock> lock2 = lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02"));
Assert.assertTrue(lock2.isPresent());
Assert.assertEquals(new Interval("2015-01-01/2015-01-03"), lock2.get().getInterval());
// only the first lock should actually exist
Assert.assertEquals(
ImmutableList.of(lock1.get()),
lockbox.findLocksForTask(task)
);
} }
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
@ -93,8 +121,8 @@ public class TaskLockboxTest
Task task = NoopTask.create(); Task task = NoopTask.create();
lockbox.add(task); lockbox.add(task);
lockbox.remove(task); lockbox.remove(task);
Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent()); } Assert.assertFalse(lockbox.tryLock(task, new Interval("2015-01-01/2015-01-02")).isPresent());
}
} }

View File

@ -198,7 +198,7 @@ public class SQLMetadataStorageDruidModule implements Module
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class)) PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding(type) .addBinding(type)
.to(IndexerSQLMetadataStorageCoordinator.class) .to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class); .in(ManageLifecycle.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class)) PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding(type) .addBinding(type)

View File

@ -17,6 +17,7 @@
package io.druid.indexing.overlord; package io.druid.indexing.overlord;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import org.joda.time.Interval; import org.joda.time.Interval;
@ -51,6 +52,31 @@ public interface IndexerMetadataStorageCoordinator
*/ */
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException; public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException;
/**
* Allocate a new pending segment in the pending segments table. This segment identifier will never be given out
* again, <em>unless</em> another call is made with the same dataSource, sequenceName, and previousSegmentId.
* <p/>
* The sequenceName and previousSegmentId parameters are meant to make it easy for two independent ingestion tasks
* to produce the same series of segments.
* <p/>
* Note that a segment sequence may include segments with a variety of different intervals and versions.
*
* @param dataSource dataSource for which to allocate a segment
* @param sequenceName name of the group of ingestion tasks producing a segment series
* @param previousSegmentId previous segment in the series; may be null or empty, meaning this is the first segment
* @param interval interval for which to allocate a segment
* @param maxVersion use this version if we have no better version to use. The returned segment identifier may
* have a version lower than this one, but will not have one higher.
*
* @return the pending segment identifier, or null if it was impossible to allocate a new segment
*/
SegmentIdentifier allocatePendingSegment(
String dataSource,
String sequenceName,
String previousSegmentId,
Interval interval,
String maxVersion
) throws IOException;
public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException; public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException;

View File

@ -19,6 +19,7 @@ package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -26,12 +27,17 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment; import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineObjectHolder; import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.skife.jdbi.v2.FoldController; import org.skife.jdbi.v2.FoldController;
@ -72,16 +78,93 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
this.connector = connector; this.connector = connector;
} }
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval) @LifecycleStart
throws IOException public void start()
{ {
final VersionedIntervalTimeline<String, DataSegment> timeline = connector.getDBI().withHandle( connector.createPendingSegmentsTable();
new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>() connector.createSegmentTable();
}
public List<DataSegment> getUsedSegmentsForInterval(
final String dataSource,
final Interval interval
) throws IOException
{
return connector.retryWithHandle(
new HandleCallback<List<DataSegment>>()
{ {
@Override @Override
public VersionedIntervalTimeline<String, DataSegment> withHandle(Handle handle) throws IOException public List<DataSegment> withHandle(Handle handle) throws Exception
{ {
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>( final VersionedIntervalTimeline<String, DataSegment> timeline = getTimelineForIntervalWithHandle(
handle,
dataSource,
interval
);
return Lists.newArrayList(
Iterables.concat(
Iterables.transform(
timeline.lookup(interval),
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>()
{
@Override
public Iterable<DataSegment> apply(TimelineObjectHolder<String, DataSegment> input)
{
return input.getObject().payloads();
}
}
)
)
);
}
}
);
}
private List<SegmentIdentifier> getPendingSegmentsForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
) throws IOException
{
final List<SegmentIdentifier> identifiers = Lists.newArrayList();
final ResultIterator<byte[]> dbSegments =
handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource AND start <= :end and \"end\" >= :start",
dbTables.getPendingSegmentsTable()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.map(ByteArrayMapper.FIRST)
.iterator();
while (dbSegments.hasNext()) {
final byte[] payload = dbSegments.next();
final SegmentIdentifier identifier = jsonMapper.readValue(payload, SegmentIdentifier.class);
if (interval.overlaps(identifier.getInterval())) {
identifiers.add(identifier);
}
}
dbSegments.close();
return identifiers;
}
private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalWithHandle(
final Handle handle,
final String dataSource,
final Interval interval
) throws IOException
{
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(
Ordering.natural() Ordering.natural()
); );
@ -113,26 +196,6 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
dbSegments.close(); dbSegments.close();
return timeline; return timeline;
}
}
);
return Lists.newArrayList(
Iterables.concat(
Iterables.transform(
timeline.lookup(interval),
new Function<TimelineObjectHolder<String, DataSegment>, Iterable<DataSegment>>()
{
@Override
public Iterable<DataSegment> apply(TimelineObjectHolder<String, DataSegment> input)
{
return input.getObject().payloads();
}
}
)
)
);
} }
/** /**
@ -140,6 +203,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
* with identifiers already in the database will not be added). * with identifiers already in the database will not be added).
* *
* @param segments set of segments to add * @param segments set of segments to add
*
* @return set of segments actually added * @return set of segments actually added
*/ */
public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException
@ -164,6 +228,204 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
); );
} }
@Override
public SegmentIdentifier allocatePendingSegment(
final String dataSource,
final String sequenceName,
final String previousSegmentId,
final Interval interval,
final String maxVersion
) throws IOException
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(sequenceName, "sequenceName");
Preconditions.checkNotNull(interval, "interval");
Preconditions.checkNotNull(maxVersion, "maxVersion");
final String previousSegmentIdNotNull = previousSegmentId == null ? "" : previousSegmentId;
return connector.retryTransaction(
new TransactionCallback<SegmentIdentifier>()
{
@Override
public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final List<byte[]> existingBytes = handle
.createQuery(
String.format(
"SELECT payload FROM %s WHERE "
+ "dataSource = :dataSource AND "
+ "sequence_name = :sequence_name AND "
+ "sequence_prev_id = :sequence_prev_id",
dbTables.getPendingSegmentsTable()
)
).bind("dataSource", dataSource)
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull)
.map(ByteArrayMapper.FIRST)
.list();
if (!existingBytes.isEmpty()) {
final SegmentIdentifier existingIdentifier = jsonMapper.readValue(
Iterables.getOnlyElement(existingBytes),
SegmentIdentifier.class
);
if (existingIdentifier.getInterval().getStartMillis() == interval.getStartMillis()
&& existingIdentifier.getInterval().getEndMillis() == interval.getEndMillis()) {
log.info(
"Found existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull
);
return existingIdentifier;
} else {
log.warn(
"Cannot use existing pending segment [%s] for sequence[%s] (previous = [%s]) in DB, "
+ "does not match requested interval[%s]",
existingIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull,
interval
);
return null;
}
}
// Make up a pending segment based on existing segments and pending segments in the DB. This works
// assuming that all tasks inserting segments at a particular point in time are going through the
// allocatePendingSegment flow. This should be assured through some other mechanism (like task locks).
final SegmentIdentifier newIdentifier;
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalWithHandle(
handle,
dataSource,
interval
).lookup(interval);
if (existingChunks.size() > 1) {
// Not possible to expand more than one chunk with a single segment.
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: already have [%,d] chunks.",
dataSource,
interval,
maxVersion,
existingChunks.size()
);
return null;
} else {
SegmentIdentifier max = null;
if (!existingChunks.isEmpty()) {
TimelineObjectHolder<String, DataSegment> existingHolder = Iterables.getOnlyElement(existingChunks);
for (PartitionChunk<DataSegment> existing : existingHolder.getObject()) {
if (max == null || max.getShardSpec().getPartitionNum() < existing.getObject()
.getShardSpec()
.getPartitionNum()) {
max = SegmentIdentifier.fromDataSegment(existing.getObject());
}
}
}
final List<SegmentIdentifier> pendings = getPendingSegmentsForIntervalWithHandle(
handle,
dataSource,
interval
);
for (SegmentIdentifier pending : pendings) {
if (max == null ||
pending.getVersion().compareTo(max.getVersion()) > 0 ||
(pending.getVersion().equals(max.getVersion())
&& pending.getShardSpec().getPartitionNum() > max.getShardSpec().getPartitionNum())) {
max = pending;
}
}
if (max == null) {
newIdentifier = new SegmentIdentifier(
dataSource,
interval,
maxVersion,
new NumberedShardSpec(0, 0)
);
} else if (!max.getInterval().equals(interval) || max.getVersion().compareTo(maxVersion) > 0) {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: conflicting segment[%s].",
dataSource,
interval,
maxVersion,
max.getIdentifierAsString()
);
return null;
} else if (max.getShardSpec() instanceof LinearShardSpec) {
newIdentifier = new SegmentIdentifier(
dataSource,
max.getInterval(),
max.getVersion(),
new LinearShardSpec(max.getShardSpec().getPartitionNum() + 1)
);
} else if (max.getShardSpec() instanceof NumberedShardSpec) {
newIdentifier = new SegmentIdentifier(
dataSource,
max.getInterval(),
max.getVersion(),
new NumberedShardSpec(
max.getShardSpec().getPartitionNum() + 1,
((NumberedShardSpec) max.getShardSpec()).getPartitions()
)
);
} else {
log.warn(
"Cannot allocate new segment for dataSource[%s], interval[%s], maxVersion[%s]: ShardSpec class[%s] used by [%s].",
dataSource,
interval,
maxVersion,
max.getShardSpec().getClass(),
max.getIdentifierAsString()
);
return null;
}
}
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
// Avoiding ON DUPLICATE KEY since it's not portable.
// Avoiding try/catch since it may cause inadvertent transaction-splitting.
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, \"end\", sequence_name, sequence_prev_id, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :sequence_name, :sequence_prev_id, :payload)",
dbTables.getPendingSegmentsTable()
)
)
.bind("id", newIdentifier.getIdentifierAsString())
.bind("dataSource", dataSource)
.bind("created_date", new DateTime().toString())
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.bind("sequence_name", sequenceName)
.bind("sequence_prev_id", previousSegmentIdNotNull)
.bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
.execute();
log.info(
"Allocated pending segment [%s] for sequence[%s] (previous = [%s]) in DB",
newIdentifier.getIdentifierAsString(),
sequenceName,
previousSegmentIdNotNull
);
return newIdentifier;
}
}
);
}
/** /**
* Attempts to insert a single segment to the database. If the segment already exists, will do nothing. Meant * Attempts to insert a single segment to the database. If the segment already exists, will do nothing. Meant
* to be called from within a transaction. * to be called from within a transaction.
@ -199,7 +461,8 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.execute(); .execute();
log.info("Published segment [%s] to DB", segment.getIdentifier()); log.info("Published segment [%s] to DB", segment.getIdentifier());
} catch(Exception e) { }
catch (Exception e) {
if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) { if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier()); log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
} else { } else {
@ -237,7 +500,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override @Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{ {
for(final DataSegment segment : segments) { for (final DataSegment segment : segments) {
updatePayload(handle, segment); updatePayload(handle, segment);
} }
@ -255,7 +518,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override @Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException
{ {
for(final DataSegment segment : segments) { for (final DataSegment segment : segments) {
deleteSegment(handle, segment); deleteSegment(handle, segment);
} }

View File

@ -28,10 +28,10 @@ import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.Batch; import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.TransactionCallback; import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus; import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.DBIException; import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException; import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper; import org.skife.jdbi.v2.util.ByteArrayMapper;
@ -141,6 +141,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
return e != null && (e instanceof SQLTransientException return e != null && (e instanceof SQLTransientException
|| e instanceof SQLRecoverableException || e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException || e instanceof UnableToObtainConnectionException
|| e instanceof UnableToExecuteStatementException
|| connectorIsTransientException(e) || connectorIsTransientException(e)
|| (e instanceof SQLException && isTransientException(e.getCause())) || (e instanceof SQLException && isTransientException(e.getCause()))
|| (e instanceof DBIException && isTransientException(e.getCause()))); || (e instanceof DBIException && isTransientException(e.getCause())));
@ -180,6 +181,30 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
} }
} }
public void createPendingSegmentsTable(final String tableName)
{
createTable(
tableName,
ImmutableList.of(
String.format(
"CREATE TABLE %1$s (\n"
+ " id VARCHAR(255) NOT NULL,\n"
+ " dataSource VARCHAR(255) NOT NULL,\n"
+ " created_date VARCHAR(255) NOT NULL,\n"
+ " start VARCHAR(255) NOT NULL,\n"
+ " \"end\" VARCHAR(255) NOT NULL,\n"
+ " sequence_name VARCHAR(255) NOT NULL,\n"
+ " sequence_prev_id VARCHAR(255) NOT NULL,\n"
+ " payload %2$s NOT NULL,\n"
+ " PRIMARY KEY (id),\n"
+ " UNIQUE (sequence_name, sequence_prev_id)\n"
+ ")",
tableName, getPayloadType()
)
)
);
}
public void createSegmentTable(final String tableName) public void createSegmentTable(final String tableName)
{ {
createTable( createTable(
@ -355,7 +380,16 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
public abstract DBI getDBI(); public abstract DBI getDBI();
@Override @Override
public void createSegmentTable() { public void createPendingSegmentsTable()
{
if (config.get().isCreateTables()) {
createPendingSegmentsTable(tablesConfigSupplier.get().getPendingSegmentsTable());
}
}
@Override
public void createSegmentTable()
{
if (config.get().isCreateTables()) { if (config.get().isCreateTables()) {
createSegmentTable(tablesConfigSupplier.get().getSegmentsTable()); createSegmentTable(tablesConfigSupplier.get().getSegmentsTable());
} }

View File

@ -26,12 +26,15 @@ import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec; import io.druid.timeline.partition.ShardSpec;
import org.joda.time.Interval; import org.joda.time.Interval;
import java.util.Objects;
public class SegmentIdentifier public class SegmentIdentifier
{ {
private final String dataSource; private final String dataSource;
private final Interval interval; private final Interval interval;
private final String version; private final String version;
private final ShardSpec shardSpec; private final ShardSpec shardSpec;
private final String asString;
@JsonCreator @JsonCreator
public SegmentIdentifier( public SegmentIdentifier(
@ -45,6 +48,13 @@ public class SegmentIdentifier
this.interval = Preconditions.checkNotNull(interval, "interval"); this.interval = Preconditions.checkNotNull(interval, "interval");
this.version = Preconditions.checkNotNull(version, "version"); this.version = Preconditions.checkNotNull(version, "version");
this.shardSpec = Preconditions.checkNotNull(shardSpec, "shardSpec"); this.shardSpec = Preconditions.checkNotNull(shardSpec, "shardSpec");
this.asString = DataSegment.makeDataSegmentIdentifier(
dataSource,
interval.getStart(),
interval.getEnd(),
version,
shardSpec
);
} }
@JsonProperty @JsonProperty
@ -73,13 +83,7 @@ public class SegmentIdentifier
public String getIdentifierAsString() public String getIdentifierAsString()
{ {
return DataSegment.makeDataSegmentIdentifier( return asString;
dataSource,
interval.getStart(),
interval.getEnd(),
version,
shardSpec
);
} }
@Override @Override
@ -92,19 +96,19 @@ public class SegmentIdentifier
return false; return false;
} }
SegmentIdentifier that = (SegmentIdentifier) o; SegmentIdentifier that = (SegmentIdentifier) o;
return getIdentifierAsString().equals(that.getIdentifierAsString()); return Objects.equals(asString, that.asString);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return getIdentifierAsString().hashCode(); return asString.hashCode();
} }
@Override @Override
public String toString() public String toString()
{ {
return getIdentifierAsString(); return asString;
} }
public static SegmentIdentifier fromDataSegment(final DataSegment segment) public static SegmentIdentifier fromDataSegment(final DataSegment segment)

View File

@ -0,0 +1,64 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
public class SegmentIdentifierTest
{
private static final String DATA_SOURCE = "foo";
private static final Interval INTERVAL = new Interval("2000/PT1H");
private static final String VERSION = "v1";
private static final NumberedShardSpec SHARD_SPEC_0 = new NumberedShardSpec(0, 2);
private static final NumberedShardSpec SHARD_SPEC_1 = new NumberedShardSpec(1, 2);
private static final SegmentIdentifier ID_0 = new SegmentIdentifier(DATA_SOURCE, INTERVAL, VERSION, SHARD_SPEC_0);
private static final SegmentIdentifier ID_1 = new SegmentIdentifier(DATA_SOURCE, INTERVAL, VERSION, SHARD_SPEC_1);
@Test
public void testSerde() throws Exception
{
final ObjectMapper objectMapper = new DefaultObjectMapper();
objectMapper.registerSubtypes(NumberedShardSpec.class);
final SegmentIdentifier id2 = objectMapper.readValue(
objectMapper.writeValueAsBytes(ID_1),
SegmentIdentifier.class
);
Assert.assertEquals(ID_1, id2);
Assert.assertEquals(DATA_SOURCE, id2.getDataSource());
Assert.assertEquals(INTERVAL, id2.getInterval());
Assert.assertEquals(VERSION, id2.getVersion());
Assert.assertEquals(SHARD_SPEC_1.getPartitionNum(), id2.getShardSpec().getPartitionNum());
Assert.assertEquals(SHARD_SPEC_1.getPartitions(), ((NumberedShardSpec) id2.getShardSpec()).getPartitions());
}
@Test
public void testAsString()
{
Assert.assertEquals("foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_v1", ID_0.getIdentifierAsString());
Assert.assertEquals("foo_2000-01-01T00:00:00.000Z_2000-01-01T01:00:00.000Z_v1_1", ID_1.getIdentifierAsString());
}
}

View File

@ -25,11 +25,11 @@ import com.google.inject.Module;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option; import io.airlift.airline.Option;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.metadata.MetadataStorageConnector; import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode; import io.druid.server.DruidNode;
import java.util.List; import java.util.List;
@ -106,6 +106,7 @@ public class CreateTables extends GuiceRunnable
{ {
final Injector injector = makeInjector(); final Injector injector = makeInjector();
MetadataStorageConnector dbConnector = injector.getInstance(MetadataStorageConnector.class); MetadataStorageConnector dbConnector = injector.getInstance(MetadataStorageConnector.class);
dbConnector.createPendingSegmentsTable();
dbConnector.createSegmentTable(); dbConnector.createSegmentTable();
dbConnector.createRulesTable(); dbConnector.createRulesTable();
dbConnector.createConfigTable(); dbConnector.createConfigTable();