TaskConsumer: Commit to segment db in one transaction

Gian Merlino 2013-01-31 08:21:36 -08:00
parent 1e35e6ad46
commit f946fc3ee6
3 changed files with 292 additions and 163 deletions
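The point of the change: segments a task published and segments it nuked are now written to the segment table inside a single JDBI transaction (see commitTaskStatus below), rather than as separate per-segment statements. A minimal sketch of that pattern outside Druid, assuming a throwaway in-memory H2 database and a placeholder "segments" table rather than Druid's real schema:

    import org.skife.jdbi.v2.DBI;
    import org.skife.jdbi.v2.Handle;
    import org.skife.jdbi.v2.TransactionCallback;
    import org.skife.jdbi.v2.TransactionStatus;
    import org.skife.jdbi.v2.tweak.HandleCallback;

    public class AtomicSegmentCommitSketch
    {
      public static void main(String[] args)
      {
        // Placeholder in-memory database; the real coordinator gets its DBI and
        // segment table name from DbConnectorConfig.
        final DBI dbi = new DBI("jdbc:h2:mem:sketch;DB_CLOSE_DELAY=-1");

        // Set up a stand-in table so the sketch runs end to end.
        dbi.withHandle(
            new HandleCallback<Void>()
            {
              @Override
              public Void withHandle(Handle handle) throws Exception
              {
                handle.execute("CREATE TABLE segments (id VARCHAR(255) PRIMARY KEY, payload VARCHAR(255))");
                return null;
              }
            }
        );

        // The pattern commitTaskStatus uses: publish and delete in one transaction,
        // so either both statements take effect or neither does.
        dbi.inTransaction(
            new TransactionCallback<Void>()
            {
              @Override
              public Void inTransaction(Handle handle, TransactionStatus status) throws Exception
              {
                handle.createStatement("INSERT INTO segments (id, payload) VALUES (:id, :payload)")
                      .bind("id", "ds_2012-01-01_2012-01-02_v1")
                      .bind("payload", "{}")
                      .execute();

                handle.createStatement("DELETE FROM segments WHERE id = :id")
                      .bind("id", "ds_2012-01-01_2012-01-02_v0")
                      .execute();

                return null;
              }
            }
        );
      }
    }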

MergerDBCoordinator.java

@@ -28,6 +28,7 @@ import com.metamx.druid.TimelineObjectHolder;
import com.metamx.druid.VersionedIntervalTimeline;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.merger.common.TaskStatus;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -37,6 +38,8 @@ import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.annotation.Nullable;
@@ -51,8 +54,6 @@ public class MergerDBCoordinator
{
private static final Logger log = new Logger(MergerDBCoordinator.class);
private final Object lock = new Object();
private final ObjectMapper jsonMapper;
private final DbConnectorConfig dbConnectorConfig;
private final DBI dbi;
@@ -68,10 +69,9 @@ public class MergerDBCoordinator
this.dbi = dbi;
}
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval) throws IOException
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
throws IOException
{
synchronized (lock) {
// XXX Could be reading from a cache if we can assume we're the only one editing the DB
final VersionedIntervalTimeline<String, DataSegment> timeline = dbi.withHandle(
@@ -120,7 +120,7 @@
new Function<TimelineObjectHolder<String, DataSegment>, DataSegment>()
{
@Override
public DataSegment apply(@Nullable TimelineObjectHolder<String, DataSegment> input)
public DataSegment apply(TimelineObjectHolder<String, DataSegment> input)
{
return input.getObject().getChunk(0).getObject();
}
@@ -128,21 +128,58 @@
);
return segments;
}
public void commitTaskStatus(final TaskStatus taskStatus)
{
try {
dbi.inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
for(final DataSegment segment : taskStatus.getSegments())
{
log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), taskStatus.getId());
announceHistoricalSegment(handle, segment);
}
for(final DataSegment segment : taskStatus.getSegmentsNuked())
{
log.info("Deleting segment[%s] for task[%s]", segment.getIdentifier(), taskStatus.getId());
deleteSegment(handle, segment);
}
return null;
}
}
);
}
catch (Exception e) {
throw new RuntimeException(String.format("Exception commit task to DB: %s", taskStatus.getId()), e);
}
}
public void announceHistoricalSegment(final DataSegment segment) throws Exception
{
synchronized (lock) {
try {
List<Map<String, Object>> exists = dbi.withHandle(
new HandleCallback<List<Map<String, Object>>>()
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
public Void withHandle(Handle handle) throws Exception
{
return handle.createQuery(
announceHistoricalSegment(handle, segment);
return null;
}
}
);
}
private void announceHistoricalSegment(final Handle handle, final DataSegment segment) throws Exception
{
try {
final List<Map<String, Object>> exists = handle.createQuery(
String.format(
"SELECT id FROM %s WHERE id = ':identifier'",
dbConnectorConfig.getSegmentTable()
@@ -151,21 +188,12 @@
"identifier",
segment.getIdentifier()
).list();
}
}
);
if (!exists.isEmpty()) {
log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
return;
}
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
@@ -183,17 +211,11 @@
.bind("payload", jsonMapper.writeValueAsString(segment))
.execute();
return null;
}
}
);
log.info("Published segment [%s] to DB", segment.getIdentifier());
}
catch (Exception e) {
log.error(e, "Exception inserting into DB");
throw new RuntimeException(e);
}
throw e;
}
}
@@ -205,17 +227,21 @@
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :id", dbConnectorConfig.getSegmentTable())
).bind("id", segment.getIdentifier())
.execute();
deleteSegment(handle, segment);
return null;
}
}
);
}
private void deleteSegment(final Handle handle, final DataSegment segment)
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :id", dbConnectorConfig.getSegmentTable())
).bind("id", segment.getIdentifier())
.execute();
}
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = dbi.withHandle(

TaskConsumer.java

@@ -187,15 +187,39 @@ public class TaskConsumer implements Runnable
public void run()
{
try {
if (statusFromRunner.getSegments().size() > 0) {
// TODO -- Publish in transaction
publishSegments(task, context, statusFromRunner.getSegments());
// Validate status
for (final DataSegment segment : statusFromRunner.getSegments()) {
verifyDataSourceAndInterval(task, context, segment);
// Verify version (must be equal to our context version)
if (!context.getVersion().equals(segment.getVersion())) {
throw new IllegalStateException(
String.format(
"Segment for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
}
if (statusFromRunner.getSegmentsNuked().size() > 0) {
deleteSegments(task, context, statusFromRunner.getSegmentsNuked());
for (final DataSegment segment : statusFromRunner.getSegmentsNuked()) {
verifyDataSourceAndInterval(task, context, segment);
// Verify version (must be less than our context version)
if (segment.getVersion().compareTo(context.getVersion()) >= 0) {
throw new IllegalStateException(
String.format(
"Segment-to-nuke for task[%s] has invalid version: %s",
task.getId(),
segment.getIdentifier()
)
);
}
}
mergerDBCoordinator.commitTaskStatus(statusFromRunner);
}
catch (Exception e) {
log.error(e, "Exception while publishing segments for task: %s", task);
throw Throwables.propagate(e);
@@ -211,11 +235,18 @@
segmentBytes += segment.getSize();
}
int segmentNukedBytes = 0;
for (DataSegment segment : statusFromRunner.getSegmentsNuked()) {
segmentNukedBytes += segment.getSize();
}
builder.setUser3(statusFromRunner.getStatusCode().toString());
emitter.emit(builder.build("indexer/time/run/millis", statusFromRunner.getDuration()));
emitter.emit(builder.build("indexer/segment/count", statusFromRunner.getSegments().size()));
emitter.emit(builder.build("indexer/segment/bytes", segmentBytes));
emitter.emit(builder.build("indexer/segmentNuked/count", statusFromRunner.getSegmentsNuked().size()));
emitter.emit(builder.build("indexer/segmentNuked/bytes", segmentNukedBytes));
if (statusFromRunner.isFailure()) {
log.makeAlert("Failed to index")

TaskConsumerTest.java

@@ -10,11 +10,15 @@ import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.task.AbstractTask;
import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import junit.framework.Assert;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
@@ -24,23 +28,41 @@ import java.util.concurrent.Executors;
public class TaskConsumerTest
{
@Test
public void testSimple()
private TaskStorage ts = null;
private TaskQueue tq = null;
private TaskRunner tr = null;
private MockMergerDBCoordinator mdc = null;
private TaskConsumer tc = null;
@Before
public void setUp()
{
final TaskStorage ts = new LocalTaskStorage();
final TaskQueue tq = new TaskQueue(ts);
final TaskRunner tr = new LocalTaskRunner(
EmittingLogger.registerEmitter(EasyMock.createMock(ServiceEmitter.class));
ts = new LocalTaskStorage();
tq = new TaskQueue(ts);
tr = new LocalTaskRunner(
new TaskToolbox(null, null, null, null, null, null),
Executors.newSingleThreadExecutor()
);
final MockMergerDBCoordinator mdc = newMockMDC();
final TaskConsumer tc = new TaskConsumer(tq, tr, mdc, newMockEmitter());
mdc = newMockMDC();
tc = new TaskConsumer(tq, tr, mdc, newMockEmitter());
tq.start();
tc.start();
}
try {
@After
public void tearDown()
{
tc.stop();
tq.stop();
}
@Test
public void testSimple() throws Exception
{
tq.add(
new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@@ -73,20 +95,56 @@
}
final TaskStatus status = ts.getStatus("id1").get();
Assert.assertTrue("nextTasks", status.getNextTasks().isEmpty());
Assert.assertEquals("statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());
Assert.assertEquals("nextTasks.size", 0, status.getNextTasks().size());
Assert.assertEquals("segments.size", 1, status.getSegments().size());
Assert.assertEquals("segmentsNuked.size", 0, status.getSegmentsNuked().size());
Assert.assertEquals("segments published", status.getSegments(), mdc.getPublished());
Assert.assertEquals("segments nuked", status.getSegmentsNuked(), mdc.getNuked());
}
catch (Exception e) {
throw Throwables.propagate(e);
@Test
public void testBadVersion() throws Exception
{
tq.add(
new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public Type getType()
{
return Type.TEST;
}
finally {
tc.stop();
tq.stop();
@Override
public TaskStatus run(
TaskContext context, TaskToolbox toolbox, TaskCallback callback
) throws Exception
{
return TaskStatus.success(getId()).withSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource("ds")
.interval(new Interval("2012-01-01/P1D"))
.version(context.getVersion() + "1!!!1!!")
.build()
)
);
}
}
);
while (ts.getStatus("id1").get().isRunnable()) {
Thread.sleep(100);
}
final TaskStatus status = ts.getStatus("id1").get();
Assert.assertEquals("statusCode", TaskStatus.Status.FAILED, status.getStatusCode());
Assert.assertEquals("nextTasks.size", 0, status.getNextTasks().size());
Assert.assertEquals("segments.size", 0, status.getSegments().size());
Assert.assertEquals("segmentsNuked.size", 0, status.getSegmentsNuked().size());
Assert.assertEquals("segments published", status.getSegments(), mdc.getPublished());
Assert.assertEquals("segments nuked", status.getSegmentsNuked(), mdc.getNuked());
}
private static class MockMergerDBCoordinator extends MergerDBCoordinator
{
@@ -111,7 +169,21 @@
}
@Override
public void announceHistoricalSegment(DataSegment segment) throws Exception
public void commitTaskStatus(TaskStatus taskStatus)
{
for(final DataSegment segment : taskStatus.getSegments())
{
announceHistoricalSegment(segment);
}
for(final DataSegment segment : taskStatus.getSegmentsNuked())
{
deleteSegment(segment);
}
}
@Override
public void announceHistoricalSegment(DataSegment segment)
{
published.add(segment);
}