mirror of https://github.com/apache/druid.git
Implement get methods for PrefetchableFirehose (#4948)
This commit is contained in:
parent
dfa9cdc982
commit
d95915f8d2
|
@ -19,6 +19,7 @@
|
|||
|
||||
package io.druid.data.input.impl;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
|
@ -164,6 +165,36 @@ public abstract class PrefetchableTextFilesFirehoseFactory<ObjectType>
|
|||
this.maxFetchRetry = maxFetchRetry == null ? DEFAULT_MAX_FETCH_RETRY : maxFetchRetry;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getMaxCacheCapacityBytes()
|
||||
{
|
||||
return maxCacheCapacityBytes;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getMaxFetchCapacityBytes()
|
||||
{
|
||||
return maxFetchCapacityBytes;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getPrefetchTriggerBytes()
|
||||
{
|
||||
return prefetchTriggerBytes;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public long getFetchTimeout()
|
||||
{
|
||||
return fetchTimeout;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public int getMaxFetchRetry()
|
||||
{
|
||||
return maxFetchRetry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Firehose connect(StringInputRowParser firehoseParser, File temporaryDirectory) throws IOException
|
||||
{
|
||||
|
|
|
@ -61,6 +61,11 @@
|
|||
</exclusion>
|
||||
</exclusions>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.module</groupId>
|
||||
<artifactId>jackson-module-guice</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.firehose.azure;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.Objects;
|
||||
|
||||
|
||||
public class AzureBlob
|
||||
|
@ -62,4 +63,26 @@ public class AzureBlob
|
|||
+ ",path=" + path
|
||||
+ "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (o == this) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final AzureBlob that = (AzureBlob) o;
|
||||
return Objects.equals(container, that.container) &&
|
||||
Objects.equals(path, that.path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(container, path);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* This class is heavily inspired by the StaticS3FirehoseFactory class in the io.druid.firehose.s3 package
|
||||
|
@ -89,4 +90,38 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi
|
|||
|
||||
return new AzureByteSource(azureStorage, container, path);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final StaticAzureBlobStoreFirehoseFactory that = (StaticAzureBlobStoreFirehoseFactory) o;
|
||||
|
||||
return Objects.equals(blobs, that.blobs) &&
|
||||
getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
|
||||
getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
|
||||
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
|
||||
getFetchTimeout() == that.getFetchTimeout() &&
|
||||
getMaxFetchRetry() == that.getMaxFetchRetry();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(
|
||||
blobs,
|
||||
getMaxCacheCapacityBytes(),
|
||||
getMaxFetchCapacityBytes(),
|
||||
getPrefetchTriggerBytes(),
|
||||
getFetchTimeout(),
|
||||
getMaxFetchRetry()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.firehose.azure;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Provides;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.storage.azure.AzureStorage;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class StaticAzureBlobStoreFirehoseFactoryTest
|
||||
{
|
||||
private static final AzureStorage STORAGE = new AzureStorage(null);
|
||||
|
||||
@Test
|
||||
public void testSerde() throws IOException
|
||||
{
|
||||
final ObjectMapper mapper = createObjectMapper(new TestModule());
|
||||
|
||||
final List<AzureBlob> blobs = ImmutableList.of(
|
||||
new AzureBlob("foo", "bar"),
|
||||
new AzureBlob("foo", "bar2")
|
||||
);
|
||||
|
||||
final StaticAzureBlobStoreFirehoseFactory factory = new StaticAzureBlobStoreFirehoseFactory(
|
||||
STORAGE,
|
||||
blobs,
|
||||
2048L,
|
||||
1024L,
|
||||
512L,
|
||||
100L,
|
||||
5
|
||||
);
|
||||
|
||||
final StaticAzureBlobStoreFirehoseFactory outputFact = mapper.readValue(
|
||||
mapper.writeValueAsString(factory),
|
||||
StaticAzureBlobStoreFirehoseFactory.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(factory, outputFact);
|
||||
}
|
||||
|
||||
private static ObjectMapper createObjectMapper(DruidModule baseModule)
|
||||
{
|
||||
final ObjectMapper baseMapper = new DefaultObjectMapper();
|
||||
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
|
||||
|
||||
final Injector injector = Guice.createInjector(
|
||||
new ObjectMapperModule(),
|
||||
baseModule
|
||||
);
|
||||
return injector.getInstance(ObjectMapper.class);
|
||||
}
|
||||
|
||||
private static class TestModule implements DruidModule
|
||||
{
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return ImmutableList.of(new SimpleModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Provides
|
||||
public AzureStorage getRestS3Service()
|
||||
{
|
||||
return STORAGE;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -112,6 +112,11 @@
|
|||
<artifactId>rackspace-cloudfiles-uk</artifactId>
|
||||
<version>${jclouds.version}</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.module</groupId>
|
||||
<artifactId>jackson-module-guice</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.firehose.cloudfiles;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.Objects;
|
||||
|
||||
public class CloudFilesBlob
|
||||
{
|
||||
|
@ -72,4 +73,27 @@ public class CloudFilesBlob
|
|||
+ ",region=" + region
|
||||
+ "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final CloudFilesBlob that = (CloudFilesBlob) o;
|
||||
return Objects.equals(container, that.container) &&
|
||||
Objects.equals(path, that.path) &&
|
||||
Objects.equals(region, that.region);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(container, path, region);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<CloudFilesBlob>
|
||||
{
|
||||
|
@ -91,4 +92,37 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho
|
|||
{
|
||||
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (o == this) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final StaticCloudFilesFirehoseFactory that = (StaticCloudFilesFirehoseFactory) o;
|
||||
return Objects.equals(blobs, that.blobs) &&
|
||||
getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
|
||||
getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
|
||||
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
|
||||
getFetchTimeout() == that.getFetchTimeout() &&
|
||||
getMaxFetchRetry() == that.getMaxFetchRetry();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(
|
||||
blobs,
|
||||
getMaxCacheCapacityBytes(),
|
||||
getMaxFetchCapacityBytes(),
|
||||
getPrefetchTriggerBytes(),
|
||||
getFetchTimeout(),
|
||||
getMaxFetchRetry()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.firehose.cloudfiles;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Provides;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.easymock.EasyMock;
|
||||
import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class StaticCloudFilesFirehoseFactoryTest
|
||||
{
|
||||
private static final CloudFilesApi API = EasyMock.niceMock(CloudFilesApi.class);
|
||||
|
||||
@Test
|
||||
public void testSerde() throws IOException
|
||||
{
|
||||
final ObjectMapper mapper = createObjectMapper(new TestModule());
|
||||
|
||||
final List<CloudFilesBlob> blobs = ImmutableList.of(
|
||||
new CloudFilesBlob("container", "foo", "bar"),
|
||||
new CloudFilesBlob("container", "foo", "bar2")
|
||||
);
|
||||
|
||||
final StaticCloudFilesFirehoseFactory factory = new StaticCloudFilesFirehoseFactory(
|
||||
API,
|
||||
blobs,
|
||||
2048L,
|
||||
1024L,
|
||||
512L,
|
||||
100L,
|
||||
5
|
||||
);
|
||||
|
||||
final StaticCloudFilesFirehoseFactory outputFact = mapper.readValue(
|
||||
mapper.writeValueAsString(factory),
|
||||
StaticCloudFilesFirehoseFactory.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(factory, outputFact);
|
||||
}
|
||||
|
||||
private static ObjectMapper createObjectMapper(DruidModule baseModule)
|
||||
{
|
||||
final ObjectMapper baseMapper = new DefaultObjectMapper();
|
||||
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
|
||||
|
||||
final Injector injector = Guice.createInjector(
|
||||
new ObjectMapperModule(),
|
||||
baseModule
|
||||
);
|
||||
return injector.getInstance(ObjectMapper.class);
|
||||
}
|
||||
|
||||
private static class TestModule implements DruidModule
|
||||
{
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return ImmutableList.of(new SimpleModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Provides
|
||||
public CloudFilesApi getRestS3Service()
|
||||
{
|
||||
return API;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -51,6 +51,11 @@
|
|||
<artifactId>google-http-client-jackson2</artifactId>
|
||||
<version>1.22.0</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.module</groupId>
|
||||
<artifactId>jackson-module-guice</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
|
|
@ -22,6 +22,8 @@ package io.druid.firehose.google;
|
|||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public class GoogleBlob
|
||||
{
|
||||
private final String bucket;
|
||||
|
@ -54,5 +56,21 @@ public class GoogleBlob
|
|||
+ ",path=" + path
|
||||
+ "}";
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final GoogleBlob that = (GoogleBlob) o;
|
||||
return Objects.equals(bucket, that.bucket) &&
|
||||
Objects.equals(path, that.path);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ import java.io.IOException;
|
|||
import java.io.InputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<GoogleBlob>
|
||||
{
|
||||
|
@ -81,5 +82,38 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF
|
|||
{
|
||||
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final StaticGoogleBlobStoreFirehoseFactory that = (StaticGoogleBlobStoreFirehoseFactory) o;
|
||||
|
||||
return Objects.equals(blobs, that.blobs) &&
|
||||
getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
|
||||
getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
|
||||
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
|
||||
getFetchTimeout() == that.getFetchTimeout() &&
|
||||
getMaxFetchRetry() == that.getMaxFetchRetry();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(
|
||||
blobs,
|
||||
getMaxCacheCapacityBytes(),
|
||||
getMaxFetchCapacityBytes(),
|
||||
getPrefetchTriggerBytes(),
|
||||
getFetchTimeout(),
|
||||
getMaxFetchRetry()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,104 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.firehose.google;
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Provides;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import io.druid.storage.google.GoogleStorage;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
public class StaticGoogleBlobStoreFirehoseFactoryTest
|
||||
{
|
||||
private static final GoogleStorage STORAGE = new GoogleStorage(null);
|
||||
|
||||
@Test
|
||||
public void testSerde() throws IOException
|
||||
{
|
||||
final ObjectMapper mapper = createObjectMapper(new TestGoogleModule());
|
||||
|
||||
final List<GoogleBlob> blobs = ImmutableList.of(
|
||||
new GoogleBlob("foo", "bar"),
|
||||
new GoogleBlob("foo", "bar2")
|
||||
);
|
||||
|
||||
final StaticGoogleBlobStoreFirehoseFactory factory = new StaticGoogleBlobStoreFirehoseFactory(
|
||||
STORAGE,
|
||||
blobs,
|
||||
2048L,
|
||||
1024L,
|
||||
512L,
|
||||
100L,
|
||||
5
|
||||
);
|
||||
|
||||
final StaticGoogleBlobStoreFirehoseFactory outputFact = mapper.readValue(
|
||||
mapper.writeValueAsString(factory),
|
||||
StaticGoogleBlobStoreFirehoseFactory.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(factory, outputFact);
|
||||
}
|
||||
|
||||
private static ObjectMapper createObjectMapper(DruidModule baseModule)
|
||||
{
|
||||
final ObjectMapper baseMapper = new DefaultObjectMapper();
|
||||
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
|
||||
|
||||
final Injector injector = Guice.createInjector(
|
||||
new ObjectMapperModule(),
|
||||
baseModule
|
||||
);
|
||||
return injector.getInstance(ObjectMapper.class);
|
||||
}
|
||||
|
||||
private static class TestGoogleModule implements DruidModule
|
||||
{
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
return ImmutableList.of(new SimpleModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Provides
|
||||
public GoogleStorage getRestS3Service()
|
||||
{
|
||||
return STORAGE;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -61,6 +61,12 @@
|
|||
<artifactId>commons-io</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.module</groupId>
|
||||
<artifactId>jackson-module-guice</artifactId>
|
||||
<version>${jackson.version}</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!-- Tests -->
|
||||
<dependency>
|
||||
|
|
|
@ -40,6 +40,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -205,15 +206,28 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
|
|||
return false;
|
||||
}
|
||||
|
||||
StaticS3FirehoseFactory factory = (StaticS3FirehoseFactory) o;
|
||||
|
||||
return getUris().equals(factory.getUris());
|
||||
StaticS3FirehoseFactory that = (StaticS3FirehoseFactory) o;
|
||||
|
||||
return Objects.equals(uris, that.uris) &&
|
||||
Objects.equals(prefixes, that.prefixes) &&
|
||||
getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
|
||||
getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
|
||||
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
|
||||
getFetchTimeout() == that.getFetchTimeout() &&
|
||||
getMaxFetchRetry() == that.getMaxFetchRetry();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return getUris().hashCode();
|
||||
return Objects.hash(
|
||||
uris,
|
||||
prefixes,
|
||||
getMaxCacheCapacityBytes(),
|
||||
getMaxFetchCapacityBytes(),
|
||||
getPrefetchTriggerBytes(),
|
||||
getFetchTimeout(),
|
||||
getMaxFetchRetry()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,11 +19,19 @@
|
|||
|
||||
package io.druid.firehose.s3;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
|
||||
import com.fasterxml.jackson.databind.Module;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.module.SimpleModule;
|
||||
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.inject.Binder;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Provides;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.easymock.EasyMock;
|
||||
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -36,38 +44,67 @@ import java.util.List;
|
|||
*/
|
||||
public class StaticS3FirehoseFactoryTest
|
||||
{
|
||||
private static final RestS3Service SERVICE = new RestS3Service(null);
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
{
|
||||
ObjectMapper mapper = new DefaultObjectMapper();
|
||||
final ObjectMapper mapper = createObjectMapper(new TestS3Module());
|
||||
|
||||
final List<URI> uris = Arrays.asList(
|
||||
new URI("s3://foo/bar/file.gz"),
|
||||
new URI("s3://bar/foo/file2.gz")
|
||||
);
|
||||
|
||||
TestStaticS3FirehoseFactory factory = new TestStaticS3FirehoseFactory(
|
||||
uris
|
||||
final StaticS3FirehoseFactory factory = new StaticS3FirehoseFactory(
|
||||
SERVICE,
|
||||
uris,
|
||||
null,
|
||||
2048L,
|
||||
1024L,
|
||||
512L,
|
||||
100L,
|
||||
5
|
||||
);
|
||||
|
||||
TestStaticS3FirehoseFactory outputFact = mapper.readValue(
|
||||
final StaticS3FirehoseFactory outputFact = mapper.readValue(
|
||||
mapper.writeValueAsString(factory),
|
||||
TestStaticS3FirehoseFactory.class
|
||||
StaticS3FirehoseFactory.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(factory, outputFact);
|
||||
Assert.assertEquals(uris, outputFact.getUris());
|
||||
}
|
||||
|
||||
// This class is a workaround for the injectable value that StaticS3FirehoseFactory requires
|
||||
private static class TestStaticS3FirehoseFactory extends StaticS3FirehoseFactory
|
||||
private static ObjectMapper createObjectMapper(DruidModule baseModule)
|
||||
{
|
||||
@JsonCreator
|
||||
public TestStaticS3FirehoseFactory(
|
||||
@JsonProperty("uris") List<URI> uris
|
||||
)
|
||||
final ObjectMapper baseMapper = new DefaultObjectMapper();
|
||||
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
|
||||
|
||||
final Injector injector = Guice.createInjector(
|
||||
new ObjectMapperModule(),
|
||||
baseModule
|
||||
);
|
||||
return injector.getInstance(ObjectMapper.class);
|
||||
}
|
||||
|
||||
private static class TestS3Module implements DruidModule
|
||||
{
|
||||
@Override
|
||||
public List<? extends Module> getJacksonModules()
|
||||
{
|
||||
super(EasyMock.niceMock(RestS3Service.class), uris, null, null, null, null, null, null);
|
||||
return ImmutableList.of(new SimpleModule());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure(Binder binder)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
@Provides
|
||||
public RestS3Service getRestS3Service()
|
||||
{
|
||||
return SERVICE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.io.InputStream;
|
|||
import java.net.URI;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<URI>
|
||||
{
|
||||
|
@ -71,4 +72,37 @@ public class HttpFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<UR
|
|||
{
|
||||
return object.getPath().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o)
|
||||
{
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
final HttpFirehoseFactory that = (HttpFirehoseFactory) o;
|
||||
return Objects.equals(uris, that.uris) &&
|
||||
getMaxCacheCapacityBytes() == that.getMaxCacheCapacityBytes() &&
|
||||
getMaxFetchCapacityBytes() == that.getMaxFetchCapacityBytes() &&
|
||||
getPrefetchTriggerBytes() == that.getPrefetchTriggerBytes() &&
|
||||
getFetchTimeout() == that.getFetchTimeout() &&
|
||||
getMaxFetchRetry() == that.getMaxFetchRetry();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode()
|
||||
{
|
||||
return Objects.hash(
|
||||
uris,
|
||||
getMaxCacheCapacityBytes(),
|
||||
getMaxFetchCapacityBytes(),
|
||||
getPrefetchTriggerBytes(),
|
||||
getFetchTimeout(),
|
||||
getMaxFetchRetry()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package io.druid.segment.realtime.firehose;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import io.druid.jackson.DefaultObjectMapper;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
public class HttpFirehoseFactoryTest
|
||||
{
|
||||
@Test
|
||||
public void testSerde() throws IOException
|
||||
{
|
||||
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
final HttpFirehoseFactory factory = new HttpFirehoseFactory(
|
||||
ImmutableList.of(URI.create("http://foo/bar"), URI.create("http://foo/bar2")),
|
||||
2048L,
|
||||
1024L,
|
||||
512L,
|
||||
100L,
|
||||
5
|
||||
);
|
||||
|
||||
final HttpFirehoseFactory outputFact = mapper.readValue(
|
||||
mapper.writeValueAsString(factory),
|
||||
HttpFirehoseFactory.class
|
||||
);
|
||||
|
||||
Assert.assertEquals(factory, outputFact);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue