fix worker node still using db connection

Fangjin Yang 2013-01-27 09:57:37 -08:00
parent 1cff766e2a
commit 74057600f9
12 changed files with 159 additions and 151 deletions
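In short, this change moves the unused-segment lookup off the workers and onto the coordinator: MergerDBCoordinator gains getUnusedSegmentsForInterval, TaskConsumer puts the result into the TaskContext handed to each task, and SegmentKiller.kill now receives the segments directly instead of querying the segment table itself, so S3SegmentKiller no longer needs a DBI or DbConnectorConfig. The sketch below shows the new division of labor; it uses simplified stand-in types (Segment, Killer, Coordinator, Context, KillTaskSketch), not the real Druid classes, and elides error handling.

import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Illustrative stand-ins only; the real classes are DataSegment, SegmentKiller,
// MergerDBCoordinator, TaskContext and KillTask, shown in the diff below.
final class Segment {}

// Worker side: delete exactly what you are handed; no DB connection required.
interface Killer {
    void kill(Collection<Segment> segments) throws Exception;
}

// Coordinator side: the only place that still queries the segment table.
final class Coordinator {
    List<Segment> getUnusedSegmentsForInterval(String dataSource, String interval) {
        // In MergerDBCoordinator this is roughly:
        //   SELECT payload FROM <segmentTable>
        //   WHERE dataSource = :dataSource AND start >= :start AND end <= :end AND used = 0
        return Collections.<Segment>emptyList();
    }
}

// The unused segments ride along to the worker inside the task context.
final class Context {
    private final Collection<Segment> unusedSegments;

    Context(Collection<Segment> unusedSegments) {
        this.unusedSegments = unusedSegments;
    }

    Collection<Segment> getUnusedSegments() {
        return unusedSegments;
    }
}

// KillTask then reduces to a single call against the toolbox's killer.
final class KillTaskSketch {
    void run(Context context, Killer killer) throws Exception {
        killer.kill(context.getUnusedSegments());
    }
}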

KillTask.java

@@ -51,11 +51,9 @@ public class KillTask extends AbstractTask
public TaskStatus run(TaskContext context, TaskToolbox toolbox, TaskCallback callback) throws Exception
{
// Kill segments
Set<DataSegment> segmentsToKill = ImmutableSet.copyOf(
toolbox.getSegmentKiller()
.kill(getDataSource(), getInterval())
);
toolbox.getSegmentKiller()
.kill(context.getUnusedSegments());
return TaskStatus.success(getId()).withSegmentsNuked(segmentsToKill);
return TaskStatus.success(getId()).withSegmentsNuked(context.getUnusedSegments());
}
}

MergerDBCoordinator.java

@@ -20,6 +20,7 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.metamx.common.logger.Logger;
@@ -31,12 +32,16 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.ResultIterator;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import javax.annotation.Nullable;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
@@ -63,7 +68,7 @@ public class MergerDBCoordinator
this.dbi = dbi;
}
public List<DataSegment> getSegmentsForInterval(final String dataSource, final Interval interval) throws IOException
public List<DataSegment> getUsedSegmentsForInterval(final String dataSource, final Interval interval) throws IOException
{
synchronized (lock) {
@@ -209,6 +214,58 @@ public class MergerDBCoordinator
}
}
);
}
public List<DataSegment> getUnusedSegmentsForInterval(final String dataSource, final Interval interval)
{
List<DataSegment> matchingSegments = dbi.withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
dbConnectorConfig.getSegmentTable()
)
)
.bind("dataSource", dataSource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.fold(
Lists.<DataSegment>newArrayList(),
new Folder3<List<DataSegment>, Map<String, Object>>()
{
@Override
public List<DataSegment> fold(
List<DataSegment> accumulator,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(
(String) stringObjectMap.get("payload"),
DataSegment.class
);
accumulator.add(segment);
return accumulator;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), dataSource, interval);
return matchingSegments;
}
}

RemoteTaskRunner.java

@@ -463,13 +463,13 @@ public class RemoteTaskRunner implements TaskRunner
if (callback != null) {
callback.notify(taskStatus);
}
}
if (taskStatus.isComplete()) {
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
tasks.remove(taskId);
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
if (taskStatus.isComplete()) {
// Worker is done with this task
workerWrapper.setLastCompletedTaskTime(new DateTime());
tasks.remove(taskId);
cf.delete().guaranteed().inBackground().forPath(event.getData().getPath());
}
}
}
}

TaskContext.java

@@ -32,15 +32,18 @@ public class TaskContext
{
final String version;
final Set<DataSegment> currentSegments;
final Set<DataSegment> unusedSegments;
@JsonCreator
public TaskContext(
@JsonProperty("version") String version,
@JsonProperty("currentSegments") Set<DataSegment> currentSegments
@JsonProperty("currentSegments") Set<DataSegment> currentSegments,
@JsonProperty("unusedSegments") Set<DataSegment> unusedSegments
)
{
this.version = version;
this.currentSegments = currentSegments;
this.unusedSegments = unusedSegments;
}
@JsonProperty
@@ -54,4 +57,10 @@ public class TaskContext
{
return currentSegments;
}
@JsonProperty
public Set<DataSegment> getUnusedSegments()
{
return unusedSegments;
}
}

TaskConsumer.java

@@ -109,7 +109,7 @@ public class TaskConsumer implements Runnable
.emit();
// Retry would be nice, but only after we have a way to throttle and limit them. Just fail for now.
if(!shutdown) {
if (!shutdown) {
queue.notify(task, TaskStatus.failure(task.getId()));
}
}
@@ -127,7 +127,13 @@ public class TaskConsumer implements Runnable
final TaskContext context = new TaskContext(
version,
ImmutableSet.copyOf(
mergerDBCoordinator.getSegmentsForInterval(
mergerDBCoordinator.getUsedSegmentsForInterval(
task.getDataSource(),
task.getInterval()
)
),
ImmutableSet.copyOf(
mergerDBCoordinator.getUnusedSegmentsForInterval(
task.getDataSource(),
task.getInterval()
)
@@ -169,23 +175,24 @@ public class TaskConsumer implements Runnable
// If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set after
// we check and before we commit the database transaction, but better than nothing.
if(shutdown) {
if (shutdown) {
log.info("Abandoning task due to shutdown: %s", task.getId());
return;
}
queue.notify(task, statusFromRunner, new Runnable()
queue.notify(
task, statusFromRunner, new Runnable()
{
@Override
public void run()
{
try {
if(statusFromRunner.getSegments().size() > 0) {
if (statusFromRunner.getSegments().size() > 0) {
// TODO -- Publish in transaction
publishSegments(task, context, statusFromRunner.getSegments());
}
if(statusFromRunner.getSegmentsNuked().size() > 0) {
if (statusFromRunner.getSegmentsNuked().size() > 0) {
deleteSegments(task, context, statusFromRunner.getSegmentsNuked());
}
}
@@ -194,10 +201,11 @@ public class TaskConsumer implements Runnable
throw Throwables.propagate(e);
}
}
});
}
);
// Emit event and log, if the task is done
if(statusFromRunner.isComplete()) {
if (statusFromRunner.isComplete()) {
int segmentBytes = 0;
for (DataSegment segment : statusFromRunner.getSegments()) {
segmentBytes += segment.getSize();
@@ -226,7 +234,8 @@ public class TaskConsumer implements Runnable
statusFromRunner.getDuration()
);
}
} catch(Exception e) {
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle task callback")
.addData("task", task.getId())
.addData("statusCode", statusFromRunner.getStatusCode())

IndexerCoordinatorNode.java

@@ -424,10 +424,7 @@ public class IndexerCoordinatorNode extends RegisteringNode
jsonMapper
);
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client,
dbi,
dbConnectorConfig,
jsonMapper
s3Client
);
taskToolbox = new TaskToolbox(config, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
}
@@ -487,8 +484,6 @@ public class IndexerCoordinatorNode extends RegisteringNode
public void initializeWorkerSetupManager()
{
if (workerSetupManager == null) {
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
final DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
final WorkerSetupManagerConfig workerSetupManagerConfig = configFactory.build(WorkerSetupManagerConfig.class);
DbConnector.createConfigTable(dbi, workerSetupManagerConfig.getConfigTable());

WorkerNode.java

@@ -29,15 +29,16 @@ import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
import com.metamx.druid.RegisteringNode;
import com.metamx.druid.db.DbConnector;
import com.metamx.druid.db.DbConnectorConfig;
import com.metamx.druid.http.StatusServlet;
import com.metamx.druid.initialization.CuratorConfig;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.initialization.ServerConfig;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.loading.S3SegmentKiller;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentKiller;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.merger.common.TaskToolbox;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.metamx.druid.merger.common.index.StaticS3FirehoseFactory;
@@ -46,9 +47,6 @@ import com.metamx.druid.merger.worker.TaskMonitor;
import com.metamx.druid.merger.worker.Worker;
import com.metamx.druid.merger.worker.WorkerCuratorCoordinator;
import com.metamx.druid.merger.worker.config.WorkerConfig;
import com.metamx.druid.loading.S3SegmentPusher;
import com.metamx.druid.loading.S3SegmentPusherConfig;
import com.metamx.druid.loading.SegmentPusher;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.core.Emitters;
@@ -73,7 +71,6 @@ import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.DefaultServlet;
import org.mortbay.jetty.servlet.ServletHolder;
import org.skife.config.ConfigurationObjectFactory;
import org.skife.jdbi.v2.DBI;
import java.io.IOException;
import java.util.Arrays;
@@ -298,13 +295,8 @@ public class WorkerNode extends RegisteringNode
configFactory.build(S3SegmentPusherConfig.class),
jsonMapper
);
final DbConnectorConfig dbConnectorConfig = configFactory.build(DbConnectorConfig.class);
DBI dbi = new DbConnector(dbConnectorConfig).getDBI();
final SegmentKiller segmentKiller = new S3SegmentKiller(
s3Client,
dbi,
dbConnectorConfig,
jsonMapper
s3Client
);
taskToolbox = new TaskToolbox(coordinatorConfig, emitter, s3Client, segmentPusher, segmentKiller, jsonMapper);
}

RemoteTaskRunnerTest.java

@@ -1,7 +1,6 @@
package com.metamx.druid.merger.coordinator;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
@@ -135,7 +134,7 @@ public class RemoteTaskRunnerTest
{
remoteTaskRunner.run(
task1,
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()),
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet(), Sets.<DataSegment>newHashSet()),
null
);
}
@@ -143,9 +142,25 @@ public class RemoteTaskRunnerTest
@Test
public void testAlreadyExecutedTask() throws Exception
{
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
remoteTaskRunner.run(
task1,
new TaskContext(
new DateTime().toString(),
Sets.<DataSegment>newHashSet(),
Sets.<DataSegment>newHashSet()
),
null
);
try {
remoteTaskRunner.run(task1, new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()), null);
remoteTaskRunner.run(
task1,
new TaskContext(
new DateTime().toString(),
Sets.<DataSegment>newHashSet(),
Sets.<DataSegment>newHashSet()
),
null
);
fail("ISE expected");
}
catch (ISE expected) {
@@ -175,7 +190,7 @@ public class RemoteTaskRunnerTest
)
), Lists.<AggregatorFactory>newArrayList()
),
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet()),
new TaskContext(new DateTime().toString(), Sets.<DataSegment>newHashSet(), Sets.<DataSegment>newHashSet()),
null
);
}

TaskConsumerTest.java

@@ -41,30 +41,32 @@ public class TaskConsumerTest
tc.start();
try {
tq.add(new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public Type getType()
{
return Type.TEST;
}
tq.add(
new AbstractTask("id1", "id1", "ds", new Interval("2012-01-01/P1D"))
{
@Override
public Type getType()
{
return Type.TEST;
}
@Override
public TaskStatus run(
TaskContext context, TaskToolbox toolbox, TaskCallback callback
) throws Exception
{
return TaskStatus.success(getId()).withSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource("ds")
.interval(new Interval("2012-01-01/P1D"))
.version(context.getVersion())
.build()
)
);
}
});
@Override
public TaskStatus run(
TaskContext context, TaskToolbox toolbox, TaskCallback callback
) throws Exception
{
return TaskStatus.success(getId()).withSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource("ds")
.interval(new Interval("2012-01-01/P1D"))
.version(context.getVersion())
.build()
)
);
}
}
);
while (ts.getStatus("id1").get().isRunnable()) {
Thread.sleep(100);
@@ -97,7 +99,13 @@ public class TaskConsumerTest
}
@Override
public List<DataSegment> getSegmentsForInterval(String dataSource, Interval interval) throws IOException
public List<DataSegment> getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException
{
return ImmutableList.of();
}
@Override
public List<DataSegment> getUnusedSegmentsForInterval(String dataSource, Interval interval)
{
return ImmutableList.of();
}

TaskQueueTest.java

@@ -408,7 +408,7 @@ public class TaskQueueTest
}
};
callback.notify(vt.getTask().run(new TaskContext(vt.getVersion(), null), null, callback));
callback.notify(vt.getTask().run(new TaskContext(vt.getVersion(), null, null), null, callback));
// OK, finally ready to test stuff.
Assert.assertTrue("pass1", structThingy.pass1);

S3SegmentKiller.java

@@ -1,26 +1,13 @@
package com.metamx.druid.loading;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.metamx.common.ISE;
import com.metamx.common.MapUtils;
import com.metamx.common.logger.Logger;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.db.DbConnectorConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.joda.time.Interval;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.StatementContext;
import org.skife.jdbi.v2.tweak.HandleCallback;
import java.sql.SQLException;
import java.util.List;
import java.util.Collection;
import java.util.Map;
/**
@@ -30,80 +17,20 @@ public class S3SegmentKiller implements SegmentKiller
private static final Logger log = new Logger(S3SegmentKiller.class);
private final RestS3Service s3Client;
private final DBI dbi;
private final DbConnectorConfig config;
private final ObjectMapper jsonMapper;
@Inject
public S3SegmentKiller(
RestS3Service s3Client,
DBI dbi,
DbConnectorConfig config,
ObjectMapper jsonMapper
RestS3Service s3Client
)
{
this.s3Client = s3Client;
this.dbi = dbi;
this.config = config;
this.jsonMapper = jsonMapper;
}
@Override
public List<DataSegment> kill(final String datasource, final Interval interval) throws ServiceException
public void kill(Collection<DataSegment> segments) throws ServiceException
{
// TODO -- Awkward for workers to use the DB!
List<DataSegment> matchingSegments = dbi.withHandle(
new HandleCallback<List<DataSegment>>()
{
@Override
public List<DataSegment> withHandle(Handle handle) throws Exception
{
return handle.createQuery(
String.format(
"SELECT payload FROM %s WHERE dataSource = :dataSource and start >= :start and end <= :end and used = 0",
config.getSegmentTable()
)
)
.bind("dataSource", datasource)
.bind("start", interval.getStart().toString())
.bind("end", interval.getEnd().toString())
.fold(
Lists.<DataSegment>newArrayList(),
new Folder3<List<DataSegment>, Map<String, Object>>()
{
@Override
public List<DataSegment> fold(
List<DataSegment> accumulator,
Map<String, Object> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
DataSegment segment = jsonMapper.readValue(
(String) stringObjectMap.get("payload"),
DataSegment.class
);
accumulator.add(segment);
return accumulator;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
);
log.info("Found %,d segments for %s for interval %s.", matchingSegments.size(), datasource, interval);
for (final DataSegment segment : matchingSegments) {
// Remove from S3
for (final DataSegment segment : segments) {
Map<String, Object> loadSpec = segment.getLoadSpec();
String s3Bucket = MapUtils.getString(loadSpec, "bucket");
String s3Path = MapUtils.getString(loadSpec, "key");
@@ -118,7 +45,5 @@ public class S3SegmentKiller implements SegmentKiller
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
}
return matchingSegments;
}
}

SegmentKiller.java

@@ -2,13 +2,13 @@ package com.metamx.druid.loading;
import com.metamx.druid.client.DataSegment;
import org.jets3t.service.ServiceException;
import org.joda.time.Interval;
import java.util.Collection;
import java.util.List;
/**
*/
public interface SegmentKiller
{
public List<DataSegment> kill(String datasource, Interval interval) throws ServiceException;
public void kill(Collection<DataSegment> segments) throws ServiceException;
}
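Because kill() now takes the segments themselves rather than a datasource and interval, implementations of this interface no longer need database access at all. For illustration, a unit test could now satisfy the contract with a simple recorder; the class below is hypothetical and not part of this commit or the repository.

package com.metamx.druid.loading;

import com.metamx.druid.client.DataSegment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * Hypothetical test double: records the segments it is asked to delete
 * instead of talking to S3 or the database.
 */
public class RecordingSegmentKiller implements SegmentKiller
{
  private final List<DataSegment> killed = new ArrayList<DataSegment>();

  @Override
  public void kill(Collection<DataSegment> segments)
  {
    killed.addAll(segments);
  }

  public List<DataSegment> getKilled()
  {
    return killed;
  }
}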