From d95915f8d2365367024505bf2e7c2921cfae3806 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 12 Oct 2017 16:14:45 +0900 Subject: [PATCH] Implement get methods for PrefetchableFirehose (#4948) --- .../PrefetchableTextFilesFirehoseFactory.java | 31 ++++++ extensions-contrib/azure-extensions/pom.xml | 5 + .../io/druid/firehose/azure/AzureBlob.java | 23 ++++ .../StaticAzureBlobStoreFirehoseFactory.java | 35 ++++++ ...aticAzureBlobStoreFirehoseFactoryTest.java | 104 +++++++++++++++++ .../cloudfiles-extensions/pom.xml | 5 + .../firehose/cloudfiles/CloudFilesBlob.java | 24 ++++ .../StaticCloudFilesFirehoseFactory.java | 34 ++++++ .../StaticCloudFilesFirehoseFactoryTest.java | 105 ++++++++++++++++++ extensions-contrib/google-extensions/pom.xml | 5 + .../io/druid/firehose/google/GoogleBlob.java | 18 +++ .../StaticGoogleBlobStoreFirehoseFactory.java | 34 ++++++ ...ticGoogleBlobStoreFirehoseFactoryTest.java | 104 +++++++++++++++++ extensions-core/s3-extensions/pom.xml | 6 + .../firehose/s3/StaticS3FirehoseFactory.java | 22 +++- .../s3/StaticS3FirehoseFactoryTest.java | 69 +++++++++--- .../firehose/HttpFirehoseFactory.java | 34 ++++++ .../firehose/HttpFirehoseFactoryTest.java | 53 +++++++++ 18 files changed, 691 insertions(+), 20 deletions(-) create mode 100644 extensions-contrib/azure-extensions/src/test/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactoryTest.java create mode 100644 extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactoryTest.java create mode 100644 extensions-contrib/google-extensions/src/test/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java create mode 100644 server/src/test/java/io/druid/segment/realtime/firehose/HttpFirehoseFactoryTest.java diff --git a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java index 96e2715b3d1..367f737e8b3 100644 --- a/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java +++ b/api/src/main/java/io/druid/data/input/impl/PrefetchableTextFilesFirehoseFactory.java @@ -19,6 +19,7 @@ package io.druid.data.input.impl; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; @@ -164,6 +165,36 @@ public abstract class PrefetchableTextFilesFirehoseFactory this.maxFetchRetry = maxFetchRetry == null ? DEFAULT_MAX_FETCH_RETRY : maxFetchRetry; } + @JsonProperty + public long getMaxCacheCapacityBytes() + { + return maxCacheCapacityBytes; + } + + @JsonProperty + public long getMaxFetchCapacityBytes() + { + return maxFetchCapacityBytes; + } + + @JsonProperty + public long getPrefetchTriggerBytes() + { + return prefetchTriggerBytes; + } + + @JsonProperty + public long getFetchTimeout() + { + return fetchTimeout; + } + + @JsonProperty + public int getMaxFetchRetry() + { + return maxFetchRetry; + } + @Override public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException { diff --git a/extensions-contrib/azure-extensions/pom.xml b/extensions-contrib/azure-extensions/pom.xml index 8acacff0036..9ac27d3ef47 100644 --- a/extensions-contrib/azure-extensions/pom.xml +++ b/extensions-contrib/azure-extensions/pom.xml @@ -61,6 +61,11 @@ + + com.fasterxml.jackson.module + jackson-module-guice + ${jackson.version} + diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java index 6a4eca1afee..8e7a1009974 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/AzureBlob.java @@ -22,6 +22,7 @@ package io.druid.firehose.azure; import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.constraints.NotNull; +import java.util.Objects; public class AzureBlob @@ -62,4 +63,26 @@ public class AzureBlob + ",path=" + path + "}"; } + + @Override + public boolean equals(Object o) + { + if (o == this) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final AzureBlob that = (AzureBlob) o; + return Objects.equals(container, that.container) && + Objects.equals(path, that.path); + } + + @Override + public int hashCode() + { + return Objects.hash(container, path); + } } diff --git a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java index c891c8472b0..986c259b098 100644 --- a/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java +++ b/extensions-contrib/azure-extensions/src/main/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.List; +import java.util.Objects; /** * This class is heavily inspired by the StaticS3FirehoseFactory class in the io.druid.firehose.s3 package @@ -89,4 +90,38 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi return new AzureByteSource(azureStorage, container, path); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final StaticAzureBlobStoreFirehoseFactory that = (StaticAzureBlobStoreFirehoseFactory) o; + + return Objects.equals(blobs, that.blobs) && + getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() && + getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() && + getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() && + getFetchTimeout() == that.getFetchTimeout() && + getMaxFetchRetry() == that.getMaxFetchRetry(); + } + + @Override + public int hashCode() + { + return Objects.hash( + blobs, + getMaxCacheCapacityBytes(), + getMaxFetchCapacityBytes(), + getPrefetchTriggerBytes(), + getFetchTimeout(), + getMaxFetchRetry() + ); + } } diff --git a/extensions-contrib/azure-extensions/src/test/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactoryTest.java b/extensions-contrib/azure-extensions/src/test/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactoryTest.java new file mode 100644 index 00000000000..d805230be16 --- /dev/null +++ b/extensions-contrib/azure-extensions/src/test/java/io/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactoryTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.firehose.azure; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import io.druid.initialization.DruidModule; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.storage.azure.AzureStorage; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +public class StaticAzureBlobStoreFirehoseFactoryTest +{ + private static final AzureStorage STORAGE = new AzureStorage(null); + + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = createObjectMapper(new TestModule()); + + final List blobs = ImmutableList.of( + new AzureBlob("foo", "bar"), + new AzureBlob("foo", "bar2") + ); + + final StaticAzureBlobStoreFirehoseFactory factory = new StaticAzureBlobStoreFirehoseFactory( + STORAGE, + blobs, + 2048L, + 1024L, + 512L, + 100L, + 5 + ); + + final StaticAzureBlobStoreFirehoseFactory outputFact = mapper.readValue( + mapper.writeValueAsString(factory), + StaticAzureBlobStoreFirehoseFactory.class + ); + + Assert.assertEquals(factory, outputFact); + } + + private static ObjectMapper createObjectMapper(DruidModule baseModule) + { + final ObjectMapper baseMapper = new DefaultObjectMapper(); + baseModule.getJacksonModules().forEach(baseMapper::registerModule); + + final Injector injector = Guice.createInjector( + new ObjectMapperModule(), + baseModule + ); + return injector.getInstance(ObjectMapper.class); + } + + private static class TestModule implements DruidModule + { + @Override + public List getJacksonModules() + { + return ImmutableList.of(new SimpleModule()); + } + + @Override + public void configure(Binder binder) + { + + } + + @Provides + public AzureStorage getRestS3Service() + { + return STORAGE; + } + } +} diff --git a/extensions-contrib/cloudfiles-extensions/pom.xml b/extensions-contrib/cloudfiles-extensions/pom.xml index da9af925c62..c084777de6d 100644 --- a/extensions-contrib/cloudfiles-extensions/pom.xml +++ b/extensions-contrib/cloudfiles-extensions/pom.xml @@ -112,6 +112,11 @@ rackspace-cloudfiles-uk ${jclouds.version} + + com.fasterxml.jackson.module + jackson-module-guice + ${jackson.version} + diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java index fb013ed55a7..925a7c3bdce 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/CloudFilesBlob.java @@ -22,6 +22,7 @@ package io.druid.firehose.cloudfiles; import com.fasterxml.jackson.annotation.JsonProperty; import javax.validation.constraints.NotNull; +import java.util.Objects; public class CloudFilesBlob { @@ -72,4 +73,27 @@ public class CloudFilesBlob + ",region=" + region + "}"; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CloudFilesBlob that = (CloudFilesBlob) o; + return Objects.equals(container, that.container) && + Objects.equals(path, that.path) && + Objects.equals(region, that.region); + } + + @Override + public int hashCode() + { + return Objects.hash(container, path, region); + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java index 29ecfb3bbc8..c94c17365f5 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.List; +import java.util.Objects; public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory { @@ -91,4 +92,37 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho { return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; } + + @Override + public boolean equals(Object o) + { + if (o == this) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final StaticCloudFilesFirehoseFactory that = (StaticCloudFilesFirehoseFactory) o; + return Objects.equals(blobs, that.blobs) && + getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() && + getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() && + getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() && + getFetchTimeout() == that.getFetchTimeout() && + getMaxFetchRetry() == that.getMaxFetchRetry(); + } + + @Override + public int hashCode() + { + return Objects.hash( + blobs, + getMaxCacheCapacityBytes(), + getMaxFetchCapacityBytes(), + getPrefetchTriggerBytes(), + getFetchTimeout(), + getMaxFetchRetry() + ); + } } diff --git a/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactoryTest.java b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactoryTest.java new file mode 100644 index 00000000000..a3eaf62185e --- /dev/null +++ b/extensions-contrib/cloudfiles-extensions/src/test/java/io/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactoryTest.java @@ -0,0 +1,105 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.firehose.cloudfiles; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import io.druid.initialization.DruidModule; +import io.druid.jackson.DefaultObjectMapper; +import org.easymock.EasyMock; +import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +public class StaticCloudFilesFirehoseFactoryTest +{ + private static final CloudFilesApi API = EasyMock.niceMock(CloudFilesApi.class); + + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = createObjectMapper(new TestModule()); + + final List blobs = ImmutableList.of( + new CloudFilesBlob("container", "foo", "bar"), + new CloudFilesBlob("container", "foo", "bar2") + ); + + final StaticCloudFilesFirehoseFactory factory = new StaticCloudFilesFirehoseFactory( + API, + blobs, + 2048L, + 1024L, + 512L, + 100L, + 5 + ); + + final StaticCloudFilesFirehoseFactory outputFact = mapper.readValue( + mapper.writeValueAsString(factory), + StaticCloudFilesFirehoseFactory.class + ); + + Assert.assertEquals(factory, outputFact); + } + + private static ObjectMapper createObjectMapper(DruidModule baseModule) + { + final ObjectMapper baseMapper = new DefaultObjectMapper(); + baseModule.getJacksonModules().forEach(baseMapper::registerModule); + + final Injector injector = Guice.createInjector( + new ObjectMapperModule(), + baseModule + ); + return injector.getInstance(ObjectMapper.class); + } + + private static class TestModule implements DruidModule + { + @Override + public List getJacksonModules() + { + return ImmutableList.of(new SimpleModule()); + } + + @Override + public void configure(Binder binder) + { + + } + + @Provides + public CloudFilesApi getRestS3Service() + { + return API; + } + } +} diff --git a/extensions-contrib/google-extensions/pom.xml b/extensions-contrib/google-extensions/pom.xml index a787001ec39..f5a987f8849 100644 --- a/extensions-contrib/google-extensions/pom.xml +++ b/extensions-contrib/google-extensions/pom.xml @@ -51,6 +51,11 @@ google-http-client-jackson2 1.22.0 + + com.fasterxml.jackson.module + jackson-module-guice + ${jackson.version} + diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java index 053a4569aeb..ea1e395c22a 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/GoogleBlob.java @@ -22,6 +22,8 @@ package io.druid.firehose.google; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Objects; + public class GoogleBlob { private final String bucket; @@ -54,5 +56,21 @@ public class GoogleBlob + ",path=" + path + "}"; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final GoogleBlob that = (GoogleBlob) o; + return Objects.equals(bucket, that.bucket) && + Objects.equals(path, that.path); + } } diff --git a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java index 632850f399b..965fb0ffa9e 100644 --- a/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java +++ b/extensions-contrib/google-extensions/src/main/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Collection; import java.util.List; +import java.util.Objects; public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesFirehoseFactory { @@ -81,5 +82,38 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF { return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + final StaticGoogleBlobStoreFirehoseFactory that = (StaticGoogleBlobStoreFirehoseFactory) o; + + return Objects.equals(blobs, that.blobs) && + getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() && + getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() && + getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() && + getFetchTimeout() == that.getFetchTimeout() && + getMaxFetchRetry() == that.getMaxFetchRetry(); + } + + @Override + public int hashCode() + { + return Objects.hash( + blobs, + getMaxCacheCapacityBytes(), + getMaxFetchCapacityBytes(), + getPrefetchTriggerBytes(), + getFetchTimeout(), + getMaxFetchRetry() + ); + } } diff --git a/extensions-contrib/google-extensions/src/test/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java b/extensions-contrib/google-extensions/src/test/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java new file mode 100644 index 00000000000..511b50d9a17 --- /dev/null +++ b/extensions-contrib/google-extensions/src/test/java/io/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactoryTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.firehose.google; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import io.druid.initialization.DruidModule; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.storage.google.GoogleStorage; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +public class StaticGoogleBlobStoreFirehoseFactoryTest +{ + private static final GoogleStorage STORAGE = new GoogleStorage(null); + + @Test + public void testSerde() throws IOException + { + final ObjectMapper mapper = createObjectMapper(new TestGoogleModule()); + + final List blobs = ImmutableList.of( + new GoogleBlob("foo", "bar"), + new GoogleBlob("foo", "bar2") + ); + + final StaticGoogleBlobStoreFirehoseFactory factory = new StaticGoogleBlobStoreFirehoseFactory( + STORAGE, + blobs, + 2048L, + 1024L, + 512L, + 100L, + 5 + ); + + final StaticGoogleBlobStoreFirehoseFactory outputFact = mapper.readValue( + mapper.writeValueAsString(factory), + StaticGoogleBlobStoreFirehoseFactory.class + ); + + Assert.assertEquals(factory, outputFact); + } + + private static ObjectMapper createObjectMapper(DruidModule baseModule) + { + final ObjectMapper baseMapper = new DefaultObjectMapper(); + baseModule.getJacksonModules().forEach(baseMapper::registerModule); + + final Injector injector = Guice.createInjector( + new ObjectMapperModule(), + baseModule + ); + return injector.getInstance(ObjectMapper.class); + } + + private static class TestGoogleModule implements DruidModule + { + @Override + public List getJacksonModules() + { + return ImmutableList.of(new SimpleModule()); + } + + @Override + public void configure(Binder binder) + { + + } + + @Provides + public GoogleStorage getRestS3Service() + { + return STORAGE; + } + } +} diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml index 96dd5948745..16e5974338b 100644 --- a/extensions-core/s3-extensions/pom.xml +++ b/extensions-core/s3-extensions/pom.xml @@ -61,6 +61,12 @@ commons-io provided + + com.fasterxml.jackson.module + jackson-module-guice + ${jackson.version} + provided + diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java index 89e281e1931..7ffaae85451 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; /** @@ -205,15 +206,28 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor return false; } - StaticS3FirehoseFactory factory = (StaticS3FirehoseFactory) o; - - return getUris().equals(factory.getUris()); + StaticS3FirehoseFactory that = (StaticS3FirehoseFactory) o; + return Objects.equals(uris, that.uris) && + Objects.equals(prefixes, that.prefixes) && + getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() && + getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() && + getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() && + getFetchTimeout() == that.getFetchTimeout() && + getMaxFetchRetry() == that.getMaxFetchRetry(); } @Override public int hashCode() { - return getUris().hashCode(); + return Objects.hash( + uris, + prefixes, + getMaxCacheCapacityBytes(), + getMaxFetchCapacityBytes(), + getPrefetchTriggerBytes(), + getFetchTimeout(), + getMaxFetchRetry() + ); } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java index 79a03559e8b..3b189843e2d 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java @@ -19,11 +19,19 @@ package io.druid.firehose.s3; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; + + +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import io.druid.initialization.DruidModule; import io.druid.jackson.DefaultObjectMapper; -import org.easymock.EasyMock; import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.junit.Assert; import org.junit.Test; @@ -36,38 +44,67 @@ import java.util.List; */ public class StaticS3FirehoseFactoryTest { + private static final RestS3Service SERVICE = new RestS3Service(null); + @Test public void testSerde() throws Exception { - ObjectMapper mapper = new DefaultObjectMapper(); + final ObjectMapper mapper = createObjectMapper(new TestS3Module()); final List uris = Arrays.asList( new URI("s3://foo/bar/file.gz"), new URI("s3://bar/foo/file2.gz") ); - TestStaticS3FirehoseFactory factory = new TestStaticS3FirehoseFactory( - uris + final StaticS3FirehoseFactory factory = new StaticS3FirehoseFactory( + SERVICE, + uris, + null, + 2048L, + 1024L, + 512L, + 100L, + 5 ); - TestStaticS3FirehoseFactory outputFact = mapper.readValue( + final StaticS3FirehoseFactory outputFact = mapper.readValue( mapper.writeValueAsString(factory), - TestStaticS3FirehoseFactory.class + StaticS3FirehoseFactory.class ); Assert.assertEquals(factory, outputFact); - Assert.assertEquals(uris, outputFact.getUris()); } - // This class is a workaround for the injectable value that StaticS3FirehoseFactory requires - private static class TestStaticS3FirehoseFactory extends StaticS3FirehoseFactory + private static ObjectMapper createObjectMapper(DruidModule baseModule) { - @JsonCreator - public TestStaticS3FirehoseFactory( - @JsonProperty("uris") List uris - ) + final ObjectMapper baseMapper = new DefaultObjectMapper(); + baseModule.getJacksonModules().forEach(baseMapper::registerModule); + + final Injector injector = Guice.createInjector( + new ObjectMapperModule(), + baseModule + ); + return injector.getInstance(ObjectMapper.class); + } + + private static class TestS3Module implements DruidModule + { + @Override + public List getJacksonModules() { - super(EasyMock.niceMock(RestS3Service.class), uris, null, null, null, null, null, null); + return ImmutableList.of(new SimpleModule()); + } + + @Override + public void configure(Binder binder) + { + + } + + @Provides + public RestS3Service getRestS3Service() + { + return SERVICE; } } } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java index 4a604ae7f93..f112a0da283 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -29,6 +29,7 @@ import java.io.InputStream; import java.net.URI; import java.util.Collection; import java.util.List; +import java.util.Objects; public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory { @@ -71,4 +72,37 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory