From 8e43e2c4460710158f3ed61ea2794282a1223d5f Mon Sep 17 00:00:00 2001 From: binlijin Date: Wed, 20 Jan 2016 09:20:09 +0800 Subject: [PATCH] Do persist IncrementalIndex in another thread in IndexGeneratorReducer --- docs/content/ingestion/batch-ingestion.md | 1 + .../io/druid/indexer/HadoopTuningConfig.java | 26 ++++-- .../io/druid/indexer/IndexGeneratorJob.java | 84 ++++++++++++++++++- .../indexer/BatchDeltaIngestionTest.java | 1 + .../DetermineHashedPartitionsJobTest.java | 1 + .../indexer/DeterminePartitionsJobTest.java | 1 + .../indexer/HadoopDruidIndexerConfigTest.java | 1 + .../druid/indexer/HadoopTuningConfigTest.java | 2 + .../druid/indexer/IndexGeneratorJobTest.java | 3 +- .../java/io/druid/indexer/JobHelperTest.java | 1 + .../indexer/path/GranularityPathSpecTest.java | 2 +- .../updater/HadoopConverterJobTest.java | 1 + 12 files changed, 114 insertions(+), 10 deletions(-) diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md index d2661269969..d0b9cc8f5bf 100644 --- a/docs/content/ingestion/batch-ingestion.md +++ b/docs/content/ingestion/batch-ingestion.md @@ -207,6 +207,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |useCombiner|Boolean|Use hadoop combiner to merge rows at mapper if possible.|no (default == false)| |jobProperties|Object|a map of properties to add to the Hadoop job configuration.|no (default == null)| |buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)| +|persistBackgroundCount|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage, but will make the job finish more quickly. 
If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| ### Partitioning specification diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 522511d0a77..fd92fbe02d8 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -22,6 +22,7 @@ package io.druid.indexer; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import io.druid.indexer.partitions.HashedPartitionsSpec; import io.druid.indexer.partitions.PartitionsSpec; @@ -43,6 +44,7 @@ public class HadoopTuningConfig implements TuningConfig private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000; private static final boolean DEFAULT_USE_COMBINER = false; private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE; + private static final int DEFAULT_PERSIST_BACKGROUND_COUNT = 0; public static HadoopTuningConfig makeDefaultTuningConfig() { @@ -61,7 +63,8 @@ public class HadoopTuningConfig implements TuningConfig false, false, null, - DEFAULT_BUILD_V9_DIRECTLY + DEFAULT_BUILD_V9_DIRECTLY, + DEFAULT_PERSIST_BACKGROUND_COUNT ); } @@ -79,6 +82,7 @@ public class HadoopTuningConfig implements TuningConfig private final boolean combineText; private final boolean useCombiner; private final Boolean buildV9Directly; + private final int persistBackgroundCount; @JsonCreator public HadoopTuningConfig( @@ -97,7 +101,8 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("useCombiner") Boolean useCombiner, // See https://github.com/druid-io/druid/pull/1922 final @JsonProperty("rowFlushBoundary") Integer 
maxRowsInMemoryCOMPAT, - final @JsonProperty("buildV9Directly") Boolean buildV9Directly + final @JsonProperty("buildV9Directly") Boolean buildV9Directly, + final @JsonProperty("persistBackgroundCount") Integer persistBackgroundCount ) { this.workingPath = workingPath; @@ -116,6 +121,8 @@ public class HadoopTuningConfig implements TuningConfig this.combineText = combineText; this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue(); this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly; + this.persistBackgroundCount = persistBackgroundCount == null ? DEFAULT_PERSIST_BACKGROUND_COUNT : persistBackgroundCount; + Preconditions.checkArgument(this.persistBackgroundCount >= 0, "Not support persistBackgroundCount < 0"); } @JsonProperty @@ -201,6 +208,12 @@ public class HadoopTuningConfig implements TuningConfig return buildV9Directly; } + @JsonProperty + public int getPersistBackgroundCount() + { + return persistBackgroundCount; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -218,7 +231,8 @@ public class HadoopTuningConfig implements TuningConfig combineText, useCombiner, null, - buildV9Directly + buildV9Directly, + persistBackgroundCount ); } @@ -239,7 +253,8 @@ public class HadoopTuningConfig implements TuningConfig combineText, useCombiner, null, - buildV9Directly + buildV9Directly, + persistBackgroundCount ); } @@ -260,7 +275,8 @@ public class HadoopTuningConfig implements TuningConfig combineText, useCombiner, null, - buildV9Directly + buildV9Directly, + persistBackgroundCount ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 0a651f73369..dbbf162610c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -29,9 +29,15 @@ import 
com.google.common.collect.Sets; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import io.druid.common.guava.ThreadRenamingRunnable; +import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; @@ -73,6 +79,15 @@ import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** */ @@ -531,6 +546,8 @@ public class IndexGeneratorJob implements Jobby final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); + ListeningExecutorService persistExecutor = null; + List<ListenableFuture<?>> persistFutures = Lists.newArrayList(); IncrementalIndex index = makeIncrementalIndex( bucket, combiningAggs, @@ -550,6 +567,35 @@ public class IndexGeneratorJob implements Jobby Set allDimensionNames = Sets.newLinkedHashSet(); final ProgressIndicator progressIndicator = makeProgressIndicator(context); + int persistBackgroundCount = config.getSchema().getTuningConfig().getPersistBackgroundCount(); + if (persistBackgroundCount > 0) { + final BlockingQueue<Runnable> queue = new SynchronousQueue<>(); + ExecutorService executorService = 
new ThreadPoolExecutor( + persistBackgroundCount, + persistBackgroundCount, + 0L, + TimeUnit.MILLISECONDS, + queue, + Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"), + new RejectedExecutionHandler() + { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) + { + try { + executor.getQueue().put(r); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Got Interrupted while adding to the Queue"); + } + } + } + ); + persistExecutor = MoreExecutors.listeningDecorator(executorService); + } else { + persistExecutor = MoreExecutors.sameThreadExecutor(); + } for (final BytesWritable bw : values) { context.progress(); @@ -575,9 +621,29 @@ public class IndexGeneratorJob implements Jobby toMerge.add(file); context.progress(); - persist(index, interval, file, progressIndicator); - // close this index and make a new one, reusing same buffer - index.close(); + final IncrementalIndex persistIndex = index; + persistFutures.add( + persistExecutor.submit( + new ThreadRenamingRunnable(String.format("%s-persist", file.getName())) + { + @Override + public void doRun() + { + try { + persist(persistIndex, interval, file, progressIndicator); + } + catch (Exception e) { + log.error("persist index error", e); + throw Throwables.propagate(e); + } + finally { + // close this index + persistIndex.close(); + } + } + } + ) + ); index = makeIncrementalIndex( bucket, @@ -611,6 +677,9 @@ public class IndexGeneratorJob implements Jobby toMerge.add(finalFile); } + Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS); + persistExecutor.shutdown(); + for (File file : toMerge) { indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file)); } @@ -665,8 +734,17 @@ public class IndexGeneratorJob implements Jobby FileUtils.deleteDirectory(file); } } + catch (ExecutionException e) { + throw Throwables.propagate(e); + } + catch (TimeoutException e) { + throw Throwables.propagate(e); + } finally { 
index.close(); + if (persistExecutor != null) { + persistExecutor.shutdownNow(); + } } } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 6860d703446..30122001791 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -382,6 +382,7 @@ public class BatchDeltaIngestionTest false, false, null, + null, null ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 7aec077c1a9..51696be32f5 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -161,6 +161,7 @@ public class DetermineHashedPartitionsJobTest false, false, null, + null, null ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index aad381e7720..7aaa24fffe4 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -265,6 +265,7 @@ public class DeterminePartitionsJobTest false, false, null, + null, null ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index 9ce7ff16aba..66a1d6a3049 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -208,6 +208,7 @@ public class HadoopDruidIndexerConfigTest false, 
false, null, + null, null ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java index bd632c3c252..6264da09289 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java @@ -54,6 +54,7 @@ public class HadoopTuningConfigTest true, true, null, + null, null ); @@ -72,6 +73,7 @@ public class HadoopTuningConfigTest Assert.assertEquals(ImmutableMap.of(), actual.getJobProperties()); Assert.assertEquals(true, actual.isCombineText()); Assert.assertEquals(true, actual.getUseCombiner()); + Assert.assertEquals(0, actual.getPersistBackgroundCount()); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 42dbeddba39..e1a5238d139 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -502,7 +502,8 @@ public class IndexGeneratorJobTest false, useCombiner, null, - buildV9Directly + buildV9Directly, + null ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 4116ea29464..024ef33742a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -116,6 +116,7 @@ public class JobHelperTest false, false, null, + null, null ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index 77e31e79fe1..aef6ddd5400 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ 
b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -120,7 +120,7 @@ public class GranularityPathSpecTest jsonMapper ), new HadoopIOConfig(null, null, null), - new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null) + new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null) ); granularityPathSpec.setDataGranularity(Granularity.HOUR); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 126a37b1bfa..3c7e92df897 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -202,6 +202,7 @@ public class HadoopConverterJobTest false, false, null, + null, null ) )