mirror of https://github.com/apache/druid.git
Allow request headers in HttpInputSource in native and MSQ Ingestion (#16974)
Support for adding the request headers in http input source. we can now pass the additional headers as json in both native and MSQ.
This commit is contained in:
parent
a18f582ef0
commit
a95397e712
|
@ -519,7 +519,7 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
final String password = "AbCd_1234";
|
||||
final ObjectMapper mapper = getObjectMapper();
|
||||
|
||||
final HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(Collections.singleton("http"));
|
||||
final HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(Collections.singleton("http"), null);
|
||||
mapper.setInjectableValues(new InjectableValues.Std()
|
||||
.addValue(HttpInputSourceConfig.class, httpInputSourceConfig)
|
||||
.addValue(ObjectMapper.class, new DefaultObjectMapper())
|
||||
|
@ -562,6 +562,7 @@ public class TaskQueueTest extends IngestionTestBase
|
|||
"user",
|
||||
new DefaultPasswordProvider(password),
|
||||
null,
|
||||
null,
|
||||
httpInputSourceConfig),
|
||||
new NoopInputFormat(),
|
||||
null,
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.io.InputStream;
|
|||
import java.net.URI;
|
||||
import java.net.URLConnection;
|
||||
import java.util.Base64;
|
||||
import java.util.Map;
|
||||
|
||||
public class HttpEntity extends RetryingInputEntity
|
||||
{
|
||||
|
@ -45,15 +46,19 @@ public class HttpEntity extends RetryingInputEntity
|
|||
@Nullable
|
||||
private final PasswordProvider httpAuthenticationPasswordProvider;
|
||||
|
||||
private final Map<String, String> requestHeaders;
|
||||
|
||||
HttpEntity(
|
||||
URI uri,
|
||||
@Nullable String httpAuthenticationUsername,
|
||||
@Nullable PasswordProvider httpAuthenticationPasswordProvider
|
||||
@Nullable PasswordProvider httpAuthenticationPasswordProvider,
|
||||
@Nullable Map<String, String> requestHeaders
|
||||
)
|
||||
{
|
||||
this.uri = uri;
|
||||
this.httpAuthenticationUsername = httpAuthenticationUsername;
|
||||
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
|
||||
this.requestHeaders = requestHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -65,7 +70,7 @@ public class HttpEntity extends RetryingInputEntity
|
|||
@Override
|
||||
protected InputStream readFrom(long offset) throws IOException
|
||||
{
|
||||
return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset);
|
||||
return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset, requestHeaders);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,10 +85,15 @@ public class HttpEntity extends RetryingInputEntity
|
|||
return t -> t instanceof IOException;
|
||||
}
|
||||
|
||||
public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset)
|
||||
public static InputStream openInputStream(URI object, String userName, PasswordProvider passwordProvider, long offset, final Map<String, String> requestHeaders)
|
||||
throws IOException
|
||||
{
|
||||
final URLConnection urlConnection = object.toURL().openConnection();
|
||||
if (requestHeaders != null && requestHeaders.size() > 0) {
|
||||
for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
|
||||
urlConnection.addRequestProperty(entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(userName) && passwordProvider != null) {
|
||||
String userPass = userName + ":" + passwordProvider.getPassword();
|
||||
String basicAuthString = "Basic " + Base64.getEncoder().encodeToString(StringUtils.toUtf8(userPass));
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.druid.data.input.impl.systemfield.SystemField;
|
|||
import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
|
||||
import org.apache.druid.data.input.impl.systemfield.SystemFieldInputSource;
|
||||
import org.apache.druid.data.input.impl.systemfield.SystemFields;
|
||||
import org.apache.druid.error.InvalidInput;
|
||||
import org.apache.druid.java.util.common.CloseableIterators;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
@ -47,6 +48,7 @@ import java.io.File;
|
|||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
|
@ -64,6 +66,7 @@ public class HttpInputSource
|
|||
private final PasswordProvider httpAuthenticationPasswordProvider;
|
||||
private final SystemFields systemFields;
|
||||
private final HttpInputSourceConfig config;
|
||||
private final Map<String, String> requestHeaders;
|
||||
|
||||
@JsonCreator
|
||||
public HttpInputSource(
|
||||
|
@ -71,6 +74,7 @@ public class HttpInputSource
|
|||
@JsonProperty("httpAuthenticationUsername") @Nullable String httpAuthenticationUsername,
|
||||
@JsonProperty("httpAuthenticationPassword") @Nullable PasswordProvider httpAuthenticationPasswordProvider,
|
||||
@JsonProperty(SYSTEM_FIELDS_PROPERTY) @Nullable SystemFields systemFields,
|
||||
@JsonProperty("requestHeaders") @Nullable Map<String, String> requestHeaders,
|
||||
@JacksonInject HttpInputSourceConfig config
|
||||
)
|
||||
{
|
||||
|
@ -80,17 +84,11 @@ public class HttpInputSource
|
|||
this.httpAuthenticationUsername = httpAuthenticationUsername;
|
||||
this.httpAuthenticationPasswordProvider = httpAuthenticationPasswordProvider;
|
||||
this.systemFields = systemFields == null ? SystemFields.none() : systemFields;
|
||||
this.requestHeaders = requestHeaders == null ? Collections.emptyMap() : requestHeaders;
|
||||
throwIfForbiddenHeaders(config, this.requestHeaders);
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
@Nonnull
|
||||
@Override
|
||||
public Set<String> getTypes()
|
||||
{
|
||||
return Collections.singleton(TYPE_KEY);
|
||||
}
|
||||
|
||||
public static void throwIfInvalidProtocols(HttpInputSourceConfig config, List<URI> uris)
|
||||
{
|
||||
for (URI uri : uris) {
|
||||
|
@ -100,6 +98,27 @@ public class HttpInputSource
|
|||
}
|
||||
}
|
||||
|
||||
public static void throwIfForbiddenHeaders(HttpInputSourceConfig config, Map<String, String> requestHeaders)
|
||||
{
|
||||
if (config.getAllowedHeaders().size() > 0) {
|
||||
for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
|
||||
if (!config.getAllowedHeaders().contains(StringUtils.toLowerCase(entry.getKey()))) {
|
||||
throw InvalidInput.exception("Got forbidden header %s, allowed headers are only %s ",
|
||||
entry.getKey(), config.getAllowedHeaders()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@JsonIgnore
|
||||
@Nonnull
|
||||
@Override
|
||||
public Set<String> getTypes()
|
||||
{
|
||||
return Collections.singleton(TYPE_KEY);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public List<URI> getUris()
|
||||
{
|
||||
|
@ -128,6 +147,14 @@ public class HttpInputSource
|
|||
return httpAuthenticationPasswordProvider;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@JsonProperty("requestHeaders")
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public Map<String, String> getRequestHeaders()
|
||||
{
|
||||
return requestHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
|
||||
{
|
||||
|
@ -148,6 +175,7 @@ public class HttpInputSource
|
|||
httpAuthenticationUsername,
|
||||
httpAuthenticationPasswordProvider,
|
||||
systemFields,
|
||||
requestHeaders,
|
||||
config
|
||||
);
|
||||
}
|
||||
|
@ -181,7 +209,8 @@ public class HttpInputSource
|
|||
createSplits(inputFormat, null).map(split -> new HttpEntity(
|
||||
split.get(),
|
||||
httpAuthenticationUsername,
|
||||
httpAuthenticationPasswordProvider
|
||||
httpAuthenticationPasswordProvider,
|
||||
requestHeaders
|
||||
)).iterator()
|
||||
),
|
||||
SystemFieldDecoratorFactory.fromInputSource(this),
|
||||
|
@ -203,13 +232,21 @@ public class HttpInputSource
|
|||
&& Objects.equals(httpAuthenticationUsername, that.httpAuthenticationUsername)
|
||||
&& Objects.equals(httpAuthenticationPasswordProvider, that.httpAuthenticationPasswordProvider)
|
||||
&& Objects.equals(systemFields, that.systemFields)
|
||||
&& Objects.equals(requestHeaders, that.requestHeaders)
|
||||
&& Objects.equals(config, that.config);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(uris, httpAuthenticationUsername, httpAuthenticationPasswordProvider, systemFields, config);
|
||||
return Objects.hash(
|
||||
uris,
|
||||
httpAuthenticationUsername,
|
||||
httpAuthenticationPasswordProvider,
|
||||
systemFields,
|
||||
requestHeaders,
|
||||
config
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -226,6 +263,7 @@ public class HttpInputSource
|
|||
", httpAuthenticationUsername=" + httpAuthenticationUsername +
|
||||
", httpAuthenticationPasswordProvider=" + httpAuthenticationPasswordProvider +
|
||||
(systemFields.getFields().isEmpty() ? "" : ", systemFields=" + systemFields) +
|
||||
", requestHeaders = " + requestHeaders +
|
||||
"}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableSet;
|
|||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -38,14 +39,21 @@ public class HttpInputSourceConfig
|
|||
@JsonProperty
|
||||
private final Set<String> allowedProtocols;
|
||||
|
||||
@JsonProperty
|
||||
private final Set<String> allowedHeaders;
|
||||
|
||||
@JsonCreator
|
||||
public HttpInputSourceConfig(
|
||||
@JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols
|
||||
@JsonProperty("allowedProtocols") @Nullable Set<String> allowedProtocols,
|
||||
@JsonProperty("allowedHeaders") @Nullable Set<String> allowedHeaders
|
||||
)
|
||||
{
|
||||
this.allowedProtocols = allowedProtocols == null || allowedProtocols.isEmpty()
|
||||
? DEFAULT_ALLOWED_PROTOCOLS
|
||||
: allowedProtocols.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
|
||||
this.allowedHeaders = allowedHeaders == null
|
||||
? Collections.emptySet()
|
||||
: allowedHeaders.stream().map(StringUtils::toLowerCase).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
public Set<String> getAllowedProtocols()
|
||||
|
@ -53,6 +61,11 @@ public class HttpInputSourceConfig
|
|||
return allowedProtocols;
|
||||
}
|
||||
|
||||
public Set<String> getAllowedHeaders()
|
||||
{
|
||||
return allowedHeaders;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
|
@ -63,13 +76,16 @@ public class HttpInputSourceConfig
|
|||
return false;
|
||||
}
|
||||
HttpInputSourceConfig that = (HttpInputSourceConfig) o;
|
||||
return Objects.equals(allowedProtocols, that.allowedProtocols);
|
||||
return Objects.equals(allowedProtocols, that.allowedProtocols) && Objects.equals(
|
||||
allowedHeaders,
|
||||
that.allowedHeaders
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(allowedProtocols);
|
||||
return Objects.hash(allowedProtocols, allowedHeaders);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -77,6 +93,7 @@ public class HttpInputSourceConfig
|
|||
{
|
||||
return "HttpInputSourceConfig{" +
|
||||
"allowedProtocols=" + allowedProtocols +
|
||||
", allowedHeaders=" + allowedHeaders +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,7 +19,9 @@
|
|||
|
||||
package org.apache.druid.data.input.impl;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.net.HttpHeaders;
|
||||
import com.sun.net.httpserver.Headers;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
@ -42,6 +44,8 @@ import java.net.URISyntaxException;
|
|||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
|
||||
public class HttpEntityTest
|
||||
{
|
||||
|
@ -96,8 +100,61 @@ public class HttpEntityTest
|
|||
server.start();
|
||||
|
||||
URI url = new URI("http://" + server.getAddress().getHostName() + ":" + server.getAddress().getPort() + "/test");
|
||||
inputStream = HttpEntity.openInputStream(url, "", null, 0);
|
||||
inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5);
|
||||
inputStream = HttpEntity.openInputStream(url, "", null, 0, Collections.emptyMap());
|
||||
inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5, Collections.emptyMap());
|
||||
inputStream.skip(5);
|
||||
Assert.assertTrue(IOUtils.contentEquals(inputStream, inputStreamPartial));
|
||||
}
|
||||
finally {
|
||||
IOUtils.closeQuietly(inputStream);
|
||||
IOUtils.closeQuietly(inputStreamPartial);
|
||||
if (server != null) {
|
||||
server.stop(0);
|
||||
}
|
||||
if (serverSocket != null) {
|
||||
serverSocket.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRequestHeaders() throws IOException, URISyntaxException
|
||||
{
|
||||
HttpServer server = null;
|
||||
InputStream inputStream = null;
|
||||
InputStream inputStreamPartial = null;
|
||||
ServerSocket serverSocket = null;
|
||||
Map<String, String> requestHeaders = ImmutableMap.of("r-Cookie", "test", "Content-Type", "application/json");
|
||||
try {
|
||||
serverSocket = new ServerSocket(0);
|
||||
int port = serverSocket.getLocalPort();
|
||||
// closing port so that the httpserver can use. Can cause race conditions.
|
||||
serverSocket.close();
|
||||
server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
|
||||
server.createContext(
|
||||
"/test",
|
||||
(httpExchange) -> {
|
||||
Headers headers = httpExchange.getRequestHeaders();
|
||||
for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
|
||||
Assert.assertTrue(headers.containsKey(entry.getKey()));
|
||||
Assert.assertEquals(headers.get(entry.getKey()).get(0), entry.getValue());
|
||||
}
|
||||
String payload = "12345678910";
|
||||
byte[] outputBytes = payload.getBytes(StandardCharsets.UTF_8);
|
||||
httpExchange.sendResponseHeaders(200, outputBytes.length);
|
||||
OutputStream os = httpExchange.getResponseBody();
|
||||
httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
|
||||
httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_LENGTH, String.valueOf(outputBytes.length));
|
||||
httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_RANGE, "bytes 0");
|
||||
os.write(outputBytes);
|
||||
os.close();
|
||||
}
|
||||
);
|
||||
server.start();
|
||||
|
||||
URI url = new URI("http://" + server.getAddress().getHostName() + ":" + server.getAddress().getPort() + "/test");
|
||||
inputStream = HttpEntity.openInputStream(url, "", null, 0, requestHeaders);
|
||||
inputStreamPartial = HttpEntity.openInputStream(url, "", null, 5, requestHeaders);
|
||||
inputStream.skip(5);
|
||||
Assert.assertTrue(IOUtils.contentEquals(inputStream, inputStreamPartial));
|
||||
}
|
||||
|
@ -119,7 +176,7 @@ public class HttpEntityTest
|
|||
long offset = 15;
|
||||
String contentRange = StringUtils.format("bytes %d-%d/%d", offset, 1000, 1000);
|
||||
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(contentRange);
|
||||
HttpEntity.openInputStream(uri, "", null, offset);
|
||||
HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
|
||||
Mockito.verify(inputStreamMock, Mockito.times(0)).skip(offset);
|
||||
}
|
||||
|
||||
|
@ -128,7 +185,7 @@ public class HttpEntityTest
|
|||
{
|
||||
long offset = 15;
|
||||
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn(null);
|
||||
HttpEntity.openInputStream(uri, "", null, offset);
|
||||
HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
|
||||
Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
|
||||
}
|
||||
|
||||
|
@ -137,7 +194,7 @@ public class HttpEntityTest
|
|||
{
|
||||
long offset = 15;
|
||||
Mockito.when(urlConnection.getHeaderField(HttpHeaders.CONTENT_RANGE)).thenReturn("token 2-12/12");
|
||||
HttpEntity.openInputStream(uri, "", null, offset);
|
||||
HttpEntity.openInputStream(uri, "", null, offset, Collections.emptyMap());
|
||||
Mockito.verify(inputStreamMock, Mockito.times(1)).skip(offset);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import nl.jqno.equalsverifier.EqualsVerifier;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Collections;
|
||||
|
||||
public class HttpInputSourceConfigTest
|
||||
{
|
||||
@Test
|
||||
|
@ -35,21 +37,33 @@ public class HttpInputSourceConfigTest
|
|||
@Test
|
||||
public void testNullAllowedProtocolsUseDefault()
|
||||
{
|
||||
HttpInputSourceConfig config = new HttpInputSourceConfig(null);
|
||||
HttpInputSourceConfig config = new HttpInputSourceConfig(null, null);
|
||||
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
|
||||
Assert.assertEquals(Collections.emptySet(), config.getAllowedHeaders());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEmptyAllowedProtocolsUseDefault()
|
||||
{
|
||||
HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of());
|
||||
HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of(), null);
|
||||
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, config.getAllowedProtocols());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomAllowedProtocols()
|
||||
{
|
||||
HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid"));
|
||||
HttpInputSourceConfig config = new HttpInputSourceConfig(ImmutableSet.of("druid"), null);
|
||||
Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllowedHeaders()
|
||||
{
|
||||
HttpInputSourceConfig config = new HttpInputSourceConfig(
|
||||
ImmutableSet.of("druid"),
|
||||
ImmutableSet.of("Content-Type", "Referer")
|
||||
);
|
||||
Assert.assertEquals(ImmutableSet.of("druid"), config.getAllowedProtocols());
|
||||
Assert.assertEquals(ImmutableSet.of("content-type", "referer"), config.getAllowedHeaders());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,11 +22,15 @@ package org.apache.druid.data.input.impl;
|
|||
import com.fasterxml.jackson.databind.InjectableValues.Std;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Sets;
|
||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||
import org.apache.druid.data.input.InputSource;
|
||||
import org.apache.druid.data.input.impl.systemfield.SystemField;
|
||||
import org.apache.druid.data.input.impl.systemfield.SystemFields;
|
||||
import org.apache.druid.error.DruidException;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
import org.apache.druid.metadata.DefaultPasswordProvider;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
|
@ -35,7 +39,10 @@ import org.junit.rules.ExpectedException;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class HttpInputSourceTest
|
||||
{
|
||||
|
@ -45,7 +52,7 @@ public class HttpInputSourceTest
|
|||
@Test
|
||||
public void testSerde() throws IOException
|
||||
{
|
||||
HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null);
|
||||
HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null, null);
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
mapper.setInjectableValues(new Std().addValue(HttpInputSourceConfig.class, httpInputSourceConfig));
|
||||
final HttpInputSource source = new HttpInputSource(
|
||||
|
@ -53,6 +60,7 @@ public class HttpInputSourceTest
|
|||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
new SystemFields(EnumSet.of(SystemField.URI)),
|
||||
null,
|
||||
httpInputSourceConfig
|
||||
);
|
||||
final byte[] json = mapper.writeValueAsBytes(source);
|
||||
|
@ -68,7 +76,8 @@ public class HttpInputSourceTest
|
|||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
|
||||
new HttpInputSource(
|
||||
|
@ -76,7 +85,8 @@ public class HttpInputSourceTest
|
|||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
|
||||
expectedException.expect(IllegalArgumentException.class);
|
||||
|
@ -86,19 +96,21 @@ public class HttpInputSourceTest
|
|||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConstructorAllowsOnlyCustomProtocols()
|
||||
{
|
||||
final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"));
|
||||
final HttpInputSourceConfig customConfig = new HttpInputSourceConfig(ImmutableSet.of("druid"), null);
|
||||
new HttpInputSource(
|
||||
ImmutableList.of(URI.create("druid:///")),
|
||||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
null,
|
||||
null,
|
||||
customConfig
|
||||
);
|
||||
|
||||
|
@ -109,6 +121,7 @@ public class HttpInputSourceTest
|
|||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
null,
|
||||
null,
|
||||
customConfig
|
||||
);
|
||||
}
|
||||
|
@ -116,12 +129,13 @@ public class HttpInputSourceTest
|
|||
@Test
|
||||
public void testSystemFields()
|
||||
{
|
||||
HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null);
|
||||
HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(null, null);
|
||||
final HttpInputSource inputSource = new HttpInputSource(
|
||||
ImmutableList.of(URI.create("http://test.com/http-test")),
|
||||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
|
||||
null,
|
||||
httpInputSourceConfig
|
||||
);
|
||||
|
||||
|
@ -130,10 +144,54 @@ public class HttpInputSourceTest
|
|||
inputSource.getConfiguredSystemFields()
|
||||
);
|
||||
|
||||
final HttpEntity entity = new HttpEntity(URI.create("https://example.com/foo"), null, null);
|
||||
final HttpEntity entity = new HttpEntity(URI.create("https://example.com/foo"), null, null, null);
|
||||
|
||||
Assert.assertEquals("https://example.com/foo", inputSource.getSystemFieldValue(entity, SystemField.URI));
|
||||
Assert.assertEquals("/foo", inputSource.getSystemFieldValue(entity, SystemField.PATH));
|
||||
Assert.assertEquals(inputSource.getRequestHeaders(), Collections.emptyMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllowedHeaders()
|
||||
{
|
||||
HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(
|
||||
null,
|
||||
Sets.newHashSet("R-cookie", "Content-type")
|
||||
);
|
||||
final HttpInputSource inputSource = new HttpInputSource(
|
||||
ImmutableList.of(URI.create("http://test.com/http-test")),
|
||||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
|
||||
ImmutableMap.of("r-Cookie", "test", "Content-Type", "application/json"),
|
||||
httpInputSourceConfig
|
||||
);
|
||||
Set<String> expectedSet = inputSource.getRequestHeaders()
|
||||
.keySet()
|
||||
.stream()
|
||||
.map(StringUtils::toLowerCase)
|
||||
.collect(Collectors.toSet());
|
||||
Assert.assertEquals(expectedSet, httpInputSourceConfig.getAllowedHeaders());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldFailOnForbiddenHeaders()
|
||||
{
|
||||
HttpInputSourceConfig httpInputSourceConfig = new HttpInputSourceConfig(
|
||||
null,
|
||||
Sets.newHashSet("R-cookie", "Content-type")
|
||||
);
|
||||
expectedException.expect(DruidException.class);
|
||||
expectedException.expectMessage(
|
||||
"Got forbidden header G-Cookie, allowed headers are only [r-cookie, content-type]");
|
||||
new HttpInputSource(
|
||||
ImmutableList.of(URI.create("http://test.com/http-test")),
|
||||
"myName",
|
||||
new DefaultPasswordProvider("myPassword"),
|
||||
new SystemFields(EnumSet.of(SystemField.URI, SystemField.PATH)),
|
||||
ImmutableMap.of("G-Cookie", "test", "Content-Type", "application/json"),
|
||||
httpInputSourceConfig
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -204,7 +204,7 @@ public class InputEntityIteratingReaderTest extends InitializedNullHandlingTest
|
|||
),
|
||||
CloseableIterators.withEmptyBaggage(
|
||||
ImmutableList.of(
|
||||
new HttpEntity(new URI("testscheme://some/path"), null, null)
|
||||
new HttpEntity(new URI("testscheme://some/path"), null, null, null)
|
||||
{
|
||||
@Override
|
||||
protected int getMaxRetries()
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.apache.druid.catalog.model.table;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.druid.catalog.model.CatalogUtils;
|
||||
import org.apache.druid.catalog.model.ColumnSpec;
|
||||
|
@ -27,6 +29,7 @@ import org.apache.druid.catalog.model.table.TableFunction.ParameterDefn;
|
|||
import org.apache.druid.catalog.model.table.TableFunction.ParameterType;
|
||||
import org.apache.druid.data.input.InputSource;
|
||||
import org.apache.druid.data.input.impl.HttpInputSource;
|
||||
import org.apache.druid.jackson.DefaultObjectMapper;
|
||||
import org.apache.druid.java.util.common.IAE;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.metadata.DefaultPasswordProvider;
|
||||
|
@ -93,6 +96,7 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
public static final String PASSWORD_PARAMETER = "password";
|
||||
public static final String PASSWORD_ENV_VAR_PARAMETER = "passwordEnvVar";
|
||||
|
||||
public static final String HEADERS = "headers";
|
||||
private static final List<ParameterDefn> URI_PARAMS = Collections.singletonList(
|
||||
new Parameter(URIS_PARAMETER, ParameterType.VARCHAR_ARRAY, true)
|
||||
);
|
||||
|
@ -103,10 +107,15 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
new Parameter(PASSWORD_ENV_VAR_PARAMETER, ParameterType.VARCHAR, true)
|
||||
);
|
||||
|
||||
private static final List<ParameterDefn> HEADERS_PARAMS = Collections.singletonList(
|
||||
new Parameter(HEADERS, ParameterType.VARCHAR, true)
|
||||
);
|
||||
|
||||
// Field names in the HttpInputSource
|
||||
protected static final String URIS_FIELD = "uris";
|
||||
protected static final String PASSWORD_FIELD = "httpAuthenticationPassword";
|
||||
protected static final String USERNAME_FIELD = "httpAuthenticationUsername";
|
||||
protected static final String HEADERS_FIELD = "requestHeaders";
|
||||
|
||||
@Override
|
||||
public String typeValue()
|
||||
|
@ -201,7 +210,7 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
@Override
|
||||
protected List<ParameterDefn> adHocTableFnParameters()
|
||||
{
|
||||
return CatalogUtils.concatLists(URI_PARAMS, USER_PWD_PARAMS);
|
||||
return CatalogUtils.concatLists(URI_PARAMS, CatalogUtils.concatLists(USER_PWD_PARAMS, HEADERS_PARAMS));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -210,6 +219,7 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
jsonMap.put(InputSource.TYPE_PROPERTY, HttpInputSource.TYPE_KEY);
|
||||
convertUriArg(jsonMap, args);
|
||||
convertUserPasswordArgs(jsonMap, args);
|
||||
convertHeaderArg(jsonMap, args);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -228,6 +238,10 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
params = CatalogUtils.concatLists(params, USER_PWD_PARAMS);
|
||||
}
|
||||
|
||||
if (!sourceMap.containsKey(HEADERS_FIELD)) {
|
||||
params = CatalogUtils.concatLists(params, HEADERS_PARAMS);
|
||||
}
|
||||
|
||||
// Does the table define a format?
|
||||
if (table.inputFormatMap == null) {
|
||||
params = addFormatParameters(params);
|
||||
|
@ -255,6 +269,9 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
if (!sourceMap.containsKey(USERNAME_FIELD) && !sourceMap.containsKey(PASSWORD_FIELD)) {
|
||||
convertUserPasswordArgs(sourceMap, args);
|
||||
}
|
||||
if (!sourceMap.containsKey(HEADERS_FIELD)) {
|
||||
convertHeaderArg(sourceMap, args);
|
||||
}
|
||||
return convertPartialFormattedTable(table, args, columns, sourceMap);
|
||||
}
|
||||
|
||||
|
@ -283,6 +300,26 @@ public class HttpInputSourceDefn extends FormattedInputSourceDefn
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* URIs in SQL is in the form of a string that contains a comma-delimited
|
||||
* set of URIs. Done since SQL doesn't support array scalars.
|
||||
*/
|
||||
private void convertHeaderArg(Map<String, Object> jsonMap, Map<String, Object> args)
|
||||
{
|
||||
String requestHeaders = CatalogUtils.getString(args, HEADERS);
|
||||
Map<String, String> headersMap;
|
||||
if (requestHeaders != null) {
|
||||
try {
|
||||
headersMap = DefaultObjectMapper.INSTANCE.readValue(requestHeaders, new TypeReference<Map<String, String>>(){});
|
||||
}
|
||||
catch (JsonProcessingException e) {
|
||||
throw new ISE("Failed read map from headers json");
|
||||
}
|
||||
jsonMap.put(HEADERS_FIELD, headersMap);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Convert the user name and password. All are SQL strings. Passwords must be in
|
||||
* the form of a password provider, so do the needed conversion. HTTP provides
|
||||
|
|
|
@ -170,7 +170,8 @@ public class ExternalTableTest extends BaseExternTableTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
Map<String, Object> sourceMap = toMap(inputSource);
|
||||
sourceMap.remove("uris");
|
||||
|
@ -195,7 +196,8 @@ public class ExternalTableTest extends BaseExternTableTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("koala")
|
||||
.inputSource(toMap(inputSource))
|
||||
|
|
|
@ -57,7 +57,7 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
{
|
||||
mapper.setInjectableValues(new InjectableValues.Std().addValue(
|
||||
HttpInputSourceConfig.class,
|
||||
new HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS)
|
||||
new HttpInputSourceConfig(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, null)
|
||||
));
|
||||
}
|
||||
|
||||
|
@ -99,7 +99,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(toMap(inputSource))
|
||||
|
@ -119,7 +120,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(toMap(inputSource))
|
||||
|
@ -150,7 +152,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(toMap(inputSource))
|
||||
|
@ -216,7 +219,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(toMap(inputSource))
|
||||
|
@ -272,11 +276,12 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
|
||||
// Get the partial table function
|
||||
TableFunction fn = externDefn.tableFn(resolved);
|
||||
assertEquals(4, fn.parameters().size());
|
||||
assertEquals(5, fn.parameters().size());
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.USER_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.PASSWORD_ENV_VAR_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS));
|
||||
|
||||
// Convert to an external table.
|
||||
ExternalTableSpec externSpec = fn.apply(
|
||||
|
@ -320,8 +325,9 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
|
||||
// Get the partial table function
|
||||
TableFunction fn = externDefn.tableFn(resolved);
|
||||
assertEquals(1, fn.parameters().size());
|
||||
assertEquals(2, fn.parameters().size());
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.URIS_PARAMETER));
|
||||
assertTrue(hasParam(fn, HttpInputSourceDefn.HEADERS));
|
||||
|
||||
// Convert to an external table.
|
||||
ExternalTableSpec externSpec = fn.apply(
|
||||
|
@ -344,7 +350,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(httpToMap(inputSource))
|
||||
|
@ -382,7 +389,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
"bob",
|
||||
new EnvironmentVariablePasswordProvider("SECRET"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(toMap(inputSource))
|
||||
|
@ -415,7 +423,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(httpToMap(inputSource))
|
||||
|
@ -484,7 +493,8 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
"bob",
|
||||
new EnvironmentVariablePasswordProvider("SECRET"),
|
||||
null,
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
);
|
||||
TableMetadata table = TableBuilder.external("foo")
|
||||
.inputSource(toMap(inputSource))
|
||||
|
@ -518,7 +528,6 @@ public class HttpInputSourceDefnTest extends BaseExternTableTest
|
|||
assertEquals("secret", ((DefaultPasswordProvider) sourceSpec.getHttpAuthenticationPasswordProvider()).getPassword());
|
||||
}
|
||||
assertEquals("http://foo.com/my.csv", sourceSpec.getUris().get(0).toString());
|
||||
|
||||
// Just a sanity check: details of CSV conversion are tested elsewhere.
|
||||
CsvInputFormat csvFormat = (CsvInputFormat) externSpec.inputFormat;
|
||||
assertEquals(Arrays.asList("x", "y"), csvFormat.getColumns());
|
||||
|
|
|
@ -79,7 +79,7 @@ public class InputSourceModuleTest
|
|||
Properties props = new Properties();
|
||||
Injector injector = makeInjectorWithProperties(props);
|
||||
HttpInputSourceConfig instance = injector.getInstance(HttpInputSourceConfig.class);
|
||||
Assert.assertEquals(new HttpInputSourceConfig(null), instance);
|
||||
Assert.assertEquals(new HttpInputSourceConfig(null, null), instance);
|
||||
Assert.assertEquals(HttpInputSourceConfig.DEFAULT_ALLOWED_PROTOCOLS, instance.getAllowedProtocols());
|
||||
}
|
||||
|
||||
|
|
|
@ -860,7 +860,7 @@ public class CalciteInsertDmlTest extends CalciteIngestionDmlTest
|
|||
|
||||
|
||||
// Test correctness of the query when only the CLUSTERED BY clause is present
|
||||
final String explanation = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"]},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{\"type\":\"query\",\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://foo.tsv\"]},\"inputFormat\":{\"type\":\"tsv\",\"delimiter\":\"\\t\",\"findColumnsFromHeader\":true},\"signature\":[{\"name\":\"Country\",\"type\":\"STRING\"},{\"name\":\"Capital\",\"type\":\"STRING\"},{\"name\":\"ISO3\",\"type\":\"STRING\"},{\"name\":\"ISO2\",\"type\":\"STRING\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"Capital\",\"ISO2\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false}},\"rightPrefix\":\"j0.\",\"condition\":\"(\\\"countryIsoCode\\\" == \\\"j0.ISO2\\\")\",\"joinType\":\"LEFT\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"timestamp_parse(\\\"timestamp\\\",null,'UTC')\",\"outputType\":\"LONG\"}],\"resultFormat\":\"compactedList\",\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"isRobot\",\"order\":\"ascending\"},{\"columnName\":\"j0.Capital\",\"order\":\"ascending\"},{\"columnName\":\"regionName\",\"order\":\"ascending\"}],\"columns\":[\"isRobot\",\"j0.Capital\",\"regionName\",\"v0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"v0\",\"type\":\"LONG\"},{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"j0.Capital\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}],\"columnMappings\":[{\"queryColumn\":\"v0\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"isRobot\",\"outputColumn\":\"isRobotAlias\"},{\"queryColumn\":\"j0.Capital\",\"outputColumn\":\"countryCapital\"},{\"queryColumn\":\"regionName\",\"outputColumn\":\"regionName\"}]}]";
|
||||
final String explanation = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"join\",\"left\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://boo.gz\"],\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"json\"},\"signature\":[{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"timestamp\",\"type\":\"STRING\"},{\"name\":\"cityName\",\"type\":\"STRING\"},{\"name\":\"countryIsoCode\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}]},\"right\":{\"type\":\"query\",\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"https://foo.tsv\"],\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"tsv\",\"delimiter\":\"\\t\",\"findColumnsFromHeader\":true},\"signature\":[{\"name\":\"Country\",\"type\":\"STRING\"},{\"name\":\"Capital\",\"type\":\"STRING\"},{\"name\":\"ISO3\",\"type\":\"STRING\"},{\"name\":\"ISO2\",\"type\":\"STRING\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"Capital\",\"ISO2\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false}},\"rightPrefix\":\"j0.\",\"condition\":\"(\\\"countryIsoCode\\\" == \\\"j0.ISO2\\\")\",\"joinType\":\"LEFT\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"virtualColumns\":[{\"type\":\"expression\",\"name\":\"v0\",\"expression\":\"timestamp_parse(\\\"timestamp\\\",null,'UTC')\",\"outputType\":\"LONG\"}],\"resultFormat\":\"compactedList\",\"orderBy\":[{\"columnName\":\"v0\",\"order\":\"ascending\"},{\"columnName\":\"isRobot\",\"order\":\"ascending\"},{\"columnName\":\"j0.Capital\",\"order\":\"ascending\"},{\"columnName\":\"regionName\",\"order\":\"ascending\"}],\"columns\":[\"isRobot\",\"j0.Capital\",\"regionName\",\"v0\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"\\\"HOUR\\\"\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"v0\",\"type\":\"LONG\"},{\"name\":\"isRobot\",\"type\":\"STRING\"},{\"name\":\"j0.Capital\",\"type\":\"STRING\"},{\"name\":\"regionName\",\"type\":\"STRING\"}],\"columnMappings\":[{\"queryColumn\":\"v0\",\"outputColumn\":\"__time\"},{\"queryColumn\":\"isRobot\",\"outputColumn\":\"isRobotAlias\"},{\"queryColumn\":\"j0.Capital\",\"outputColumn\":\"countryCapital\"},{\"queryColumn\":\"regionName\",\"outputColumn\":\"regionName\"}]}]";
|
||||
|
||||
testQuery(
|
||||
PLANNER_CONFIG_NATIVE_QUERY_EXPLAIN,
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.sql.calcite;
|
|||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.calcite.avatica.SqlType;
|
||||
import org.apache.druid.catalog.model.Columns;
|
||||
import org.apache.druid.data.input.impl.CsvInputFormat;
|
||||
|
@ -86,7 +87,8 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
SystemFields.none(),
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
),
|
||||
new CsvInputFormat(ImmutableList.of("x", "y", "z"), null, false, false, 0),
|
||||
RowSignature.builder()
|
||||
|
@ -259,7 +261,8 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
SystemFields.none(),
|
||||
new HttpInputSourceConfig(null)
|
||||
ImmutableMap.of("Accept", "application/ndjson", "a", "b"),
|
||||
new HttpInputSourceConfig(null, null)
|
||||
),
|
||||
new CsvInputFormat(ImmutableList.of("timestamp", "isRobot"), null, false, false, 0),
|
||||
RowSignature.builder()
|
||||
|
@ -280,7 +283,8 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
" userName => 'bob',\n" +
|
||||
" password => 'secret',\n" +
|
||||
" uris => ARRAY['http://example.com/foo.csv', 'http://example.com/bar.csv'],\n" +
|
||||
" format => 'csv'\n" +
|
||||
" format => 'csv',\n" +
|
||||
" headers=> '{\"Accept\":\"application/ndjson\", \"a\": \"b\" }'\n" +
|
||||
" )\n" +
|
||||
") EXTEND (\"timestamp\" VARCHAR, isRobot VARCHAR)\n" +
|
||||
"PARTITIONED BY HOUR")
|
||||
|
@ -313,7 +317,7 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
" format => 'csv'))\n" +
|
||||
" EXTEND (x VARCHAR, y VARCHAR, z BIGINT)\n" +
|
||||
"PARTITIONED BY ALL TIME";
|
||||
final String explanation = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]";
|
||||
final String explanation = "[{\"query\":{\"queryType\":\"scan\",\"dataSource\":{\"type\":\"external\",\"inputSource\":{\"type\":\"http\",\"uris\":[\"http://foo.com/bar.csv\"],\"httpAuthenticationUsername\":\"bob\",\"httpAuthenticationPassword\":{\"type\":\"default\",\"password\":\"secret\"},\"requestHeaders\":{}},\"inputFormat\":{\"type\":\"csv\",\"columns\":[\"x\",\"y\",\"z\"]},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}]},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z\"]},\"resultFormat\":\"compactedList\",\"columns\":[\"x\",\"y\",\"z\"],\"context\":{\"defaultTimeout\":300000,\"maxScatterGatherBytes\":9223372036854775807,\"sqlCurrentTimestamp\":\"2000-01-01T00:00:00Z\",\"sqlInsertSegmentGranularity\":\"{\\\"type\\\":\\\"all\\\"}\",\"sqlQueryId\":\"dummy\",\"vectorize\":\"false\",\"vectorizeVirtualColumns\":\"false\"},\"columnTypes\":[\"STRING\",\"STRING\",\"LONG\"],\"granularity\":{\"type\":\"all\"},\"legacy\":false},\"signature\":[{\"name\":\"x\",\"type\":\"STRING\"},{\"name\":\"y\",\"type\":\"STRING\"},{\"name\":\"z\",\"type\":\"LONG\"}],\"columnMappings\":[{\"queryColumn\":\"x\",\"outputColumn\":\"x\"},{\"queryColumn\":\"y\",\"outputColumn\":\"y\"},{\"queryColumn\":\"z\",\"outputColumn\":\"z\"}]}]";
|
||||
final String resources = "[{\"name\":\"EXTERNAL\",\"type\":\"EXTERNAL\"},{\"name\":\"dst\",\"type\":\"DATASOURCE\"}]";
|
||||
final String attributes = "{\"statementType\":\"INSERT\",\"targetDataSource\":\"dst\",\"partitionedBy\":{\"type\":\"all\"}}";
|
||||
|
||||
|
@ -390,7 +394,8 @@ public class IngestTableFunctionTest extends CalciteIngestionDmlTest
|
|||
"bob",
|
||||
new DefaultPasswordProvider("secret"),
|
||||
SystemFields.none(),
|
||||
new HttpInputSourceConfig(null)
|
||||
null,
|
||||
new HttpInputSourceConfig(null, null)
|
||||
),
|
||||
new JsonInputFormat(null, null, null, null, null),
|
||||
RowSignature.builder()
|
||||
|
|
|
@ -1,3 +1,3 @@
|
|||
LogicalInsert(target=[dst], partitionedBy=[ALL TIME], clusteredBy=[<none>])
|
||||
LogicalProject(inputs=[0..2])
|
||||
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])
|
||||
ExternalTableScan(dataSource=[{"type":"external","inputSource":{"type":"http","uris":["http://foo.com/bar.csv"],"httpAuthenticationUsername":"bob","httpAuthenticationPassword":{"type":"default","password":"secret"},"requestHeaders":{}},"inputFormat":{"type":"csv","columns":["x","y","z"]},"signature":[{"name":"x","type":"STRING"},{"name":"y","type":"STRING"},{"name":"z","type":"LONG"}]}])
|
||||
|
|
Loading…
Reference in New Issue