mirror of https://github.com/apache/druid.git
DataSegmentPusher: Add allowed hadoop property prefixes. (#4562)
* DataSegmentPusher: Add allowed hadoop property prefixes. * Fix dots.
This commit is contained in:
parent
ae86323dbd
commit
441ee56ba9
|
@ -26,6 +26,8 @@ import io.druid.timeline.DataSegment;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface DataSegmentPusher
|
||||
|
@ -45,6 +47,15 @@ public interface DataSegmentPusher
|
|||
return StringUtils.format("./%s/%s", getStorageDir(dataSegment), indexName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Property prefixes that should be added to the "allowedHadoopPrefix" config for passing down to Hadoop jobs. These
|
||||
* should be property prefixes like "druid.xxx", which means to include "druid.xxx" and "druid.xxx.*".
|
||||
*/
|
||||
default List<String> getAllowedPropertyPrefixesForHadoop()
|
||||
{
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/
|
||||
// If above format is ever changed, make sure to change it appropriately in other places
|
||||
// e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.storage.azure;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import com.microsoft.azure.storage.StorageException;
|
||||
|
@ -37,6 +38,7 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
|
@ -73,6 +75,12 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllowedPropertyPrefixesForHadoop()
|
||||
{
|
||||
return ImmutableList.of("druid.azure");
|
||||
}
|
||||
|
||||
public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment) throws
|
||||
IOException
|
||||
{
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.google.api.client.http.InputStreamContent;
|
|||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.CompressionUtils;
|
||||
|
@ -38,6 +39,7 @@ import java.io.FileInputStream;
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class GoogleDataSegmentPusher implements DataSegmentPusher
|
||||
|
@ -75,6 +77,12 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
|
|||
return StringUtils.format("gs://%s/%s", config.getBucket(), config.getPrefix());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllowedPropertyPrefixesForHadoop()
|
||||
{
|
||||
return ImmutableList.of("druid.google");
|
||||
}
|
||||
|
||||
public File createDescriptorFile(final ObjectMapper jsonMapper, final DataSegment segment)
|
||||
throws IOException
|
||||
{
|
||||
|
|
|
@ -21,6 +21,7 @@ package io.druid.storage.s3;
|
|||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.inject.Inject;
|
||||
import com.metamx.emitter.EmittingLogger;
|
||||
|
@ -38,6 +39,7 @@ import java.io.File;
|
|||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.Files;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
|
@ -79,6 +81,12 @@ public class S3DataSegmentPusher implements DataSegmentPusher
|
|||
return getPathForHadoop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllowedPropertyPrefixesForHadoop()
|
||||
{
|
||||
return ImmutableList.of("druid.s3");
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
|
||||
{
|
||||
|
|
|
@ -74,6 +74,7 @@ import java.io.InputStreamReader;
|
|||
import java.io.Reader;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -259,7 +260,13 @@ public class HadoopDruidIndexerConfig
|
|||
|
||||
}
|
||||
this.rollupGran = spec.getDataSchema().getGranularitySpec().getQueryGranularity();
|
||||
this.allowedHadoopPrefix = spec.getTuningConfig().getAllowedHadoopPrefix();
|
||||
|
||||
// User-specified list plus our additional bonus list.
|
||||
this.allowedHadoopPrefix = new ArrayList<>();
|
||||
this.allowedHadoopPrefix.add("druid.storage");
|
||||
this.allowedHadoopPrefix.add("druid.javascript");
|
||||
this.allowedHadoopPrefix.addAll(DATA_SEGMENT_PUSHER.getAllowedPropertyPrefixesForHadoop());
|
||||
this.allowedHadoopPrefix.addAll(spec.getTuningConfig().getUserAllowedHadoopPrefix());
|
||||
}
|
||||
|
||||
@JsonProperty(value = "spec")
|
||||
|
|
|
@ -137,9 +137,7 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
this.forceExtendableShardSpecs = forceExtendableShardSpecs;
|
||||
Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0");
|
||||
this.useExplicitVersion = useExplicitVersion;
|
||||
this.allowedHadoopPrefix = allowedHadoopPrefix == null
|
||||
? ImmutableList.of("druid.storage.", "druid.javascript.")
|
||||
: allowedHadoopPrefix;
|
||||
this.allowedHadoopPrefix = allowedHadoopPrefix == null ? ImmutableList.of() : allowedHadoopPrefix;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
|
@ -323,9 +321,10 @@ public class HadoopTuningConfig implements TuningConfig
|
|||
);
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
public List<String> getAllowedHadoopPrefix()
|
||||
@JsonProperty("allowedHadoopPrefix")
|
||||
public List<String> getUserAllowedHadoopPrefix()
|
||||
{
|
||||
// Just the user-specified list. More are added in HadoopDruidIndexerConfig.
|
||||
return allowedHadoopPrefix;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -313,7 +313,7 @@ public class JobHelper
|
|||
|
||||
for (String propName : System.getProperties().stringPropertyNames()) {
|
||||
for (String prefix : listOfAllowedPrefix) {
|
||||
if (propName.startsWith(prefix)) {
|
||||
if (propName.equals(prefix) || propName.startsWith(prefix + ".")) {
|
||||
mapJavaOpts = StringUtils.format("%s -D%s=%s", mapJavaOpts, propName, System.getProperty(propName));
|
||||
reduceJavaOpts = StringUtils.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName));
|
||||
break;
|
||||
|
|
Loading…
Reference in New Issue