diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java
index 6f72bff2da2..c485d38aa87 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java
@@ -28,6 +28,7 @@ import com.metamx.druid.TimelineObjectHolder;
 import com.metamx.druid.VersionedIntervalTimeline;
 import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.db.DbConnectorConfig;
+import com.metamx.druid.merger.common.TaskStatus;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
@@ -37,6 +38,8 @@ import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.ResultIterator;
 import org.skife.jdbi.v2.StatementContext;
+import org.skife.jdbi.v2.TransactionCallback;
+import org.skife.jdbi.v2.TransactionStatus;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import javax.annotation.Nullable;
@@ -51,8 +54,6 @@ public class MergerDBCoordinator
 {
   private static final Logger log = new Logger(MergerDBCoordinator.class);
 
-  private final Object lock = new Object();
-
   private final ObjectMapper jsonMapper;
   private final DbConnectorConfig dbConnectorConfig;
   private final DBI dbi;
@@ -68,132 +69,153 @@ public class MergerDBCoordinator
     this.dbi = dbi;
   }
 
-  public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval) throws IOException
+  public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval)
+      throws IOException
   {
-    synchronized (lock) {
-
-      // XXX Could be reading from a cache if we can assume we're the only one editing the DB
-
-      final VersionedIntervalTimeline<String, DataSegment> timeline = dbi.withHandle(
-          new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>()
-          {
-            @Override
-            public VersionedIntervalTimeline<String, DataSegment> withHandle(Handle handle) throws Exception
-            {
-              final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
-                  Ordering.natural()
-              );
-
-              final ResultIterator<Map<String, Object>> dbSegments =
-                  handle.createQuery(
-                      String.format(
-                          "SELECT payload FROM %s WHERE used = 1 AND dataSource = :dataSource",
-                          dbConnectorConfig.getSegmentTable()
-                      )
-                  )
-                        .bind("dataSource", dataSource)
-                        .iterator();
-
-              while (dbSegments.hasNext()) {
-
-                final Map<String, Object> dbSegment = dbSegments.next();
-
-                DataSegment segment = jsonMapper.readValue(
-                    (String) dbSegment.get("payload"),
-                    DataSegment.class
-                );
-
-                timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
-
-              }
-
-              dbSegments.close();
-
-              return timeline;
-
-            }
-          }
-      );
-
-      final List<DataSegment> segments = Lists.transform(
-          timeline.lookup(interval),
-          new Function<TimelineObjectHolder<String, DataSegment>, DataSegment>()
-          {
-            @Override
-            public DataSegment apply(@Nullable TimelineObjectHolder<String, DataSegment> input)
-            {
-              return input.getObject().getChunk(0).getObject();
-            }
-          }
-      );
-
-      return segments;
-
-    }
+    // XXX Could be reading from a cache if we can assume we're the only one editing the DB
+
+    final VersionedIntervalTimeline<String, DataSegment> timeline = dbi.withHandle(
+        new HandleCallback<VersionedIntervalTimeline<String, DataSegment>>()
+        {
+          @Override
+          public VersionedIntervalTimeline<String, DataSegment> withHandle(Handle handle) throws Exception
+          {
+            final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<String, DataSegment>(
+                Ordering.natural()
+            );
+
+            final ResultIterator<Map<String, Object>> dbSegments =
+                handle.createQuery(
+                    String.format(
+                        "SELECT payload FROM %s WHERE used = 1 AND dataSource = :dataSource",
+                        dbConnectorConfig.getSegmentTable()
+                    )
+                )
+                      .bind("dataSource", dataSource)
+                      .iterator();
+
+            while (dbSegments.hasNext()) {
+
+              final Map<String, Object> dbSegment = dbSegments.next();
+
+              DataSegment segment = jsonMapper.readValue(
+                  (String) dbSegment.get("payload"),
+                  DataSegment.class
+              );
+
+              timeline.add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+
+            }
+
+            dbSegments.close();
+
+            return timeline;
+
+          }
+        }
+    );
+
+    final List<DataSegment> segments = Lists.transform(
+        timeline.lookup(interval),
+        new Function<TimelineObjectHolder<String, DataSegment>, DataSegment>()
+        {
+          @Override
+          public DataSegment apply(TimelineObjectHolder<String, DataSegment> input)
+          {
+            return input.getObject().getChunk(0).getObject();
+          }
+        }
+    );
+
+    return segments;
+  }
+
+  public void commitTaskStatus(final TaskStatus taskStatus)
+  {
+    try {
+      dbi.inTransaction(
+          new TransactionCallback<Void>()
+          {
+            @Override
+            public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
+            {
+              for(final DataSegment segment : taskStatus.getSegments())
+              {
+                log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), taskStatus.getId());
+                announceHistoricalSegment(handle, segment);
+              }
+
+              for(final DataSegment segment : taskStatus.getSegmentsNuked())
+              {
+                log.info("Deleting segment[%s] for task[%s]", segment.getIdentifier(), taskStatus.getId());
+                deleteSegment(handle, segment);
+              }
+
+              return null;
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(String.format("Exception commit task to DB: %s", taskStatus.getId()), e);
+    }
   }
 
   public void announceHistoricalSegment(final DataSegment segment) throws Exception
   {
-    synchronized (lock) {
-      try {
-        List<Map<String, Object>> exists = dbi.withHandle(
-            new HandleCallback<List<Map<String, Object>>>()
-            {
-              @Override
-              public List<Map<String, Object>> withHandle(Handle handle) throws Exception
-              {
-                return handle.createQuery(
-                    String.format(
-                        "SELECT id FROM %s WHERE id = ':identifier'",
-                        dbConnectorConfig.getSegmentTable()
-                    )
-                ).bind(
-                    "identifier",
-                    segment.getIdentifier()
-                ).list();
-              }
-            }
-        );
-
-        if (!exists.isEmpty()) {
-          log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
-          return;
-        }
-
-        dbi.withHandle(
-            new HandleCallback<Void>()
-            {
-              @Override
-              public Void withHandle(Handle handle) throws Exception
-              {
-                handle.createStatement(
-                    String.format(
-                        "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
-                        dbConnectorConfig.getSegmentTable()
-                    )
-                )
-                      .bind("id", segment.getIdentifier())
-                      .bind("dataSource", segment.getDataSource())
-                      .bind("created_date", new DateTime().toString())
-                      .bind("start", segment.getInterval().getStart().toString())
-                      .bind("end", segment.getInterval().getEnd().toString())
-                      .bind("partitioned", segment.getShardSpec().getPartitionNum())
-                      .bind("version", segment.getVersion())
-                      .bind("used", true)
-                      .bind("payload", jsonMapper.writeValueAsString(segment))
-                      .execute();
-
-                return null;
-              }
-            }
-        );
-
-        log.info("Published segment [%s] to DB", segment.getIdentifier());
-      }
-      catch (Exception e) {
-        log.error(e, "Exception inserting into DB");
-        throw new RuntimeException(e);
-      }
-    }
+    dbi.withHandle(
+        new HandleCallback<Void>()
+        {
+          @Override
+          public Void withHandle(Handle handle) throws Exception
+          {
+            announceHistoricalSegment(handle, segment);
+            return null;
+          }
+        }
+    );
+  }
+
+  private void announceHistoricalSegment(final Handle handle, final DataSegment segment) throws Exception
+  {
+    try {
+      final List<Map<String, Object>> exists = handle.createQuery(
+          String.format(
+              "SELECT id FROM %s WHERE id = ':identifier'",
+              dbConnectorConfig.getSegmentTable()
+          )
+      ).bind(
+          "identifier",
+          segment.getIdentifier()
+      ).list();
+
+      if (!exists.isEmpty()) {
+        log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
+        return;
+      }
+
+      handle.createStatement(
+          String.format(
+              "INSERT INTO %s (id, dataSource, created_date, start, end, partitioned, version, used, payload) VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
+              dbConnectorConfig.getSegmentTable()
+          )
+      )
+            .bind("id", segment.getIdentifier())
+            .bind("dataSource", segment.getDataSource())
+            .bind("created_date", new DateTime().toString())
+            .bind("start", segment.getInterval().getStart().toString())
+            .bind("end", segment.getInterval().getEnd().toString())
+            .bind("partitioned", segment.getShardSpec().getPartitionNum())
+            .bind("version", segment.getVersion())
+            .bind("used", true)
+            .bind("payload", jsonMapper.writeValueAsString(segment))
+            .execute();
+
+      log.info("Published segment [%s] to DB", segment.getIdentifier());
+    }
+    catch (Exception e) {
+      log.error(e, "Exception inserting into DB");
+      throw e;
+    }
   }
 
@@ -205,17 +227,21 @@ public class MergerDBCoordinator
           @Override
           public Void withHandle(Handle handle) throws Exception
           {
-            handle.createStatement(
-                String.format("DELETE from %s WHERE id = :id", dbConnectorConfig.getSegmentTable())
-            ).bind("id", segment.getIdentifier())
-             .execute();
-
+            deleteSegment(handle, segment);
             return null;
           }
         }
     );
   }
 
+  private void deleteSegment(final Handle handle, final DataSegment segment)
+  {
+    handle.createStatement(
+        String.format("DELETE from %s WHERE id = :id", dbConnectorConfig.getSegmentTable())
+    ).bind("id", segment.getIdentifier())
+     .execute();
+  }
+
   public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
   {
     List<DataSegment> matchingSegments = dbi.withHandle(
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
index 74c1b4fb46e..2a21d67ef35 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
@@ -187,14 +187,38 @@ public class TaskConsumer implements Runnable
       public void run()
       {
         try {
-          if (statusFromRunner.getSegments().size() > 0) {
-            // TODO -- Publish in transaction
-            publishSegments(task, context, statusFromRunner.getSegments());
+          // Validate status
+          for (final DataSegment segment : statusFromRunner.getSegments()) {
+            verifyDataSourceAndInterval(task, context, segment);
+
+            // Verify version (must be equal to our context version)
+            if (!context.getVersion().equals(segment.getVersion())) {
+              throw new IllegalStateException(
+                  String.format(
+                      "Segment for task[%s] has invalid version: %s",
+                      task.getId(),
+                      segment.getIdentifier()
+                  )
+              );
+            }
           }
 
-          if (statusFromRunner.getSegmentsNuked().size() > 0) {
-            deleteSegments(task, context, statusFromRunner.getSegmentsNuked());
+          for (final DataSegment segment : statusFromRunner.getSegmentsNuked()) {
+            verifyDataSourceAndInterval(task, context, segment);
+
+            // Verify version (must be less than our context version)
+            if (segment.getVersion().compareTo(context.getVersion()) >= 0) {
+              throw new IllegalStateException(
+                  String.format(
+                      "Segment-to-nuke for task[%s] has invalid version: %s",
+                      task.getId(),
+                      segment.getIdentifier()
+                  )
+              );
+            }
           }
+
+          mergerDBCoordinator.commitTaskStatus(statusFromRunner);
         }
         catch (Exception e) {
           log.error(e, "Exception while publishing segments for task: %s", task);
@@ -211,11 +235,18 @@ public class TaskConsumer implements Runnable
           segmentBytes += segment.getSize();
         }
 
+        int segmentNukedBytes = 0;
+        for (DataSegment segment : statusFromRunner.getSegmentsNuked()) {
+          segmentNukedBytes += segment.getSize();
+        }
+
         builder.setUser3(statusFromRunner.getStatusCode().toString());
 
         emitter.emit(builder.build("indexer/time/run/millis", statusFromRunner.getDuration()));
         emitter.emit(builder.build("indexer/segment/count", statusFromRunner.getSegments().size()));
         emitter.emit(builder.build("indexer/segment/bytes", segmentBytes));
+        emitter.emit(builder.build("indexer/segmentNuked/count", statusFromRunner.getSegmentsNuked().size()));
+        emitter.emit(builder.build("indexer/segmentNuked/bytes", segmentNukedBytes));
 
         if (statusFromRunner.isFailure()) {
           log.makeAlert("Failed to index")
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java
index c811d007b57..6fdd8725da9 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java
@@ -10,11 +10,15 @@ import com.metamx.druid.merger.common.TaskStatus;
 import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.task.AbstractTask;
 import com.metamx.druid.merger.coordinator.exec.TaskConsumer;
+import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.Event;
 import com.metamx.emitter.service.ServiceEmitter;
 import com.metamx.emitter.service.ServiceEventBuilder;
 import junit.framework.Assert;
+import org.easymock.EasyMock;
 import org.joda.time.Interval;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -24,68 +28,122 @@
 import java.util.concurrent.Executors;
 
 public class TaskConsumerTest
 {
-  @Test
-  public void testSimple()
+  private TaskStorage ts = null;
+  private TaskQueue tq = null;
+  private TaskRunner tr = null;
+  private MockMergerDBCoordinator mdc = null;
+  private TaskConsumer tc = null;
+
+  @Before
+  public void setUp()
   {
-    final TaskStorage ts = new LocalTaskStorage();
-    final TaskQueue tq = new TaskQueue(ts);
-    final TaskRunner tr = new LocalTaskRunner(
+    EmittingLogger.registerEmitter(EasyMock.createMock(ServiceEmitter.class));
+
+    ts = new LocalTaskStorage();
+    tq = new TaskQueue(ts);
+    tr = new LocalTaskRunner(
         new TaskToolbox(null, null, null, null, null, null),
         Executors.newSingleThreadExecutor()
     );
-    final MockMergerDBCoordinator mdc = newMockMDC();
-    final TaskConsumer tc = new TaskConsumer(tq, tr, mdc, newMockEmitter());
+    mdc = newMockMDC();
+    tc = new TaskConsumer(tq, tr, mdc, newMockEmitter());
 
     tq.start();
     tc.start();
+  }
 
-    try {
-      tq.add(
-          new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
-          {
-            @Override
-            public Type getType()
-            {
-              return Type.TEST;
-            }
-
-            @Override
-            public TaskStatus run(
-                TaskContext context, TaskToolbox toolbox, TaskCallback callback
-            ) throws Exception
-            {
-              return TaskStatus.success(getId()).withSegments(
-                  ImmutableSet.of(
-                      DataSegment.builder()
-                                 .dataSource("ds")
-                                 .interval(new Interval("2012-01-01/P1D"))
-                                 .version(context.getVersion())
-                                 .build()
-                  )
-              );
-            }
-          }
-      );
-
-      while (ts.getStatus("id1").get().isRunnable()) {
-        Thread.sleep(100);
-      }
-
-      final TaskStatus status = ts.getStatus("id1").get();
-      Assert.assertTrue("nextTasks", status.getNextTasks().isEmpty());
-      Assert.assertEquals("segments.size", 1, status.getSegments().size());
-      Assert.assertEquals("segmentsNuked.size", 0, status.getSegmentsNuked().size());
-      Assert.assertEquals("segments published", status.getSegments(), mdc.getPublished());
-      Assert.assertEquals("segments nuked", status.getSegmentsNuked(), mdc.getNuked());
-    }
-    catch (Exception e) {
-      throw Throwables.propagate(e);
-    }
-    finally {
-      tc.stop();
-      tq.stop();
-    }
+  @After
+  public void tearDown()
+  {
+    tc.stop();
+    tq.stop();
+  }
+
+  @Test
+  public void testSimple() throws Exception
+  {
+    tq.add(
+        new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
+        {
+          @Override
+          public Type getType()
+          {
+            return Type.TEST;
+          }
+
+          @Override
+          public TaskStatus run(
+              TaskContext context, TaskToolbox toolbox, TaskCallback callback
+          ) throws Exception
+          {
+            return TaskStatus.success(getId()).withSegments(
+                ImmutableSet.of(
+                    DataSegment.builder()
+                               .dataSource("ds")
+                               .interval(new Interval("2012-01-01/P1D"))
+                               .version(context.getVersion())
+                               .build()
+                )
+            );
+          }
+        }
+    );
+
+    while (ts.getStatus("id1").get().isRunnable()) {
+      Thread.sleep(100);
+    }
+
+    final TaskStatus status = ts.getStatus("id1").get();
+    Assert.assertEquals("statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode());
+    Assert.assertEquals("nextTasks.size", 0, status.getNextTasks().size());
+    Assert.assertEquals("segments.size", 1, status.getSegments().size());
+    Assert.assertEquals("segmentsNuked.size", 0, status.getSegmentsNuked().size());
+    Assert.assertEquals("segments published", status.getSegments(), mdc.getPublished());
+    Assert.assertEquals("segments nuked", status.getSegmentsNuked(), mdc.getNuked());
+  }
+
+  @Test
+  public void testBadVersion() throws Exception
+  {
+    tq.add(
+        new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
+        {
+          @Override
+          public Type getType()
+          {
+            return Type.TEST;
+          }
+
+          @Override
+          public TaskStatus run(
+              TaskContext context, TaskToolbox toolbox, TaskCallback callback
+          ) throws Exception
+          {
+            return TaskStatus.success(getId()).withSegments(
+                ImmutableSet.of(
+                    DataSegment.builder()
+                               .dataSource("ds")
+                               .interval(new Interval("2012-01-01/P1D"))
+                               .version(context.getVersion() + "1!!!1!!")
+                               .build()
+                )
+            );
+          }
+        }
+    );
+
+    while (ts.getStatus("id1").get().isRunnable()) {
+      Thread.sleep(100);
+    }
+
+    final TaskStatus status = ts.getStatus("id1").get();
+    Assert.assertEquals("statusCode", TaskStatus.Status.FAILED, status.getStatusCode());
+    Assert.assertEquals("nextTasks.size", 0, status.getNextTasks().size());
+    Assert.assertEquals("segments.size", 0, status.getSegments().size());
+    Assert.assertEquals("segmentsNuked.size", 0, status.getSegmentsNuked().size());
+    Assert.assertEquals("segments published", status.getSegments(), mdc.getPublished());
+    Assert.assertEquals("segments nuked", status.getSegmentsNuked(), mdc.getNuked());
   }
 
   private static class MockMergerDBCoordinator extends MergerDBCoordinator
@@ -111,7 +169,21 @@ public class TaskConsumerTest
     }
 
     @Override
-    public void announceHistoricalSegment(DataSegment segment) throws Exception
+    public void commitTaskStatus(TaskStatus taskStatus)
+    {
+      for(final DataSegment segment : taskStatus.getSegments())
+      {
+        announceHistoricalSegment(segment);
+      }
+
+      for(final DataSegment segment : taskStatus.getSegmentsNuked())
+      {
+        deleteSegment(segment);
+      }
+    }
+
+    @Override
+    public void announceHistoricalSegment(DataSegment segment)
     {
       published.add(segment);
     }