mirror of https://github.com/apache/druid.git
composing emitter module to use multiple emitters together
This commit is contained in:
parent
07266d682a
commit
ebdb612933
|
@ -125,7 +125,7 @@ The Druid servers emit various metrics and alerts via something we call an Emitt
|
||||||
|
|
||||||
|Property|Description|Default|
|
|Property|Description|Default|
|
||||||
|--------|-----------|-------|
|
|--------|-----------|-------|
|
||||||
|`druid.emitter`|Setting this value to "noop", "logging", or "http" will instantialize one of the emitter modules.|noop|
|
|`druid.emitter`|Setting this value to "noop", "logging", or "http" will instantialize one of the emitter modules. value "composing" can be used to instantialize multiple emitter modules. |noop|
|
||||||
|
|
||||||
#### Logging Emitter Module
|
#### Logging Emitter Module
|
||||||
|
|
||||||
|
@ -143,6 +143,12 @@ The Druid servers emit various metrics and alerts via something we call an Emitt
|
||||||
|`druid.emitter.http.flushCount`|How many messages can the internal message buffer hold before flushing (sending).|500|
|
|`druid.emitter.http.flushCount`|How many messages can the internal message buffer hold before flushing (sending).|500|
|
||||||
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none|
|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none|
|
||||||
|
|
||||||
|
#### Composing Emitter Module
|
||||||
|
|
||||||
|
|Property|Description|Default|
|
||||||
|
|--------|-----------|-------|
|
||||||
|
|`druid.emitter.composing.emitters`|List of emitter modules to load e.g. ["logging","http"].|[]|
|
||||||
|
|
||||||
### Metadata Storage
|
### Metadata Storage
|
||||||
|
|
||||||
These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](../design/coordinator.html) and [Indexing service](../design/indexing-service.html).
|
These properties specify the jdbc connection and other configuration around the metadata storage. The only processes that connect to the metadata storage with these properties are the [Coordinator](../design/coordinator.html) and [Indexing service](../design/indexing-service.html).
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -122,7 +122,7 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
<artifactId>emitter</artifactId>
|
<artifactId>emitter</artifactId>
|
||||||
<version>0.3.1</version>
|
<version>0.3.2</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.metamx</groupId>
|
<groupId>com.metamx</groupId>
|
||||||
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.initialization;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
|
import javax.validation.constraints.NotNull;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class ComposingEmitterConfig
|
||||||
|
{
|
||||||
|
@JsonProperty
|
||||||
|
@NotNull
|
||||||
|
private List<String> emitters = ImmutableList.of();
|
||||||
|
|
||||||
|
public List<String> getEmitters()
|
||||||
|
{
|
||||||
|
return emitters;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.server.initialization;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.Module;
|
||||||
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Provides;
|
||||||
|
import com.google.inject.name.Named;
|
||||||
|
import com.google.inject.name.Names;
|
||||||
|
import com.metamx.common.logger.Logger;
|
||||||
|
import com.metamx.emitter.core.ComposingEmitter;
|
||||||
|
import com.metamx.emitter.core.Emitter;
|
||||||
|
|
||||||
|
import io.druid.guice.JsonConfigProvider;
|
||||||
|
import io.druid.guice.ManageLifecycle;
|
||||||
|
import io.druid.initialization.DruidModule;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class ComposingEmitterModule implements DruidModule
|
||||||
|
{
|
||||||
|
private static Logger log = new Logger(ComposingEmitterModule.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
JsonConfigProvider.bind(binder, "druid.emitter.composing", ComposingEmitterConfig.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<? extends Module> getJacksonModules()
|
||||||
|
{
|
||||||
|
return Collections.EMPTY_LIST;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Provides
|
||||||
|
@ManageLifecycle
|
||||||
|
@Named("composing")
|
||||||
|
public Emitter getEmitter(ComposingEmitterConfig config, final Injector injector)
|
||||||
|
{
|
||||||
|
log.info("Creating Composing Emitter with %s", config.getEmitters());
|
||||||
|
|
||||||
|
List<Emitter> emitters = Lists.transform(
|
||||||
|
config.getEmitters(),
|
||||||
|
new Function<String, Emitter>()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public Emitter apply(String s)
|
||||||
|
{
|
||||||
|
return injector.getInstance(Key.get(Emitter.class, Names.named(s)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
return new ComposingEmitter(emitters);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -68,6 +68,7 @@ public class EmitterModule implements Module
|
||||||
binder.install(new NoopEmitterModule());
|
binder.install(new NoopEmitterModule());
|
||||||
binder.install(new LogEmitterModule());
|
binder.install(new LogEmitterModule());
|
||||||
binder.install(new HttpEmitterModule());
|
binder.install(new HttpEmitterModule());
|
||||||
|
binder.install(new ComposingEmitterModule());
|
||||||
|
|
||||||
binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class);
|
binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. Metamarkets licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.druid.initialization;
|
||||||
|
|
||||||
|
import io.druid.guice.DruidGuiceExtensions;
|
||||||
|
import io.druid.guice.LifecycleModule;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import javax.validation.Validation;
|
||||||
|
import javax.validation.Validator;
|
||||||
|
|
||||||
|
import io.druid.server.initialization.ComposingEmitterConfig;
|
||||||
|
import io.druid.server.initialization.ComposingEmitterModule;
|
||||||
|
import org.easymock.EasyMock;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.inject.Binder;
|
||||||
|
import com.google.inject.Guice;
|
||||||
|
import com.google.inject.Injector;
|
||||||
|
import com.google.inject.Key;
|
||||||
|
import com.google.inject.Module;
|
||||||
|
import com.google.inject.name.Names;
|
||||||
|
import com.metamx.emitter.core.Emitter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class ComposingEmitterModuleTest
|
||||||
|
{
|
||||||
|
private final String testEmitterType = "http";
|
||||||
|
private Emitter emitter;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup()
|
||||||
|
{
|
||||||
|
emitter = EasyMock.createMock(Emitter.class);
|
||||||
|
emitter.start();
|
||||||
|
EasyMock.replay(emitter);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEmitter()
|
||||||
|
{
|
||||||
|
ComposingEmitterConfig config = EasyMock.createMock(ComposingEmitterConfig.class);
|
||||||
|
EasyMock.expect(config.getEmitters()).andReturn(Lists.newArrayList(testEmitterType)).anyTimes();
|
||||||
|
|
||||||
|
Injector injector = EasyMock.createMock(Injector.class);
|
||||||
|
EasyMock.expect(injector.getInstance(Key.get(Emitter.class, Names.named(testEmitterType)))).andReturn(emitter);
|
||||||
|
EasyMock.replay(config, injector);
|
||||||
|
|
||||||
|
Emitter composingEmitter = new ComposingEmitterModule().getEmitter(config, injector);
|
||||||
|
composingEmitter.start();
|
||||||
|
|
||||||
|
EasyMock.verify(config, emitter, injector);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEmitterViaRealGuice()
|
||||||
|
{
|
||||||
|
Injector injector = Guice.createInjector(
|
||||||
|
new DruidGuiceExtensions(),
|
||||||
|
new LifecycleModule(),
|
||||||
|
new Module()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void configure(Binder binder)
|
||||||
|
{
|
||||||
|
Properties props = new Properties();
|
||||||
|
props.put("druid.emitter.composing.emitters", "[\"" + testEmitterType + "\"]");
|
||||||
|
binder.bind(Properties.class).toInstance(props);
|
||||||
|
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
|
||||||
|
binder.bind(Emitter.class).annotatedWith(Names.named(testEmitterType)).toInstance(emitter);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
new ComposingEmitterModule()
|
||||||
|
);
|
||||||
|
injector.getInstance(Key.get(Emitter.class, Names.named("composing"))).start();
|
||||||
|
EasyMock.verify(emitter);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue