From 21613bc73be1cc7bf1b3564e9f1a3bc15152cde9 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 16 Jan 2013 17:31:01 -0800 Subject: [PATCH 1/3] initial commit to hard delete segments --- .../metamx/druid/merge/ClientKillQuery.java | 41 +++++ .../druid/merger/common/TaskToolbox.java | 9 ++ .../druid/merger/common/task/KillTask.java | 52 +++++++ .../metamx/druid/merger/common/task/Task.java | 4 +- .../http/IndexerCoordinatorNode.java | 27 +++- .../druid/merger/worker/http/WorkerNode.java | 15 +- .../coordinator/RemoteTaskRunnerTest.java | 2 +- .../druid/db/DatabaseSegmentManager.java | 40 +++++ .../com/metamx/druid/http/InfoResource.java | 6 +- .../com/metamx/druid/http/MasterMain.java | 4 +- .../com/metamx/druid/http/MasterResource.java | 11 +- .../metamx/druid/loading/S3SegmentKiller.java | 144 ++++++++++++++++++ .../metamx/druid/loading/SegmentKiller.java | 14 ++ .../com/metamx/druid/master/DruidMaster.java | 47 +++++- .../src/main/resources/static/css/enable.css | 3 + server/src/main/resources/static/enable.html | 75 +++++++++ server/src/main/resources/static/index.html | 25 ++- .../main/resources/static/js/enable-0.0.1.js | 97 ++++++++++++ .../main/resources/static/js/kill-0.0.1.js | 58 +++++++ server/src/main/resources/static/kill.html | 61 ++++++++ 20 files changed, 713 insertions(+), 22 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java create mode 100644 server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java create mode 100644 server/src/main/java/com/metamx/druid/loading/SegmentKiller.java create mode 100644 server/src/main/resources/static/css/enable.css create mode 100644 server/src/main/resources/static/enable.html create mode 100644 server/src/main/resources/static/js/enable-0.0.1.js create mode 100644 server/src/main/resources/static/js/kill-0.0.1.js create mode 100644 server/src/main/resources/static/kill.html diff --git a/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java b/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java new file mode 100644 index 00000000000..ae39aeb512b --- /dev/null +++ b/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java @@ -0,0 +1,41 @@ +package com.metamx.druid.merge; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; +import org.joda.time.Interval; + +/** + */ +public class ClientKillQuery +{ + private final String dataSource; + private final Interval interval; + + @JsonCreator + public ClientKillQuery( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval + ) + { + this.dataSource = dataSource; + this.interval = interval; + } + + @JsonProperty + public String getType() + { + return "kill"; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java index 74a546cf696..26336b45c25 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.metamx.druid.loading.S3SegmentPuller; import com.metamx.druid.loading.S3SegmentGetterConfig; import 
com.metamx.druid.loading.S3ZippedSegmentPuller; +import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.loading.SegmentPuller; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; @@ -43,6 +44,7 @@ public class TaskToolbox private final ServiceEmitter emitter; private final RestS3Service s3Client; private final SegmentPusher segmentPusher; + private final SegmentKiller segmentKiller; private final ObjectMapper objectMapper; public TaskToolbox( @@ -50,6 +52,7 @@ public class TaskToolbox ServiceEmitter emitter, RestS3Service s3Client, SegmentPusher segmentPusher, + SegmentKiller segmentKiller, ObjectMapper objectMapper ) { @@ -57,6 +60,7 @@ public class TaskToolbox this.emitter = emitter; this.s3Client = s3Client; this.segmentPusher = segmentPusher; + this.segmentKiller = segmentKiller; this.objectMapper = objectMapper; } @@ -80,6 +84,11 @@ public class TaskToolbox return segmentPusher; } + public SegmentKiller getSegmentKiller() + { + return segmentKiller; + } + public ObjectMapper getObjectMapper() { return objectMapper; diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java new file mode 100644 index 00000000000..5b2d415eb47 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -0,0 +1,52 @@ +package com.metamx.druid.merger.common.task; + +import com.google.common.collect.Lists; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.common.TaskStatus; +import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.coordinator.TaskContext; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +/** + */ +public class KillTask extends AbstractTask +{ + private static final Logger log = new Logger(KillTask.class); + + @JsonCreator + public KillTask( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval + ) + { + super( + String.format( + "kill_%s_%s_%s_%s", + dataSource, + interval.getStart(), + interval.getEnd(), + new DateTime().toString() + ), + dataSource, + interval + ); + } + + @Override + public Type getType() + { + return Task.Type.KILL; + } + + @Override + public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + { + // Kill segments + toolbox.getSegmentKiller().kill(getDataSource(), getInterval()); + return TaskStatus.success(getId(), Lists.newArrayList()); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index b2059210b58..961afac96ce 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -37,6 +37,7 @@ import org.joda.time.Interval; @JsonSubTypes(value = { @JsonSubTypes.Type(name = "append", value = AppendTask.class), @JsonSubTypes.Type(name = "delete", value = DeleteTask.class), + @JsonSubTypes.Type(name = "kill", value = KillTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class), @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class) @@ -49,7 +50,8 @@ public 
interface Task MERGE, APPEND, DELETE, - TEST + TEST, + KILL } public String getId(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index bc48c7c71de..d77208c09e2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -47,6 +47,8 @@ import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.loading.S3SegmentKiller; +import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; @@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; +import org.skife.jdbi.v2.DBI; import java.net.URL; import java.util.Arrays; @@ -125,6 +128,8 @@ public class IndexerCoordinatorNode extends RegisteringNode private List monitors = null; private ServiceEmitter emitter = null; + private DbConnectorConfig dbConnectorConfig = null; + private DBI dbi = null; private IndexerCoordinatorConfig config = null; private TaskToolbox taskToolbox = null; private MergerDBCoordinator mergerDBCoordinator = null; @@ -193,6 +198,7 @@ public class IndexerCoordinatorNode extends RegisteringNode initializeEmitter(); initializeMonitors(); + initializeDB(); initializeIndexerCoordinatorConfig(); initializeMergeDBCoordinator(); initializeTaskToolbox(); @@ -370,6 +376,16 @@ public class IndexerCoordinatorNode extends RegisteringNode } } + private void initializeDB() + { + if (dbConnectorConfig == null) { + dbConnectorConfig = configFactory.build(DbConnectorConfig.class); + } + if (dbi == null) { + dbi = new DbConnector(dbConnectorConfig).getDBI(); + } + } + private void initializeIndexerCoordinatorConfig() { if (config == null) { @@ -391,18 +407,23 @@ public class IndexerCoordinatorNode extends RegisteringNode configFactory.build(S3SegmentPusherConfig.class), jsonMapper ); - taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, jsonMapper); + final SegmentKiller segmentKiller = new S3SegmentKiller( + s3Client, + dbi, + dbConnectorConfig, + jsonMapper + ); + taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); } } public void initializeMergeDBCoordinator() { if (mergerDBCoordinator == null) { - final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); mergerDBCoordinator = new MergerDBCoordinator( jsonMapper, dbConnectorConfig, - new DbConnector(dbConnectorConfig).getDBI() + dbi ); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 0799a8de37c..65400f209d9 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -29,11 +29,15 @@ import com.metamx.common.lifecycle.LifecycleStart; import 
com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.RegisteringNode; +import com.metamx.druid.db.DbConnector; +import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.loading.S3SegmentKiller; +import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; @@ -69,6 +73,7 @@ import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; +import org.skife.jdbi.v2.DBI; import java.io.IOException; import java.util.Arrays; @@ -293,7 +298,15 @@ public class WorkerNode extends RegisteringNode configFactory.build(S3SegmentPusherConfig.class), jsonMapper ); - taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, jsonMapper); + final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); + DBI dbi = new DbConnector(dbConnectorConfig).getDBI(); + final SegmentKiller segmentKiller = new S3SegmentKiller( + s3Client, + dbi, + dbConnectorConfig, + jsonMapper + ); + taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); } } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index eb10731abd9..e4996965fbd 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -322,7 +322,7 @@ public class RemoteTaskRunnerTest { return null; } - }, null, null, null, jsonMapper + }, null, null, null, null, jsonMapper ), Executors.newSingleThreadExecutor() ); diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java index ef620cd1e38..e49728baea4 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.metamx.common.MapUtils; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -45,6 +46,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback; import javax.annotation.Nullable; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -343,6 +345,44 @@ public class DatabaseSegmentManager return dataSources.get().values(); } + public Collection getAllDatasourceNames() + { + synchronized (lock) { + return dbi.withHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery( + String.format("SELECT 
DISTINCT(datasource) FROM %s", config.getSegmentTable()) + ) + .fold( + Lists.newArrayList(), + new Folder3, Map>() + { + @Override + public ArrayList fold( + ArrayList druidDataSources, + Map stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + druidDataSources.add( + MapUtils.getString(stringObjectMap, "datasource") + ); + return druidDataSources; + } + } + ); + + } + } + ); + } + } + public void poll() { try { diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java index 800e3a93e46..090e311ad31 100644 --- a/server/src/main/java/com/metamx/druid/http/InfoResource.java +++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java @@ -548,10 +548,14 @@ public class InfoResource @Path("/db/datasources") @Produces("application/json") public Response getDatabaseDataSources( - @QueryParam("full") String full + @QueryParam("full") String full, + @QueryParam("includeDisabled") String includeDisabled ) { Response.ResponseBuilder builder = Response.status(Response.Status.OK); + if (includeDisabled != null) { + return builder.entity(databaseSegmentManager.getAllDatasourceNames()).build(); + } if (full != null) { return builder.entity(databaseSegmentManager.getInventory()).build(); } diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 51df502fa64..f65e6e2b57c 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -190,7 +190,9 @@ public class MasterMain emitter, scheduledExecutorFactory, new ConcurrentHashMap(), - serviceProvider + serviceProvider, + httpClient, + new ToStringResponseHandler(Charsets.UTF_8) ); lifecycle.addManagedInstance(master); diff --git a/server/src/main/java/com/metamx/druid/http/MasterResource.java b/server/src/main/java/com/metamx/druid/http/MasterResource.java index ca992815eb6..c73e74e528c 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterResource.java +++ b/server/src/main/java/com/metamx/druid/http/MasterResource.java @@ -21,13 +21,13 @@ package com.metamx.druid.http; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.LoadPeonCallback; +import com.metamx.druid.merge.ClientKillQuery; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; -import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Response; import java.util.List; @@ -111,6 +111,15 @@ public class MasterResource return resp; } + @POST + @Path("/kill") + @Consumes("application/json") + public Response killSegments(ClientKillQuery killQuery) + { + master.killSegments(killQuery); + return Response.ok().build(); + } + @GET @Path("/loadstatus") @Produces("application/json") diff --git a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java new file mode 100644 index 00000000000..346bb036ff3 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java @@ -0,0 +1,144 @@ +package com.metamx.druid.loading; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.metamx.common.ISE; +import com.metamx.common.MapUtils; +import com.metamx.common.logger.Logger; +import 
com.metamx.druid.client.DataSegment; +import com.metamx.druid.db.DbConnectorConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.joda.time.Interval; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.FoldController; +import org.skife.jdbi.v2.Folder3; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +/** + */ +public class S3SegmentKiller implements SegmentKiller +{ + private static final Logger log = new Logger(S3SegmentKiller.class); + + private final RestS3Service s3Client; + private final DBI dbi; + private final DbConnectorConfig config; + private final ObjectMapper jsonMapper; + + @Inject + public S3SegmentKiller( + RestS3Service s3Client, + DBI dbi, + DbConnectorConfig config, + ObjectMapper jsonMapper + ) + { + this.s3Client = s3Client; + this.dbi = dbi; + this.config = config; + this.jsonMapper = jsonMapper; + } + + + @Override + public List kill(final String datasource, final Interval interval) throws ServiceException + { + List matchingSegments = dbi.withHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", + config.getSegmentTable() + ) + ) + .bind("dataSource", datasource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .fold( + Lists.newArrayList(), + new Folder3, Map>() + { + @Override + public List fold( + List accumulator, + Map stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + DataSegment segment = jsonMapper.readValue( + (String) stringObjectMap.get("payload"), + DataSegment.class + ); + + accumulator.add(segment); + + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } + ); + + log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), datasource, interval); + for (final DataSegment segment : matchingSegments) { + // Remove database entry + log.info("Removing DB entry for %s", segment.getIdentifier()); + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement( + String.format("DELETE from %s WHERE id = :segmentID", config.getSegmentTable()) + ).bind("segmentID", segment.getIdentifier()) + .execute(); + + return null; + } + } + ); + + // Remove from S3 + + Map loadSpec = segment.getLoadSpec(); + String s3Bucket = MapUtils.getString(loadSpec, "bucket"); + String s3Path = MapUtils.getString(loadSpec, "key"); + String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; + + if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) { + throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path); + } + if (!s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { + throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3DescriptorPath); + } + + log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); + log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); + s3Client.deleteObject(s3Bucket, s3Path); + 
s3Client.deleteObject(s3Bucket, s3DescriptorPath); + } + + return matchingSegments; + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java new file mode 100644 index 00000000000..56a14d2e933 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java @@ -0,0 +1,14 @@ +package com.metamx.druid.loading; + +import com.metamx.druid.client.DataSegment; +import org.jets3t.service.ServiceException; +import org.joda.time.Interval; + +import java.util.List; + +/** + */ +public interface SegmentKiller +{ + public List kill(String datasource, Interval interval) throws ServiceException; +} diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 80172c432fa..d5d4a8bd978 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -21,7 +21,7 @@ package com.metamx.druid.master; import com.google.common.base.Function; import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -36,7 +36,6 @@ import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; @@ -44,9 +43,12 @@ import com.metamx.druid.client.ServerInventoryManager; import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseSegmentManager; +import com.metamx.druid.merge.ClientKillQuery; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.phonebook.PhoneBook; import com.metamx.phonebook.PhoneBookPeon; import com.netflix.curator.x.discovery.ServiceProvider; @@ -55,8 +57,8 @@ import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.Duration; import javax.annotation.Nullable; +import java.net.URL; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -89,6 +91,8 @@ public class DruidMaster private final PhoneBookPeon masterPeon; private final Map loadManagementPeons; private final ServiceProvider serviceProvider; + private final HttpClient httpClient; + private final HttpResponseHandler responseHandler; private final ObjectMapper jsonMapper; @@ -103,7 +107,9 @@ public class DruidMaster ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, Map loadManagementPeons, - ServiceProvider serviceProvider + ServiceProvider serviceProvider, + HttpClient httpClient, + HttpResponseHandler responseHandler ) { this.config = config; @@ -124,6 +130,9 @@ public class DruidMaster this.loadManagementPeons = loadManagementPeons; this.serviceProvider = serviceProvider; + + this.httpClient = httpClient; + this.responseHandler = responseHandler; } public boolean 
isClusterMaster() @@ -199,6 +208,27 @@ public class DruidMaster databaseSegmentManager.enableDatasource(ds); } + public void killSegments(ClientKillQuery killQuery) + { + try { + httpClient.post( + new URL( + String.format( + "http://%s:%s/mmx/merger/v1/index", + serviceProvider.getInstance().getAddress(), + serviceProvider.getInstance().getPort() + ) + ) + ) + .setContent("application/json", jsonMapper.writeValueAsBytes(killQuery)) + .go(responseHandler) + .get(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback) { final DruidServer fromServer = serverInventoryManager.getInventoryValue(from); @@ -688,7 +718,14 @@ public class DruidMaster super( ImmutableList.of( new DruidMasterSegmentInfoLoader(DruidMaster.this), - new DruidMasterSegmentMerger(jsonMapper, serviceProvider), + new DruidMasterSegmentMerger( + new HttpMergerClient( + httpClient, + responseHandler, + jsonMapper, + serviceProvider + ) + ), new DruidMasterHelper() { @Override diff --git a/server/src/main/resources/static/css/enable.css b/server/src/main/resources/static/css/enable.css new file mode 100644 index 00000000000..4c1a4cdb9b1 --- /dev/null +++ b/server/src/main/resources/static/css/enable.css @@ -0,0 +1,3 @@ +#select_datasource { + margin: 20px 0 20px 0; +} \ No newline at end of file diff --git a/server/src/main/resources/static/enable.html b/server/src/main/resources/static/enable.html new file mode 100644 index 00000000000..265ae7780df --- /dev/null +++ b/server/src/main/resources/static/enable.html @@ -0,0 +1,75 @@ + + + + + + Druid Master Console - Enable/Disable Datasources + + + + + + + + + + + + + + +
+<body>
+<h2>Enable/Disable Datasources</h2>
+
+<div>
+  Enabled Datasources:
+  <ul id="enabled_datasources"></ul>
+</div>
+
+<div>
+  Disabled Datasources:
+  <ul id="disabled_datasources"></ul>
+</div>
+
+<div id="select_datasource">
+  Select Data Source:
+  <select id="datasources"></select>
+</div>
+
+<input type="button" id="enable" value="Enable"/>
+<input type="button" id="disable" value="Disable"/>
+
+<div id="enable_dialog" title="Confirm" style="display: none">
+  Are you sure you want to enable the selected datasource?
+</div>
+
+<div id="disable_dialog" title="Confirm" style="display: none">
+  Are you sure you want to disable the selected datasource?
+</div>
+
+<div id="error_dialog" title="Error" style="display: none"></div>
+ + \ No newline at end of file diff --git a/server/src/main/resources/static/index.html b/server/src/main/resources/static/index.html index bd36d0c5c53..8a6a68a7faa 100644 --- a/server/src/main/resources/static/index.html +++ b/server/src/main/resources/static/index.html @@ -20,20 +20,29 @@ - Druid Master Console - - - + Druid Master Console + + + - + \ No newline at end of file diff --git a/server/src/main/resources/static/js/enable-0.0.1.js b/server/src/main/resources/static/js/enable-0.0.1.js new file mode 100644 index 00000000000..f25b1a53a54 --- /dev/null +++ b/server/src/main/resources/static/js/enable-0.0.1.js @@ -0,0 +1,97 @@ +$(document).ready(function() { + $("button").button(); + + $("#error_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Ok : function() { + $(this).dialog("close"); + } + } + }); + + $("#enable_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Yes : function() { + var selected = $('#datasources option:selected').text(); + $.ajax({ + type: 'POST', + url:'/info/datasources/' + selected, + data: JSON.stringify(selected), + contentType:"application/json; charset=utf-8", + dataType:"json", + error: function(xhr, status, error) { + $("#enable_dialog").dialog("close"); + $("#error_dialog").html(xhr.responseText); + $("#error_dialog").dialog("open"); + }, + success: function(data, status, xhr) { + $("#enable_dialog").dialog("close"); + } + }); + }, + Cancel: function() { + $(this).dialog("close"); + } + } + }); + + $("#disable_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Yes : function() { + var selected = $('#datasources option:selected').text(); + $.ajax({ + type: 'DELETE', + url:'/info/datasources/' + selected, + data: JSON.stringify(selected), + contentType:"application/json; charset=utf-8", + dataType:"json", + error: function(xhr, status, error) { + $("#disable_dialog").dialog("close"); + $("#error_dialog").html(xhr.responseText); + $("#error_dialog").dialog("open"); + }, + success: function(data, status, xhr) { + $("#disable_dialog").dialog("close"); + } + }); + }, + Cancel: function() { + $(this).dialog("close"); + } + } + }); + + $.getJSON("/info/db/datasources", function(enabled_datasources) { + $.each(enabled_datasources, function(index, datasource) { + $('#enabled_datasources').append($('
<li>' + datasource + '</li>'));
+    });
+
+    $.getJSON("/info/db/datasources?includeDisabled", function(db_datasources) {
+      var disabled_datasources = _.difference(db_datasources, enabled_datasources);
+      $.each(disabled_datasources, function(index, datasource) {
+        $('#disabled_datasources').append($('<li>' + datasource + '</li>'));
+      });
+      $.each(db_datasources, function(index, datasource) {
+        $('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
+      });
+    });
+  });
+
+
+  $("#enable").click(function() {
+    $("#enable_dialog").dialog("open");
+  });
+
+  $('#disable').click(function() {
+    $("#disable_dialog").dialog("open");
+  });
+});
\ No newline at end of file
diff --git a/server/src/main/resources/static/js/kill-0.0.1.js b/server/src/main/resources/static/js/kill-0.0.1.js
new file mode 100644
index 00000000000..495c6c47f2d
--- /dev/null
+++ b/server/src/main/resources/static/js/kill-0.0.1.js
@@ -0,0 +1,58 @@
+$(document).ready(function() {
+  $("button").button();
+
+  $("#error_dialog").dialog({
+    autoOpen: false,
+    modal: true,
+    resizable: false,
+    buttons: {
+      Ok: function() {
+        $(this).dialog("close");
+      }
+    }
+  });
+
+  $("#confirm_dialog").dialog({
+    autoOpen: false,
+    modal: true,
+    resizable: false,
+    buttons: {
+      Yes: function() {
+        var selected = $('#datasources option:selected').text();
+        var interval = $('#interval').val();
+        var toSend = {
+          "dataSource": selected,
+          "interval": interval
+        };
+        $.ajax({
+          type: 'POST',
+          url: '/master/kill',
+          data: JSON.stringify(toSend),
+          contentType: "application/json; charset=utf-8",
+          dataType: "json",
+          error: function(xhr, status, error) {
+            $("#confirm_dialog").dialog("close");
+            $("#error_dialog").html(xhr.responseText);
+            $("#error_dialog").dialog("open");
+          },
+          success: function(data, status, xhr) {
+            $("#confirm_dialog").dialog("close");
+          }
+        });
+      },
+      Cancel: function() {
+        $(this).dialog("close");
+      }
+    }
+  });
+
+  $.getJSON("/info/db/datasources?includeDisabled", function(data) {
+    $.each(data, function(index, datasource) {
+      $('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
+    });
+  });
+
+  $("#confirm").click(function() {
+    $("#confirm_dialog").dialog("open");
+  });
+});
\ No newline at end of file
diff --git a/server/src/main/resources/static/kill.html b/server/src/main/resources/static/kill.html
new file mode 100644
index 00000000000..8741fd25f41
--- /dev/null
+++ b/server/src/main/resources/static/kill.html
@@ -0,0 +1,61 @@
+<html>
+<head>
+<title>Druid Master Console - Permanently Delete Segments</title>
+
+
+
+
+
+
+</head>
+<body>
+<h2>Permanently Delete Segments</h2>
+
+<div id="select_datasource">
+  Select Data Source:
+  <select id="datasources"></select>
+</div>
+
+<div>
+  Interval:
+  <input type="text" id="interval"/>
+</div>
+
+<input type="button" id="confirm" value="Confirm"/>
+
+<div id="confirm_dialog" title="Confirm" style="display: none">
+  Are you sure you want to delete segments for this datasource and range? There is no going back!
+</div>
+
+<div id="error_dialog" title="Error" style="display: none"></div>
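The kill path this patch wires together: kill.html POSTs {"dataSource": ..., "interval": ...} to /master/kill, MasterResource binds the body to a ClientKillQuery, DruidMaster.killSegments() forwards the same JSON to the indexer, and the "kill" subtype registered in Task.java lets Jackson materialize it there as a KillTask. Below is a minimal sketch of that serialization round trip, not part of the patch; it assumes DefaultObjectMapper's Joda Time bindings handle Interval as they do elsewhere in this codebase, and the datasource name and interval are hypothetical.

    import com.metamx.druid.jackson.DefaultObjectMapper;
    import com.metamx.druid.merge.ClientKillQuery;
    import com.metamx.druid.merger.common.task.Task;
    import org.codehaus.jackson.map.ObjectMapper;
    import org.joda.time.Interval;

    public class KillQueryRoundTrip
    {
      public static void main(String[] args) throws Exception
      {
        final ObjectMapper jsonMapper = new DefaultObjectMapper();

        // What the master builds from the console's POST body.
        final ClientKillQuery killQuery = new ClientKillQuery(
            "example_datasource",                  // hypothetical datasource
            new Interval("2013-01-01/2013-01-02")  // hypothetical interval
        );

        // DruidMaster.killSegments() ships exactly these bytes to the indexer;
        // getType() contributes {"type": "kill"} to the payload.
        final byte[] body = jsonMapper.writeValueAsBytes(killQuery);

        // On the indexer side the same bytes bind to a KillTask via the
        // @JsonSubTypes entry for "kill" in Task.java.
        final Task task = jsonMapper.readValue(body, Task.class);
        System.out.println(task.getType() == Task.Type.KILL); // expected: true
      }
    }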
    + + \ No newline at end of file From 272d737517baecf281353894d8780e60685fe831 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Tue, 22 Jan 2013 16:21:38 -0800 Subject: [PATCH 2/3] cleaning up some interactions with RTR and workers --- .../druid/client/ServerInventoryManager.java | 4 +- .../{coordinator => common}/TaskCallback.java | 2 +- .../druid/merger/common/TaskStatus.java | 62 ++++++++-- .../druid/merger/common/task/DeleteTask.java | 9 +- .../task/IndexDeterminePartitionsTask.java | 72 +++++------ .../common/task/IndexGeneratorTask.java | 28 +++-- .../druid/merger/common/task/IndexTask.java | 5 +- .../druid/merger/common/task/KillTask.java | 9 +- .../druid/merger/common/task/MergeTask.java | 12 +- .../metamx/druid/merger/common/task/Task.java | 7 +- .../merger/coordinator/LocalTaskRunner.java | 3 +- .../coordinator/MergerDBCoordinator.java | 20 ++++ .../merger/coordinator/RemoteTaskRunner.java | 37 +++--- .../druid/merger/coordinator/TaskQueue.java | 62 ++++++---- .../druid/merger/coordinator/TaskRunner.java | 1 + .../druid/merger/coordinator/TaskWrapper.java | 1 + .../merger/coordinator/exec/TaskConsumer.java | 113 ++++++++++++------ .../druid/merger/worker/TaskMonitor.java | 10 +- .../coordinator/RemoteTaskRunnerTest.java | 8 +- .../merger/coordinator/TaskQueueTest.java | 71 ++++++----- .../metamx/druid/db/DatabaseRuleManager.java | 6 +- .../metamx/druid/loading/S3SegmentKiller.java | 34 +----- .../com/metamx/druid/master/DruidMaster.java | 2 +- .../metamx/druid/master/DruidMasterTest.java | 2 + 24 files changed, 356 insertions(+), 224 deletions(-) rename merger/src/main/java/com/metamx/druid/merger/{coordinator => common}/TaskCallback.java (95%) diff --git a/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java b/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java index bd32a62791d..891d41aaec9 100644 --- a/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java +++ b/client/src/main/java/com/metamx/druid/client/ServerInventoryManager.java @@ -19,7 +19,6 @@ package com.metamx.druid.client; -import com.google.common.collect.Maps; import com.metamx.common.Pair; import com.metamx.common.logger.Logger; import com.metamx.phonebook.PhoneBook; @@ -27,12 +26,13 @@ import com.metamx.phonebook.PhoneBookPeon; import java.util.Iterator; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** */ public class ServerInventoryManager extends InventoryManager { - private static final Map removedSegments = Maps.newHashMap(); + private static final Map removedSegments = new ConcurrentHashMap(); public ServerInventoryManager( ServerInventoryManagerConfig config, diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskCallback.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java similarity index 95% rename from merger/src/main/java/com/metamx/druid/merger/coordinator/TaskCallback.java rename to merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java index 549d0e7c4a7..a134ad2455e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskCallback.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskCallback.java @@ -17,7 +17,7 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
*/ -package com.metamx.druid.merger.coordinator; +package com.metamx.druid.merger.common; import com.metamx.druid.merger.common.TaskStatus; diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java index 39b38a18a93..a492f3015f7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskStatus.java @@ -40,6 +40,13 @@ public class TaskStatus CONTINUED } + public static enum Action + { + NONE, + ANNOUNCE_SEGMENTS, + DELETE_SEGMENTS + } + public static TaskStatus running(String taskId) { return new TaskStatus( @@ -47,29 +54,44 @@ public class TaskStatus Status.RUNNING, Collections.emptyList(), Collections.emptyList(), - -1 + -1, + Action.NONE ); } public static TaskStatus failure(String taskId) { - return new TaskStatus(taskId, Status.FAILED, Collections.emptyList(), Collections.emptyList(), -1); + return new TaskStatus( + taskId, + Status.FAILED, + Collections.emptyList(), + Collections.emptyList(), + -1, + Action.NONE + ); } public static TaskStatus success(String taskId, List segments) { - return new TaskStatus(taskId, Status.SUCCESS, ImmutableList.copyOf(segments), Collections.emptyList(), -1); + return new TaskStatus( + taskId, + Status.SUCCESS, + ImmutableList.copyOf(segments), + Collections.emptyList(), + -1, + Action.NONE + ); } public static TaskStatus continued(String taskId, List nextTasks) { - Preconditions.checkArgument(nextTasks.size() > 0, "nextTasks.size() > 0"); return new TaskStatus( taskId, Status.CONTINUED, Collections.emptyList(), ImmutableList.copyOf(nextTasks), - -1 + -1, + Action.NONE ); } @@ -78,6 +100,7 @@ public class TaskStatus private final List nextTasks; private final Status status; private final long duration; + private final Action action; @JsonCreator private TaskStatus( @@ -85,7 +108,8 @@ public class TaskStatus @JsonProperty("status") Status status, @JsonProperty("segments") List segments, @JsonProperty("nextTasks") List nextTasks, - @JsonProperty("duration") long duration + @JsonProperty("duration") long duration, + @JsonProperty("action") Action action ) { this.id = id; @@ -93,6 +117,7 @@ public class TaskStatus this.nextTasks = nextTasks; this.status = status; this.duration = duration; + this.action = action; } @JsonProperty("id") @@ -125,6 +150,12 @@ public class TaskStatus return duration; } + @JsonProperty("action") + public Action getAction() + { + return action; + } + /** * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable, * isContinued, isSuccess, or isFailure will be true at any one time. 
@@ -171,7 +202,23 @@ public class TaskStatus public TaskStatus withDuration(long _duration) { - return new TaskStatus(id, status, segments, nextTasks, _duration); + return new TaskStatus(id, status, segments, nextTasks, _duration, action); + } + + public TaskStatus withSegments(List _segments) + { + return new TaskStatus(id, status, _segments, nextTasks, duration, action); + } + + public TaskStatus withNextTasks(List _nextTasks) + { + Preconditions.checkArgument(_nextTasks.size() > 0, "nextTasks.size() > 0"); + return new TaskStatus(id, status, segments, _nextTasks, duration, action); + } + + public TaskStatus withAction(Action _action) + { + return new TaskStatus(id, status, segments, nextTasks, duration, _action); } @Override @@ -183,6 +230,7 @@ public class TaskStatus .add("nextTasks", nextTasks) .add("status", status) .add("duration", duration) + .add("action", action) .toString(); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java index 5f08298cd2d..82c85a67f40 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java @@ -28,19 +28,17 @@ import com.metamx.druid.index.v1.IncrementalIndex; import com.metamx.druid.index.v1.IncrementalIndexAdapter; import com.metamx.druid.index.v1.IndexMerger; import com.metamx.druid.index.v1.IndexableAdapter; -import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.coordinator.TaskContext; import com.metamx.druid.shard.NoneShardSpec; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; -import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; import org.joda.time.Interval; import java.io.File; -import java.util.ArrayList; public class DeleteTask extends AbstractTask { @@ -72,7 +70,7 @@ public class DeleteTask extends AbstractTask } @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { // Strategy: Create an empty segment covering the interval to be deleted final IncrementalIndex empty = new IncrementalIndex(0, QueryGranularity.NONE, new AggregatorFactory[0]); @@ -100,6 +98,7 @@ public class DeleteTask extends AbstractTask segment.getVersion() ); - return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment)); + return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment)) + .withAction(TaskStatus.Action.ANNOUNCE_SEGMENTS); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java index 55edfdbc3bc..fce5ebffb3b 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java @@ -20,7 +20,6 @@ package com.metamx.druid.merger.common.task; import com.google.common.base.Function; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Ordering; @@ -29,6 +28,7 
@@ import com.google.common.collect.TreeMultiset; import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; import com.metamx.druid.input.InputRow; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.coordinator.TaskContext; @@ -42,16 +42,18 @@ import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.Set; public class IndexDeterminePartitionsTask extends AbstractTask { - @JsonProperty private final FirehoseFactory firehoseFactory; - @JsonProperty private final Schema schema; - @JsonProperty private final long targetPartitionSize; + @JsonProperty + private final FirehoseFactory firehoseFactory; + @JsonProperty + private final Schema schema; + @JsonProperty + private final long targetPartitionSize; private static final Logger log = new Logger(IndexTask.class); @@ -88,7 +90,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask } @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { log.info("Running with targetPartitionSize[%d]", targetPartitionSize); @@ -108,24 +110,24 @@ public class IndexDeterminePartitionsTask extends AbstractTask final Firehose firehose = firehoseFactory.connect(); try { - while(firehose.hasMore()) { + while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); - if(getInterval().contains(inputRow.getTimestampFromEpoch())) { + if (getInterval().contains(inputRow.getTimestampFromEpoch())) { // Extract dimensions from event for (final String dim : inputRow.getDimensions()) { final List dimValues = inputRow.getDimension(dim); - if(!unusableDimensions.contains(dim)) { + if (!unusableDimensions.contains(dim)) { - if(dimValues.size() == 1) { + if (dimValues.size() == 1) { // Track this value TreeMultiset dimensionValueMultiset = dimensionValueMultisets.get(dim); - if(dimensionValueMultiset == null) { + if (dimensionValueMultiset == null) { dimensionValueMultiset = TreeMultiset.create(); dimensionValueMultisets.put(dim, dimensionValueMultiset); } @@ -146,7 +148,8 @@ public class IndexDeterminePartitionsTask extends AbstractTask } } - } finally { + } + finally { firehose.close(); } @@ -166,7 +169,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask } }; - if(dimensionValueMultisets.isEmpty()) { + if (dimensionValueMultisets.isEmpty()) { // No suitable partition dimension. We'll make one big segment and hope for the best. 
log.info("No suitable partition dimension found"); shardSpecs.add(new NoneShardSpec()); @@ -188,9 +191,9 @@ public class IndexDeterminePartitionsTask extends AbstractTask // Iterate over unique partition dimension values in sorted order String currentPartitionStart = null; int currentPartitionSize = 0; - for(final String partitionDimValue : partitionDimValues.elementSet()) { + for (final String partitionDimValue : partitionDimValues.elementSet()) { currentPartitionSize += partitionDimValues.count(partitionDimValue); - if(currentPartitionSize >= targetPartitionSize) { + if (currentPartitionSize >= targetPartitionSize) { final ShardSpec shardSpec = new SingleDimensionShardSpec( partitionDim, currentPartitionStart, @@ -229,25 +232,26 @@ public class IndexDeterminePartitionsTask extends AbstractTask return TaskStatus.continued( getId(), Lists.transform( - shardSpecs, new Function() - { - @Override - public Task apply(ShardSpec shardSpec) - { - return new IndexGeneratorTask( - getGroupId(), - getInterval(), - firehoseFactory, - new Schema( - schema.getDataSource(), - schema.getAggregators(), - schema.getIndexGranularity(), - shardSpec - ) - ); - } - } + shardSpecs, + new Function() + { + @Override + public Task apply(ShardSpec shardSpec) + { + return new IndexGeneratorTask( + getGroupId(), + getInterval(), + firehoseFactory, + new Schema( + schema.getDataSource(), + schema.getAggregators(), + schema.getIndexGranularity(), + shardSpec + ) + ); + } + } ) - ); + ).withAction(TaskStatus.Action.ANNOUNCE_SEGMENTS); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index b89142ef19a..aedcca1b665 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -25,6 +25,8 @@ import com.google.common.collect.Maps; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.input.InputRow; +import com.metamx.druid.loading.SegmentPusher; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.index.YeOldePlumberSchool; @@ -34,7 +36,6 @@ import com.metamx.druid.realtime.Firehose; import com.metamx.druid.realtime.FirehoseFactory; import com.metamx.druid.realtime.Plumber; import com.metamx.druid.realtime.Schema; -import com.metamx.druid.loading.SegmentPusher; import com.metamx.druid.realtime.Sink; import org.codehaus.jackson.annotate.JsonCreator; import org.codehaus.jackson.annotate.JsonProperty; @@ -89,7 +90,7 @@ public class IndexGeneratorTask extends AbstractTask } @Override - public TaskStatus run(final TaskContext context, final TaskToolbox toolbox) throws Exception + public TaskStatus run(final TaskContext context, final TaskToolbox toolbox, TaskCallback callback) throws Exception { // Set up temporary directory for indexing final File tmpDir = new File( @@ -131,10 +132,10 @@ public class IndexGeneratorTask extends AbstractTask ).findPlumber(schema, metrics); try { - while(firehose.hasMore()) { + while (firehose.hasMore()) { final InputRow inputRow = firehose.nextRow(); - if(shouldIndex(inputRow)) { + if (shouldIndex(inputRow)) { final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); if (sink == null) { throw new NullPointerException( @@ -148,14 +149,15 @@ public 
class IndexGeneratorTask extends AbstractTask int numRows = sink.add(inputRow); metrics.incrementProcessed(); - if(numRows >= toolbox.getConfig().getRowFlushBoundary()) { + if (numRows >= toolbox.getConfig().getRowFlushBoundary()) { plumber.persist(firehose.commit()); } } else { metrics.incrementThrownAway(); } } - } finally { + } + finally { firehose.close(); } @@ -174,23 +176,27 @@ public class IndexGeneratorTask extends AbstractTask ); // Done - return TaskStatus.success(getId(), ImmutableList.copyOf(pushedSegments)); + return TaskStatus.success(getId(), ImmutableList.copyOf(pushedSegments)) + .withAction(TaskStatus.Action.ANNOUNCE_SEGMENTS); } /** * Should we index this inputRow? Decision is based on our interval and shardSpec. + * * @param inputRow the row to check + * * @return true or false */ - private boolean shouldIndex(InputRow inputRow) { - if(!getInterval().contains(inputRow.getTimestampFromEpoch())) { + private boolean shouldIndex(InputRow inputRow) + { + if (!getInterval().contains(inputRow.getTimestampFromEpoch())) { return false; } final Map eventDimensions = Maps.newHashMapWithExpectedSize(inputRow.getDimensions().size()); - for(final String dim : inputRow.getDimensions()) { + for (final String dim : inputRow.getDimensions()) { final List dimValues = inputRow.getDimension(dim); - if(dimValues.size() == 1) { + if (dimValues.size() == 1) { eventDimensions.put(dim, Iterables.getOnlyElement(dimValues)); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java index 6074765ddbf..c8391d7bfce 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java @@ -25,6 +25,7 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.indexer.granularity.GranularitySpec; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.coordinator.TaskContext; @@ -126,11 +127,11 @@ public class IndexTask extends AbstractTask @Override public TaskStatus preflight(TaskContext context) throws Exception { - return TaskStatus.continued(getId(), toSubtasks()); + return TaskStatus.continued(getId(), toSubtasks()).withAction(TaskStatus.Action.ANNOUNCE_SEGMENTS); } @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { throw new IllegalStateException("IndexTasks should not be run!"); } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java index 5b2d415eb47..06612d978df 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -3,6 +3,7 @@ package com.metamx.druid.merger.common.task; import com.google.common.collect.Lists; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.coordinator.TaskContext; @@ -11,6 +12,8 
@@ import org.codehaus.jackson.annotate.JsonProperty; import org.joda.time.DateTime; import org.joda.time.Interval; +import java.util.List; + /** */ public class KillTask extends AbstractTask @@ -43,10 +46,10 @@ public class KillTask extends AbstractTask } @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { // Kill segments - toolbox.getSegmentKiller().kill(getDataSource(), getInterval()); - return TaskStatus.success(getId(), Lists.newArrayList()); + List segmentsToKill = toolbox.getSegmentKiller().kill(getDataSource(), getInterval()); + return TaskStatus.success(getId(), segmentsToKill).withAction(TaskStatus.Action.DELETE_SEGMENTS); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java index 7388058bad0..d1be1f18ce7 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java @@ -35,6 +35,7 @@ import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.loading.SegmentPuller; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.coordinator.TaskContext; @@ -115,7 +116,7 @@ public abstract class MergeTask extends AbstractTask } @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { final ServiceEmitter emitter = toolbox.getEmitter(); final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); @@ -171,7 +172,8 @@ public abstract class MergeTask extends AbstractTask emitter.emit(builder.build("merger/uploadTime", System.currentTimeMillis() - uploadStart)); emitter.emit(builder.build("merger/mergeSize", uploadedSegment.getSize())); - return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment)); + return TaskStatus.success(getId(), Lists.newArrayList(uploadedSegment)) + .withAction(TaskStatus.Action.ANNOUNCE_SEGMENTS); } catch (Exception e) { log.error( @@ -285,12 +287,12 @@ public abstract class MergeTask extends AbstractTask DateTime start = null; DateTime end = null; - for(final DataSegment segment : segments) { - if(start == null || segment.getInterval().getStart().isBefore(start)) { + for (final DataSegment segment : segments) { + if (start == null || segment.getInterval().getStart().isBefore(start)) { start = segment.getInterval().getStart(); } - if(end == null || segment.getInterval().getEnd().isAfter(end)) { + if (end == null || segment.getInterval().getEnd().isAfter(end)) { end = segment.getInterval().getEnd(); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index 961afac96ce..84c66512266 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -21,13 +21,10 @@ package com.metamx.druid.merger.common.task; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.common.TaskCallback; 
import com.metamx.druid.merger.coordinator.TaskContext; -import com.metamx.druid.merger.common.task.IndexDeterminePartitionsTask; -import com.metamx.druid.merger.common.task.IndexGeneratorTask; -import com.metamx.druid.merger.common.task.IndexTask; import org.codehaus.jackson.annotate.JsonSubTypes; import org.codehaus.jackson.annotate.JsonTypeInfo; -import org.joda.time.DateTime; import org.joda.time.Interval; /** @@ -86,5 +83,5 @@ public interface Task * @return Some kind of finished status (isRunnable must be false). * @throws Exception */ - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception; + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception; } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java index fc03504792f..5dd4f4d2204 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/LocalTaskRunner.java @@ -22,6 +22,7 @@ package com.metamx.druid.merger.coordinator; import com.google.common.base.Throwables; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.task.Task; @@ -70,7 +71,7 @@ public class LocalTaskRunner implements TaskRunner TaskStatus status; try { - status = task.run(context, toolbox); + status = task.run(context, toolbox, null); } catch (InterruptedException e) { log.error(e, "Interrupted while running task[%s]", task); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java index bc53ef0d4f7..828404d3961 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/MergerDBCoordinator.java @@ -191,4 +191,24 @@ public class MergerDBCoordinator } } } + + public void deleteSegment(final DataSegment segment) + { + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement( + String.format("DELETE from %s WHERE id = :id", dbConnectorConfig.getSegmentTable()) + ).bind("id", segment.getIdentifier()) + .execute(); + + return null; + } + } + ); + + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 5537a6b6420..37bb3e9ca08 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -33,6 +33,7 @@ import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.PeriodGranularity; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskHolder; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; @@ -427,27 +428,27 @@ public class RemoteTaskRunner implements TaskRunner statusLock.notify(); + final TaskWrapper taskWrapper = 
tasks.get(taskId); + if (taskWrapper == null) { + log.warn( + "WTF?! Worker[%s] announcing a status for a task I didn't know about: %s", + worker.getHost(), + taskId + ); + } else { + final TaskCallback callback = taskWrapper.getCallback(); + + // Cleanup + if (callback != null) { + callback.notify(taskStatus); + } + } + if (taskStatus.isComplete()) { // Worker is done with this task workerWrapper.setLastCompletedTaskTime(new DateTime()); - final TaskWrapper taskWrapper = tasks.get(taskId); - - if (taskWrapper == null) { - log.warn( - "WTF?! Worker[%s] completed a task I didn't know about: %s", - worker.getHost(), - taskId - ); - } else { - final TaskCallback callback = taskWrapper.getCallback(); - - // Cleanup - if (callback != null) { - callback.notify(taskStatus); - } - tasks.remove(taskId); - cf.delete().guaranteed().inBackground().forPath(event.getData().getPath()); - } + tasks.remove(taskId); + cf.delete().guaranteed().inBackground().forPath(event.getData().getPath()); } } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java index f674b8744cd..79d54b5ef20 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java @@ -52,18 +52,18 @@ import java.util.concurrent.locks.ReentrantLock; /** * Interface between task producers and task consumers. - * + *

    * The queue accepts tasks from producers using {@link #add} and delivers tasks to consumers using either * {@link #take} or {@link #poll}. Ordering is mostly-FIFO, with deviations when the natural next task would conflict * with a currently-running task. In that case, tasks are skipped until a runnable one is found. - * + *

    * To manage locking, the queue keeps track of currently-running tasks as {@link TaskGroup} objects. The idea is that * only one TaskGroup can be running on a particular dataSource + interval, and that TaskGroup has a single version * string that all tasks in the group must use to publish segments. Tasks in the same TaskGroup may run concurrently. - * + *

    * For persistence, the queue saves new tasks from {@link #add} and task status updates from {@link #done} using a * {@link TaskStorage} object. - * + *

    * To support leader election of our containing system, the queue can be stopped (in which case it will not accept * any new tasks, or hand out any more tasks, until started again). */ @@ -127,13 +127,13 @@ public class TaskQueue } }; - for(final Pair taskAndVersion : byVersionOrdering.sortedCopy(runningTasks)) { + for (final Pair taskAndVersion : byVersionOrdering.sortedCopy(runningTasks)) { final Task task = taskAndVersion.lhs; final String preferredVersion = taskAndVersion.rhs; queue.add(task); - if(preferredVersion != null) { + if (preferredVersion != null) { final Optional version = tryLock(task, Optional.of(preferredVersion)); log.info( @@ -173,7 +173,8 @@ public class TaskQueue running.clear(); active = false; - } finally { + } + finally { giant.unlock(); } } @@ -182,6 +183,7 @@ public class TaskQueue * Adds some work to the queue and the underlying task storage facility with a generic "running" status. * * @param task task to add + * * @return true */ public boolean add(Task task) @@ -205,6 +207,7 @@ public class TaskQueue /** * Locks and returns next doable work from the queue. Blocks if there is no doable work. + * * @return runnable task */ public VersionedTaskWrapper take() throws InterruptedException @@ -214,19 +217,21 @@ public class TaskQueue try { VersionedTaskWrapper taskWrapper; - while((taskWrapper = poll()) == null) { + while ((taskWrapper = poll()) == null) { log.info("Waiting for work..."); workMayBeAvailable.await(); } return taskWrapper; - } finally { + } + finally { giant.unlock(); } } /** * Locks and removes next doable work from the queue. Returns null if there is no doable work. + * * @return runnable task or null */ public VersionedTaskWrapper poll() @@ -235,9 +240,9 @@ public class TaskQueue try { log.info("Checking for doable work"); - for(final Task task : queue) { + for (final Task task : queue) { final Optional maybeVersion = tryLock(task); - if(maybeVersion.isPresent()) { + if (maybeVersion.isPresent()) { Preconditions.checkState(active, "wtf? Found task when inactive"); taskStorage.setVersion(task.getId(), maybeVersion.get()); queue.remove(task); @@ -259,6 +264,7 @@ public class TaskQueue * running. * * @param task task to unlock + * * @throws IllegalStateException if task is not currently locked */ private void unlock(final Task task) @@ -284,7 +290,7 @@ public class TaskQueue ); final TaskGroup taskGroup; - if(maybeTaskGroup.size() == 1) { + if (maybeTaskGroup.size() == 1) { taskGroup = maybeTaskGroup.get(0); } else { throw new IllegalStateException(String.format("Task must be running: %s", task.getId())); @@ -294,12 +300,12 @@ public class TaskQueue log.info("Removing task[%s] from TaskGroup[%s]", task.getId(), taskGroup.getGroupId()); taskGroup.getTaskSet().remove(task); - if(taskGroup.getTaskSet().size() == 0) { + if (taskGroup.getTaskSet().size() == 0) { log.info("TaskGroup complete: %s", taskGroup); running.get(dataSource).remove(taskGroup.getInterval()); } - if(running.get(dataSource).size() == 0) { + if (running.get(dataSource).size() == 0) { running.remove(dataSource); } @@ -314,8 +320,9 @@ public class TaskQueue * Unlock some task and update its status in the task storage facility. If "status" is a continuation status (i.e. * it has nextTasks) this will add the next tasks to the queue with a generic running status. 
* - * @param task task to unlock + * @param task task to unlock * @param status task completion status; must not be runnable + * * @throws IllegalStateException if task is not currently running, or if status is runnable */ public void done(final Task task, final TaskStatus status) @@ -338,11 +345,12 @@ public class TaskQueue // Add next tasks, if any try { - for(final Task nextTask : status.getNextTasks()) { + for (final Task nextTask : status.getNextTasks()) { add(nextTask); tryLock(nextTask); } - } catch(Exception e) { + } + catch (Exception e) { log.error(e, "Failed to continue task: %s", task.getId()); actualStatus = TaskStatus.failure(task.getId()); } @@ -369,7 +377,7 @@ public class TaskQueue try { final Optional statusOptional = taskStorage.getStatus(taskid); - if(statusOptional.isPresent()) { + if (statusOptional.isPresent()) { // See if we can collapse this down return Optional.of(collapseStatus(statusOptional.get())); } else { @@ -383,17 +391,16 @@ public class TaskQueue private TaskStatus collapseStatus(TaskStatus status) { - if (status.isContinued()) { int nSubtasks = 0; int nSuccesses = 0; List segments = Lists.newArrayList(); - for(final Task subtask : status.getNextTasks()) { + for (final Task subtask : status.getNextTasks()) { final TaskStatus subtaskStatus = collapseStatus(taskStorage.getStatus(subtask.getId()).get()); - nSubtasks ++; + nSubtasks++; if (subtaskStatus.isFailure()) { return TaskStatus.failure(status.getId()); @@ -405,7 +412,7 @@ public class TaskQueue } if (nSubtasks == nSuccesses) { - return TaskStatus.success(status.getId(), segments); + return TaskStatus.success(status.getId(), segments).withAction(status.getAction()); } } @@ -419,6 +426,7 @@ public class TaskQueue * Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task. * * @param task task to attempt to lock + * * @return lock version if lock was acquired, absent otherwise */ private Optional tryLock(final Task task) @@ -429,8 +437,9 @@ public class TaskQueue /** * Attempt to lock a task, without removing it from the queue. Can safely be called multiple times on the same task. * - * @param task task to attempt to lock + * @param task task to attempt to lock * @param preferredVersion use this version if possible (no guarantees, though!) + * * @return lock version if lock was acquired, absent otherwise */ private Optional tryLock(final Task task, final Optional preferredVersion) @@ -474,7 +483,7 @@ public class TaskQueue final String version; - if(preferredVersion.isPresent()) { + if (preferredVersion.isPresent()) { // We have a preferred version. Since this is a private method, we'll trust our caller to not break our // ordering assumptions and just use it. 
version = preferredVersion.get(); @@ -503,7 +512,8 @@ public class TaskQueue return Optional.of(taskGroupToUse.getVersion()); - } finally { + } + finally { giant.unlock(); } @@ -518,7 +528,7 @@ public class TaskQueue try { final NavigableMap dsRunning = running.get(dataSource); - if(dsRunning == null) { + if (dsRunning == null) { // No locks at all return Collections.emptyList(); } else { diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java index 5362741ac92..2cc9ba43600 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskRunner.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.coordinator; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.task.Task; /** diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskWrapper.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskWrapper.java index c757bb2dc33..22e7d4152b4 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskWrapper.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskWrapper.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.coordinator; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.task.Task; /** diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java index 78326d3a3cc..f24c562a045 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/exec/TaskConsumer.java @@ -21,13 +21,14 @@ package com.metamx.druid.merger.coordinator.exec; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableSet; +import com.metamx.common.ISE; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.druid.client.DataSegment; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.MergerDBCoordinator; -import com.metamx.druid.merger.coordinator.TaskCallback; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.coordinator.TaskContext; import com.metamx.druid.merger.coordinator.TaskQueue; import com.metamx.druid.merger.coordinator.TaskRunner; @@ -36,6 +37,8 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import java.util.List; + public class TaskConsumer implements Runnable { private final TaskQueue queue; @@ -107,7 +110,7 @@ public class TaskConsumer implements Runnable .emit(); // Retry would be nice, but only after we have a way to throttle and limit them. Just fail for now. 
- if(!shutdown) { + if (!shutdown) { queue.done(task, TaskStatus.failure(task.getId())); } } @@ -141,7 +144,8 @@ public class TaskConsumer implements Runnable try { preflightStatus = task.preflight(context); log.info("Preflight done for task: %s", task.getId()); - } catch(Exception e) { + } + catch (Exception e) { preflightStatus = TaskStatus.failure(task.getId()); log.error(e, "Exception thrown during preflight for task: %s", task.getId()); } @@ -168,47 +172,25 @@ public class TaskConsumer implements Runnable // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after // we check and before we commit the database transaction, but better than nothing. - if(shutdown) { + if (shutdown) { log.info("Abandoning task due to shutdown: %s", task.getId()); return; } - // Publish returned segments - // FIXME: Publish in transaction try { - for (DataSegment segment : status.getSegments()) { - if (!task.getDataSource().equals(segment.getDataSource())) { - throw new IllegalStateException( - String.format( - "Segment for task[%s] has invalid dataSource: %s", - task.getId(), - segment.getIdentifier() - ) - ); - } - - if (!task.getInterval().contains(segment.getInterval())) { - throw new IllegalStateException( - String.format( - "Segment for task[%s] has invalid interval: %s", - task.getId(), - segment.getIdentifier() - ) - ); - } - - if (!context.getVersion().equals(segment.getVersion())) { - throw new IllegalStateException( - String.format( - "Segment for task[%s] has invalid version: %s", - task.getId(), - segment.getIdentifier() - ) - ); - } - - log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId()); - mergerDBCoordinator.announceHistoricalSegment(segment); + switch (status.getAction()) { + case ANNOUNCE_SEGMENTS: + // Publish returned segments + // FIXME: Publish in transaction + publishSegments(task, context, status.getSegments()); + break; + case DELETE_SEGMENTS: + deleteSegments(task, context, status.getSegments()); + break; + case NONE: + break; + default: + throw new ISE("Unknown Action[%s]", status.getAction()); } } catch (Exception e) { @@ -257,4 +239,57 @@ public class TaskConsumer implements Runnable } ); } + + private void deleteSegments(Task task, TaskContext context, List<DataSegment> segments) throws Exception + { + for (DataSegment segment : segments) { + verifySegment(task, context, segment); + + log.info("Deleting segment[%s] for task[%s]", segment.getIdentifier(), task.getId()); + mergerDBCoordinator.deleteSegment(segment); + } + } + + private void publishSegments(Task task, TaskContext context, List<DataSegment> segments) throws Exception + { + for (DataSegment segment : segments) { + verifySegment(task, context, segment); + + log.info("Publishing segment[%s] for task[%s]", segment.getIdentifier(), task.getId()); + mergerDBCoordinator.announceHistoricalSegment(segment); + } + } + + private void verifySegment(Task task, TaskContext context, DataSegment segment) + { + if (!task.getDataSource().equals(segment.getDataSource())) { + throw new IllegalStateException( + String.format( + "Segment for task[%s] has invalid dataSource: %s", + task.getId(), + segment.getIdentifier() + ) + ); + } + + if (!task.getInterval().contains(segment.getInterval())) { + throw new IllegalStateException( + String.format( + "Segment for task[%s] has invalid interval: %s", + task.getId(), + segment.getIdentifier() + ) + ); + } + + if (!context.getVersion().equals(segment.getVersion())) { + throw new IllegalStateException( + String.format( 
"Segment for task[%s] has invalid version: %s", + task.getId(), + segment.getIdentifier() + ) + ); + } + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java index 5fc49788fcd..506b33e1416 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/TaskMonitor.java @@ -21,6 +21,7 @@ package com.metamx.druid.merger.worker; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskHolder; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; @@ -110,7 +111,14 @@ public class TaskMonitor try { workerCuratorCoordinator.unannounceTask(task.getId()); workerCuratorCoordinator.announceStatus(TaskStatus.running(task.getId())); - taskStatus = task.run(taskContext, toolbox); + taskStatus = task.run(taskContext, toolbox, new TaskCallback() + { + @Override + public void notify(TaskStatus status) + { + workerCuratorCoordinator.updateStatus(status); + } + }); } catch (Exception e) { log.makeAlert(e, "Failed to run task") diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index c64731c243c..d74150aa1fe 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -7,6 +7,7 @@ import com.metamx.common.ISE; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; @@ -190,8 +191,7 @@ public class RemoteTaskRunnerTest String.format("%s/worker1/task1", statusPath), jsonMapper.writeValueAsBytes( TaskStatus.success( - "task1", - Lists.newArrayList() + "task1" ) ) ); @@ -500,9 +500,9 @@ public class RemoteTaskRunnerTest } @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { - return TaskStatus.success("task1", Lists.newArrayList()); + return TaskStatus.success("task1"); } } } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java index 47913404eed..5eb460a3b4c 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskQueueTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.common.TaskCallback; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.task.AbstractTask; @@ -76,18 +77,23 @@ public class TaskQueueTest Throwable thrown; - for(Task task : tasks) { + for (Task task : tasks) { 
tq.add(task); } // get task status for in-progress task - Assert.assertEquals("T2 status (before finishing)", TaskStatus.Status.RUNNING, tq.getStatus(tasks[2].getId()).get().getStatusCode()); + Assert.assertEquals( + "T2 status (before finishing)", + TaskStatus.Status.RUNNING, + tq.getStatus(tasks[2].getId()).get().getStatusCode() + ); // Can't add tasks with the same id thrown = null; try { tq.add(newTask("T5", "G5", "baz", new Interval("2013-02-01/PT1H"))); - } catch(IllegalStateException e) { + } + catch (IllegalStateException e) { thrown = e; } @@ -97,7 +103,7 @@ public class TaskQueueTest final List taken = Lists.newArrayList(); while (true) { final VersionedTaskWrapper taskWrapper = tq.poll(); - if(taskWrapper != null) { + if (taskWrapper != null) { taken.add(taskWrapper.getTask()); } else { break; @@ -114,16 +120,21 @@ public class TaskQueueTest ); // mark one done - tq.done(tasks[2], tasks[2].run(null, null)); + tq.done(tasks[2], tasks[2].run(null, null, null)); // get its status back - Assert.assertEquals("T2 status (after finishing)", TaskStatus.Status.SUCCESS, tq.getStatus(tasks[2].getId()).get().getStatusCode()); + Assert.assertEquals( + "T2 status (after finishing)", + TaskStatus.Status.SUCCESS, + tq.getStatus(tasks[2].getId()).get().getStatusCode() + ); // Can't do a task twice thrown = null; try { - tq.done(tasks[2], tasks[2].run(null, null)); - } catch(IllegalStateException e) { + tq.done(tasks[2], tasks[2].run(null, null, null)); + } + catch (IllegalStateException e) { thrown = e; } @@ -133,7 +144,7 @@ public class TaskQueueTest taken.clear(); while (true) { final VersionedTaskWrapper taskWrapper = tq.poll(); - if(taskWrapper != null) { + if (taskWrapper != null) { taken.add(taskWrapper.getTask()); } else { break; @@ -162,9 +173,9 @@ public class TaskQueueTest final Task t1 = newContinuedTask("T1", "G1", "bar", new Interval("2013/P1Y"), Lists.newArrayList(t0)); tq.add(t1); - Assert.assertTrue("T0 isPresent (#1)", !tq.getStatus("T0").isPresent()); - Assert.assertTrue("T1 isPresent (#1)", tq.getStatus("T1").isPresent()); - Assert.assertTrue("T1 isRunnable (#1)", tq.getStatus("T1").get().isRunnable()); + Assert.assertTrue("T0 isPresent (#1)", !tq.getStatus("T0").isPresent()); + Assert.assertTrue("T1 isPresent (#1)", tq.getStatus("T1").isPresent()); + Assert.assertTrue("T1 isRunnable (#1)", tq.getStatus("T1").get().isRunnable()); Assert.assertTrue("T1 isComplete (#1)", !tq.getStatus("T1").get().isComplete()); // should be able to get t1 out @@ -172,28 +183,28 @@ public class TaskQueueTest Assert.assertNull("poll #2", tq.poll()); // report T1 done. 
Should cause T0 to be created - tq.done(t1, t1.run(null, null)); + tq.done(t1, t1.run(null, null, null)); - Assert.assertTrue("T0 isPresent (#2)", tq.getStatus("T0").isPresent()); - Assert.assertTrue("T0 isRunnable (#2)", tq.getStatus("T0").get().isRunnable()); + Assert.assertTrue("T0 isPresent (#2)", tq.getStatus("T0").isPresent()); + Assert.assertTrue("T0 isRunnable (#2)", tq.getStatus("T0").get().isRunnable()); Assert.assertTrue("T0 isComplete (#2)", !tq.getStatus("T0").get().isComplete()); - Assert.assertTrue("T1 isPresent (#2)", tq.getStatus("T1").isPresent()); + Assert.assertTrue("T1 isPresent (#2)", tq.getStatus("T1").isPresent()); Assert.assertTrue("T1 isRunnable (#2)", !tq.getStatus("T1").get().isRunnable()); - Assert.assertTrue("T1 isComplete (#2)", tq.getStatus("T1").get().isComplete()); + Assert.assertTrue("T1 isComplete (#2)", tq.getStatus("T1").get().isComplete()); // should be able to get t0 out Assert.assertEquals("poll #3", "T0", tq.poll().getTask().getId()); Assert.assertNull("poll #4", tq.poll()); // report T0 done. Should cause T0, T1 to be marked complete - tq.done(t0, t0.run(null, null)); + tq.done(t0, t0.run(null, null, null)); - Assert.assertTrue("T0 isPresent (#3)", tq.getStatus("T0").isPresent()); + Assert.assertTrue("T0 isPresent (#3)", tq.getStatus("T0").isPresent()); Assert.assertTrue("T0 isRunnable (#3)", !tq.getStatus("T0").get().isRunnable()); - Assert.assertTrue("T0 isComplete (#3)", tq.getStatus("T0").get().isComplete()); - Assert.assertTrue("T1 isPresent (#3)", tq.getStatus("T1").isPresent()); + Assert.assertTrue("T0 isComplete (#3)", tq.getStatus("T0").get().isComplete()); + Assert.assertTrue("T1 isPresent (#3)", tq.getStatus("T1").isPresent()); Assert.assertTrue("T1 isRunnable (#3)", !tq.getStatus("T1").get().isRunnable()); - Assert.assertTrue("T1 isComplete (#3)", tq.getStatus("T1").get().isComplete()); + Assert.assertTrue("T1 isComplete (#3)", tq.getStatus("T1").get().isComplete()); // should be no more events available for polling Assert.assertNull("poll #5", tq.poll()); @@ -227,7 +238,7 @@ public class TaskQueueTest Thread.sleep(5); // Finish t0 - tq.done(t0, t0.run(null, null)); + tq.done(t0, t0.run(null, null, null)); // take max number of tasks final Set taken = Sets.newHashSet(); @@ -238,7 +249,7 @@ public class TaskQueueTest final VersionedTaskWrapper taskWrapper = tq.poll(); - if(taskWrapper != null) { + if (taskWrapper != null) { Assert.assertEquals( String.format("%s version", taskWrapper.getTask().getId()), wt0.getVersion(), @@ -254,11 +265,11 @@ public class TaskQueueTest Assert.assertEquals("taken", Sets.newHashSet("T1", "T3"), taken); // Finish t1 - tq.done(t1, t1.run(null, null)); + tq.done(t1, t1.run(null, null, null)); Assert.assertNull("null poll #2", tq.poll()); // Finish t3 - tq.done(t3, t3.run(null, null)); + tq.done(t3, t3.run(null, null, null)); // We should be able to get t2 now final VersionedTaskWrapper wt2 = tq.poll(); @@ -268,7 +279,7 @@ public class TaskQueueTest Assert.assertNull("null poll #3", tq.poll()); // Finish t2 - tq.done(t2, t2.run(null, null)); + tq.done(t2, t2.run(null, null, null)); // We should be able to get t4 // And it should be in group G0, but that group should have a different version than last time @@ -281,7 +292,7 @@ public class TaskQueueTest Assert.assertNotSame("wt4 version", wt2.getVersion(), wt4.getVersion()); // Kind of done testing at this point, but let's finish t4 anyway - tq.done(t4, t4.run(null, null)); + tq.done(t4, t4.run(null, null, null)); Assert.assertNull("null poll #4", tq.poll()); } 
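A note on the new run() signature exercised throughout the tests above: Task.run() now takes a TaskCallback as its third argument, and runners with nothing to report simply pass null, as LocalTaskRunner and these unit tests do. The interface itself is not defined anywhere in this patch; judging from its call sites, RemoteTaskRunner invoking callback.notify(taskStatus) when a worker announces a status, and TaskMonitor's anonymous implementation forwarding statuses to the worker's curator coordinator, it is presumably a single-method interface along these lines (a sketch under that assumption, not the committed source):

package com.metamx.druid.merger.common;

/**
 * Assumed shape of the callback threaded through Task.run(); the actual
 * definition is not included in this patch. Runners may pass null when they
 * have no interest in interim statuses (LocalTaskRunner does), so callers
 * should null-check before invoking it, as RemoteTaskRunner does.
 */
public interface TaskCallback
{
  /**
   * Invoked when a task has a status to report, e.g. when a worker
   * announces completion back to the coordinator.
   */
  public void notify(TaskStatus status);
}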
@@ -314,7 +325,7 @@ public class TaskQueueTest return new AbstractTask(id, groupId, dataSource, interval) { @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { return TaskStatus.success( id, @@ -358,7 +369,7 @@ public class TaskQueueTest } @Override - public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception { return TaskStatus.continued(id, nextTasks); } diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java index 2840eb037a8..6dc83bf3264 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseRuleManager.java @@ -189,7 +189,11 @@ public class DatabaseRuleManager public Map> withHandle(Handle handle) throws Exception { return handle.createQuery( - String.format("SELECT dataSource, payload FROM %s", config.getRuleTable()) + // Return latest version rule by dataSource + String.format( + "SELECT %1$s.dataSource, %1$s.payload FROM %1$s INNER JOIN(SELECT dataSource, max(version) as version, payload FROM %1$s GROUP BY dataSource) ds ON %1$s.datasource = ds.datasource and %1$s.version = ds.version", + config.getRuleTable() + ) ).fold( Maps.>newHashMap(), new Folder3>, Map>() diff --git a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java index 346bb036ff3..801e16a1602 100644 --- a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java +++ b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java @@ -101,42 +101,20 @@ public class S3SegmentKiller implements SegmentKiller log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), datasource, interval); for (final DataSegment segment : matchingSegments) { - // Remove database entry - log.info("Removing DB entry for %s", segment.getIdentifier()); - dbi.withHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - String.format("DELETE from %s WHERE id = :segmentID", config.getSegmentTable()) - ).bind("segmentID", segment.getIdentifier()) - .execute(); - - return null; - } - } - ); - // Remove from S3 - Map loadSpec = segment.getLoadSpec(); String s3Bucket = MapUtils.getString(loadSpec, "bucket"); String s3Path = MapUtils.getString(loadSpec, "key"); String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; - if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) { - throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path); + if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { + log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); + s3Client.deleteObject(s3Bucket, s3Path); } - if (!s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { - throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3DescriptorPath); + if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { + log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); + s3Client.deleteObject(s3Bucket, s3DescriptorPath); } - - log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); - log.info("Removing descriptor 
file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); - s3Client.deleteObject(s3Bucket, s3Path); - s3Client.deleteObject(s3Bucket, s3DescriptorPath); } return matchingSegments; diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index d5d4a8bd978..e792f313f51 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -594,7 +594,6 @@ public class DruidMaster DruidMasterRuntimeParams.newBuilder() .withStartTime(startTime) .withDatasources(databaseSegmentManager.getInventory()) - .withLoadManagementPeons(loadManagementPeons) .withMillisToWaitBeforeDeleting(config.getMillisToWaitBeforeDeleting()) .withEmitter(emitter) .withMergeBytesLimit(config.getMergeBytesLimit()) @@ -696,6 +695,7 @@ public class DruidMaster decrementRemovedSegmentsLifetime(); return params.buildFromExisting() + .withLoadManagementPeons(loadManagementPeons) .withDruidCluster(cluster) .withDatabaseRuleManager(databaseRuleManager) .withSegmentReplicantLookup(segmentReplicantLookup) diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index c54a9a66564..e9fc200a2d6 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -134,6 +134,8 @@ public class DruidMasterTest new NoopServiceEmitter(), scheduledExecutorFactory, loadManagementPeons, + null, + null, null ); } From 7f410f201d4bd0dfbf571b42202897e55117b51f Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Thu, 24 Jan 2013 17:47:10 -0800 Subject: [PATCH 3/3] updating amazon sdk version --- indexer/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/indexer/pom.xml b/indexer/pom.xml index 433aea2dc13..caa8c0c52bf 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -51,7 +51,7 @@ com.amazonaws aws-java-sdk - 1.2.15 + 1.3.27 javax.mail