Fix reindexing of segments in Google Cloud Storage (#3788)

Google Cloud Storage allows `:` in paths. For this reason `google` was
not added to da007ca3c2/indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java (L585).
Normally this is not an issue, but when reindexing segments the Hadoop
`getSplits` code trips up on the `:` and fails with
`Relative path in absolute URI`.
This patch URL-encodes the `:` character so the paths work in Hadoop as
well.
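
For context, a minimal JDK-only sketch of the failure mode (illustrative only; the class name and values are hypothetical and this file is not part of the commit). Hadoop's `Path` treats everything before the first `:` in a string like `foo:bar/index1.zip` as a URI scheme, and the JDK rejects a URI that has a scheme but a relative path:

    import java.net.URI;
    import java.net.URISyntaxException;

    public class ColonPathDemo
    {
      public static void main(String[] args)
      {
        // A ':' only after the scheme and bucket is unambiguous and parses fine.
        System.out.println(URI.create("gs://bucket/tmp/index1.zip"));

        // Equivalent of what Hadoop's Path does with "foo:bar/index1.zip":
        // it splits off "foo" as the scheme, leaving the relative path
        // "bar/index1.zip", which the JDK then rejects.
        try {
          new URI("foo", null, "bar/index1.zip", null, null);
        }
        catch (URISyntaxException e) {
          System.out.println(e.getMessage());
          // Relative path in absolute URI: foo:bar/index1.zip
        }
      }
    }

Druid segment paths contain `:` because they embed ISO-8601 interval timestamps, so any segment stored in Google Cloud Storage hits this during reindexing.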
Authored by Erik Dubbelboer on 2016-12-21 02:16:33 +01:00; committed by Fangjin Yang
parent c5df30d813
commit c0c34f82ad
2 changed files with 37 additions and 1 deletion

indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java

@@ -737,7 +737,16 @@ public class JobHelper
       } else if ("hdfs".equals(type)) {
         segmentLocURI = URI.create(loadSpec.get("path").toString());
       } else if ("google".equals(type)) {
-        segmentLocURI = URI.create(String.format("gs://%s/%s", loadSpec.get("bucket"), loadSpec.get("path")));
+        // Segment names contain : in their path.
+        // Google Cloud Storage supports : but Hadoop does not.
+        // This becomes an issue when re-indexing using the current segments.
+        // The Hadoop getSplits code doesn't understand the : and returns "Relative path in absolute URI".
+        // This could be fixed by generating path names the same way hdfs segments do,
+        // using getHdfsStorageDir. But that wouldn't fix this issue for people who already have segments with ":".
+        // Because of this we just URL encode the :, making everything work as it should.
+        segmentLocURI = URI.create(
+            String.format("gs://%s/%s", loadSpec.get("bucket"), loadSpec.get("path").toString().replace(":", "%3A"))
+        );
       } else if ("local".equals(type)) {
         try {
           segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
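
As a sanity check, a small sketch (hypothetical bucket and key values, not part of the commit) showing that the encoded form parses cleanly and that decoding recovers the original object key, so the segment can still be located in Google Cloud Storage:

    import java.net.URI;

    public class GoogleUriDemo
    {
      public static void main(String[] args)
      {
        String bucket = "example-bucket";
        String path = "tmp/foo:bar/index1.zip";
        // Same transformation as the patched "google" branch above.
        URI uri = URI.create(String.format("gs://%s/%s", bucket, path.replace(":", "%3A")));
        System.out.println(uri);           // gs://example-bucket/tmp/foo%3Abar/index1.zip
        System.out.println(uri.getPath()); // /tmp/foo:bar/index1.zip
      }
    }

The patch escapes only `:` rather than fully URL-encoding the path; per the code comment, this also fixes reindexing for segments that already exist with `:` in their names.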

indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java

@@ -31,6 +31,8 @@ import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.joda.time.Interval;
@@ -42,6 +44,8 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
@@ -144,7 +148,30 @@ public class JobHelperTest
     );
   }
+  @Test
+  public void testGoogleGetURIFromSegment() throws URISyntaxException
+  {
+    DataSegment segment = new DataSegment(
+        "test1",
+        Interval.parse("2000/3000"),
+        "ver",
+        ImmutableMap.<String, Object>of(
+            "type", "google",
+            "bucket", "test-test",
+            "path", "tmp/foo:bar/index1.zip"
+        ),
+        ImmutableList.<String>of(),
+        ImmutableList.<String>of(),
+        NoneShardSpec.instance(),
+        9,
+        1024
+    );
+    Assert.assertEquals(
+        new URI("gs://test-test/tmp/foo%3Abar/index1.zip"),
+        JobHelper.getURIFromSegment(segment)
+    );
+  }
   private static class HadoopDruidIndexerConfigSpy extends HadoopDruidIndexerConfig
   {