Merge pull request #2149 from binlijin/master

Do persist IncrementalIndex in another thread in IndexGeneratorReducer
Charles Allen 2016-01-20 17:06:42 -08:00
commit 2a69a58570
12 changed files with 114 additions and 10 deletions

View File

@@ -207,6 +207,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|useCombiner|Boolean|Use hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|a map of properties to add to the Hadoop job configuration.|no (default == null)|
|buildV9Directly|Boolean|Whether to build the v9 index directly instead of building a v8 index and converting it to the v9 format.|no (default == false)|
|persistBackgroundCount|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage, but can make the job finish more quickly. If changing from the default of 0 (use the current thread for persists), we recommend setting it to 1.|no (default == 0)|
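For reference, a minimal sketch of how the new option reads in a spec's `tuningConfig` (the surrounding fields and values here are illustrative placeholders, not part of this patch):

```json
"tuningConfig" : {
  "type" : "hadoop",
  "workingPath" : "/tmp/druid-indexing",
  "rowFlushBoundary" : 80000,
  "persistBackgroundCount" : 1
}
```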
### Partitioning specification

View File

@@ -22,6 +22,7 @@ package io.druid.indexer;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
@@ -43,6 +44,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;
private static final int DEFAULT_PERSIST_BACKGROUND_COUNT = 0;
public static HadoopTuningConfig makeDefaultTuningConfig()
{
@@ -61,7 +63,8 @@ public class HadoopTuningConfig implements TuningConfig
false,
false,
null,
DEFAULT_BUILD_V9_DIRECTLY
DEFAULT_BUILD_V9_DIRECTLY,
DEFAULT_PERSIST_BACKGROUND_COUNT
);
}
@@ -79,6 +82,7 @@ public class HadoopTuningConfig implements TuningConfig
private final boolean combineText;
private final boolean useCombiner;
private final Boolean buildV9Directly;
private final int persistBackgroundCount;
@JsonCreator
public HadoopTuningConfig(
@@ -97,7 +101,8 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("useCombiner") Boolean useCombiner,
// See https://github.com/druid-io/druid/pull/1922
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
final @JsonProperty("buildV9Directly") Boolean buildV9Directly
final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
final @JsonProperty("persistBackgroundCount") Integer persistBackgroundCount
)
{
this.workingPath = workingPath;
@@ -116,6 +121,8 @@ public class HadoopTuningConfig implements TuningConfig
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
this.persistBackgroundCount = persistBackgroundCount == null ? DEFAULT_PERSIST_BACKGROUND_COUNT : persistBackgroundCount;
Preconditions.checkArgument(this.persistBackgroundCount >= 0, "persistBackgroundCount must be >= 0");
}
@JsonProperty
@@ -201,6 +208,12 @@ public class HadoopTuningConfig implements TuningConfig
return buildV9Directly;
}
@JsonProperty
public int getPersistBackgroundCount()
{
return persistBackgroundCount;
}
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@@ -218,7 +231,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly
buildV9Directly,
persistBackgroundCount
);
}
@@ -239,7 +253,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly
buildV9Directly,
persistBackgroundCount
);
}
@@ -260,7 +275,8 @@ public class HadoopTuningConfig implements TuningConfig
combineText,
useCombiner,
null,
buildV9Directly
buildV9Directly,
persistBackgroundCount
);
}
}
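Note that each of the three `with*` copy constructors shown above now forwards `persistBackgroundCount`, so configs derived from an existing one preserve the setting. It is also why every `HadoopTuningConfig` constructor call in the test diffs below gains an extra trailing `null` (which falls back to the default of 0).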

View File

@@ -29,9 +29,15 @@ import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
@@ -73,6 +79,15 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
*/
@@ -531,6 +546,8 @@ public class IndexGeneratorJob implements Jobby
final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
ListeningExecutorService persistExecutor = null;
List<ListenableFuture<?>> persistFutures = Lists.newArrayList();
IncrementalIndex index = makeIncrementalIndex(
bucket,
combiningAggs,
@@ -550,6 +567,35 @@ public class IndexGeneratorJob implements Jobby
Set<String> allDimensionNames = Sets.newLinkedHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
int persistBackgroundCount = config.getSchema().getTuningConfig().getPersistBackgroundCount();
if (persistBackgroundCount > 0) {
final BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(
persistBackgroundCount,
persistBackgroundCount,
0L,
TimeUnit.MILLISECONDS,
queue,
Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"),
new RejectedExecutionHandler()
{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
executor.getQueue().put(r);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RejectedExecutionException("Interrupted while adding to the queue", e);
}
}
}
);
persistExecutor = MoreExecutors.listeningDecorator(executorService);
} else {
persistExecutor = MoreExecutors.sameThreadExecutor();
}
for (final BytesWritable bw : values) {
context.progress();
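The pool above pairs a `SynchronousQueue` (which buffers nothing) with a rejection handler that re-offers the task via a blocking `put()`, so submitting a persist blocks the reducer once all background threads are busy instead of queueing unbounded indexes in memory. A self-contained sketch of that backpressure pattern (the class name, worker count, and timings are illustrative, not from the patch):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingHandoffDemo
{
  public static void main(String[] args) throws InterruptedException
  {
    // One worker and a SynchronousQueue: a task submitted while the worker
    // is busy cannot be queued, so it is rejected immediately...
    ExecutorService exec = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS,
        new SynchronousQueue<Runnable>(),
        (task, pool) -> {
          try {
            // ...and the handler then blocks the submitting thread on put()
            // until the worker calls take(). This is the backpressure.
            pool.getQueue().put(task);
          }
          catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while adding to the queue", e);
          }
        }
    );

    for (int i = 0; i < 3; i++) {
      long start = System.currentTimeMillis();
      exec.execute(() -> {
        try {
          Thread.sleep(500); // stand-in for an incremental persist
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      // The first submit returns at once; later ones wait ~500 ms for a free worker.
      System.out.println("submit blocked for " + (System.currentTimeMillis() - start) + " ms");
    }
    exec.shutdown();
    exec.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```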
@@ -575,9 +621,29 @@ public class IndexGeneratorJob implements Jobby
toMerge.add(file);
context.progress();
persist(index, interval, file, progressIndicator);
// close this index and make a new one, reusing same buffer
index.close();
final IncrementalIndex persistIndex = index;
persistFutures.add(
persistExecutor.submit(
new ThreadRenamingRunnable(String.format("%s-persist", file.getName()))
{
@Override
public void doRun()
{
try {
persist(persistIndex, interval, file, progressIndicator);
}
catch (Exception e) {
log.error(e, "Error persisting index");
throw Throwables.propagate(e);
}
finally {
// close this index
persistIndex.close();
}
}
}
)
);
index = makeIncrementalIndex(
bucket,
@@ -611,6 +677,9 @@ public class IndexGeneratorJob implements Jobby
toMerge.add(finalFile);
}
Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS);
persistExecutor.shutdown();
for (File file : toMerge) {
indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
}
@@ -665,8 +734,17 @@ public class IndexGeneratorJob implements Jobby
FileUtils.deleteDirectory(file);
}
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
}
catch (TimeoutException e) {
throw Throwables.propagate(e);
}
finally {
index.close();
if (persistExecutor != null) {
persistExecutor.shutdownNow();
}
}
}
}
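Two lifecycle details worth noting in the reducer above: `Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS)` makes the reducer wait for every background persist to finish (or fail with a `TimeoutException` after an hour) before the persisted spills are merged, and the `shutdownNow()` in the `finally` block guards against leaked persist threads when the reduce exits exceptionally.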

View File

@@ -382,6 +382,7 @@ public class BatchDeltaIngestionTest
false,
false,
null,
null,
null
)
)

View File

@@ -161,6 +161,7 @@ public class DetermineHashedPartitionsJobTest
false,
false,
null,
null,
null
)
);

View File

@@ -265,6 +265,7 @@ public class DeterminePartitionsJobTest
false,
false,
null,
null,
null
)
)

View File

@@ -208,6 +208,7 @@ public class HadoopDruidIndexerConfigTest
false,
false,
null,
null,
null
)
);

View File

@@ -54,6 +54,7 @@ public class HadoopTuningConfigTest
true,
true,
null,
null,
null
);
@@ -72,6 +73,7 @@ public class HadoopTuningConfigTest
Assert.assertEquals(ImmutableMap.<String, String>of(), actual.getJobProperties());
Assert.assertEquals(true, actual.isCombineText());
Assert.assertEquals(true, actual.getUseCombiner());
Assert.assertEquals(0, actual.getPersistBackgroundCount());
}

View File

@@ -502,7 +502,8 @@ public class IndexGeneratorJobTest
false,
useCombiner,
null,
buildV9Directly
buildV9Directly,
null
)
)
);

View File

@@ -116,6 +116,7 @@ public class JobHelperTest
false,
false,
null,
null,
null
)
)

View File

@@ -120,7 +120,7 @@ public class GranularityPathSpecTest
jsonMapper
),
new HadoopIOConfig(null, null, null),
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null)
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null)
);
granularityPathSpec.setDataGranularity(Granularity.HOUR);

View File

@@ -202,6 +202,7 @@ public class HadoopConverterJobTest
false,
false,
null,
null,
null
)
)