This commit is contained in:
jisookim0513 2014-10-31 12:52:53 -07:00
commit 01f2b3482c
78 changed files with 1190 additions and 1291 deletions

View File

@ -27,8 +27,8 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.Duration;
import java.util.Arrays;

View File

@ -1,87 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class MetadataStorageConnectorConfig
{
@JsonProperty
private boolean createTables = true;
@JsonProperty
private String connectURI = null;
@JsonProperty
private String user = null;
@JsonProperty
private String password = null;
@JsonProperty
private boolean useValidationQuery = false;
@JsonProperty
private String validationQuery = "SELECT 1";
public boolean isCreateTables()
{
return createTables;
}
public String getConnectURI()
{
return connectURI;
}
public String getUser()
{
return user;
}
public String getPassword()
{
return password;
}
public boolean isUseValidationQuery()
{
return useValidationQuery;
}
public String getValidationQuery() {
return validationQuery;
}
@Override
public String toString()
{
return "DbConnectorConfig{" +
"createTables=" + createTables +
", connectURI='" + connectURI + '\'' +
", user='" + user + '\'' +
", password=****" +
", useValidationQuery=" + useValidationQuery +
", validationQuery='" + validationQuery + '\'' +
'}';
}
}

View File

@ -1,123 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class MetadataStorageTablesConfig
{
public static MetadataStorageTablesConfig fromBase(String base)
{
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null);
}
private static String defaultBase = "druid";
@JsonProperty("base")
private final String base;
@JsonProperty("segments")
private final String segmentsTable;
@JsonProperty("rules")
private final String rulesTable;
@JsonProperty("config")
private final String configTable;
@JsonProperty("tasks")
private final String tasksTable;
@JsonProperty("taskLog")
private final String taskLogTable;
@JsonProperty("taskLock")
private final String taskLockTable;
@JsonCreator
public MetadataStorageTablesConfig(
@JsonProperty("base") String base,
@JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable,
@JsonProperty("config") String configTable,
@JsonProperty("tasks") String tasksTable,
@JsonProperty("taskLog") String taskLogTable,
@JsonProperty("taskLock") String taskLockTable
)
{
this.base = (base == null) ? defaultBase : base;
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.rulesTable = makeTableName(rulesTable, "rules");
this.configTable = makeTableName(configTable, "config");
this.tasksTable = makeTableName(tasksTable, "tasks");
this.taskLogTable = makeTableName(taskLogTable, "tasklogs");
this.taskLockTable = makeTableName(taskLockTable, "tasklocks");
}
private String makeTableName(String explicitTableName, String defaultSuffix)
{
if (explicitTableName == null) {
if (base == null) {
return null;
}
return String.format("%s_%s", base, defaultSuffix);
}
return explicitTableName;
}
public String getBase()
{
return base;
}
public String getSegmentsTable()
{
return segmentsTable;
}
public String getRulesTable()
{
return rulesTable;
}
public String getConfigTable()
{
return configTable;
}
public String getTasksTable()
{
return tasksTable;
}
public String getTaskLogTable()
{
return taskLogTable;
}
public String getTaskLockTable()
{
return taskLockTable;
}
}

View File

@ -27,8 +27,8 @@ import com.metamx.common.lifecycle.Lifecycle;
import io.druid.common.config.ConfigManager;
import io.druid.common.config.ConfigManagerConfig;
import io.druid.common.config.JacksonConfigManager;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageTablesConfig;
/**
*/

View File

@ -19,48 +19,51 @@
package io.druid.indexing.overlord;
import com.google.common.base.Optional;
import com.metamx.common.Pair;
import org.joda.time.DateTime;
import java.util.List;
import java.util.Map;
public interface MetadataStorageActionHandler
public interface MetadataStorageActionHandler<TaskType, TaskStatusType, TaskActionType, TaskLockType>
{
/* Insert stuff on the table */
public void insert(
String tableName,
String id,
String createdDate,
DateTime createdDate,
String dataSource,
byte[] payload,
int active,
byte[] statusPayload
) throws Exception;
TaskType task,
boolean active,
TaskStatusType status
) throws TaskExistsException;
/* Insert stuff. @returns 1 if status of the task with the given id has been updated successfully */
public int setStatus(String tableName, String Id, int active, byte[] statusPayload);
public boolean setStatus(String taskId, boolean active, TaskStatusType statusPayload);
/* Retrieve a task with the given ID */
public List<Map<String, Object>> getTask(String tableName, String Id);
public Optional<TaskType> getTask(String taskId);
/* Retrieve a task status with the given ID */
public List<Map<String, Object>> getTaskStatus(String tableName, String Id);
public Optional<TaskStatusType> getTaskStatus(String taskId);
/* Retrieve active tasks */
public List<Map<String, Object>> getActiveTasks(String tableName);
public List<Pair<TaskType, TaskStatusType>> getActiveTasksWithStatus();
/* Retrieve task statuses that have been created sooner than the given time */
public List<Map<String, Object>> getRecentlyFinishedTaskStatuses(String tableName, String recent);
public List<TaskStatusType> getRecentlyFinishedTaskStatuses(DateTime start);
/* Add lock to the task with given ID */
public int addLock(String tableName, String Id, byte[] lock);
public int addLock(String taskId, TaskLockType lock);
/* Remove taskLock with given ID */
public int removeLock(String tableName, long lockId);
public int removeLock(long lockId);
public int addAuditLog(String tableName, String Id, byte[] taskAction);
public int addAuditLog(String taskId, TaskActionType taskAction);
/* Get logs for task with given ID */
public List<Map<String, Object>> getTaskLogs(String tableName, String Id);
public List<TaskActionType> getTaskLogs(String taskId);
/* Get locks for task with given ID */
public List<Map<String, Object>> getTaskLocks(String tableName, String Id);
public Map<Long, TaskLockType> getTaskLocks(String taskId);
}

View File

@ -0,0 +1,25 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.overlord;
public interface MetadataStorageActionHandlerFactory
{
public <A,B,C,D> MetadataStorageActionHandler<A,B,C,D> create(MetadataStorageActionHandlerTypes<A,B,C,D> types);
}

View File

@ -1,6 +1,6 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
@ -17,34 +17,14 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.indexing.overlord;
/**
*/
public interface MetadataStorageConnector
import com.fasterxml.jackson.core.type.TypeReference;
public interface MetadataStorageActionHandlerTypes<A,B,C,D>
{
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
) throws Exception;
public byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
);
public void createSegmentTable();
public void createRulesTable();
public void createConfigTable();
public void createTaskTables();
public TypeReference<A> getTaskType();
public TypeReference<B> getTaskStatusType();
public TypeReference<C> getTaskActionType();
public TypeReference<D> getTaskLockType();
}

View File

@ -5,9 +5,10 @@ druid.extensions.coordinates=["io.druid.extensions:druid-examples:0.7.0","io.dru
druid.zk.service.host=localhost
# Metadata Storage
druid.db.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.db.connector.user=druid
druid.db.connector.password=diurd
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc\:mysql\://localhost\:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd
# Deep storage
druid.storage.type=local

View File

@ -21,7 +21,7 @@ package io.druid.indexer.updater;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Supplier;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageConnectorConfig;
/**
*/

View File

@ -22,7 +22,7 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.indexer.partitions.PartitionsSpec;
import io.druid.indexer.partitions.RandomPartitionsSpec;
import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;

View File

@ -56,8 +56,6 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.util.Collection;
import java.util.List;
import java.util.Map;

View File

@ -19,57 +19,83 @@
package io.druid.indexing.overlord;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.emitter.EmittingLogger;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.Task;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.StatementException;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
public class MetadataTaskStorage implements TaskStorage
{
private final ObjectMapper jsonMapper;
private static final MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock> TASK_TYPES = new MetadataStorageActionHandlerTypes<Task, TaskStatus, TaskAction, TaskLock>()
{
@Override
public TypeReference<Task> getTaskType()
{
return new TypeReference<Task>()
{
};
}
@Override
public TypeReference<TaskStatus> getTaskStatusType()
{
return new TypeReference<TaskStatus>()
{
};
}
@Override
public TypeReference<TaskAction> getTaskActionType()
{
return new TypeReference<TaskAction>()
{
};
}
@Override
public TypeReference<TaskLock> getTaskLockType()
{
return new TypeReference<TaskLock>()
{
};
}
};
private final MetadataStorageConnector metadataStorageConnector;
private final MetadataStorageTablesConfig dbTables;
private final TaskStorageConfig config;
private final MetadataStorageActionHandler handler;
private final MetadataStorageActionHandler<Task, TaskStatus, TaskAction, TaskLock> handler;
private static final EmittingLogger log = new EmittingLogger(MetadataTaskStorage.class);
@Inject
public MetadataTaskStorage(
final ObjectMapper jsonMapper,
final MetadataStorageConnector metadataStorageConnector,
final MetadataStorageTablesConfig dbTables,
final TaskStorageConfig config,
final MetadataStorageActionHandler handler
final MetadataStorageActionHandlerFactory factory
)
{
this.jsonMapper = jsonMapper;
this.metadataStorageConnector = metadataStorageConnector;
this.dbTables = dbTables;
this.config = config;
this.handler = handler;
this.handler = factory.create(TASK_TYPES);
}
@LifecycleStart
@ -100,23 +126,19 @@ public class MetadataTaskStorage implements TaskStorage
try {
handler.insert(
dbTables.getTasksTable(),
task.getId(),
new DateTime().toString(),
new DateTime(),
task.getDataSource(),
jsonMapper.writeValueAsBytes(task),
status.isRunnable() ? 1 : 0,
jsonMapper.writeValueAsBytes(status)
task,
status.isRunnable(),
status
);
}
catch (Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(task.getId()).isPresent()) {
throw new TaskExistsException(task.getId(), e);
if(e instanceof TaskExistsException) {
throw (TaskExistsException) e;
} else {
throw Throwables.propagate(e);
Throwables.propagate(e);
}
}
}
@ -128,103 +150,77 @@ public class MetadataTaskStorage implements TaskStorage
log.info("Updating task %s to status: %s", status.getId(), status);
try {
int updated = handler.setStatus(
dbTables.getTasksTable(),
status.getId(),
status.isRunnable() ? 1 : 0,
jsonMapper.writeValueAsBytes(status)
);
if (updated != 1) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
final boolean set = handler.setStatus(
status.getId(),
status.isRunnable(),
status
);
if (!set) {
throw new IllegalStateException(String.format("Active task not found: %s", status.getId()));
}
}
@Override
public Optional<Task> getTask(final String taskid)
public Optional<Task> getTask(final String taskId)
{
try {
final List<Map<String, Object>> dbTasks = handler.getTask(dbTables.getTasksTable(), taskid);
if (dbTasks.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbTasks);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("payload"), Task.class));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return handler.getTask(taskId);
}
@Override
public Optional<TaskStatus> getStatus(final String taskid)
public Optional<TaskStatus> getStatus(final String taskId)
{
try {
final List<Map<String, Object>> dbStatuses = handler.getTaskStatus(dbTables.getTasksTable(), taskid);
if (dbStatuses.size() == 0) {
return Optional.absent();
} else {
final Map<String, Object> dbStatus = Iterables.getOnlyElement(dbStatuses);
return Optional.of(jsonMapper.readValue((byte[]) dbStatus.get("status_payload"), TaskStatus.class));
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return handler.getTaskStatus(taskId);
}
@Override
public List<Task> getActiveTasks()
{
final List<Map<String, Object>> dbTasks = handler.getActiveTasks(dbTables.getTasksTable());
final ImmutableList.Builder<Task> tasks = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final Task task = jsonMapper.readValue((byte[]) row.get("payload"), Task.class);
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isRunnable()) {
tasks.add(task);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", id).emit();
}
}
return tasks.build();
return ImmutableList.copyOf(
Iterables.transform(
Iterables.filter(
handler.getActiveTasksWithStatus(),
new Predicate<Pair<Task, TaskStatus>>()
{
@Override
public boolean apply(
@Nullable Pair<Task, TaskStatus> input
)
{
return input.rhs.isRunnable();
}
}
),
new Function<Pair<Task, TaskStatus>, Task>()
{
@Nullable
@Override
public Task apply(@Nullable Pair<Task, TaskStatus> input)
{
return input.lhs;
}
}
)
);
}
@Override
public List<TaskStatus> getRecentlyFinishedTaskStatuses()
{
final DateTime recent = new DateTime().minus(config.getRecentlyFinishedThreshold());
final DateTime start = new DateTime().minus(config.getRecentlyFinishedThreshold());
final List<Map<String, Object>> dbTasks = handler.getRecentlyFinishedTaskStatuses(dbTables.getTasksTable(), recent.toString());
final ImmutableList.Builder<TaskStatus> statuses = ImmutableList.builder();
for (final Map<String, Object> row : dbTasks) {
final String id = row.get("id").toString();
try {
final TaskStatus status = jsonMapper.readValue((byte[]) row.get("status_payload"), TaskStatus.class);
if (status.isComplete()) {
statuses.add(status);
}
}
catch (Exception e) {
log.makeAlert(e, "Failed to parse status payload").addData("task", id).emit();
}
}
return statuses.build();
return ImmutableList.copyOf(
Iterables.filter(
handler.getRecentlyFinishedTaskStatuses(start),
new Predicate<TaskStatus>()
{
@Override
public boolean apply(TaskStatus status)
{
return status.isComplete();
}
}
)
);
}
@Override
@ -240,14 +236,7 @@ public class MetadataTaskStorage implements TaskStorage
taskid
);
try {
handler.addLock(dbTables.getTaskLockTable(), taskid, jsonMapper.writeValueAsBytes(taskLock));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
handler.addLock(taskid, taskLock);
}
@Override
@ -264,7 +253,7 @@ public class MetadataTaskStorage implements TaskStorage
if (taskLock.equals(taskLockToRemove)) {
log.info("Deleting TaskLock with id[%d]: %s", id, taskLock);
handler.removeLock(dbTables.getTaskLockTable(), id);
handler.removeLock(id);
}
}
}
@ -293,52 +282,17 @@ public class MetadataTaskStorage implements TaskStorage
log.info("Logging action for task[%s]: %s", task.getId(), taskAction);
try {
handler.addAuditLog(dbTables.getTaskLogTable(), task.getId(), jsonMapper.writeValueAsBytes(taskAction));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
handler.addAuditLog(task.getId(), taskAction);
}
@Override
public List<TaskAction> getAuditLogs(final String taskid)
public List<TaskAction> getAuditLogs(final String taskId)
{
final List<Map<String, Object>> dbTaskLogs = handler.getTaskLogs(dbTables.getTaskLogTable(), taskid);
final List<TaskAction> retList = Lists.newArrayList();
for (final Map<String, Object> dbTaskLog : dbTaskLogs) {
try {
retList.add(jsonMapper.readValue((byte[]) dbTaskLog.get("log_payload"), TaskAction.class));
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskid)
.addData("logPayload", dbTaskLog)
.emit();
}
}
return retList;
return handler.getTaskLogs(taskId);
}
private Map<Long, TaskLock> getLocksWithIds(final String taskid)
{
final List<Map<String, Object>> dbTaskLocks = handler.getTaskLocks(dbTables.getTaskLockTable(), taskid);
final Map<Long, TaskLock> retMap = Maps.newHashMap();
for (final Map<String, Object> row : dbTaskLocks) {
try {
retMap.put(
(Long) row.get("id"),
jsonMapper.readValue((byte[]) row.get("lock_payload"), TaskLock.class)
);
}
catch (Exception e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", taskid)
.addData("lockPayload", row)
.emit();
}
}
return retMap;
return handler.getTaskLocks(taskid);
}
}
}

View File

@ -25,14 +25,14 @@
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<meta name="Description" content="Druid Indexer Coordinator Console"/>
<style type="text/css">@import "css/style.css";</style>
<style type="text/css">@import "css/demo_table.css";</style>
<style type="text/css">@import "old-console/css/style.css";</style>
<style type="text/css">@import "old-console/css/demo_table.css";</style>
<script type="text/javascript" src="js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="js/jquery-1.11.0.min.js"></script>
<script type="text/javascript" src="js/jquery.dataTables-1.8.2.js"></script>
<script type="text/javascript" src="js/druidTable-0.0.1.js"></script>
<script type="text/javascript" src="js/tablehelper-0.0.2.js"></script>
<script type="text/javascript" src="old-console/js/underscore-1.2.2.js"></script>
<script type="text/javascript" src="old-console/js/jquery-1.11.0.min.js"></script>
<script type="text/javascript" src="old-console/js/jquery.dataTables-1.8.2.js"></script>
<script type="text/javascript" src="old-console/js/druidTable-0.0.1.js"></script>
<script type="text/javascript" src="old-console/js/tablehelper-0.0.2.js"></script>
<script type="text/javascript" src="js/console-0.0.1.js"></script>
</head>
@ -64,4 +64,4 @@
<table id="eventTable"></table>
</div>
</body>
</html>
</html>

View File

@ -29,7 +29,6 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.common.guava.DSuppliers;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.cache.SimplePathChildrenCacheFactory;
import io.druid.indexing.common.IndexingServiceCondition;
@ -41,7 +40,6 @@ import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import io.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy;
import io.druid.indexing.overlord.setup.WorkerSetupData;
import io.druid.indexing.worker.TaskAnnouncement;
import io.druid.indexing.worker.Worker;
import io.druid.jackson.DefaultObjectMapper;
@ -59,7 +57,6 @@ import org.junit.Test;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
public class RemoteTaskRunnerTest
{

View File

@ -42,7 +42,7 @@ import io.druid.data.input.FirehoseFactory;
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.impl.InputRowParser;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.granularity.QueryGranularity;
import io.druid.indexing.common.SegmentLoaderFactory;
import io.druid.indexing.common.TaskLock;

View File

@ -24,9 +24,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-mysql-storage</artifactId>
<name>druid-mysql-storage</name>
<description>druid-mysql-storage</description>
<artifactId>mysql-metadata-storage</artifactId>
<name>mysql-metadata-storage</name>
<description>mysql-metadata-storage</description>
<parent>
<groupId>io.druid</groupId>

View File

@ -17,15 +17,15 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.mysql;
package io.druid.metadata.storage.mysql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata.storage.mysql;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.guice.SQLMetadataStorageDruidModule;
import io.druid.initialization.DruidModule;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.SQLMetadataConnector;
import java.util.List;
public class MySQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
{
public static final String TYPE = "mysql";
public MySQLMetadataStorageModule()
{
super(TYPE);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
super.configure(binder);
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding(TYPE)
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding(TYPE)
.to(MySQLConnector.class)
.in(LazySingleton.class);
}
}

View File

@ -0,0 +1 @@
io.druid.metadata.storage.mysql.MySQLMetadataStorageModule

View File

@ -1,135 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.mysql;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.db.MetadataSegmentPublisherProvider;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import io.druid.db.SQLMetadataRuleManager;
import io.druid.db.SQLMetadataRuleManagerProvider;
import io.druid.db.SQLMetadataSegmentManager;
import io.druid.db.SQLMetadataSegmentManagerProvider;
import io.druid.db.SQLMetadataSegmentPublisher;
import io.druid.db.SQLMetadataSegmentPublisherProvider;
import io.druid.db.SQLMetadataStorageActionHandler;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.SegmentPublisher;
import org.skife.jdbi.v2.IDBI;
import java.util.List;
public class MySQLMetadataStorageModule implements DruidModule
{
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
bindMySQL(binder);
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class);
}
private void bindMySQL(Binder binder) {
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding("mysql")
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding("mysql")
.to(MySQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("mysql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("mysql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("mysql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
.addBinding("mysql")
.to(SQLMetadataStorageActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding("mysql")
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding("mysql")
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
}
@Provides
@LazySingleton
public IDBI getDbi(final MySQLConnector dbConnector)
{
return dbConnector.getDBI();
}
}

View File

@ -1 +0,0 @@
io.druid.storage.mysql.MySQLMetadataStorageModule

View File

@ -61,8 +61,8 @@
<module>kafka-eight</module>
<module>rabbitmq</module>
<module>histogram</module>
<module>mysql-storage</module>
<module>postgres-storage</module>
<module>mysql-metadata-storage</module>
<module>postgresql-metadata-storage</module>
</modules>
<dependencyManagement>

View File

@ -1,135 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.postgres;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Provides;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.db.MetadataSegmentPublisherProvider;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import io.druid.db.SQLMetadataRuleManager;
import io.druid.db.SQLMetadataRuleManagerProvider;
import io.druid.db.SQLMetadataSegmentManager;
import io.druid.db.SQLMetadataSegmentManagerProvider;
import io.druid.db.SQLMetadataSegmentPublisher;
import io.druid.db.SQLMetadataSegmentPublisherProvider;
import io.druid.db.SQLMetadataStorageActionHandler;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.initialization.DruidModule;
import io.druid.segment.realtime.SegmentPublisher;
import org.skife.jdbi.v2.IDBI;
import java.util.List;
public class PostgresMetadataStorageModule implements DruidModule
{
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
bindPostgres(binder);
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class);
}
private void bindPostgres(Binder binder) {
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding("postgresql")
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding("postgresql")
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("postgresql")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
.addBinding("postgresql")
.to(SQLMetadataStorageActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("postgresql")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding("postgresql")
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding("postgresql")
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
}
@Provides
@LazySingleton
public IDBI getDbi(final PostgreSQLConnector dbConnector)
{
return dbConnector.getDBI();
}
}

View File

@ -1 +0,0 @@
io.druid.storage.postgres.PostgresMetadataStorageModule

View File

@ -24,9 +24,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-postgres-storage</artifactId>
<name>druid-postgres-storage</name>
<description>druid-postgres-storage</description>
<artifactId>postgresql-metadata-storage</artifactId>
<name>postgresql-metadata-storage</name>
<description>postgresql-metadata-storage</description>
<parent>
<groupId>io.druid</groupId>

View File

@ -17,18 +17,17 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.storage.postgres;
package io.druid.metadata.storage.postgresql;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.db.SQLMetadataConnector;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import org.apache.commons.dbcp2.BasicDataSource;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.StringMapper;

View File

@ -0,0 +1,65 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata.storage.postgresql;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Key;
import io.druid.guice.LazySingleton;
import io.druid.guice.PolyBind;
import io.druid.guice.SQLMetadataStorageDruidModule;
import io.druid.initialization.DruidModule;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.SQLMetadataConnector;
import java.util.List;
public class PostgreSQLMetadataStorageModule extends SQLMetadataStorageDruidModule implements DruidModule
{
public static final String TYPE = "postgresql";
public PostgreSQLMetadataStorageModule()
{
super(TYPE);
}
@Override
public List<? extends com.fasterxml.jackson.databind.Module> getJacksonModules()
{
return ImmutableList.of();
}
@Override
public void configure(Binder binder)
{
super.configure(binder);
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding(TYPE)
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding(TYPE)
.to(PostgreSQLConnector.class)
.in(LazySingleton.class);
}
}

View File

@ -0,0 +1 @@
io.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule

View File

@ -1,349 +0,0 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.RetryUtils;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class SQLMetadataStorageActionHandler implements MetadataStorageActionHandler
{
private final IDBI dbi;
private final SQLMetadataConnector connector;
@Inject
public SQLMetadataStorageActionHandler(
final IDBI dbi,
final SQLMetadataConnector connector
)
{
this.dbi = dbi;
this.connector = connector;
}
/* Insert stuff. @returns number of entries inserted on success */
public void insert(
final String tableName,
final String id,
final String createdDate,
final String dataSource,
final byte[] payload,
final int active,
final byte[] statusPayload
)
{
retryingHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
tableName
)
)
.bind("id", id)
.bind("created_date", createdDate)
.bind("datasource", dataSource)
.bind("payload", payload)
.bind("active", active)
.bind("status_payload", statusPayload)
.execute();
return null;
}
}
);
}
/* Insert stuff. @returns 1 if status of the task with the given id has been updated successfully */
public int setStatus(final String tableName, final String Id, final int active, final byte[] statusPayload)
{
return retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
tableName
)
)
.bind("id", Id)
.bind("active", active)
.bind("status_payload", statusPayload)
.execute();
}
}
);
}
/* Retrieve a task with the given ID */
public List<Map<String, Object>> getTask(final String tableName, final String Id)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE id = :id",
tableName
)
)
.bind("id", Id)
.list();
}
}
);
}
/* Retrieve a task status with the given ID */
public List<Map<String, Object>> getTaskStatus(final String tableName, final String Id)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT status_payload FROM %s WHERE id = :id",
tableName
)
)
.bind("id", Id)
.list();
}
}
);
}
/* Retrieve active tasks */
public List<Map<String, Object>> getActiveTasks(final String tableName)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = 1 ORDER BY created_date",
tableName
)
).list();
}
}
);
}
/* Retrieve task statuses that have been created sooner than the given time */
public List<Map<String, Object>> getRecentlyFinishedTaskStatuses(final String tableName, final String recent)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = 0 AND created_date >= :recent ORDER BY created_date DESC",
tableName
)
).bind("recent", recent.toString()).list();
}
}
);
}
/* Add lock to the task with given ID */
public int addLock(final String tableName, final String Id, final byte[] lock)
{
return retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)",
tableName
)
)
.bind("task_id", Id)
.bind("lock_payload", lock)
.execute();
}
}
);
}
/* Remove taskLock with given ID */
public int removeLock(final String tableName, final long lockId)
{
return retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"DELETE FROM %s WHERE id = :id",
tableName
)
)
.bind("id", lockId)
.execute();
}
}
);
}
public int addAuditLog(final String tableName, final String Id, final byte[] taskAction)
{
return retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)",
tableName
)
)
.bind("task_id", Id)
.bind("log_payload", taskAction)
.execute();
}
}
);
}
/* Get logs for task with given ID */
public List<Map<String, Object>> getTaskLogs(final String tableName, final String Id)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT log_payload FROM %s WHERE task_id = :task_id",
tableName
)
)
.bind("task_id", Id)
.list();
}
}
);
}
/* Get locks for task with given ID */
public List<Map<String, Object>> getTaskLocks(final String tableName, final String Id)
{
return retryingHandle(
new HandleCallback<List<Map<String, Object>>>()
{
@Override
public List<Map<String, Object>> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT id, lock_payload FROM %s WHERE task_id = :task_id",
tableName
)
)
.bind("task_id", Id)
.list();
}
}
);
}
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return dbi.withHandle(callback);
}
};
final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return shouldRetryException(e);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
protected boolean shouldRetryException(final Throwable e)
{
return e != null && (e instanceof SQLTransientException
|| connector.isTransientException(e)
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| (e instanceof SQLException && shouldRetryException(e.getCause()))
|| (e instanceof DBIException && shouldRetryException(e.getCause())));
}
}

View File

@ -21,131 +21,36 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import io.druid.db.SQLMetadataConnector;
import io.druid.db.SQLMetadataRuleManager;
import io.druid.db.SQLMetadataRuleManagerProvider;
import io.druid.db.SQLMetadataSegmentManager;
import io.druid.db.SQLMetadataSegmentManagerProvider;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataSegmentPublisherProvider;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.DerbyConnector;
import io.druid.db.SQLMetadataStorageActionHandler;
import io.druid.db.SQLMetadataSegmentPublisher;
import io.druid.db.SQLMetadataSegmentPublisherProvider;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.metadata.DerbyConnector;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.SQLMetadataConnector;
import org.skife.jdbi.v2.IDBI;
public class DerbyMetadataStorageDruidModule implements Module
public class DerbyMetadataStorageDruidModule extends SQLMetadataStorageDruidModule
{
public DerbyMetadataStorageDruidModule()
{
super(TYPE);
}
public static final String TYPE = "derby";
@Override
public void configure(Binder binder)
{
bindDataBaseDerby(binder);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataStorageConnector.class), Key.get(DerbyConnector.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(SQLMetadataConnector.class), Key.get(DerbyConnector.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataSegmentManager.class), Key.get(SQLMetadataSegmentManager.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataSegmentManagerProvider.class), Key.get(SQLMetadataSegmentManagerProvider.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataRuleManager.class), Key.get(SQLMetadataRuleManager.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataRuleManagerProvider.class), Key.get(SQLMetadataRuleManagerProvider.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(SegmentPublisher.class), Key.get(SQLMetadataSegmentPublisher.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataSegmentPublisherProvider.class), Key.get(SQLMetadataSegmentPublisherProvider.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(IndexerMetadataStorageCoordinator.class), Key.get(IndexerSQLMetadataStorageCoordinator.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataStorageActionHandler.class), Key.get(SQLMetadataStorageActionHandler.class)
);
PolyBind.createChoice(
binder, "druid.db.type", Key.get(MetadataStorageUpdaterJobHandler.class), Key.get(SQLMetadataStorageUpdaterJobHandler.class)
);
createBindingChoices(binder);
super.configure(binder);
}
private static void bindDataBaseDerby(Binder binder)
{
PolyBind.optionBinder(binder, Key.get(MetadataStorageConnector.class))
.addBinding("derby")
.addBinding(TYPE)
.to(DerbyConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SQLMetadataConnector.class))
.addBinding("derby")
.addBinding(TYPE)
.to(DerbyConnector.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding("derby")
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding("derby")
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding("derby")
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding("derby")
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class))
.addBinding("derby")
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding("derby")
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding("derby")
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandler.class))
.addBinding("derby")
.to(SQLMetadataStorageActionHandler.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding("derby")
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
}
@Provides

View File

@ -21,18 +21,18 @@ package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Module;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataRuleManagerConfig;
import io.druid.db.MetadataSegmentManagerConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataRuleManagerConfig;
import io.druid.metadata.MetadataSegmentManagerConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
public class MetadataDbConfigModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.db.tables", MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.db.connector", MetadataStorageConnectorConfig.class);
JsonConfigProvider.bind(binder, "druid.metadata.storage.tables", MetadataStorageTablesConfig.class);
JsonConfigProvider.bind(binder, "druid.metadata.storage.connector", MetadataStorageConnectorConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.segments", MetadataSegmentManagerConfig.class);
JsonConfigProvider.bind(binder, "druid.manager.rules", MetadataRuleManagerConfig.class);

View File

@ -19,7 +19,7 @@
package io.druid.guice;
import io.druid.db.SegmentPublisherProvider;
import io.druid.metadata.SegmentPublisherProvider;
import io.druid.segment.realtime.NoopSegmentPublisher;
import io.druid.segment.realtime.SegmentPublisher;

View File

@ -0,0 +1,149 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.guice;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataStorageActionHandlerFactory;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManagerProvider;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManagerProvider;
import io.druid.metadata.MetadataSegmentPublisherProvider;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.SQLMetadataConnector;
import io.druid.metadata.SQLMetadataRuleManager;
import io.druid.metadata.SQLMetadataRuleManagerProvider;
import io.druid.metadata.SQLMetadataSegmentManager;
import io.druid.metadata.SQLMetadataSegmentManagerProvider;
import io.druid.metadata.SQLMetadataSegmentPublisher;
import io.druid.metadata.SQLMetadataSegmentPublisherProvider;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.segment.realtime.SegmentPublisher;
public class SQLMetadataStorageDruidModule implements Module
{
public static final String PROPERTY = "druid.metadata.storage.type";
final String type;
public SQLMetadataStorageDruidModule(String type)
{
this.type = type;
}
/**
* This function only needs to be called by the default SQL metadata storage module
* Other modules should default to calling super.configure(...) alone
*/
public void createBindingChoices(Binder binder) {
PolyBind.createChoice(
binder, PROPERTY, Key.get(MetadataStorageConnector.class), null
);
PolyBind.createChoice(
binder, PROPERTY, Key.get(SQLMetadataConnector.class), null
);
PolyBind.createChoice(
binder, PROPERTY, Key.get(MetadataSegmentManager.class), Key.get(SQLMetadataSegmentManager.class)
);
PolyBind.createChoice(
binder,
PROPERTY, Key.get(MetadataSegmentManagerProvider.class), Key.get(SQLMetadataSegmentManagerProvider.class)
);
PolyBind.createChoice(
binder, PROPERTY, Key.get(MetadataRuleManager.class), Key.get(SQLMetadataRuleManager.class)
);
PolyBind.createChoice(
binder, PROPERTY, Key.get(MetadataRuleManagerProvider.class), Key.get(SQLMetadataRuleManagerProvider.class)
);
PolyBind.createChoice(
binder, PROPERTY, Key.get(SegmentPublisher.class), Key.get(SQLMetadataSegmentPublisher.class)
);
PolyBind.createChoice(
binder,
PROPERTY, Key.get(MetadataSegmentPublisherProvider.class), Key.get(SQLMetadataSegmentPublisherProvider.class)
);
PolyBind.createChoice(
binder,
PROPERTY, Key.get(IndexerMetadataStorageCoordinator.class), Key.get(IndexerSQLMetadataStorageCoordinator.class)
);
PolyBind.createChoice(
binder,
PROPERTY, Key.get(MetadataStorageActionHandlerFactory.class), Key.get(SQLMetadataStorageActionHandlerFactory.class)
);
PolyBind.createChoice(
binder,
PROPERTY, Key.get(MetadataStorageUpdaterJobHandler.class), Key.get(SQLMetadataStorageUpdaterJobHandler.class)
);
}
@Override
public void configure(Binder binder)
{
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManager.class))
.addBinding(type)
.to(SQLMetadataSegmentManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentManagerProvider.class))
.addBinding(type)
.to(SQLMetadataSegmentManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManager.class))
.addBinding(type)
.to(SQLMetadataRuleManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataRuleManagerProvider.class))
.addBinding(type)
.to(SQLMetadataRuleManagerProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(SegmentPublisher.class))
.addBinding(type)
.to(SQLMetadataSegmentPublisher.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataSegmentPublisherProvider.class))
.addBinding(type)
.to(SQLMetadataSegmentPublisherProvider.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageActionHandlerFactory.class))
.addBinding(type)
.to(SQLMetadataStorageActionHandlerFactory.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(IndexerMetadataStorageCoordinator.class))
.addBinding(type)
.to(IndexerSQLMetadataStorageCoordinator.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(MetadataStorageUpdaterJobHandler.class))
.addBinding(type)
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import org.skife.jdbi.v2.tweak.ConnectionFactory;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.google.api.client.repackaged.com.google.common.base.Throwables;
import com.google.common.base.Supplier;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import io.druid.server.coordinator.rules.Rule;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.google.inject.Provider;
@ -26,4 +26,4 @@ import com.google.inject.Provider;
public interface MetadataRuleManagerProvider extends Provider<MetadataRuleManager>
{
public MetadataRuleManager get();
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import io.druid.client.DruidDataSource;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.google.inject.Provider;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import io.druid.segment.realtime.SegmentPublisher;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;

View File

@ -18,7 +18,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;

View File

@ -18,7 +18,7 @@
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;

View File

@ -19,7 +19,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.databind.ObjectMapper;

View File

@ -0,0 +1,536 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.client.util.Maps;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import com.metamx.common.RetryUtils;
import com.metamx.emitter.EmittingLogger;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.indexing.overlord.MetadataStorageActionHandlerTypes;
import io.druid.indexing.overlord.TaskExistsException;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.StatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
public class SQLMetadataStorageActionHandler<TaskType, TaskStatusType, TaskActionType, TaskLockType>
implements MetadataStorageActionHandler<TaskType, TaskStatusType, TaskActionType, TaskLockType>
{
private static final EmittingLogger log = new EmittingLogger(SQLMetadataStorageActionHandler.class);
private final IDBI dbi;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig config;
private final ObjectMapper jsonMapper;
private final TypeReference taskType;
private final TypeReference taskStatusType;
private final TypeReference taskActionType;
private final TypeReference taskLockType;
public SQLMetadataStorageActionHandler(
final IDBI dbi,
final SQLMetadataConnector connector,
final MetadataStorageTablesConfig config,
final ObjectMapper jsonMapper,
final MetadataStorageActionHandlerTypes<TaskType, TaskStatusType, TaskActionType, TaskLockType> types
)
{
this.dbi = dbi;
this.connector = connector;
this.config = config;
this.jsonMapper = jsonMapper;
this.taskType = types.getTaskType();
this.taskStatusType = types.getTaskStatusType();
this.taskActionType = types.getTaskActionType();
this.taskLockType = types.getTaskLockType();
}
/**
* Insert stuff
*
*/
public void insert(
final String id,
final DateTime createdDate,
final String dataSource,
final TaskType task,
final boolean active,
final TaskStatusType status
) throws TaskExistsException
{
try {
retryingHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(
String.format(
"INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)",
config.getTasksTable()
)
)
.bind("id", id)
.bind("created_date", createdDate)
.bind("datasource", dataSource)
.bind("payload", jsonMapper.writeValueAsBytes(task))
.bind("active", active)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
}
}
);
} catch(Exception e) {
final boolean isStatementException = e instanceof StatementException ||
(e instanceof CallbackFailedException
&& e.getCause() instanceof StatementException);
if (isStatementException && getTask(id).isPresent()) {
throw new TaskExistsException(id, e);
} else {
throw Throwables.propagate(e);
}
}
}
/**
* Update task status.
*
* @return true if status of the task with the given id has been updated successfully
*/
public boolean setStatus(final String taskId, final boolean active, final TaskStatusType status)
{
return retryingHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"UPDATE %s SET active = :active, status_payload = :status_payload WHERE id = :id AND active = 1",
config.getTasksTable()
)
)
.bind("id", taskId)
.bind("active", active)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute() == 1;
}
}
);
}
/* */
/**
* Retrieve a task with the given ID
*
* @param taskId task ID
* @return task if it exists
*/
public Optional<TaskType> getTask(final String taskId)
{
return retryingHandle(
new HandleCallback<Optional<TaskType>>()
{
@Override
public Optional<TaskType> withHandle(Handle handle) throws Exception
{
return Optional.fromNullable(
jsonMapper.<TaskType>readValue(
handle.createQuery(
String.format("SELECT payload FROM %s WHERE id = :id", config.getTasksTable())
)
.bind("id", taskId)
.map(ByteArrayMapper.FIRST)
.first(),
taskType
)
);
}
}
);
}
/* Retrieve a task status with the given ID */
public Optional<TaskStatusType> getTaskStatus(final String taskId)
{
return retryingHandle(
new HandleCallback<Optional<TaskStatusType>>()
{
@Override
public Optional<TaskStatusType> withHandle(Handle handle) throws Exception
{
return Optional.fromNullable(
jsonMapper.<TaskStatusType>readValue(
handle.createQuery(
String.format("SELECT status_payload FROM %s WHERE id = :id", config.getTasksTable())
)
.bind("id", taskId)
.map(ByteArrayMapper.FIRST)
.first(),
taskStatusType
)
);
}
}
);
}
/* Retrieve active tasks */
public List<Pair<TaskType, TaskStatusType>> getActiveTasksWithStatus()
{
return retryingHandle(
new HandleCallback<List<Pair<TaskType, TaskStatusType>>>()
{
@Override
public List<Pair<TaskType, TaskStatusType>> withHandle(Handle handle) throws Exception
{
return handle
.createQuery(
String.format(
"SELECT id, payload, status_payload FROM %s WHERE active = TRUE ORDER BY created_date",
config.getTasksTable()
)
)
.map(
new ResultSetMapper<Pair<TaskType, TaskStatusType>>()
{
@Override
public Pair<TaskType, TaskStatusType> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return Pair.of(
jsonMapper.<TaskType>readValue(
r.getBytes("payload"),
taskType
),
jsonMapper.<TaskStatusType>readValue(
r.getBytes("status_payload"),
taskStatusType
)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to parse task payload").addData("task", r.getString("id")).emit();
throw new SQLException(e);
}
}
}
).list();
}
}
);
}
/* Retrieve task statuses that have been created sooner than the given time */
public List<TaskStatusType> getRecentlyFinishedTaskStatuses(final DateTime start)
{
return retryingHandle(
new HandleCallback<List<TaskStatusType>>()
{
@Override
public List<TaskStatusType> withHandle(Handle handle) throws Exception
{
return handle
.createQuery(
String.format(
"SELECT id, status_payload FROM %s WHERE active = FALSE AND created_date >= :start ORDER BY created_date DESC",
config.getTasksTable()
)
).bind("start", start.toString())
.map(
new ResultSetMapper<TaskStatusType>()
{
@Override
public TaskStatusType map(int index, ResultSet r, StatementContext ctx) throws SQLException
{
try {
return jsonMapper.readValue(
r.getBytes("status_payload"),
taskStatusType
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to parse status payload")
.addData("task", r.getString("id"))
.emit();
throw new SQLException(e);
}
}
}
).list();
}
}
);
}
/* Add lock to the task with given ID */
public int addLock(final String taskId, final TaskLockType lock)
{
return retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, lock_payload) VALUES (:task_id, :lock_payload)",
config.getTaskLockTable()
)
)
.bind("task_id", taskId)
.bind("lock_payload", jsonMapper.writeValueAsBytes(lock))
.execute();
}
}
);
}
/* Remove taskLock with given ID */
public int removeLock(final long lockId)
{
return retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"DELETE FROM %s WHERE id = :id",
config.getTaskLockTable()
)
)
.bind("id", lockId)
.execute();
}
}
);
}
public int addAuditLog(final String taskId, final TaskActionType taskAction)
{
return retryingHandle(
new HandleCallback<Integer>()
{
@Override
public Integer withHandle(Handle handle) throws Exception
{
return handle.createStatement(
String.format(
"INSERT INTO %s (task_id, log_payload) VALUES (:task_id, :log_payload)",
config.getTaskLogTable()
)
)
.bind("task_id", taskId)
.bind("log_payload", jsonMapper.writeValueAsBytes(taskAction))
.execute();
}
}
);
}
/* Get logs for task with given ID */
public List<TaskActionType> getTaskLogs(final String taskId)
{
return retryingHandle(
new HandleCallback<List<TaskActionType>>()
{
@Override
public List<TaskActionType> withHandle(Handle handle) throws Exception
{
return handle
.createQuery(
String.format(
"SELECT log_payload FROM %s WHERE task_id = :task_id",
config.getTaskLogTable()
)
)
.bind("task_id", taskId)
.map(ByteArrayMapper.FIRST)
.fold(
Lists.<TaskActionType>newLinkedList(),
new Folder3<List<TaskActionType>, byte[]>()
{
@Override
public List<TaskActionType> fold(
List<TaskActionType> list, byte[] bytes, FoldController control, StatementContext ctx
) throws SQLException
{
try {
list.add(
jsonMapper.<TaskActionType>readValue(
bytes, taskActionType
)
);
return list;
}
catch (IOException e) {
log.makeAlert(e, "Failed to deserialize TaskLog")
.addData("task", taskId)
.addData("logPayload", new String(bytes, Charsets.UTF_8))
.emit();
throw new SQLException(e);
}
}
}
);
}
}
);
}
/* Get locks for task with given ID */
public Map<Long, TaskLockType> getTaskLocks(final String taskId)
{
return retryingHandle(
new HandleCallback<Map<Long, TaskLockType>>()
{
@Override
public Map<Long, TaskLockType> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT id, lock_payload FROM %s WHERE task_id = :task_id",
config.getTaskLockTable()
)
)
.bind("task_id", taskId)
.map(
new ResultSetMapper<Pair<Long, TaskLockType>>()
{
@Override
public Pair<Long, TaskLockType> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return Pair.of(
r.getLong("id"),
jsonMapper.<TaskLockType>readValue(
r.getBytes("lock_payload"),
taskLockType
)
);
}
catch (IOException e) {
log.makeAlert(e, "Failed to deserialize TaskLock")
.addData("task", r.getLong("id"))
.addData(
"lockPayload", new String(r.getBytes("lock_payload"), Charsets.UTF_8)
)
.emit();
throw new SQLException(e);
}
}
}
)
.fold(
Maps.<Long, TaskLockType>newLinkedHashMap(),
new Folder3<Map<Long, TaskLockType>, Pair<Long, TaskLockType>>()
{
@Override
public Map<Long, TaskLockType> fold(
Map<Long, TaskLockType> accumulator,
Pair<Long, TaskLockType> lock,
FoldController control,
StatementContext ctx
) throws SQLException
{
accumulator.put(lock.lhs, lock.rhs);
return accumulator;
}
}
);
}
}
);
}
private <T> T retryingHandle(final HandleCallback<T> callback)
{
final Callable<T> call = new Callable<T>()
{
@Override
public T call() throws Exception
{
return dbi.withHandle(callback);
}
};
final Predicate<Throwable> shouldRetry = new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable e)
{
return shouldRetryException(e);
}
};
final int maxTries = 10;
try {
return RetryUtils.retry(call, shouldRetry, maxTries);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
protected boolean shouldRetryException(final Throwable e)
{
return e != null && (e instanceof SQLTransientException
|| connector.isTransientException(e)
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| (e instanceof SQLException && shouldRetryException(e.getCause()))
|| (e instanceof DBIException && shouldRetryException(e.getCause())));
}
}

View File

@ -0,0 +1,54 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.indexing.overlord.MetadataStorageActionHandler;
import io.druid.indexing.overlord.MetadataStorageActionHandlerFactory;
import io.druid.indexing.overlord.MetadataStorageActionHandlerTypes;
import org.skife.jdbi.v2.IDBI;
public class SQLMetadataStorageActionHandlerFactory implements MetadataStorageActionHandlerFactory
{
private final IDBI dbi;
private final SQLMetadataConnector connector;
private final MetadataStorageTablesConfig config;
private final ObjectMapper jsonMapper;
@Inject
public SQLMetadataStorageActionHandlerFactory(
IDBI dbi,
SQLMetadataConnector connector,
MetadataStorageTablesConfig config,
ObjectMapper jsonMapper
)
{
this.dbi = dbi;
this.connector = connector;
this.config = config;
this.jsonMapper = jsonMapper;
}
public <A,B,C,D> MetadataStorageActionHandler<A,B,C,D> create(MetadataStorageActionHandlerTypes<A,B,C,D> types)
{
return new SQLMetadataStorageActionHandler<>(dbi, connector, config, jsonMapper, types);
}
}

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

View File

@ -23,6 +23,6 @@ import org.skife.config.Config;
public abstract class DbSegmentPublisherConfig
{
@Config("druid.db.tables.segmentTable")
@Config("druid.metadata.storage.tables.segmentTable")
public abstract String getSegmentTable();
}

View File

@ -25,7 +25,7 @@ import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.client.ServerView;
import io.druid.concurrent.Execs;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.SegmentPublisher;
import io.druid.server.coordination.BaseZkCoordinator;

View File

@ -50,8 +50,8 @@ import io.druid.collections.CountingMap;
import io.druid.common.config.JacksonConfigManager;
import io.druid.concurrent.Execs;
import io.druid.curator.discovery.ServiceAnnouncer;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Self;
import io.druid.segment.IndexIO;

View File

@ -24,7 +24,7 @@ import com.google.common.collect.Sets;
import com.metamx.common.guava.Comparators;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidDataSource;
import io.druid.db.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;

View File

@ -20,7 +20,7 @@
package io.druid.server.coordinator.helper;
import com.metamx.emitter.EmittingLogger;
import io.druid.db.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.CoordinatorStats;
import io.druid.server.coordinator.DruidCluster;
import io.druid.server.coordinator.DruidCoordinator;

View File

@ -23,8 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import io.druid.client.InventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.coordinator.DruidCoordinator;
import javax.annotation.Nullable;

View File

@ -33,7 +33,7 @@ import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

View File

@ -33,8 +33,8 @@ import io.druid.client.DruidDataSource;
import io.druid.client.DruidServer;
import io.druid.client.InventoryView;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.rules.LoadRule;
import io.druid.server.coordinator.rules.Rule;
@ -102,8 +102,8 @@ public class InfoResource
private final DruidCoordinator coordinator;
private final InventoryView serverInventoryView;
private final MetadataSegmentManager databaseSegmentManager;
private final MetadataRuleManager databaseRuleManager;
private final MetadataSegmentManager metadataSegmentManager;
private final MetadataRuleManager metadataRuleManager;
private final IndexingServiceClient indexingServiceClient;
private final ObjectMapper jsonMapper;
@ -112,8 +112,8 @@ public class InfoResource
public InfoResource(
DruidCoordinator coordinator,
InventoryView serverInventoryView,
MetadataSegmentManager databaseSegmentManager,
MetadataRuleManager databaseRuleManager,
MetadataSegmentManager metadataSegmentManager,
MetadataRuleManager metadataRuleManager,
@Nullable
IndexingServiceClient indexingServiceClient,
ObjectMapper jsonMapper
@ -121,8 +121,8 @@ public class InfoResource
{
this.coordinator = coordinator;
this.serverInventoryView = serverInventoryView;
this.databaseSegmentManager = databaseSegmentManager;
this.databaseRuleManager = databaseRuleManager;
this.metadataSegmentManager = metadataSegmentManager;
this.metadataRuleManager = metadataRuleManager;
this.indexingServiceClient = indexingServiceClient;
this.jsonMapper = jsonMapper;
}
@ -358,7 +358,7 @@ public class InfoResource
// This will def. be removed as part of the next release
return Response.ok().entity(
Maps.transformValues(
databaseRuleManager.getAllRules(),
metadataRuleManager.getAllRules(),
new Function<List<Rule>, Object>()
{
@Override
@ -374,8 +374,8 @@ public class InfoResource
if (rule instanceof LoadRule) {
Map<String, Object> newRule = jsonMapper.convertValue(
rule, new TypeReference<Map<String, Object>>()
{
}
{
}
);
Set<String> tiers = Sets.newHashSet(((LoadRule) rule).getTieredReplicants().keySet());
tiers.remove(DruidServer.DEFAULT_TIER);
@ -409,10 +409,10 @@ public class InfoResource
)
{
if (full != null) {
return Response.ok(databaseRuleManager.getRulesWithDefault(dataSourceName))
return Response.ok(metadataRuleManager.getRulesWithDefault(dataSourceName))
.build();
}
return Response.ok(databaseRuleManager.getRules(dataSourceName))
return Response.ok(metadataRuleManager.getRules(dataSourceName))
.build();
}
@ -424,7 +424,7 @@ public class InfoResource
final List<Rule> rules
)
{
if (databaseRuleManager.overrideRule(dataSourceName, rules)) {
if (metadataRuleManager.overrideRule(dataSourceName, rules)) {
return Response.status(Response.Status.OK).build();
}
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
@ -491,7 +491,7 @@ public class InfoResource
if (kill != null && Boolean.valueOf(kill)) {
indexingServiceClient.killSegments(dataSourceName, new Interval(interval));
} else {
if (!databaseSegmentManager.removeDatasource(dataSourceName)) {
if (!metadataSegmentManager.removeDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
}
@ -506,7 +506,7 @@ public class InfoResource
@PathParam("dataSourceName") final String dataSourceName
)
{
if (!databaseSegmentManager.enableDatasource(dataSourceName)) {
if (!metadataSegmentManager.enableDatasource(dataSourceName)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -574,7 +574,7 @@ public class InfoResource
@PathParam("segmentId") String segmentId
)
{
if (!databaseSegmentManager.removeSegment(dataSourceName, segmentId)) {
if (!metadataSegmentManager.removeSegment(dataSourceName, segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -589,7 +589,7 @@ public class InfoResource
@PathParam("segmentId") String segmentId
)
{
if (!databaseSegmentManager.enableSegment(segmentId)) {
if (!metadataSegmentManager.enableSegment(segmentId)) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -672,7 +672,7 @@ public class InfoResource
}
@GET
@Path("/db/datasources")
@Path("/metadata/datasources")
@Produces("application/json")
public Response getDatabaseDataSources(
@QueryParam("full") String full,
@ -681,15 +681,15 @@ public class InfoResource
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (includeDisabled != null) {
return builder.entity(databaseSegmentManager.getAllDatasourceNames()).build();
return builder.entity(metadataSegmentManager.getAllDatasourceNames()).build();
}
if (full != null) {
return builder.entity(databaseSegmentManager.getInventory()).build();
return builder.entity(metadataSegmentManager.getInventory()).build();
}
List<String> dataSourceNames = Lists.newArrayList(
Iterables.transform(
databaseSegmentManager.getInventory(),
metadataSegmentManager.getInventory(),
new Function<DruidDataSource, String>()
{
@Override
@ -707,13 +707,13 @@ public class InfoResource
}
@GET
@Path("/db/datasources/{dataSourceName}")
@Path("/metadata/datasources/{dataSourceName}")
@Produces("application/json")
public Response getDatabaseSegmentDataSource(
@PathParam("dataSourceName") final String dataSourceName
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -722,14 +722,14 @@ public class InfoResource
}
@GET
@Path("/db/datasources/{dataSourceName}/segments")
@Path("/metadata/datasources/{dataSourceName}/segments")
@Produces("application/json")
public Response getDatabaseSegmentDataSourceSegments(
@PathParam("dataSourceName") String dataSourceName,
@QueryParam("full") String full
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -755,14 +755,14 @@ public class InfoResource
}
@GET
@Path("/db/datasources/{dataSourceName}/segments/{segmentId}")
@Path("/metadata/datasources/{dataSourceName}/segments/{segmentId}")
@Produces("application/json")
public Response getDatabaseSegmentDataSourceSegment(
@PathParam("dataSourceName") String dataSourceName,
@PathParam("segmentId") String segmentId
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}

View File

@ -24,7 +24,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import io.druid.client.DruidDataSource;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.timeline.DataSegment;
import javax.ws.rs.GET;
@ -38,17 +38,17 @@ import java.util.List;
/**
*/
@Path("/druid/coordinator/v1/db")
public class DBResource
@Path("/druid/coordinator/v1/metadata")
public class MetadataResource
{
private final MetadataSegmentManager databaseSegmentManager;
private final MetadataSegmentManager metadataSegmentManager;
@Inject
public DBResource(
MetadataSegmentManager databaseSegmentManager
public MetadataResource(
MetadataSegmentManager metadataSegmentManager
)
{
this.databaseSegmentManager = databaseSegmentManager;
this.metadataSegmentManager = metadataSegmentManager;
}
@GET
@ -61,15 +61,15 @@ public class DBResource
{
Response.ResponseBuilder builder = Response.status(Response.Status.OK);
if (includeDisabled != null) {
return builder.entity(databaseSegmentManager.getAllDatasourceNames()).build();
return builder.entity(metadataSegmentManager.getAllDatasourceNames()).build();
}
if (full != null) {
return builder.entity(databaseSegmentManager.getInventory()).build();
return builder.entity(metadataSegmentManager.getInventory()).build();
}
List<String> dataSourceNames = Lists.newArrayList(
Iterables.transform(
databaseSegmentManager.getInventory(),
metadataSegmentManager.getInventory(),
new Function<DruidDataSource, String>()
{
@Override
@ -93,7 +93,7 @@ public class DBResource
@PathParam("dataSourceName") final String dataSourceName
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -109,7 +109,7 @@ public class DBResource
@QueryParam("full") String full
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}
@ -142,7 +142,7 @@ public class DBResource
@PathParam("segmentId") String segmentId
)
{
DruidDataSource dataSource = databaseSegmentManager.getInventoryValue(dataSourceName);
DruidDataSource dataSource = metadataSegmentManager.getInventoryValue(dataSourceName);
if (dataSource == null) {
return Response.status(Response.Status.NOT_FOUND).build();
}

View File

@ -20,7 +20,7 @@
package io.druid.server.http;
import com.google.inject.Inject;
import io.druid.db.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.rules.Rule;
import javax.ws.rs.Consumes;

View File

@ -11375,10 +11375,10 @@ app.factory('$druid', function($q, $http, $utils, $window) {
return this.getAndProcess("/servers?simple", $utils.processServerTiers);
},
getDataSources: function() {
return this.getAndProcess("/db/datasources", $utils.processDataSources);
return this.getAndProcess("/metadata/datasources", $utils.processDataSources);
},
getAllDataSources: function() {
return this.getAndProcess("/db/datasources?includeDisabled", function(dataSources) {
return this.getAndProcess("/metadata/datasources?includeDisabled", function(dataSources) {
return dataSources;
});
},
@ -12758,4 +12758,4 @@ ClusterConfigInstanceCtrl = function($scope, $modalInstance, $druid) {
};
},{"../../bower_components/angular-bootstrap/ui-bootstrap-tpls.min.js":1,"../../bower_components/angular-sanitize/angular-sanitize.min.js":2,"../../bower_components/angular-ui-router/release/angular-ui-router.min.js":3,"../../bower_components/angular/angular.min.js":4,"../../bower_components/d3/d3.js":5,"../../bower_components/moment/min/moment.min.js":7,"../../bower_components/ng-clip/dest/ng-clip.min.js":8,"../../bower_components/ng-csv/build/ng-csv.min.js":9,"../../bower_components/underscore/underscore.js":10,"../../bower_components/zeroclipboard/ZeroClipboard.min.js":11,"../../lib/moment-interval.js":12,"jquery":6}]},{},[13])
},{"../../bower_components/angular-bootstrap/ui-bootstrap-tpls.min.js":1,"../../bower_components/angular-sanitize/angular-sanitize.min.js":2,"../../bower_components/angular-ui-router/release/angular-ui-router.min.js":3,"../../bower_components/angular/angular.min.js":4,"../../bower_components/d3/d3.js":5,"../../bower_components/moment/min/moment.min.js":7,"../../bower_components/ng-clip/dest/ng-clip.min.js":8,"../../bower_components/ng-csv/build/ng-csv.min.js":9,"../../bower_components/underscore/underscore.js":10,"../../bower_components/zeroclipboard/ZeroClipboard.min.js":11,"../../lib/moment-interval.js":12,"jquery":6}]},{},[13])

View File

@ -70,12 +70,12 @@ $(document).ready(function() {
}
});
$.getJSON("/druid/coordinator/v1/db/datasources", function(enabled_datasources) {
$.getJSON("/druid/coordinator/v1/metadata/datasources", function(enabled_datasources) {
$.each(enabled_datasources, function(index, datasource) {
$('#enabled_datasources').append($('<li>' + datasource + '</li>'));
});
$.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(db_datasources) {
$.getJSON("/druid/coordinator/v1/metadata/datasources?includeDisabled", function(db_datasources) {
var disabled_datasources = _.difference(db_datasources, enabled_datasources);
$.each(disabled_datasources, function(index, datasource) {
$('#disabled_datasources').append($('<li>' + datasource + '</li>'));
@ -93,4 +93,4 @@ $(document).ready(function() {
$('#disable').click(function (){
$("#disable_dialog").dialog("open")
});
});
});

View File

@ -41,7 +41,7 @@ $(document).ready(function() {
}
});
$.getJSON("/druid/coordinator/v1/db/datasources?includeDisabled", function(data) {
$.getJSON("/druid/coordinator/v1/metadata/datasources?includeDisabled", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').val(datasource).text(datasource));
});
@ -50,4 +50,4 @@ $(document).ready(function() {
$("#confirm").click(function() {
$("#confirm_dialog").dialog("open");
});
});
});

View File

@ -268,7 +268,7 @@ $(document).ready(function() {
tiers = theTiers;
});
$.getJSON("/druid/coordinator/v1/db/datasources", function(data) {
$.getJSON("/druid/coordinator/v1/metadata/datasources", function(data) {
$.each(data, function(index, datasource) {
$('#datasources').append($('<option></option>').val(datasource).text(datasource));
});
@ -307,4 +307,4 @@ $(document).ready(function() {
$('#update').click(function (){
$("#update_dialog").dialog("open")
});
});
});

View File

@ -17,7 +17,7 @@
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.db;
package io.druid.metadata;
import com.google.common.base.Suppliers;
import com.google.common.collect.Maps;

View File

@ -29,7 +29,7 @@ import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.SegmentLoaderConfig;
import io.druid.segment.realtime.SegmentPublisher;

View File

@ -29,7 +29,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.DruidServer;
import io.druid.client.ImmutableDruidServer;
import io.druid.db.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.Rule;

View File

@ -29,7 +29,7 @@ import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import io.druid.client.DruidServer;
import io.druid.db.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.segment.IndexIO;
import io.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
import io.druid.server.coordinator.rules.IntervalDropRule;

View File

@ -27,7 +27,7 @@ import io.druid.client.ImmutableDruidServer;
import io.druid.client.SingleServerInventoryView;
import io.druid.curator.discovery.NoopServiceAnnouncer;
import io.druid.curator.inventory.InventoryManagerConfig;
import io.druid.db.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.server.DruidNode;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;

View File

@ -14,9 +14,9 @@ import io.druid.curator.PotentiallyGzippedCompressionProvider;
import io.druid.curator.announcement.Announcer;
import io.druid.curator.discovery.ServerDiscoveryFactory;
import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerConfig;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManagerConfig;
import io.druid.metadata.MetadataSegmentManagerProvider;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;

View File

@ -29,12 +29,12 @@ import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.client.indexing.IndexingServiceClient;
import io.druid.db.MetadataRuleManager;
import io.druid.db.MetadataRuleManagerConfig;
import io.druid.db.MetadataRuleManagerProvider;
import io.druid.db.MetadataSegmentManager;
import io.druid.db.MetadataSegmentManagerConfig;
import io.druid.db.MetadataSegmentManagerProvider;
import io.druid.metadata.MetadataRuleManager;
import io.druid.metadata.MetadataRuleManagerConfig;
import io.druid.metadata.MetadataRuleManagerProvider;
import io.druid.metadata.MetadataSegmentManager;
import io.druid.metadata.MetadataSegmentManagerConfig;
import io.druid.metadata.MetadataSegmentManagerProvider;
import io.druid.guice.ConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
@ -49,7 +49,7 @@ import io.druid.server.http.BackwardsCompatibleInfoResource;
import io.druid.server.http.CoordinatorDynamicConfigsResource;
import io.druid.server.http.CoordinatorRedirectInfo;
import io.druid.server.http.CoordinatorResource;
import io.druid.server.http.DBResource;
import io.druid.server.http.MetadataResource;
import io.druid.server.http.DatasourcesResource;
import io.druid.server.http.InfoResource;
import io.druid.server.http.RedirectFilter;
@ -125,7 +125,7 @@ public class CliCoordinator extends ServerRunnable
Jerseys.addResource(binder, RulesResource.class);
Jerseys.addResource(binder, ServersResource.class);
Jerseys.addResource(binder, DatasourcesResource.class);
Jerseys.addResource(binder, DBResource.class);
Jerseys.addResource(binder, MetadataResource.class);
LifecycleModule.register(binder, Server.class);
}

View File

@ -31,7 +31,6 @@ import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule;
@ -50,10 +49,9 @@ import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.tasklogs.SwitchingTaskLogStreamer;
import io.druid.indexing.common.tasklogs.TaskRunnerTaskLogStreamer;
import io.druid.indexing.overlord.MetadataTaskStorage;
import io.druid.indexing.overlord.ForkingTaskRunnerFactory;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.indexing.overlord.MetadataTaskStorage;
import io.druid.indexing.overlord.RemoteTaskRunnerFactory;
import io.druid.indexing.overlord.TaskLockbox;
import io.druid.indexing.overlord.TaskMaster;
@ -141,7 +139,6 @@ public class CliOverlord extends ServerRunnable
binder.bind(TaskActionClientFactory.class).to(LocalTaskActionClientFactory.class).in(LazySingleton.class);
binder.bind(TaskActionToolbox.class).in(LazySingleton.class);
binder.bind(IndexerMetadataStorageCoordinator.class).to(IndexerSQLMetadataStorageCoordinator.class).in(LazySingleton.class);
binder.bind(TaskLockbox.class).in(LazySingleton.class);
binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class);
binder.bind(ResourceManagementSchedulerFactory.class)

View File

@ -31,7 +31,7 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Arguments;
import io.airlift.command.Command;
import io.airlift.command.Option;
import io.druid.db.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.guice.Binders;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.Jerseys;

View File

@ -27,9 +27,9 @@ import com.google.inject.Module;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.airlift.command.Option;
import io.druid.db.MetadataStorageConnector;
import io.druid.db.MetadataStorageConnectorConfig;
import io.druid.db.MetadataStorageTablesConfig;
import io.druid.metadata.MetadataStorageConnector;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.annotations.Self;
import io.druid.server.DruidNode;