DataSource metadata.

This change is geared towards supporting transactional inserts of new segments. It introduces a
"DataSourceMetadata" interface that allows combining partially specified metadata
(useful for partitioned ingestion).

DataSource metadata is stored in a new "dataSource" table.
Gian Merlino 2016-03-01 16:51:35 -08:00
parent 3d2214377d
commit 187569e702
25 changed files with 999 additions and 141 deletions
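
For context, a minimal sketch (not part of the diff below) of how a caller could use the transactional publish introduced here. It uses only classes added in this commit (ObjectMetadata, SegmentPublishResult, the new announceHistoricalSegments overload); the "coordinator", "segments", and "log" variables and the commit-point values are illustrative assumptions.

// Sketch only: "coordinator" is an IndexerMetadataStorageCoordinator and "segments" is the
// set of segments produced by one ingestion run. The ObjectMetadata payloads stand in for
// whatever commit point (e.g. partition offsets) the ingestion actually tracks.
final SegmentPublishResult result = coordinator.announceHistoricalSegments(
    segments,
    new ObjectMetadata(null),                // expected current state: no metadata stored yet
    new ObjectMetadata(ImmutableList.of(1))  // state to commit atomically with the segments
);

if (result.isSuccess()) {
  log.info("Published %,d segments transactionally", result.getSegments().size());
} else {
  // The compare-and-swap on dataSource metadata failed; nothing was inserted.
  // Re-read the stored metadata to decide whether to retry from that commit point.
  final DataSourceMetadata current = coordinator.getDataSourceMetadata("my_dataSource");
  log.warn("Transactional publish failed, stored metadata is now %s", current);
}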


@@ -31,7 +31,6 @@ public interface MetadataStorageConnector
     final byte[] value
 ) throws Exception;
 
 byte[] lookup(
     final String tableName,
     final String keyColumn,
@@ -39,6 +38,8 @@ public interface MetadataStorageConnector
     final String key
 );
 
+void createDataSourceTable();
+
 void createPendingSegmentsTable();
 
 void createSegmentTable();


@@ -31,7 +31,7 @@ public class MetadataStorageTablesConfig
 {
 public static MetadataStorageTablesConfig fromBase(String base)
 {
-return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null);
+return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null, null);
 }
 
 public static final String TASK_ENTRY_TYPE = "task";
@@ -45,6 +45,9 @@ public class MetadataStorageTablesConfig
 @JsonProperty("base")
 private final String base;
 
+@JsonProperty("dataSource")
+private final String dataSourceTable;
+
 @JsonProperty("pendingSegments")
 private final String pendingSegmentsTable;
@@ -72,6 +75,7 @@ public class MetadataStorageTablesConfig
 @JsonCreator
 public MetadataStorageTablesConfig(
 @JsonProperty("base") String base,
+@JsonProperty("dataSource") String dataSourceTable,
 @JsonProperty("pendingSegments") String pendingSegmentsTable,
 @JsonProperty("segments") String segmentsTable,
 @JsonProperty("rules") String rulesTable,
@@ -83,6 +87,7 @@ public class MetadataStorageTablesConfig
 )
 {
 this.base = (base == null) ? DEFAULT_BASE : base;
+this.dataSourceTable = makeTableName(dataSourceTable, "dataSource");
 this.pendingSegmentsTable = makeTableName(pendingSegmentsTable, "pendingSegments");
 this.segmentsTable = makeTableName(segmentsTable, "segments");
 this.rulesTable = makeTableName(rulesTable, "rules");
@@ -115,6 +120,11 @@ public class MetadataStorageTablesConfig
 return base;
 }
 
+public String getDataSourceTable()
+{
+return dataSourceTable;
+}
+
 public String getPendingSegmentsTable()
 {
 return pendingSegmentsTable;


@@ -35,7 +35,20 @@ public class PostgreSQLConnectorTest
 {
 PostgreSQLConnector connector = new PostgreSQLConnector(
 Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
-Suppliers.ofInstance(new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null))
+Suppliers.ofInstance(
+new MetadataStorageTablesConfig(
+null,
+null,
+null,
+null,
+null,
+null,
+null,
+null,
+null,
+null
+)
+)
 );
 
 Assert.assertTrue(connector.isTransientException(new SQLException("bummer, connection problem", "08DIE")));


@@ -87,6 +87,7 @@ public class MetadataStorageUpdaterJobSpec implements Supplier<MetadataStorageCo
 public MetadataStorageTablesConfig getMetadataStorageTablesConfig()
 {
 return new MetadataStorageTablesConfig(
+null,
 null,
 segmentTable,
 null,


@@ -209,6 +209,7 @@ public class HadoopConverterJobTest
 );
 metadataStorageTablesConfigSupplier = derbyConnectorRule.metadataTablesConfigSupplier();
 connector = derbyConnectorRule.getConnector();
 try {
 connector.getDBI().withHandle(
 new HandleCallback<Void>()


@@ -206,7 +206,7 @@ public class TaskToolbox
 return retVal;
 }
 
-public void pushSegments(Iterable<DataSegment> segments) throws IOException
+public void publishSegments(Iterable<DataSegment> segments) throws IOException
 {
 // Request segment pushes for each set
 final Multimap<Interval, DataSegment> segmentMultimap = Multimaps.index(
@@ -223,7 +223,6 @@ public class TaskToolbox
 for (final Collection<DataSegment> segmentCollection : segmentMultimap.asMap().values()) {
 getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.copyOf(segmentCollection)));
 }
 }
 
 public File getTaskWorkDir()


@@ -20,12 +20,13 @@
 package io.druid.indexing.common.actions;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableSet;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.indexing.common.task.Task;
+import io.druid.indexing.overlord.DataSourceMetadata;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.query.DruidMetrics;
 import io.druid.timeline.DataSegment;
@@ -40,17 +41,29 @@ import java.util.Set;
 * that the task cannot actually complete. Callers should avoid this by avoiding inserting too many segments in the
 * same action.
 */
-public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
+public class SegmentInsertAction implements TaskAction<SegmentPublishResult>
 {
+@JsonIgnore
 private final Set<DataSegment> segments;
+
+private final DataSourceMetadata startMetadata;
+private final DataSourceMetadata endMetadata;
+
+public SegmentInsertAction(
+Set<DataSegment> segments
+)
+{
+this(segments, null, null);
+}
 
 @JsonCreator
 public SegmentInsertAction(
-@JsonProperty("segments") Set<DataSegment> segments
+@JsonProperty("segments") Set<DataSegment> segments,
+@JsonProperty("startMetadata") DataSourceMetadata startMetadata,
+@JsonProperty("endMetadata") DataSourceMetadata endMetadata
 )
 {
 this.segments = ImmutableSet.copyOf(segments);
+this.startMetadata = startMetadata;
+this.endMetadata = endMetadata;
 }
 
 @JsonProperty
@@ -59,26 +72,53 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
 return segments;
 }
 
-public TypeReference<Set<DataSegment>> getReturnTypeReference()
+@JsonProperty
+public DataSourceMetadata getStartMetadata()
 {
-return new TypeReference<Set<DataSegment>>()
+return startMetadata;
+}
+
+@JsonProperty
+public DataSourceMetadata getEndMetadata()
+{
+return endMetadata;
+}
+
+public TypeReference<SegmentPublishResult> getReturnTypeReference()
+{
+return new TypeReference<SegmentPublishResult>()
 {
 };
 }
 
+/**
+* Behaves similarly to
+* {@link io.druid.indexing.overlord.IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)},
+* including the possibility of returning null in case of metadata transaction failure.
+*/
 @Override
-public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
+public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throws IOException
 {
 toolbox.verifyTaskLocks(task, segments);
-final Set<DataSegment> retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(segments);
+final SegmentPublishResult retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
+segments,
+startMetadata,
+endMetadata
+);
 
 // Emit metrics
 final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
 .setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
 .setDimension(DruidMetrics.TASK_TYPE, task.getType());
 
-for (DataSegment segment : segments) {
+if (retVal.isSuccess()) {
+toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
+} else {
+toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
+}
+
+for (DataSegment segment : retVal.getSegments()) {
 metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
 toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
 }
@@ -97,6 +137,8 @@ public class SegmentInsertAction implements TaskAction<Set<DataSegment>>
 {
 return "SegmentInsertAction{" +
 "segments=" + segments +
+", startMetadata=" + startMetadata +
+", endMetadata=" + endMetadata +
 '}';
 }
 }
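
The same flow from the task side, as a hedged sketch mirroring SegmentInsertActionTest further down; "toolbox" and "segments" are assumed to come from the running task, and the ObjectMetadata values are illustrative.

// Sketch only: submit the extended action through the task's action client.
final SegmentPublishResult result = toolbox.getTaskActionClient().submit(
    new SegmentInsertAction(
        segments,
        new ObjectMetadata(null),                // what the task believes is currently committed
        new ObjectMetadata(ImmutableList.of(1))  // what should be committed if the insert succeeds
    )
);

if (!result.isSuccess()) {
  // Metadata did not match; the whole insert was rolled back and the task should fail or retry.
  throw new ISE("Transactional segment publish failed");
}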


@@ -249,7 +249,7 @@ public class HadoopConverterTask extends ConvertSegmentTask
 }
 );
 log.debug("Found new segments %s", Arrays.toString(finishedSegments.toArray()));
-toolbox.pushSegments(finishedSegments);
+toolbox.publishSegments(finishedSegments);
 return success();
 }
 }


@@ -222,7 +222,7 @@ public class HadoopIndexTask extends HadoopTask
 }
 );
-toolbox.pushSegments(publishedSegments);
+toolbox.publishSegments(publishedSegments);
 return TaskStatus.success(getId());
 } else {
 return TaskStatus.failure(getId());


@@ -228,7 +228,7 @@ public class IndexTask extends AbstractFixedIntervalTask
 segments.add(segment);
 }
 }
-toolbox.pushSegments(segments);
+toolbox.publishSegments(segments);
 return TaskStatus.success(getId());
 }


@@ -171,7 +171,7 @@ public abstract class MergeTaskBase extends AbstractFixedIntervalTask
 emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart));
 emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize()));
-toolbox.pushSegments(ImmutableList.of(uploadedSegment));
+toolbox.publishSegments(ImmutableList.of(uploadedSegment));
 return TaskStatus.success(getId());
 }


@@ -514,7 +514,7 @@ public class RealtimeIndexTask extends AbstractTask
 @Override
 public void publishSegment(DataSegment segment) throws IOException
 {
-taskToolbox.pushSegments(ImmutableList.of(segment));
+taskToolbox.publishSegments(ImmutableList.of(segment));
 }
 }
 }


@@ -1,29 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common;
import io.druid.indexing.common.task.Task;
/**
*/
public interface TestTask extends Task
{
public TaskStatus getStatus();
}


@@ -0,0 +1,179 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.overlord.ObjectMetadata;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.LinearShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
public class SegmentInsertActionTest
{
@Rule
public ExpectedException thrown = ExpectedException.none();
@Rule
public TaskActionTestKit actionTestKit = new TaskActionTestKit();
private static final String DATA_SOURCE = "none";
private static final Interval INTERVAL = new Interval("2020/2020T01");
private static final String PARTY_YEAR = "1999";
private static final String THE_DISTANT_FUTURE = "3000";
private static final DataSegment SEGMENT1 = new DataSegment(
DATA_SOURCE,
INTERVAL,
PARTY_YEAR,
ImmutableMap.<String, Object>of(),
ImmutableList.<String>of(),
ImmutableList.<String>of(),
new LinearShardSpec(0),
9,
1024
);
private static final DataSegment SEGMENT2 = new DataSegment(
DATA_SOURCE,
INTERVAL,
PARTY_YEAR,
ImmutableMap.<String, Object>of(),
ImmutableList.<String>of(),
ImmutableList.<String>of(),
new LinearShardSpec(1),
9,
1024
);
private static final DataSegment SEGMENT3 = new DataSegment(
DATA_SOURCE,
INTERVAL,
THE_DISTANT_FUTURE,
ImmutableMap.<String, Object>of(),
ImmutableList.<String>of(),
ImmutableList.<String>of(),
new LinearShardSpec(1),
9,
1024
);
@Test
public void testSimple() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertEquals(
ImmutableSet.of(SEGMENT1, SEGMENT2),
ImmutableSet.copyOf(
actionTestKit.getMetadataStorageCoordinator()
.getUsedSegmentsForInterval(DATA_SOURCE, INTERVAL)
)
);
}
@Test
public void testTransactional() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
SegmentPublishResult result1 = new SegmentInsertAction(
ImmutableSet.of(SEGMENT1),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableList.of(1))
).perform(
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT1), true), result1);
SegmentPublishResult result2 = new SegmentInsertAction(
ImmutableSet.of(SEGMENT2),
new ObjectMetadata(ImmutableList.of(1)),
new ObjectMetadata(ImmutableList.of(2))
).perform(
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT2), true), result2);
Assert.assertEquals(
ImmutableSet.of(SEGMENT1, SEGMENT2),
ImmutableSet.copyOf(
actionTestKit.getMetadataStorageCoordinator()
.getUsedSegmentsForInterval(DATA_SOURCE, INTERVAL)
)
);
Assert.assertEquals(
new ObjectMetadata(ImmutableList.of(2)),
actionTestKit.getMetadataStorageCoordinator().getDataSourceMetadata(DATA_SOURCE)
);
}
@Test
public void testFailBadVersion() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3));
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
thrown.expect(IllegalStateException.class);
thrown.expectMessage(CoreMatchers.startsWith("Segments not covered by locks for task"));
SegmentPublishResult result = action.perform(task, actionTestKit.getTaskActionToolbox());
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT3), true), result);
}
@Test
public void testFailTransactional() throws Exception
{
final Task task = new NoopTask(null, 0, 0, null, null, null);
actionTestKit.getTaskLockbox().add(task);
actionTestKit.getTaskLockbox().lock(task, new Interval(INTERVAL));
SegmentPublishResult result = new SegmentInsertAction(
ImmutableSet.of(SEGMENT1),
new ObjectMetadata(ImmutableList.of(1)),
new ObjectMetadata(ImmutableList.of(2))
).perform(
task,
actionTestKit.getTaskActionToolbox()
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false), result);
}
}


@@ -93,6 +93,7 @@ public class TaskActionTestKit extends ExternalResource
 metadataStorageCoordinator,
 new NoopServiceEmitter()
 );
+testDerbyConnector.createDataSourceTable();
 testDerbyConnector.createPendingSegmentsTable();
 testDerbyConnector.createSegmentTable();
 testDerbyConnector.createRulesTable();


@@ -39,9 +39,7 @@ import com.metamx.common.ISE;
 import com.metamx.common.Pair;
 import com.metamx.common.guava.Comparators;
 import com.metamx.emitter.EmittingLogger;
-import com.metamx.emitter.core.Event;
 import com.metamx.emitter.service.ServiceEmitter;
-import com.metamx.emitter.service.ServiceEventBuilder;
 import com.metamx.metrics.Monitor;
 import com.metamx.metrics.MonitorScheduler;
 import io.druid.client.cache.MapCache;
@@ -102,6 +100,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.server.DruidNode;
 import io.druid.server.coordination.DataSegmentAnnouncer;
+import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
@@ -218,20 +217,7 @@ public class TaskLifecycleTest
 private static ServiceEmitter newMockEmitter()
 {
-return new ServiceEmitter(null, null, null)
-{
-@Override
-public void emit(Event event)
-{
-}
-
-@Override
-public void emit(ServiceEventBuilder builder)
-{
-}
-};
+return new NoopServiceEmitter();
 }
 
 private static InputRow IR(String dt, String dim1, String dim2, float met)


@@ -23,7 +23,9 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import io.druid.indexing.overlord.DataSourceMetadata;
 import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.timeline.DataSegment;
 import org.joda.time.Interval;
@@ -43,6 +45,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
 unusedSegments = Lists.newArrayList();
 }
 
+@Override
+public DataSourceMetadata getDataSourceMetadata(String dataSource)
+{
+throw new UnsupportedOperationException();
+}
+
 @Override
 public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
 {
@@ -77,6 +85,17 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
 return ImmutableSet.copyOf(added);
 }
 
+@Override
+public SegmentPublishResult announceHistoricalSegments(
+Set<DataSegment> segments,
+DataSourceMetadata oldCommitMetadata,
+DataSourceMetadata newCommitMetadata
+) throws IOException
+{
+// Don't actually compare metadata, just do it!
+return new SegmentPublishResult(announceHistoricalSegments(segments), true);
+}
+
 @Override
 public SegmentIdentifier allocatePendingSegment(
 String dataSource,


@@ -0,0 +1,74 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Set;
/**
* Commit metadata for a dataSource. Used by
* {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)}
* to provide metadata transactions for segment inserts.
*
* Two metadata instances can be added together, and any conflicts are resolved in favor of the right-hand side.
* This means metadata can be partitioned.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "object", value = ObjectMetadata.class)
})
public interface DataSourceMetadata
{
/**
* Returns true if this instance should be considered a valid starting point for a new dataSource that has
* no existing metadata.
*/
boolean isValidStart();
/**
* Returns true if any information present in this instance matches analogous information from "other" and
* so they are conflict-free. In other words, "one.plus(two)" and "two.plus(one)" should return equal
* instances if "one" matches "two".
*
* One simple way to implement this is to make it the same as "equals", although that doesn't allow for
* partitioned metadata.
*
* Behavior is undefined if you pass in an instance of a different class from this one.
*
* @param other another instance
*
* @return true or false
*/
boolean matches(DataSourceMetadata other);
/**
* Returns a copy of this instance with "other" merged in. Any conflicts should be resolved in favor of
* information from "other".
*
* Behavior is undefined if you pass in an instance of a different class from this one.
*
* @param other another instance
*
* @return merged copy
*/
DataSourceMetadata plus(DataSourceMetadata other);
}
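
The interface above is what makes "partially specified" metadata possible. As a hedged illustration (not part of this commit), a hypothetical per-partition offset implementation might look like the following; the class name, fields, and merge policy are assumptions chosen to show how matches() and plus() can be weaker than plain equality.

import io.druid.indexing.overlord.DataSourceMetadata;

import java.util.HashMap;
import java.util.Map;

// Hypothetical example: each task commits offsets only for the partitions it handled,
// and plus() merges the partial views into the full stored metadata.
public class PartitionOffsetsMetadata implements DataSourceMetadata
{
  private final Map<Integer, Long> offsets;

  public PartitionOffsetsMetadata(Map<Integer, Long> offsets)
  {
    this.offsets = offsets;
  }

  @Override
  public boolean isValidStart()
  {
    // An empty view is an acceptable starting point for a brand-new dataSource.
    return offsets.isEmpty();
  }

  @Override
  public boolean matches(DataSourceMetadata other)
  {
    // Conflict-free if every partition this instance knows about agrees with "other".
    // (Illustrative policy; behavior is undefined for other classes, per the javadoc above.)
    final Map<Integer, Long> theirs = ((PartitionOffsetsMetadata) other).offsets;
    for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
      final Long theirOffset = theirs.get(entry.getKey());
      if (theirOffset != null && !theirOffset.equals(entry.getValue())) {
        return false;
      }
    }
    return true;
  }

  @Override
  public DataSourceMetadata plus(DataSourceMetadata other)
  {
    // Merge the two views, resolving conflicts in favor of "other".
    final Map<Integer, Long> merged = new HashMap<>(offsets);
    merged.putAll(((PartitionOffsetsMetadata) other).offsets);
    return new PartitionOffsetsMetadata(merged);
  }
}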


@@ -41,14 +41,14 @@ public interface IndexerMetadataStorageCoordinator
 *
 * @throws IOException
 */
-public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
+List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval)
 throws IOException;
 
 /**
 * Get all segments which may include any data in the interval and are flagged as used.
 *
 * @param dataSource The datasource to query
 * @param intervals The intervals for which all applicable and used datasources are requested.
 *
 * @return The DataSegments which include data in the requested intervals. These segments may contain data outside the requested interval.
 *
@@ -65,7 +65,7 @@ public interface IndexerMetadataStorageCoordinator
 *
 * @return set of segments actually added
 */
-public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException;
+Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException;
 
 /**
 * Allocate a new pending segment in the pending segments table. This segment identifier will never be given out
@@ -93,9 +93,36 @@ public interface IndexerMetadataStorageCoordinator
 String maxVersion
 ) throws IOException;
 
-public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException;
+/**
+* Attempts to insert a set of segments to the metadata storage. Returns the set of segments actually added (segments
+* with identifiers already in the metadata storage will not be added).
+* <p/>
+* If startMetadata and endMetadata are set, this insertion will be atomic with a compare-and-swap on dataSource
+* commit metadata.
+*
+* @param segments set of segments to add, must all be from the same dataSource
+* @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
+* {@link DataSourceMetadata#matches(DataSourceMetadata)}. If null, this insert will
+* not involve a metadata transaction
+* @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with
+* {@link DataSourceMetadata#plus(DataSourceMetadata)}. If null, this insert will not
+* involve a metadata transaction
+*
+* @return segment publish result indicating transaction success or failure, and set of segments actually published
+*
+* @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null
+*/
+SegmentPublishResult announceHistoricalSegments(
+Set<DataSegment> segments,
+DataSourceMetadata startMetadata,
+DataSourceMetadata endMetadata
+) throws IOException;
 
-public void deleteSegments(final Set<DataSegment> segments) throws IOException;
+DataSourceMetadata getDataSourceMetadata(String dataSource);
+
+void updateSegmentMetadata(Set<DataSegment> segments) throws IOException;
+
+void deleteSegments(Set<DataSegment> segments) throws IOException;
 
 /**
 * Get all segments which include ONLY data within the given interval and are not flagged as used.
@@ -105,5 +132,5 @@ public interface IndexerMetadataStorageCoordinator
 *
 * @return DataSegments which include ONLY data within the requested interval and are not flagged as used. Data segments NOT returned here may include data in the interval
 */
-public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval);
+List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval);
 }


@@ -0,0 +1,87 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public final class ObjectMetadata implements DataSourceMetadata
{
private final Object theObject;
@JsonCreator
public ObjectMetadata(
@JsonProperty("object") Object theObject
)
{
this.theObject = theObject;
}
@JsonProperty("object")
public Object getObject()
{
return theObject;
}
@Override
public boolean isValidStart()
{
return theObject == null;
}
@Override
public boolean matches(DataSourceMetadata other)
{
return equals(other);
}
@Override
public DataSourceMetadata plus(DataSourceMetadata other)
{
return other;
}
@Override
public boolean equals(Object o)
{
if (o instanceof ObjectMetadata) {
final Object other = ((ObjectMetadata) o).getObject();
return (theObject == null && other == null) || (theObject != null && theObject.equals(other));
} else {
return false;
}
}
@Override
public int hashCode()
{
return Objects.hash(theObject);
}
@Override
public String toString()
{
return "ObjectMetadata{" +
"theObject=" + theObject +
'}';
}
}
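
A brief illustration of the semantics above, using the same values the tests in this commit use; the comments state what each call returns.

final ObjectMetadata none = new ObjectMetadata(null);
final ObjectMetadata first = new ObjectMetadata(ImmutableList.of(1));
final ObjectMetadata second = new ObjectMetadata(ImmutableList.of(2));

none.isValidStart();   // true: a null payload is a valid start for a dataSource with no metadata yet
first.isValidStart();  // false
first.matches(first);  // true: matching is plain equality for ObjectMetadata
first.matches(second); // false: a compare-and-swap expecting "first" fails against "second"
first.plus(second);    // returns "second": the right-hand side wins outright, there is no partial merge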


@@ -0,0 +1,99 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.indexing.overlord;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import io.druid.timeline.DataSegment;
import java.util.Objects;
import java.util.Set;
/**
* Result of an operation that attempts to publish segments. Indicates the set of segments actually published
* and whether or not the transaction was a success.
*
* If "success" is false then the segments set will be empty.
*
* It's possible for the segments set to be empty even if "success" is true, since the segments set only
* includes segments actually published as part of the transaction. The requested segments could have been
* published by a different transaction (e.g. in the case of replica sets) and this one would still succeed.
*/
public class SegmentPublishResult
{
private final Set<DataSegment> segments;
private final boolean success;
@JsonCreator
public SegmentPublishResult(
@JsonProperty("segments") Set<DataSegment> segments,
@JsonProperty("success") boolean success
)
{
this.segments = Preconditions.checkNotNull(segments, "segments");
this.success = success;
if (!success) {
Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes");
}
}
@JsonProperty
public Set<DataSegment> getSegments()
{
return segments;
}
@JsonProperty
public boolean isSuccess()
{
return success;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SegmentPublishResult that = (SegmentPublishResult) o;
return success == that.success &&
Objects.equals(segments, that.segments);
}
@Override
public int hashCode()
{
return Objects.hash(segments, success);
}
@Override
public String toString()
{
return "SegmentPublishResult{" +
"segments=" + segments +
", success=" + success +
'}';
}
}
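
On the calling side, the distinction called out in the class comment above (failure vs. success-with-empty-set) matters; a hedged handling sketch:

// Sketch only: "result" comes from announceHistoricalSegments or a SegmentInsertAction.
if (!result.isSuccess()) {
  // The metadata compare-and-swap failed; no segments from this call were inserted.
  throw new ISE("Transactional segment publish failed");
} else if (result.getSegments().isEmpty()) {
  // Success, but every requested segment was already in the metadata store,
  // e.g. a replica task published the same segments first.
} else {
  // Success; result.getSegments() lists exactly the segments inserted by this call.
}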


@@ -33,10 +33,13 @@ import com.google.common.hash.Hashing;
 import com.google.common.io.BaseEncoding;
 import com.google.inject.Inject;
 import com.metamx.common.IAE;
+import com.metamx.common.ISE;
 import com.metamx.common.StringUtils;
 import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.logger.Logger;
+import io.druid.indexing.overlord.DataSourceMetadata;
 import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.TimelineObjectHolder;
@@ -55,6 +58,7 @@ import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.TransactionCallback;
 import org.skife.jdbi.v2.TransactionStatus;
+import org.skife.jdbi.v2.exceptions.CallbackFailedException;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 import org.skife.jdbi.v2.util.ByteArrayMapper;
 import org.skife.jdbi.v2.util.StringMapper;
@@ -65,14 +69,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
 */
 public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStorageCoordinator
 {
 private static final Logger log = new Logger(IndexerSQLMetadataStorageCoordinator.class);
-private static final int ALLOCATE_SEGMENT_QUIET_TRIES = 3;
+private static int ALLOCATE_SEGMENT_QUIET_TRIES = 3;
 
 private final ObjectMapper jsonMapper;
 private final MetadataStorageTablesConfig dbTables;
@@ -93,6 +97,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
 @LifecycleStart
 public void start()
 {
+connector.createDataSourceTable();
 connector.createPendingSegmentsTable();
 connector.createSegmentTable();
 }
@@ -264,24 +269,90 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
 */
 public Set<DataSegment> announceHistoricalSegments(final Set<DataSegment> segments) throws IOException
 {
-return connector.getDBI().inTransaction(
-new TransactionCallback<Set<DataSegment>>()
-{
-@Override
-public Set<DataSegment> inTransaction(Handle handle, TransactionStatus transactionStatus) throws IOException
-{
-final Set<DataSegment> inserted = Sets.newHashSet();
-for (final DataSegment segment : segments) {
-if (announceHistoricalSegment(handle, segment)) {
-inserted.add(segment);
-}
-}
-return ImmutableSet.copyOf(inserted);
-}
-}
-);
+final SegmentPublishResult result = announceHistoricalSegments(segments, null, null);
+
+// Metadata transaction cannot fail because we are not trying to do one.
+if (!result.isSuccess()) {
+throw new ISE("WTF?! announceHistoricalSegments failed with null metadata, should not happen.");
+}
+
+return result.getSegments();
+}
+
+/**
+* {@inheritDoc}
+*/
+@Override
+public SegmentPublishResult announceHistoricalSegments(
+final Set<DataSegment> segments,
+final DataSourceMetadata startMetadata,
+final DataSourceMetadata endMetadata
+) throws IOException
+{
+if (segments.isEmpty()) {
+throw new IllegalArgumentException("segment set must not be empty");
+}
+
+final String dataSource = segments.iterator().next().getDataSource();
+for (DataSegment segment : segments) {
+if (!dataSource.equals(segment.getDataSource())) {
+throw new IllegalArgumentException("segments must all be from the same dataSource");
+}
+}
+
+if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) {
+throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
+}
+
+final AtomicBoolean txnFailure = new AtomicBoolean(false);
+
+try {
+return connector.retryTransaction(
+new TransactionCallback<SegmentPublishResult>()
+{
+@Override
+public SegmentPublishResult inTransaction(
+final Handle handle,
+final TransactionStatus transactionStatus
+) throws Exception
+{
+final Set<DataSegment> inserted = Sets.newHashSet();
+
+if (startMetadata != null) {
+final boolean success = updateDataSourceMetadataWithHandle(
+handle,
+dataSource,
+startMetadata,
+endMetadata
+);
+
+if (!success) {
+transactionStatus.setRollbackOnly();
+txnFailure.set(true);
+throw new RuntimeException("Aborting transaction!");
+}
+}
+
+for (final DataSegment segment : segments) {
+if (announceHistoricalSegment(handle, segment)) {
+inserted.add(segment);
+}
+}
+
+return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true);
+}
+},
+3,
+SQLMetadataConnector.DEFAULT_MAX_TRIES
+);
+}
+catch (CallbackFailedException e) {
+if (txnFailure.get()) {
+return new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false);
+} else {
+throw e;
+}
+}
 }
 
 @Override
@@ -499,10 +570,10 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
 }
 
 /**
-* Attempts to insert a single segment to the database. If the segment already exists, will do nothing. Meant
-* to be called from within a transaction.
+* Attempts to insert a single segment to the database. If the segment already exists, will do nothing; although,
+* this checking is imperfect and callers must be prepared to retry their entire transaction on exceptions.
 *
-* @return true if the segment was added, false otherwise
+* @return true if the segment was added, false if it already existed
 */
 private boolean announceHistoricalSegment(final Handle handle, final DataSegment segment) throws IOException
 {
@@ -512,38 +583,31 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
 return false;
 }
 
-// Try/catch to work around races due to SELECT -> INSERT. Avoid ON DUPLICATE KEY since it's not portable.
-try {
+// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
+// Avoiding ON DUPLICATE KEY since it's not portable.
+// Avoiding try/catch since it may cause inadvertent transaction-splitting.
 handle.createStatement(
 String.format(
 "INSERT INTO %s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) "
 + "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
 dbTables.getSegmentsTable()
 )
 )
 .bind("id", segment.getIdentifier())
 .bind("dataSource", segment.getDataSource())
 .bind("created_date", new DateTime().toString())
 .bind("start", segment.getInterval().getStart().toString())
 .bind("end", segment.getInterval().getEnd().toString())
 .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
 .bind("version", segment.getVersion())
 .bind("used", true)
 .bind("payload", jsonMapper.writeValueAsBytes(segment))
 .execute();
 
 log.info("Published segment [%s] to DB", segment.getIdentifier());
-}
-catch (Exception e) {
-if (e.getCause() instanceof SQLException && segmentExists(handle, segment)) {
-log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
-} else {
-throw e;
-}
-}
 }
-catch (IOException e) {
-log.error(e, "Exception inserting into DB");
+catch (Exception e) {
+log.error(e, "Exception inserting segment [%s] into DB", segment.getIdentifier());
 throw e;
 }
@@ -564,6 +628,150 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
 .isEmpty();
 }
/**
* Read dataSource metadata. Returns null if there is no metadata.
*/
public DataSourceMetadata getDataSourceMetadata(final String dataSource)
{
final byte[] bytes = connector.lookup(
dbTables.getDataSourceTable(),
"dataSource",
"commit_metadata_payload",
dataSource
);
if (bytes == null) {
return null;
}
try {
return jsonMapper.readValue(bytes, DataSourceMetadata.class);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
/**
* Read dataSource metadata as bytes, from a specific handle. Returns null if there is no metadata.
*/
private byte[] getDataSourceMetadataWithHandleAsBytes(
final Handle handle,
final String dataSource
)
{
return connector.lookupWithHandle(
handle,
dbTables.getDataSourceTable(),
"dataSource",
"commit_metadata_payload",
dataSource
);
}
/**
* Compare-and-swap dataSource metadata in a transaction. This will only modify dataSource metadata if it equals
* oldCommitMetadata when this function is called (based on T.equals). This method is idempotent in that if
* the metadata already equals newCommitMetadata, it will return true.
*
* @param handle database handle
* @param dataSource druid dataSource
* @param startMetadata dataSource metadata pre-insert must match this startMetadata according to
* {@link DataSourceMetadata#matches(DataSourceMetadata)}
* @param endMetadata dataSource metadata post-insert will have this endMetadata merged in with
* {@link DataSourceMetadata#plus(DataSourceMetadata)}
*
* @return true if dataSource metadata was updated from matching startMetadata to matching endMetadata
*/
private boolean updateDataSourceMetadataWithHandle(
final Handle handle,
final String dataSource,
final DataSourceMetadata startMetadata,
final DataSourceMetadata endMetadata
) throws IOException
{
Preconditions.checkNotNull(dataSource, "dataSource");
Preconditions.checkNotNull(startMetadata, "startMetadata");
Preconditions.checkNotNull(endMetadata, "endMetadata");
final byte[] oldCommitMetadataBytesFromDb = getDataSourceMetadataWithHandleAsBytes(handle, dataSource);
final String oldCommitMetadataSha1FromDb;
final DataSourceMetadata oldCommitMetadataFromDb;
if (oldCommitMetadataBytesFromDb == null) {
oldCommitMetadataSha1FromDb = null;
oldCommitMetadataFromDb = null;
} else {
oldCommitMetadataSha1FromDb = BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(oldCommitMetadataBytesFromDb).asBytes()
);
oldCommitMetadataFromDb = jsonMapper.readValue(oldCommitMetadataBytesFromDb, DataSourceMetadata.class);
}
final boolean startMetadataMatchesExisting = oldCommitMetadataFromDb == null
? startMetadata.isValidStart()
: startMetadata.matches(oldCommitMetadataFromDb);
if (!startMetadataMatchesExisting) {
// Not in the desired start state.
log.info("Not updating metadata, existing state is not the expected start state.");
return false;
}
final DataSourceMetadata newCommitMetadata = oldCommitMetadataFromDb == null
? endMetadata
: oldCommitMetadataFromDb.plus(endMetadata);
final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(newCommitMetadata);
final String newCommitMetadataSha1 = BaseEncoding.base16().encode(
Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes()
);
final boolean retVal;
if (oldCommitMetadataBytesFromDb == null) {
// SELECT -> INSERT can fail due to races; callers must be prepared to retry.
final int numRows = handle.createStatement(
String.format(
"INSERT INTO %s (dataSource, created_date, commit_metadata_payload, commit_metadata_sha1) "
+ "VALUES (:dataSource, :created_date, :commit_metadata_payload, :commit_metadata_sha1)",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("created_date", new DateTime().toString())
.bind("commit_metadata_payload", newCommitMetadataBytes)
.bind("commit_metadata_sha1", newCommitMetadataSha1)
.execute();
retVal = numRows > 0;
} else {
// Expecting a particular old metadata; use the SHA1 in a compare-and-swap UPDATE
final int numRows = handle.createStatement(
String.format(
"UPDATE %s SET "
+ "commit_metadata_payload = :new_commit_metadata_payload, "
+ "commit_metadata_sha1 = :new_commit_metadata_sha1 "
+ "WHERE dataSource = :dataSource AND commit_metadata_sha1 = :old_commit_metadata_sha1",
dbTables.getDataSourceTable()
)
)
.bind("dataSource", dataSource)
.bind("old_commit_metadata_sha1", oldCommitMetadataSha1FromDb)
.bind("new_commit_metadata_payload", newCommitMetadataBytes)
.bind("new_commit_metadata_sha1", newCommitMetadataSha1)
.execute();
retVal = numRows > 0;
}
if (retVal) {
log.info("Updated metadata from[%s] to[%s].", oldCommitMetadataFromDb, newCommitMetadata);
} else {
log.info("Not updating metadata, compare-and-swap failure.");
}
return retVal;
}
 public void updateSegmentMetadata(final Set<DataSegment> segments) throws IOException
 {
 connector.getDBI().inTransaction(


@@ -216,6 +216,25 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
 );
 }
public void createDataSourceTable(final String tableName)
{
createTable(
tableName,
ImmutableList.of(
String.format(
"CREATE TABLE %1$s (\n"
+ " dataSource VARCHAR(255) NOT NULL,\n"
+ " created_date VARCHAR(255) NOT NULL,\n"
+ " commit_metadata_payload %2$s NOT NULL,\n"
+ " commit_metadata_sha1 VARCHAR(255) NOT NULL,\n"
+ " PRIMARY KEY (dataSource)\n"
+ ")",
tableName, getPayloadType()
)
)
);
}
 public void createSegmentTable(final String tableName)
 {
 createTable(
@@ -390,6 +409,13 @@
 public abstract DBI getDBI();
 
+public void createDataSourceTable()
+{
+if (config.get().isCreateTables()) {
+createDataSourceTable(tablesConfigSupplier.get().getDataSourceTable());
+}
+}
+
 @Override
 public void createPendingSegmentsTable()
 {
@@ -442,35 +468,47 @@
 final String key
 )
 {
-final String selectStatement = String.format("SELECT %s FROM %s WHERE %s = :key", valueColumn,
-tableName, keyColumn
-);
 return getDBI().withHandle(
 new HandleCallback<byte[]>()
 {
 @Override
 public byte[] withHandle(Handle handle) throws Exception
 {
-List<byte[]> matched = handle.createQuery(selectStatement)
-.bind("key", key)
-.map(ByteArrayMapper.FIRST)
-.list();
-if (matched.isEmpty()) {
-return null;
-}
-if (matched.size() > 1) {
-throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
-}
-return matched.get(0);
+return lookupWithHandle(handle, tableName, keyColumn, valueColumn, key);
 }
 }
 );
 }
public byte[] lookupWithHandle(
final Handle handle,
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
)
{
final String selectStatement = String.format(
"SELECT %s FROM %s WHERE %s = :key", valueColumn,
tableName, keyColumn
);
List<byte[]> matched = handle.createQuery(selectStatement)
.bind("key", key)
.map(ByteArrayMapper.FIRST)
.list();
if (matched.isEmpty()) {
return null;
}
if (matched.size() > 1) {
throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", matched.size(), key);
}
return matched.get(0);
}
 public MetadataStorageConnectorConfig getConfig() { return config.get(); }
 
 protected BasicDataSource getDatasource()

@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import io.druid.indexing.overlord.ObjectMetadata;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.LinearShardSpec;
@@ -32,6 +34,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
@@ -42,9 +45,10 @@ public class IndexerSQLMetadataStorageCoordinatorTest
 {
 @Rule
 public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
 private final ObjectMapper mapper = new DefaultObjectMapper();
 private final DataSegment defaultSegment = new DataSegment(
-"dataSource",
+"fooDataSource",
 Interval.parse("2015-01-01T00Z/2015-01-02T00Z"),
 "version",
 ImmutableMap.<String, Object>of(),
@@ -56,7 +60,7 @@
 );
 private final DataSegment defaultSegment2 = new DataSegment(
-"dataSource",
+"fooDataSource",
 Interval.parse("2015-01-01T00Z/2015-01-02T00Z"),
 "version",
 ImmutableMap.<String, Object>of(),
@@ -68,7 +72,7 @@
 );
 private final DataSegment defaultSegment3 = new DataSegment(
-"dataSource",
+"fooDataSource",
 Interval.parse("2015-01-03T00Z/2015-01-04T00Z"),
 "version",
 ImmutableMap.<String, Object>of(),
@@ -88,6 +92,7 @@
 {
 derbyConnector = derbyConnectorRule.getConnector();
 mapper.registerSubtypes(LinearShardSpec.class);
+derbyConnector.createDataSourceTable();
 derbyConnector.createTaskTables();
 derbyConnector.createSegmentTable();
 coordinator = new IndexerSQLMetadataStorageCoordinator(
@@ -135,6 +140,99 @@
 );
 }
@Test
public void testTransactionalAnnounceSuccess() throws IOException
{
// Insert first segment.
final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "bar"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment).getBytes("UTF-8"),
derbyConnector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
"id",
"payload",
defaultSegment.getIdentifier()
)
);
// Insert second segment.
final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2);
Assert.assertArrayEquals(
mapper.writeValueAsString(defaultSegment2).getBytes("UTF-8"),
derbyConnector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
"id",
"payload",
defaultSegment2.getIdentifier()
)
);
// Examine metadata.
Assert.assertEquals(
new ObjectMetadata(ImmutableMap.of("foo", "baz")),
coordinator.getDataSourceMetadata("fooDataSource")
);
}
@Test
public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException
{
final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
new ObjectMetadata(ImmutableMap.of("foo", "bar")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false), result1);
}
@Test
public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException
{
final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false), result2);
}
@Test
public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException
{
final SegmentPublishResult result1 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment),
new ObjectMetadata(null),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1);
final SegmentPublishResult result2 = coordinator.announceHistoricalSegments(
ImmutableSet.of(defaultSegment2),
new ObjectMetadata(ImmutableMap.of("foo", "qux")),
new ObjectMetadata(ImmutableMap.of("foo", "baz"))
);
Assert.assertEquals(new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false), result2);
}
 @Test
 public void testSimpleUsedList() throws IOException
 {
@@ -191,7 +289,10 @@
 ImmutableList.of(defaultSegment3),
 coordinator.getUsedSegmentsForIntervals(
 defaultSegment.getDataSource(),
-ImmutableList.of(Interval.parse("2015-01-03T00Z/2015-01-03T05Z"), Interval.parse("2015-01-03T09Z/2015-01-04T00Z"))
+ImmutableList.of(
+Interval.parse("2015-01-03T00Z/2015-01-03T05Z"),
+Interval.parse("2015-01-03T09Z/2015-01-04T00Z")
+)
 )
 );
 }


@@ -108,6 +108,7 @@ public class CreateTables extends GuiceRunnable
 {
 final Injector injector = makeInjector();
 MetadataStorageConnector dbConnector = injector.getInstance(MetadataStorageConnector.class);
+dbConnector.createDataSourceTable();
 dbConnector.createPendingSegmentsTable();
 dbConnector.createSegmentTable();
 dbConnector.createRulesTable();