Update emitter library and add support for ParametrizedUriEmitter (#4722)

* Move emitters from io.druid.server.initialization to the dedicated io.druid.server.emitter package; Update emitter library to 0.6.0; Add support for ParametrizedUriEmitter; Support hierarical properties in JsonConfigurator (was needed for ParametrizedUriEmitter)

* Log created RequestLoggers

* Fix forbidden API

* Test fix

* More Http and Parametrized Http Emitter docs

* Switch to debug level
This commit is contained in:
Roman Leventov 2017-09-13 17:17:19 -05:00 committed by GitHub
parent 4f6eb47e40
commit 267f415dc3
24 changed files with 347 additions and 22 deletions

View File

@ -43,6 +43,7 @@ import javax.validation.Path;
import javax.validation.Validator;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -93,7 +94,7 @@ public class JsonConfigurator
value = propValue;
}
jsonMap.put(prop.substring(propertyBase.length()), value);
hieraricalPutValue(propertyPrefix, prop, prop.substring(propertyBase.length()), value, jsonMap);
}
}
@ -165,6 +166,43 @@ public class JsonConfigurator
return config;
}
private static void hieraricalPutValue(
String propertyPrefix,
String originalProperty,
String property,
Object value,
Map<String, Object> targetMap
)
{
int dotIndex = property.indexOf('.');
if (dotIndex < 0) {
targetMap.put(property, value);
return;
}
if (dotIndex == 0) {
throw new ProvisionException(StringUtils.format("Double dot in property: %s", originalProperty));
}
if (dotIndex == property.length() - 1) {
throw new ProvisionException(StringUtils.format("Dot at the end of property: %s", originalProperty));
}
String nestedKey = property.substring(0, dotIndex);
Object nested = targetMap.computeIfAbsent(nestedKey, k -> new HashMap<String, Object>());
if (!(nested instanceof Map)) {
// Clash is possible between properties, which are used to configure different objects: e. g.
// druid.emitter=parametrized is used to configure Emitter class, and druid.emitter.parametrized.xxx=yyy is used
// to configure ParametrizedUriEmitterConfig object. So skipping xxx=yyy key-value pair when configuring Emitter
// doesn't make any difference. That is why we just log this situation, instead of throwing an exception.
log.info(
"Skipping %s property: one of it's prefixes is also used as a property key. Prefix: %s",
originalProperty,
propertyPrefix
);
return;
}
Map<String, Object> nestedMap = (Map<String, Object>) nested;
hieraricalPutValue(propertyPrefix, originalProperty, property.substring(dotIndex + 1), value, nestedMap);
}
@VisibleForTesting
public static <T> void verifyClazzIsConfigurable(ObjectMapper mapper, Class<T> clazz)
{

View File

@ -203,7 +203,7 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts
|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter`|Setting this value to "noop", "logging", or "http" will initialize one of the emitter modules. value "composing" can be used to initialize multiple emitter modules. |noop|
|`druid.emitter`|Setting this value to "noop", "logging", "http" or "parametrized" will initialize one of the emitter modules. value "composing" can be used to initialize multiple emitter modules. |noop|
#### Logging Emitter Module
@ -216,10 +216,26 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts
|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter.http.timeOut`|The timeout for data reads.|PT5M|
|`druid.emitter.http.readTimeout`|The timeout for data reads.|PT5M|
|`druid.emitter.http.flushMillis`|How often the internal message buffer is flushed (data is sent).|60000|
|`druid.emitter.http.flushCount`|How many messages the internal message buffer can hold before flushing (sending).|500|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none|
|`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification|
|`druid.emitter.http.flushTimeOut|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config|
#### Parametrized Http Emitter Module
`druid.emitter.parametrized.httpEmitting.*` configs correspond to the configs of Http Emitter Modules, see above.
Except `readTimeout` and `recipientBaseUrl`. E. g. `druid.emitter.parametrized.httpEmitting.flushMillis`,
`druid.emitter.parametrized.httpEmitting.flushCount`, etc.
The additional configs are:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter.parametrized.readTimeout`|The timeout for data reads.|PT5M|
|`druid.emitter.parametrized.recipientBaseUrlPattern`|The URL pattern to send an event to, based on the event's feed. E. g. `http://foo.bar/{feed}`, that will send event to `http://foo.bar/metrics` if the event's feed is "metrics".|none, required config|
#### Composing Emitter Module

View File

@ -147,7 +147,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.4.5</version>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>

View File

@ -60,7 +60,7 @@ import io.druid.guice.security.DruidAuthModule;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.storage.derby.DerbyMetadataStorageDruidModule;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.emitter.EmitterModule;
import io.druid.server.initialization.jetty.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import org.apache.commons.io.FileUtils;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package io.druid.server.initialization;
package io.druid.server.emitter;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package io.druid.server.initialization;
package io.druid.server.emitter;
import com.fasterxml.jackson.databind.Module;
import com.google.common.base.Function;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package io.druid.server.initialization;
package io.druid.server.emitter;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
@ -72,6 +72,7 @@ public class EmitterModule implements Module
binder.install(new NoopEmitterModule());
binder.install(new LogEmitterModule());
binder.install(new HttpEmitterModule());
binder.install(new ParametrizedUriEmitterModule());
binder.install(new ComposingEmitterModule());
binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class);
@ -87,6 +88,7 @@ public class EmitterModule implements Module
"version",
Strings.nullToEmpty(version) // Version is null during `mvn test`.
);
log.info("Underlying emitter for ServiceEmitter: %s", emitter);
final ServiceEmitter retVal = new ServiceEmitter(
config.getServiceName(),
config.getHostAndPortToUse(),

View File

@ -17,7 +17,7 @@
* under the License.
*/
package io.druid.server.initialization;
package io.druid.server.emitter;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
@ -27,10 +27,10 @@ import org.joda.time.Period;
public class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig
{
@JsonProperty
private Period timeOut = new Period("PT5M");
private Period readTimeout = new Period("PT5M");
public Period getReadTimeout()
{
return timeOut;
return readTimeout;
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package io.druid.server.initialization;
package io.druid.server.emitter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
@ -49,6 +49,11 @@ public class HttpEmitterModule implements Module
{
JsonConfigProvider.bind(binder, "druid.emitter.http", HttpEmitterConfig.class);
configureSsl(binder);
}
static void configureSsl(Binder binder)
{
final SSLContext context;
try {
context = SSLContext.getDefault();

View File

@ -17,7 +17,7 @@
* under the License.
*/
package io.druid.server.initialization;
package io.druid.server.emitter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;

View File

@ -17,7 +17,7 @@
* under the License.
*/
package io.druid.server.initialization;
package io.druid.server.emitter;
import com.google.inject.Binder;
import com.google.inject.Module;

View File

@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.emitter;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
public class ParametrizedUriEmitterConfig extends com.metamx.emitter.core.ParametrizedUriEmitterConfig
{
@JsonProperty
private Period readTimeout = new Period("PT5M");
public Period getReadTimeout()
{
return readTimeout;
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.emitter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.ParametrizedUriEmitter;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.http.LifecycleUtils;
import io.druid.java.util.common.lifecycle.Lifecycle;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
public class ParametrizedUriEmitterModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter.parametrized", ParametrizedUriEmitterConfig.class);
HttpEmitterModule.configureSsl(binder);
}
@Provides
@ManageLifecycle
@Named("parametrized")
public Emitter getEmitter(
Supplier<ParametrizedUriEmitterConfig> config,
@Nullable SSLContext sslContext,
Lifecycle lifecycle,
ObjectMapper jsonMapper
)
{
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(1)
.withReadTimeout(config.get().getReadTimeout().toStandardDuration());
if (sslContext != null) {
builder.withSslContext(sslContext);
}
return new ParametrizedUriEmitter(
config.get(),
HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)),
jsonMapper
);
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.RequestLogLine;
import javax.validation.constraints.NotNull;
@ -35,6 +36,8 @@ import java.util.List;
@JsonTypeName("composing")
public class ComposingRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(ComposingRequestLoggerProvider.class);
@JsonProperty
@NotNull
private final List<RequestLoggerProvider> loggerProviders = Lists.newArrayList();
@ -46,7 +49,9 @@ public class ComposingRequestLoggerProvider implements RequestLoggerProvider
for (RequestLoggerProvider loggerProvider : loggerProviders) {
loggers.add(loggerProvider.get());
}
return new ComposingRequestLogger(loggers);
ComposingRequestLogger logger = new ComposingRequestLogger(loggers);
log.debug(new Exception("Stack trace"), "Creating %s at", logger);
return logger;
}
public static class ComposingRequestLogger implements RequestLogger
@ -79,6 +84,14 @@ public class ComposingRequestLoggerProvider implements RequestLoggerProvider
throw Throwables.propagate(exception);
}
}
@Override
public String toString()
{
return "ComposingRequestLogger{" +
"loggers=" + loggers +
'}';
}
}
}

View File

@ -50,6 +50,15 @@ public class EmittingRequestLogger implements RequestLogger
emitter.emit(new RequestLogEventBuilder(feed, requestLogLine));
}
@Override
public String toString()
{
return "EmittingRequestLogger{" +
"emitter=" + emitter +
", feed='" + feed + '\'' +
'}';
}
private static class RequestLogEvent implements Event
{
final ImmutableMap<String, String> serviceDimensions;

View File

@ -25,6 +25,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.java.util.common.logger.Logger;
import javax.validation.constraints.NotNull;
@ -33,6 +34,8 @@ import javax.validation.constraints.NotNull;
@JsonTypeName("emitter")
public class EmittingRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(EmittingRequestLoggerProvider.class);
@JsonProperty
@NotNull
private String feed = null;
@ -49,6 +52,8 @@ public class EmittingRequestLoggerProvider implements RequestLoggerProvider
@Override
public RequestLogger get()
{
return new EmittingRequestLogger(emitter, feed);
EmittingRequestLogger logger = new EmittingRequestLogger(emitter, feed);
log.debug(new Exception("Stack trace"), "Creating %s at", logger);
return logger;
}
}

View File

@ -134,4 +134,12 @@ public class FileRequestLogger implements RequestLogger
fileWriter.flush();
}
}
@Override
public String toString()
{
return "FileRequestLogger{" +
"baseDir=" + baseDir +
'}';
}
}

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import io.druid.java.util.common.logger.Logger;
import javax.validation.constraints.NotNull;
import java.io.File;
@ -35,6 +36,8 @@ import java.io.File;
@JsonTypeName("file")
public class FileRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(FileRequestLoggerProvider.class);
@JsonProperty
@NotNull
private File dir = null;
@ -52,6 +55,8 @@ public class FileRequestLoggerProvider implements RequestLoggerProvider
@Override
public RequestLogger get()
{
return new FileRequestLogger(jsonMapper, factory.create(1, "RequestLogger-%s"), dir);
FileRequestLogger logger = new FileRequestLogger(jsonMapper, factory.create(1, "RequestLogger-%s"), dir);
log.debug(new Exception("Stack trace"), "Creating %s at", logger);
return logger;
}
}

View File

@ -21,6 +21,7 @@ package io.druid.server.log;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.RequestLogLine;
import javax.validation.constraints.NotNull;
@ -31,6 +32,8 @@ import java.io.IOException;
@JsonTypeName("filtered")
public class FilteredRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(FilteredRequestLoggerProvider.class);
@JsonProperty
@NotNull
private RequestLoggerProvider delegate = null;
@ -41,7 +44,9 @@ public class FilteredRequestLoggerProvider implements RequestLoggerProvider
@Override
public RequestLogger get()
{
return new FilteredRequestLogger(delegate.get(), queryTimeThresholdMs);
FilteredRequestLogger logger = new FilteredRequestLogger(delegate.get(), queryTimeThresholdMs);
log.debug(new Exception("Stack trace"), "Creating %s at", logger);
return logger;
}
public static class FilteredRequestLogger implements RequestLogger
@ -64,6 +69,15 @@ public class FilteredRequestLoggerProvider implements RequestLoggerProvider
logger.log(requestLogLine);
}
}
@Override
public String toString()
{
return "FilteredRequestLogger{" +
"queryTimeThresholdMs=" + queryTimeThresholdMs +
", logger=" + logger +
'}';
}
}
}

View File

@ -103,4 +103,13 @@ public class LoggingRequestLogger implements RequestLogger
{
return setContextMDC;
}
@Override
public String toString()
{
return "LoggingRequestLogger{" +
"setMDC=" + setMDC +
", setContextMDC=" + setContextMDC +
'}';
}
}

View File

@ -24,10 +24,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.guice.annotations.Json;
import io.druid.java.util.common.logger.Logger;
@JsonTypeName("slf4j")
public class LoggingRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(LoggingRequestLoggerProvider.class);
@JacksonInject
@Json
public ObjectMapper mapper;
@ -41,6 +44,8 @@ public class LoggingRequestLoggerProvider implements RequestLoggerProvider
@Override
public RequestLogger get()
{
return new LoggingRequestLogger(mapper, setMDC, setContextMDC);
LoggingRequestLogger logger = new LoggingRequestLogger(mapper, setMDC, setContextMDC);
log.debug(new Exception("Stack trace"), "Creating %s at", logger);
return logger;
}
}

View File

@ -19,13 +19,18 @@
package io.druid.server.log;
import io.druid.java.util.common.logger.Logger;
/**
*/
public class NoopRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(NoopRequestLoggerProvider.class);
@Override
public RequestLogger get()
{
log.debug(new Exception("Stack trace"), "Creating NoopRequestLogger at");
return new NoopRequestLogger();
}
}

View File

@ -29,8 +29,8 @@ import com.google.inject.name.Names;
import com.metamx.emitter.core.Emitter;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.LifecycleModule;
import io.druid.server.initialization.ComposingEmitterConfig;
import io.druid.server.initialization.ComposingEmitterModule;
import io.druid.server.emitter.ComposingEmitterConfig;
import io.druid.server.emitter.ComposingEmitterModule;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;

View File

@ -0,0 +1,85 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.emitter;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.ParametrizedUriEmitter;
import io.druid.guice.DruidGuiceExtensions;
import io.druid.guice.JsonConfigurator;
import io.druid.guice.LazySingleton;
import io.druid.guice.LifecycleModule;
import io.druid.guice.ServerModule;
import org.junit.Assert;
import org.junit.Test;
import javax.validation.Validation;
import javax.validation.Validator;
import java.util.Properties;
public class EmitterModuleTest
{
@Test
public void testParametrizedUriEmitterConfig()
{
final Properties props = new Properties();
props.setProperty("druid.emitter", "parametrized");
props.setProperty("druid.emitter.parametrized.recipientBaseUrlPattern", "http://example.com:8888/{feed}");
props.setProperty("druid.emitter.parametrized.httpEmitting.flushMillis", "1");
props.setProperty("druid.emitter.parametrized.httpEmitting.flushCount", "2");
props.setProperty("druid.emitter.parametrized.httpEmitting.basicAuthentication", "a:b");
props.setProperty("druid.emitter.parametrized.httpEmitting.batchingStrategy", "NEWLINES");
props.setProperty("druid.emitter.parametrized.httpEmitting.maxBatchSize", "4");
props.setProperty("druid.emitter.parametrized.httpEmitting.maxBufferSize", "8");
props.setProperty("druid.emitter.parametrized.httpEmitting.flushTimeOut", "1000");
final Emitter emitter =
makeInjectorWithProperties(props).getInstance(Emitter.class);
// Testing that ParametrizedUriEmitter is successfully deserialized from the above config
Assert.assertTrue(emitter instanceof ParametrizedUriEmitter);
}
private Injector makeInjectorWithProperties(final Properties props)
{
return Guice.createInjector(
ImmutableList.of(
new DruidGuiceExtensions(),
new LifecycleModule(),
new ServerModule(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
binder.bind(Properties.class).toInstance(props);
}
},
new EmitterModule(props)
)
);
}
}