Persist IncrementalIndex in a background thread in IndexGeneratorReducer

binlijin 2016-01-20 09:20:09 +08:00
parent cb8f714f82
commit 8e43e2c446
12 changed files with 114 additions and 10 deletions

View File

@ -207,6 +207,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|useCombiner|Boolean|Use hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|a map of properties to add to the Hadoop job configuration.|no (default == null)|
|buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)|
|persistBackgroundCount|Integer|The number of background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage, but will make the job finish more quickly. If changing from the default of 0 (use the current thread for persists), we recommend setting it to 1.|no (default == 0)|
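For illustration only (not part of this change): a minimal Java sketch of how the new property binds, assuming a plain Jackson ObjectMapper is enough to read the bean, much as the unit tests below do with Druid's mapper. In an ingestion spec the property simply appears as a key in the tuningConfig JSON; omitting it keeps the default of 0. The class name PersistBackgroundCountExample is made up for the example.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.druid.indexer.HadoopTuningConfig;

public class PersistBackgroundCountExample
{
  public static void main(String[] args) throws Exception
  {
    // Hypothetical tuningConfig fragment: one background persist thread.
    final String json = "{\"persistBackgroundCount\": 1}";
    final HadoopTuningConfig withBackgroundPersist =
        new ObjectMapper().readValue(json, HadoopTuningConfig.class);
    System.out.println(withBackgroundPersist.getPersistBackgroundCount()); // 1

    // Leaving the property out keeps the default of 0 (persist on the current thread).
    final HadoopTuningConfig defaults = HadoopTuningConfig.makeDefaultTuningConfig();
    System.out.println(defaults.getPersistBackgroundCount()); // 0
  }
}
```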
### Partitioning specification

View File

@ -22,6 +22,7 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
@ -43,6 +44,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;
private static final int DEFAULT_PERSIST_BACKGROUND_COUNT = 0;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
@ -61,7 +63,8 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
null,
DEFAULT_BUILD_V9_DIRECTLY,
DEFAULT_PERSIST_BACKGROUND_COUNT
);
}
@ -79,6 +82,7 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean combineText;
private final boolean useCombiner;
private final Boolean buildV9Directly;
private final int persistBackgroundCount;
@JsonCreator
public HadoopTuningConfig(
@ -97,7 +101,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("useCombiner") Boolean useCombiner,
// See https://github.com/druid-io/druid/pull/1922
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
final @JsonProperty("persistBackgroundCount") Integer persistBackgroundCount
)
{
this.workingPath = workingPath;
@ -116,6 +121,8 @@ public class HadoopTuningConfig implements TuningConfig
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
this.persistBackgroundCount = persistBackgroundCount == null ? DEFAULT_PERSIST_BACKGROUND_COUNT : persistBackgroundCount;
Preconditions.checkArgument(this.persistBackgroundCount >= 0, "persistBackgroundCount must be >= 0");
}
@JsonProperty
@ -201,6 +208,12 @@ public class HadoopTuningConfig implements TuningConfig
return buildV9Directly;
}
@JsonProperty
public int getPersistBackgroundCount()
{
return persistBackgroundCount;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@ -218,7 +231,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly,
persistBackgroundCount
);
}
@ -239,7 +253,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly,
persistBackgroundCount
);
}
@ -260,7 +275,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly,
persistBackgroundCount
);
}
}
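A small hedged sketch (again not from the commit): the new Preconditions check fails fast on a negative count. The 16-argument constructor call mirrors the updated test fixtures further down; the class name PersistBackgroundCountValidation is invented for the example, and everything except the last argument is left at its default.

```java
import io.druid.indexer.HadoopTuningConfig;

public class PersistBackgroundCountValidation
{
  public static void main(String[] args)
  {
    try {
      // Only the last argument (persistBackgroundCount) is set, to an invalid -1;
      // everything else is left null/false so the constructor defaults apply.
      new HadoopTuningConfig(
          null, null, null, null, null, null,
          false, false, false, false,
          null, false, false, null, null, -1
      );
    }
    catch (IllegalArgumentException e) {
      System.out.println("rejected as expected: " + e.getMessage());
    }
  }
}
```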

View File

@ -29,9 +29,15 @@ import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
@ -73,6 +79,15 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*/
@ -531,6 +546,8 @@ public class IndexGeneratorJob implements Jobby
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
ListeningExecutorService persistExecutor = null;
List<ListenableFuture<?>> persistFutures = Lists.newArrayList();
IncrementalIndex index = makeIncrementalIndex(
bucket,
combiningAggs,
@ -550,6 +567,35 @@ public class IndexGeneratorJob implements Jobby
Set<String> allDimensionNames = Sets.newLinkedHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
int persistBackgroundCount = config.getSchema().getTuningConfig().getPersistBackgroundCount();
if (persistBackgroundCount > 0) {
final BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(
persistBackgroundCount,
persistBackgroundCount,
0L,
TimeUnit.MILLISECONDS,
queue,
Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"),
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
executor.getQueue().put(r);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
}
);
persistExecutor = MoreExecutors.listeningDecorator(executorService);
} else {
persistExecutor = MoreExecutors.sameThreadExecutor();
}
for (final BytesWritable bw : values) {
context.progress();
@ -575,9 +621,29 @@ public class IndexGeneratorJob implements Jobby
toMerge.add(file);
context.progress();
final IncrementalIndex persistIndex = index;
persistFutures.add(
persistExecutor.submit(
new ThreadRenamingRunnable(String.format("%s-persist", file.getName()))
{
@Override
public void doRun()
{
try {
persist(persistIndex, interval, file, progressIndicator);
}
catch (Exception e) {
log.error("persist index error", e);
throw Throwables.propagate(e);
}
finally {
// close this index
persistIndex.close();
}
}
}
)
);
index = makeIncrementalIndex(
bucket,
@ -611,6 +677,9 @@ public class IndexGeneratorJob implements Jobby
toMerge.add(finalFile);
}
Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS);
persistExecutor.shutdown();
for (File file : toMerge) {
indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
}
@ -665,8 +734,17 @@ public class IndexGeneratorJob implements Jobby
FileUtils.deleteDirectory(file);
}
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
}
catch (TimeoutException e) {
throw Throwables.propagate(e);
}
finally {
index.close();
if (persistExecutor != null) {
persistExecutor.shutdownNow();
}
}
}
}
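To make the new reducer flow easier to follow, here is a hedged standalone sketch of the same pattern, not the reducer itself: a fixed-size pool fed by a SynchronousQueue whose rejection handler blocks the submitting thread, so at most persistBackgroundCount persists run concurrently and the producer simply waits when all workers are busy; the collected futures are then awaited before the merge, as above. The BackgroundPersistSketch class and doPersist method are stand-ins for the real persist(index, interval, file, progressIndicator) call.

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BackgroundPersistSketch
{
  public static void main(String[] args) throws Exception
  {
    final int persistBackgroundCount = 1;

    // Fixed pool with no internal backlog: a SynchronousQueue hands each task straight
    // to a worker, and the rejection handler blocks the caller until a worker is free.
    final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        persistBackgroundCount,
        persistBackgroundCount,
        0L,
        TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>(),
        new RejectedExecutionHandler()
        {
          @Override
          public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
          {
            try {
              executor.getQueue().put(r);
            }
            catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RejectedExecutionException("Got Interrupted while adding to the Queue", e);
            }
          }
        }
    );
    final ListeningExecutorService persistExecutor = MoreExecutors.listeningDecorator(pool);

    // Submit a few "persists"; the loop blocks whenever all workers are busy.
    final List<ListenableFuture<?>> persistFutures = new ArrayList<>();
    for (int spill = 0; spill < 5; spill++) {
      final int spillNumber = spill;
      persistFutures.add(
          persistExecutor.submit(
              new Runnable()
              {
                @Override
                public void run()
                {
                  doPersist(spillNumber);
                }
              }
          )
      );
    }

    // As in the reducer: wait for every background persist before merging spills.
    Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS);
    persistExecutor.shutdown();
  }

  private static void doPersist(int spillNumber)
  {
    // Stand-in for persist(index, interval, file, progressIndicator).
    System.out.println("persisting spill " + spillNumber + " on " + Thread.currentThread().getName());
  }
}
```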

View File

@ -382,6 +382,7 @@ public class BatchDeltaIngestionTest
false,
false,
null,
null,
null
)
)

View File

@ -161,6 +161,7 @@ public class DetermineHashedPartitionsJobTest
false,
false,
null,
null,
null
)
);

View File

@ -265,6 +265,7 @@ public class DeterminePartitionsJobTest
false,
false,
null,
null,
null
)
)

View File

@ -208,6 +208,7 @@ public class HadoopDruidIndexerConfigTest
false,
false,
null,
null,
null
)
);

View File

@ -54,6 +54,7 @@ public class HadoopTuningConfigTest
true,
true,
null,
null,
null
);
@ -72,6 +73,7 @@ public class HadoopTuningConfigTest
Assert.assertEquals(ImmutableMap.<String, String>of(), actual.getJobProperties());
Assert.assertEquals(true, actual.isCombineText());
Assert.assertEquals(true, actual.getUseCombiner());
Assert.assertEquals(0, actual.getPersistBackgroundCount());
}

View File

@ -502,7 +502,8 @@ public class IndexGeneratorJobTest
false,
useCombiner,
null,
buildV9Directly,
null
)
)
);

View File

@ -116,6 +116,7 @@ public class JobHelperTest
false,
false,
null,
null,
null
)
)

View File

@ -120,7 +120,7 @@ public class GranularityPathSpecTest
jsonMapper
),
new HadoopIOConfig(null, null, null),
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null)
);
granularityPathSpec.setDataGranularity(Granularity.HOUR);

View File

@ -202,6 +202,7 @@ public class HadoopConverterJobTest
false,
false,
null,
null,
null
)
)