diff --git a/common/src/main/java/com/metamx/druid/guava/Runnables.java b/common/src/main/java/com/metamx/druid/guava/Runnables.java new file mode 100644 index 00000000000..8c41886d349 --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guava/Runnables.java @@ -0,0 +1,37 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guava; + +/** + */ +public class Runnables +{ + public static Runnable threadNaming(final String threadName, final Runnable runnable) + { + return new ThreadRenamingRunnable(threadName) + { + @Override + public void doRun() + { + runnable.run(); + } + }; + } +} diff --git a/common/src/main/java/com/metamx/druid/guava/ThreadRenamingRunnable.java b/common/src/main/java/com/metamx/druid/guava/ThreadRenamingRunnable.java new file mode 100644 index 00000000000..1175d6e795a --- /dev/null +++ b/common/src/main/java/com/metamx/druid/guava/ThreadRenamingRunnable.java @@ -0,0 +1,52 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.guava; + +/** + */ +public abstract class ThreadRenamingRunnable implements Runnable +{ + private final String name; + + public ThreadRenamingRunnable( + String name + ) + { + this.name = name; + } + + @Override + public final void run() + { + final Thread currThread = Thread.currentThread(); + String currName = currThread.getName(); + try { + currThread.setName(name); + doRun(); + } + finally { + currThread.setName(currName); + } + + } + + public abstract void doRun(); +} + diff --git a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java index 5659758591b..4bc092a7904 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/RealtimePlumberSchool.java @@ -33,9 +33,11 @@ import com.metamx.common.Pair; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.FunctionalIterable; import com.metamx.druid.Query; +import com.metamx.druid.StorageAdapter; import com.metamx.druid.client.DataSegment; import com.metamx.druid.client.DruidServer; import com.metamx.druid.client.ServerView; +import com.metamx.druid.guava.ThreadRenamingRunnable; import com.metamx.druid.index.v1.IndexGranularity; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexMerger; @@ -46,7 +48,6 @@ import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunnerFactory; import com.metamx.druid.query.QueryRunnerFactoryConglomerate; import com.metamx.druid.query.QueryToolChest; -import com.metamx.druid.shard.NoneShardSpec; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import com.metamx.emitter.service.ServiceMetricEvent; @@ -203,7 +204,7 @@ public class RealtimePlumberSchool implements PlumberSchool return ServerView.CallbackAction.CONTINUE; } - log.info("Checking segment[%s]", segment); + log.debug("Checking segment[%s] on server[%s]", segment, server); if (schema.getDataSource().equals(segment.getDataSource())) { final Interval interval = segment.getInterval(); for (Map.Entry entry : sinks.entrySet()) { @@ -239,10 +240,7 @@ public class RealtimePlumberSchool implements PlumberSchool log.info( "Expect to run at [%s]", new DateTime().plus( - new Duration( - System.currentTimeMillis(), - segmentGranularity.increment(truncatedNow) + windowMillis - ) + new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis) ) ); @@ -251,10 +249,10 @@ public class RealtimePlumberSchool implements PlumberSchool scheduledExecutor, new Duration(System.currentTimeMillis(), segmentGranularity.increment(truncatedNow) + windowMillis), new Duration(truncatedNow, segmentGranularity.increment(truncatedNow)), - new Runnable() + new ThreadRenamingRunnable(String.format("%s-overseer", schema.getDataSource())) { @Override - public void run() + public void doRun() { log.info("Starting merge and push."); @@ -272,11 +270,14 @@ public class RealtimePlumberSchool implements PlumberSchool for (final Map.Entry entry : sinksToPush) { final Sink sink = entry.getValue(); + final String threadName = String.format( + "%s-%s-persist-n-merge", schema.getDataSource(), new DateTime(entry.getKey()) + ); persistExecutor.execute( - new Runnable() + new ThreadRenamingRunnable(threadName) { @Override - public void run() + public void doRun() { final Interval interval = sink.getInterval(); @@ -290,17 +291,26 @@ public class RealtimePlumberSchool implements PlumberSchool final File mergedFile; try { - final File persistDir = computePersistDir(schema, interval); - - final File[] persistedIndexes = persistDir.listFiles(); List indexes = Lists.newArrayList(); - for (File persistedIndex : persistedIndexes) { - log.info("Adding index at [%s]", persistedIndex); - indexes.add(IndexIO.mapDir(persistedIndex)); + for (FireHydrant fireHydrant : sink) { + StorageAdapter adapter = fireHydrant.getAdapter(); + if (adapter instanceof MMappedIndexStorageAdapter) { + log.info("Adding hydrant[%s]", fireHydrant); + indexes.add(((MMappedIndexStorageAdapter) adapter).getIndex()); + } + else { + log.makeAlert("[%s] Failure to merge-n-push", schema.getDataSource()) + .addData("type", "Unknown adapter type") + .addData("adapterClass", adapter.getClass().toString()) + .emit(); + return; + } } mergedFile = IndexMerger.mergeMMapped( - indexes, schema.getAggregators(), new File(persistDir, "merged") + indexes, + schema.getAggregators(), + new File(computePersistDir(schema, interval), "merged") ); MMappedIndex index = IndexIO.mapDir(mergedFile); @@ -421,10 +431,10 @@ public class RealtimePlumberSchool implements PlumberSchool } persistExecutor.execute( - new Runnable() + new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource())) { @Override - public void run() + public void doRun() { for (Pair pair : indexesToPersist) { metrics.incrementRowOutputCount(persistHydrant(pair.lhs, schema, pair.rhs)); diff --git a/realtime/src/main/java/com/metamx/druid/realtime/Sink.java b/realtime/src/main/java/com/metamx/druid/realtime/Sink.java index 95d164f367f..051550e3c41 100644 --- a/realtime/src/main/java/com/metamx/druid/realtime/Sink.java +++ b/realtime/src/main/java/com/metamx/druid/realtime/Sink.java @@ -45,9 +45,7 @@ public class Sink implements Iterable { private static final Logger log = new Logger(Sink.class); - private volatile int swapCount = 0; private volatile FireHydrant currIndex; - private volatile boolean hasSwapped = false; private final Interval interval; private final Schema schema; @@ -82,7 +80,6 @@ public class Sink implements Iterable } this.hydrants.addAll(hydrants); - swapCount = hydrants.size(); makeNewCurrIndex(interval.getStartMillis(), schema); } @@ -109,7 +106,6 @@ public class Sink implements Iterable */ public FireHydrant swap() { - hasSwapped = true; return makeNewCurrIndex(interval.getStartMillis(), schema); } @@ -149,16 +145,15 @@ public class Sink implements Iterable ); FireHydrant old; - if (currIndex == null) { // Only happens on initialization... + if (currIndex == null) { // Only happens on initialization, cannot synchronize on null old = currIndex; - currIndex = new FireHydrant(newIndex, swapCount); + currIndex = new FireHydrant(newIndex, hydrants.size()); hydrants.add(currIndex); } else { synchronized (currIndex) { old = currIndex; - currIndex = new FireHydrant(newIndex, swapCount); + currIndex = new FireHydrant(newIndex, hydrants.size()); hydrants.add(currIndex); - ++swapCount; } } diff --git a/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexStorageAdapter.java index 61d2d895fc5..1e64950d308 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/MMappedIndexStorageAdapter.java @@ -62,6 +62,11 @@ public class MMappedIndexStorageAdapter extends BaseStorageAdapter this.index = index; } + public MMappedIndex getIndex() + { + return index; + } + @Override public String getSegmentIdentifier() {