From f96840017017806d2bbe35cf344e682995e8a9c1 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 13 Apr 2021 20:18:28 -0700 Subject: [PATCH] Introduce a new configuration that skip storing audit payload if payload size exceed limit and skip storing null fields for audit payload (#11078) * Add config to skip storing audit payload if exceed limit * fix checkstyle * change config name * skip null fields for audit payload * fix checkstyle * address comments * fix guice * fix test * add tests * address comments * address comments * address comments * fix checkstyle * address comments * fix test * fix test * address comments * Address comments Co-authored-by: Jihoon Son Co-authored-by: Jihoon Son --- .../org/apache/druid/audit/AuditManager.java | 17 +- .../druid/common/config/ConfigSerde.java | 10 +- .../common/config/JacksonConfigManager.java | 32 +- .../druid/guice/DruidSecondaryModule.java | 11 + .../druid/guice/annotations/JsonNonNull.java | 38 ++ .../config/JacksonConfigManagerTest.java | 178 +++++++++ docs/configuration/index.md | 2 + .../apache/druid/jackson/JacksonModule.java | 11 + .../druid/server/audit/SQLAuditManager.java | 27 +- .../server/audit/SQLAuditManagerConfig.java | 23 ++ .../server/audit/SQLAuditManagerTest.java | 346 +++++++++++++----- 11 files changed, 587 insertions(+), 108 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/guice/annotations/JsonNonNull.java create mode 100644 core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java diff --git a/core/src/main/java/org/apache/druid/audit/AuditManager.java b/core/src/main/java/org/apache/druid/audit/AuditManager.java index 6389350fea0..73804d7da35 100644 --- a/core/src/main/java/org/apache/druid/audit/AuditManager.java +++ b/core/src/main/java/org/apache/druid/audit/AuditManager.java @@ -20,6 +20,7 @@ package org.apache.druid.audit; +import org.apache.druid.common.config.ConfigSerde; import org.joda.time.Interval; import org.skife.jdbi.v2.Handle; @@ -28,15 +29,25 @@ import java.util.List; public interface AuditManager { + /** + * This String is the default message stored instead of the actual audit payload if the audit payload size + * exceeded the maximum size limit configuration + */ + String PAYLOAD_SKIP_MSG_FORMAT = "Payload was not stored as its size exceeds the limit [%d] configured by druid.audit.manager.maxPayloadSizeBytes"; + String X_DRUID_AUTHOR = "X-Druid-Author"; String X_DRUID_COMMENT = "X-Druid-Comment"; /** - * inserts an audit Entry in the Audit Table - * @param auditEntry + * inserts an audit entry in the Audit Table + * @param key of the audit entry + * @param type of the audit entry + * @param auditInfo of the audit entry + * @param payload of the audit entry + * @param configSerde of the payload of the audit entry */ - void doAudit(AuditEntry auditEntry); + void doAudit(String key, String type, AuditInfo auditInfo, T payload, ConfigSerde configSerde); /** * inserts an audit Entry in audit table using the handler provided diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java b/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java index 119c0e5ad32..708d16d8b19 100644 --- a/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java +++ b/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java @@ -24,6 +24,14 @@ package org.apache.druid.common.config; public interface ConfigSerde { byte[] serialize(T obj); - String serializeToString(T obj); + /** + * Serialize object to String + * + * @param obj to be serialize + * @param skipNull if true, then skip serialization of any field with null value. + * This can be used to reduce the size of the resulting String. + * @return String serialization of the input + */ + String serializeToString(T obj, boolean skipNull); T deserialize(byte[] bytes); } diff --git a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java index 1075e4c1aba..c62a8b72cab 100644 --- a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java +++ b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java @@ -22,11 +22,13 @@ package org.apache.druid.common.config; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Inject; -import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; import org.apache.druid.common.config.ConfigManager.SetResult; +import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.JsonNonNull; import org.apache.druid.java.util.common.jackson.JacksonUtils; import java.io.IOException; @@ -38,18 +40,21 @@ public class JacksonConfigManager { private final ConfigManager configManager; private final ObjectMapper jsonMapper; + private final ObjectMapper jsonMapperSkipNull; private final AuditManager auditManager; @Inject public JacksonConfigManager( ConfigManager configManager, - ObjectMapper jsonMapper, + @Json ObjectMapper jsonMapper, + @JsonNonNull ObjectMapper jsonMapperOnlyNonNullValue, AuditManager auditManager ) { this.configManager = configManager; this.jsonMapper = jsonMapper; this.auditManager = auditManager; + this.jsonMapperSkipNull = jsonMapperOnlyNonNullValue; } public AtomicReference watch(String key, Class clazz) @@ -72,18 +77,12 @@ public class JacksonConfigManager ConfigSerde configSerde = create(val.getClass(), null); // Audit and actual config change are done in separate transactions // there can be phantom audits and reOrdering in audit changes as well. - auditManager.doAudit( - AuditEntry.builder() - .key(key) - .type(key) - .auditInfo(auditInfo) - .payload(configSerde.serializeToString(val)) - .build() - ); + auditManager.doAudit(key, key, auditInfo, val, configSerde); return configManager.set(key, configSerde, val); } - private ConfigSerde create(final Class clazz, final T defaultVal) + @VisibleForTesting + ConfigSerde create(final Class clazz, final T defaultVal) { return new ConfigSerde() { @@ -99,10 +98,10 @@ public class JacksonConfigManager } @Override - public String serializeToString(T obj) + public String serializeToString(T obj, boolean skipNull) { try { - return jsonMapper.writeValueAsString(obj); + return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj); } catch (JsonProcessingException e) { throw new RuntimeException(e); @@ -121,7 +120,8 @@ public class JacksonConfigManager }; } - private ConfigSerde create(final TypeReference clazz, final T defaultVal) + @VisibleForTesting + ConfigSerde create(final TypeReference clazz, final T defaultVal) { return new ConfigSerde() { @@ -137,10 +137,10 @@ public class JacksonConfigManager } @Override - public String serializeToString(T obj) + public String serializeToString(T obj, boolean skipNull) { try { - return jsonMapper.writeValueAsString(obj); + return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj); } catch (JsonProcessingException e) { throw new RuntimeException(e); diff --git a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java index 5e4db78ac82..bb03146a05b 100644 --- a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java +++ b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java @@ -30,6 +30,7 @@ import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.Provides; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.JsonNonNull; import org.apache.druid.guice.annotations.Smile; import org.skife.config.ConfigurationObjectFactory; @@ -41,6 +42,7 @@ public class DruidSecondaryModule implements Module private final Properties properties; private final ConfigurationObjectFactory factory; private final ObjectMapper jsonMapper; + private final ObjectMapper jsonMapperOnlyNonNullValueSerialization; private final ObjectMapper smileMapper; private final Validator validator; @@ -49,6 +51,7 @@ public class DruidSecondaryModule implements Module Properties properties, ConfigurationObjectFactory factory, @Json ObjectMapper jsonMapper, + @JsonNonNull ObjectMapper jsonMapperOnlyNonNullValueSerialization, @Smile ObjectMapper smileMapper, Validator validator ) @@ -56,6 +59,7 @@ public class DruidSecondaryModule implements Module this.properties = properties; this.factory = factory; this.jsonMapper = jsonMapper; + this.jsonMapperOnlyNonNullValueSerialization = jsonMapperOnlyNonNullValueSerialization; this.smileMapper = smileMapper; this.validator = validator; } @@ -78,6 +82,13 @@ public class DruidSecondaryModule implements Module return jsonMapper; } + @Provides @LazySingleton @JsonNonNull + public ObjectMapper getJsonMapperOnlyNonNullValueSerialization(final Injector injector) + { + setupJackson(injector, jsonMapperOnlyNonNullValueSerialization); + return jsonMapperOnlyNonNullValueSerialization; + } + @Provides @LazySingleton @Smile public ObjectMapper getSmileMapper(Injector injector) { diff --git a/core/src/main/java/org/apache/druid/guice/annotations/JsonNonNull.java b/core/src/main/java/org/apache/druid/guice/annotations/JsonNonNull.java new file mode 100644 index 00000000000..ae4672f01f4 --- /dev/null +++ b/core/src/main/java/org/apache/druid/guice/annotations/JsonNonNull.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * The ObjectMapper of this annotation will skip serialization of any field with null value. + */ +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@BindingAnnotation +@PublicApi +public @interface JsonNonNull +{ +} diff --git a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java new file mode 100644 index 00000000000..25220ebb892 --- /dev/null +++ b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common.config; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.audit.AuditInfo; +import org.apache.druid.audit.AuditManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class JacksonConfigManagerTest +{ + @Mock + private ConfigManager mockConfigManager; + + @Mock + private AuditManager mockAuditManager; + + private JacksonConfigManager jacksonConfigManager; + + @Rule + public ExpectedException exception = ExpectedException.none(); + + @Before + public void setUp() + { + jacksonConfigManager = new JacksonConfigManager( + mockConfigManager, + new ObjectMapper(), + new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL), + mockAuditManager + ); + } + + @Test + public void testSerializeToStringWithSkipNullTrue() + { + ConfigSerde configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference() + { + }, null); + ConfigSerde configConfigSerdeFromClass = jacksonConfigManager.create(TestConfig.class, null); + TestConfig config = new TestConfig("version", null, 3); + String actual = configConfigSerdeFromTypeReference.serializeToString(config, true); + Assert.assertEquals("{\"version\":\"version\",\"settingInt\":3}", actual); + actual = configConfigSerdeFromClass.serializeToString(config, true); + Assert.assertEquals("{\"version\":\"version\",\"settingInt\":3}", actual); + } + + @Test + public void testSerializeToStringWithSkipNullFalse() + { + ConfigSerde configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference() + { + }, null); + ConfigSerde configConfigSerdeFromClass = jacksonConfigManager.create(TestConfig.class, null); + TestConfig config = new TestConfig("version", null, 3); + String actual = configConfigSerdeFromTypeReference.serializeToString(config, false); + Assert.assertEquals("{\"version\":\"version\",\"settingString\":null,\"settingInt\":3}", actual); + actual = configConfigSerdeFromClass.serializeToString(config, false); + Assert.assertEquals("{\"version\":\"version\",\"settingString\":null,\"settingInt\":3}", actual); + } + + @Test + public void testSerializeToStringWithInvalidConfigForConfigSerdeFromTypeReference() + { + ConfigSerde configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference() + { + }, null); + exception.expect(RuntimeException.class); + exception.expectMessage("InvalidDefinitionException"); + configConfigSerdeFromTypeReference.serializeToString(new ClassThatJacksonCannotSerialize(), false); + } + + @Test + public void testSerializeToStringWithInvalidConfigForConfigSerdeFromClass() + { + ConfigSerde configConfigSerdeFromClass = jacksonConfigManager.create(ClassThatJacksonCannotSerialize.class, null); + exception.expect(RuntimeException.class); + exception.expectMessage("InvalidDefinitionException"); + configConfigSerdeFromClass.serializeToString(new ClassThatJacksonCannotSerialize(), false); + } + + @Test + public void testSet() + { + String key = "key"; + TestConfig val = new TestConfig("version", "string", 3); + AuditInfo auditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" + ); + + jacksonConfigManager.set(key, val, auditInfo); + + ArgumentCaptor configSerdeCapture = ArgumentCaptor.forClass( + ConfigSerde.class); + Mockito.verify(mockAuditManager).doAudit( + ArgumentMatchers.eq(key), + ArgumentMatchers.eq(key), + ArgumentMatchers.eq(auditInfo), + ArgumentMatchers.eq(val), + configSerdeCapture.capture() + ); + Assert.assertNotNull(configSerdeCapture.getValue()); + } + + + static class TestConfig + { + private final String version; + private final String settingString; + private final int settingInt; + + @JsonCreator + public TestConfig( + @JsonProperty("version") String version, + @JsonProperty("settingString") String settingString, + @JsonProperty("settingInt") int settingInt + ) + { + this.version = version; + this.settingString = settingString; + this.settingInt = settingInt; + } + + public String getVersion() + { + return version; + } + + public String getSettingString() + { + return settingString; + } + + public int getSettingInt() + { + return settingInt; + } + } + + static class ClassThatJacksonCannotSerialize + { + + } +} diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 62e36bdde24..cd26a9e7afe 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -338,6 +338,8 @@ Coordinator and Overlord log changes to lookups, segment load/drop rules, dynami |--------|-----------|-------| |`druid.audit.manager.auditHistoryMillis`|Default duration for querying audit history.|1 week| |`druid.audit.manager.includePayloadAsDimensionInMetric`|Boolean flag on whether to add `payload` column in service metric.|false| +|`druid.audit.manager.maxPayloadSizeBytes`|The maximum size of audit payload to store in Druid's metadata store audit table. If the size of audit payload exceeds this value, the audit log would be stored with a message indicating that the payload was omitted instead. Setting `maxPayloadSizeBytes` to -1 (default value) disables this check, meaning Druid will always store audit payload regardless of it's size. Setting to any negative number other than `-1` is invalid. Human-readable format is supported, see [here](human-readable-byte.md). |-1| +|`druid.audit.manager.skipNullField`|If true, the audit payload stored in metadata store will exclude any field with null value. |false| ### Enabling Metrics diff --git a/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java b/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java index 853a088dfba..a7c947d365e 100644 --- a/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java @@ -19,6 +19,7 @@ package org.apache.druid.jackson; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import com.fasterxml.jackson.dataformat.smile.SmileGenerator; @@ -28,6 +29,7 @@ import com.google.inject.Module; import com.google.inject.Provides; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.annotations.Json; +import org.apache.druid.guice.annotations.JsonNonNull; import org.apache.druid.guice.annotations.Smile; /** @@ -46,6 +48,15 @@ public class JacksonModule implements Module return new DefaultObjectMapper(); } + /** + * Provides ObjectMapper that suppress serializing properties with null values + */ + @Provides @LazySingleton @JsonNonNull + public ObjectMapper jsonMapperOnlyNonNullValue() + { + return new DefaultObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL); + } + @Provides @LazySingleton @Smile public ObjectMapper smileMapper() { diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java index 9ea53c670e1..13020436174 100644 --- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java +++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java @@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Supplier; import com.google.inject.Inject; import org.apache.druid.audit.AuditEntry; +import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigSerde; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.DateTimes; @@ -76,8 +78,15 @@ public class SQLAuditManager implements AuditManager } @Override - public void doAudit(final AuditEntry auditEntry) + public void doAudit(String key, String type, AuditInfo auditInfo, T payload, ConfigSerde configSerde) { + AuditEntry auditEntry = AuditEntry.builder() + .key(key) + .type(type) + .auditInfo(auditInfo) + .payload(configSerde.serializeToString(payload, config.isSkipNullField())) + .build(); + dbi.withHandle( new HandleCallback() { @@ -114,6 +123,20 @@ public class SQLAuditManager implements AuditManager { emitter.emit(getAuditMetricEventBuilder(auditEntry).build("config/audit", 1)); + AuditEntry auditEntryToStore = auditEntry; + if (config.getMaxPayloadSizeBytes() >= 0) { + int payloadSize = jsonMapper.writeValueAsBytes(auditEntry.getPayload()).length; + if (payloadSize > config.getMaxPayloadSizeBytes()) { + auditEntryToStore = AuditEntry.builder() + .key(auditEntry.getKey()) + .type(auditEntry.getType()) + .auditInfo(auditEntry.getAuditInfo()) + .payload(StringUtils.format(PAYLOAD_SKIP_MSG_FORMAT, config.getMaxPayloadSizeBytes())) + .auditTime(auditEntry.getAuditTime()) + .build(); + } + } + handle.createStatement( StringUtils.format( "INSERT INTO %s ( audit_key, type, author, comment, created_date, payload) VALUES (:audit_key, :type, :author, :comment, :created_date, :payload)", @@ -125,7 +148,7 @@ public class SQLAuditManager implements AuditManager .bind("author", auditEntry.getAuditInfo().getAuthor()) .bind("comment", auditEntry.getAuditInfo().getComment()) .bind("created_date", auditEntry.getAuditTime().toString()) - .bind("payload", jsonMapper.writeValueAsBytes(auditEntry)) + .bind("payload", jsonMapper.writeValueAsBytes(auditEntryToStore)) .execute(); } diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java index 4ef45d1ee92..8509e06c102 100644 --- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java +++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java @@ -20,6 +20,8 @@ package org.apache.druid.server.audit; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.java.util.common.HumanReadableBytesRange; /** */ @@ -31,6 +33,16 @@ public class SQLAuditManagerConfig @JsonProperty private boolean includePayloadAsDimensionInMetric = false; + @JsonProperty + @HumanReadableBytesRange( + min = -1, + message = "maxPayloadSizeBytes must either be -1 (for disabling the check) or a non negative number" + ) + private HumanReadableBytes maxPayloadSizeBytes = HumanReadableBytes.valueOf(-1); + + @JsonProperty + private boolean skipNullField = false; + public long getAuditHistoryMillis() { return auditHistoryMillis; @@ -40,4 +52,15 @@ public class SQLAuditManagerConfig { return includePayloadAsDimensionInMetric; } + + public long getMaxPayloadSizeBytes() + { + return maxPayloadSizeBytes.getBytes(); + } + + public boolean isSkipNullField() + { + return skipNullField; + } + } diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java index 13dc10ab316..429402e9e9e 100644 --- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java +++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java @@ -19,14 +19,17 @@ package org.apache.druid.server.audit; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.audit.AuditEntry; import org.apache.druid.audit.AuditInfo; import org.apache.druid.audit.AuditManager; +import org.apache.druid.common.config.ConfigSerde; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.server.metrics.NoopServiceEmitter; @@ -35,12 +38,19 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Rule; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; import org.skife.jdbi.v2.Handle; import org.skife.jdbi.v2.tweak.HandleCallback; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import java.util.Map; +@RunWith(MockitoJUnitRunner.class) public class SQLAuditManagerTest { @Rule @@ -49,6 +59,8 @@ public class SQLAuditManagerTest private TestDerbyConnector connector; private AuditManager auditManager; private final String PAYLOAD_DIMENSION_KEY = "payload"; + private ConfigSerde stringConfigSerde; + private final ObjectMapper mapper = new DefaultObjectMapper(); @@ -64,6 +76,33 @@ public class SQLAuditManagerTest mapper, new SQLAuditManagerConfig() ); + stringConfigSerde = new ConfigSerde() + { + @Override + public byte[] serialize(String obj) + { + try { + return mapper.writeValueAsBytes(obj); + } + catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + + @Override + public String serializeToString(String obj, boolean skipNull) + { + // In our test, payload Object is already a String + // So to serialize to String, we just return a String + return obj; + } + + @Override + public String deserialize(byte[] bytes) + { + return JacksonUtils.readValue(mapper, bytes, String.class); + } + }; } @Test(timeout = 60_000L) @@ -125,18 +164,17 @@ public class SQLAuditManagerTest @Test(timeout = 60_000L) public void testCreateAuditEntry() throws IOException { - AuditEntry entry = new AuditEntry( - "testKey", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-01T00:00:00Z") + String entry1Key = "testKey"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" ); - auditManager.doAudit(entry); + String entry1Payload = "testPayload"; + + auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde); + byte[] payload = connector.lookup( derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), "audit_key", @@ -144,118 +182,128 @@ public class SQLAuditManagerTest "testKey" ); AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class); - Assert.assertEquals(entry, dbEntry); - + Assert.assertEquals(entry1Key, dbEntry.getKey()); + Assert.assertEquals(entry1Payload, dbEntry.getPayload()); + Assert.assertEquals(entry1Type, dbEntry.getType()); + Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo()); } @Test(timeout = 60_000L) public void testFetchAuditHistory() { - AuditEntry entry = new AuditEntry( - "testKey", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-01T00:00:00Z") + String entry1Key = "testKey"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" ); - auditManager.doAudit(entry); - auditManager.doAudit(entry); + String entry1Payload = "testPayload"; + + auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde); + auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde); + List auditEntries = auditManager.fetchAuditHistory( "testKey", "testType", - Intervals.of("2012-01-01T00:00:00Z/2013-01-03T00:00:00Z") + Intervals.of("2000-01-01T00:00:00Z/2100-01-03T00:00:00Z") ); Assert.assertEquals(2, auditEntries.size()); - Assert.assertEquals(entry, auditEntries.get(0)); - Assert.assertEquals(entry, auditEntries.get(1)); + + Assert.assertEquals(entry1Key, auditEntries.get(0).getKey()); + Assert.assertEquals(entry1Payload, auditEntries.get(0).getPayload()); + Assert.assertEquals(entry1Type, auditEntries.get(0).getType()); + Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo()); + + Assert.assertEquals(entry1Key, auditEntries.get(1).getKey()); + Assert.assertEquals(entry1Payload, auditEntries.get(1).getPayload()); + Assert.assertEquals(entry1Type, auditEntries.get(1).getType()); + Assert.assertEquals(entry1AuditInfo, auditEntries.get(1).getAuditInfo()); } @Test(timeout = 60_000L) public void testFetchAuditHistoryByKeyAndTypeWithLimit() { - AuditEntry entry1 = new AuditEntry( - "testKey1", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-01T00:00:00Z") + String entry1Key = "testKey1"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" ); - AuditEntry entry2 = new AuditEntry( - "testKey2", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-02T00:00:00Z") + String entry1Payload = "testPayload"; + + String entry2Key = "testKey2"; + String entry2Type = "testType"; + AuditInfo entry2AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" ); - auditManager.doAudit(entry1); - auditManager.doAudit(entry2); + String entry2Payload = "testPayload"; + + auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde); + auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde); List auditEntries = auditManager.fetchAuditHistory( "testKey1", "testType", 1 ); Assert.assertEquals(1, auditEntries.size()); - Assert.assertEquals(entry1, auditEntries.get(0)); + Assert.assertEquals(entry1Key, auditEntries.get(0).getKey()); + Assert.assertEquals(entry1Payload, auditEntries.get(0).getPayload()); + Assert.assertEquals(entry1Type, auditEntries.get(0).getType()); + Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo()); } @Test(timeout = 60_000L) public void testFetchAuditHistoryByTypeWithLimit() { - AuditEntry entry1 = new AuditEntry( - "testKey", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-01T00:00:00Z") + String entry1Key = "testKey"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" ); - AuditEntry entry2 = new AuditEntry( - "testKey", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-02T00:00:00Z") + String entry1Payload = "testPayload1"; + + String entry2Key = "testKey"; + String entry2Type = "testType"; + AuditInfo entry2AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" ); - AuditEntry entry3 = new AuditEntry( - "testKey", - "testType", - new AuditInfo( - "testAuthor", - "testComment", - "127.0.0.1" - ), - "testPayload", - DateTimes.of("2013-01-03T00:00:00Z") + String entry2Payload = "testPayload2"; + + String entry3Key = "testKey"; + String entry3Type = "testType"; + AuditInfo entry3AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" ); - auditManager.doAudit(entry1); - auditManager.doAudit(entry2); - auditManager.doAudit(entry3); + String entry3Payload = "testPayload3"; + + auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde); + auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde); + auditManager.doAudit(entry3Key, entry3Type, entry3AuditInfo, entry3Payload, stringConfigSerde); + List auditEntries = auditManager.fetchAuditHistory( "testType", 2 ); Assert.assertEquals(2, auditEntries.size()); - Assert.assertEquals(entry3, auditEntries.get(0)); - Assert.assertEquals(entry2, auditEntries.get(1)); + Assert.assertEquals(entry3Key, auditEntries.get(0).getKey()); + Assert.assertEquals(entry3Payload, auditEntries.get(0).getPayload()); + Assert.assertEquals(entry3Type, auditEntries.get(0).getType()); + Assert.assertEquals(entry3AuditInfo, auditEntries.get(0).getAuditInfo()); + + Assert.assertEquals(entry2Key, auditEntries.get(1).getKey()); + Assert.assertEquals(entry2Payload, auditEntries.get(1).getPayload()); + Assert.assertEquals(entry2Type, auditEntries.get(1).getType()); + Assert.assertEquals(entry2AuditInfo, auditEntries.get(1).getAuditInfo()); } @Test(expected = IllegalArgumentException.class, timeout = 10_000L) @@ -270,6 +318,132 @@ public class SQLAuditManagerTest auditManager.fetchAuditHistory("testType", 0); } + @Test(timeout = 60_000L) + public void testCreateAuditEntryWithPayloadOverSkipPayloadLimit() throws IOException + { + int maxPayloadSize = 10; + SQLAuditManager auditManagerWithMaxPayloadSizeBytes = new SQLAuditManager( + connector, + derbyConnectorRule.metadataTablesConfigSupplier(), + new NoopServiceEmitter(), + mapper, + new SQLAuditManagerConfig() + { + @Override + public long getMaxPayloadSizeBytes() + { + return maxPayloadSize; + } + } + ); + + String entry1Key = "testKey"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" + ); + String entry1Payload = "payload audit to store"; + + auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, + stringConfigSerde + ); + + byte[] payload = connector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), + "audit_key", + "payload", + "testKey" + ); + + AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class); + Assert.assertEquals(entry1Key, dbEntry.getKey()); + Assert.assertNotEquals(entry1Payload, dbEntry.getPayload()); + Assert.assertEquals(StringUtils.format(AuditManager.PAYLOAD_SKIP_MSG_FORMAT, maxPayloadSize), dbEntry.getPayload()); + Assert.assertEquals(entry1Type, dbEntry.getType()); + Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo()); + } + + @Test(timeout = 60_000L) + public void testCreateAuditEntryWithPayloadUnderSkipPayloadLimit() throws IOException + { + SQLAuditManager auditManagerWithMaxPayloadSizeBytes = new SQLAuditManager( + connector, + derbyConnectorRule.metadataTablesConfigSupplier(), + new NoopServiceEmitter(), + mapper, + new SQLAuditManagerConfig() + { + @Override + public long getMaxPayloadSizeBytes() + { + return 500; + } + } + ); + String entry1Key = "testKey"; + String entry1Type = "testType"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" + ); + String entry1Payload = "payload audit to store"; + + auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, + stringConfigSerde + ); + + byte[] payload = connector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(), + "audit_key", + "payload", + "testKey" + ); + + AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class); + Assert.assertEquals(entry1Key, dbEntry.getKey()); + Assert.assertEquals(entry1Payload, dbEntry.getPayload()); + Assert.assertEquals(entry1Type, dbEntry.getType()); + Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo()); + } + + @Test(timeout = 60_000L) + public void testCreateAuditEntryWithSkipNullConfigTrue() + { + ConfigSerde> mockConfigSerde = Mockito.mock(ConfigSerde.class); + SQLAuditManager auditManagerWithSkipNull = new SQLAuditManager( + connector, + derbyConnectorRule.metadataTablesConfigSupplier(), + new NoopServiceEmitter(), + mapper, + new SQLAuditManagerConfig() + { + @Override + public boolean isSkipNullField() + { + return true; + } + } + ); + + String entry1Key = "test1Key"; + String entry1Type = "test1Type"; + AuditInfo entry1AuditInfo = new AuditInfo( + "testAuthor", + "testComment", + "127.0.0.1" + ); + // Entry 1 payload has a null field for one of the property + Map entryPayload1WithNull = new HashMap<>(); + entryPayload1WithNull.put("version", "x"); + entryPayload1WithNull.put("something", null); + + auditManagerWithSkipNull.doAudit(entry1Key, entry1Type, entry1AuditInfo, entryPayload1WithNull, mockConfigSerde); + Mockito.verify(mockConfigSerde).serializeToString(ArgumentMatchers.eq(entryPayload1WithNull), ArgumentMatchers.eq(true)); + } + @After public void cleanup() {