initial commit to hard delete segments

This commit is contained in:
Fangjin Yang 2013-01-16 17:31:01 -08:00
parent 7e074e8158
commit 21613bc73b
20 changed files with 713 additions and 22 deletions

View File

@ -0,0 +1,41 @@
package com.metamx.druid.merge;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.Interval;
/**
*/
public class ClientKillQuery
{
private final String dataSource;
private final Interval interval;
@JsonCreator
public ClientKillQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
this.dataSource = dataSource;
this.interval = interval;
}
@JsonProperty
public String getType()
{
return "kill";
}
@JsonProperty
public String getDataSource()
{
return dataSource;
}
@JsonProperty
public Interval getInterval()
{
return interval;
}
}

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableMap;
import com.metamx.druid.loading.S3SegmentPuller; import com.metamx.druid.loading.S3SegmentPuller;
import com.metamx.druid.loading.S3SegmentGetterConfig; import com.metamx.druid.loading.S3SegmentGetterConfig;
import com.metamx.druid.loading.S3ZippedSegmentPuller; import com.metamx.druid.loading.S3ZippedSegmentPuller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPuller; import com.metamx.druid.loading.SegmentPuller;
import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.common.task.Task;
import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig; import com.metamx.druid.merger.coordinator.config.IndexerCoordinatorConfig;
@ -43,6 +44,7 @@ public class TaskToolbox
private final ServiceEmitter emitter; private final ServiceEmitter emitter;
private final RestS3Service s3Client; private final RestS3Service s3Client;
private final SegmentPusher segmentPusher; private final SegmentPusher segmentPusher;
private final SegmentKiller segmentKiller;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
public TaskToolbox( public TaskToolbox(
@ -50,6 +52,7 @@ public class TaskToolbox
ServiceEmitter emitter, ServiceEmitter emitter,
RestS3Service s3Client, RestS3Service s3Client,
SegmentPusher segmentPusher, SegmentPusher segmentPusher,
SegmentKiller segmentKiller,
ObjectMapper objectMapper ObjectMapper objectMapper
) )
{ {
@ -57,6 +60,7 @@ public class TaskToolbox
this.emitter = emitter; this.emitter = emitter;
this.s3Client = s3Client; this.s3Client = s3Client;
this.segmentPusher = segmentPusher; this.segmentPusher = segmentPusher;
this.segmentKiller = segmentKiller;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@ -80,6 +84,11 @@ public class TaskToolbox
return segmentPusher; return segmentPusher;
} }
public SegmentKiller getSegmentKiller()
{
return segmentKiller;
}
public ObjectMapper getObjectMapper() public ObjectMapper getObjectMapper()
{ {
return objectMapper; return objectMapper;

View File

@ -0,0 +1,52 @@
package com.metamx.druid.merger.common.task;
import com.google.common.collect.Lists;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.coordinator.TaskContext;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.joda.time.DateTime;
import org.joda.time.Interval;
/**
*/
public class KillTask extends AbstractTask
{
private static final Logger log = new Logger(KillTask.class);
@JsonCreator
public KillTask(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval
)
{
super(
String.format(
"kill_%s_%s_%s_%s",
dataSource,
interval.getStart(),
interval.getEnd(),
new DateTime().toString()
),
dataSource,
interval
);
}
@Override
public Type getType()
{
return Task.Type.KILL;
}
@Override
public TaskStatus run(TaskContext context, TaskToolbox toolbox) throws Exception
{
// Kill segments
toolbox.getSegmentKiller().kill(getDataSource(), getInterval());
return TaskStatus.success(getId(), Lists.<DataSegment>newArrayList());
}
}

View File

@ -37,6 +37,7 @@ import org.joda.time.Interval;
@JsonSubTypes(value = { @JsonSubTypes(value = {
@JsonSubTypes.Type(name = "append", value = AppendTask.class), @JsonSubTypes.Type(name = "append", value = AppendTask.class),
@JsonSubTypes.Type(name = "delete", value = DeleteTask.class), @JsonSubTypes.Type(name = "delete", value = DeleteTask.class),
@JsonSubTypes.Type(name = "kill", value = KillTask.class),
@JsonSubTypes.Type(name = "index", value = IndexTask.class), @JsonSubTypes.Type(name = "index", value = IndexTask.class),
@JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class), @JsonSubTypes.Type(name = "index_partitions", value = IndexDeterminePartitionsTask.class),
@JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class) @JsonSubTypes.Type(name = "index_generator", value = IndexGeneratorTask.class)
@ -49,7 +50,8 @@ public interface Task
MERGE, MERGE,
APPEND, APPEND,
DELETE, DELETE,
TEST TEST,
KILL
} }
public String getId(); public String getId();

View File

@ -47,6 +47,8 @@ import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.initialization.ServiceDiscoveryConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@ -98,6 +100,7 @@ import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.FilterHolder; import org.mortbay.jetty.servlet.FilterHolder;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.net.URL; import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
@ -125,6 +128,8 @@ public class IndexerCoordinatorNode extends RegisteringNode
private List<Monitor> monitors = null; private List<Monitor> monitors = null;
private ServiceEmitter emitter = null; private ServiceEmitter emitter = null;
private DbConnectorConfig dbConnectorConfig = null;
private DBI dbi = null;
private IndexerCoordinatorConfig config = null; private IndexerCoordinatorConfig config = null;
private TaskToolbox taskToolbox = null; private TaskToolbox taskToolbox = null;
private MergerDBCoordinator mergerDBCoordinator = null; private MergerDBCoordinator mergerDBCoordinator = null;
@ -193,6 +198,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
initializeEmitter(); initializeEmitter();
initializeMonitors(); initializeMonitors();
initializeDB();
initializeIndexerCoordinatorConfig(); initializeIndexerCoordinatorConfig();
initializeMergeDBCoordinator(); initializeMergeDBCoordinator();
initializeTaskToolbox(); initializeTaskToolbox();
@ -370,6 +376,16 @@ public class IndexerCoordinatorNode extends RegisteringNode
} }
} }
private void initializeDB()
{
if (dbConnectorConfig == null) {
dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
}
if (dbi == null) {
dbi = new DbConnector(dbConnectorConfig).getDBI();
}
}
private void initializeIndexerCoordinatorConfig() private void initializeIndexerCoordinatorConfig()
{ {
if (config == null) { if (config == null) {
@ -391,18 +407,23 @@ public class IndexerCoordinatorNode extends RegisteringNode
configFactory.build(S3SegmentPusherConfig.class), configFactory.build(S3SegmentPusherConfig.class),
jsonMapper jsonMapper
); );
taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, jsonMapper); final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client,
dbi,
dbConnectorConfig,
jsonMapper
);
taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
} }
} }
public void initializeMergeDBCoordinator() public void initializeMergeDBCoordinator()
{ {
if (mergerDBCoordinator == null) { if (mergerDBCoordinator == null) {
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
mergerDBCoordinator = new MergerDBCoordinator( mergerDBCoordinator = new MergerDBCoordinator(
jsonMapper, jsonMapper,
dbConnectorConfig, dbConnectorConfig,
new DbConnector(dbConnectorConfig).getDBI() dbi
); );
} }
} }

View File

@ -29,11 +29,15 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger; import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode; import com.metamx.druid.RegisteringNode;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.StatusServlet; import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.merger.common.TaskToolbox; import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory; import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@ -69,6 +73,7 @@ import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder; import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
@ -293,7 +298,15 @@ public class WorkerNode extends RegisteringNode
configFactory.build(S3SegmentPusherConfig.class), configFactory.build(S3SegmentPusherConfig.class),
jsonMapper jsonMapper
); );
taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, jsonMapper); final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client,
dbi,
dbConnectorConfig,
jsonMapper
);
taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
} }
} }

View File

@ -322,7 +322,7 @@ public class RemoteTaskRunnerTest
{ {
return null; return null;
} }
}, null, null, null, jsonMapper }, null, null, null, null, jsonMapper
), ),
Executors.newSingleThreadExecutor() Executors.newSingleThreadExecutor()
); );

View File

@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Ordering; import com.google.common.collect.Ordering;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
@ -45,6 +46,7 @@ import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -343,6 +345,44 @@ public class DatabaseSegmentManager
return dataSources.get().values(); return dataSources.get().values();
} }
public Collection<String> getAllDatasourceNames()
{
synchronized (lock) {
return dbi.withHandle(
new HandleCallback<List<String>>()
{
@Override
public List<String> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format("SELECT DISTINCT(datasource) FROM %s", config.getSegmentTable())
)
.fold(
Lists.<String>newArrayList(),
new Folder3<ArrayList<String>, Map<String, Object>>()
{
@Override
public ArrayList<String> fold(
ArrayList<String> druidDataSources,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
druidDataSources.add(
MapUtils.getString(stringObjectMap, "datasource")
);
return druidDataSources;
}
}
);
}
}
);
}
}
public void poll() public void poll()
{ {
try { try {

View File

@ -548,10 +548,14 @@ public class InfoResource
@Path("/db/datasources") @Path("/db/datasources")
@Produces("application/json") @Produces("application/json")
public Response getDatabaseDataSources( public Response getDatabaseDataSources(
@QueryParam("full") String full @QueryParam("full") String full,
@QueryParam("includeDisabled") String includeDisabled
) )
{ {
Response.ResponseBuilder builder = Response.status(Response.Status.OK); Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (includeDisabled != null) {
return builder.entity(databaseSegmentManager.getAllDatasourceNames()).build();
}
if (full != null) { if (full != null) {
return builder.entity(databaseSegmentManager.getInventory()).build(); return builder.entity(databaseSegmentManager.getInventory()).build();
} }

View File

@ -190,7 +190,9 @@ public class MasterMain
emitter, emitter,
scheduledExecutorFactory, scheduledExecutorFactory,
new ConcurrentHashMap<String, LoadQueuePeon>(), new ConcurrentHashMap<String, LoadQueuePeon>(),
serviceProvider serviceProvider,
httpClient,
new ToStringResponseHandler(Charsets.UTF_8)
); );
lifecycle.addManagedInstance(master); lifecycle.addManagedInstance(master);

View File

@ -21,13 +21,13 @@ package com.metamx.druid.http;
import com.metamx.druid.master.DruidMaster; import com.metamx.druid.master.DruidMaster;
import com.metamx.druid.master.LoadPeonCallback; import com.metamx.druid.master.LoadPeonCallback;
import com.metamx.druid.merge.ClientKillQuery;
import javax.inject.Inject; import javax.inject.Inject;
import javax.ws.rs.Consumes; import javax.ws.rs.Consumes;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.POST; import javax.ws.rs.POST;
import javax.ws.rs.Path; import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.List; import java.util.List;
@ -111,6 +111,15 @@ public class MasterResource
return resp; return resp;
} }
@POST
@Path("/kill")
@Consumes("application/json")
public Response killSegments(ClientKillQuery killQuery)
{
master.killSegments(killQuery);
return Response.ok().build();
}
@GET @GET
@Path("/loadstatus") @Path("/loadstatus")
@Produces("application/json") @Produces("application/json")

View File

@ -0,0 +1,144 @@
package com.metamx.druid.loading;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnectorConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.joda.time.Interval;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
/**
*/
public class S3SegmentKiller implements SegmentKiller
{
private static final Logger log = new Logger(S3SegmentKiller.class);
private final RestS3Service s3Client;
private final DBI dbi;
private final DbConnectorConfig config;
private final ObjectMapper jsonMapper;
@Inject
public S3SegmentKiller(
RestS3Service s3Client,
DBI dbi,
DbConnectorConfig config,
ObjectMapper jsonMapper
)
{
this.s3Client = s3Client;
this.dbi = dbi;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public List<DataSegment> kill(final String datasource, final Interval interval) throws ServiceException
{
List<DataSegment> matchingSegments = dbi.withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
config.getSegmentTable()
)
)
.bind("dataSource", datasource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.fold(
Lists.<DataSegment>newArrayList(),
new Folder3<List<DataSegment>, Map<String, Object>>()
{
@Override
public List<DataSegment> fold(
List<DataSegment> accumulator,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(
(String) stringObjectMap.get("payload"),
DataSegment.class
);
accumulator.add(segment);
return accumulator;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), datasource, interval);
for (final DataSegment segment : matchingSegments) {
// Remove database entry
log.info("Removing DB entry for %s", segment.getIdentifier());
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format("DELETE from %s WHERE id = :segmentID", config.getSegmentTable())
).bind("segmentID", segment.getIdentifier())
.execute();
return null;
}
}
);
// Remove from S3
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
if (!s3Client.isObjectInBucket(s3Bucket, s3Path)) {
throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3Path);
}
if (!s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
throw new ISE("IndexFile[s3://%s/%s] does not exist.", s3Bucket, s3DescriptorPath);
}
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
return matchingSegments;
}
}

View File

@ -0,0 +1,14 @@
package com.metamx.druid.loading;
import com.metamx.druid.client.DataSegment;
import org.jets3t.service.ServiceException;
import org.joda.time.Interval;
import java.util.List;
/**
*/
public interface SegmentKiller
{
public List<DataSegment> kill(String datasource, Interval interval) throws ServiceException;
}

View File

@ -21,7 +21,7 @@ package com.metamx.druid.master;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Collections2; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
@ -36,7 +36,6 @@ import com.metamx.common.guava.Comparators;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop; import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DataSegment;
import com.metamx.druid.client.DruidDataSource; import com.metamx.druid.client.DruidDataSource;
import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.DruidServer;
@ -44,9 +43,12 @@ import com.metamx.druid.client.ServerInventoryManager;
import com.metamx.druid.coordination.DruidClusterInfo; import com.metamx.druid.coordination.DruidClusterInfo;
import com.metamx.druid.db.DatabaseRuleManager; import com.metamx.druid.db.DatabaseRuleManager;
import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManager;
import com.metamx.druid.merge.ClientKillQuery;
import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent; import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.phonebook.PhoneBook; import com.metamx.phonebook.PhoneBook;
import com.metamx.phonebook.PhoneBookPeon; import com.metamx.phonebook.PhoneBookPeon;
import com.netflix.curator.x.discovery.ServiceProvider; import com.netflix.curator.x.discovery.ServiceProvider;
@ -55,8 +57,8 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.Duration; import org.joda.time.Duration;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.net.URL;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -89,6 +91,8 @@ public class DruidMaster
private final PhoneBookPeon masterPeon; private final PhoneBookPeon masterPeon;
private final Map<String, LoadQueuePeon> loadManagementPeons; private final Map<String, LoadQueuePeon> loadManagementPeons;
private final ServiceProvider serviceProvider; private final ServiceProvider serviceProvider;
private final HttpClient httpClient;
private final HttpResponseHandler<StringBuilder, String> responseHandler;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
@ -103,7 +107,9 @@ public class DruidMaster
ServiceEmitter emitter, ServiceEmitter emitter,
ScheduledExecutorFactory scheduledExecutorFactory, ScheduledExecutorFactory scheduledExecutorFactory,
Map<String, LoadQueuePeon> loadManagementPeons, Map<String, LoadQueuePeon> loadManagementPeons,
ServiceProvider serviceProvider ServiceProvider serviceProvider,
HttpClient httpClient,
HttpResponseHandler<StringBuilder, String> responseHandler
) )
{ {
this.config = config; this.config = config;
@ -124,6 +130,9 @@ public class DruidMaster
this.loadManagementPeons = loadManagementPeons; this.loadManagementPeons = loadManagementPeons;
this.serviceProvider = serviceProvider; this.serviceProvider = serviceProvider;
this.httpClient = httpClient;
this.responseHandler = responseHandler;
} }
public boolean isClusterMaster() public boolean isClusterMaster()
@ -199,6 +208,27 @@ public class DruidMaster
databaseSegmentManager.enableDatasource(ds); databaseSegmentManager.enableDatasource(ds);
} }
public void killSegments(ClientKillQuery killQuery)
{
try {
httpClient.post(
new URL(
String.format(
"http://%s:%s/mmx/merger/v1/index",
serviceProvider.getInstance().getAddress(),
serviceProvider.getInstance().getPort()
)
)
)
.setContent("application/json", jsonMapper.writeValueAsBytes(killQuery))
.go(responseHandler)
.get();
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback) public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
{ {
final DruidServer fromServer = serverInventoryManager.getInventoryValue(from); final DruidServer fromServer = serverInventoryManager.getInventoryValue(from);
@ -688,7 +718,14 @@ public class DruidMaster
super( super(
ImmutableList.of( ImmutableList.of(
new DruidMasterSegmentInfoLoader(DruidMaster.this), new DruidMasterSegmentInfoLoader(DruidMaster.this),
new DruidMasterSegmentMerger(jsonMapper, serviceProvider), new DruidMasterSegmentMerger(
new HttpMergerClient(
httpClient,
responseHandler,
jsonMapper,
serviceProvider
)
),
new DruidMasterHelper() new DruidMasterHelper()
{ {
@Override @Override

View File

@ -0,0 +1,3 @@
#select_datasource {
margin: 20px 0 20px 0;
}

View File

@ -0,0 +1,75 @@
<!DOCTYPE HTML>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<html xmlns="http://www.w3.org/1999/html">
<head>
<title>Druid Master Console - Enable/Disable Datasources</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Master Console Page"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/jquery-ui-1.9.2.css";</style>
<style type="text/css">@import "css/enable.css";</style>
<script type="text/javascript" src="js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="js/jquery-1.8.3.js"></script>
<script type="text/javascript" src="js/jquery-ui-1.9.2.js"></script>
<script type="text/javascript" src="js/enable-0.0.1.js"></script>
</head>
<body>
<div class="container">
<div class="heading">Enable/Disable Datasources</div>
<div>
<h4>Enabled Datasources:</h4>
<ul id="enabled_datasources">
</ul>
</div>
<div>
<h4>Disabled Datasources:</h4>
<ul id="disabled_datasources">
</ul>
</div>
<div id="select_datasource">
Select Data Source:
<select id="datasources">
<option></option>
</select>
</div>
<div>
<button type="button" id="enable">Enable</button>
<div id="enable_dialog" title="Confirm Enable">
<p>Are you sure you want to enable the selected datasource?</p>
</div>
<button type="button" id="disable">Disable</button>
<div id="disable_dialog" title="Confirm Cancel">
<p>Are you sure you want to disable the selected datasource?</p>
</div>
<div id="error_dialog" title="Error!"></div>
</div>
</div>
</body>
</html>

View File

@ -20,20 +20,29 @@
<html> <html>
<head> <head>
<title>Druid Master Console</title> <title>Druid Master Console</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico"> <link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/> <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Master Console Page"/> <meta name="Description" content="Druid Master Console Page"/>
<style type="text/css">@import "css/style.css";</style> <style type="text/css">@import "css/style.css";</style>
</head> </head>
<body> <body>
<div class="container"> <div class="container">
<a href="view.html">View Information about the Cluster</a> <div>
<br/> <a href="view.html">View Information about the Cluster</a>
<a href="rules.html">Configure Compute Node Rules</a> </div>
<div>
<a href="rules.html">Configure Compute Node Rules</a>
</div>
<div>
<a href="enable.html">Enable/Disable Datasources</a>
</div>
<div>
<a href="kill.html">Permanent Segment Deletion</a>
</div>
</div> </div>
</body> </body>
</html> </html>

View File

@ -0,0 +1,97 @@
$(document).ready(function() {
$("button").button();
$("#error_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Ok : function() {
$(this).dialog("close");
}
}
});
$("#enable_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Yes : function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'POST',
url:'/info/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
$("#enable_dialog").dialog("close");
$("#error_dialog").html(xhr.responseText);
$("#error_dialog").dialog("open");
},
success: function(data, status, xhr) {
$("#enable_dialog").dialog("close");
}
});
},
Cancel: function() {
$(this).dialog("close");
}
}
});
$("#disable_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Yes : function() {
var selected = $('#datasources option:selected').text();
$.ajax({
type: 'DELETE',
url:'/info/datasources/' + selected,
data: JSON.stringify(selected),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
$("#disable_dialog").dialog("close");
$("#error_dialog").html(xhr.responseText);
$("#error_dialog").dialog("open");
},
success: function(data, status, xhr) {
$("#disable_dialog").dialog("close");
}
});
},
Cancel: function() {
$(this).dialog("close");
}
}
});
$.getJSON("/info/db/datasources", function(enabled_datasources) {
$.each(enabled_datasources, function(index, datasource) {
$('#enabled_datasources').append($('<li>' + datasource + '</li>'));
});
$.getJSON("/info/db/datasources?includeDisabled", function(db_datasources) {
var disabled_datasources = _.difference(db_datasources, enabled_datasources);
$.each(disabled_datasources, function(index, datasource) {
$('#disabled_datasources').append($('<li>' + datasource + '</li>'));
});
$.each(db_datasources, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});
});
});
$("#enable").click(function() {
$("#enable_dialog").dialog("open");
});
$('#disable').click(function (){
$("#disable_dialog").dialog("open")
});
});

View File

@ -0,0 +1,58 @@
$(document).ready(function() {
$("button").button();
$("#error_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Ok : function() {
$(this).dialog("close");
}
}
});
$("#confirm_dialog").dialog({
autoOpen: false,
modal:true,
resizeable: false,
buttons: {
Yes : function() {
var selected = $('#datasources option:selected').text();
var interval = $('#interval').val();
var toSend = {
"dataSource" : selected,
"interval" : interval
}
$.ajax({
type: 'POST',
url:'/master/kill',
data: JSON.stringify(toSend),
contentType:"application/json; charset=utf-8",
dataType:"json",
error: function(xhr, status, error) {
$("#confirm_dialog").dialog("close");
$("#error_dialog").html(xhr.responseText);
$("#error_dialog").dialog("open");
},
success: function(data, status, xhr) {
$("#confirm_dialog").dialog("close");
}
});
},
Cancel: function() {
$(this).dialog("close");
}
}
});
$.getJSON("/info/db/datasources?includeDisabled", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').attr("value", datasource).text(datasource));
});
});
$("#confirm").click(function() {
$("#confirm_dialog").dialog("open");
});
});

View File

@ -0,0 +1,61 @@
<!DOCTYPE HTML>
<!--
~ Druid - a distributed column store.
~ Copyright (C) 2012 Metamarkets Group Inc.
~
~ This program is free software; you can redistribute it and/or
~ modify it under the terms of the GNU General Public License
~ as published by the Free Software Foundation; either version 2
~ of the License, or (at your option) any later version.
~
~ This program is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
~ GNU General Public License for more details.
~
~ You should have received a copy of the GNU General Public License
~ along with this program; if not, write to the Free Software
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<html xmlns="http://www.w3.org/1999/html">
<head>
<title>Druid Master Console - Enable/Disable Datasources</title>
<link rel="shortcut icon" type="image/ico" href="images/favicon.ico">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Master Console Page"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/jquery-ui-1.9.2.css";</style>
<script type="text/javascript" src="js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="js/jquery-1.8.3.js"></script>
<script type="text/javascript" src="js/jquery-ui-1.9.2.js"></script>
<script type="text/javascript" src="js/kill-0.0.1.js"></script>
</head>
<body>
<div class="container">
<div class="heading">Permanently Delete Segments</div>
<div id="select_datasource">
Select Data Source:
<select id="datasources">
<option></option>
</select>
</div>
<p>Interval:</p>
<span><input type="text" name="interval" id="interval"/></span>
<div>
<button type="button" id="confirm">Confirm</button>
<div id="confirm_dialog" title="Confirm">
<p>Are you sure you want delete segments for this datasource and range? There is no going back!</p>
</div>
<div id="error_dialog" title="Error!"></div>
</div>
</div>
</body>
</html>