Delegate creation of segmentPath/LoadSpec to DataSegmentPushers and add S3a support (#4116)

* Add the s3a schema and an s3a implementation to the hdfs storage module.

* use 2.7.3

* use segment pusher to make loadspec

* move getStorageDir and makeLoadSpec under DataSegmentPusher

* fix unit tests

* fix comment part1

* move to hadoop 2.8

* inject deep storage properties

* set version to 2.7.3

* fix build issue about static class

* fix comments

* fix default hadoop coordinates

* fix FileSystem creation

* downgrade aws sdk

* bump the version
Slim 2017-06-03 23:55:09 -07:00 committed by David Lim
parent ba816063cb
commit a2584d214a
48 changed files with 707 additions and 587 deletions

View File

@ -19,15 +19,45 @@
package io.druid.segment.loading;
import com.google.common.base.Joiner;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
public interface DataSegmentPusher
{
Joiner JOINER = Joiner.on("/").skipNulls();
@Deprecated
String getPathForHadoop(String dataSource);
String getPathForHadoop();
DataSegment push(File file, DataSegment segment) throws IOException;
// Use a Map instead of the LoadSpec class to avoid dependency pollution.
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);
default String getStorageDir(DataSegment dataSegment) {
return getDefaultStorageDir(dataSegment);
}
default String makeIndexPathName(DataSegment dataSegment, String indexName) {
return String.format("./%s/%s", getStorageDir(dataSegment),indexName);
}
// Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/
// If above format is ever changed, make sure to change it appropriately in other places
// e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories
// on segment deletion if segment being deleted was the only segment
static String getDefaultStorageDir(DataSegment segment) {
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
}
}
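For orientation, a minimal sketch of what an implementation of the reworked interface could look like; ExampleDataSegmentPusher, its "example" scheme and the hard-coded base path are hypothetical, and only the overridden signatures come from the interface above. getStorageDir and makeIndexPathName fall back to the interface defaults, and makeLoadSpec now owns the loadSpec that JobHelper used to hard-code per scheme.
import com.google.common.collect.ImmutableMap;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
// Hypothetical pusher for a fictional "example" deep storage; shown only to illustrate the contract.
public class ExampleDataSegmentPusher implements DataSegmentPusher
{
  @Deprecated
  @Override
  public String getPathForHadoop(String dataSource)
  {
    return getPathForHadoop();
  }
  @Override
  public String getPathForHadoop()
  {
    return "example:///druid/segments";
  }
  @Override
  public DataSegment push(File file, DataSegment segment) throws IOException
  {
    // A real pusher would copy the zipped segment to .../getStorageDir(segment)/index.zip first.
    URI indexZip = URI.create("example:///druid/segments/" + getStorageDir(segment) + "/index.zip");
    return segment.withLoadSpec(makeLoadSpec(indexZip));
  }
  @Override
  public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
  {
    // The loadSpec stays a plain Map to avoid a dependency on the LoadSpec class.
    return ImmutableMap.<String, Object>of("type", "example", "path", finalIndexZipFilePath.toString());
  }
}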

View File

@ -1,66 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import com.google.common.base.Joiner;
import io.druid.timeline.DataSegment;
import org.joda.time.format.ISODateTimeFormat;
/**
*/
public class DataSegmentPusherUtil
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
// Note: storage directory structure format = .../dataSource/interval/version/partitionNumber/
// If above format is ever changed, make sure to change it appropriately in other places
// e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories
// on segment deletion if segment being deleted was the only segment
public static String getStorageDir(DataSegment segment)
{
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart(),
segment.getInterval().getEnd()
),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
);
}
/**
* Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in
* path names. So we format paths differently for HDFS.
*/
public static String getHdfsStorageDir(DataSegment segment)
{
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
),
segment.getVersion().replaceAll(":", "_")
);
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.loading;
import com.google.common.collect.ImmutableMap;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
public class DataSegmentPusherUtilTest
{
@Test
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception
{
Interval interval = new Interval("2011-10-01/2011-10-02");
ImmutableMap<String, Object> loadSpec = ImmutableMap.<String, Object>of("something", "or_other");
DataSegment segment = new DataSegment(
"something",
interval,
"brand:new:version",
loadSpec,
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
NoneShardSpec.instance(),
null,
1
);
String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir);
}
}

View File

@ -17,4 +17,4 @@ druid.processing.numThreads=2
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"]
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"]

View File

@ -17,4 +17,4 @@ druid.processing.numThreads=2
# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.3.0"]
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.7.3"]

View File

@ -29,12 +29,12 @@ import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.Callable;
@ -85,7 +85,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
public Map<String, String> getAzurePaths(final DataSegment segment)
{
final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
final String storageDir = this.getStorageDir(segment);
return ImmutableMap.of(
"index", String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
@ -109,16 +109,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final DataSegment outSegment = segment
.withSize(size)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
AzureStorageDruidModule.SCHEME,
"containerName",
config.getContainer(),
"blobPath",
azurePaths.get("index")
)
)
.withLoadSpec(this.makeLoadSpec(new URI(azurePaths.get("index"))))
.withBinaryVersion(version);
log.info("Deleting file [%s]", compressedSegmentData);
@ -174,4 +165,17 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
}
}
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
return ImmutableMap.<String, Object>of(
"type",
AzureStorageDruidModule.SCHEME,
"containerName",
config.getContainer(),
"blobPath",
uri.toString()
);
}
}

View File

@ -27,7 +27,6 @@ import com.google.common.io.Files;
import com.microsoft.azure.storage.StorageException;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.MapUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMockSupport;
@ -112,9 +111,9 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
@Test
public void getAzurePathsTest()
{
final String storageDir = DataSegmentPusherUtil.getStorageDir(dataSegment);
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, jsonMapper);
final String storageDir = pusher.getStorageDir(dataSegment);
Map<String, String> paths = pusher.getAzurePaths(dataSegment);
assertEquals(String.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME), paths.get("index"));

View File

@ -30,12 +30,13 @@ import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
/**
* Cassandra Segment Pusher
@ -77,7 +78,7 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
log.info("Writing [%s] to C*", indexFilesDir);
String key = JOINER.join(
config.getKeyspace().isEmpty() ? null : config.getKeyspace(),
DataSegmentPusherUtil.getStorageDir(segment)
this.getStorageDir(segment)
);
// Create index
@ -114,4 +115,10 @@ public class CassandraDataSegmentPusher extends CassandraStorage implements Data
compressedIndexFile.delete();
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException("not supported");
}
}

View File

@ -34,6 +34,8 @@ import org.jclouds.rackspace.cloudfiles.v1.CloudFilesApi;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Callable;
public class CloudFilesDataSegmentPusher implements DataSegmentPusher
@ -75,7 +77,7 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), inSegment);
final String segmentPath = CloudFilesUtils.buildCloudFilesPath(this.config.getBasePath(), getStorageDir(inSegment));
File descriptorFile = null;
File zipOutFile = null;
@ -112,18 +114,7 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
final DataSegment outSegment = inSegment
.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
CloudFilesStorageDruidModule.SCHEME,
"region",
segmentData.getRegion(),
"container",
segmentData.getContainer(),
"path",
segmentData.getPath()
)
)
.withLoadSpec(makeLoadSpec(new URI(segmentData.getPath())))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
return outSegment;
@ -146,4 +137,19 @@ public class CloudFilesDataSegmentPusher implements DataSegmentPusher
}
}
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
return ImmutableMap.<String, Object>of(
"type",
CloudFilesStorageDruidModule.SCHEME,
"region",
objectApi.getRegion(),
"container",
objectApi.getContainer(),
"path",
uri.toString()
);
}
}

View File

@ -22,8 +22,6 @@ package io.druid.storage.cloudfiles;
import com.google.common.base.Predicate;
import io.druid.java.util.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.util.concurrent.Callable;
@ -70,9 +68,4 @@ public class CloudFilesUtils
return path;
}
public static String buildCloudFilesPath(String basePath, final DataSegment segment)
{
return buildCloudFilesPath(basePath, DataSegmentPusherUtil.getStorageDir(segment));
}
}

View File

@ -30,13 +30,14 @@ import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
public class GoogleDataSegmentPusher implements DataSegmentPusher
{
@ -84,7 +85,8 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
return descriptorFile;
}
public void insert(final File file, final String contentType, final String path) throws IOException {
public void insert(final File file, final String contentType, final String path) throws IOException
{
LOG.info("Inserting [%s] to [%s]", file, path);
FileInputStream fileSteam = new FileInputStream(file);
@ -107,19 +109,13 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
try {
indexFile = File.createTempFile("index", ".zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, indexFile);
final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
final String storageDir = this.getStorageDir(segment);
final String indexPath = buildPath(storageDir + "/" + "index.zip");
final String descriptorPath = buildPath(storageDir + "/" + "descriptor.json");
final DataSegment outSegment = segment
.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type", GoogleStorageDruidModule.SCHEME,
"bucket", config.getBucket(),
"path", indexPath
)
)
.withLoadSpec(makeLoadSpec(config.getBucket(), indexPath))
.withBinaryVersion(version);
descriptorFile = createDescriptorFile(jsonMapper, outSegment);
@ -131,7 +127,8 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
}
catch (Exception e) {
throw Throwables.propagate(e);
} finally {
}
finally {
if (indexFile != null) {
LOG.info("Deleting file [%s]", indexFile);
indexFile.delete();
@ -153,4 +150,20 @@ public class GoogleDataSegmentPusher implements DataSegmentPusher
return path;
}
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
// remove the leading "/"
return makeLoadSpec(config.getBucket(),finalIndexZipFilePath.getPath().substring(1));
}
private Map<String, Object> makeLoadSpec(String bucket, String path) {
return ImmutableMap.<String, Object>of(
"type", GoogleStorageDruidModule.SCHEME,
"bucket", bucket,
"path", path
);
}
}

View File

@ -25,7 +25,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.EasyMock;
@ -106,7 +105,7 @@ public class GoogleDataSegmentPusherTest extends EasyMockSupport
jsonMapper
).addMockedMethod("insert", File.class, String.class, String.class).createMock();
final String storageDir = DataSegmentPusherUtil.getStorageDir(segmentToPush);
final String storageDir = pusher.getStorageDir(segmentToPush);
final String indexPath = prefix + "/" + storageDir + "/" + "index.zip";
final String descriptorPath = prefix + "/" + storageDir + "/" + "descriptor.json";

View File

@ -216,7 +216,8 @@ public class OrcIndexGeneratorJobTest
true,
null,
false,
false
false,
null
)
)
);

View File

@ -140,12 +140,17 @@
<artifactId>emitter</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.compile.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>
@ -178,6 +183,12 @@
<version>${hadoop.compile.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-indexing-hadoop</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -29,17 +29,19 @@ import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HadoopFsWrapper;
import org.apache.hadoop.fs.Path;
import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Map;
/**
*/
@ -62,8 +64,11 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
this.config = config;
this.hadoopConfig = hadoopConfig;
this.jsonMapper = jsonMapper;
this.fullyQualifiedStorageDirectory = FileSystem.newInstance(hadoopConfig).makeQualified(new Path(config.getStorageDirectory()))
.toUri().toString();
Path storageDir = new Path(config.getStorageDirectory());
this.fullyQualifiedStorageDirectory = FileSystem.newInstance(storageDir.toUri(), hadoopConfig)
.makeQualified(storageDir)
.toUri()
.toString();
log.info("Configured HDFS as deep storage");
}
@ -84,7 +89,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
@Override
public DataSegment push(File inDir, DataSegment segment) throws IOException
{
final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment);
final String storageDir = this.getStorageDir(segment);
log.info(
"Copying segment[%s] to HDFS at location[%s/%s]",
@ -115,7 +120,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
storageDir,
segment.getShardSpec().getPartitionNum()
));
final Path outDescriptorFile = new Path(String.format(
"%s/%s/%d_descriptor.json",
fullyQualifiedStorageDirectory,
@ -123,7 +127,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
segment.getShardSpec().getPartitionNum()
));
dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile))
dataSegment = segment.withLoadSpec(makeLoadSpec(outIndexFile.toUri()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(inDir));
@ -176,11 +180,6 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
}
}
private ImmutableMap<String, Object> makeLoadSpec(Path outFile)
{
return ImmutableMap.<String, Object>of("type", "hdfs", "path", outFile.toUri().toString());
}
private static class HdfsOutputStreamSupplier extends ByteSink
{
private final FileSystem fs;
@ -198,4 +197,40 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher
return fs.create(descriptorFile);
}
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
return ImmutableMap.<String, Object>of("type", "hdfs", "path", finalIndexZipFilePath.toString());
}
/**
* Due to https://issues.apache.org/jira/browse/HDFS-13, ":" is not allowed in
* path names, so we format paths differently for HDFS.
*/
@Override
public String getStorageDir(DataSegment segment)
{
return JOINER.join(
segment.getDataSource(),
String.format(
"%s_%s",
segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()),
segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
),
segment.getVersion().replaceAll(":", "_")
);
}
@Override
public String makeIndexPathName(DataSegment dataSegment, String indexName)
{
return String.format(
"./%s/%d_%s",
this.getStorageDir(dataSegment),
dataSegment.getShardSpec().getPartitionNum(),
indexName
);
}
}
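To make the HDFS-specific naming concrete, a small illustration using values from the tests below; the pusher and segment are assumed to be set up exactly as in that test code.
// Segment: dataSource "something", interval 2011-10-01/2011-10-02, version "brand:new:version",
// NoneShardSpec (partition 0). The HDFS layout drops the partition directory, encodes the
// partition number into the file name instead, and replaces ":" in the version.
String storageDir = hdfsDataSegmentPusher.getStorageDir(segment);
// -> "something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version"
String indexPath = hdfsDataSegmentPusher.makeIndexPathName(segment, "index.zip");
// -> "./something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0_index.zip"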

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
@ -30,20 +31,33 @@ import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import io.druid.indexer.Bucket;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.JobHelper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.jackson.GranularityModule;
import io.druid.segment.loading.LocalDataSegmentPusher;
import io.druid.segment.loading.LocalDataSegmentPusherConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@ -51,18 +65,33 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public class HdfsDataSegmentPusherTest
{
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
@Rule
public final ExpectedException expectedException = ExpectedException.none();
TestObjectMapper objectMapper = new TestObjectMapper();
static TestObjectMapper objectMapper = new TestObjectMapper();
private HdfsDataSegmentPusher hdfsDataSegmentPusher;
@Before
public void setUp() throws IOException
{
HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConf = new HdfsDataSegmentPusherConfig();
hdfsDataSegmentPusherConf.setStorageDirectory("path/to/");
hdfsDataSegmentPusher = new HdfsDataSegmentPusher(hdfsDataSegmentPusherConf, new Configuration(true), objectMapper);
}
static {
objectMapper = new TestObjectMapper();
objectMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, objectMapper));
}
@Test
public void testPushWithScheme() throws Exception
@ -73,8 +102,8 @@ public class HdfsDataSegmentPusherTest
@Test
public void testPushWithBadScheme() throws Exception
{
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Wrong FS");
expectedException.expect(IOException.class);
expectedException.expectMessage("No FileSystem for scheme");
testUsingScheme("xyzzy");
// Not reached
@ -133,7 +162,7 @@ public class HdfsDataSegmentPusherTest
String indexUri = String.format(
"%s/%s/%d_index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
DataSegmentPusherUtil.getHdfsStorageDir(segmentToPush),
pusher.getStorageDir(segmentToPush),
segmentToPush.getShardSpec().getPartitionNum()
);
@ -146,7 +175,7 @@ public class HdfsDataSegmentPusherTest
indexUri
), segment.getLoadSpec());
// rename directory after push
final String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(segment);
final String segmentPath = pusher.getStorageDir(segment);
File indexFile = new File(String.format(
"%s/%s/%d_index.zip",
@ -217,7 +246,7 @@ public class HdfsDataSegmentPusherTest
String indexUri = String.format(
"%s/%s/%d_index.zip",
FileSystem.newInstance(conf).makeQualified(new Path(config.getStorageDirectory())).toUri().toString(),
DataSegmentPusherUtil.getHdfsStorageDir(segments[i]),
pusher.getStorageDir(segments[i]),
segments[i].getShardSpec().getPartitionNum()
);
@ -230,7 +259,7 @@ public class HdfsDataSegmentPusherTest
indexUri
), pushedSegment.getLoadSpec());
// rename directory after push
String segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(pushedSegment);
String segmentPath = pusher.getStorageDir(pushedSegment);
File indexFile = new File(String.format(
"%s/%s/%d_index.zip",
@ -259,7 +288,7 @@ public class HdfsDataSegmentPusherTest
indexUri
), fromDescriptorFileDataSegment.getLoadSpec());
// rename directory after push
segmentPath = DataSegmentPusherUtil.getHdfsStorageDir(fromDescriptorFileDataSegment);
segmentPath = pusher.getStorageDir(fromDescriptorFileDataSegment);
indexFile = new File(String.format(
"%s/%s/%d_index.zip",
@ -282,7 +311,7 @@ public class HdfsDataSegmentPusherTest
}
}
public class TestObjectMapper extends ObjectMapper
public static class TestObjectMapper extends ObjectMapper
{
public TestObjectMapper()
{
@ -292,10 +321,12 @@ public class HdfsDataSegmentPusherTest
configure(MapperFeature.AUTO_DETECT_IS_GETTERS, false);
configure(MapperFeature.AUTO_DETECT_SETTERS, false);
configure(SerializationFeature.INDENT_OUTPUT, false);
configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
registerModule(new TestModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec")));
registerModule(new GranularityModule());
}
public class TestModule extends SimpleModule
public static class TestModule extends SimpleModule
{
TestModule()
{
@ -317,4 +348,250 @@ public class HdfsDataSegmentPusherTest
}
}
@Test
public void shouldNotHaveColonsInHdfsStorageDir() throws Exception
{
Interval interval = new Interval("2011-10-01/2011-10-02");
ImmutableMap<String, Object> loadSpec = ImmutableMap.<String, Object>of("something", "or_other");
DataSegment segment = new DataSegment(
"something",
interval,
"brand:new:version",
loadSpec,
Arrays.asList("dim1", "dim2"),
Arrays.asList("met1", "met2"),
NoneShardSpec.instance(),
null,
1
);
String storageDir = hdfsDataSegmentPusher.getStorageDir(segment);
Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version", storageDir);
}
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
HadoopIngestionSpec schema;
try {
schema = objectMapper.readValue(
"{\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
//DataSchema dataSchema = new DataSchema("dataSource", null, null, Gra)
//schema = new HadoopIngestionSpec(dataSchema, ioConfig, HadoopTuningConfig.makeDefaultTuningConfig());
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(
"some:brand:new:version"
)
)
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.INDEX_ZIP,
hdfsDataSegmentPusher
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_index.zip",
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON,
hdfsDataSegmentPusher
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0),
hdfsDataSegmentPusher
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_index.zip.0",
path.toString()
);
}
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
{
final HadoopIngestionSpec schema;
try {
schema = objectMapper.readValue(
"{\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"the:data:source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"/tmp/dru:id/data:test\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(
"some:brand:new:version"
)
)
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.INDEX_ZIP,
new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper)
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/index.zip",
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON,
new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper)
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0),
new LocalDataSegmentPusher( new LocalDataSegmentPusherConfig(), objectMapper)
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/index.zip.0",
path.toString()
);
}
}

View File

@ -30,6 +30,7 @@ import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
@ -69,7 +70,7 @@ public class S3DataSegmentMover implements DataSegmentMover
final String targetS3Bucket = MapUtils.getString(targetLoadSpec, "bucket");
final String targetS3BaseKey = MapUtils.getString(targetLoadSpec, "baseKey");
final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, segment);
final String targetS3Path = S3Utils.constructSegmentPath(targetS3BaseKey, DataSegmentPusher.getDefaultStorageDir(segment));
String targetS3DescriptorPath = S3Utils.descriptorPathForSegmentPath(targetS3Path);
if (targetS3Bucket.isEmpty()) {

View File

@ -38,6 +38,8 @@ import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.Callable;
public class S3DataSegmentPusher implements DataSegmentPusher
@ -65,6 +67,9 @@ public class S3DataSegmentPusher implements DataSegmentPusher
@Override
public String getPathForHadoop()
{
if (config.isUseS3aSchema()) {
return String.format("s3a://%s/%s", config.getBucket(), config.getBaseKey());
}
return String.format("s3n://%s/%s", config.getBucket(), config.getBaseKey());
}
@ -78,7 +83,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
@Override
public DataSegment push(final File indexFilesDir, final DataSegment inSegment) throws IOException
{
final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), inSegment);
final String s3Path = S3Utils.constructSegmentPath(config.getBaseKey(), getStorageDir(inSegment));
log.info("Copying segment[%s] to S3 at location[%s]", inSegment.getIdentifier(), s3Path);
@ -107,16 +112,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
s3Client.putObject(outputBucket, toPush);
final DataSegment outSegment = inSegment.withSize(indexSize)
.withLoadSpec(
ImmutableMap.<String, Object>of(
"type",
"s3_zip",
"bucket",
outputBucket,
"key",
toPush.getKey()
)
)
.withLoadSpec(makeLoadSpec(outputBucket, toPush.getKey()))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
@ -149,4 +145,30 @@ public class S3DataSegmentPusher implements DataSegmentPusher
throw Throwables.propagate(e);
}
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
// remove the leading "/"
return makeLoadSpec(finalIndexZipFilePath.getHost(), finalIndexZipFilePath.getPath().substring(1));
}
/**
* Any change in loadSpec needs to be reflected in {@link io.druid.indexer.JobHelper#getURIFromSegment()}
*
*/
@SuppressWarnings("JavadocReference")
private Map<String, Object> makeLoadSpec(String bucket, String key)
{
return ImmutableMap.<String, Object>of(
"type",
"s3_zip",
"bucket",
bucket,
"key",
key,
"S3Schema",
config.isUseS3aSchema() ? "s3a" : "s3n"
);
}
}
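As a quick illustration of the URI-to-loadSpec split performed above (the URI value is made up for the example, only the getHost/getPath handling comes from the code):
URI uri = URI.create("s3n://my-bucket/druid/segments/ds/2017/0/index.zip");
uri.getHost();               // "my-bucket"                           -> "bucket" in the loadSpec
uri.getPath().substring(1);  // "druid/segments/ds/2017/0/index.zip"  -> "key" in the loadSpec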

View File

@ -39,6 +39,9 @@ public class S3DataSegmentPusherConfig
@JsonProperty
@Min(0)
private int maxListingLength = 1000;
// use s3n by default for backward compatibility
@JsonProperty
private boolean useS3aSchema = false;
public void setBucket(String bucket)
{
@ -60,6 +63,16 @@ public class S3DataSegmentPusherConfig
this.maxListingLength = maxListingLength;
}
public boolean isUseS3aSchema()
{
return useS3aSchema;
}
public void setUseS3aSchema(boolean useS3aSchema)
{
this.useS3aSchema = useS3aSchema;
}
public String getBucket()
{
return bucket;

View File

@ -24,8 +24,6 @@ import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import io.druid.java.util.common.RetryUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@ -187,11 +185,11 @@ public class S3Utils
};
}
public static String constructSegmentPath(String baseKey, DataSegment segment)
public static String constructSegmentPath(String baseKey, String storageDir)
{
return JOINER.join(
baseKey.isEmpty() ? null : baseKey,
DataSegmentPusherUtil.getStorageDir(segment)
storageDir
) + "/index.zip";
}

View File

@ -39,7 +39,7 @@ public class S3DataSegmentPusherConfigTest
public void testSerialization() throws IOException
{
String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+"\"disableAcl\":false,\"maxListingLength\":2000}";
+"\"disableAcl\":false,\"maxListingLength\":2000,\"useS3aSchema\":false}";
S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
Assert.assertEquals(jsonConfig, jsonMapper.writeValueAsString(config));
@ -50,7 +50,7 @@ public class S3DataSegmentPusherConfigTest
{
String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}";
String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+"\"disableAcl\":false,\"maxListingLength\":1000}";
+"\"disableAcl\":false,\"maxListingLength\":1000,\"useS3aSchema\":false}";
S3DataSegmentPusherConfig config = jsonMapper.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
Assert.assertEquals(expectedJsonConfig, jsonMapper.writeValueAsString(config));

View File

@ -54,6 +54,7 @@ import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.ShardSpec;
@ -92,9 +93,11 @@ public class HadoopDruidIndexerConfig
public static final IndexMerger INDEX_MERGER;
public static final IndexMergerV9 INDEX_MERGER_V9;
public static final HadoopKerberosConfig HADOOP_KERBEROS_CONFIG;
public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";
static {
injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
@ -118,6 +121,7 @@ public class HadoopDruidIndexerConfig
INDEX_MERGER = injector.getInstance(IndexMerger.class);
INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
HADOOP_KERBEROS_CONFIG = injector.getInstance(HadoopKerberosConfig.class);
DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class);
}
public static enum IndexJobCounters
@ -218,6 +222,7 @@ public class HadoopDruidIndexerConfig
private final Map<Long, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = Maps.newHashMap();
private final Granularity rollupGran;
private final List<String> allowedHadoopPrefix;
@JsonCreator
public HadoopDruidIndexerConfig(
@ -254,6 +259,7 @@ public class HadoopDruidIndexerConfig
}
this.rollupGran = spec.getDataSchema().getGranularitySpec().getQueryGranularity();
this.allowedHadoopPrefix = spec.getTuningConfig().getAllowedHadoopPrefix();
}
@JsonProperty(value = "spec")
@ -592,4 +598,9 @@ public class HadoopDruidIndexerConfig
Preconditions.checkNotNull(schema.getIOConfig().getSegmentOutputPath(), "segmentOutputPath");
Preconditions.checkNotNull(schema.getTuningConfig().getVersion(), "version");
}
public List<String> getAllowedHadoopPrefix()
{
return allowedHadoopPrefix;
}
}

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
@ -66,7 +67,8 @@ public class HadoopTuningConfig implements TuningConfig
DEFAULT_BUILD_V9_DIRECTLY,
DEFAULT_NUM_BACKGROUND_PERSIST_THREADS,
false,
false
false,
null
);
}
@ -87,6 +89,7 @@ public class HadoopTuningConfig implements TuningConfig
private final int numBackgroundPersistThreads;
private final boolean forceExtendableShardSpecs;
private final boolean useExplicitVersion;
private final List<String> allowedHadoopPrefix;
@JsonCreator
public HadoopTuningConfig(
@ -108,7 +111,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads,
final @JsonProperty("forceExtendableShardSpecs") boolean forceExtendableShardSpecs,
final @JsonProperty("useExplicitVersion") boolean useExplicitVersion
final @JsonProperty("useExplicitVersion") boolean useExplicitVersion,
final @JsonProperty("allowedHadoopPrefix") List<String> allowedHadoopPrefix
)
{
this.workingPath = workingPath;
@ -135,6 +139,9 @@ public class HadoopTuningConfig implements TuningConfig
this.forceExtendableShardSpecs = forceExtendableShardSpecs;
Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0");
this.useExplicitVersion = useExplicitVersion;
this.allowedHadoopPrefix = allowedHadoopPrefix == null
? ImmutableList.of("druid.storage.", "druid.javascript.")
: allowedHadoopPrefix;
}
@JsonProperty
@ -259,7 +266,8 @@ public class HadoopTuningConfig implements TuningConfig
buildV9Directly,
numBackgroundPersistThreads,
forceExtendableShardSpecs,
useExplicitVersion
useExplicitVersion,
null
);
}
@ -283,7 +291,8 @@ public class HadoopTuningConfig implements TuningConfig
buildV9Directly,
numBackgroundPersistThreads,
forceExtendableShardSpecs,
useExplicitVersion
useExplicitVersion,
null
);
}
@ -307,7 +316,14 @@ public class HadoopTuningConfig implements TuningConfig
buildV9Directly,
numBackgroundPersistThreads,
forceExtendableShardSpecs,
useExplicitVersion
useExplicitVersion,
null
);
}
@JsonProperty
public List<String> getAllowedHadoopPrefix()
{
return allowedHadoopPrefix;
}
}

View File

@ -166,6 +166,8 @@ public class IndexGeneratorJob implements Jobby
JobHelper.injectSystemProperties(job);
config.addJobProperties(job);
// inject druid properties like deep storage bindings
JobHelper.injectDruidProperties(job.getConfiguration(), config.getAllowedHadoopPrefix());
job.setMapperClass(IndexGeneratorMapper.class);
job.setMapOutputValueClass(BytesWritable.class);
@ -741,20 +743,24 @@ public class IndexGeneratorJob implements Jobby
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate,
JobHelper.INDEX_ZIP
JobHelper.INDEX_ZIP,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeFileNamePath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate,
JobHelper.DESCRIPTOR_JSON
JobHelper.DESCRIPTOR_JSON,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeTmpPath(
new Path(config.getSchema().getIOConfig().getSegmentOutputPath()),
outputFS,
segmentTemplate,
context.getTaskAttemptID()
)
context.getTaskAttemptID(),
config.DATA_SEGMENT_PUSHER
),
config.DATA_SEGMENT_PUSHER
);
Path descriptorPath = config.makeDescriptorInfoPath(segment);

View File

@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
@ -35,7 +34,7 @@ import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusherUtil;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -45,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.UserGroupInformation;
@ -310,6 +310,29 @@ public class JobHelper
injectSystemProperties(job.getConfiguration());
}
public static void injectDruidProperties(Configuration configuration, List<String> listOfAllowedPrefix)
{
String mapJavaOpts = configuration.get(MRJobConfig.MAP_JAVA_OPTS);
String reduceJavaOpts = configuration.get(MRJobConfig.REDUCE_JAVA_OPTS);
for (String propName : System.getProperties().stringPropertyNames()) {
for (String prefix : listOfAllowedPrefix) {
if (propName.startsWith(prefix)) {
mapJavaOpts = String.format("%s -D%s=%s", mapJavaOpts, propName, System.getProperty(propName));
reduceJavaOpts = String.format("%s -D%s=%s", reduceJavaOpts, propName, System.getProperty(propName));
break;
}
}
}
if (!Strings.isNullOrEmpty(mapJavaOpts)) {
configuration.set(MRJobConfig.MAP_JAVA_OPTS, mapJavaOpts);
}
if (!Strings.isNullOrEmpty(reduceJavaOpts)) {
configuration.set(MRJobConfig.REDUCE_JAVA_OPTS, reduceJavaOpts);
}
}
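A minimal sketch of how the new helper behaves; the system property name and -Xmx values are illustrative only.
// Any JVM system property on the indexer that matches an allowed prefix is appended to the
// map and reduce task JVM options, so deep-storage bindings reach the Hadoop task JVMs.
Configuration conf = new Configuration();
conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx1g");
conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx1g");
System.setProperty("druid.storage.type", "hdfs");
JobHelper.injectDruidProperties(conf, ImmutableList.of("druid.storage.", "druid.javascript."));
// conf.get(MRJobConfig.MAP_JAVA_OPTS) is now "-Xmx1g -Ddruid.storage.type=hdfs"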
public static Configuration injectSystemProperties(Configuration conf)
{
for (String propName : System.getProperties().stringPropertyNames()) {
@ -379,7 +402,8 @@ public class JobHelper
final File mergedBase,
final Path finalIndexZipFilePath,
final Path finalDescriptorPath,
final Path tmpPath
final Path tmpPath,
DataSegmentPusher dataSegmentPusher
)
throws IOException
{
@ -412,43 +436,8 @@ public class JobHelper
log.info("Zipped %,d bytes to [%s]", size.get(), tmpPath.toUri());
final URI indexOutURI = finalIndexZipFilePath.toUri();
final ImmutableMap<String, Object> loadSpec;
// TODO: Make this a part of Pushers or Pullers
switch (outputFS.getScheme()) {
case "hdfs":
case "viewfs":
case "maprfs":
loadSpec = ImmutableMap.<String, Object>of(
"type", "hdfs",
"path", indexOutURI.toString()
);
break;
case "gs":
loadSpec = ImmutableMap.<String, Object>of(
"type", "google",
"bucket", indexOutURI.getHost(),
"path", indexOutURI.getPath().substring(1) // remove the leading "/"
);
break;
case "s3":
case "s3n":
loadSpec = ImmutableMap.<String, Object>of(
"type", "s3_zip",
"bucket", indexOutURI.getHost(),
"key", indexOutURI.getPath().substring(1) // remove the leading "/"
);
break;
case "file":
loadSpec = ImmutableMap.<String, Object>of(
"type", "local",
"path", indexOutURI.getPath()
);
break;
default:
throw new IAE("Unknown file system scheme [%s]", outputFS.getScheme());
}
final DataSegment finalSegment = segmentTemplate
.withLoadSpec(loadSpec)
.withLoadSpec(dataSegmentPusher.makeLoadSpec(indexOutURI))
.withSize(size.get())
.withBinaryVersion(SegmentUtils.getVersionFromDir(mergedBase));
@ -575,77 +564,33 @@ public class JobHelper
out.putNextEntry(new ZipEntry(file.getName()));
}
public static boolean isHdfs(FileSystem fs)
{
return "hdfs".equals(fs.getScheme()) || "viewfs".equals(fs.getScheme()) || "maprfs".equals(fs.getScheme());
}
public static Path makeFileNamePath(
final Path basePath,
final FileSystem fs,
final DataSegment segmentTemplate,
final String baseFileName
final String baseFileName,
DataSegmentPusher dataSegmentPusher
)
{
final Path finalIndexZipPath;
final String segmentDir;
if (isHdfs(fs)) {
segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate);
finalIndexZipPath = new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%d_%s",
segmentDir,
segmentTemplate.getShardSpec().getPartitionNum(),
baseFileName
)
);
} else {
segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate);
finalIndexZipPath = new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%s",
segmentDir,
baseFileName
)
);
}
return finalIndexZipPath;
return new Path(prependFSIfNullScheme(fs, basePath),
dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName));
}
public static Path makeTmpPath(
final Path basePath,
final FileSystem fs,
final DataSegment segmentTemplate,
final TaskAttemptID taskAttemptID
final TaskAttemptID taskAttemptID,
DataSegmentPusher dataSegmentPusher
)
{
final String segmentDir;
if (isHdfs(fs)) {
segmentDir = DataSegmentPusherUtil.getHdfsStorageDir(segmentTemplate);
return new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%d_index.zip.%d",
segmentDir,
segmentTemplate.getShardSpec().getPartitionNum(),
taskAttemptID.getId()
)
);
} else {
segmentDir = DataSegmentPusherUtil.getStorageDir(segmentTemplate);
return new Path(
prependFSIfNullScheme(fs, basePath),
String.format(
"./%s/%d_index.zip.%d",
segmentDir,
segmentTemplate.getShardSpec().getPartitionNum(),
taskAttemptID.getId()
)
);
}
return new Path(
prependFSIfNullScheme(fs, basePath),
String.format("./%s.%d",
dataSegmentPusher.makeIndexPathName(segmentTemplate, JobHelper.INDEX_ZIP),
taskAttemptID.getId()
)
);
}
/**
@ -793,7 +738,12 @@ public class JobHelper
final String type = loadSpec.get("type").toString();
final URI segmentLocURI;
if ("s3_zip".equals(type)) {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
if ("s3a".equals(loadSpec.get("S3Schema"))) {
segmentLocURI = URI.create(String.format("s3a://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
} else {
segmentLocURI = URI.create(String.format("s3n://%s/%s", loadSpec.get("bucket"), loadSpec.get("key")));
}
} else if ("hdfs".equals(type)) {
segmentLocURI = URI.create(loadSpec.get("path").toString());
} else if ("google".equals(type)) {

View File

@ -556,20 +556,24 @@ public class HadoopConverterJob
baseOutputPath,
outputFS,
finalSegmentTemplate,
JobHelper.INDEX_ZIP
JobHelper.INDEX_ZIP,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeFileNamePath(
baseOutputPath,
outputFS,
finalSegmentTemplate,
JobHelper.DESCRIPTOR_JSON
JobHelper.DESCRIPTOR_JSON,
config.DATA_SEGMENT_PUSHER
),
JobHelper.makeTmpPath(
baseOutputPath,
outputFS,
finalSegmentTemplate,
context.getTaskAttemptID()
)
context.getTaskAttemptID(),
config.DATA_SEGMENT_PUSHER
),
config.DATA_SEGMENT_PUSHER
);
context.progress();
context.setStatus("Finished PUSH");

View File

@ -37,6 +37,7 @@ import io.druid.initialization.Initialization;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexSpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.server.DruidNode;
import io.druid.timeline.DataSegment;
import org.joda.time.Interval;
@ -53,6 +54,7 @@ public class HadoopDruidConverterConfig
public static final ObjectMapper jsonMapper;
public static final IndexIO INDEX_IO;
public static final IndexMerger INDEX_MERGER;
public static final DataSegmentPusher DATA_SEGMENT_PUSHER;
private static final Injector injector = Initialization.makeInjectorWithModules(
GuiceInjectors.makeStartupInjector(),
@ -75,6 +77,7 @@ public class HadoopDruidConverterConfig
jsonMapper.registerSubtypes(HadoopDruidConverterConfig.class);
INDEX_IO = injector.getInstance(IndexIO.class);
INDEX_MERGER = injector.getInstance(IndexMerger.class);
DATA_SEGMENT_PUSHER = injector.getInstance(DataSegmentPusher.class);
}
private static final TypeReference<Map<String, Object>> mapTypeReference = new TypeReference<Map<String, Object>>()

View File

@ -386,7 +386,8 @@ public class BatchDeltaIngestionTest
null,
null,
false,
false
false,
null
)
)
);

View File

@ -177,7 +177,8 @@ public class DetermineHashedPartitionsJobTest
null,
null,
false,
false
false,
null
)
);
this.indexerConfig = new HadoopDruidIndexerConfig(ingestionSpec);

View File

@ -269,7 +269,8 @@ public class DeterminePartitionsJobTest
null,
null,
false,
false
false,
null
)
)
);

View File

@ -21,7 +21,6 @@ package io.druid.indexer;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
@ -31,15 +30,8 @@ import io.druid.java.util.common.granularity.Granularities;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskType;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
@ -58,229 +50,6 @@ public class HadoopDruidIndexerConfigTest
jsonMapper.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, jsonMapper));
}
public static <T> T jsonReadWriteRead(String s, Class<T> klass)
{
try {
return jsonMapper.readValue(jsonMapper.writeValueAsBytes(jsonMapper.readValue(s, klass)), klass);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Test
public void shouldMakeHDFSCompliantSegmentOutputPath()
{
HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"hdfs://server:9100/tmp/druid/datatest\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(
"some:brand:new:version"
)
)
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.INDEX_ZIP
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_index.zip",
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new DistributedFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0)
);
Assert.assertEquals(
"hdfs://server:9100/tmp/druid/datatest/source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version"
+ "/4712_index.zip.0",
path.toString()
);
}
@Test
public void shouldMakeDefaultSegmentOutputPathIfNotHDFS()
{
final HadoopIngestionSpec schema;
try {
schema = jsonReadWriteRead(
"{\n"
+ " \"dataSchema\": {\n"
+ " \"dataSource\": \"the:data:source\",\n"
+ " \"metricsSpec\": [],\n"
+ " \"granularitySpec\": {\n"
+ " \"type\": \"uniform\",\n"
+ " \"segmentGranularity\": \"hour\",\n"
+ " \"intervals\": [\"2012-07-10/P1D\"]\n"
+ " }\n"
+ " },\n"
+ " \"ioConfig\": {\n"
+ " \"type\": \"hadoop\",\n"
+ " \"segmentOutputPath\": \"/tmp/dru:id/data:test\"\n"
+ " }\n"
+ "}",
HadoopIngestionSpec.class
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
HadoopDruidIndexerConfig cfg = new HadoopDruidIndexerConfig(
schema.withTuningConfig(
schema.getTuningConfig()
.withVersion(
"some:brand:new:version"
)
)
);
Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712);
Path path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.INDEX_ZIP
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/index.zip",
path.toString()
);
path = JobHelper.makeFileNamePath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
JobHelper.DESCRIPTOR_JSON
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/descriptor.json",
path.toString()
);
path = JobHelper.makeTmpPath(
new Path(cfg.getSchema().getIOConfig().getSegmentOutputPath()),
new LocalFileSystem(),
new DataSegment(
cfg.getSchema().getDataSchema().getDataSource(),
cfg.getSchema().getDataSchema().getGranularitySpec().bucketInterval(bucket.time).get(),
cfg.getSchema().getTuningConfig().getVersion(),
null,
null,
null,
new NumberedShardSpec(bucket.partitionNum, 5000),
-1,
-1
),
new TaskAttemptID("abc", 123, TaskType.REDUCE, 1, 0)
);
Assert.assertEquals(
"file:/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:"
+ "version/4712/4712_index.zip.0",
path.toString()
);
}
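The two segment-output-path tests above pin down the difference between the HDFS-compliant layout (compact ISO timestamps, ':' replaced by '_', the partition number folded into the file name) and the default layout used for other filesystems. As a rough sketch only, reverse-engineered from the expected strings in the assertions rather than copied from JobHelper, the HDFS-style directory naming could look like the following; the class and method names are illustrative.

import org.joda.time.Interval;
import org.joda.time.format.ISODateTimeFormat;

public class HdfsStyleDirSketch
{
  // Sketch only: compact ISO timestamps and ':' -> '_' keep every path element HDFS-safe.
  static String hdfsStyleDir(String dataSource, Interval interval, String version)
  {
    return String.format(
        "%s/%s_%s/%s",
        dataSource,
        interval.getStart().toString(ISODateTimeFormat.basicDateTime()),
        interval.getEnd().toString(ISODateTimeFormat.basicDateTime()),
        version.replaceAll(":", "_")
    );
  }

  public static void main(String[] args)
  {
    // Expected shape: source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version
    System.out.println(
        hdfsStyleDir("source", new Interval("2012-07-10T05:00:00Z/2012-07-10T06:00:00Z"), "some:brand:new:version")
    );
  }
}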
@Test
public void testHashedBucketSelection()
@ -325,7 +94,8 @@ public class HadoopDruidIndexerConfigTest
null,
null,
false,
false
false,
null
)
);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);
@ -397,7 +167,8 @@ public class HadoopDruidIndexerConfigTest
null,
null,
false,
false
false,
null
)
);
HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);

View File

@ -57,7 +57,8 @@ public class HadoopTuningConfigTest
null,
null,
true,
true
true,
null
);
HadoopTuningConfig actual = jsonReadWriteRead(jsonMapper.writeValueAsString(expected), HadoopTuningConfig.class);

View File

@ -525,7 +525,8 @@ public class IndexGeneratorJobTest
buildV9Directly,
null,
forceExtendableShardSpecs,
false
false,
null
)
)
);

View File

@ -125,7 +125,8 @@ public class JobHelperTest
null,
null,
false,
false
false,
null
)
)
);

View File

@ -71,7 +71,8 @@ public class GranularityPathSpecTest
null,
null,
false,
false
false,
null
);
private GranularityPathSpec granularityPathSpec;

View File

@ -209,7 +209,8 @@ public class HadoopConverterJobTest
null,
null,
false,
false
false,
null
)
)
);

View File

@ -31,7 +31,7 @@ import java.util.List;
public class TaskConfig
{
public static final List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
"org.apache.hadoop:hadoop-client:2.3.0"
"org.apache.hadoop:hadoop-client:2.7.3"
);
private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");

View File

@ -66,6 +66,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@ -550,6 +551,12 @@ public class IndexTaskTest
segments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
}, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(),
indexMerger, indexIO, null, null, indexMergerV9
)

View File

@ -54,8 +54,10 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
public class SameIntervalMergeTaskTest
@ -200,6 +202,12 @@ public class SameIntervalMergeTaskTest
segments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
return null;
}
}, null, null, null, null, null, null, null, null, null, new SegmentLoader()
{
@Override

View File

@ -93,6 +93,7 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@ -231,6 +232,12 @@ public class IngestSegmentFirehoseFactoryTest
{
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
},
new DataSegmentKiller()
{

View File

@ -122,6 +122,7 @@ import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
@ -477,6 +478,12 @@ public class TaskLifecycleTest
pushedSegments++;
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
};
}
@ -1017,6 +1024,12 @@ public class TaskLifecycleTest
{
throw new RuntimeException("FAILURE");
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
};
tb = setUpTaskToolboxFactory(dataSegmentPusher, handoffNotifierFactory, mdc);

View File

@ -26,6 +26,8 @@ import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Set;
public class TestDataSegmentPusher implements DataSegmentPusher
@ -52,6 +54,12 @@ public class TestDataSegmentPusher implements DataSegmentPusher
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
public Set<DataSegment> getPushedSegments()
{
return ImmutableSet.copyOf(pushedSegments);

View File

@ -71,7 +71,7 @@
<netty.version>4.1.6.Final</netty.version>
<slf4j.version>1.7.12</slf4j.version>
<!-- If compiling with a different hadoop version, also modify the default hadoop coordinates in TaskConfig.java -->
<hadoop.compile.version>2.3.0</hadoop.compile.version>
<hadoop.compile.version>2.7.3</hadoop.compile.version>
<hive.version>2.0.0</hive.version>
<powermock.version>1.6.6</powermock.version>
</properties>
@ -180,7 +180,9 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.10.21</version>
<!-- to use S3a via the HDFS pusher/puller we need this aws-java-sdk version; see
https://issues.apache.org/jira/browse/HADOOP-12420 -->
<version>1.10.56</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
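For context, the s3a scheme is served by Hadoop's S3AFileSystem, which is resolved from the output path rather than hard-coded. Below is a minimal sketch, assuming the standard Hadoop S3a properties (fs.s3a.access.key / fs.s3a.secret.key) and a made-up bucket name, of how a pusher can obtain the right FileSystem for an s3a:// segment output path; the class name is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class S3aFileSystemSketch
{
  public static void main(String[] args) throws IOException
  {
    // Sketch only: resolve the FileSystem from the output path's scheme, so the same
    // pusher code handles hdfs://, file:// and s3a:// segment output paths alike.
    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", "<access-key>"); // placeholder credentials
    conf.set("fs.s3a.secret.key", "<secret-key>");
    Path segmentOutputPath = new Path("s3a://example-bucket/druid/segments");
    FileSystem fs = segmentOutputPath.getFileSystem(conf); // S3AFileSystem for the s3a scheme
    System.out.println(fs.getUri());
  }
}

The aws-java-sdk version bump above is what lets this work with Hadoop's S3A client, per the HADOOP-12420 note in the comment.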

View File

@ -33,7 +33,9 @@ import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.FileAlreadyExistsException;
import java.util.Map;
import java.util.UUID;
/**
@ -73,7 +75,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
@Override
public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
{
final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
final String storageDir = this.getStorageDir(segment);
final File baseStorageDir = config.getStorageDirectory();
final File outDir = new File(baseStorageDir, storageDir);
@ -86,7 +88,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
}
return createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(outDir))
segment.withLoadSpec(makeLoadSpec(outDir.toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
outDir
@ -98,7 +100,7 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
final long size = compressSegment(dataSegmentFile, tmpOutDir);
final DataSegment dataSegment = createDescriptorFile(
segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip")))
segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip").toURI()))
.withSize(size)
.withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
tmpOutDir
@ -118,6 +120,12 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
return dataSegment;
}
@Override
public Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
{
return ImmutableMap.<String, Object>of("type", "local", "path", finalIndexZipFilePath.getPath());
}
private String intermediateDirFor(String storageDir)
{
return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString();
@ -138,9 +146,4 @@ public class LocalDataSegmentPusher implements DataSegmentPusher
Files.copy(ByteStreams.newInputStreamSupplier(jsonMapper.writeValueAsBytes(segment)), descriptorFile);
return segment;
}
private ImmutableMap<String, Object> makeLoadSpec(File outFile)
{
return ImmutableMap.<String, Object>of("type", "local", "path", outFile.toString());
}
}
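With makeLoadSpec(URI) now declared on the pusher interface, each deep storage implementation builds its own load spec from the final segment location, as LocalDataSegmentPusher does above. As a rough sketch not taken from this diff, an HDFS-backed pusher that also serves s3a:// paths could keep the full URI in the spec; the 'hdfs' type string and the class name below are assumptions.

import com.google.common.collect.ImmutableMap;

import java.net.URI;
import java.util.Map;

public class HdfsLoadSpecSketch
{
  // Sketch of a pusher-side makeLoadSpec: keep the whole URI (scheme included) so that
  // hdfs://, s3a:// or any other Hadoop-supported scheme round-trips to the puller.
  static Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath)
  {
    return ImmutableMap.<String, Object>of("type", "hdfs", "path", finalIndexZipFilePath.toString());
  }

  public static void main(String[] args)
  {
    System.out.println(makeLoadSpec(URI.create("s3a://example-bucket/druid/segments/index.zip")));
  }
}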

View File

@ -91,7 +91,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
public StorageLocation findStorageLocationIfLoaded(final DataSegment segment)
{
for (StorageLocation location : getSortedList(locations)) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment));
if (localStorageDir.exists()) {
return location;
}
@ -124,7 +124,7 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
public File getSegmentFiles(DataSegment segment) throws SegmentLoadingException
{
StorageLocation loc = findStorageLocationIfLoaded(segment);
String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
String storageDir = DataSegmentPusher.getDefaultStorageDir(segment);
if (loc == null) {
loc = loadSegmentWithRetry(segment, storageDir);
@ -233,11 +233,11 @@ public class SegmentLoaderLocalCacheManager implements SegmentLoader
    // in this case, findStorageLocationIfLoaded() will think the segment is located in the failed storageDir even though it actually is not.
// So we should always clean all possible locations here
for (StorageLocation location : getSortedList(locations)) {
File localStorageDir = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
File localStorageDir = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment));
if (localStorageDir.exists()) {
// Druid creates folders of the form dataSource/interval/version/partitionNum.
// We need to clean up all these directories if they are all empty.
File cacheFile = new File(location.getPath(), DataSegmentPusherUtil.getStorageDir(segment));
File cacheFile = new File(location.getPath(), DataSegmentPusher.getDefaultStorageDir(segment));
cleanupCacheFiles(location.getPath(), cacheFile);
location.removeSegment(segment);
}
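The loader-side code above now goes through the static DataSegmentPusher.getDefaultStorageDir helper instead of DataSegmentPusherUtil.getStorageDir. A small usage sketch, assuming a UTC default time zone and the same 9-argument DataSegment constructor used in the tests earlier in this diff; the dataSource, version, and class name are made-up example values.

import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.Interval;

public class DefaultStorageDirSketch
{
  public static void main(String[] args)
  {
    DataSegment segment = new DataSegment(
        "wikipedia",                           // dataSource (example value)
        new Interval("2012-07-10/2012-07-11"), // interval
        "v1",                                  // version (example value)
        null, null, null,                      // loadSpec, dimensions, metrics
        new NumberedShardSpec(0, 1),           // shardSpec -> partition number 0
        -1, -1                                 // binaryVersion, size
    );
    // Expected shape (UTC): wikipedia/2012-07-10T00:00:00.000Z_2012-07-11T00:00:00.000Z/v1/0
    System.out.println(DataSegmentPusher.getDefaultStorageDir(segment));
  }
}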

View File

@ -88,14 +88,14 @@ public class LocalDataSegmentPusherTest
Assert.assertEquals(dataSegment2, returnSegment2);
Assert.assertNotEquals(
DataSegmentPusherUtil.getStorageDir(dataSegment),
DataSegmentPusherUtil.getStorageDir(dataSegment2)
localDataSegmentPusher.getStorageDir(dataSegment),
localDataSegmentPusher.getStorageDir(dataSegment2)
);
for (DataSegment returnSegment : ImmutableList.of(returnSegment1, returnSegment2)) {
File outDir = new File(
config.getStorageDirectory(),
DataSegmentPusherUtil.getStorageDir(returnSegment)
localDataSegmentPusher.getStorageDir(returnSegment)
);
File versionFile = new File(outDir, "index.zip");
File descriptorJson = new File(outDir, "descriptor.json");

View File

@ -61,6 +61,7 @@ import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@ -186,6 +187,12 @@ public class AppenderatorTester implements AutoCloseable
pushedSegments.add(segment);
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
throw new UnsupportedOperationException();
}
};
appenderator = Appenderators.createRealtime(
schema,

View File

@ -20,6 +20,7 @@
package io.druid.cli;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Inject;
import com.google.inject.Module;
@ -39,7 +40,9 @@ import io.druid.timeline.DataSegment;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
@ -152,6 +155,12 @@ public class CliRealtimeExample extends ServerRunnable
{
return segment;
}
@Override
public Map<String, Object> makeLoadSpec(URI uri)
{
return ImmutableMap.of();
}
}
private static class NoopDataSegmentAnnouncer implements DataSegmentAnnouncer