From 21613bc73be1cc7bf1b3564e9f1a3bc15152cde9 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 16 Jan 2013 17:31:01 -0800 Subject: [PATCH] initial commit to hard delete segments --- .../metamx/druid/merge/ClientKillQuery.java | 41 +++++ .../druid/merger/common/TaskToolbox.java | 9 ++ .../druid/merger/common/task/KillTask.java | 52 +++++++ .../metamx/druid/merger/common/task/Task.java | 4 +- .../http/IndexerCoordinatorNode.java | 27 +++- .../druid/merger/worker/http/WorkerNode.java | 15 +- .../coordinator/RemoteTaskRunnerTest.java | 2 +- .../druid/db/DatabaseSegmentManager.java | 40 +++++ .../com/metamx/druid/http/InfoResource.java | 6 +- .../com/metamx/druid/http/MasterMain.java | 4 +- .../com/metamx/druid/http/MasterResource.java | 11 +- .../metamx/druid/loading/S3SegmentKiller.java | 144 ++++++++++++++++++ .../metamx/druid/loading/SegmentKiller.java | 14 ++ .../com/metamx/druid/master/DruidMaster.java | 47 +++++- .../src/main/resources/static/css/enable.css | 3 + server/src/main/resources/static/enable.html | 75 +++++++++ server/src/main/resources/static/index.html | 25 ++- .../main/resources/static/js/enable-0.0.1.js | 97 ++++++++++++ .../main/resources/static/js/kill-0.0.1.js | 58 +++++++ server/src/main/resources/static/kill.html | 61 ++++++++ 20 files changed, 713 insertions(+), 22 deletions(-) create mode 100644 client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java create mode 100644 merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java create mode 100644 server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java create mode 100644 server/src/main/java/com/metamx/druid/loading/SegmentKiller.java create mode 100644 server/src/main/resources/static/css/enable.css create mode 100644 server/src/main/resources/static/enable.html create mode 100644 server/src/main/resources/static/js/enable-0.0.1.js create mode 100644 server/src/main/resources/static/js/kill-0.0.1.js create mode 100644 server/src/main/resources/static/kill.html diff --git a/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java b/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java new file mode 100644 index 00000000000..ae39aeb512b --- /dev/null +++ b/client/src/main/java/com/metamx/druid/merge/ClientKillQuery.java @@ -0,0 +1,41 @@ +package com.metamx.druid.merge; + +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; +import org.joda.time.Interval; + +/** + */ +public class ClientKillQuery +{ + private final String dataSource; + private final Interval interval; + + @JsonCreator + public ClientKillQuery( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval + ) + { + this.dataSource = dataSource; + this.interval = interval; + } + + @JsonProperty + public String getType() + { + return "kill"; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java index 74a546cf696..26336b45c25 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/TaskToolbox.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap; import com.metamx.druid.loading.S3SegmentPuller; import com.metamx.druid.loading.S3SegmentGetterConfig; import com.metamx.druid.loading.S3ZippedSegmentPuller; +import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.loading.SegmentPuller; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; @@ -43,6 +44,7 @@ public class TaskToolbox private final ServiceEmitter emitter; private final RestS3Service s3Client; private final SegmentPusher segmentPusher; + private final SegmentKiller segmentKiller; private final ObjectMapper objectMapper; public TaskToolbox( @@ -50,6 +52,7 @@ public class TaskToolbox ServiceEmitter emitter, RestS3Service s3Client, SegmentPusher segmentPusher, + SegmentKiller segmentKiller, ObjectMapper objectMapper ) { @@ -57,6 +60,7 @@ public class TaskToolbox this.emitter = emitter; this.s3Client = s3Client; this.segmentPusher = segmentPusher; + this.segmentKiller = segmentKiller; this.objectMapper = objectMapper; } @@ -80,6 +84,11 @@ public class TaskToolbox return segmentPusher; } + public SegmentKiller getSegmentKiller() + { + return segmentKiller; + } + public ObjectMapper getObjectMapper() { return objectMapper; diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java new file mode 100644 index 00000000000..5b2d415eb47 --- /dev/null +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -0,0 +1,52 @@ +package com.metamx.druid.merger.common.task; + +import com.google.common.collect.Lists; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.merger.common.TaskStatus; +import com.metamx.druid.merger.common.TaskToolbox; +import com.metamx.druid.merger.coordinator.TaskContext; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +/** + */ +public class KillTask extends AbstractTask +{ + private static final Logger log = new Logger(KillTask.class); + + @JsonCreator + public KillTask( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("interval") Interval interval + ) + { + super( + String.format( + "kill_%s_%s_%s_%s", + dataSource, + interval.getStart(), + interval.getEnd(), + new DateTime().toString() + ), + dataSource, + interval + ); + } + + @Override + public Type getType() + { + return Task.Type.KILL; + } + + @Override + public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception + { + // Kill segments + toolbox.getSegmentKiller().kill(getDataSource(), getInterval()); + return TaskStatus.success(getId(), Lists.newArrayList()); + } +} diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java index b2059210b58..961afac96ce 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/Task.java @@ -37,6 +37,7 @@ import org.joda.time.Interval; @JsonSubTypes(value = { @JsonSubTypes.Type(name = "append", value = AppendTask.class), @JsonSubTypes.Type(name = "delete", value = DeleteTask.class), + @JsonSubTypes.Type(name = "kill", value = KillTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class), @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class) @@ -49,7 +50,8 @@ public interface Task MERGE, APPEND, DELETE, - TEST + TEST, + KILL } public String getId(); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index bc48c7c71de..d77208c09e2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -47,6 +47,8 @@ import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.loading.S3SegmentKiller; +import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; @@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; +import org.skife.jdbi.v2.DBI; import java.net.URL; import java.util.Arrays; @@ -125,6 +128,8 @@ public class IndexerCoordinatorNode extends RegisteringNode private List monitors = null; private ServiceEmitter emitter = null; + private DbConnectorConfig dbConnectorConfig = null; + private DBI dbi = null; private IndexerCoordinatorConfig config = null; private TaskToolbox taskToolbox = null; private MergerDBCoordinator mergerDBCoordinator = null; @@ -193,6 +198,7 @@ public class IndexerCoordinatorNode extends RegisteringNode initializeEmitter(); initializeMonitors(); + initializeDB(); initializeIndexerCoordinatorConfig(); initializeMergeDBCoordinator(); initializeTaskToolbox(); @@ -370,6 +376,16 @@ public class IndexerCoordinatorNode extends RegisteringNode } } + private void initializeDB() + { + if (dbConnectorConfig == null) { + dbConnectorConfig = configFactory.build(DbConnectorConfig.class); + } + if (dbi == null) { + dbi = new DbConnector(dbConnectorConfig).getDBI(); + } + } + private void initializeIndexerCoordinatorConfig() { if (config == null) { @@ -391,18 +407,23 @@ public class IndexerCoordinatorNode extends RegisteringNode configFactory.build(S3SegmentPusherConfig.class), jsonMapper ); - taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, jsonMapper); + final SegmentKiller segmentKiller = new S3SegmentKiller( + s3Client, + dbi, + dbConnectorConfig, + jsonMapper + ); + taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); } } public void initializeMergeDBCoordinator() { if (mergerDBCoordinator == null) { - final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); mergerDBCoordinator = new MergerDBCoordinator( jsonMapper, dbConnectorConfig, - new DbConnector(dbConnectorConfig).getDBI() + dbi ); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java index 0799a8de37c..65400f209d9 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/http/WorkerNode.java @@ -29,11 +29,15 @@ import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.logger.Logger; import com.metamx.druid.RegisteringNode; +import com.metamx.druid.db.DbConnector; +import com.metamx.druid.db.DbConnectorConfig; import com.metamx.druid.http.StatusServlet; import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.loading.S3SegmentKiller; +import com.metamx.druid.loading.SegmentKiller; import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; @@ -69,6 +73,7 @@ import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; +import org.skife.jdbi.v2.DBI; import java.io.IOException; import java.util.Arrays; @@ -293,7 +298,15 @@ public class WorkerNode extends RegisteringNode configFactory.build(S3SegmentPusherConfig.class), jsonMapper ); - taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, jsonMapper); + final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class); + DBI dbi = new DbConnector(dbConnectorConfig).getDBI(); + final SegmentKiller segmentKiller = new S3SegmentKiller( + s3Client, + dbi, + dbConnectorConfig, + jsonMapper + ); + taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper); } } diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java index eb10731abd9..e4996965fbd 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/RemoteTaskRunnerTest.java @@ -322,7 +322,7 @@ public class RemoteTaskRunnerTest { return null; } - }, null, null, null, jsonMapper + }, null, null, null, null, jsonMapper ), Executors.newSingleThreadExecutor() ); diff --git a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java index ef620cd1e38..e49728baea4 100644 --- a/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java +++ b/server/src/main/java/com/metamx/druid/db/DatabaseSegmentManager.java @@ -23,6 +23,7 @@ import com.google.common.base.Function; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; +import com.metamx.common.MapUtils; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; @@ -45,6 +46,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback; import javax.annotation.Nullable; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -343,6 +345,44 @@ public class DatabaseSegmentManager return dataSources.get().values(); } + public Collection getAllDatasourceNames() + { + synchronized (lock) { + return dbi.withHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery( + String.format("SELECT DISTINCT(datasource) FROM %s", config.getSegmentTable()) + ) + .fold( + Lists.newArrayList(), + new Folder3, Map>() + { + @Override + public ArrayList fold( + ArrayList druidDataSources, + Map stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + druidDataSources.add( + MapUtils.getString(stringObjectMap, "datasource") + ); + return druidDataSources; + } + } + ); + + } + } + ); + } + } + public void poll() { try { diff --git a/server/src/main/java/com/metamx/druid/http/InfoResource.java b/server/src/main/java/com/metamx/druid/http/InfoResource.java index 800e3a93e46..090e311ad31 100644 --- a/server/src/main/java/com/metamx/druid/http/InfoResource.java +++ b/server/src/main/java/com/metamx/druid/http/InfoResource.java @@ -548,10 +548,14 @@ public class InfoResource @Path("/db/datasources") @Produces("application/json") public Response getDatabaseDataSources( - @QueryParam("full") String full + @QueryParam("full") String full, + @QueryParam("includeDisabled") String includeDisabled ) { Response.ResponseBuilder builder = Response.status(Response.Status.OK); + if (includeDisabled != null) { + return builder.entity(databaseSegmentManager.getAllDatasourceNames()).build(); + } if (full != null) { return builder.entity(databaseSegmentManager.getInventory()).build(); } diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 51df502fa64..f65e6e2b57c 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -190,7 +190,9 @@ public class MasterMain emitter, scheduledExecutorFactory, new ConcurrentHashMap(), - serviceProvider + serviceProvider, + httpClient, + new ToStringResponseHandler(Charsets.UTF_8) ); lifecycle.addManagedInstance(master); diff --git a/server/src/main/java/com/metamx/druid/http/MasterResource.java b/server/src/main/java/com/metamx/druid/http/MasterResource.java index ca992815eb6..c73e74e528c 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterResource.java +++ b/server/src/main/java/com/metamx/druid/http/MasterResource.java @@ -21,13 +21,13 @@ package com.metamx.druid.http; import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.LoadPeonCallback; +import com.metamx.druid.merge.ClientKillQuery; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; -import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Response; import java.util.List; @@ -111,6 +111,15 @@ public class MasterResource return resp; } + @POST + @Path("/kill") + @Consumes("application/json") + public Response killSegments(ClientKillQuery killQuery) + { + master.killSegments(killQuery); + return Response.ok().build(); + } + @GET @Path("/loadstatus") @Produces("application/json") diff --git a/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java new file mode 100644 index 00000000000..346bb036ff3 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/S3SegmentKiller.java @@ -0,0 +1,144 @@ +package com.metamx.druid.loading; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.inject.Inject; +import com.metamx.common.ISE; +import com.metamx.common.MapUtils; +import com.metamx.common.logger.Logger; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.db.DbConnectorConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.jets3t.service.ServiceException; +import org.jets3t.service.impl.rest.httpclient.RestS3Service; +import org.joda.time.Interval; +import org.skife.jdbi.v2.DBI; +import org.skife.jdbi.v2.FoldController; +import org.skife.jdbi.v2.Folder3; +import org.skife.jdbi.v2.Handle; +import org.skife.jdbi.v2.StatementContext; +import org.skife.jdbi.v2.tweak.HandleCallback; + +import java.sql.SQLException; +import java.util.List; +import java.util.Map; + +/** + */ +public class S3SegmentKiller implements SegmentKiller +{ + private static final Logger log = new Logger(S3SegmentKiller.class); + + private final RestS3Service s3Client; + private final DBI dbi; + private final DbConnectorConfig config; + private final ObjectMapper jsonMapper; + + @Inject + public S3SegmentKiller( + RestS3Service s3Client, + DBI dbi, + DbConnectorConfig config, + ObjectMapper jsonMapper + ) + { + this.s3Client = s3Client; + this.dbi = dbi; + this.config = config; + this.jsonMapper = jsonMapper; + } + + + @Override + public List kill(final String datasource, final Interval interval) throws ServiceException + { + List matchingSegments = dbi.withHandle( + new HandleCallback>() + { + @Override + public List withHandle(Handle handle) throws Exception + { + return handle.createQuery( + String.format( + "SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0", + config.getSegmentTable() + ) + ) + .bind("dataSource", datasource) + .bind("start", interval.getStart().toString()) + .bind("end", interval.getEnd().toString()) + .fold( + Lists.newArrayList(), + new Folder3, Map>() + { + @Override + public List fold( + List accumulator, + Map stringObjectMap, + FoldController foldController, + StatementContext statementContext + ) throws SQLException + { + try { + DataSegment segment = jsonMapper.readValue( + (String) stringObjectMap.get("payload"), + DataSegment.class + ); + + accumulator.add(segment); + + return accumulator; + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + } + } + ); + + log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), datasource, interval); + for (final DataSegment segment : matchingSegments) { + // Remove database entry + log.info("Removing DB entry for %s", segment.getIdentifier()); + dbi.withHandle( + new HandleCallback() + { + @Override + public Void withHandle(Handle handle) throws Exception + { + handle.createStatement( + String.format("DELETE from %s WHERE id = :segmentID", config.getSegmentTable()) + ).bind("segmentID", segment.getIdentifier()) + .execute(); + + return null; + } + } + ); + + // Remove from S3 + + Map loadSpec = segment.getLoadSpec(); + String s3Bucket = MapUtils.getString(loadSpec, "bucket"); + String s3Path = MapUtils.getString(loadSpec, "key"); + String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; + + if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) { + throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path); + } + if (!s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { + throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3DescriptorPath); + } + + log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); + log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); + s3Client.deleteObject(s3Bucket, s3Path); + s3Client.deleteObject(s3Bucket, s3DescriptorPath); + } + + return matchingSegments; + } +} diff --git a/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java b/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java new file mode 100644 index 00000000000..56a14d2e933 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/SegmentKiller.java @@ -0,0 +1,14 @@ +package com.metamx.druid.loading; + +import com.metamx.druid.client.DataSegment; +import org.jets3t.service.ServiceException; +import org.joda.time.Interval; + +import java.util.List; + +/** + */ +public interface SegmentKiller +{ + public List kill(String datasource, Interval interval) throws ServiceException; +} diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 80172c432fa..d5d4a8bd978 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -21,7 +21,7 @@ package com.metamx.druid.master; import com.google.common.base.Function; import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -36,7 +36,6 @@ import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.common.logger.Logger; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidServer; @@ -44,9 +43,12 @@ import com.metamx.druid.client.ServerInventoryManager; import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseSegmentManager; +import com.metamx.druid.merge.ClientKillQuery; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.response.HttpResponseHandler; import com.metamx.phonebook.PhoneBook; import com.metamx.phonebook.PhoneBookPeon; import com.netflix.curator.x.discovery.ServiceProvider; @@ -55,8 +57,8 @@ import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.Duration; import javax.annotation.Nullable; +import java.net.URL; import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -89,6 +91,8 @@ public class DruidMaster private final PhoneBookPeon masterPeon; private final Map loadManagementPeons; private final ServiceProvider serviceProvider; + private final HttpClient httpClient; + private final HttpResponseHandler responseHandler; private final ObjectMapper jsonMapper; @@ -103,7 +107,9 @@ public class DruidMaster ServiceEmitter emitter, ScheduledExecutorFactory scheduledExecutorFactory, Map loadManagementPeons, - ServiceProvider serviceProvider + ServiceProvider serviceProvider, + HttpClient httpClient, + HttpResponseHandler responseHandler ) { this.config = config; @@ -124,6 +130,9 @@ public class DruidMaster this.loadManagementPeons = loadManagementPeons; this.serviceProvider = serviceProvider; + + this.httpClient = httpClient; + this.responseHandler = responseHandler; } public boolean isClusterMaster() @@ -199,6 +208,27 @@ public class DruidMaster databaseSegmentManager.enableDatasource(ds); } + public void killSegments(ClientKillQuery killQuery) + { + try { + httpClient.post( + new URL( + String.format( + "http://%s:%s/mmx/merger/v1/index", + serviceProvider.getInstance().getAddress(), + serviceProvider.getInstance().getPort() + ) + ) + ) + .setContent("application/json", jsonMapper.writeValueAsBytes(killQuery)) + .go(responseHandler) + .get(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback) { final DruidServer fromServer = serverInventoryManager.getInventoryValue(from); @@ -688,7 +718,14 @@ public class DruidMaster super( ImmutableList.of( new DruidMasterSegmentInfoLoader(DruidMaster.this), - new DruidMasterSegmentMerger(jsonMapper, serviceProvider), + new DruidMasterSegmentMerger( + new HttpMergerClient( + httpClient, + responseHandler, + jsonMapper, + serviceProvider + ) + ), new DruidMasterHelper() { @Override diff --git a/server/src/main/resources/static/css/enable.css b/server/src/main/resources/static/css/enable.css new file mode 100644 index 00000000000..4c1a4cdb9b1 --- /dev/null +++ b/server/src/main/resources/static/css/enable.css @@ -0,0 +1,3 @@ +#select_datasource { + margin: 20px 0 20px 0; +} \ No newline at end of file diff --git a/server/src/main/resources/static/enable.html b/server/src/main/resources/static/enable.html new file mode 100644 index 00000000000..265ae7780df --- /dev/null +++ b/server/src/main/resources/static/enable.html @@ -0,0 +1,75 @@ + + + + + + Druid Master Console - Enable/Disable Datasources + + + + + + + + + + + + + + +
+ +
Enable/Disable Datasources
+ +
+

Enabled Datasources:

+
    +
+
+ +
+

Disabled Datasources:

+
    +
+
+ +
+ Select Data Source: + +
+ +
+ +
+

Are you sure you want to enable the selected datasource?

+
+ +
+

Are you sure you want to disable the selected datasource?

+
+ +
+
+
+ + \ No newline at end of file diff --git a/server/src/main/resources/static/index.html b/server/src/main/resources/static/index.html index bd36d0c5c53..8a6a68a7faa 100644 --- a/server/src/main/resources/static/index.html +++ b/server/src/main/resources/static/index.html @@ -20,20 +20,29 @@ - Druid Master Console - - - + Druid Master Console + + + - + \ No newline at end of file diff --git a/server/src/main/resources/static/js/enable-0.0.1.js b/server/src/main/resources/static/js/enable-0.0.1.js new file mode 100644 index 00000000000..f25b1a53a54 --- /dev/null +++ b/server/src/main/resources/static/js/enable-0.0.1.js @@ -0,0 +1,97 @@ +$(document).ready(function() { + $("button").button(); + + $("#error_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Ok : function() { + $(this).dialog("close"); + } + } + }); + + $("#enable_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Yes : function() { + var selected = $('#datasources option:selected').text(); + $.ajax({ + type: 'POST', + url:'/info/datasources/' + selected, + data: JSON.stringify(selected), + contentType:"application/json; charset=utf-8", + dataType:"json", + error: function(xhr, status, error) { + $("#enable_dialog").dialog("close"); + $("#error_dialog").html(xhr.responseText); + $("#error_dialog").dialog("open"); + }, + success: function(data, status, xhr) { + $("#enable_dialog").dialog("close"); + } + }); + }, + Cancel: function() { + $(this).dialog("close"); + } + } + }); + + $("#disable_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Yes : function() { + var selected = $('#datasources option:selected').text(); + $.ajax({ + type: 'DELETE', + url:'/info/datasources/' + selected, + data: JSON.stringify(selected), + contentType:"application/json; charset=utf-8", + dataType:"json", + error: function(xhr, status, error) { + $("#disable_dialog").dialog("close"); + $("#error_dialog").html(xhr.responseText); + $("#error_dialog").dialog("open"); + }, + success: function(data, status, xhr) { + $("#disable_dialog").dialog("close"); + } + }); + }, + Cancel: function() { + $(this).dialog("close"); + } + } + }); + + $.getJSON("/info/db/datasources", function(enabled_datasources) { + $.each(enabled_datasources, function(index, datasource) { + $('#enabled_datasources').append($('
  • ' + datasource + '
  • ')); + }); + + $.getJSON("/info/db/datasources?includeDisabled", function(db_datasources) { + var disabled_datasources = _.difference(db_datasources, enabled_datasources); + $.each(disabled_datasources, function(index, datasource) { + $('#disabled_datasources').append($('
  • ' + datasource + '
  • ')); + }); + $.each(db_datasources, function(index, datasource) { + $('#datasources').append($('').attr("value", datasource).text(datasource)); + }); + }); + }); + + + $("#enable").click(function() { + $("#enable_dialog").dialog("open"); + }); + + $('#disable').click(function (){ + $("#disable_dialog").dialog("open") + }); +}); \ No newline at end of file diff --git a/server/src/main/resources/static/js/kill-0.0.1.js b/server/src/main/resources/static/js/kill-0.0.1.js new file mode 100644 index 00000000000..495c6c47f2d --- /dev/null +++ b/server/src/main/resources/static/js/kill-0.0.1.js @@ -0,0 +1,58 @@ +$(document).ready(function() { + $("button").button(); + + $("#error_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Ok : function() { + $(this).dialog("close"); + } + } + }); + + $("#confirm_dialog").dialog({ + autoOpen: false, + modal:true, + resizeable: false, + buttons: { + Yes : function() { + var selected = $('#datasources option:selected').text(); + var interval = $('#interval').val(); + var toSend = { + "dataSource" : selected, + "interval" : interval + } + $.ajax({ + type: 'POST', + url:'/master/kill', + data: JSON.stringify(toSend), + contentType:"application/json; charset=utf-8", + dataType:"json", + error: function(xhr, status, error) { + $("#confirm_dialog").dialog("close"); + $("#error_dialog").html(xhr.responseText); + $("#error_dialog").dialog("open"); + }, + success: function(data, status, xhr) { + $("#confirm_dialog").dialog("close"); + } + }); + }, + Cancel: function() { + $(this).dialog("close"); + } + } + }); + + $.getJSON("/info/db/datasources?includeDisabled", function(data) { + $.each(data, function(index, datasource) { + $('#datasources').append($('').attr("value", datasource).text(datasource)); + }); + }); + + $("#confirm").click(function() { + $("#confirm_dialog").dialog("open"); + }); +}); \ No newline at end of file diff --git a/server/src/main/resources/static/kill.html b/server/src/main/resources/static/kill.html new file mode 100644 index 00000000000..8741fd25f41 --- /dev/null +++ b/server/src/main/resources/static/kill.html @@ -0,0 +1,61 @@ + + + + + + Druid Master Console - Enable/Disable Datasources + + + + + + + + + + + + + +
    + +
    Permanently Delete Segments
    + +
    + Select Data Source: + +
    + +

    Interval:

    + + +
    + +
    +

    Are you sure you want delete segments for this datasource and range? There is no going back!

    +
    + +
    +
    +
    + + \ No newline at end of file