From 74057600f9d0fd7db69264815d44f903848471c4 Mon Sep 17 00:00:00 2001
From: Fangjin Yang
Date: Sun, 27 Jan 2013 09:57:37 -0800
Subject: [PATCH] fix worker node still using db connection

---
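Note: the heart of this patch is moving the "which segments are unused?" database
query out of the worker-side S3SegmentKiller and into the coordinator's
MergerDBCoordinator. The coordinator now resolves used and unused segments up
front and ships both sets to the worker inside TaskContext, so a worker process
no longer needs any database wiring. A condensed sketch of the resulting flow,
pieced together from the hunks below (ds and interval stand in for the task's
data source and interval; this is an illustration, not applied code):

    // Coordinator side (TaskConsumer): resolve segments against the DB.
    final TaskContext context = new TaskContext(
        version,
        ImmutableSet.copyOf(mergerDBCoordinator.getUsedSegmentsForInterval(ds, interval)),
        ImmutableSet.copyOf(mergerDBCoordinator.getUnusedSegmentsForInterval(ds, interval))
    );

    // Worker side (KillTask): acts on the shipped set, no DB access required.
    toolbox.getSegmentKiller().kill(context.getUnusedSegments());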
 .../druid/merger/common/task/KillTask.java    |  8 +-
 .../coordinator/MergerDBCoordinator.java      | 59 ++++++++++++
 .../merger/coordinator/RemoteTaskRunner.java  | 12 +--
 .../druid/merger/coordinator/TaskContext.java | 11 ++-
 .../merger/coordinator/exec/TaskConsumer.java | 27 ++++--
 .../http/IndexerCoordinatorNode.java          |  7 +-
 .../druid/merger/worker/http/WorkerNode.java  | 16 +---
 .../coordinator/RemoteTaskRunnerTest.java     | 25 ++++--
 .../merger/coordinator/TaskConsumerTest.java  | 56 +++++++------
 .../merger/coordinator/TaskQueueTest.java     |  2 +-
 .../metamx/druid/loading/S3SegmentKiller.java | 83 +------------------
 .../metamx/druid/loading/SegmentKiller.java   |  4 +-
 12 files changed, 159 insertions(+), 151 deletions(-)

diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
index 8c636c27a55..eb5992925f0 100644
--- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
+++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java
@@ -51,11 +51,9 @@ public class KillTask extends AbstractTask
   public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
   {
     // Kill segments
-    Set<DataSegment> segmentsToKill = ImmutableSet.copyOf(
-        toolbox.getSegmentKiller()
-               .kill(getDataSource(), getInterval())
-    );
+    toolbox.getSegmentKiller()
+           .kill(context.getUnusedSegments());
 
-    return TaskStatus.success(getId()).withSegmentsNuked(segmentsToKill);
+    return TaskStatus.success(getId()).withSegmentsNuked(context.getUnusedSegments());
   }
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java
index 828404d3961..6f72bff2da2 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java
@@ -20,6 +20,7 @@
 package com.metamx.druid.merger.coordinator;
 
 import com.google.common.base.Function;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
 import com.metamx.common.logger.Logger;
@@ -31,12 +32,16 @@ import org.codehaus.jackson.map.ObjectMapper;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.skife.jdbi.v2.DBI;
+import org.skife.jdbi.v2.FoldController;
+import org.skife.jdbi.v2.Folder3;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.ResultIterator;
+import org.skife.jdbi.v2.StatementContext;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.List;
 import java.util.Map;
@@ -63,7 +68,7 @@ public class MergerDBCoordinator
     this.dbi = dbi;
   }
 
-  public List<DataSegment> getSegmentsForInterval(final String dataSource, final Interval interval) throws IOException
+  public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval) throws IOException
   {
     synchronized (lock) {
 
@@ -209,6 +214,58 @@ public class MergerDBCoordinator
 
           }
         }
     );
+  }
+  public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
+  {
+    List<DataSegment> matchingSegments = dbi.withHandle(
+        new HandleCallback<List<DataSegment>>()
+        {
+          @Override
+          public List<DataSegment> withHandle(Handle handle) throws Exception
+          {
+            return handle.createQuery(
+                String.format(
+                    "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
+                    dbConnectorConfig.getSegmentTable()
+                )
+            )
+                         .bind("dataSource", dataSource)
+                         .bind("start", interval.getStart().toString())
+                         .bind("end", interval.getEnd().toString())
+                         .fold(
+                             Lists.<DataSegment>newArrayList(),
+                             new Folder3<List<DataSegment>, Map<String, Object>>()
+                             {
+                               @Override
+                               public List<DataSegment> fold(
+                                   List<DataSegment> accumulator,
+                                   Map<String, Object> stringObjectMap,
+                                   FoldController foldController,
+                                   StatementContext statementContext
+                               ) throws SQLException
+                               {
+                                 try {
+                                   DataSegment segment = jsonMapper.readValue(
+                                       (String) stringObjectMap.get("payload"),
+                                       DataSegment.class
+                                   );
+
+                                   accumulator.add(segment);
+
+                                   return accumulator;
+                                 }
+                                 catch (Exception e) {
+                                   throw Throwables.propagate(e);
+                                 }
+                               }
+                             }
+                         );
+          }
+        }
+    );
+
+    log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
+    return matchingSegments;
   }
 }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
index a36bdb5a852..2e53c699e58 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java
@@ -463,13 +463,13 @@ public class RemoteTaskRunner implements TaskRunner
             if (callback != null) {
               callback.notify(taskStatus);
             }
-          }
 
-          if (taskStatus.isComplete()) {
-            // Worker is done with this task
-            workerWrapper.setLastCompletedTaskTime(new DateTime());
-            tasks.remove(taskId);
-            cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
+            if (taskStatus.isComplete()) {
+              // Worker is done with this task
+              workerWrapper.setLastCompletedTaskTime(new DateTime());
+              tasks.remove(taskId);
+              cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
+            }
           }
         }
       }
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskContext.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskContext.java
index fcbb2450cbc..1d279fe0511 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskContext.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskContext.java
@@ -32,15 +32,18 @@ public class TaskContext
 {
   final String version;
   final Set<DataSegment> currentSegments;
+  final Set<DataSegment> unusedSegments;
 
   @JsonCreator
   public TaskContext(
       @JsonProperty("version") String version,
-      @JsonProperty("currentSegments") Set<DataSegment> currentSegments
+      @JsonProperty("currentSegments") Set<DataSegment> currentSegments,
+      @JsonProperty("unusedSegments") Set<DataSegment> unusedSegments
   )
   {
     this.version = version;
     this.currentSegments = currentSegments;
+    this.unusedSegments = unusedSegments;
   }
 
   @JsonProperty
@@ -54,4 +57,10 @@ public class TaskContext
   {
     return currentSegments;
   }
+
+  @JsonProperty
+  public Set<DataSegment> getUnusedSegments()
+  {
+    return unusedSegments;
+  }
 }
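TaskContext is Jackson-annotated (@JsonCreator / @JsonProperty above), so the
new unusedSegments field travels to workers as plain JSON alongside version and
currentSegments. A minimal round-trip sketch, assuming an ObjectMapper
configured like the merger module's jsonMapper and a DataSegment named
someUnusedSegment (both placeholders for illustration):

    TaskContext context = new TaskContext(
        version,
        ImmutableSet.<DataSegment>of(),    // currentSegments
        ImmutableSet.of(someUnusedSegment) // unusedSegments, consumed by KillTask
    );
    String json = jsonMapper.writeValueAsString(context);
    TaskContext shipped = jsonMapper.readValue(json, TaskContext.class);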
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
index 8996ad8c0f7..74c1b4fb46e 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java
@@ -109,7 +109,7 @@
               .emit();
 
           // Retry would be nice, but only after we have a way to throttle and limit them.  Just fail for now.
-          if(!shutdown) {
+          if (!shutdown) {
             queue.notify(task, TaskStatus.failure(task.getId()));
           }
         }
@@ -127,7 +127,13 @@
     final TaskContext context = new TaskContext(
         version,
         ImmutableSet.copyOf(
-            mergerDBCoordinator.getSegmentsForInterval(
+            mergerDBCoordinator.getUsedSegmentsForInterval(
+                task.getDataSource(),
+                task.getInterval()
+            )
+        ),
+        ImmutableSet.copyOf(
+            mergerDBCoordinator.getUnusedSegmentsForInterval(
                 task.getDataSource(),
                 task.getInterval()
             )
@@ -169,23 +175,24 @@
 
     // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after
     // we check and before we commit the database transaction, but better than nothing.
-    if(shutdown) {
+    if (shutdown) {
       log.info("Abandoning task due to shutdown: %s", task.getId());
       return;
     }
 
-    queue.notify(task, statusFromRunner, new Runnable()
+    queue.notify(
+        task, statusFromRunner, new Runnable()
         {
           @Override
           public void run()
           {
             try {
-              if(statusFromRunner.getSegments().size() > 0) {
+              if (statusFromRunner.getSegments().size() > 0) {
                 // TODO -- Publish in transaction
                 publishSegments(task, context, statusFromRunner.getSegments());
               }
 
-              if(statusFromRunner.getSegmentsNuked().size() > 0) {
+              if (statusFromRunner.getSegmentsNuked().size() > 0) {
                 deleteSegments(task, context, statusFromRunner.getSegmentsNuked());
               }
             }
@@ -194,10 +201,11 @@
               throw Throwables.propagate(e);
             }
           }
-        });
+        }
+    );
 
     // Emit event and log, if the task is done
-    if(statusFromRunner.isComplete()) {
+    if (statusFromRunner.isComplete()) {
       int segmentBytes = 0;
       for (DataSegment segment : statusFromRunner.getSegments()) {
         segmentBytes += segment.getSize();
@@ -226,7 +234,8 @@
           statusFromRunner.getDuration()
       );
     }
-  } catch(Exception e) {
+  }
+  catch (Exception e) {
     log.makeAlert(e, "Failed to handle task callback")
        .addData("task", task.getId())
        .addData("statusCode", statusFromRunner.getStatusCode())
diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
index fb8b161d46b..3d03d7e10b8 100644
--- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java
@@ -424,10 +424,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
           jsonMapper
       );
       final SegmentKiller segmentKiller = new S3SegmentKiller(
-          s3Client,
-          dbi,
-          dbConnectorConfig,
-          jsonMapper
+          s3Client
       );
       taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
     }
@@ -487,8 +484,6 @@ public class IndexerCoordinatorNode extends RegisteringNode
   public void initializeWorkerSetupManager()
   {
     if (workerSetupManager == null) {
-      final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
-      final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
       final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class);
 
       DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable());
diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java
index 050069003a7..2ee0a1ecaff 100644
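The IndexerCoordinatorNode hunks above also stop initializeWorkerSetupManager()
from building a private DbConnectorConfig and DBI; the method now reuses the
node's shared dbi, presumably constructed once elsewhere in the node along the
lines of the deleted code (sketch reconstructed from the removed lines):

    final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
    final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();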
--- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java
+++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java
@@ -29,15 +29,16 @@ import com.metamx.common.lifecycle.LifecycleStart;
 import com.metamx.common.lifecycle.LifecycleStop;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.RegisteringNode;
-import com.metamx.druid.db.DbConnector;
-import com.metamx.druid.db.DbConnectorConfig;
 import com.metamx.druid.http.StatusServlet;
 import com.metamx.druid.initialization.CuratorConfig;
 import com.metamx.druid.initialization.Initialization;
 import com.metamx.druid.initialization.ServerConfig;
 import com.metamx.druid.jackson.DefaultObjectMapper;
 import com.metamx.druid.loading.S3SegmentKiller;
+import com.metamx.druid.loading.S3SegmentPusher;
+import com.metamx.druid.loading.S3SegmentPusherConfig;
 import com.metamx.druid.loading.SegmentKiller;
+import com.metamx.druid.loading.SegmentPusher;
 import com.metamx.druid.merger.common.TaskToolbox;
 import com.metamx.druid.merger.common.config.IndexerZkConfig;
 import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@@ -46,9 +47,6 @@ import com.metamx.druid.merger.worker.TaskMonitor;
 import com.metamx.druid.merger.worker.Worker;
 import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
 import com.metamx.druid.merger.worker.config.WorkerConfig;
-import com.metamx.druid.loading.S3SegmentPusher;
-import com.metamx.druid.loading.S3SegmentPusherConfig;
-import com.metamx.druid.loading.SegmentPusher;
 import com.metamx.druid.utils.PropUtils;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.Emitters;
@@ -73,7 +71,6 @@ import org.mortbay.jetty.servlet.Context;
 import org.mortbay.jetty.servlet.DefaultServlet;
 import org.mortbay.jetty.servlet.ServletHolder;
 import org.skife.config.ConfigurationObjectFactory;
-import org.skife.jdbi.v2.DBI;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -298,13 +295,8 @@ public class WorkerNode extends RegisteringNode
           configFactory.build(S3SegmentPusherConfig.class),
           jsonMapper
       );
-      final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
-      DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
       final SegmentKiller segmentKiller = new S3SegmentKiller(
-          s3Client,
-          dbi,
-          dbConnectorConfig,
-          jsonMapper
+          s3Client
       );
       taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
     }
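Taken together with the IndexerCoordinatorNode hunk, both node types now build
the killer from the S3 client alone. The constructor change, lifted straight
from the two node diffs above:

    // before: even a worker had to wire up the database just to build this
    SegmentKiller killer = new S3SegmentKiller(s3Client, dbi, dbConnectorConfig, jsonMapper);

    // after: deep storage is the only dependency
    SegmentKiller killer = new S3SegmentKiller(s3Client);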
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
index 4f4bc9fdfed..b8620eb42df 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java
@@ -1,7 +1,6 @@
 package com.metamx.druid.merger.coordinator;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.metamx.common.ISE;
@@ -135,7 +134,7 @@ public class RemoteTaskRunnerTest
   {
     remoteTaskRunner.run(
         task1,
-        new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()),
+        new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet(), Sets.<DataSegment>newHashSet()),
         null
     );
   }
@@ -143,9 +142,25 @@ public class RemoteTaskRunnerTest
   @Test
   public void testAlreadyExecutedTask() throws Exception
   {
-    remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
+    remoteTaskRunner.run(
+        task1,
+        new TaskContext(
+            new DateTime().toString(),
+            Sets.<DataSegment>newHashSet(),
+            Sets.<DataSegment>newHashSet()
+        ),
+        null
+    );
     try {
-      remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
+      remoteTaskRunner.run(
+          task1,
+          new TaskContext(
+              new DateTime().toString(),
+              Sets.<DataSegment>newHashSet(),
+              Sets.<DataSegment>newHashSet()
+          ),
+          null
+      );
       fail("ISE expected");
     }
     catch (ISE expected) {
@@ -175,7 +190,7 @@ public class RemoteTaskRunnerTest
                 )
             ), Lists.newArrayList()
         ),
-        new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()),
+        new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet(), Sets.<DataSegment>newHashSet()),
         null
     );
   }
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java
index 4d2d5947d75..c811d007b57 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskConsumerTest.java
@@ -41,30 +41,32 @@ public class TaskConsumerTest
     tc.start();
 
     try {
-      tq.add(new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
-      {
-        @Override
-        public Type getType()
-        {
-          return Type.TEST;
-        }
+      tq.add(
+          new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
+          {
+            @Override
+            public Type getType()
+            {
+              return Type.TEST;
+            }
 
-        @Override
-        public TaskStatus run(
-            TaskContext context, TaskToolbox toolbox, TaskCallback callback
-        ) throws Exception
-        {
-          return TaskStatus.success(getId()).withSegments(
-              ImmutableSet.of(
-                  DataSegment.builder()
-                             .dataSource("ds")
-                             .interval(new Interval("2012-01-01/P1D"))
-                             .version(context.getVersion())
-                             .build()
-              )
-          );
-        }
-      });
+            @Override
+            public TaskStatus run(
+                TaskContext context, TaskToolbox toolbox, TaskCallback callback
+            ) throws Exception
+            {
+              return TaskStatus.success(getId()).withSegments(
+                  ImmutableSet.of(
+                      DataSegment.builder()
+                                 .dataSource("ds")
+                                 .interval(new Interval("2012-01-01/P1D"))
+                                 .version(context.getVersion())
+                                 .build()
+                  )
+              );
+            }
+          }
+      );
 
       while (ts.getStatus("id1").get().isRunnable()) {
         Thread.sleep(100);
@@ -97,7 +99,13 @@ public class TaskConsumerTest
     }
 
     @Override
-    public List<DataSegment> getSegmentsForInterval(String dataSource, Interval interval) throws IOException
+    public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
+    {
+      return ImmutableList.of();
+    }
+
+    @Override
+    public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
     {
       return ImmutableList.of();
     }
diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
index 8c196001125..0755a15a3a8 100644
--- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
+++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java
@@ -408,7 +408,7 @@ public class TaskQueueTest
       }
     };
 
-    callback.notify(vt.getTask().run(new TaskContext(vt.getVersion(), null), null, callback));
+    callback.notify(vt.getTask().run(new TaskContext(vt.getVersion(), null, null), null, callback));
 
     // OK, finally ready to test stuff.
     Assert.assertTrue("pass1", structThingy.pass1);
diff --git a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java
index add1d45824e..46f6acfc629 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java
@@ -1,26 +1,13 @@
 package com.metamx.druid.loading;
 
-import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
 import com.google.inject.Inject;
-import com.metamx.common.ISE;
 import com.metamx.common.MapUtils;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.client.DataSegment;
-import com.metamx.druid.db.DbConnectorConfig;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.joda.time.Interval;
-import org.skife.jdbi.v2.DBI;
-import org.skife.jdbi.v2.FoldController;
-import org.skife.jdbi.v2.Folder3;
-import org.skife.jdbi.v2.Handle;
-import org.skife.jdbi.v2.StatementContext;
-import org.skife.jdbi.v2.tweak.HandleCallback;
 
-import java.sql.SQLException;
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
 
 /**
 */
@@ -30,80 +17,20 @@ public class S3SegmentKiller implements SegmentKiller
   private static final Logger log = new Logger(S3SegmentKiller.class);
 
   private final RestS3Service s3Client;
-  private final DBI dbi;
-  private final DbConnectorConfig config;
-  private final ObjectMapper jsonMapper;
 
   @Inject
   public S3SegmentKiller(
-      RestS3Service s3Client,
-      DBI dbi,
-      DbConnectorConfig config,
-      ObjectMapper jsonMapper
+      RestS3Service s3Client
   )
   {
     this.s3Client = s3Client;
-    this.dbi = dbi;
-    this.config = config;
-    this.jsonMapper = jsonMapper;
   }
 
   @Override
-  public List<DataSegment> kill(final String datasource, final Interval interval) throws ServiceException
+  public void kill(Collection<DataSegment> segments) throws ServiceException
   {
-    // TODO -- Awkward for workers to use the DB!
-
-    List<DataSegment> matchingSegments = dbi.withHandle(
-        new HandleCallback<List<DataSegment>>()
-        {
-          @Override
-          public List<DataSegment> withHandle(Handle handle) throws Exception
-          {
-            return handle.createQuery(
-                String.format(
-                    "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
-                    config.getSegmentTable()
-                )
-            )
-                         .bind("dataSource", datasource)
-                         .bind("start", interval.getStart().toString())
-                         .bind("end", interval.getEnd().toString())
-                         .fold(
-                             Lists.<DataSegment>newArrayList(),
-                             new Folder3<List<DataSegment>, Map<String, Object>>()
-                             {
-                               @Override
-                               public List<DataSegment> fold(
-                                   List<DataSegment> accumulator,
-                                   Map<String, Object> stringObjectMap,
-                                   FoldController foldController,
-                                   StatementContext statementContext
-                               ) throws SQLException
-                               {
-                                 try {
-                                   DataSegment segment = jsonMapper.readValue(
-                                       (String) stringObjectMap.get("payload"),
-                                       DataSegment.class
-                                   );
-
-                                   accumulator.add(segment);
-
-                                   return accumulator;
-                                 }
-                                 catch (Exception e) {
-                                   throw Throwables.propagate(e);
-                                 }
-                               }
-                             }
-                         );
-          }
-        }
-    );
-
-    log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), datasource, interval);
-
-    for (final DataSegment segment : matchingSegments) {
-      // Remove from S3
+    for (final DataSegment segment : segments) {
       Map<String, Object> loadSpec = segment.getLoadSpec();
       String s3Bucket = MapUtils.getString(loadSpec, "bucket");
       String s3Path = MapUtils.getString(loadSpec, "key");
@@ -118,7 +45,5 @@ public class S3SegmentKiller implements SegmentKiller
       s3Client.deleteObject(s3Bucket, s3DescriptorPath);
     }
   }
-
-    return matchingSegments;
-  }
 }
diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java
index 56a14d2e933..8f8746d5324 100644
--- a/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java
+++ b/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java
@@ -2,13 +2,13 @@ package com.metamx.druid.loading;
 
 import com.metamx.druid.client.DataSegment;
 import org.jets3t.service.ServiceException;
-import org.joda.time.Interval;
 
+import java.util.Collection;
 import java.util.List;
 
 /**
 */
 public interface SegmentKiller
 {
-  public List<DataSegment> kill(String datasource, Interval interval) throws ServiceException;
+  public void kill(Collection<DataSegment> segments) throws ServiceException;
 }
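With the Interval-based overload gone, SegmentKiller is a pure deep-storage
operation: callers decide exactly what to delete and the killer no longer
reports what it found. A hypothetical minimal implementation against the new
interface (illustration only, not part of this patch):

    import java.util.Collection;

    import com.metamx.druid.client.DataSegment;
    import org.jets3t.service.ServiceException;

    public class NoopSegmentKiller implements SegmentKiller
    {
      @Override
      public void kill(Collection<DataSegment> segments) throws ServiceException
      {
        // A real implementation would delete each segment's payload from deep
        // storage via segment.getLoadSpec(), as S3SegmentKiller does above.
      }
    }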