mirror of https://github.com/apache/druid.git
Using annotation to distinguish Hadoop Configuration in each module (#9013)
* Multibinding for NodeRole
* Fix endpoints
* fix doc
* fix test
* Using annotation to distinguish Hadoop Configuration in each module
This commit is contained in:
parent e5e1e9c4ee
commit 66056b2826
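Background for the change: each Hadoop-related extension module used to bind the bare org.apache.hadoop.conf.Configuration type, so loading two such extensions into one Guice injector failed with a duplicate-binding error. A minimal sketch of that failure mode follows (module and class names here are illustrative stand-ins, not the actual Druid modules):

import com.google.inject.AbstractModule;
import com.google.inject.CreationException;
import com.google.inject.Guice;
import org.apache.hadoop.conf.Configuration;

// Two independent extension-like modules that each bind the same un-annotated type.
class HdfsLikeModule extends AbstractModule
{
  @Override
  protected void configure()
  {
    bind(Configuration.class).toInstance(new Configuration());
  }
}

class ParquetLikeModule extends AbstractModule
{
  @Override
  protected void configure()
  {
    bind(Configuration.class).toInstance(new Configuration());
  }
}

class DuplicateBindingDemo
{
  public static void main(String[] args)
  {
    try {
      // Installing both modules in one injector fails: Configuration is bound twice.
      Guice.createInjector(new HdfsLikeModule(), new ParquetLikeModule());
    }
    catch (CreationException e) {
      System.out.println("Duplicate binding, as expected: " + e.getMessage());
    }
  }
}

The commit below resolves this by giving each extension its own @BindingAnnotation (@Hdfs, @Orc, @Parquet) and qualifying every Configuration binding and injection point with it.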
HdfsFirehoseFactory.java

@@ -27,6 +27,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.inputsource.hdfs.HdfsInputSource;
 import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller;
 import org.apache.druid.utils.CompressionUtils;
@@ -46,7 +47,7 @@ public class HdfsFirehoseFactory extends PrefetchableTextFilesFirehoseFactory<Pa

   @JsonCreator
   public HdfsFirehoseFactory(
-      @JacksonInject Configuration conf,
+      @JacksonInject @Hdfs Configuration conf,
       @JsonProperty("paths") Object inputPaths,
       @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
       @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes,
Hdfs.java (new file)

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the
+ * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind
+ * different instances. This is a binding annotation for druid-hdfs-storage extension. Any binding for the same type
+ * should be distinguished by using this annotation.
+ */
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+public @interface Hdfs
+{
+}
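The javadoc above describes the fix: instead of binding the bare type, each extension binds Configuration under its own binding annotation, and injection points name the annotation they need. A self-contained sketch of the pattern, assuming simplified module and consumer names (not the actual Druid classes) and an illustrative fs.defaultFS value:

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.druid.guice.Hdfs;
import org.apache.hadoop.conf.Configuration;

// A module that binds Configuration under the @Hdfs qualifier, mirroring HdfsStorageDruidModule below.
class AnnotatedHdfsModule extends AbstractModule
{
  @Override
  protected void configure()
  {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://localhost:8020"); // illustrative value only
    bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf);
  }
}

// A consumer that asks for the HDFS-specific instance, mirroring e.g. HdfsDataSegmentPuller below.
class HdfsConsumer
{
  final Configuration config;

  @Inject
  HdfsConsumer(@Hdfs Configuration config)
  {
    this.config = config;
  }
}

class AnnotatedBindingDemo
{
  public static void main(String[] args)
  {
    Injector injector = Guice.createInjector(new AnnotatedHdfsModule());
    HdfsConsumer consumer = injector.getInstance(HdfsConsumer.class);
    System.out.println(consumer.config.get("fs.defaultFS"));
  }
}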
HdfsInputSource.java

@@ -32,6 +32,7 @@ import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -72,7 +73,7 @@ public class HdfsInputSource extends AbstractInputSource implements SplittableIn
   @JsonCreator
   public HdfsInputSource(
       @JsonProperty(PROP_PATHS) Object inputPaths,
-      @JacksonInject Configuration configuration
+      @JacksonInject @Hdfs Configuration configuration
   )
   {
     this.inputPaths = coerceInputPathsToList(inputPaths, PROP_PATHS);
HdfsDataSegmentKiller.java

@@ -22,6 +22,7 @@ package org.apache.druid.storage.hdfs;
 import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import org.apache.commons.lang.StringUtils;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.segment.loading.DataSegmentKiller;
 import org.apache.druid.segment.loading.SegmentLoadingException;
@@ -43,7 +44,7 @@ public class HdfsDataSegmentKiller implements DataSegmentKiller
   private final Path storageDirectory;

   @Inject
-  public HdfsDataSegmentKiller(final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
+  public HdfsDataSegmentKiller(@Hdfs final Configuration config, final HdfsDataSegmentPusherConfig pusherConfig)
   {
     this.config = config;
     this.storageDirectory = new Path(pusherConfig.getStorageDirectory());
HdfsDataSegmentPuller.java

@@ -22,6 +22,7 @@ package org.apache.druid.storage.hdfs;
 import com.google.common.base.Predicate;
 import com.google.common.io.ByteSource;
 import com.google.inject.Inject;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.RetryUtils;
@@ -178,7 +179,7 @@ public class HdfsDataSegmentPuller implements URIDataPuller
   protected final Configuration config;

   @Inject
-  public HdfsDataSegmentPuller(final Configuration config)
+  public HdfsDataSegmentPuller(@Hdfs final Configuration config)
   {
     this.config = config;
   }
HdfsDataSegmentPusher.java

@@ -26,6 +26,7 @@ import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
 import org.apache.druid.common.utils.UUIDUtils;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -60,7 +61,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
   private final Supplier<String> fullyQualifiedStorageDirectory;

   @Inject
-  public HdfsDataSegmentPusher(HdfsDataSegmentPusherConfig config, Configuration hadoopConfig, ObjectMapper jsonMapper)
+  public HdfsDataSegmentPusher(
+      HdfsDataSegmentPusherConfig config,
+      @Hdfs Configuration hadoopConfig,
+      ObjectMapper jsonMapper
+  )
   {
     this.hadoopConfig = hadoopConfig;
     this.jsonMapper = jsonMapper;
HdfsFileTimestampVersionFinder.java

@@ -21,6 +21,7 @@ package org.apache.druid.storage.hdfs;

 import com.google.inject.Inject;
 import org.apache.druid.data.SearchableVersionedDataFinder;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.RetryUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -36,7 +37,7 @@ import java.util.regex.Pattern;
 public class HdfsFileTimestampVersionFinder extends HdfsDataSegmentPuller implements SearchableVersionedDataFinder<URI>
 {
   @Inject
-  public HdfsFileTimestampVersionFinder(Configuration config)
+  public HdfsFileTimestampVersionFinder(@Hdfs Configuration config)
   {
     super(config);
   }
HdfsStorageAuthentication.java

@@ -22,6 +22,7 @@ package org.apache.druid.storage.hdfs;

 import com.google.common.base.Strings;
 import com.google.inject.Inject;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
@@ -40,7 +41,7 @@ public class HdfsStorageAuthentication
   private final Configuration hadoopConf;

   @Inject
-  public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, Configuration hadoopConf)
+  public HdfsStorageAuthentication(HdfsKerberosConfig hdfsKerberosConfig, @Hdfs Configuration hadoopConf)
   {
     this.hdfsKerberosConfig = hdfsKerberosConfig;
     this.hadoopConf = hadoopConf;
HdfsStorageDruidModule.java

@@ -28,6 +28,7 @@ import com.google.inject.multibindings.MapBinder;
 import org.apache.druid.data.SearchableVersionedDataFinder;
 import org.apache.druid.firehose.hdfs.HdfsFirehoseFactory;
 import org.apache.druid.guice.Binders;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.guice.JsonConfigProvider;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
@@ -108,7 +109,7 @@ public class HdfsStorageDruidModule implements DruidModule
       }
     }

-    binder.bind(Configuration.class).toInstance(conf);
+    binder.bind(Configuration.class).annotatedWith(Hdfs.class).toInstance(conf);
     JsonConfigProvider.bind(binder, "druid.storage", HdfsDataSegmentPusherConfig.class);

     Binders.taskLogsBinder(binder).addBinding("hdfs").to(HdfsTaskLogs.class);
@@ -117,6 +118,5 @@ public class HdfsStorageDruidModule implements DruidModule
     JsonConfigProvider.bind(binder, "druid.hadoop.security.kerberos", HdfsKerberosConfig.class);
     binder.bind(HdfsStorageAuthentication.class).in(ManageLifecycle.class);
     LifecycleModule.register(binder, HdfsStorageAuthentication.class);
-
   }
 }
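With the binding now qualified, any code that previously pulled the bare Configuration out of the injector has to name the annotation instead. A hedged sketch of that lookup, assuming an injector that installed both the HDFS and ORC modules and that both annotation classes are visible on the classpath:

import com.google.inject.Injector;
import com.google.inject.Key;
import org.apache.druid.data.input.orc.guice.Orc;
import org.apache.druid.guice.Hdfs;
import org.apache.hadoop.conf.Configuration;

class QualifiedLookupExample
{
  static void printDefaults(Injector injector)
  {
    // Each extension's Configuration now lives under its own key, so both can coexist in one process.
    Configuration hdfsConf = injector.getInstance(Key.get(Configuration.class, Hdfs.class));
    Configuration orcConf = injector.getInstance(Key.get(Configuration.class, Orc.class));
    System.out.println("hdfs fs.defaultFS = " + hdfsConf.get("fs.defaultFS"));
    System.out.println("orc  fs.defaultFS = " + orcConf.get("fs.defaultFS"));
  }
}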
HdfsTaskLogs.java

@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.io.ByteSource;
 import com.google.common.io.ByteStreams;
 import com.google.inject.Inject;
+import org.apache.druid.guice.Hdfs;
 import org.apache.druid.java.util.common.IOE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.tasklogs.TaskLogs;
@@ -51,7 +52,7 @@ public class HdfsTaskLogs implements TaskLogs
   private final Configuration hadoopConfig;

   @Inject
-  public HdfsTaskLogs(HdfsTaskLogsConfig config, Configuration hadoopConfig)
+  public HdfsTaskLogs(HdfsTaskLogsConfig config, @Hdfs Configuration hadoopConfig)
   {
     this.config = config;
     this.hadoopConfig = hadoopConfig;
OrcExtensionsModule.java

@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
+import org.apache.druid.data.input.orc.guice.Orc;
 import org.apache.druid.initialization.DruidModule;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -89,6 +90,6 @@ public class OrcExtensionsModule implements DruidModule
       }
     }

-    binder.bind(Configuration.class).toInstance(conf);
+    binder.bind(Configuration.class).annotatedWith(Orc.class).toInstance(conf);
   }
 }
OrcInputFormat.java

@@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.data.input.orc.guice.Orc;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.hadoop.conf.Configuration;

@@ -42,7 +43,7 @@ public class OrcInputFormat extends NestedInputFormat
   public OrcInputFormat(
       @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
       @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
-      @JacksonInject Configuration conf
+      @JacksonInject @Orc Configuration conf
   )
   {
     super(flattenSpec);
Orc.java (new file)

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.orc.guice;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the
+ * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind
+ * different instances. This is a binding annotation for druid-orc-extensions extension. Any binding for the same type
+ * should be distinguished by using this annotation.
+ */
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+public @interface Orc
+{
+}
ParquetExtensionsModule.java

@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.inject.Binder;
 import com.google.inject.Inject;
 import org.apache.druid.data.input.parquet.avro.ParquetAvroHadoopInputRowParser;
+import org.apache.druid.data.input.parquet.guice.Parquet;
 import org.apache.druid.data.input.parquet.simple.ParquetHadoopInputRowParser;
 import org.apache.druid.data.input.parquet.simple.ParquetParseSpec;
 import org.apache.druid.initialization.DruidModule;
@@ -98,6 +99,6 @@ public class ParquetExtensionsModule implements DruidModule
       }
     }

-    binder.bind(Configuration.class).toInstance(conf);
+    binder.bind(Configuration.class).annotatedWith(Parquet.class).toInstance(conf);
   }
 }
ParquetInputFormat.java

@@ -19,13 +19,16 @@

 package org.apache.druid.data.input.parquet;

+import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputEntityReader;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.NestedInputFormat;
+import org.apache.druid.data.input.parquet.guice.Parquet;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;

 import javax.annotation.Nullable;
 import java.io.File;
@@ -35,15 +38,18 @@ import java.util.Objects;
 public class ParquetInputFormat extends NestedInputFormat
 {
   private final boolean binaryAsString;
+  private final Configuration conf;

   @JsonCreator
   public ParquetInputFormat(
       @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
-      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString
+      @JsonProperty("binaryAsString") @Nullable Boolean binaryAsString,
+      @JacksonInject @Parquet Configuration conf
   )
   {
     super(flattenSpec);
     this.binaryAsString = binaryAsString == null ? false : binaryAsString;
+    this.conf = conf;
   }

   @JsonProperty
@@ -65,7 +71,7 @@ public class ParquetInputFormat extends NestedInputFormat
       File temporaryDirectory
   ) throws IOException
   {
-    return new ParquetReader(inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
+    return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
   }

   @Override
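The injected Configuration is now carried by the input format and handed to the reader it creates. Outside of Druid's injection machinery you pass one in yourself, as the test change at the bottom of this commit does; a hedged usage sketch (the property value is illustrative, not part of the patch):

import org.apache.druid.data.input.parquet.ParquetInputFormat;
import org.apache.hadoop.conf.Configuration;

class ParquetInputFormatUsage
{
  static ParquetInputFormat build()
  {
    // Filesystem-level settings placed here now reach the underlying parquet-hadoop reader.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode:8020"); // illustrative value only
    // flattenSpec and binaryAsString are both nullable, per the constructor above.
    return new ParquetInputFormat(null, null, conf);
  }
}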
ParquetReader.java

@@ -31,6 +31,7 @@ import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.java.util.common.parsers.ObjectFlattener;
 import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
 import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.hadoop.example.GroupReadSupport;
@@ -51,6 +52,7 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
   private final Closer closer;

   ParquetReader(
+      Configuration conf,
       InputRowSchema inputRowSchema,
       InputEntity source,
       File temporaryDirectory,
@@ -69,7 +71,9 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
     final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
     try {
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path).build());
+      reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path)
+                                     .withConf(conf)
+                                     .build());
     }
     finally {
       Thread.currentThread().setContextClassLoader(currentClassLoader);
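For context on the .withConf(conf) call added above, here is a minimal standalone sketch of reading a Parquet file as Group records with the same parquet-hadoop builder API; the file path and any settings placed on the Configuration are illustrative assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

import java.io.IOException;

class GroupReaderSketch
{
  static void dump(String file) throws IOException
  {
    Configuration conf = new Configuration(); // settings placed here are honored by the reader
    try (ParquetReader<Group> reader =
             ParquetReader.builder(new GroupReadSupport(), new Path(file))
                          .withConf(conf)
                          .build()) {
      Group record;
      // read() returns null once the file is exhausted.
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}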
Parquet.java (new file)

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet.guice;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Each extension module needs to properly bind whatever it will use, but sometimes different modules need to bind the
+ * same class which will lead to the duplicate injection error. To avoid this problem, each module is supposed to bind
+ * different instances. This is a binding annotation for druid-parquet-extensions extension. Any binding for the same
+ * type should be distinguished by using this annotation.
+ */
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+public @interface Parquet
+{
+}
BaseParquetReaderTest.java

@@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.impl.FileEntity;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;

 import java.io.File;
 import java.io.IOException;
@@ -51,7 +52,7 @@ class BaseParquetReaderTest
   ) throws IOException
   {
     FileEntity entity = new FileEntity(new File(parquetFile));
-    ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString);
+    ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString, new Configuration());
     return parquet.createReader(schema, entity, null);
   }