password provider for basic authentication of HttpEmitterConfig (#8618)

This commit is contained in:
Parag Jain 2019-10-03 04:29:17 +05:30 committed by Himanshu
parent 0a20caf177
commit f0d74b240d
7 changed files with 17 additions and 14 deletions

View File

@ -21,6 +21,7 @@ package org.apache.druid.java.util.emitter.core;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.JvmUtils; import org.apache.druid.utils.JvmUtils;
import javax.validation.constraints.Min; import javax.validation.constraints.Min;
@ -45,7 +46,6 @@ public class BaseHttpEmittingConfig
* Do not time out in case flushTimeOut is not set * Do not time out in case flushTimeOut is not set
*/ */
public static final long DEFAULT_FLUSH_TIME_OUT = Long.MAX_VALUE; public static final long DEFAULT_FLUSH_TIME_OUT = Long.MAX_VALUE;
public static final String DEFAULT_BASIC_AUTHENTICATION = null;
public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY; public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY;
public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null; public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null;
public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 2.0f; public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 2.0f;
@ -86,7 +86,7 @@ public class BaseHttpEmittingConfig
long flushTimeOut = DEFAULT_FLUSH_TIME_OUT; long flushTimeOut = DEFAULT_FLUSH_TIME_OUT;
@JsonProperty @JsonProperty
String basicAuthentication = DEFAULT_BASIC_AUTHENTICATION; PasswordProvider basicAuthentication = null;
@JsonProperty @JsonProperty
BatchingStrategy batchingStrategy = DEFAULT_BATCHING_STRATEGY; BatchingStrategy batchingStrategy = DEFAULT_BATCHING_STRATEGY;
@ -125,7 +125,7 @@ public class BaseHttpEmittingConfig
return flushTimeOut; return flushTimeOut;
} }
public String getBasicAuthentication() public PasswordProvider getBasicAuthentication()
{ {
return basicAuthentication; return basicAuthentication;
} }

View File

@ -20,6 +20,7 @@
package org.apache.druid.java.util.emitter.core; package org.apache.druid.java.util.emitter.core;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.metadata.PasswordProvider;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
@ -86,7 +87,7 @@ public class HttpEmitterConfig extends BaseHttpEmittingConfig
return this; return this;
} }
public Builder setBasicAuthentication(String basicAuthentication) public Builder setBasicAuthentication(PasswordProvider basicAuthentication)
{ {
this.basicAuthentication = basicAuthentication; this.basicAuthentication = basicAuthentication;
return this; return this;

View File

@ -742,7 +742,7 @@ public class HttpPostEmitter implements Flushable, Closeable, Emitter
request.setBody(ByteBuffer.wrap(payload, 0, payloadLength)); request.setBody(ByteBuffer.wrap(payload, 0, payloadLength));
if (config.getBasicAuthentication() != null) { if (config.getBasicAuthentication() != null) {
final String[] parts = config.getBasicAuthentication().split(":", 2); final String[] parts = config.getBasicAuthentication().getPassword().split(":", 2);
final String user = parts[0]; final String user = parts[0];
final String password = parts.length > 1 ? parts[1] : ""; final String password = parts.length > 1 ? parts[1] : "";
String encoded = StringUtils.encodeBase64String((user + ':' + password).getBytes(StandardCharsets.UTF_8)); String encoded = StringUtils.encodeBase64String((user + ':' + password).getBytes(StandardCharsets.UTF_8));

View File

@ -29,6 +29,8 @@ import io.netty.handler.codec.http.HttpVersion;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle; import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.service.UnitEvent; import org.apache.druid.java.util.emitter.service.UnitEvent;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.CompressionUtils;
import org.asynchttpclient.ListenableFuture; import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request; import org.asynchttpclient.Request;
@ -175,7 +177,7 @@ public class EmitterTest
return emitter; return emitter;
} }
private HttpPostEmitter manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(String authentication) private HttpPostEmitter manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(PasswordProvider authentication)
{ {
HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL) HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
.setFlushMillis(Long.MAX_VALUE) .setFlushMillis(Long.MAX_VALUE)
@ -439,7 +441,7 @@ public class EmitterTest
new UnitEvent("test", 1), new UnitEvent("test", 1),
new UnitEvent("test", 2) new UnitEvent("test", 2)
); );
emitter = manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating("foo:bar"); emitter = manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(new DefaultPasswordProvider("foo:bar"));
httpClient.setGoHandler( httpClient.setGoHandler(
new GoHandler() new GoHandler()

View File

@ -43,7 +43,7 @@ public class HttpEmitterConfigTest
Assert.assertEquals(60000, config.getFlushMillis()); Assert.assertEquals(60000, config.getFlushMillis());
Assert.assertEquals(500, config.getFlushCount()); Assert.assertEquals(500, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl()); Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication()); Assert.assertNull(config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy()); Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory() Runtime.getRuntime().maxMemory()
@ -67,7 +67,7 @@ public class HttpEmitterConfigTest
Assert.assertEquals(60000, config.getFlushMillis()); Assert.assertEquals(60000, config.getFlushMillis());
Assert.assertEquals(300, config.getFlushCount()); Assert.assertEquals(300, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl()); Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication()); Assert.assertNull(config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy()); Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory() Runtime.getRuntime().maxMemory()
@ -103,7 +103,7 @@ public class HttpEmitterConfigTest
Assert.assertEquals(1, config.getFlushMillis()); Assert.assertEquals(1, config.getFlushMillis());
Assert.assertEquals(2, config.getFlushCount()); Assert.assertEquals(2, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl()); Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals("a:b", config.getBasicAuthentication()); Assert.assertEquals("a:b", config.getBasicAuthentication().getPassword());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy()); Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize()); Assert.assertEquals(4, config.getMaxBatchSize());
Assert.assertEquals(1000, config.getFlushTimeOut()); Assert.assertEquals(1000, config.getFlushTimeOut());
@ -133,7 +133,7 @@ public class HttpEmitterConfigTest
Assert.assertEquals(1, config.getFlushMillis()); Assert.assertEquals(1, config.getFlushMillis());
Assert.assertEquals(2, config.getFlushCount()); Assert.assertEquals(2, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl()); Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals("a:b", config.getBasicAuthentication()); Assert.assertEquals("a:b", config.getBasicAuthentication().getPassword());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy()); Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize()); Assert.assertEquals(4, config.getMaxBatchSize());
Assert.assertEquals(1000, config.getFlushTimeOut()); Assert.assertEquals(1000, config.getFlushTimeOut());

View File

@ -40,7 +40,7 @@ public class ParametrizedUriEmitterConfigTest
Assert.assertEquals(60000, config.getFlushMillis()); Assert.assertEquals(60000, config.getFlushMillis());
Assert.assertEquals(500, config.getFlushCount()); Assert.assertEquals(500, config.getFlushCount());
Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl()); Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication()); Assert.assertNull(config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy()); Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit( Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory() Runtime.getRuntime().maxMemory()
@ -68,7 +68,7 @@ public class ParametrizedUriEmitterConfigTest
Assert.assertEquals(1, config.getFlushMillis()); Assert.assertEquals(1, config.getFlushMillis());
Assert.assertEquals(2, config.getFlushCount()); Assert.assertEquals(2, config.getFlushCount());
Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl()); Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl());
Assert.assertEquals("a:b", config.getBasicAuthentication()); Assert.assertEquals("a:b", config.getBasicAuthentication().getPassword());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy()); Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize()); Assert.assertEquals(4, config.getMaxBatchSize());
Assert.assertEquals(1000, config.getFlushTimeOut()); Assert.assertEquals(1000, config.getFlushTimeOut());

View File

@ -380,7 +380,7 @@ The Druid servers [emit various metrics](../operations/metrics.md) and alerts vi
|--------|-----------|-------| |--------|-----------|-------|
|`druid.emitter.http.flushMillis`|How often the internal message buffer is flushed (data is sent).|60000| |`druid.emitter.http.flushMillis`|How often the internal message buffer is flushed (data is sent).|60000|
|`druid.emitter.http.flushCount`|How many messages the internal message buffer can hold before flushing (sending).|500| |`druid.emitter.http.flushCount`|How many messages the internal message buffer can hold before flushing (sending).|500|
|`druid.emitter.http.basicAuthentication`|Login and password for authentication in "login:password" form, e.g., `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentication| |`druid.emitter.http.basicAuthentication`|[Password Provider](../operations/password-provider.md) for providing Login and password for authentication in "login:password" form, e.g., `druid.emitter.http.basicAuthentication=admin:adminpassword` uses Default Password Provider which allows plain text passwords.|not specified = no authentication|
|`druid.emitter.http.flushTimeOut`|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout| |`druid.emitter.http.flushTimeOut`|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY| |`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|the minimum of (10% of JVM heap size divided by 2) or (5191680 (i. e. 5 MB))| |`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|the minimum of (10% of JVM heap size divided by 2) or (5191680 (i. e. 5 MB))|