1) Fix bugs with VersionConverterTask

2) Fix bugs with NPEs on indexing
Eric Tschetter 2013-03-15 13:48:55 -05:00
parent 39449e6a3e
commit a933438e4e
18 changed files with 212 additions and 121 deletions

View File

@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.druid.client.DataSegment;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.response.InputStreamResponseHandler;
@@ -106,6 +107,10 @@ public class IndexingServiceClient
{
try {
final ServiceInstance instance = serviceProvider.getInstance();
if (instance == null) {
throw new ISE("Cannot find instance of indexingService");
}
return String.format("http://%s:%s/mmx/merger/v1", instance.getAddress(), instance.getPort());
}
catch (Exception e) {

View File

@@ -71,6 +71,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<!-- Tests -->
<dependency>

View File

@@ -204,10 +204,31 @@ public class IndexIO
final int version = getVersionFromDir(toConvert);
switch (version) {
case 1:
case 2:
case 3:
final String mappableDirName = "mappable";
if (toConvert.getName().equals(mappableDirName)) {
throw new ISE("Infinite recursion at play! OMFG quit it, please, it hurts!");
}
File mappable = new File(toConvert, mappableDirName);
final Index index = readIndex(toConvert);
storeLatest(index, mappable);
return convertSegment(mappable, converted);
case 4:
case 5:
case 6:
case 7:
log.info("Old version, re-persisting.");
IndexMerger.append(Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))), converted);
return true;
case 8:
DefaultIndexIOHandler.convertV8toV9(toConvert, converted);
return true;
default:
log.info("Version[%s], skipping.", version);
return false;
}
}
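A minimal usage sketch of the conversion entry point above (the static call and the paths are assumptions based on this hunk, not part of the diff):

    // Hypothetical driver: convert an on-disk segment of any supported version.
    File toConvert = new File("/tmp/druid/old-segment");       // assumed path
    File converted = new File("/tmp/druid/converted-segment"); // assumed path
    boolean didConvert = IndexIO.convertSegment(toConvert, converted);
    // false means the version was not convertible (e.g. already current) and it was skipped.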

View File

@@ -310,9 +310,11 @@ public class IndexMerger
throw new ISE("Couldn't make outdir[%s].", outDir);
}
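// NOTE (assumption): the minimum-size check below is disabled so that append() can
// re-persist a single index, as in the one-element list passed from IndexIO.convertSegment().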
/*
if (indexes.size() < 2) {
throw new ISE("Too few indexes provided for append [%d].", indexes.size());
}
*/
final List<String> mergedDimensions = mergeIndexed(
Lists.transform(

View File

@@ -19,12 +19,12 @@
package com.metamx.druid.index.v1;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.column.BitmapIndex;
import com.metamx.druid.index.column.Column;
@@ -44,6 +44,7 @@ import org.joda.time.Interval;
import java.io.Closeable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
@@ -52,13 +53,35 @@ import java.util.Set;
*/
public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
private static final Logger log = new Logger(QueryableIndexIndexableAdapter.class);
private final int numRows;
private final QueryableIndex input;
private final List<String> availableDimensions;
public QueryableIndexIndexableAdapter(QueryableIndex input)
{
this.input = input;
numRows = input.getNumRows();
// Some columns listed among the dimensions may lack a DictionaryEncodedColumn, which breaks the
// current merge logic but should be fine going forward. As a work-around, filter those
// dimensions out here. This code shouldn't be needed once github tracker issue #55 is finished.
this.availableDimensions = Lists.newArrayList();
for (String dim : input.getAvailableDimensions()) {
final Column col = input.getColumn(dim);
if (col == null) {
log.warn("Wtf!? column[%s] didn't exist!?!?!?", dim);
}
else if (col.getDictionaryEncoding() != null) {
availableDimensions.add(dim);
}
else {
log.info("No dictionary on dimension[%s]", dim);
}
}
}
@Override
@@ -76,7 +99,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
@Override
public Indexed<String> getAvailableDimensions()
{
return input.getAvailableDimensions();
return new ListIndexed<String>(availableDimensions, String.class);
}
@Override
@@ -161,7 +184,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
dimensions = Maps.newLinkedHashMap();
for (String dim : input.getAvailableDimensions()) {
for (String dim : getAvailableDimensions()) {
dimensions.put(dim, input.getColumn(dim).getDictionaryEncoding());
}

View File

@@ -182,25 +182,4 @@
<artifactId>curator-test</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<outputFile>
${project.build.directory}/${project.artifactId}-${project.version}-selfcontained.jar
</outputFile>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -34,6 +34,7 @@ import com.metamx.druid.loading.SegmentLoadingException;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.actions.SegmentInsertAction;
import com.metamx.druid.merger.common.actions.SegmentListUsedAction;
import com.metamx.druid.merger.common.actions.SpawnTasksAction;
import com.metamx.druid.merger.common.actions.TaskActionClient;
import org.joda.time.DateTime;
@@ -77,7 +78,7 @@ public class VersionConverterTask extends AbstractTask
}
@JsonCreator
private VersionConverterTask(
private static VersionConverterTask createFromJson(
@JsonProperty("id") String id,
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@@ -85,12 +86,26 @@ public class VersionConverterTask extends AbstractTask
@JsonProperty("segment") DataSegment segment
)
{
super(
id,
groupId,
dataSource,
interval
);
if (id == null) {
if (segment == null) {
return create(dataSource, interval);
}
else {
return create(segment);
}
}
return new VersionConverterTask(id, groupId, dataSource, interval, segment);
}
private VersionConverterTask(
String id,
String groupId,
String dataSource,
Interval interval,
DataSegment segment
)
{
super(id, groupId, dataSource, interval);
this.segment = segment;
}
@@ -122,6 +137,7 @@ public class VersionConverterTask extends AbstractTask
@Override
public TaskStatus preflight(TaskToolbox toolbox) throws Exception
{
log.info("HLKFJDSLKFJDSKLJFLKDSF -- Preflight for segment[%s]", segment);
if (segment != null) {
return super.preflight(toolbox);
}
@@ -224,6 +240,21 @@ public class VersionConverterTask extends AbstractTask
throws SegmentLoadingException, IOException
{
log.info("Converting segment[%s]", segment);
final TaskActionClient actionClient = toolbox.getTaskActionClient();
final List<DataSegment> currentSegments = actionClient.submit(
new SegmentListUsedAction(segment.getDataSource(), segment.getInterval())
);
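// A segment converted earlier carries a version like "<origVersion>_v<n>" (see the push below),
// so startsWith() matches both the original version and any prior conversion of it, and the
// binary-version check confirms the on-disk format is already current.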
for (DataSegment currentSegment : currentSegments) {
final String version = currentSegment.getVersion();
final Integer binaryVersion = currentSegment.getBinaryVersion();
if (version.startsWith(segment.getVersion()) && CURR_VERSION_INTEGER.equals(binaryVersion)) {
log.info("Skipping already updated segment[%s].", segment);
return;
}
}
final Map<DataSegment, File> localSegments = toolbox.getSegments(Arrays.asList(segment));
final File location = localSegments.get(segment);
@@ -236,8 +267,7 @@ public class VersionConverterTask extends AbstractTask
DataSegment updatedSegment = segment.withVersion(String.format("%s_v%s", segment.getVersion(), outVersion));
updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
toolbox.getTaskActionClient()
.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)).withAllowOlderVersions(true));
actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)).withAllowOlderVersions(true));
} else {
log.info("Conversion failed.");
}
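Because @JsonCreator now points at the static factory, a payload without an id is routed through create(). A hedged sketch (the "version_converter" type name and DefaultObjectMapper are assumptions not shown in this diff, and polymorphic Task mapping must be registered):

    ObjectMapper jsonMapper = new DefaultObjectMapper(); // assumed Druid mapper setup
    Task task = jsonMapper.readValue(
        "{\"type\": \"version_converter\", \"dataSource\": \"wikipedia\", \"interval\": \"2013-01-01/2013-02-01\"}",
        Task.class
    ); // no id and no segment, so createFromJson() delegates to create(dataSource, interval)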

View File

@@ -19,20 +19,33 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.RetryPolicy;
import com.metamx.druid.merger.common.TaskCallback;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.TaskToolboxFactory;
import com.metamx.druid.merger.common.task.Task;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.mortbay.thread.ThreadPool;
import javax.annotation.Nullable;
import java.io.File;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Runs tasks in a JVM thread using an ExecutorService.
@@ -42,6 +55,8 @@ public class LocalTaskRunner implements TaskRunner
private final TaskToolboxFactory toolboxFactory;
private final ExecutorService exec;
private final Set<TaskRunnerWorkItem> runningItems = new ConcurrentSkipListSet<TaskRunnerWorkItem>();
private static final Logger log = new Logger(LocalTaskRunner.class);
public LocalTaskRunner(
@@ -64,65 +79,39 @@ public class LocalTaskRunner implements TaskRunner
{
final TaskToolbox toolbox = toolboxFactory.build(task);
exec.submit(
new Runnable()
{
@Override
public void run()
{
final long startTime = System.currentTimeMillis();
TaskStatus status;
try {
log.info("Running task: %s", task.getId());
status = task.run(toolbox);
}
catch (InterruptedException e) {
log.error(e, "Interrupted while running task[%s]", task);
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception while running task[%s]", task);
status = TaskStatus.failure(task.getId());
}
catch (Throwable t) {
log.error(t, "Uncaught Throwable while running task[%s]", task);
throw Throwables.propagate(t);
}
try {
final File taskDir = toolbox.getTaskDir();
if (taskDir.exists()) {
log.info("Removing task directory: %s", taskDir);
FileUtils.deleteDirectory(taskDir);
}
}
catch (Exception e) {
log.error(e, "Failed to delete task directory: %s", task.getId());
}
try {
callback.notify(status.withDuration(System.currentTimeMillis() - startTime));
} catch(Exception e) {
log.error(e, "Uncaught Exception during callback for task[%s]", task);
throw Throwables.propagate(e);
}
}
}
);
// Use execute() rather than submit(): submit() wraps the Runnable in a FutureTask, which
// would prevent the instanceof check in getPendingTasks() from ever matching.
exec.execute(new LocalTaskRunnerRunnable(task, toolbox, callback));
}
@Override
public Collection<TaskRunnerWorkItem> getRunningTasks()
{
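// NOTE: nothing in this diff adds to runningItems; presumably the Runnable should register
// itself here when it starts and remove itself on completion, otherwise this stays empty.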
return Lists.newArrayList();
return runningItems;
}
@Override
public Collection<TaskRunnerWorkItem> getPendingTasks()
{
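// keep() applies the function to each queued Runnable and drops nulls, so queue entries
// that are not LocalTaskRunnerRunnable instances are filtered out of the pending list.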
if (exec instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec;
return Lists.newArrayList(
FunctionalIterable.create(tpe.getQueue())
.keep(
new Function<Runnable, TaskRunnerWorkItem>()
{
@Override
public TaskRunnerWorkItem apply(Runnable input)
{
if (input instanceof LocalTaskRunnerRunnable) {
return ((LocalTaskRunnerRunnable) input).getTaskRunnerWorkItem();
}
return null;
}
}
)
);
}
return Lists.newArrayList();
}
@@ -131,4 +120,76 @@
{
return Lists.newArrayList();
}
private static class LocalTaskRunnerRunnable implements Runnable
{
private final Task task;
private final TaskToolbox toolbox;
private final TaskCallback callback;
private final DateTime createdTime;
public LocalTaskRunnerRunnable(Task task, TaskToolbox toolbox, TaskCallback callback)
{
this.task = task;
this.toolbox = toolbox;
this.callback = callback;
this.createdTime = new DateTime();
}
@Override
public void run()
{
final long startTime = System.currentTimeMillis();
TaskStatus status;
try {
log.info("Running task: %s", task.getId());
status = task.run(toolbox);
}
catch (InterruptedException e) {
log.error(e, "Interrupted while running task[%s]", task);
throw Throwables.propagate(e);
}
catch (Exception e) {
log.error(e, "Exception while running task[%s]", task);
status = TaskStatus.failure(task.getId());
}
catch (Throwable t) {
log.error(t, "Uncaught Throwable while running task[%s]", task);
throw Throwables.propagate(t);
}
try {
final File taskDir = toolbox.getTaskDir();
if (taskDir.exists()) {
log.info("Removing task directory: %s", taskDir);
FileUtils.deleteDirectory(taskDir);
}
}
catch (Exception e) {
log.error(e, "Failed to delete task directory: %s", task.getId());
}
try {
callback.notify(status.withDuration(System.currentTimeMillis() - startTime));
} catch(Exception e) {
log.error(e, "Uncaught Exception during callback for task[%s]", task);
throw Throwables.propagate(e);
}
}
public TaskRunnerWorkItem getTaskRunnerWorkItem()
{
return new TaskRunnerWorkItem(
task,
callback,
null,
createdTime
);
}
}
}
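A minimal sketch of driving the runner, assuming the constructor takes the toolbox factory and executor fields shown above (the callback body and in-scope toolboxFactory/task are hypothetical):

    ExecutorService exec = Executors.newSingleThreadExecutor();
    TaskRunner runner = new LocalTaskRunner(toolboxFactory, exec); // toolboxFactory assumed in scope
    runner.run(task, new TaskCallback()
    {
      @Override
      public void notify(TaskStatus status)
      {
        log.info("Task[%s] completed with status[%s]", task.getId(), status);
      }
    });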

View File

@@ -102,6 +102,11 @@
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.ning</groupId>
<artifactId>compress-lzf</artifactId>

View File

@@ -182,41 +182,4 @@
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<args>
<arg>-unchecked</arg>
<arg>-deprecation</arg>
</args>
</configuration>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>testCompile</goal>
</goals>
<phase>test-compile</phase>
</execution>
<execution>
<goals>
<goal>compile</goal>
</goals>
<phase>process-resources</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@@ -63,12 +63,10 @@
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>