Merge pull request #1100 from metamx/audit-config-changes

Auditing Rules and Dynamic Configuration Changes
This commit is contained in:
Fangjin Yang 2015-03-12 12:57:54 -07:00
commit 3892a94506
29 changed files with 1255 additions and 30 deletions

View File

@ -0,0 +1,185 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.audit;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.DateTime;
/**
* An Entry in Audit Table.
*/
public class AuditEntry
{
private final String key;
private final String type;
private final AuditInfo auditInfo;
private final String payload;
private final DateTime auditTime;
@JsonCreator
public AuditEntry(
@JsonProperty("key") String key,
@JsonProperty("type") String type,
@JsonProperty("auditInfo") AuditInfo authorInfo,
@JsonProperty("payload") String payload,
@JsonProperty("auditTime") DateTime auditTime
)
{
Preconditions.checkNotNull(key, "key cannot be null");
Preconditions.checkNotNull(type, "type cannot be null");
Preconditions.checkNotNull(authorInfo, "author cannot be null");
this.key = key;
this.type = type;
this.auditInfo = authorInfo;
this.auditTime = auditTime == null ? DateTime.now() : auditTime;
this.payload = payload;
}
@JsonProperty
public String getKey()
{
return key;
}
@JsonProperty
public String getType()
{
return type;
}
@JsonProperty
public AuditInfo getAuditInfo()
{
return auditInfo;
}
@JsonProperty
public String getPayload()
{
return payload;
}
@JsonProperty
public DateTime getAuditTime()
{
return auditTime;
}
public static AuditEntry.Builder builder()
{
return new AuditEntry.Builder();
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AuditEntry entry = (AuditEntry) o;
if (!auditTime.equals(entry.auditTime)) {
return false;
}
if (!auditInfo.equals(entry.auditInfo)) {
return false;
}
if (!key.equals(entry.key)) {
return false;
}
if (!payload.equals(entry.payload)) {
return false;
}
if (!type.equals(entry.type)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = key.hashCode();
result = 31 * result + type.hashCode();
result = 31 * result + auditInfo.hashCode();
result = 31 * result + payload.hashCode();
result = 31 * result + auditTime.hashCode();
return result;
}
public static class Builder
{
private String key;
private String type;
private AuditInfo auditInfo;
private String payload;
private DateTime auditTime;
private Builder()
{
this.key = null;
this.auditInfo = null;
this.payload = null;
this.auditTime = DateTime.now();
}
public Builder key(String key)
{
this.key = key;
return this;
}
public Builder type(String type)
{
this.type = type;
return this;
}
public Builder auditInfo(AuditInfo auditInfo)
{
this.auditInfo = auditInfo;
return this;
}
public Builder payload(String payload)
{
this.payload = payload;
return this;
}
public Builder auditTime(DateTime auditTime)
{
this.auditTime = auditTime;
return this;
}
public AuditEntry build()
{
return new AuditEntry(key, type, auditInfo, payload, auditTime);
}
}
}

View File

@ -0,0 +1,79 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.audit;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
public class AuditInfo
{
private final String author;
private final String comment;
@JsonCreator
public AuditInfo(
@JsonProperty("author") String author,
@JsonProperty("comment") String comment
)
{
this.author = author;
this.comment = comment;
}
@JsonProperty
public String getAuthor()
{
return author;
}
@JsonProperty
public String getComment()
{
return comment;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AuditInfo that = (AuditInfo) o;
if (!author.equals(that.author)) {
return false;
}
if (!comment.equals(that.comment)) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = author.hashCode();
result = 31 * result + comment.hashCode();
return result;
}
}

View File

@ -0,0 +1,57 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.audit;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import java.io.IOException;
import java.util.List;
public interface AuditManager
{
public static final String X_DRUID_AUTHOR = "X-Druid-Author";
public static final String X_DRUID_COMMENT = "X-Druid-Comment";
/**
* inserts an audit Entry in the Audit Table
* @param auditEntry
*/
public void doAudit(AuditEntry auditEntry);
/**
* inserts an audit Entry in audit table using the handler provided
* used to do the audit in same transaction as the config changes
* @param auditEntry
* @param handler
* @throws IOException
*/
public void doAudit(AuditEntry auditEntry, Handle handler) throws IOException;
/**
* provides audit history for given key, type and interval
* @param key
* @param type
* @param interval
* @return list of AuditEntries satisfying the passed parameters
*/
public List<AuditEntry> fetchAuditHistory(String key, String type, Interval interval);
}

View File

@ -22,5 +22,6 @@ package io.druid.common.config;
public interface ConfigSerde<T>
{
public byte[] serialize(T obj);
public String serializeToString(T obj);
public T deserialize(byte[] bytes);
}

View File

@ -22,6 +22,9 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
@ -32,15 +35,18 @@ public class JacksonConfigManager
{
private final ConfigManager configManager;
private final ObjectMapper jsonMapper;
private final AuditManager auditManager;
@Inject
public JacksonConfigManager(
ConfigManager configManager,
ObjectMapper jsonMapper
ObjectMapper jsonMapper,
AuditManager auditManager
)
{
this.configManager = configManager;
this.jsonMapper = jsonMapper;
this.auditManager = auditManager;
}
public <T> AtomicReference<T> watch(String key, Class<? extends T> clazz)
@ -63,9 +69,20 @@ public class JacksonConfigManager
return configManager.watchConfig(key, create(clazz, defaultVal));
}
public <T> boolean set(String key, T val)
public <T> boolean set(String key, T val, AuditInfo auditInfo)
{
return configManager.set(key, create(val.getClass(), null), val);
ConfigSerde configSerde = create(val.getClass(), null);
// Audit and actual config change are done in separate transactions
// there can be phantom audits and reOrdering in audit changes as well.
auditManager.doAudit(
AuditEntry.builder()
.key(key)
.type(key)
.auditInfo(auditInfo)
.payload(configSerde.serializeToString(val))
.build()
);
return configManager.set(key, configSerde, val);
}
private <T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal)
@ -83,6 +100,17 @@ public class JacksonConfigManager
}
}
@Override
public String serializeToString(T obj)
{
try {
return jsonMapper.writeValueAsString(obj);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}
@Override
public T deserialize(byte[] bytes)
{
@ -115,6 +143,17 @@ public class JacksonConfigManager
}
}
@Override
public String serializeToString(T obj)
{
try {
return jsonMapper.writeValueAsString(obj);
}
catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
}
@Override
public T deserialize(byte[] bytes)
{

View File

@ -44,4 +44,6 @@ public interface MetadataStorageConnector
public void createConfigTable();
public void createTaskTables();
public void createAuditTable();
}

View File

@ -29,7 +29,7 @@ public class MetadataStorageTablesConfig
{
public static MetadataStorageTablesConfig fromBase(String base)
{
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null);
return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null);
}
public static final String TASK_ENTRY_TYPE = "task";
@ -61,6 +61,9 @@ public class MetadataStorageTablesConfig
@JsonProperty("taskLock")
private final String taskLockTable;
@JsonProperty("audit")
private final String auditTable;
@JsonCreator
public MetadataStorageTablesConfig(
@JsonProperty("base") String base,
@ -69,7 +72,8 @@ public class MetadataStorageTablesConfig
@JsonProperty("config") String configTable,
@JsonProperty("tasks") String tasksTable,
@JsonProperty("taskLog") String taskLogTable,
@JsonProperty("taskLock") String taskLockTable
@JsonProperty("taskLock") String taskLockTable,
@JsonProperty("audit") String auditTable
)
{
this.base = (base == null) ? DEFAULT_BASE : base;
@ -83,6 +87,7 @@ public class MetadataStorageTablesConfig
entryTables.put(TASK_ENTRY_TYPE, this.tasksTable);
logTables.put(TASK_ENTRY_TYPE, this.taskLogTable);
lockTables.put(TASK_ENTRY_TYPE, this.taskLockTable);
this.auditTable = makeTableName(auditTable, "audit");
}
@ -137,4 +142,10 @@ public class MetadataStorageTablesConfig
{
return TASK_ENTRY_TYPE;
}
public String getAuditTable()
{
return auditTable;
}
}

View File

@ -46,9 +46,16 @@ The coordinator has dynamic configuration to change certain behaviour on the fly
It is recommended that you use the Coordinator Console to configure these parameters. However, if you need to do it via HTTP, the JSON object can be submitted to the overlord via a POST request at:
```
http://<COORDINATOR_IP>:<PORT>/coordinator/config
http://<COORDINATOR_IP>:<PORT>/druid/coordinator/v1/config
```
Optional Header Parameters for auditing the config change can also be specified.
|Header Param Name| Description | Default |
|----------|-------------|---------|
|`X-Druid-Author`| author making the config change|""|
|`X-Druid-Comment`| comment describing the change being done|""|
A sample coordinator dynamic config JSON object is shown below:
```json
@ -74,3 +81,11 @@ Issuing a GET request at the same URL will return the spec that is currently in
|`replicantLifetime`|The maximum number of coordinator runs for a segment to be replicated before we start alerting.|15|
|`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10|
|`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false|
To view the audit history of coordinator dynamic config issue a GET request to the URL -
```
http://<COORDINATOR_IP>:<PORT>/druid/coordinator/v1/config/history?interval=<interval>
```
default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in coordinator runtime.properties

View File

@ -183,6 +183,12 @@ Returns all rules for a specified datasource.
Returns all rules for a specified datasource and includes default datasource.
* `/druid/coordinator/v1/rules/{dataSourceName}/history?interval=<interval>`
Returns audit history of rules. default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in coordinator runtime.properties
### POST
#### Datasources
@ -201,6 +207,13 @@ Enables a segment.
POST with a list of rules in JSON form to update rules.
Optional Header Parameters for auditing the config change can also be specified.
|Header Param Name| Description | Default |
|----------|-------------|---------|
|`X-Druid-Author`| author making the config change|""|
|`X-Druid-Comment`| comment describing the change being done|""|
### DELETE
#### Datasources

View File

@ -97,6 +97,13 @@ The JSON object can be submitted to the overlord via a POST request at:
http://<COORDINATOR_IP>:<port>/druid/indexer/v1/worker
```
Optional Header Parameters for auditing the config change can also be specified.
|Header Param Name| Description | Default |
|----------|-------------|---------|
|`X-Druid-Author`| author making the config change|""|
|`X-Druid-Comment`| comment describing the change being done|""|
A sample worker config spec is shown below:
```json
@ -142,6 +149,14 @@ Issuing a GET request at the same URL will return the current worker config spec
|`selectStrategy`|How to assign tasks to middlemanagers. Choices are `fillCapacity`, `fillCapacityWithAffinity`, and `equalDistribution`.|fillCapacity|
|`autoScaler`|Only used if autoscaling is enabled. See below.|null|
To view the audit history of worker config issue a GET request to the URL -
```
http://<COORDINATOR_IP>:<port>/druid/indexer/v1/worker/history?interval=<interval>
```
default value of interval can be specified by setting `druid.audit.manager.auditHistoryMillis` (1 week if not configured) in overlord runtime.properties.
#### Worker Select Strategy
##### Fill Capacity

View File

@ -60,3 +60,8 @@ Task-related Tables
-------------------
There are also a number of tables created and used by the [Indexing Service](Indexing-Service.html) in the course of its work.
Audit Table
-----------
The Audit table is used to store the audit history for configuration changes e.g rule changes done by [Coordinator](Coordinator.html) and other config changes.

View File

@ -29,6 +29,8 @@ import com.google.common.io.ByteSource;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.common.config.JacksonConfigManager;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.actions.TaskActionClient;
@ -45,10 +47,12 @@ import io.druid.metadata.EntryExistsException;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@ -74,6 +78,7 @@ public class OverlordResource
private final TaskStorageQueryAdapter taskStorageQueryAdapter;
private final TaskLogStreamer taskLogStreamer;
private final JacksonConfigManager configManager;
private final AuditManager auditManager;
private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
@ -82,13 +87,15 @@ public class OverlordResource
TaskMaster taskMaster,
TaskStorageQueryAdapter taskStorageQueryAdapter,
TaskLogStreamer taskLogStreamer,
JacksonConfigManager configManager
JacksonConfigManager configManager,
AuditManager auditManager
) throws Exception
{
this.taskMaster = taskMaster;
this.taskStorageQueryAdapter = taskStorageQueryAdapter;
this.taskLogStreamer = taskLogStreamer;
this.configManager = configManager;
this.auditManager = auditManager;
}
@POST
@ -182,14 +189,17 @@ public class OverlordResource
return Response.ok(workerConfigRef.get()).build();
}
// default value is used for backwards compatibility
@POST
@Path("/worker")
@Consumes(MediaType.APPLICATION_JSON)
public Response setWorkerConfig(
final WorkerBehaviorConfig workerBehaviorConfig
final WorkerBehaviorConfig workerBehaviorConfig,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment
)
{
if (!configManager.set(WorkerBehaviorConfig.CONFIG_KEY, workerBehaviorConfig)) {
if (!configManager.set(WorkerBehaviorConfig.CONFIG_KEY, workerBehaviorConfig, new AuditInfo(author, comment))) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
@ -198,6 +208,24 @@ public class OverlordResource
return Response.ok().build();
}
@GET
@Path("/worker/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getWorkerConfigHistory(
@QueryParam("interval") final String interval
)
{
Interval theInterval = interval == null ? null : new Interval(interval);
return Response.ok(
auditManager.fetchAuditHistory(
WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.CONFIG_KEY,
theInterval
)
)
.build();
}
@POST
@Path("/action")
@Produces(MediaType.APPLICATION_JSON)

View File

@ -23,6 +23,7 @@ import com.google.inject.Module;
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import io.druid.metadata.MetadataRuleManagerConfig;
import io.druid.metadata.MetadataStorageActionHandlerFactory;
import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import io.druid.metadata.MetadataRuleManager;
@ -42,6 +43,11 @@ import io.druid.metadata.SQLMetadataSegmentManagerProvider;
import io.druid.metadata.SQLMetadataSegmentPublisher;
import io.druid.metadata.SQLMetadataSegmentPublisherProvider;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.audit.AuditManager;
import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.audit.SQLAuditManager;
import io.druid.server.audit.SQLAuditManagerConfig;
import io.druid.server.audit.SQLAuditManagerProvider;
public class SQLMetadataStorageDruidModule implements Module
{
@ -135,6 +141,20 @@ public class SQLMetadataStorageDruidModule implements Module
Key.get(SQLMetadataStorageUpdaterJobHandler.class),
defaultPropertyValue
);
PolyBind.createChoiceWithDefault(
binder,
PROPERTY,
Key.get(AuditManager.class),
Key.get(SQLAuditManager.class),
defaultPropertyValue
);
PolyBind.createChoiceWithDefault(
binder,
PROPERTY,
Key.get(AuditManagerProvider.class),
Key.get(SQLAuditManagerProvider.class),
defaultPropertyValue
);
}
@Override
@ -184,5 +204,17 @@ public class SQLMetadataStorageDruidModule implements Module
.addBinding(type)
.to(SQLMetadataStorageUpdaterJobHandler.class)
.in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.audit.manager", SQLAuditManagerConfig.class);
PolyBind.optionBinder(binder, Key.get(AuditManager.class))
.addBinding(type)
.to(SQLAuditManager.class)
.in(LazySingleton.class);
PolyBind.optionBinder(binder, Key.get(AuditManagerProvider.class))
.addBinding(type)
.to(SQLAuditManagerProvider.class)
.in(LazySingleton.class);
}
}

View File

@ -17,6 +17,7 @@
package io.druid.metadata;
import io.druid.audit.AuditInfo;
import io.druid.server.coordinator.rules.Rule;
import java.util.List;
@ -38,5 +39,5 @@ public interface MetadataRuleManager
public List<Rule> getRulesWithDefault(final String dataSource);
public boolean overrideRule(final String dataSource, final List<Rule> newRules);
public boolean overrideRule(final String dataSource, final List<Rule> rulesConfig, final AuditInfo auditInfo);
}

View File

@ -263,14 +263,22 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
.map(IntegerMapper.FIRST)
.first();
if (count == 0) {
handle.createStatement(String.format("INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)",
tableName, keyColumn, valueColumn))
handle.createStatement(
String.format(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)",
tableName, keyColumn, valueColumn
)
)
.bind("key", key)
.bind("value", value)
.execute();
} else {
handle.createStatement(String.format("UPDATE %1$s SET %3$s=:value WHERE %2$s=:key",
tableName, keyColumn, valueColumn))
handle.createStatement(
String.format(
"UPDATE %1$s SET %3$s=:value WHERE %2$s=:key",
tableName, keyColumn, valueColumn
)
)
.bind("key", key)
.bind("value", value)
.execute();
@ -369,4 +377,36 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
return dataSource;
}
private void createAuditTable(final IDBI dbi, final String tableName)
{
createTable(
dbi,
tableName,
ImmutableList.of(
String.format(
"CREATE TABLE %1$s (\n"
+ " id %2$s NOT NULL,\n"
+ " audit_key VARCHAR(255) NOT NULL,\n"
+ " type VARCHAR(255) NOT NULL,\n"
+ " author VARCHAR(255) NOT NULL,\n"
+ " comment VARCHAR(2048) NOT NULL,\n"
+ " created_date VARCHAR(255) NOT NULL,\n"
+ " payload %3$s NOT NULL,\n"
+ " PRIMARY KEY(id)\n"
+ ")",
tableName, getSerialType(), getPayloadType()
),
String.format("CREATE INDEX idx_%1$s_key_time ON %1$s(audit_key, created_date)", tableName),
String.format("CREATE INDEX idx_%1$s_type_time ON %1$s(audit_key, created_date)", tableName),
String.format("CREATE INDEX idx_%1$s_audit_time ON %1$s(created_date)", tableName)
)
);
}
@Override
public void createAuditTable() {
if (config.get().isCreateTables()) {
createAuditTable(getDBI(), tablesConfigSupplier.get().getAuditTable());
}
}
}

View File

@ -31,6 +31,9 @@ import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.client.DruidServer;
import io.druid.concurrent.Execs;
import io.druid.guice.ManageLifecycle;
@ -44,6 +47,8 @@ import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
@ -127,6 +132,7 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final IDBI dbi;
private final AtomicReference<ImmutableMap<String, List<Rule>>> rules;
private final AuditManager auditManager;
private volatile ScheduledExecutorService exec;
@ -139,13 +145,15 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
@Json ObjectMapper jsonMapper,
Supplier<MetadataRuleManagerConfig> config,
Supplier<MetadataStorageTablesConfig> dbTables,
SQLMetadataConnector connector
SQLMetadataConnector connector,
AuditManager auditManager
)
{
this.jsonMapper = jsonMapper;
this.config = config;
this.dbTables = dbTables;
this.dbi = connector.getDBI();
this.auditManager = auditManager;
this.rules = new AtomicReference<>(
ImmutableMap.<String, List<Rule>>of()
@ -298,17 +306,28 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
return retVal;
}
public boolean overrideRule(final String dataSource, final List<Rule> newRules)
public boolean overrideRule(final String dataSource, final List<Rule> newRules, final AuditInfo auditInfo)
{
synchronized (lock) {
try {
dbi.withHandle(
new HandleCallback<Void>()
dbi.inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final String version = new DateTime().toString();
final DateTime auditTime = DateTime.now();
auditManager.doAudit(
AuditEntry.builder()
.key(dataSource)
.type("rules")
.auditInfo(auditInfo)
.payload(jsonMapper.writeValueAsString(newRules))
.auditTime(auditTime)
.build(),
handle
);
String version = auditTime.toString();
handle.createStatement(
String.format(
"INSERT INTO %s (id, dataSource, version, payload) VALUES (:id, :dataSource, :version, :payload)",

View File

@ -22,6 +22,8 @@ import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.Lifecycle;
import io.druid.audit.AuditManager;
import io.druid.server.audit.SQLAuditManager;
import org.skife.jdbi.v2.IDBI;
/**
@ -34,6 +36,7 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid
private final SQLMetadataConnector connector;
private final Lifecycle lifecycle;
private final IDBI dbi;
private final AuditManager auditManager;
@Inject
public SQLMetadataRuleManagerProvider(
@ -41,7 +44,8 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid
Supplier<MetadataRuleManagerConfig> config,
Supplier<MetadataStorageTablesConfig> dbTables,
SQLMetadataConnector connector,
Lifecycle lifecycle
Lifecycle lifecycle,
SQLAuditManager auditManager
)
{
this.jsonMapper = jsonMapper;
@ -50,6 +54,7 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid
this.connector = connector;
this.dbi = connector.getDBI();
this.lifecycle = lifecycle;
this.auditManager = auditManager;
}
@Override
@ -80,6 +85,6 @@ public class SQLMetadataRuleManagerProvider implements MetadataRuleManagerProvid
throw Throwables.propagate(e);
}
return new SQLMetadataRuleManager(jsonMapper, config, dbTables, connector);
return new SQLMetadataRuleManager(jsonMapper, config, dbTables, connector, auditManager);
}
}

View File

@ -0,0 +1,26 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.audit;
import com.google.inject.Provider;
import io.druid.audit.AuditManager;
public interface AuditManagerProvider extends Provider<AuditManager>
{
public AuditManager get();
}

View File

@ -0,0 +1,166 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.audit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditManager;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.annotations.Json;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
@ManageLifecycle
public class SQLAuditManager implements AuditManager
{
private final IDBI dbi;
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final ServiceEmitter emitter;
private final ObjectMapper jsonMapper;
private final SQLAuditManagerConfig config;
@Inject
public SQLAuditManager(
SQLMetadataConnector connector,
Supplier<MetadataStorageTablesConfig> dbTables,
ServiceEmitter emitter,
@Json ObjectMapper jsonMapper,
SQLAuditManagerConfig config
)
{
this.dbi = connector.getDBI();
this.dbTables = dbTables;
this.emitter = emitter;
this.jsonMapper = jsonMapper;
this.config = config;
}
public String getAuditTable()
{
return dbTables.get().getAuditTable();
}
@Override
public void doAudit(final AuditEntry auditEntry)
{
dbi.withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
doAudit(auditEntry, handle);
return null;
}
}
);
}
@Override
public void doAudit(AuditEntry auditEntry, Handle handle) throws IOException
{
emitter.emit(
new ServiceMetricEvent.Builder()
.setUser1(auditEntry.getKey())
.setUser2(auditEntry.getType())
.setUser3(auditEntry.getAuditInfo().getAuthor())
.setUser5(jsonMapper.writeValueAsString(auditEntry.getPayload()))
.setUser6(auditEntry.getAuditInfo().getComment())
.setUser7(auditEntry.getAuditTime().toString())
.build("config/audit", 1)
);
handle.createStatement(
String.format(
"INSERT INTO %s ( audit_key, type, author, comment, created_date, payload) VALUES (:audit_key, :type, :author, :comment, :created_date, :payload)",
getAuditTable()
)
)
.bind("audit_key", auditEntry.getKey())
.bind("type", auditEntry.getType())
.bind("author", auditEntry.getAuditInfo().getAuthor())
.bind("comment", auditEntry.getAuditInfo().getComment())
.bind("created_date", auditEntry.getAuditTime().toString())
.bind("payload", jsonMapper.writeValueAsBytes(auditEntry))
.execute();
}
@Override
public List<AuditEntry> fetchAuditHistory(final String key, final String type, Interval interval)
{
final Interval theInterval;
if (interval == null) {
DateTime now = new DateTime();
theInterval = new Interval(now.minus(config.getAuditHistoryMillis()), now);
} else {
theInterval = interval;
}
return dbi.withHandle(
new HandleCallback<List<AuditEntry>>()
{
@Override
public List<AuditEntry> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE audit_key = :audit_key and type = :type and created_date between :start_date and :end_date ORDER BY created_date",
getAuditTable()
)
).bind("audit_key", key)
.bind("type", type)
.bind("start_date", theInterval.getStart().toString())
.bind("end_date", theInterval.getEnd().toString())
.map(
new ResultSetMapper<AuditEntry>()
{
@Override
public AuditEntry map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return jsonMapper.readValue(r.getBytes("payload"), AuditEntry.class);
}
catch (IOException e) {
throw new SQLException(e);
}
}
}
)
.list();
}
}
);
}
}

View File

@ -0,0 +1,33 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.audit;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
*/
public class SQLAuditManagerConfig
{
@JsonProperty
private long auditHistoryMillis = 7 * 24 * 60 * 60 * 1000L; // 1 WEEK
public long getAuditHistoryMillis()
{
return auditHistoryMillis;
}
}

View File

@ -0,0 +1,84 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.audit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.audit.AuditManager;
import io.druid.guice.annotations.Json;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.SQLMetadataConnector;
public class SQLAuditManagerProvider implements AuditManagerProvider
{
private final Supplier<MetadataStorageTablesConfig> dbTables;
private final SQLMetadataConnector connector;
private final Lifecycle lifecycle;
private final ServiceEmitter emitter;
private final ObjectMapper mapper;
private final SQLAuditManagerConfig config;
@Inject
public SQLAuditManagerProvider(
Supplier<MetadataStorageTablesConfig> dbTables,
SQLMetadataConnector connector,
Lifecycle lifecycle,
ServiceEmitter emitter,
@Json ObjectMapper mapper,
SQLAuditManagerConfig config
)
{
this.dbTables = dbTables;
this.connector = connector;
this.lifecycle = lifecycle;
this.emitter = emitter;
this.mapper = mapper;
this.config = config;
}
@Override
public AuditManager get()
{
try {
lifecycle.addMaybeStartHandler(
new Lifecycle.Handler()
{
@Override
public void start() throws Exception
{
connector.createAuditTable();
}
@Override
public void stop()
{
}
}
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return new SQLAuditManager(connector, dbTables, emitter, mapper, config);
}
}

View File

@ -17,15 +17,21 @@
package io.druid.server.http;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.common.config.JacksonConfigManager;
import io.druid.server.coordinator.CoordinatorDynamicConfig;
import org.joda.time.Interval;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -35,13 +41,16 @@ import javax.ws.rs.core.Response;
public class CoordinatorDynamicConfigsResource
{
private final JacksonConfigManager manager;
private final AuditManager auditManager;
@Inject
public CoordinatorDynamicConfigsResource(
JacksonConfigManager manager
JacksonConfigManager manager,
AuditManager auditManager
)
{
this.manager = manager;
this.auditManager = auditManager;
}
@GET
@ -56,14 +65,36 @@ public class CoordinatorDynamicConfigsResource
).build();
}
// default value is used for backwards compatibility
@POST
@Consumes(MediaType.APPLICATION_JSON)
public Response setDynamicConfigs(final CoordinatorDynamicConfig dynamicConfig)
public Response setDynamicConfigs(final CoordinatorDynamicConfig dynamicConfig,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment
)
{
if (!manager.set(CoordinatorDynamicConfig.CONFIG_KEY, dynamicConfig)) {
if (!manager.set(CoordinatorDynamicConfig.CONFIG_KEY, dynamicConfig, new AuditInfo(author, comment))) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
return Response.ok().build();
}
@GET
@Path("/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getDatasourceRuleHistory(
@QueryParam("interval") final String interval
)
{
Interval theInterval = interval == null ? null : new Interval(interval);
return Response.ok(
auditManager.fetchAuditHistory(
CoordinatorDynamicConfig.CONFIG_KEY,
CoordinatorDynamicConfig.CONFIG_KEY,
theInterval
)
)
.build();
}
}

View File

@ -18,11 +18,16 @@
package io.druid.server.http;
import com.google.inject.Inject;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.metadata.MetadataRuleManager;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.Interval;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
@ -38,13 +43,16 @@ import java.util.List;
public class RulesResource
{
private final MetadataRuleManager databaseRuleManager;
private final AuditManager auditManager;
@Inject
public RulesResource(
MetadataRuleManager databaseRuleManager
MetadataRuleManager databaseRuleManager,
AuditManager auditManager
)
{
this.databaseRuleManager = databaseRuleManager;
this.auditManager = auditManager;
}
@GET
@ -60,7 +68,6 @@ public class RulesResource
public Response getDatasourceRules(
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("full") final String full
)
{
if (full != null) {
@ -70,18 +77,38 @@ public class RulesResource
return Response.ok(databaseRuleManager.getRules(dataSourceName))
.build();
}
// default value is used for backwards compatibility
@POST
@Path("/{dataSourceName}")
@Consumes(MediaType.APPLICATION_JSON)
public Response setDatasourceRules(
@PathParam("dataSourceName") final String dataSourceName,
final List<Rule> rules
final List<Rule> rules,
@HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author,
@HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment
)
{
if (databaseRuleManager.overrideRule(dataSourceName, rules)) {
if (databaseRuleManager.overrideRule(
dataSourceName,
rules,
new AuditInfo(author, comment)
)) {
return Response.ok().build();
}
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
@GET
@Path("/{dataSourceName}/history")
@Produces(MediaType.APPLICATION_JSON)
public Response getDatasourceRuleHistory(
@PathParam("dataSourceName") final String dataSourceName,
@QueryParam("interval") final String interval
)
{
Interval theInterval = interval == null ? null : new Interval(interval);
return Response.ok(auditManager.fetchAuditHistory(dataSourceName, "rules", theInterval))
.build();
}
}

View File

@ -50,11 +50,13 @@ public class SQLMetadataConnectorTest
tables.add(tablesConfig.getLockTable(entryType));
tables.add(tablesConfig.getLogTable(entryType));
tables.add(tablesConfig.getEntryTable(entryType));
tables.add(tablesConfig.getAuditTable());
connector.createSegmentTable();
connector.createConfigTable();
connector.createRulesTable();
connector.createTaskTables();
connector.createAuditTable();
connector.getDBI().withHandle(
new HandleCallback<Void>()

View File

@ -0,0 +1,140 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.metadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.client.DruidServer;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.audit.SQLAuditManager;
import io.druid.server.audit.SQLAuditManagerConfig;
import io.druid.server.coordinator.rules.ForeverDropRule;
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.IntervalLoadRule;
import io.druid.server.coordinator.rules.PeriodLoadRule;
import io.druid.server.coordinator.rules.Rule;
import io.druid.server.metrics.NoopServiceEmitter;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class SQLMetadataRuleManagerTest
{
private TestDerbyConnector connector;
private MetadataStorageTablesConfig tablesConfig = MetadataStorageTablesConfig.fromBase("test");
private SQLMetadataRuleManager ruleManager;
private AuditManager auditManager;
private final ObjectMapper mapper = new DefaultObjectMapper();
@Before
public void setUp()
{
connector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(tablesConfig)
);
connector.createAuditTable();
auditManager = new SQLAuditManager(
connector,
Suppliers.ofInstance(tablesConfig),
new NoopServiceEmitter(),
mapper,
new SQLAuditManagerConfig()
);
connector.createRulesTable();
ruleManager = new SQLMetadataRuleManager(
mapper,
Suppliers.ofInstance(new MetadataRuleManagerConfig()),
Suppliers.ofInstance(tablesConfig),
connector,
auditManager
);
}
@Test
public void testAuditEntryCreated() throws Exception
{
List<Rule> rules = Arrays.<Rule>asList(
new IntervalLoadRule(
new Interval("2015-01-01/2015-02-01"), ImmutableMap.<String, Integer>of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
)
);
AuditInfo auditInfo = new AuditInfo("test_author", "test_comment");
ruleManager.overrideRule(
"test_dataSource",
rules,
auditInfo
);
// fetch rules from metadata storage
ruleManager.poll();
Assert.assertEquals(rules, ruleManager.getRules("test_dataSource"));
// verify audit entry is created
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory("test_dataSource", "rules", null);
Assert.assertEquals(1, auditEntries.size());
AuditEntry entry = auditEntries.get(0);
Assert.assertEquals(mapper.writeValueAsString(rules),entry.getPayload());
Assert.assertEquals(auditInfo,entry.getAuditInfo());
Assert.assertEquals("test_dataSource", entry.getKey());
}
@After
public void cleanup()
{
dropTable(tablesConfig.getAuditTable());
dropTable(tablesConfig.getRulesTable());
}
private void dropTable(final String tableName)
{
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(String.format("DROP TABLE %s", tableName))
.execute();
return null;
}
}
);
}
}

View File

@ -0,0 +1,156 @@
/*
* Druid - a distributed column store.
* Copyright 2012 - 2015 Metamarkets Group Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.druid.server.audit;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import io.druid.audit.AuditEntry;
import io.druid.audit.AuditInfo;
import io.druid.audit.AuditManager;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.metadata.MetadataStorageConnectorConfig;
import io.druid.metadata.MetadataStorageTablesConfig;
import io.druid.metadata.TestDerbyConnector;
import io.druid.server.metrics.NoopServiceEmitter;
import junit.framework.Assert;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
import java.util.List;
public class SQLAuditManagerTest
{
private TestDerbyConnector connector;
private MetadataStorageTablesConfig tablesConfig = MetadataStorageTablesConfig.fromBase("test");
private AuditManager auditManager;
private final ObjectMapper mapper = new DefaultObjectMapper();
@Before
public void setUp() throws Exception
{
connector = new TestDerbyConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(tablesConfig)
);
connector.createAuditTable();
auditManager = new SQLAuditManager(
connector,
Suppliers.ofInstance(tablesConfig),
new NoopServiceEmitter(),
mapper,
new SQLAuditManagerConfig()
);
}
@Test
public void testAuditEntrySerde() throws IOException
{
AuditEntry entry = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment"
),
"testPayload",
new DateTime("2013-01-01T00:00:00Z")
);
ObjectMapper mapper = new DefaultObjectMapper();
AuditEntry serde = mapper.readValue(mapper.writeValueAsString(entry), AuditEntry.class);
Assert.assertEquals(entry, serde);
}
@Test
public void testCreateAuditEntry() throws IOException
{
AuditEntry entry = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment"
),
"testPayload",
new DateTime("2013-01-01T00:00:00Z")
);
auditManager.doAudit(entry);
byte[] payload = connector.lookup(tablesConfig.getAuditTable(), "audit_key", "payload", "testKey");
AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
Assert.assertEquals(entry, dbEntry);
}
@Test
public void testFetchAuditHistory() throws IOException
{
AuditEntry entry = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment"
),
"testPayload",
new DateTime("2013-01-01T00:00:00Z")
);
auditManager.doAudit(entry);
auditManager.doAudit(entry);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testKey",
"testType",
new Interval(
"2012-01-01T00:00:00Z/2013-01-03T00:00:00Z"
)
);
Assert.assertEquals(2, auditEntries.size());
Assert.assertEquals(entry, auditEntries.get(0));
Assert.assertEquals(entry, auditEntries.get(1));
}
@After
public void cleanup()
{
dropTable(tablesConfig.getAuditTable());
}
private void dropTable(final String tableName)
{
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle) throws Exception
{
handle.createStatement(String.format("DROP TABLE %s", tableName))
.execute();
return null;
}
}
);
}
}

View File

@ -41,6 +41,8 @@ import io.druid.metadata.MetadataSegmentManagerConfig;
import io.druid.metadata.MetadataSegmentManagerProvider;
import io.druid.metadata.MetadataStorage;
import io.druid.metadata.MetadataStorageProvider;
import io.druid.audit.AuditManager;
import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.coordinator.DruidCoordinator;
import io.druid.server.coordinator.DruidCoordinatorConfig;
import io.druid.server.coordinator.LoadQueueTaskMaster;
@ -109,6 +111,10 @@ public class CliCoordinator extends ServerRunnable
.toProvider(MetadataRuleManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(AuditManager.class)
.toProvider(AuditManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(IndexingServiceClient.class).in(LazySingleton.class);
binder.bind(DruidCoordinator.class);

View File

@ -29,6 +29,7 @@ import com.google.inject.servlet.GuiceFilter;
import com.google.inject.util.Providers;
import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.audit.AuditManager;
import io.druid.client.indexing.IndexingServiceSelectorConfig;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
@ -69,6 +70,7 @@ import io.druid.indexing.overlord.http.OverlordResource;
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.indexing.worker.config.WorkerConfig;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.audit.AuditManagerProvider;
import io.druid.server.http.RedirectFilter;
import io.druid.server.http.RedirectInfo;
import io.druid.server.initialization.jetty.JettyServerInitUtils;
@ -147,6 +149,10 @@ public class CliOverlord extends ServerRunnable
configureRunners(binder);
configureAutoscale(binder);
binder.bind(AuditManager.class)
.toProvider(AuditManagerProvider.class)
.in(ManageLifecycle.class);
binder.bind(RedirectFilter.class).in(LazySingleton.class);
binder.bind(RedirectInfo.class).to(OverlordRedirectInfo.class).in(LazySingleton.class);

View File

@ -110,5 +110,6 @@ public class CreateTables extends GuiceRunnable
dbConnector.createRulesTable();
dbConnector.createConfigTable();
dbConnector.createTaskTables();
dbConnector.createAuditTable();
}
}