Do not allow retention rules to be null (#14223)

Changes:
- Do not allow retention rules for any datasource or cluster to be null
- Allow empty rules at the datasource level but not at the cluster level
- Add validation to ensure that `druid.manager.rules.defaultRule` is always set correctly
- Minor style refactors
Kashif Faraz 2023-05-11 14:33:56 +05:30 committed by GitHub
parent 47e48ee657
commit 64e6283eca
7 changed files with 312 additions and 373 deletions
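The heart of the change is a new guard at the top of SQLMetadataRuleManager.overrideRule. The following is a minimal, self-contained sketch of that logic; the class and method names are illustrative, and the real code throws Druid's IAE (a subclass of IllegalArgumentException) rather than IllegalArgumentException directly.

import java.util.List;

public class RuleGuardSketch
{
  // Value of druid.manager.rules.defaultRule, i.e. the datasource name under
  // which the cluster-level default rules are stored in the metadata store.
  private final String defaultRuleKey;

  public RuleGuardSketch(String defaultRuleKey)
  {
    this.defaultRuleKey = defaultRuleKey;
  }

  public void validateRules(String dataSource, List<?> newRules)
  {
    if (newRules == null) {
      // Null rules are now rejected for every datasource, including the cluster default.
      throw new IllegalArgumentException("Rules cannot be null.");
    } else if (newRules.isEmpty() && defaultRuleKey.equals(dataSource)) {
      // Empty rules remain legal for ordinary datasources (they simply fall back
      // to the cluster defaults) but not for the cluster-level entry itself.
      throw new IllegalArgumentException("Cluster-level rules cannot be empty.");
    }
  }
}

Assuming the shipped default value "_default" for defaultRuleKey, validateRules("_default", Collections.emptyList()) throws while validateRules("wiki", Collections.emptyList()) passes, matching the new tests in SQLMetadataRuleManagerTest below.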

View File: AuditInfo.java

@@ -22,6 +22,8 @@ package org.apache.druid.audit;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
public class AuditInfo
{
private final String author;
@@ -67,29 +69,16 @@ public class AuditInfo
if (o == null || getClass() != o.getClass()) {
return false;
}
AuditInfo that = (AuditInfo) o;
if (!author.equals(that.author)) {
return false;
}
if (!comment.equals(that.comment)) {
return false;
}
if (!ip.equals(that.ip)) {
return false;
}
return true;
AuditInfo auditInfo = (AuditInfo) o;
return Objects.equals(author, auditInfo.author)
&& Objects.equals(comment, auditInfo.comment)
&& Objects.equals(ip, auditInfo.ip);
}
@Override
public int hashCode()
{
int result = author.hashCode();
result = 31 * result + comment.hashCode();
result = 31 * result + ip.hashCode();
return result;
return Objects.hash(author, comment, ip);
}
@Override

View File: AuditInfoTest.java

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.audit;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
public class AuditInfoTest
{
@Test
public void testAuditInfoEquality()
{
final AuditInfo auditInfo1 = new AuditInfo("druid", "test equality", "127.0.0.1");
final AuditInfo auditInfo2 = new AuditInfo("druid", "test equality", "127.0.0.1");
Assert.assertEquals(auditInfo1, auditInfo2);
Assert.assertEquals(auditInfo1.hashCode(), auditInfo2.hashCode());
}
@Test(timeout = 60_000L)
public void testAuditEntrySerde() throws IOException
{
AuditEntry entry = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-01T00:00:00Z")
);
ObjectMapper mapper = new DefaultObjectMapper();
AuditEntry serde = mapper.readValue(mapper.writeValueAsString(entry), AuditEntry.class);
Assert.assertEquals(entry, serde);
}
}

View File: MetadataRuleManagerConfig.java

@@ -35,6 +35,10 @@ public class MetadataRuleManagerConfig
@JsonProperty
private Period alertThreshold = new Period("PT10M");
/**
* Datasource name against which the cluster-level default rules are stored
* in the metadata store.
*/
public String getDefaultRule()
{
return defaultRule;
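For reference, this is the property whose value SQLMetadataRuleManager now validates at startup; in runtime.properties it is set as, e.g., `druid.manager.rules.defaultRule=_default` (`_default` is the shipped default), and an explicitly configured value must now be non-empty.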

View File: SQLMetadataRuleManager.java

@@ -32,6 +32,7 @@ import org.apache.druid.client.DruidServer;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
@@ -41,20 +42,12 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.coordinator.rules.ForeverLoadRule;
import org.apache.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.tweak.ResultSetMapper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -174,6 +167,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
Preconditions.checkNotNull(config.getAlertThreshold().toStandardDuration());
Preconditions.checkNotNull(config.getPollDuration().toStandardDuration());
String defaultRule = config.getDefaultRule();
Preconditions.checkState(
defaultRule != null && !defaultRule.isEmpty(),
"If specified, 'druid.manager.rules.defaultRule' must have a non-empty value."
);
this.rules = new AtomicReference<>(ImmutableMap.of());
}
@@ -194,26 +193,18 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
createDefaultRule(dbi, getRulesTable(), config.getDefaultRule(), jsonMapper);
exec.scheduleWithFixedDelay(
new Runnable()
{
@Override
public void run()
{
try {
// poll() is synchronized together with start() and stop() to ensure that when stop() exits, poll()
// won't actually run anymore after that (it could only enter the synchronized section and exit
// immediately because the localStartedOrder doesn't match the new currentStartOrder). It's needed
// to avoid flakiness in SQLMetadataRuleManagerTest.
// See https://github.com/apache/druid/issues/6028
synchronized (lock) {
if (localStartedOrder == currentStartOrder) {
poll();
}
() -> {
try {
// Do not poll if already stopped
// See https://github.com/apache/druid/issues/6028
synchronized (lock) {
if (localStartedOrder == currentStartOrder) {
poll();
}
}
catch (Exception e) {
log.error(e, "uncaught exception in rule manager polling thread");
}
}
catch (Exception e) {
log.error(e, "uncaught exception in rule manager polling thread");
}
},
0,
@@ -246,73 +237,45 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
ImmutableMap<String, List<Rule>> newRules = ImmutableMap.copyOf(
dbi.withHandle(
new HandleCallback<Map<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> withHandle(Handle handle)
{
return handle.createQuery(
// Return the latest version of the rules for each dataSource
StringUtils.format(
"SELECT r.dataSource, r.payload "
+ "FROM %1$s r "
+ "INNER JOIN(SELECT dataSource, max(version) as version FROM %1$s GROUP BY dataSource) ds "
+ "ON r.datasource = ds.datasource and r.version = ds.version",
getRulesTable()
)
).map(
new ResultSetMapper<Pair<String, List<Rule>>>()
{
@Override
public Pair<String, List<Rule>> map(int index, ResultSet r, StatementContext ctx)
throws SQLException
{
try {
return Pair.of(
r.getString("dataSource"),
jsonMapper.readValue(
r.getBytes("payload"), new TypeReference<List<Rule>>()
{
}
)
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
}
handle -> handle.createQuery(
// Return the latest version of the rules for each dataSource
StringUtils.format(
"SELECT r.dataSource, r.payload "
+ "FROM %1$s r "
+ "INNER JOIN(SELECT dataSource, max(version) as version FROM %1$s GROUP BY dataSource) ds "
+ "ON r.datasource = ds.datasource and r.version = ds.version",
getRulesTable()
)
.fold(
new HashMap<>(),
new Folder3<Map<String, List<Rule>>, Pair<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Pair<String, List<Rule>> stringObjectMap,
FoldController foldController,
StatementContext statementContext
)
{
try {
String dataSource = stringObjectMap.lhs;
retVal.put(dataSource, stringObjectMap.rhs);
return retVal;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}
);
}
}
).map(
(index, r, ctx) -> {
try {
return Pair.of(
r.getString("dataSource"),
jsonMapper.readValue(r.getBytes("payload"), new TypeReference<List<Rule>>() {})
);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
).fold(
new HashMap<>(),
(retVal, stringObjectMap, foldController, statementContext) -> {
try {
String dataSource = stringObjectMap.lhs;
retVal.put(dataSource, stringObjectMap.rhs);
return retVal;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
)
)
);
final int newRuleCount = newRules.values().stream().mapToInt(List::size).sum();
log.info("Polled and found %,d rule(s) for %,d datasource(s)", newRuleCount, newRules.size());
log.info("Polled and found [%d] rule(s) for [%d] datasource(s).", newRuleCount, newRules.size());
rules.set(newRules);
failStartTimeMs = 0;
@@ -322,9 +285,9 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
failStartTimeMs = System.currentTimeMillis();
}
if (System.currentTimeMillis() - failStartTimeMs > config.getAlertThreshold().toStandardDuration().getMillis()) {
log.makeAlert(e, "Exception while polling for rules")
.emit();
final long alertPeriodMillis = config.getAlertThreshold().toStandardDuration().getMillis();
if (System.currentTimeMillis() - failStartTimeMs > alertPeriodMillis) {
log.makeAlert(e, "Exception while polling for rules").emit();
failStartTimeMs = 0;
} else {
log.error(e, "Exception while polling for rules");
@@ -362,6 +325,12 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
@Override
public boolean overrideRule(final String dataSource, final List<Rule> newRules, final AuditInfo auditInfo)
{
if (newRules == null) {
throw new IAE("Rules cannot be null.");
} else if (newRules.isEmpty() && config.getDefaultRule().equals(dataSource)) {
throw new IAE("Cluster-level rules cannot be empty.");
}
final String ruleString;
try {
ruleString = jsonMapper.writeValueAsString(newRules);
@@ -374,39 +343,35 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
synchronized (lock) {
try {
dbi.inTransaction(
new TransactionCallback<Void>()
{
@Override
public Void inTransaction(Handle handle, TransactionStatus transactionStatus) throws Exception
{
final DateTime auditTime = DateTimes.nowUtc();
auditManager.doAudit(
AuditEntry.builder()
.key(dataSource)
.type("rules")
.auditInfo(auditInfo)
.payload(ruleString)
.auditTime(auditTime)
.build(),
handle
);
// Note that the method removeRulesForEmptyDatasourcesOlderThan depends on the version field
// being a timestamp
String version = auditTime.toString();
handle.createStatement(
StringUtils.format(
"INSERT INTO %s (id, dataSource, version, payload) VALUES (:id, :dataSource, :version, :payload)",
getRulesTable()
)
)
.bind("id", StringUtils.format("%s_%s", dataSource, version))
.bind("dataSource", dataSource)
.bind("version", version)
.bind("payload", jsonMapper.writeValueAsBytes(newRules))
.execute();
(handle, transactionStatus) -> {
final DateTime auditTime = DateTimes.nowUtc();
auditManager.doAudit(
AuditEntry.builder()
.key(dataSource)
.type("rules")
.auditInfo(auditInfo)
.payload(ruleString)
.auditTime(auditTime)
.build(),
handle
);
// Note that the method removeRulesForEmptyDatasourcesOlderThan
// depends on the version field being a timestamp
String version = auditTime.toString();
handle.createStatement(
StringUtils.format(
"INSERT INTO %s (id, dataSource, version, payload)"
+ " VALUES (:id, :dataSource, :version, :payload)",
getRulesTable()
)
)
.bind("id", StringUtils.format("%s_%s", dataSource, version))
.bind("dataSource", dataSource)
.bind("version", version)
.bind("payload", jsonMapper.writeValueAsBytes(newRules))
.execute();
return null;
}
return null;
}
);
}
@@ -438,7 +403,8 @@ public class SQLMetadataRuleManager implements MetadataRuleManager
// However, since this query currently runs very infrequently (by default, once a day via the KillRules Coordinator duty)
// and the inner query on the segment table is a READ (no locking), it is kept this way.
StringUtils.format(
"DELETE FROM %1$s WHERE datasource NOT IN (SELECT DISTINCT datasource from %2$s) and datasource!=:default_rule and version < :date_time",
"DELETE FROM %1$s WHERE datasource NOT IN (SELECT DISTINCT datasource from %2$s)"
+ " and datasource!=:default_rule and version < :date_time",
getRulesTable(),
getSegmentsTable()
)

View File: RulesResource.java

@@ -54,6 +54,8 @@ public class RulesResource
{
public static final String RULES_ENDPOINT = "/druid/coordinator/v1/rules";
private static final String AUDIT_HISTORY_TYPE = "rules";
private final MetadataRuleManager databaseRuleManager;
private final AuditManager auditManager;
@@ -105,14 +107,19 @@
@Context HttpServletRequest req
)
{
if (databaseRuleManager.overrideRule(
dataSourceName,
rules,
new AuditInfo(author, comment, req.getRemoteAddr())
)) {
return Response.ok().build();
try {
final AuditInfo auditInfo = new AuditInfo(author, comment, req.getRemoteAddr());
if (databaseRuleManager.overrideRule(dataSourceName, rules, auditInfo)) {
return Response.ok().build();
} else {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
}
catch (IllegalArgumentException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
}
return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
}
@GET
@@ -162,16 +169,16 @@
{
if (interval == null && count != null) {
if (dataSourceName != null) {
return auditManager.fetchAuditHistory(dataSourceName, "rules", count);
return auditManager.fetchAuditHistory(dataSourceName, AUDIT_HISTORY_TYPE, count);
}
return auditManager.fetchAuditHistory("rules", count);
return auditManager.fetchAuditHistory(AUDIT_HISTORY_TYPE, count);
}
Interval theInterval = interval == null ? null : Intervals.of(interval);
if (dataSourceName != null) {
return auditManager.fetchAuditHistory(dataSourceName, "rules", theInterval);
return auditManager.fetchAuditHistory(dataSourceName, AUDIT_HISTORY_TYPE, theInterval);
}
return auditManager.fetchAuditHistory("rules", theInterval);
return auditManager.fetchAuditHistory(AUDIT_HISTORY_TYPE, theInterval);
}
}
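With the manager-side checks in place, a POST to `/druid/coordinator/v1/rules/{dataSourceName}` carrying a null rule list (or an empty list for the cluster-level entry) now fails fast with HTTP 400 and a JSON body such as `{"error": "Rules cannot be null."}`, rather than surfacing as a generic 500.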

View File: SQLMetadataRuleManagerTest.java

@@ -31,6 +31,7 @@ import org.apache.druid.audit.AuditManager;
import org.apache.druid.client.DruidServer;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.TestHelper;
@@ -45,8 +46,6 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.util.Collections;
import java.util.List;
@@ -54,10 +53,14 @@ import java.util.Map;
public class SQLMetadataRuleManagerTest
{
private static final String DATASOURCE = "wiki";
@org.junit.Rule
public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
private TestDerbyConnector connector;
private MetadataStorageTablesConfig tablesConfig;
private MetadataRuleManagerConfig managerConfig;
private SQLMetadataRuleManager ruleManager;
private AuditManager auditManager;
private SQLMetadataSegmentPublisher publisher;
@@ -79,13 +82,8 @@ public class SQLMetadataRuleManagerTest
);
connector.createRulesTable();
ruleManager = new SQLMetadataRuleManager(
mapper,
new MetadataRuleManagerConfig(),
tablesConfig,
connector,
auditManager
);
managerConfig = new MetadataRuleManagerConfig();
ruleManager = new SQLMetadataRuleManager(mapper, managerConfig, tablesConfig, connector, auditManager);
connector.createSegmentTable();
publisher = new SQLMetadataSegmentPublisher(
jsonMapper,
@@ -109,23 +107,62 @@
{
List<Rule> rules = Collections.singletonList(
new IntervalLoadRule(
Intervals.of("2015-01-01/2015-02-01"), ImmutableMap.of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
Intervals.of("2015-01-01/2015-02-01"),
ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)
)
);
AuditInfo auditInfo = new AuditInfo("test_author", "test_comment", "127.0.0.1");
ruleManager.overrideRule(
"test_dataSource",
rules,
auditInfo
);
ruleManager.overrideRule(DATASOURCE, rules, createAuditInfo("override rule"));
// The new rule should be reflected in the in-memory rules map immediately after being set by the user
Map<String, List<Rule>> allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
Assert.assertEquals(1, allRules.get("test_dataSource").size());
Assert.assertEquals(rules.get(0), allRules.get("test_dataSource").get(0));
Assert.assertEquals(1, allRules.get(DATASOURCE).size());
Assert.assertEquals(rules.get(0), allRules.get(DATASOURCE).get(0));
}
@Test
public void testOverrideRuleWithNull()
{
// Datasource level rules cannot be null
IAE exception = Assert.assertThrows(
IAE.class,
() -> ruleManager.overrideRule(DATASOURCE, null, createAuditInfo("null rule"))
);
Assert.assertEquals("Rules cannot be null.", exception.getMessage());
// Cluster level rules cannot be null
exception = Assert.assertThrows(
IAE.class,
() -> ruleManager.overrideRule(
managerConfig.getDefaultRule(),
null,
createAuditInfo("null cluster rule")
)
);
Assert.assertEquals("Rules cannot be null.", exception.getMessage());
}
@Test
public void testOverrideRuleWithEmpty()
{
// Cluster level rules cannot be empty
IAE exception = Assert.assertThrows(
IAE.class,
() -> ruleManager.overrideRule(
managerConfig.getDefaultRule(),
Collections.emptyList(),
createAuditInfo("empty cluster rule")
)
);
Assert.assertEquals("Cluster-level rules cannot be empty.", exception.getMessage());
// Datasource level rules can be empty
Assert.assertTrue(
ruleManager.overrideRule(
DATASOURCE,
Collections.emptyList(),
createAuditInfo("empty rule")
)
);
}
@Test
@@ -133,39 +170,28 @@
{
List<Rule> rules = Collections.singletonList(
new IntervalLoadRule(
Intervals.of("2015-01-01/2015-02-01"), ImmutableMap.of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
Intervals.of("2015-01-01/2015-02-01"),
ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)
)
);
AuditInfo auditInfo = new AuditInfo("test_author", "test_comment", "127.0.0.1");
ruleManager.overrideRule(
"test_dataSource",
rules,
auditInfo
);
final AuditInfo auditInfo = createAuditInfo("create audit entry");
ruleManager.overrideRule(DATASOURCE, rules, auditInfo);
// fetch rules from metadata storage
ruleManager.poll();
Assert.assertEquals(rules, ruleManager.getRules("test_dataSource"));
Assert.assertEquals(rules, ruleManager.getRules(DATASOURCE));
// verify audit entry is created
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory("test_dataSource", "rules", null);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(DATASOURCE, "rules", null);
Assert.assertEquals(1, auditEntries.size());
AuditEntry entry = auditEntries.get(0);
Assert.assertEquals(
rules,
mapper.readValue(
entry.getPayload(),
new TypeReference<List<Rule>>()
{
}
)
mapper.readValue(entry.getPayload(), new TypeReference<List<Rule>>() {})
);
Assert.assertEquals(auditInfo, entry.getAuditInfo());
Assert.assertEquals("test_dataSource", entry.getKey());
Assert.assertEquals(DATASOURCE, entry.getKey());
}
@Test
@@ -179,21 +205,13 @@
)
)
);
AuditInfo auditInfo = new AuditInfo("test_author", "test_comment", "127.0.0.1");
ruleManager.overrideRule(
"test_dataSource",
rules,
auditInfo
);
ruleManager.overrideRule(
"test_dataSource2",
rules,
auditInfo
);
final AuditInfo auditInfo = createAuditInfo("test_comment");
ruleManager.overrideRule(DATASOURCE, rules, auditInfo);
ruleManager.overrideRule("test_dataSource2", rules, auditInfo);
// fetch rules from metadata storage
ruleManager.poll();
Assert.assertEquals(rules, ruleManager.getRules("test_dataSource"));
Assert.assertEquals(rules, ruleManager.getRules(DATASOURCE));
Assert.assertEquals(rules, ruleManager.getRules("test_dataSource2"));
// test fetch audit entries
@@ -202,12 +220,7 @@
for (AuditEntry entry : auditEntries) {
Assert.assertEquals(
rules,
mapper.readValue(
entry.getPayload(),
new TypeReference<List<Rule>>()
{
}
)
mapper.readValue(entry.getPayload(), new TypeReference<List<Rule>>() {})
);
Assert.assertEquals(auditInfo, entry.getAuditInfo());
}
@@ -218,23 +231,17 @@
{
List<Rule> rules = ImmutableList.of(
new IntervalLoadRule(
Intervals.of("2015-01-01/2015-02-01"), ImmutableMap.of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
Intervals.of("2015-01-01/2015-02-01"),
ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)
)
);
AuditInfo auditInfo = new AuditInfo("test_author", "test_comment", "127.0.0.1");
ruleManager.overrideRule(
"test_dataSource",
rules,
auditInfo
);
// Verify that rule was added
ruleManager.overrideRule(DATASOURCE, rules, createAuditInfo("test"));
// Verify that the rule was added
ruleManager.poll();
Map<String, List<Rule>> allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
Assert.assertEquals(1, allRules.get("test_dataSource").size());
Assert.assertEquals(1, allRules.get(DATASOURCE).size());
// Now delete rules
ruleManager.removeRulesForEmptyDatasourcesOlderThan(System.currentTimeMillis());
@@ -250,23 +257,17 @@
{
List<Rule> rules = ImmutableList.of(
new IntervalLoadRule(
Intervals.of("2015-01-01/2015-02-01"), ImmutableMap.of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
Intervals.of("2015-01-01/2015-02-01"),
ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)
)
);
AuditInfo auditInfo = new AuditInfo("test_author", "test_comment", "127.0.0.1");
ruleManager.overrideRule(
"test_dataSource",
rules,
auditInfo
);
ruleManager.overrideRule(DATASOURCE, rules, createAuditInfo("update rules"));
// Verify that the rule was added
ruleManager.poll();
Map<String, List<Rule>> allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
Assert.assertEquals(1, allRules.get("test_dataSource").size());
Assert.assertEquals(1, allRules.get(DATASOURCE).size());
// This will not delete the rule, because it was created just now and therefore has a created
// timestamp later than 2012-01-01T00:00:00Z
@@ -276,7 +277,7 @@
ruleManager.poll();
allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
Assert.assertEquals(1, allRules.get("test_dataSource").size());
Assert.assertEquals(1, allRules.get(DATASOURCE).size());
}
@Test
@@ -284,28 +285,21 @@
{
List<Rule> rules = ImmutableList.of(
new IntervalLoadRule(
Intervals.of("2015-01-01/2015-02-01"), ImmutableMap.of(
DruidServer.DEFAULT_TIER,
DruidServer.DEFAULT_NUM_REPLICANTS
)
Intervals.of("2015-01-01/2015-02-01"),
ImmutableMap.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS)
)
);
AuditInfo auditInfo = new AuditInfo("test_author", "test_comment", "127.0.0.1");
ruleManager.overrideRule(
"test_dataSource",
rules,
auditInfo
);
ruleManager.overrideRule(DATASOURCE, rules, createAuditInfo("update rules"));
// Verify that the rule was added
ruleManager.poll();
Map<String, List<Rule>> allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
Assert.assertEquals(1, allRules.get("test_dataSource").size());
Assert.assertEquals(1, allRules.get(DATASOURCE).size());
// Add segment metadata to segment table so that the datasource is considered active
DataSegment dataSegment = new DataSegment(
"test_dataSource",
DATASOURCE,
Intervals.of("2015-01-01/2015-02-01"),
"1",
ImmutableMap.of(
@@ -328,7 +322,7 @@
ruleManager.poll();
allRules = ruleManager.getAllRules();
Assert.assertEquals(1, allRules.size());
Assert.assertEquals(1, allRules.get("test_dataSource").size());
Assert.assertEquals(1, allRules.get(DATASOURCE).size());
}
@Test
@@ -360,17 +354,14 @@
private void dropTable(final String tableName)
{
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute();
return null;
}
}
handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute()
);
}
private AuditInfo createAuditInfo(String comment)
{
return new AuditInfo("test", comment, "127.0.0.1");
}
}

View File: SQLAuditManagerTest.java

@@ -42,8 +42,6 @@ import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException;
import java.util.HashMap;
@@ -58,10 +56,8 @@ public class SQLAuditManagerTest
private TestDerbyConnector connector;
private AuditManager auditManager;
private final String PAYLOAD_DIMENSION_KEY = "payload";
private ConfigSerde<String> stringConfigSerde;
private final ObjectMapper mapper = new DefaultObjectMapper();
@Before
@@ -105,36 +101,13 @@
};
}
@Test(timeout = 60_000L)
public void testAuditEntrySerde() throws IOException
{
AuditEntry entry = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-01T00:00:00Z")
);
ObjectMapper mapper = new DefaultObjectMapper();
AuditEntry serde = mapper.readValue(mapper.writeValueAsString(entry), AuditEntry.class);
Assert.assertEquals(entry, serde);
}
@Test
public void testAuditMetricEventBuilderConfig()
{
AuditEntry entry = new AuditEntry(
"testKey",
"testType",
new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
),
new AuditInfo("testAuthor", "testComment", "127.0.0.1"),
"testPayload",
DateTimes.of("2013-01-01T00:00:00Z")
);
@@ -155,10 +128,11 @@
);
ServiceMetricEvent.Builder auditEntryBuilder = ((SQLAuditManager) auditManager).getAuditMetricEventBuilder(entry);
Assert.assertEquals(null, auditEntryBuilder.getDimension(PAYLOAD_DIMENSION_KEY));
final String payloadDimensionKey = "payload";
Assert.assertNull(auditEntryBuilder.getDimension(payloadDimensionKey));
ServiceMetricEvent.Builder auditEntryBuilderWithPayload = auditManagerWithPayloadAsDimension.getAuditMetricEventBuilder(entry);
Assert.assertEquals("testPayload", auditEntryBuilderWithPayload.getDimension(PAYLOAD_DIMENSION_KEY));
Assert.assertEquals("testPayload", auditEntryBuilderWithPayload.getDimension(payloadDimensionKey));
}
@Test(timeout = 60_000L)
@@ -166,11 +140,7 @@
{
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
@@ -193,11 +163,7 @@
{
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
@@ -226,29 +192,17 @@
{
String entry1Key = "testKey1";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "testPayload";
String entry2Key = "testKey2";
String entry2Type = "testType";
AuditInfo entry2AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry2AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry2Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testKey1",
"testType",
1
);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory("testKey1", "testType", 1);
Assert.assertEquals(1, auditEntries.size());
Assert.assertEquals(entry1Key, auditEntries.get(0).getKey());
Assert.assertEquals(entry1Payload, auditEntries.get(0).getPayload());
@@ -261,11 +215,7 @@
{
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
@@ -298,11 +248,7 @@
{
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
@@ -338,39 +284,24 @@
{
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "testPayload1";
String entry2Key = "testKey";
String entry2Type = "testType";
AuditInfo entry2AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry2AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry2Payload = "testPayload2";
String entry3Key = "testKey";
String entry3Type = "testType";
AuditInfo entry3AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry3AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry3Payload = "testPayload3";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde);
auditManager.doAudit(entry3Key, entry3Type, entry3AuditInfo, entry3Payload, stringConfigSerde);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testType",
2
);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory("testType", 2);
Assert.assertEquals(2, auditEntries.size());
Assert.assertEquals(entry3Key, auditEntries.get(0).getKey());
Assert.assertEquals(entry3Payload, auditEntries.get(0).getPayload());
@@ -416,15 +347,15 @@
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "payload audit to store";
auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload,
stringConfigSerde
auditManagerWithMaxPayloadSizeBytes.doAudit(
entry1Key,
entry1Type,
entry1AuditInfo,
entry1Payload,
stringConfigSerde
);
byte[] payload = connector.lookup(
@@ -461,15 +392,15 @@
);
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
String entry1Payload = "payload audit to store";
auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload,
stringConfigSerde
auditManagerWithMaxPayloadSizeBytes.doAudit(
entry1Key,
entry1Type,
entry1AuditInfo,
entry1Payload,
stringConfigSerde
);
byte[] payload = connector.lookup(
@@ -507,11 +438,7 @@
String entry1Key = "test1Key";
String entry1Type = "test1Type";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
AuditInfo entry1AuditInfo = new AuditInfo("testAuthor", "testComment", "127.0.0.1");
// Entry 1 payload has a null value for one of its properties
Map<String, String> entryPayload1WithNull = new HashMap<>();
entryPayload1WithNull.put("version", "x");
@@ -529,17 +456,12 @@
private void dropTable(final String tableName)
{
Assert.assertNull(connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute();
return null;
}
}
));
Assert.assertEquals(
0,
connector.getDBI().withHandle(
handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute()
).intValue()
);
}
}