Best effort to find locations for input splits
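Splits built from segments previously reported no locations, so Hadoop could not schedule map tasks near the segment data. DatasourceInputFormat now resolves each segment's path through an underlying org.apache.hadoop.mapred.InputFormat (TextInputFormat by default) and labels each split with the three hosts that appear most often among the resulting block locations; if the lookup fails, the split falls back to an empty location list.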

navis.ryu 2016-01-07 17:20:10 +09:00
parent dfc631c2d0
commit f03f7fb625
6 changed files with 210 additions and 11 deletions

DatasourceInputFormat.java

@@ -20,14 +20,25 @@
package io.druid.indexer.hadoop;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import com.metamx.common.Pair;
import com.metamx.common.logger.Logger;
import io.druid.collections.CountingMap;
import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.JobHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
@@ -36,9 +47,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
@@ -89,9 +104,11 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
List<WindowedDataSegment> list = new ArrayList<>();
long size = 0;
JobConf dummyConf = new JobConf();
org.apache.hadoop.mapred.InputFormat fio = supplier.get();
for (WindowedDataSegment segment : segments) {
if (size + segment.getSegment().getSize() > maxSize && size > 0) {
splits.add(new DatasourceInputSplit(list));
splits.add(toDataSourceSplit(list, fio, dummyConf));
list = Lists.newArrayList();
size = 0;
}
@@ -101,7 +118,7 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
}
if (list.size() > 0) {
splits.add(new DatasourceInputSplit(list));
splits.add(toDataSourceSplit(list, fio, dummyConf));
}
logger.info("Number of splits [%d]", splits.size());
@@ -116,4 +133,85 @@ public class DatasourceInputFormat extends InputFormat<NullWritable, InputRow>
{
return new DatasourceRecordReader();
}
private Supplier<org.apache.hadoop.mapred.InputFormat> supplier = new Supplier<org.apache.hadoop.mapred.InputFormat>()
{
@Override
public org.apache.hadoop.mapred.InputFormat get()
{
return new TextInputFormat();
}
};
@VisibleForTesting
DatasourceInputFormat setSupplier(Supplier<org.apache.hadoop.mapred.InputFormat> supplier) {
this.supplier = supplier;
return this;
}
private DatasourceInputSplit toDataSourceSplit(
List<WindowedDataSegment> segments,
org.apache.hadoop.mapred.InputFormat fio,
JobConf conf
)
{
String[] locations = null;
try {
locations = getFrequentLocations(segments, fio, conf);
}
catch (Exception e) {
logger.error("Exception thrown finding location of splits", e);
}
return new DatasourceInputSplit(segments, locations);
}
private String[] getFrequentLocations(
List<WindowedDataSegment> segments,
org.apache.hadoop.mapred.InputFormat fio,
JobConf conf
) throws IOException
{
Iterable<String> locations = Collections.emptyList();
for (WindowedDataSegment segment : segments) {
FileInputFormat.setInputPaths(conf, new Path(JobHelper.getURIFromSegment(segment.getSegment())));
for (org.apache.hadoop.mapred.InputSplit split : fio.getSplits(conf, 1)) {
locations = Iterables.concat(locations, Arrays.asList(split.getLocations()));
}
}
return getFrequentLocations(locations);
}
private static String[] getFrequentLocations(Iterable<String> hosts) {
final CountingMap<String> counter = new CountingMap<>();
for (String location : hosts) {
counter.add(location, 1);
}
final TreeSet<Pair<Long, String>> sorted = Sets.<Pair<Long, String>>newTreeSet(
new Comparator<Pair<Long, String>>()
{
@Override
public int compare(Pair<Long, String> o1, Pair<Long, String> o2)
{
int compare = o2.lhs.compareTo(o1.lhs); // descending
if (compare == 0) {
compare = o1.rhs.compareTo(o2.rhs); // ascending
}
return compare;
}
}
);
for (Map.Entry<String, AtomicLong> entry : counter.entrySet()) {
sorted.add(Pair.of(entry.getValue().get(), entry.getKey()));
}
// report at most 3 locations, matching the typical default HDFS replication factor
final List<String> locations = Lists.newArrayListWithCapacity(3);
for (Pair<Long, String> frequent : Iterables.limit(sorted, 3)) {
locations.add(frequent.rhs);
}
return locations.toArray(new String[locations.size()]);
}
}
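For reference, a minimal standalone sketch of the ranking that getFrequentLocations performs, written against plain java.util types instead of Druid's CountingMap and Pair (the class name and sample hosts are illustrative):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FrequentLocationsSketch
{
  // Count occurrences per host, rank by descending count with ties broken
  // by ascending host name, and keep at most the top 3 hosts.
  static String[] frequentLocations(Iterable<String> hosts)
  {
    final Map<String, Long> counts = new HashMap<>();
    for (String host : hosts) {
      counts.merge(host, 1L, Long::sum);
    }
    final List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
    sorted.sort(
        Map.Entry.<String, Long>comparingByValue().reversed()
                 .thenComparing(Map.Entry.<String, Long>comparingByKey())
    );
    final int n = Math.min(3, sorted.size());
    final String[] result = new String[n];
    for (int i = 0; i < n; i++) {
      result[i] = sorted.get(i).getKey();
    }
    return result;
  }

  public static void main(String[] args)
  {
    // s2 appears three times, s1 twice, s3 once -> [s2, s1, s3]
    System.out.println(Arrays.toString(
        frequentLocations(Arrays.asList("s1", "s2", "s1", "s2", "s2", "s3"))
    ));
  }
}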

DatasourceInputSplit.java

@@ -33,17 +33,21 @@ import java.util.List;
public class DatasourceInputSplit extends InputSplit implements Writable
{
private static final String[] EMPTY_STR_ARRAY = new String[0];
private List<WindowedDataSegment> segments = null;
private String[] locations = null;
//required for deserialization
public DatasourceInputSplit()
{
}
public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments)
public DatasourceInputSplit(@NotNull List<WindowedDataSegment> segments, String[] locations)
{
Preconditions.checkArgument(segments != null && segments.size() > 0, "no segments");
this.segments = segments;
this.locations = locations == null ? EMPTY_STR_ARRAY : locations;
}
@Override
@@ -59,7 +63,7 @@ public class DatasourceInputSplit extends InputSplit implements Writable
@Override
public String[] getLocations() throws IOException, InterruptedException
{
return new String[]{};
return locations;
}
public List<WindowedDataSegment> getSegments()
@@ -71,6 +75,10 @@ public class DatasourceInputSplit extends InputSplit implements Writable
public void write(DataOutput out) throws IOException
{
out.writeUTF(HadoopDruidIndexerConfig.JSON_MAPPER.writeValueAsString(segments));
out.writeInt(locations.length);
for (String location : locations) {
out.writeUTF(location);
}
}
@Override
@@ -82,5 +90,9 @@ public class DatasourceInputSplit extends InputSplit implements Writable
{
}
);
locations = new String[in.readInt()];
for (int i = 0; i < locations.length; i++) {
locations[i] = in.readUTF();
}
}
}
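The write/readFields pair above frames the locations as a length prefix followed by one UTF string per host, after the segment list's JSON blob. A minimal round-trip sketch of just that framing using plain JDK streams (host names are illustrative; the segment JSON is omitted):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Arrays;

public class LocationFramingSketch
{
  public static void main(String[] args) throws IOException
  {
    final String[] locations = {"server1", "server2", "server3"};

    // Write: length prefix, then one UTF string per host, as in write().
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(locations.length);
      for (String location : locations) {
        out.writeUTF(location);
      }
    }

    // Read back in the same order, as in readFields().
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      final String[] read = new String[in.readInt()];
      for (int i = 0; i < read.length; i++) {
        read[i] = in.readUTF();
      }
      System.out.println(Arrays.toString(read)); // [server1, server2, server3]
    }
  }
}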

HadoopConverterJob.java

@@ -593,7 +593,7 @@ public class HadoopConverterJob
@Override
public InputSplit apply(DataSegment input)
{
return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)));
return new DatasourceInputSplit(ImmutableList.of(WindowedDataSegment.of(input)), null);
}
}
);
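Passing null for the locations here is safe: the DatasourceInputSplit constructor substitutes the empty array, so the converter job keeps its previous no-locations behavior.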

DatasourceInputFormatTest.java

@@ -19,13 +19,24 @@
package io.druid.indexer.hadoop;
import com.google.api.client.util.Maps;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import io.druid.indexer.JobHelper;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.easymock.EasyMock;
@@ -34,13 +45,17 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
*/
public class DatasourceInputFormatTest
{
private List<WindowedDataSegment> segments;
private List<LocatedFileStatus> locations;
private Configuration config;
private JobContext context;
@@ -98,6 +113,36 @@ public class DatasourceInputFormatTest
)
);
Path path1 = new Path(JobHelper.getURIFromSegment(segments.get(0).getSegment()));
Path path2 = new Path(JobHelper.getURIFromSegment(segments.get(1).getSegment()));
Path path3 = new Path(JobHelper.getURIFromSegment(segments.get(2).getSegment()));
// dummy locations for test
locations = ImmutableList.of(
new LocatedFileStatus(
1000, false, 0, 0, 0, 0, null, null, null, null, path1,
new BlockLocation[]{
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 600),
new BlockLocation(null, new String[]{"s2", "s3"}, 600, 400)
}
),
new LocatedFileStatus(
4000, false, 0, 0, 0, 0, null, null, null, null, path2,
new BlockLocation[]{
new BlockLocation(null, new String[]{"s1", "s2"}, 0, 1000),
new BlockLocation(null, new String[]{"s1", "s3"}, 1000, 1200),
new BlockLocation(null, new String[]{"s2", "s3"}, 2200, 1100),
new BlockLocation(null, new String[]{"s1", "s2"}, 3300, 700),
}
),
new LocatedFileStatus(
500, false, 0, 0, 0, 0, null, null, null, null, path3,
new BlockLocation[]{
new BlockLocation(null, new String[]{"s2", "s3"}, 0, 500)
}
)
);
config = new Configuration();
config.set(
DatasourceInputFormat.CONF_INPUT_SEGMENTS,
@@ -109,35 +154,75 @@ public class DatasourceInputFormatTest
EasyMock.replay(context);
}
private Supplier<InputFormat> testFormatter = new Supplier<InputFormat>() {
@Override
public InputFormat get()
{
final Map<String, LocatedFileStatus> locationMap = Maps.newHashMap();
for (LocatedFileStatus status : locations) {
locationMap.put(status.getPath().getName(), status);
}
return new TextInputFormat()
{
@Override
protected boolean isSplitable(FileSystem fs, Path file) {
return false;
}
@Override
protected FileStatus[] listStatus(JobConf job) throws IOException
{
Path[] dirs = getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
FileStatus[] status = new FileStatus[dirs.length];
for (int i = 0; i < dirs.length; i++) {
status[i] = locationMap.get(dirs[i].getName());
}
return status;
}
};
}
};
@Test
public void testGetSplitsNoCombining() throws Exception
{
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
DatasourceInputFormat inputFormat = new DatasourceInputFormat().setSupplier(testFormatter);
List<InputSplit> splits = inputFormat.getSplits(context);
Assert.assertEquals(segments.size(), splits.size());
for (int i = 0; i < segments.size(); i++) {
Assert.assertEquals(segments.get(i), ((DatasourceInputSplit) splits.get(i)).getSegments().get(0));
DatasourceInputSplit split = (DatasourceInputSplit) splits.get(i);
Assert.assertEquals(segments.get(i), split.getSegments().get(0));
}
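// the test's InputFormat is non-splittable, so each file yields one split
// located at its first block's replica hosts: {s1,s2}, {s1,s2}, {s2,s3}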
Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(0).getLocations());
Assert.assertArrayEquals(new String[] {"s1", "s2"}, splits.get(1).getLocations());
Assert.assertArrayEquals(new String[] {"s2", "s3"}, splits.get(2).getLocations());
}
@Test
public void testGetSplitsAllCombined() throws Exception
{
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "999999");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
Assert.assertEquals(1, splits.size());
Assert.assertEquals(
Sets.newHashSet(segments),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
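// host counts over all first blocks: s2:3, s1:2, s3:1 -> top three, descending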
Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());
}
@Test
public void testGetSplitsCombineInTwo() throws Exception
{
config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "6");
List<InputSplit> splits = new DatasourceInputFormat().getSplits(context);
List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);
Assert.assertEquals(2, splits.size());
@@ -145,11 +230,13 @@ public class DatasourceInputFormatTest
Sets.newHashSet(segments.get(0), segments.get(2)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(0)).getSegments()))
);
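// split 0 covers paths 1 and 3: hosts {s1,s2} + {s2,s3} -> s2:2, s1:1, s3:1 (ties by name)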
Assert.assertArrayEquals(new String[]{"s2", "s1", "s3"}, splits.get(0).getLocations());
Assert.assertEquals(
Sets.newHashSet(segments.get(1)),
Sets.newHashSet((((DatasourceInputSplit) splits.get(1)).getSegments()))
);
Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
}
@Test

DatasourceInputSplitTest.java

@@ -59,7 +59,8 @@ public class DatasourceInputSplitTest
),
interval
)
)
),
new String[] { "server1", "server2", "server3"}
);
ByteArrayDataOutput out = ByteStreams.newDataOutput();
@@ -70,6 +71,7 @@ public class DatasourceInputSplitTest
actual.readFields(in);
Assert.assertEquals(expected.getSegments(), actual.getSegments());
Assert.assertArrayEquals(expected.getLocations(), actual.getLocations());
Assert.assertEquals(12334, actual.getLength());
}
}

DatasourceRecordReaderTest.java

@@ -55,7 +55,7 @@ public class DatasourceRecordReaderTest
this.getClass().getClassLoader().getResource("test-segment/index.zip").getPath()
)
);
InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)));
InputSplit split = new DatasourceInputSplit(Lists.newArrayList(WindowedDataSegment.of(segment)), null);
Configuration config = new Configuration();
config.set(