diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
index 20177a080da..54c9efe4c11 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java
@@ -20,14 +20,25 @@
 package io.druid.indexer.hadoop;
 
 import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import com.metamx.common.ISE;
+import com.metamx.common.Pair;
 import com.metamx.common.logger.Logger;
+import io.druid.collections.CountingMap;
 import io.druid.data.input.InputRow;
 import io.druid.indexer.HadoopDruidIndexerConfig;
+import io.druid.indexer.JobHelper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -36,9 +47,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
 {
@@ -89,9 +104,11 @@ public class DatasourceInputFormat
     List<WindowedDataSegment> list = new ArrayList<>();
     long size = 0;
 
+    JobConf dummyConf = new JobConf();
+    org.apache.hadoop.mapred.InputFormat fio = supplier.get();
     for (WindowedDataSegment segment : segments) {
       if (size + segment.getSegment().getSize() > maxSize && size > 0) {
-        splits.add(new DatasourceInputSplit(list));
+        splits.add(toDataSourceSplit(list, fio, dummyConf));
         list = Lists.newArrayList();
         size = 0;
       }
@@ -101,7 +118,7 @@
     }
 
     if (list.size() > 0) {
-      splits.add(new DatasourceInputSplit(list));
+      splits.add(toDataSourceSplit(list, fio, dummyConf));
     }
 
     logger.info("Number of splits [%d]", splits.size());
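For reference, the grouping rule the two hunks above keep intact: segments accumulate into the current split until adding the next one would push the running size past `CONF_MAX_SPLIT_SIZE`, at which point the batch is flushed and a new one starts; whatever is left over becomes the final split. A minimal standalone sketch of that rule (illustrative only; the class and method names below are not part of the patch):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SplitGroupingSketch
{
  // Group segment sizes into batches, flushing the current batch whenever the
  // next segment would push its total past maxSize. A batch is only flushed if
  // it is non-empty, so a single oversized segment still forms its own batch.
  static List<List<Long>> group(List<Long> segmentSizes, long maxSize)
  {
    List<List<Long>> batches = new ArrayList<>();
    List<Long> current = new ArrayList<>();
    long size = 0;

    for (long segment : segmentSizes) {
      if (size + segment > maxSize && size > 0) {
        batches.add(current);
        current = new ArrayList<>();
        size = 0;
      }
      current.add(segment);
      size += segment;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }

  public static void main(String[] args)
  {
    // Sizes 2, 3, 4 with maxSize 6 group as [2, 3] and [4].
    System.out.println(group(Arrays.asList(2L, 3L, 4L), 6));
  }
}
```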
@@ -116,4 +133,85 @@
   {
     return new DatasourceRecordReader();
   }
+
+  private Supplier<org.apache.hadoop.mapred.InputFormat> supplier = new Supplier<org.apache.hadoop.mapred.InputFormat>()
+  {
+    @Override
+    public org.apache.hadoop.mapred.InputFormat get()
+    {
+      return new TextInputFormat();
+    }
+  };
+
+  @VisibleForTesting
+  DatasourceInputFormat setSupplier(Supplier<org.apache.hadoop.mapred.InputFormat> supplier) {
+    this.supplier = supplier;
+    return this;
+  }
+
+  private DatasourceInputSplit toDataSourceSplit(
+      List<WindowedDataSegment> segments,
+      org.apache.hadoop.mapred.InputFormat fio,
+      JobConf conf
+  )
+  {
+    String[] locations = null;
+    try {
+      locations = getFrequentLocations(segments, fio, conf);
+    }
+    catch (Exception e) {
+      logger.error("Exception thrown finding location of splits", e);
+    }
+    return new DatasourceInputSplit(segments, locations);
+  }
+
+  private String[] getFrequentLocations(
+      List<WindowedDataSegment> segments,
+      org.apache.hadoop.mapred.InputFormat fio,
+      JobConf conf
+  ) throws IOException
+  {
+    Iterable<String> locations = Collections.emptyList();
+    for (WindowedDataSegment segment : segments) {
+      FileInputFormat.setInputPaths(conf, new Path(JobHelper.getURIFromSegment(segment.getSegment())));
+      for (org.apache.hadoop.mapred.InputSplit split : fio.getSplits(conf, 1)) {
+        locations = Iterables.concat(locations, Arrays.asList(split.getLocations()));
+      }
+    }
+    return getFrequentLocations(locations);
+  }
+
+  private static String[] getFrequentLocations(Iterable<String> hosts) {
+
+    final CountingMap<String> counter = new CountingMap<>();
+    for (String location : hosts) {
+      counter.add(location, 1);
+    }
+
+    final TreeSet<Pair<Long, String>> sorted = Sets.<Pair<Long, String>>newTreeSet(
+        new Comparator<Pair<Long, String>>()
+        {
+          @Override
+          public int compare(Pair<Long, String> o1, Pair<Long, String> o2)
+          {
+            int compare = o2.lhs.compareTo(o1.lhs); // descending
+            if (compare == 0) {
+              compare = o1.rhs.compareTo(o2.rhs); // ascending
+            }
+            return compare;
+          }
+        }
+    );
+
+    for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
+      sorted.add(Pair.of(entry.getValue().get(), entry.getKey()));
+    }
+
+    // use default replication factor, if possible
+    final List<String> locations = Lists.newArrayListWithCapacity(3);
+    for (Pair<Long, String> frequent : Iterables.limit(sorted, 3)) {
+      locations.add(frequent.rhs);
+    }
+    return locations.toArray(new String[locations.size()]);
+  }
 }
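The host selection added above boils down to: ask the underlying `TextInputFormat` for the splits of each segment's file, count how many of those splits report each host, and keep the three most frequent hosts (three mirroring the default HDFS replication factor), breaking count ties by host name. A compact standalone sketch of just the counting step, using plain JDK collections instead of Druid's `CountingMap` and `Pair` (illustrative only; not the patch's actual implementation):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class FrequentLocationsSketch
{
  // Count host occurrences, then keep the 3 most frequent, breaking ties by
  // host name so the result is deterministic.
  static String[] frequentLocations(Iterable<String> hosts)
  {
    final Map<String, Long> counts = new HashMap<>();
    for (String host : hosts) {
      counts.merge(host, 1L, Long::sum);
    }
    return counts.entrySet()
                 .stream()
                 .sorted((a, b) -> {
                   int byCount = Long.compare(b.getValue(), a.getValue()); // descending count
                   return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey()); // ascending name
                 })
                 .limit(3) // mirrors the default HDFS replication factor
                 .map(Map.Entry::getKey)
                 .toArray(String[]::new);
  }

  public static void main(String[] args)
  {
    // s1 and s2 appear three times each, s3 twice: the top three are s1, s2, s3.
    String[] top = frequentLocations(
        Arrays.asList("s1", "s2", "s1", "s3", "s2", "s3", "s1", "s2")
    );
    System.out.println(String.join(",", top)); // prints s1,s2,s3
  }
}
```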
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java
index 9a6ff3bfc00..caacaf73c09 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputSplit.java
@@ -33,17 +33,21 @@ import java.util.List;
 
 public class DatasourceInputSplit extends InputSplit implements Writable
 {
+  private static final String[] EMPTY_STR_ARRAY = new String[0];
+
   private List<WindowedDataSegment> segments = null;
+  private String[] locations = null;
 
   //required for deserialization
   public DatasourceInputSplit()
   {
   }
 
-  public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments)
+  public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments, String[] locations)
   {
     Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments");
     this.segments = segments;
+    this.locations = locations == null ? EMPTY_STR_ARRAY : locations;
   }
 
   @Override
@@ -59,7 +63,7 @@ public class DatasourceInputSplit extends InputSplit implements Writable
   @Override
   public String[] getLocations() throws IOException, InterruptedException
   {
-    return new String[]{};
+    return locations;
   }
 
   public List<WindowedDataSegment> getSegments()
@@ -71,6 +75,10 @@ public class DatasourceInputSplit extends InputSplit implements Writable
   public void write(DataOutput out) throws IOException
   {
     out.writeUTF(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(segments));
+    out.writeInt(locations.length);
+    for (String location : locations) {
+      out.writeUTF(location);
+    }
   }
 
   @Override
@@ -82,5 +90,9 @@
         {
         }
     );
+    locations = new String[in.readInt()];
+    for (int i = 0; i < locations.length; i++) {
+      locations[i] = in.readUTF();
+    }
   }
 }
diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
index e02e0858c80..5983733f80c 100644
--- a/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
+++ b/indexing-hadoop/src/main/java/io/druid/indexer/updater/HadoopConverterJob.java
@@ -593,7 +593,7 @@ public class HadoopConverterJob
           @Override
          public InputSplit apply(DataSegment input)
          {
-            return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)));
+            return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)), null);
          }
        }
    );
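With the change above, `DatasourceInputSplit` carries its preferred hosts across the wire as a length-prefixed list of UTF strings: `write` emits the count and then one `writeUTF` per host, and `readFields` restores them in the same order. A minimal self-contained sketch of that round trip (hypothetical class name and test harness only; the real class also serializes its segment list as JSON, omitted here):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

class LocationsRoundTripSketch
{
  // Write the locations the same way the patch does: an int count followed by
  // one writeUTF per host name.
  static void writeLocations(DataOutput out, String[] locations) throws IOException
  {
    out.writeInt(locations.length);
    for (String location : locations) {
      out.writeUTF(location);
    }
  }

  // Read them back in the same order.
  static String[] readLocations(DataInput in) throws IOException
  {
    String[] locations = new String[in.readInt()];
    for (int i = 0; i < locations.length; i++) {
      locations[i] = in.readUTF();
    }
    return locations;
  }

  public static void main(String[] args) throws IOException
  {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    writeLocations(new DataOutputStream(buffer), new String[]{"server1", "server2"});

    String[] restored = readLocations(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    System.out.println(String.join(",", restored)); // prints server1,server2
  }
}
```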
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
index b70f27427d3..71b95a5986c 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputFormatTest.java
@@ -19,13 +19,24 @@
 
 package io.druid.indexer.hadoop;
 
+import com.google.api.client.util.Maps;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import io.druid.indexer.JobHelper;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.easymock.EasyMock;
@@ -34,13 +45,17 @@
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 
 /**
  */
 public class DatasourceInputFormatTest
 {
   private List<WindowedDataSegment> segments;
+  private List<LocatedFileStatus> locations;
   private Configuration config;
   private JobContext context;
@@ -98,6 +113,36 @@
         )
     );
 
+    Path path1 = new Path(JobHelper.getURIFromSegment(segments.get(0).getSegment()));
+    Path path2 = new Path(JobHelper.getURIFromSegment(segments.get(1).getSegment()));
+    Path path3 = new Path(JobHelper.getURIFromSegment(segments.get(2).getSegment()));
+
+    // dummy locations for test
+    locations = ImmutableList.of(
+        new LocatedFileStatus(
+            1000, false, 0, 0, 0, 0, null, null, null, null, path1,
+            new BlockLocation[]{
+                new BlockLocation(null, new String[]{"s1", "s2"}, 0, 600),
+                new BlockLocation(null, new String[]{"s2", "s3"}, 600, 400)
+            }
+        ),
+        new LocatedFileStatus(
+            4000, false, 0, 0, 0, 0, null, null, null, null, path2,
+            new BlockLocation[]{
+                new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000),
+                new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200),
+                new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100),
+                new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700),
+            }
+        ),
+        new LocatedFileStatus(
+            500, false, 0, 0, 0, 0, null, null, null, null, path3,
+            new BlockLocation[]{
+                new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
+            }
+        )
+    );
+
     config = new Configuration();
     config.set(
         DatasourceInputFormat.CONF_INPUT_SEGMENTS,
@@ -109,35 +154,75 @@
     EasyMock.replay(context);
   }
 
+  private Supplier<InputFormat> testFormatter = new Supplier<InputFormat>() {
+    @Override
+    public InputFormat get()
+    {
+      final Map<String, LocatedFileStatus> locationMap = Maps.newHashMap();
+      for (LocatedFileStatus status : locations) {
+        locationMap.put(status.getPath().getName(), status);
+      }
+
+      return new TextInputFormat()
+      {
+        @Override
+        protected boolean isSplitable(FileSystem fs, Path file) {
+          return false;
+        }
+
+        @Override
+        protected FileStatus[] listStatus(JobConf job) throws IOException
+        {
+          Path[] dirs = getInputPaths(job);
+          if (dirs.length == 0) {
+            throw new IOException("No input paths specified in job");
+          }
+          FileStatus[] status = new FileStatus[dirs.length];
+          for (int i = 0; i < dirs.length; i++) {
+            status[i] = locationMap.get(dirs[i].getName());
+          }
+          return status;
+        }
+      };
+    }
+  };
+
   @Test
   public void testGetSplitsNoCombining() throws Exception
   {
-    List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
+    DatasourceInputFormat inputFormat = new DatasourceInputFormat().setSupplier(testFormatter);
+    List<InputSplit> splits = inputFormat.getSplits(context);
 
     Assert.assertEquals(segments.size(), splits.size());
     for (int i = 0; i < segments.size(); i++) {
-      Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0));
+      DatasourceInputSplit split = (DatasourceInputSplit) splits.get(i);
+      Assert.assertEquals(segments.get(i), split.getSegments().get(0));
     }
+
+    Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(0).getLocations());
+    Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(1).getLocations());
+    Assert.assertArrayEquals(new String[] {"s2", "s3"}, splits.get(2).getLocations());
   }
 
   @Test
   public void testGetSplitsAllCombined() throws Exception
   {
     config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999");
-    List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
+    List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
 
     Assert.assertEquals(1, splits.size());
     Assert.assertEquals(
         Sets.newHashSet(segments),
         Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
     );
+
+    Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());
   }
 
   @Test
   public void testGetSplitsCombineInTwo() throws Exception
   {
     config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6");
-    List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
+    List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
 
     Assert.assertEquals(2, splits.size());
 
@@ -145,11 +230,13 @@ public class DatasourceInputFormatTest
         Sets.newHashSet(segments.get(0), segments.get(2)),
         Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
     );
+    Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());
 
     Assert.assertEquals(
         Sets.newHashSet(segments.get(1)),
         Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
     );
+    Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
   }
 
   @Test
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java
index c39dc718b69..dc17d198360 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceInputSplitTest.java
@@ -59,7 +59,8 @@ public class DatasourceInputSplitTest
                 ),
                 interval
             )
-        )
+        ),
+        new String[] { "server1", "server2", "server3"}
     );
 
     ByteArrayDataOutput out = ByteStreams.newDataOutput();
@@ -70,6 +71,7 @@
     actual.readFields(in);
 
     Assert.assertEquals(expected.getSegments(), actual.getSegments());
+    Assert.assertArrayEquals(expected.getLocations(), actual.getLocations());
     Assert.assertEquals(12334, actual.getLength());
   }
 }
diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
index e4fb03092d2..0d99dbb947d 100644
--- a/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
+++ b/indexing-hadoop/src/test/java/io/druid/indexer/hadoop/DatasourceRecordReaderTest.java
@@ -55,7 +55,7 @@ public class DatasourceRecordReaderTest
             this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
         )
     );
-    InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)));
+    InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)), null);
 
     Configuration config = new Configuration();
     config.set(