Merge pull request #322 from metamx/filetasklogs

Filetasklogs
This commit is contained in:
fjy 2013-12-10 17:51:44 -08:00
commit 6c80de53b0
10 changed files with 142 additions and 11 deletions

View File

@ -23,22 +23,27 @@ import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.multibindings.MapBinder;
import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.indexing.common.tasklogs.FileTaskLogs;
import io.druid.tasklogs.NoopTaskLogs;
import io.druid.tasklogs.TaskLogPusher;
import io.druid.tasklogs.TaskLogs;
/**
*/
public class TaskLogsModule implements Module
public class IndexingServiceTaskLogsModule implements Module
{
@Override
public void configure(Binder binder)
{
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(NoopTaskLogs.class));
PolyBind.createChoice(binder, "druid.indexer.logs.type", Key.get(TaskLogs.class), Key.get(FileTaskLogs.class));
final MapBinder<String, TaskLogs> taskLogBinder = Binders.taskLogsBinder(binder);
taskLogBinder.addBinding("noop").to(NoopTaskLogs.class).in(LazySingleton.class);
taskLogBinder.addBinding("file").to(FileTaskLogs.class).in(LazySingleton.class);
binder.bind(NoopTaskLogs.class).in(LazySingleton.class);
binder.bind(FileTaskLogs.class).in(LazySingleton.class);
JsonConfigProvider.bind(binder, "druid.indexer.logs", FileTaskLogsConfig.class);
binder.bind(TaskLogPusher.class).to(TaskLogs.class);
}

View File

@ -0,0 +1,37 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
import java.io.File;
public class FileTaskLogsConfig
{
@JsonProperty
@NotNull
private File directory = new File("log");
public File getDirectory()
{
return directory;
}
}

View File

@ -0,0 +1,87 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012, 2013 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package io.druid.indexing.common.tasklogs;
import com.google.common.base.Optional;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import io.druid.indexing.common.config.FileTaskLogsConfig;
import io.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
public class FileTaskLogs implements TaskLogs
{
private static final Logger log = new Logger(FileTaskLogs.class);
private final FileTaskLogsConfig config;
@Inject
public FileTaskLogs(
FileTaskLogsConfig config
)
{
this.config = config;
}
@Override
public void pushTaskLog(final String taskid, File file) throws IOException
{
if (!config.getDirectory().exists()) {
config.getDirectory().mkdir();
}
final File outputFile = fileForTask(taskid);
Files.copy(file, outputFile);
log.info("Wrote task log to: %s", outputFile);
}
@Override
public Optional<InputSupplier<InputStream>> streamTaskLog(final String taskid, final long offset) throws IOException
{
final File file = fileForTask(taskid);
if (file.exists()) {
return Optional.<InputSupplier<InputStream>>of(
new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
final InputStream inputStream = new FileInputStream(file);
ByteStreams.skipFully(inputStream, offset);
return inputStream;
}
}
);
} else {
return Optional.absent();
}
}
private File fileForTask(final String taskid)
{
return new File(config.getDirectory(), String.format("%s.log", taskid));
}
}

View File

@ -220,7 +220,7 @@ public class DbTaskStorage implements TaskStorage
}
@Override
public List<Task> getRunningTasks()
public List<Task> getActiveTasks()
{
return dbi.withHandle(
new HandleCallback<List<Task>>()

View File

@ -128,7 +128,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
}
@Override
public List<Task> getRunningTasks()
public List<Task> getActiveTasks()
{
giant.lock();

View File

@ -100,7 +100,7 @@ public class TaskQueue
// Get all running tasks and their locks
final Multimap<TaskLock, Task> tasksByLock = ArrayListMultimap.create();
for (final Task task : taskStorage.getRunningTasks()) {
for (final Task task : taskStorage.getActiveTasks()) {
try {
final List<TaskLock> taskLocks = taskStorage.getLocks(task.getId());

View File

@ -77,9 +77,9 @@ public interface TaskStorage
public List<TaskAction> getAuditLogs(String taskid);
/**
* Returns a list of currently-running tasks as stored in the storage facility, in no particular order.
* Returns a list of currently running or pending tasks as stored in the storage facility, in no particular order.
*/
public List<Task> getRunningTasks();
public List<Task> getActiveTasks();
/**
* Returns a list of locks for a particular task.

View File

@ -52,7 +52,6 @@ import io.druid.guice.QueryableModule;
import io.druid.guice.ServerModule;
import io.druid.guice.ServerViewModule;
import io.druid.guice.StorageNodeModule;
import io.druid.guice.TaskLogsModule;
import io.druid.guice.annotations.Client;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
@ -299,7 +298,6 @@ public class Initialization
new JacksonConfigManagerModule(),
new IndexingServiceDiscoveryModule(),
new DataSegmentPusherPullerModule(),
new TaskLogsModule(),
new FirehoseModule()
);

View File

@ -28,6 +28,7 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
@ -103,7 +104,8 @@ public class CliMiddleManager extends ServerRunnable
);
}
},
new IndexingServiceFirehoseModule()
new IndexingServiceFirehoseModule(),
new IndexingServiceTaskLogsModule()
);
}
}

View File

@ -32,6 +32,7 @@ import com.metamx.common.logger.Logger;
import io.airlift.command.Command;
import io.druid.guice.IndexingServiceFirehoseModule;
import io.druid.guice.IndexingServiceModuleHelper;
import io.druid.guice.IndexingServiceTaskLogsModule;
import io.druid.guice.JacksonConfigProvider;
import io.druid.guice.Jerseys;
import io.druid.guice.JsonConfigProvider;
@ -211,7 +212,8 @@ public class CliOverlord extends ServerRunnable
JsonConfigProvider.bind(binder, "druid.indexer.autoscale", SimpleResourceManagementConfig.class);
}
},
new IndexingServiceFirehoseModule()
new IndexingServiceFirehoseModule(),
new IndexingServiceTaskLogsModule()
);
}