Introduce a new configuration that skip storing audit payload if payload size exceed limit and skip storing null fields for audit payload (#11078)

* Add config to skip storing audit payload if exceed limit

* fix checkstyle

* change config name

* skip null fields for audit payload

* fix checkstyle

* address comments

* fix guice

* fix test

* add tests

* address comments

* address comments

* address comments

* fix checkstyle

* address comments

* fix test

* fix test

* address comments

* Address comments

Co-authored-by: Jihoon Son <jihoonson@apache.org>

Co-authored-by: Jihoon Son <jihoonson@apache.org>
This commit is contained in:
Maytas Monsereenusorn 2021-04-13 20:18:28 -07:00 committed by GitHub
parent 08d3786738
commit f968400170
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 587 additions and 108 deletions

View File

@ -20,6 +20,7 @@
package org.apache.druid.audit; package org.apache.druid.audit;
import org.apache.druid.common.config.ConfigSerde;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
@ -28,15 +29,25 @@ import java.util.List;
public interface AuditManager public interface AuditManager
{ {
/**
* This String is the default message stored instead of the actual audit payload if the audit payload size
* exceeded the maximum size limit configuration
*/
String PAYLOAD_SKIP_MSG_FORMAT = "Payload was not stored as its size exceeds the limit [%d] configured by druid.audit.manager.maxPayloadSizeBytes";
String X_DRUID_AUTHOR = "X-Druid-Author"; String X_DRUID_AUTHOR = "X-Druid-Author";
String X_DRUID_COMMENT = "X-Druid-Comment"; String X_DRUID_COMMENT = "X-Druid-Comment";
/** /**
* inserts an audit Entry in the Audit Table * inserts an audit entry in the Audit Table
* @param auditEntry * @param key of the audit entry
* @param type of the audit entry
* @param auditInfo of the audit entry
* @param payload of the audit entry
* @param configSerde of the payload of the audit entry
*/ */
void doAudit(AuditEntry auditEntry); <T> void doAudit(String key, String type, AuditInfo auditInfo, T payload, ConfigSerde<T> configSerde);
/** /**
* inserts an audit Entry in audit table using the handler provided * inserts an audit Entry in audit table using the handler provided

View File

@ -24,6 +24,14 @@ package org.apache.druid.common.config;
public interface ConfigSerde<T> public interface ConfigSerde<T>
{ {
byte[] serialize(T obj); byte[] serialize(T obj);
String serializeToString(T obj); /**
* Serialize object to String
*
* @param obj to be serialize
* @param skipNull if true, then skip serialization of any field with null value.
* This can be used to reduce the size of the resulting String.
* @return String serialization of the input
*/
String serializeToString(T obj, boolean skipNull);
T deserialize(byte[] bytes); T deserialize(byte[] bytes);
} }

View File

@ -22,11 +22,13 @@ package org.apache.druid.common.config;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager; import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult; import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils;
import java.io.IOException; import java.io.IOException;
@ -38,18 +40,21 @@ public class JacksonConfigManager
{ {
private final ConfigManager configManager; private final ConfigManager configManager;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ObjectMapper jsonMapperSkipNull;
private final AuditManager auditManager; private final AuditManager auditManager;
@Inject @Inject
public JacksonConfigManager( public JacksonConfigManager(
ConfigManager configManager, ConfigManager configManager,
ObjectMapper jsonMapper, @Json ObjectMapper jsonMapper,
@JsonNonNull ObjectMapper jsonMapperOnlyNonNullValue,
AuditManager auditManager AuditManager auditManager
) )
{ {
this.configManager = configManager; this.configManager = configManager;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.auditManager = auditManager; this.auditManager = auditManager;
this.jsonMapperSkipNull = jsonMapperOnlyNonNullValue;
} }
public <T> AtomicReference<T> watch(String key, Class<? extends T> clazz) public <T> AtomicReference<T> watch(String key, Class<? extends T> clazz)
@ -72,18 +77,12 @@ public class JacksonConfigManager
ConfigSerde configSerde = create(val.getClass(), null); ConfigSerde configSerde = create(val.getClass(), null);
// Audit and actual config change are done in separate transactions // Audit and actual config change are done in separate transactions
// there can be phantom audits and reOrdering in audit changes as well. // there can be phantom audits and reOrdering in audit changes as well.
auditManager.doAudit( auditManager.doAudit(key, key, auditInfo, val, configSerde);
AuditEntry.builder()
.key(key)
.type(key)
.auditInfo(auditInfo)
.payload(configSerde.serializeToString(val))
.build()
);
return configManager.set(key, configSerde, val); return configManager.set(key, configSerde, val);
} }
private <T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal) @VisibleForTesting
<T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal)
{ {
return new ConfigSerde<T>() return new ConfigSerde<T>()
{ {
@ -99,10 +98,10 @@ public class JacksonConfigManager
} }
@Override @Override
public String serializeToString(T obj) public String serializeToString(T obj, boolean skipNull)
{ {
try { try {
return jsonMapper.writeValueAsString(obj); return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj);
} }
catch (JsonProcessingException e) { catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -121,7 +120,8 @@ public class JacksonConfigManager
}; };
} }
private <T> ConfigSerde<T> create(final TypeReference<? extends T> clazz, final T defaultVal) @VisibleForTesting
<T> ConfigSerde<T> create(final TypeReference<? extends T> clazz, final T defaultVal)
{ {
return new ConfigSerde<T>() return new ConfigSerde<T>()
{ {
@ -137,10 +137,10 @@ public class JacksonConfigManager
} }
@Override @Override
public String serializeToString(T obj) public String serializeToString(T obj, boolean skipNull)
{ {
try { try {
return jsonMapper.writeValueAsString(obj); return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj);
} }
catch (JsonProcessingException e) { catch (JsonProcessingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -30,6 +30,7 @@ import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.annotations.Smile;
import org.skife.config.ConfigurationObjectFactory; import org.skife.config.ConfigurationObjectFactory;
@ -41,6 +42,7 @@ public class DruidSecondaryModule implements Module
private final Properties properties; private final Properties properties;
private final ConfigurationObjectFactory factory; private final ConfigurationObjectFactory factory;
private final ObjectMapper jsonMapper; private final ObjectMapper jsonMapper;
private final ObjectMapper jsonMapperOnlyNonNullValueSerialization;
private final ObjectMapper smileMapper; private final ObjectMapper smileMapper;
private final Validator validator; private final Validator validator;
@ -49,6 +51,7 @@ public class DruidSecondaryModule implements Module
Properties properties, Properties properties,
ConfigurationObjectFactory factory, ConfigurationObjectFactory factory,
@Json ObjectMapper jsonMapper, @Json ObjectMapper jsonMapper,
@JsonNonNull ObjectMapper jsonMapperOnlyNonNullValueSerialization,
@Smile ObjectMapper smileMapper, @Smile ObjectMapper smileMapper,
Validator validator Validator validator
) )
@ -56,6 +59,7 @@ public class DruidSecondaryModule implements Module
this.properties = properties; this.properties = properties;
this.factory = factory; this.factory = factory;
this.jsonMapper = jsonMapper; this.jsonMapper = jsonMapper;
this.jsonMapperOnlyNonNullValueSerialization = jsonMapperOnlyNonNullValueSerialization;
this.smileMapper = smileMapper; this.smileMapper = smileMapper;
this.validator = validator; this.validator = validator;
} }
@ -78,6 +82,13 @@ public class DruidSecondaryModule implements Module
return jsonMapper; return jsonMapper;
} }
@Provides @LazySingleton @JsonNonNull
public ObjectMapper getJsonMapperOnlyNonNullValueSerialization(final Injector injector)
{
setupJackson(injector, jsonMapperOnlyNonNullValueSerialization);
return jsonMapperOnlyNonNullValueSerialization;
}
@Provides @LazySingleton @Smile @Provides @LazySingleton @Smile
public ObjectMapper getSmileMapper(Injector injector) public ObjectMapper getSmileMapper(Injector injector)
{ {

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice.annotations;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* The ObjectMapper of this annotation will skip serialization of any field with null value.
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
@PublicApi
public @interface JsonNonNull
{
}

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class JacksonConfigManagerTest
{
@Mock
private ConfigManager mockConfigManager;
@Mock
private AuditManager mockAuditManager;
private JacksonConfigManager jacksonConfigManager;
@Rule
public ExpectedException exception = ExpectedException.none();
@Before
public void setUp()
{
jacksonConfigManager = new JacksonConfigManager(
mockConfigManager,
new ObjectMapper(),
new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL),
mockAuditManager
);
}
@Test
public void testSerializeToStringWithSkipNullTrue()
{
ConfigSerde<TestConfig> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<TestConfig>()
{
}, null);
ConfigSerde<TestConfig> configConfigSerdeFromClass = jacksonConfigManager.create(TestConfig.class, null);
TestConfig config = new TestConfig("version", null, 3);
String actual = configConfigSerdeFromTypeReference.serializeToString(config, true);
Assert.assertEquals("{\"version\":\"version\",\"settingInt\":3}", actual);
actual = configConfigSerdeFromClass.serializeToString(config, true);
Assert.assertEquals("{\"version\":\"version\",\"settingInt\":3}", actual);
}
@Test
public void testSerializeToStringWithSkipNullFalse()
{
ConfigSerde<TestConfig> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<TestConfig>()
{
}, null);
ConfigSerde<TestConfig> configConfigSerdeFromClass = jacksonConfigManager.create(TestConfig.class, null);
TestConfig config = new TestConfig("version", null, 3);
String actual = configConfigSerdeFromTypeReference.serializeToString(config, false);
Assert.assertEquals("{\"version\":\"version\",\"settingString\":null,\"settingInt\":3}", actual);
actual = configConfigSerdeFromClass.serializeToString(config, false);
Assert.assertEquals("{\"version\":\"version\",\"settingString\":null,\"settingInt\":3}", actual);
}
@Test
public void testSerializeToStringWithInvalidConfigForConfigSerdeFromTypeReference()
{
ConfigSerde<ClassThatJacksonCannotSerialize> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<ClassThatJacksonCannotSerialize>()
{
}, null);
exception.expect(RuntimeException.class);
exception.expectMessage("InvalidDefinitionException");
configConfigSerdeFromTypeReference.serializeToString(new ClassThatJacksonCannotSerialize(), false);
}
@Test
public void testSerializeToStringWithInvalidConfigForConfigSerdeFromClass()
{
ConfigSerde<ClassThatJacksonCannotSerialize> configConfigSerdeFromClass = jacksonConfigManager.create(ClassThatJacksonCannotSerialize.class, null);
exception.expect(RuntimeException.class);
exception.expectMessage("InvalidDefinitionException");
configConfigSerdeFromClass.serializeToString(new ClassThatJacksonCannotSerialize(), false);
}
@Test
public void testSet()
{
String key = "key";
TestConfig val = new TestConfig("version", "string", 3);
AuditInfo auditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
jacksonConfigManager.set(key, val, auditInfo);
ArgumentCaptor<ConfigSerde> configSerdeCapture = ArgumentCaptor.forClass(
ConfigSerde.class);
Mockito.verify(mockAuditManager).doAudit(
ArgumentMatchers.eq(key),
ArgumentMatchers.eq(key),
ArgumentMatchers.eq(auditInfo),
ArgumentMatchers.eq(val),
configSerdeCapture.capture()
);
Assert.assertNotNull(configSerdeCapture.getValue());
}
static class TestConfig
{
private final String version;
private final String settingString;
private final int settingInt;
@JsonCreator
public TestConfig(
@JsonProperty("version") String version,
@JsonProperty("settingString") String settingString,
@JsonProperty("settingInt") int settingInt
)
{
this.version = version;
this.settingString = settingString;
this.settingInt = settingInt;
}
public String getVersion()
{
return version;
}
public String getSettingString()
{
return settingString;
}
public int getSettingInt()
{
return settingInt;
}
}
static class ClassThatJacksonCannotSerialize
{
}
}

View File

@ -338,6 +338,8 @@ Coordinator and Overlord log changes to lookups, segment load/drop rules, dynami
|--------|-----------|-------| |--------|-----------|-------|
|`druid.audit.manager.auditHistoryMillis`|Default duration for querying audit history.|1 week| |`druid.audit.manager.auditHistoryMillis`|Default duration for querying audit history.|1 week|
|`druid.audit.manager.includePayloadAsDimensionInMetric`|Boolean flag on whether to add `payload` column in service metric.|false| |`druid.audit.manager.includePayloadAsDimensionInMetric`|Boolean flag on whether to add `payload` column in service metric.|false|
|`druid.audit.manager.maxPayloadSizeBytes`|The maximum size of audit payload to store in Druid's metadata store audit table. If the size of audit payload exceeds this value, the audit log would be stored with a message indicating that the payload was omitted instead. Setting `maxPayloadSizeBytes` to -1 (default value) disables this check, meaning Druid will always store audit payload regardless of it's size. Setting to any negative number other than `-1` is invalid. Human-readable format is supported, see [here](human-readable-byte.md). |-1|
|`druid.audit.manager.skipNullField`|If true, the audit payload stored in metadata store will exclude any field with null value. |false|
### Enabling Metrics ### Enabling Metrics

View File

@ -19,6 +19,7 @@
package org.apache.druid.jackson; package org.apache.druid.jackson;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.dataformat.smile.SmileGenerator; import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
@ -28,6 +29,7 @@ import com.google.inject.Module;
import com.google.inject.Provides; import com.google.inject.Provides;
import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.guice.annotations.Smile; import org.apache.druid.guice.annotations.Smile;
/** /**
@ -46,6 +48,15 @@ public class JacksonModule implements Module
return new DefaultObjectMapper(); return new DefaultObjectMapper();
} }
/**
* Provides ObjectMapper that suppress serializing properties with null values
*/
@Provides @LazySingleton @JsonNonNull
public ObjectMapper jsonMapperOnlyNonNullValue()
{
return new DefaultObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
@Provides @LazySingleton @Smile @Provides @LazySingleton @Smile
public ObjectMapper smileMapper() public ObjectMapper smileMapper()
{ {

View File

@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager; import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigSerde;
import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.Json; import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
@ -76,8 +78,15 @@ public class SQLAuditManager implements AuditManager
} }
@Override @Override
public void doAudit(final AuditEntry auditEntry) public <T> void doAudit(String key, String type, AuditInfo auditInfo, T payload, ConfigSerde<T> configSerde)
{ {
AuditEntry auditEntry = AuditEntry.builder()
.key(key)
.type(type)
.auditInfo(auditInfo)
.payload(configSerde.serializeToString(payload, config.isSkipNullField()))
.build();
dbi.withHandle( dbi.withHandle(
new HandleCallback<Void>() new HandleCallback<Void>()
{ {
@ -114,6 +123,20 @@ public class SQLAuditManager implements AuditManager
{ {
emitter.emit(getAuditMetricEventBuilder(auditEntry).build("config/audit", 1)); emitter.emit(getAuditMetricEventBuilder(auditEntry).build("config/audit", 1));
AuditEntry auditEntryToStore = auditEntry;
if (config.getMaxPayloadSizeBytes() >= 0) {
int payloadSize = jsonMapper.writeValueAsBytes(auditEntry.getPayload()).length;
if (payloadSize > config.getMaxPayloadSizeBytes()) {
auditEntryToStore = AuditEntry.builder()
.key(auditEntry.getKey())
.type(auditEntry.getType())
.auditInfo(auditEntry.getAuditInfo())
.payload(StringUtils.format(PAYLOAD_SKIP_MSG_FORMAT, config.getMaxPayloadSizeBytes()))
.auditTime(auditEntry.getAuditTime())
.build();
}
}
handle.createStatement( handle.createStatement(
StringUtils.format( StringUtils.format(
"INSERT INTO %s ( audit_key, type, author, comment, created_date, payload) VALUES (:audit_key, :type, :author, :comment, :created_date, :payload)", "INSERT INTO %s ( audit_key, type, author, comment, created_date, payload) VALUES (:audit_key, :type, :author, :comment, :created_date, :payload)",
@ -125,7 +148,7 @@ public class SQLAuditManager implements AuditManager
.bind("author", auditEntry.getAuditInfo().getAuthor()) .bind("author", auditEntry.getAuditInfo().getAuthor())
.bind("comment", auditEntry.getAuditInfo().getComment()) .bind("comment", auditEntry.getAuditInfo().getComment())
.bind("created_date", auditEntry.getAuditTime().toString()) .bind("created_date", auditEntry.getAuditTime().toString())
.bind("payload", jsonMapper.writeValueAsBytes(auditEntry)) .bind("payload", jsonMapper.writeValueAsBytes(auditEntryToStore))
.execute(); .execute();
} }

View File

@ -20,6 +20,8 @@
package org.apache.druid.server.audit; package org.apache.druid.server.audit;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.HumanReadableBytesRange;
/** /**
*/ */
@ -31,6 +33,16 @@ public class SQLAuditManagerConfig
@JsonProperty @JsonProperty
private boolean includePayloadAsDimensionInMetric = false; private boolean includePayloadAsDimensionInMetric = false;
@JsonProperty
@HumanReadableBytesRange(
min = -1,
message = "maxPayloadSizeBytes must either be -1 (for disabling the check) or a non negative number"
)
private HumanReadableBytes maxPayloadSizeBytes = HumanReadableBytes.valueOf(-1);
@JsonProperty
private boolean skipNullField = false;
public long getAuditHistoryMillis() public long getAuditHistoryMillis()
{ {
return auditHistoryMillis; return auditHistoryMillis;
@ -40,4 +52,15 @@ public class SQLAuditManagerConfig
{ {
return includePayloadAsDimensionInMetric; return includePayloadAsDimensionInMetric;
} }
public long getMaxPayloadSizeBytes()
{
return maxPayloadSizeBytes.getBytes();
}
public boolean isSkipNullField()
{
return skipNullField;
}
} }

View File

@ -19,14 +19,17 @@
package org.apache.druid.server.audit; package org.apache.druid.server.audit;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager; import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigSerde;
import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.metadata.TestDerbyConnector;
import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.server.metrics.NoopServiceEmitter;
@ -35,12 +38,19 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback; import org.skife.jdbi.v2.tweak.HandleCallback;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
@RunWith(MockitoJUnitRunner.class)
public class SQLAuditManagerTest public class SQLAuditManagerTest
{ {
@Rule @Rule
@ -49,6 +59,8 @@ public class SQLAuditManagerTest
private TestDerbyConnector connector; private TestDerbyConnector connector;
private AuditManager auditManager; private AuditManager auditManager;
private final String PAYLOAD_DIMENSION_KEY = "payload"; private final String PAYLOAD_DIMENSION_KEY = "payload";
private ConfigSerde<String> stringConfigSerde;
private final ObjectMapper mapper = new DefaultObjectMapper(); private final ObjectMapper mapper = new DefaultObjectMapper();
@ -64,6 +76,33 @@ public class SQLAuditManagerTest
mapper, mapper,
new SQLAuditManagerConfig() new SQLAuditManagerConfig()
); );
stringConfigSerde = new ConfigSerde<String>()
{
@Override
public byte[] serialize(String obj)
{
try {
return mapper.writeValueAsBytes(obj);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
@Override
public String serializeToString(String obj, boolean skipNull)
{
// In our test, payload Object is already a String
// So to serialize to String, we just return a String
return obj;
}
@Override
public String deserialize(byte[] bytes)
{
return JacksonUtils.readValue(mapper, bytes, String.class);
}
};
} }
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
@ -125,18 +164,17 @@ public class SQLAuditManagerTest
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testCreateAuditEntry() throws IOException public void testCreateAuditEntry() throws IOException
{ {
AuditEntry entry = new AuditEntry( String entry1Key = "testKey";
"testKey", String entry1Type = "testType";
"testType", AuditInfo entry1AuditInfo = new AuditInfo(
new AuditInfo(
"testAuthor", "testAuthor",
"testComment", "testComment",
"127.0.0.1" "127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-01T00:00:00Z")
); );
auditManager.doAudit(entry); String entry1Payload = "testPayload";
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
byte[] payload = connector.lookup( byte[] payload = connector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
"audit_key", "audit_key",
@ -144,118 +182,128 @@ public class SQLAuditManagerTest
"testKey" "testKey"
); );
AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class); AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
Assert.assertEquals(entry, dbEntry); Assert.assertEquals(entry1Key, dbEntry.getKey());
Assert.assertEquals(entry1Payload, dbEntry.getPayload());
Assert.assertEquals(entry1Type, dbEntry.getType());
Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
} }
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testFetchAuditHistory() public void testFetchAuditHistory()
{ {
AuditEntry entry = new AuditEntry( String entry1Key = "testKey";
"testKey", String entry1Type = "testType";
"testType", AuditInfo entry1AuditInfo = new AuditInfo(
new AuditInfo(
"testAuthor", "testAuthor",
"testComment", "testComment",
"127.0.0.1" "127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-01T00:00:00Z")
); );
auditManager.doAudit(entry); String entry1Payload = "testPayload";
auditManager.doAudit(entry);
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory( List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testKey", "testKey",
"testType", "testType",
Intervals.of("2012-01-01T00:00:00Z/2013-01-03T00:00:00Z") Intervals.of("2000-01-01T00:00:00Z/2100-01-03T00:00:00Z")
); );
Assert.assertEquals(2, auditEntries.size()); Assert.assertEquals(2, auditEntries.size());
Assert.assertEquals(entry, auditEntries.get(0));
Assert.assertEquals(entry, auditEntries.get(1)); Assert.assertEquals(entry1Key, auditEntries.get(0).getKey());
Assert.assertEquals(entry1Payload, auditEntries.get(0).getPayload());
Assert.assertEquals(entry1Type, auditEntries.get(0).getType());
Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo());
Assert.assertEquals(entry1Key, auditEntries.get(1).getKey());
Assert.assertEquals(entry1Payload, auditEntries.get(1).getPayload());
Assert.assertEquals(entry1Type, auditEntries.get(1).getType());
Assert.assertEquals(entry1AuditInfo, auditEntries.get(1).getAuditInfo());
} }
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testFetchAuditHistoryByKeyAndTypeWithLimit() public void testFetchAuditHistoryByKeyAndTypeWithLimit()
{ {
AuditEntry entry1 = new AuditEntry( String entry1Key = "testKey1";
"testKey1", String entry1Type = "testType";
"testType", AuditInfo entry1AuditInfo = new AuditInfo(
new AuditInfo(
"testAuthor", "testAuthor",
"testComment", "testComment",
"127.0.0.1" "127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-01T00:00:00Z")
); );
AuditEntry entry2 = new AuditEntry( String entry1Payload = "testPayload";
"testKey2",
"testType", String entry2Key = "testKey2";
new AuditInfo( String entry2Type = "testType";
AuditInfo entry2AuditInfo = new AuditInfo(
"testAuthor", "testAuthor",
"testComment", "testComment",
"127.0.0.1" "127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-02T00:00:00Z")
); );
auditManager.doAudit(entry1); String entry2Payload = "testPayload";
auditManager.doAudit(entry2);
auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory( List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testKey1", "testKey1",
"testType", "testType",
1 1
); );
Assert.assertEquals(1, auditEntries.size()); Assert.assertEquals(1, auditEntries.size());
Assert.assertEquals(entry1, auditEntries.get(0)); Assert.assertEquals(entry1Key, auditEntries.get(0).getKey());
Assert.assertEquals(entry1Payload, auditEntries.get(0).getPayload());
Assert.assertEquals(entry1Type, auditEntries.get(0).getType());
Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo());
} }
@Test(timeout = 60_000L) @Test(timeout = 60_000L)
public void testFetchAuditHistoryByTypeWithLimit() public void testFetchAuditHistoryByTypeWithLimit()
{ {
AuditEntry entry1 = new AuditEntry( String entry1Key = "testKey";
"testKey", String entry1Type = "testType";
"testType", AuditInfo entry1AuditInfo = new AuditInfo(
new AuditInfo(
"testAuthor", "testAuthor",
"testComment", "testComment",
"127.0.0.1" "127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-01T00:00:00Z")
); );
AuditEntry entry2 = new AuditEntry( String entry1Payload = "testPayload1";
"testKey",
"testType", String entry2Key = "testKey";
new AuditInfo( String entry2Type = "testType";
AuditInfo entry2AuditInfo = new AuditInfo(
"testAuthor", "testAuthor",
"testComment", "testComment",
"127.0.0.1" "127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-02T00:00:00Z")
); );
AuditEntry entry3 = new AuditEntry( String entry2Payload = "testPayload2";
"testKey",
"testType", String entry3Key = "testKey";
new AuditInfo( String entry3Type = "testType";
AuditInfo entry3AuditInfo = new AuditInfo(
"testAuthor", "testAuthor",
"testComment", "testComment",
"127.0.0.1" "127.0.0.1"
),
"testPayload",
DateTimes.of("2013-01-03T00:00:00Z")
); );
auditManager.doAudit(entry1); String entry3Payload = "testPayload3";
auditManager.doAudit(entry2);
auditManager.doAudit(entry3); auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde);
auditManager.doAudit(entry3Key, entry3Type, entry3AuditInfo, entry3Payload, stringConfigSerde);
List<AuditEntry> auditEntries = auditManager.fetchAuditHistory( List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
"testType", "testType",
2 2
); );
Assert.assertEquals(2, auditEntries.size()); Assert.assertEquals(2, auditEntries.size());
Assert.assertEquals(entry3, auditEntries.get(0)); Assert.assertEquals(entry3Key, auditEntries.get(0).getKey());
Assert.assertEquals(entry2, auditEntries.get(1)); Assert.assertEquals(entry3Payload, auditEntries.get(0).getPayload());
Assert.assertEquals(entry3Type, auditEntries.get(0).getType());
Assert.assertEquals(entry3AuditInfo, auditEntries.get(0).getAuditInfo());
Assert.assertEquals(entry2Key, auditEntries.get(1).getKey());
Assert.assertEquals(entry2Payload, auditEntries.get(1).getPayload());
Assert.assertEquals(entry2Type, auditEntries.get(1).getType());
Assert.assertEquals(entry2AuditInfo, auditEntries.get(1).getAuditInfo());
} }
@Test(expected = IllegalArgumentException.class, timeout = 10_000L) @Test(expected = IllegalArgumentException.class, timeout = 10_000L)
@ -270,6 +318,132 @@ public class SQLAuditManagerTest
auditManager.fetchAuditHistory("testType", 0); auditManager.fetchAuditHistory("testType", 0);
} }
@Test(timeout = 60_000L)
public void testCreateAuditEntryWithPayloadOverSkipPayloadLimit() throws IOException
{
int maxPayloadSize = 10;
SQLAuditManager auditManagerWithMaxPayloadSizeBytes = new SQLAuditManager(
connector,
derbyConnectorRule.metadataTablesConfigSupplier(),
new NoopServiceEmitter(),
mapper,
new SQLAuditManagerConfig()
{
@Override
public long getMaxPayloadSizeBytes()
{
return maxPayloadSize;
}
}
);
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
String entry1Payload = "payload audit to store";
auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload,
stringConfigSerde
);
byte[] payload = connector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
"audit_key",
"payload",
"testKey"
);
AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
Assert.assertEquals(entry1Key, dbEntry.getKey());
Assert.assertNotEquals(entry1Payload, dbEntry.getPayload());
Assert.assertEquals(StringUtils.format(AuditManager.PAYLOAD_SKIP_MSG_FORMAT, maxPayloadSize), dbEntry.getPayload());
Assert.assertEquals(entry1Type, dbEntry.getType());
Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
}
@Test(timeout = 60_000L)
public void testCreateAuditEntryWithPayloadUnderSkipPayloadLimit() throws IOException
{
SQLAuditManager auditManagerWithMaxPayloadSizeBytes = new SQLAuditManager(
connector,
derbyConnectorRule.metadataTablesConfigSupplier(),
new NoopServiceEmitter(),
mapper,
new SQLAuditManagerConfig()
{
@Override
public long getMaxPayloadSizeBytes()
{
return 500;
}
}
);
String entry1Key = "testKey";
String entry1Type = "testType";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
String entry1Payload = "payload audit to store";
auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload,
stringConfigSerde
);
byte[] payload = connector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
"audit_key",
"payload",
"testKey"
);
AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
Assert.assertEquals(entry1Key, dbEntry.getKey());
Assert.assertEquals(entry1Payload, dbEntry.getPayload());
Assert.assertEquals(entry1Type, dbEntry.getType());
Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
}
@Test(timeout = 60_000L)
public void testCreateAuditEntryWithSkipNullConfigTrue()
{
ConfigSerde<Map<String, String>> mockConfigSerde = Mockito.mock(ConfigSerde.class);
SQLAuditManager auditManagerWithSkipNull = new SQLAuditManager(
connector,
derbyConnectorRule.metadataTablesConfigSupplier(),
new NoopServiceEmitter(),
mapper,
new SQLAuditManagerConfig()
{
@Override
public boolean isSkipNullField()
{
return true;
}
}
);
String entry1Key = "test1Key";
String entry1Type = "test1Type";
AuditInfo entry1AuditInfo = new AuditInfo(
"testAuthor",
"testComment",
"127.0.0.1"
);
// Entry 1 payload has a null field for one of the property
Map<String, String> entryPayload1WithNull = new HashMap<>();
entryPayload1WithNull.put("version", "x");
entryPayload1WithNull.put("something", null);
auditManagerWithSkipNull.doAudit(entry1Key, entry1Type, entry1AuditInfo, entryPayload1WithNull, mockConfigSerde);
Mockito.verify(mockConfigSerde).serializeToString(ArgumentMatchers.eq(entryPayload1WithNull), ArgumentMatchers.eq(true));
}
@After @After
public void cleanup() public void cleanup()
{ {