From 6f06d701d95566b5bb510c370c3e6a883a29fe5d Mon Sep 17 00:00:00 2001 From: Jan Rudert Date: Thu, 4 Jul 2013 16:00:20 +0200 Subject: [PATCH 1/8] Building a correct segment path for HDFS --- .../druid/loading/HdfsDataSegmentPusher.java | 2 +- .../loading/HdfsDataSegmentPusherUtil.java | 50 +++++++++++++++++++ .../HdfsDataSegmentPusherUtilTest.java | 39 +++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java create mode 100644 server/src/test/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtilTest.java diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java index 52ac15129d4..f62ee6d0604 100644 --- a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java @@ -42,7 +42,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher @Override public DataSegment push(File inDir, DataSegment segment) throws IOException { - final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = HdfsDataSegmentPusherUtil.getStorageDir(segment); Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir)); FileSystem fs = outFile.getFileSystem(hadoopConfig); diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java new file mode 100644 index 00000000000..88406574c52 --- /dev/null +++ b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java @@ -0,0 +1,50 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.loading; + +import org.joda.time.format.ISODateTimeFormat; + +import com.google.common.base.Joiner; +import com.metamx.druid.client.DataSegment; + +/** + */ +public class HdfsDataSegmentPusherUtil +{ + private static final Joiner JOINER = Joiner.on("/").skipNulls(); + + /** + * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in + * path names. So we format paths differently for HDFS. 
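+   *
+   * For example, the segment built in HdfsDataSegmentPusherUtilTest below
+   * (datasource "something", interval 2011-10-01/2011-10-02, version "1",
+   * partition 0) maps to
+   * "something/20111001T000000.000Z_20111002T000000.000Z/1/0": the interval
+   * endpoints are rendered with ISODateTimeFormat.basicDateTime() and any
+   * colons in the version string are replaced with underscores.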
+ */ + public static String getStorageDir(DataSegment segment) + { + return JOINER.join( + segment.getDataSource(), + String.format( + "%s_%s", + segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), + segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) + ), + segment.getVersion().replaceAll(":", "_"), + segment.getShardSpec().getPartitionNum() + ); + } +} diff --git a/server/src/test/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtilTest.java b/server/src/test/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtilTest.java new file mode 100644 index 00000000000..868f86d2a60 --- /dev/null +++ b/server/src/test/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtilTest.java @@ -0,0 +1,39 @@ +package com.metamx.druid.loading; + +import com.google.common.collect.ImmutableMap; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.shard.NoneShardSpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +/** + * @author jan.rudert + */ +public class HdfsDataSegmentPusherUtilTest { + @Test + public void testGetStorageDir() throws Exception { + + Interval interval = new Interval("2011-10-01/2011-10-02"); + ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); + + DataSegment segment = new DataSegment( + "something", + interval, + "1", + loadSpec, + Arrays.asList("dim1", "dim2"), + Arrays.asList("met1", "met2"), + new NoneShardSpec(), + IndexIO.CURRENT_VERSION_ID, + 1 + ); + + String storageDir = HdfsDataSegmentPusherUtil.getStorageDir(segment); + Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/1/0", storageDir); + + } +} From 18ec08185592751767976d51c7c498e9a757fb6a Mon Sep 17 00:00:00 2001 From: Jan Rudert Date: Mon, 8 Jul 2013 08:39:08 +0200 Subject: [PATCH 2/8] moved getHdfsStorageDir() to DataSegmentPusherUtil, extended test --- .../druid/loading/DataSegmentPusherUtil.java | 48 ++++++++++++------ .../druid/loading/HdfsDataSegmentPusher.java | 2 +- .../loading/HdfsDataSegmentPusherUtil.java | 50 ------------------- ...st.java => DataSegmentPusherUtilTest.java} | 10 ++-- 4 files changed, 40 insertions(+), 70 deletions(-) delete mode 100644 server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java rename server/src/test/java/com/metamx/druid/loading/{HdfsDataSegmentPusherUtilTest.java => DataSegmentPusherUtilTest.java} (78%) diff --git a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java index e72bd787bb3..349194836d9 100644 --- a/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java +++ b/server/src/main/java/com/metamx/druid/loading/DataSegmentPusherUtil.java @@ -19,6 +19,8 @@ package com.metamx.druid.loading; +import org.joda.time.format.ISODateTimeFormat; + import com.google.common.base.Joiner; import com.metamx.druid.client.DataSegment; @@ -26,19 +28,37 @@ import com.metamx.druid.client.DataSegment; */ public class DataSegmentPusherUtil { - private static final Joiner JOINER = Joiner.on("/").skipNulls(); + private static final Joiner JOINER = Joiner.on("/").skipNulls(); - public static String getStorageDir(DataSegment segment) - { - return JOINER.join( - segment.getDataSource(), - String.format( - "%s_%s", - segment.getInterval().getStart(), - segment.getInterval().getEnd() - ), - segment.getVersion(), - segment.getShardSpec().getPartitionNum() - ); - } 
+ public static String getStorageDir(DataSegment segment) + { + return JOINER.join( + segment.getDataSource(), + String.format( + "%s_%s", + segment.getInterval().getStart(), + segment.getInterval().getEnd() + ), + segment.getVersion(), + segment.getShardSpec().getPartitionNum() + ); + } + + /** + * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in + * path names. So we format paths differently for HDFS. + */ + public static String getHdfsStorageDir(DataSegment segment) + { + return JOINER.join( + segment.getDataSource(), + String.format( + "%s_%s", + segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), + segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) + ), + segment.getVersion().replaceAll(":", "_"), + segment.getShardSpec().getPartitionNum() + ); + } } diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java index f62ee6d0604..e232bbb8f08 100644 --- a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java +++ b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusher.java @@ -42,7 +42,7 @@ public class HdfsDataSegmentPusher implements DataSegmentPusher @Override public DataSegment push(File inDir, DataSegment segment) throws IOException { - final String storageDir = HdfsDataSegmentPusherUtil.getStorageDir(segment); + final String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); Path outFile = new Path(String.format("%s/%s/index.zip", config.getStorageDirectory(), storageDir)); FileSystem fs = outFile.getFileSystem(hadoopConfig); diff --git a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java b/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java deleted file mode 100644 index 88406574c52..00000000000 --- a/server/src/main/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtil.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright (C) 2012 Metamarkets Group Inc. - * - * This program is free software; you can redistribute it and/or - * modify it under the terms of the GNU General Public License - * as published by the Free Software Foundation; either version 2 - * of the License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - */ - -package com.metamx.druid.loading; - -import org.joda.time.format.ISODateTimeFormat; - -import com.google.common.base.Joiner; -import com.metamx.druid.client.DataSegment; - -/** - */ -public class HdfsDataSegmentPusherUtil -{ - private static final Joiner JOINER = Joiner.on("/").skipNulls(); - - /** - * Due to https://issues.apache.org/jira/browse/HDFS-13 ":" are not allowed in - * path names. So we format paths differently for HDFS. 
- */ - public static String getStorageDir(DataSegment segment) - { - return JOINER.join( - segment.getDataSource(), - String.format( - "%s_%s", - segment.getInterval().getStart().toString(ISODateTimeFormat.basicDateTime()), - segment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime()) - ), - segment.getVersion().replaceAll(":", "_"), - segment.getShardSpec().getPartitionNum() - ); - } -} diff --git a/server/src/test/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtilTest.java b/server/src/test/java/com/metamx/druid/loading/DataSegmentPusherUtilTest.java similarity index 78% rename from server/src/test/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtilTest.java rename to server/src/test/java/com/metamx/druid/loading/DataSegmentPusherUtilTest.java index 868f86d2a60..f367fceab47 100644 --- a/server/src/test/java/com/metamx/druid/loading/HdfsDataSegmentPusherUtilTest.java +++ b/server/src/test/java/com/metamx/druid/loading/DataSegmentPusherUtilTest.java @@ -13,9 +13,9 @@ import java.util.Arrays; /** * @author jan.rudert */ -public class HdfsDataSegmentPusherUtilTest { +public class DataSegmentPusherUtilTest { @Test - public void testGetStorageDir() throws Exception { + public void shouldNotHaveColonsInHdfsStorageDir() throws Exception { Interval interval = new Interval("2011-10-01/2011-10-02"); ImmutableMap loadSpec = ImmutableMap.of("something", "or_other"); @@ -23,7 +23,7 @@ public class HdfsDataSegmentPusherUtilTest { DataSegment segment = new DataSegment( "something", interval, - "1", + "brand:new:version", loadSpec, Arrays.asList("dim1", "dim2"), Arrays.asList("met1", "met2"), @@ -32,8 +32,8 @@ public class HdfsDataSegmentPusherUtilTest { 1 ); - String storageDir = HdfsDataSegmentPusherUtil.getStorageDir(segment); - Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/1/0", storageDir); + String storageDir = DataSegmentPusherUtil.getHdfsStorageDir(segment); + Assert.assertEquals("something/20111001T000000.000Z_20111002T000000.000Z/brand_new_version/0", storageDir); } } From ad087a7a221f71083b44ebc5d3c5ff9bbc18d0fc Mon Sep 17 00:00:00 2001 From: Jan Rudert Date: Wed, 10 Jul 2013 09:21:45 +0200 Subject: [PATCH 3/8] correct segment path for hadoop indexer --- .../indexer/HadoopDruidIndexerConfig.java | 45 ++++++++----- .../druid/indexer/IndexGeneratorJob.java | 12 ++-- .../indexer/HadoopDruidIndexerConfigTest.java | 64 +++++++++++++++++++ 3 files changed, 100 insertions(+), 21 deletions(-) diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java index 6d0ea322313..0fe72cf0e44 100644 --- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/HadoopDruidIndexerConfig.java @@ -57,7 +57,9 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.shard.ShardSpec; import com.metamx.druid.utils.JodaUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.mapreduce.Job; import org.joda.time.DateTime; @@ -656,22 +658,33 @@ public class HadoopDruidIndexerConfig return new Path(makeDescriptorInfoDir(), String.format("%s.json", segment.getIdentifier().replace(":", ""))); } - public Path makeSegmentOutputPath(Bucket bucket) - { - final Interval bucketInterval = 
getGranularitySpec().bucketInterval(bucket.time).get(); - - return new Path( - String.format( - "%s/%s/%s_%s/%s/%s", - getSegmentOutputDir(), - dataSource, - bucketInterval.getStart().toString(), - bucketInterval.getEnd().toString(), - getVersion(), - bucket.partitionNum - ) - ); - } + public Path makeSegmentOutputPath(FileSystem fileSystem, Bucket bucket) + { + final Interval bucketInterval = getGranularitySpec().bucketInterval(bucket.time).get(); + if (fileSystem instanceof DistributedFileSystem) + { + return new Path( + String.format( + "%s/%s/%s_%s/%s/%s", + getSegmentOutputDir().replace(":", "_"), + dataSource.replace(":", "_"), + bucketInterval.getStart().toString(ISODateTimeFormat.basicDateTime()), + bucketInterval.getEnd().toString(ISODateTimeFormat.basicDateTime()), + getVersion().replace(":", "_"), + bucket.partitionNum + )); + } + return new Path( + String.format( + "%s/%s/%s_%s/%s/%s", + getSegmentOutputDir(), + dataSource, + bucketInterval.getStart().toString(), + bucketInterval.getEnd().toString(), + getVersion(), + bucket.partitionNum + )); + } public Job addInputPaths(Job job) throws IOException { diff --git a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java index 45cbded7f1a..583cafcae4f 100644 --- a/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/com/metamx/druid/indexer/IndexGeneratorJob.java @@ -375,12 +375,14 @@ public class IndexGeneratorJob implements Jobby Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); int attemptNumber = context.getTaskAttemptID().getId(); - Path indexBasePath = config.makeSegmentOutputPath(bucket); - Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); - final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); - final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); - outputFS.mkdirs(indexBasePath); + FileSystem fileSystem = FileSystem.get(context.getConfiguration()); + Path indexBasePath = config.makeSegmentOutputPath(fileSystem, bucket); + Path indexZipFilePath = new Path(indexBasePath, String.format("index.zip.%s", attemptNumber)); + final FileSystem infoFS = config.makeDescriptorInfoDir().getFileSystem(context.getConfiguration()); + final FileSystem outputFS = indexBasePath.getFileSystem(context.getConfiguration()); + + outputFS.mkdirs(indexBasePath); Exception caughtException = null; ZipOutputStream out = null; diff --git a/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java index dcabe168e67..d0d2ee1d2c0 100644 --- a/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/com/metamx/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -27,7 +27,12 @@ import com.metamx.druid.indexer.partitions.PartitionsSpec; import com.metamx.druid.indexer.updater.DbUpdaterJobSpec; import com.metamx.druid.jackson.DefaultObjectMapper; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.joda.time.DateTime; import org.joda.time.Interval; +import org.joda.time.format.ISODateTimeFormat; import org.junit.Assert; import org.junit.Test; @@ -427,6 +432,65 @@ 
public class HadoopDruidIndexerConfigTest ); } + + @Test + public void shouldMakeHDFSCompliantSegmentOutputPath() { + final HadoopDruidIndexerConfig cfg; + + try { + cfg = jsonReadWriteRead( + "{" + + "\"dataSource\": \"the:data:source\"," + + " \"granularitySpec\":{" + + " \"type\":\"uniform\"," + + " \"gran\":\"hour\"," + + " \"intervals\":[\"2012-07-10/P1D\"]" + + " }," + + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\"" + + "}", + HadoopDruidIndexerConfig.class + ); + } catch(Exception e) { + throw Throwables.propagate(e); + } + + cfg.setVersion("some:brand:new:version"); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = cfg.makeSegmentOutputPath(new DistributedFileSystem(), bucket); + Assert.assertEquals("/tmp/dru_id/data_test/the_data_source/20120710T050000.000Z_20120710T060000.000Z/some_brand_new_version/4712", path.toString()); + + } + + @Test + public void shouldMakeDefaultSegmentOutputPathIfNotHDFS() { + final HadoopDruidIndexerConfig cfg; + + try { + cfg = jsonReadWriteRead( + "{" + + "\"dataSource\": \"the:data:source\"," + + " \"granularitySpec\":{" + + " \"type\":\"uniform\"," + + " \"gran\":\"hour\"," + + " \"intervals\":[\"2012-07-10/P1D\"]" + + " }," + + "\"segmentOutputPath\": \"/tmp/dru:id/data:test\"" + + "}", + HadoopDruidIndexerConfig.class + ); + } catch(Exception e) { + throw Throwables.propagate(e); + } + + cfg.setVersion("some:brand:new:version"); + + Bucket bucket = new Bucket(4711, new DateTime(2012, 07, 10, 5, 30), 4712); + Path path = cfg.makeSegmentOutputPath(new LocalFileSystem(), bucket); + Assert.assertEquals("/tmp/dru:id/data:test/the:data:source/2012-07-10T05:00:00.000Z_2012-07-10T06:00:00.000Z/some:brand:new:version/4712", path.toString()); + + } + private T jsonReadWriteRead(String s, Class klass) { try { From 50836798faa3355339a4dfc592b97e334b6bfbe2 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Jul 2013 15:40:45 -0700 Subject: [PATCH 4/8] toggle between compressed and non compressed service discovery --- .../ServiceDiscoveryConfig.java | 7 +++++ .../worker/executor/ExecutorNode.java | 30 +++++++++---------- .../indexing/worker/http/WorkerNode.java | 6 ++-- .../com/metamx/druid/http/MasterMain.java | 10 +++++-- 4 files changed, 33 insertions(+), 20 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java index 04776d6545a..2d828cac9fd 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java @@ -36,4 +36,11 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig @Config("druid.zk.paths.discoveryPath") public abstract String getDiscoveryPath(); + + @Override + @Config("druid.service.discovery.curator.compression.enable") + public boolean enableCompression() + { + return false; + } } diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java index c5692e59d58..20428fe269c 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/executor/ExecutorNode.java @@ -24,7 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.smile.SmileFactory; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Guice; import com.google.inject.Injector; @@ -44,6 +43,17 @@ import com.metamx.druid.curator.discovery.ServiceInstanceFactory; import com.metamx.druid.http.GuiceServletConfig; import com.metamx.druid.http.QueryServlet; import com.metamx.druid.http.StatusServlet; +import com.metamx.druid.indexing.common.RetryPolicyFactory; +import com.metamx.druid.indexing.common.TaskToolboxFactory; +import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory; +import com.metamx.druid.indexing.common.config.RetryPolicyConfig; +import com.metamx.druid.indexing.common.config.TaskConfig; +import com.metamx.druid.indexing.common.index.ChatHandlerProvider; +import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; +import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; +import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; +import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; +import com.metamx.druid.indexing.worker.config.WorkerConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServerInit; @@ -52,17 +62,6 @@ import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.loading.DataSegmentKiller; import com.metamx.druid.loading.DataSegmentPusher; import com.metamx.druid.loading.S3DataSegmentKiller; -import com.metamx.druid.indexing.common.RetryPolicyFactory; -import com.metamx.druid.indexing.common.TaskToolboxFactory; -import com.metamx.druid.indexing.common.actions.RemoteTaskActionClientFactory; -import com.metamx.druid.indexing.common.config.RetryPolicyConfig; -import com.metamx.druid.indexing.common.config.TaskConfig; -import com.metamx.druid.indexing.common.index.EventReceiverFirehoseFactory; -import com.metamx.druid.indexing.common.index.ChatHandlerProvider; -import com.metamx.druid.indexing.common.index.StaticS3FirehoseFactory; -import com.metamx.druid.indexing.coordinator.ThreadPoolTaskRunner; -import com.metamx.druid.indexing.worker.config.ChatHandlerProviderConfig; -import com.metamx.druid.indexing.worker.config.WorkerConfig; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.Emitters; @@ -70,11 +69,10 @@ import com.metamx.emitter.service.ServiceEmitter; import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClientConfig; import com.metamx.http.client.HttpClientInit; -import com.metamx.metrics.JvmMonitor; import com.metamx.metrics.Monitor; import com.metamx.metrics.MonitorScheduler; import com.metamx.metrics.MonitorSchedulerConfig; -import com.metamx.metrics.SysMonitor; +import org.apache.curator.framework.CuratorFramework; import org.apache.curator.x.discovery.ServiceDiscovery; import org.apache.curator.x.discovery.ServiceProvider; import org.jets3t.service.S3ServiceException; @@ -86,7 +84,6 @@ import org.mortbay.jetty.servlet.DefaultServlet; import org.mortbay.jetty.servlet.ServletHolder; import org.skife.config.ConfigurationObjectFactory; -import java.util.List; import java.util.Properties; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -350,8 +347,9 @@ public class ExecutorNode extends BaseServerNode { final ServiceDiscoveryConfig config = 
configFactory.build(ServiceDiscoveryConfig.class); if (serviceDiscovery == null) { + final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework(config, lifecycle); this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - getCuratorFramework(), config, lifecycle + serviceDiscoveryCuratorFramework, config, lifecycle ); } if (serviceAnnouncer == null) { diff --git a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java index 1ccf32e0795..dda7afe7d78 100644 --- a/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java +++ b/indexing-service/src/main/java/com/metamx/druid/indexing/worker/http/WorkerNode.java @@ -331,11 +331,13 @@ public class WorkerNode extends QueryableNode { if (serviceDiscovery == null) { final ServiceDiscoveryConfig config = getConfigFactory().build(ServiceDiscoveryConfig.class); - this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( - getCuratorFramework(), + final CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework( config, getLifecycle() ); + this.serviceDiscovery = Initialization.makeServiceDiscoveryClient( + serviceDiscoveryCuratorFramework, config, getLifecycle() + ); } if (coordinatorServiceProvider == null) { this.coordinatorServiceProvider = Initialization.makeServiceProvider( diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 63521717e79..c8edf93dbbf 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; @@ -124,10 +125,15 @@ public class MasterMain final ScheduledExecutorFactory scheduledExecutorFactory = ScheduledExecutors.createFactory(lifecycle); final ServiceDiscoveryConfig serviceDiscoveryConfig = configFactory.build(ServiceDiscoveryConfig.class); - CuratorFramework curatorFramework = Initialization.makeCuratorFramework( + CuratorFramework serviceDiscoveryCuratorFramework = Initialization.makeCuratorFramework( serviceDiscoveryConfig, lifecycle ); + final CuratorConfig curatorConfig = configFactory.build(CuratorConfig.class); + CuratorFramework curatorFramework = Initialization.makeCuratorFramework( + curatorConfig, + lifecycle + ); final ZkPathsConfig zkPaths = configFactory.build(ZkPathsConfig.class); @@ -201,7 +207,7 @@ public class MasterMain final DruidMasterConfig druidMasterConfig = configFactory.build(DruidMasterConfig.class); final ServiceDiscovery serviceDiscovery = Initialization.makeServiceDiscoveryClient( - curatorFramework, + serviceDiscoveryCuratorFramework, serviceDiscoveryConfig, lifecycle ); From 091dce11c8b813ac7a5a0a7e85be99120a235e66 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Jul 2013 15:47:05 -0700 Subject: [PATCH 5/8] Add a timeout in LQP to fail an assign if it takes too long --- .../com/metamx/druid/http/MasterMain.java | 10 +++- .../com/metamx/druid/master/DruidMaster.java | 9 ++-- 
.../druid/master/DruidMasterConfig.java | 6 +++ .../metamx/druid/master/LoadQueuePeon.java | 46 +++++++++++++++---- .../druid/master/LoadQueueTaskMaster.java | 11 ++++- .../metamx/druid/master/DruidMasterTest.java | 9 ++-- .../druid/master/LoadQueuePeonTester.java | 2 +- 7 files changed, 69 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 63521717e79..426660ee799 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -49,6 +49,7 @@ import com.metamx.druid.db.DatabaseSegmentManager; import com.metamx.druid.db.DatabaseSegmentManagerConfig; import com.metamx.druid.db.DbConnector; import com.metamx.druid.db.DbConnectorConfig; +import com.metamx.druid.initialization.CuratorConfig; import com.metamx.druid.initialization.Initialization; import com.metamx.druid.initialization.ServerConfig; import com.metamx.druid.initialization.ServiceDiscoveryConfig; @@ -226,8 +227,13 @@ public class MasterMain new ConfigManager(dbi, configManagerConfig), jsonMapper ); + final ScheduledExecutorService scheduledExecutorService = scheduledExecutorFactory.create(1, "Master-Exec--%d"); final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster( - curatorFramework, jsonMapper, Execs.singleThreaded("Master-PeonExec--%d") + curatorFramework, + jsonMapper, + Execs.singleThreaded("Master-PeonExec--%d"), + scheduledExecutorService, + druidMasterConfig ); final DruidMaster master = new DruidMaster( @@ -239,7 +245,7 @@ public class MasterMain databaseRuleManager, curatorFramework, emitter, - scheduledExecutorFactory, + scheduledExecutorService, indexingServiceClient, taskMaster ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 592e76f0d06..924096cc10c 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -30,7 +30,6 @@ import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.metamx.common.IAE; import com.metamx.common.Pair; -import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; @@ -105,7 +104,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorFactory scheduledExecutorFactory, + ScheduledExecutorService scheduledExecutorService, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster ) @@ -119,7 +118,7 @@ public class DruidMaster databaseRuleManager, curator, emitter, - scheduledExecutorFactory, + scheduledExecutorService, indexingServiceClient, taskMaster, Maps.newConcurrentMap() @@ -135,7 +134,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorFactory scheduledExecutorFactory, + ScheduledExecutorService scheduledExecutorService, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, ConcurrentMap loadQueuePeonMap @@ -153,7 +152,7 @@ public class DruidMaster this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; - this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); + this.exec = 
scheduledExecutorService; this.leaderLatch = new AtomicReference(null); this.loadManagementPeons = loadQueuePeonMap; diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java index 662569e8485..d514b4d5c4f 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterConfig.java @@ -89,4 +89,10 @@ public abstract class DruidMasterConfig @Config("druid.master.replicant.throttleLimit") @Default("10") public abstract int getReplicantThrottleLimit(); + + @Config("druid.master.load.timeout") + public Duration getLoadTimeoutDelay() + { + return new Duration(15 * 60 * 1000); + } } diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java index c4ce6ac5a8c..d75e1f39959 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; +import com.metamx.common.ISE; +import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import com.metamx.druid.client.DataSegment; import com.metamx.druid.coordination.DataSegmentChangeRequest; @@ -42,8 +44,10 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; /** @@ -54,15 +58,6 @@ public class LoadQueuePeon private static final int DROP = 0; private static final int LOAD = 1; - private final Object lock = new Object(); - - private final CuratorFramework curator; - private final String basePath; - private final ObjectMapper jsonMapper; - private final ExecutorService zkWritingExecutor; - - private final AtomicLong queuedSize = new AtomicLong(0); - private static Comparator segmentHolderComparator = new Comparator() { private Comparator comparator = Comparators.inverse(DataSegment.bucketMonthComparator()); @@ -74,6 +69,15 @@ public class LoadQueuePeon } }; + private final CuratorFramework curator; + private final String basePath; + private final ObjectMapper jsonMapper; + private final ExecutorService zkWritingExecutor; + private final ScheduledExecutorService scheduledExecutorService; + private final DruidMasterConfig config; + + private final AtomicLong queuedSize = new AtomicLong(0); + private final ConcurrentSkipListSet segmentsToLoad = new ConcurrentSkipListSet( segmentHolderComparator ); @@ -81,19 +85,25 @@ public class LoadQueuePeon segmentHolderComparator ); + private final Object lock = new Object(); + private volatile SegmentHolder currentlyLoading = null; LoadQueuePeon( CuratorFramework curator, String basePath, ObjectMapper jsonMapper, - ExecutorService zkWritingExecutor + ExecutorService zkWritingExecutor, + ScheduledExecutorService scheduledExecutorService, + DruidMasterConfig config ) { this.curator = curator; this.basePath = basePath; this.jsonMapper = jsonMapper; this.zkWritingExecutor = zkWritingExecutor; + this.scheduledExecutorService = scheduledExecutorService; + this.config = config; } 
public Set getSegmentsToLoad() @@ -232,6 +242,22 @@ public class LoadQueuePeon final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); + ScheduledExecutors.scheduleWithFixedDelay( + scheduledExecutorService, + config.getLoadTimeoutDelay(), + new Callable() + { + @Override + public ScheduledExecutors.Signal call() throws Exception + { + if (curator.checkExists().forPath(path) != null) { + throw new ISE("%s was never removed! Failing this assign!", path); + } + return ScheduledExecutors.Signal.STOP; + } + } + ); + final Stat stat = curator.checkExists().usingWatcher( new CuratorWatcher() { diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java index 9ef5b61e5a0..254b83e32cd 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; /** * Provides LoadQueuePeons @@ -32,20 +33,26 @@ public class LoadQueueTaskMaster private final CuratorFramework curator; private final ObjectMapper jsonMapper; private final ExecutorService peonExec; + private final ScheduledExecutorService scheduledExecutorService; + private final DruidMasterConfig config; public LoadQueueTaskMaster( CuratorFramework curator, ObjectMapper jsonMapper, - ExecutorService peonExec + ExecutorService peonExec, + ScheduledExecutorService scheduledExecutorService, + DruidMasterConfig config ) { this.curator = curator; this.jsonMapper = jsonMapper; this.peonExec = peonExec; + this.scheduledExecutorService = scheduledExecutorService; + this.config = config; } public LoadQueuePeon giveMePeon(String basePath) { - return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec); + return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, scheduledExecutorService, config); } } diff --git a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java index a49dc85a582..e47afd8d109 100644 --- a/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java +++ b/server/src/test/java/com/metamx/druid/master/DruidMasterTest.java @@ -36,6 +36,7 @@ import org.junit.Before; import org.junit.Test; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ScheduledExecutorService; /** */ @@ -46,7 +47,7 @@ public class DruidMasterTest private LoadQueueTaskMaster taskMaster; private DatabaseSegmentManager databaseSegmentManager; private SingleServerInventoryView serverInventoryView; - private ScheduledExecutorFactory scheduledExecutorFactory; + private ScheduledExecutorService scheduledExecutorService; private DruidServer druidServer; private DataSegment segment; private ConcurrentMap loadManagementPeons; @@ -64,8 +65,8 @@ public class DruidMasterTest databaseSegmentManager = EasyMock.createNiceMock(DatabaseSegmentManager.class); EasyMock.replay(databaseSegmentManager); - scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class); - EasyMock.replay(scheduledExecutorFactory); + scheduledExecutorService = EasyMock.createNiceMock(ScheduledExecutorService.class); + EasyMock.replay(scheduledExecutorService); master = 
new DruidMaster( new DruidMasterConfig() @@ -138,7 +139,7 @@ public class DruidMasterTest null, curator, new NoopServiceEmitter(), - scheduledExecutorFactory, + scheduledExecutorService, null, taskMaster, loadManagementPeons diff --git a/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java index b0e000d20ad..3594c660c09 100644 --- a/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java +++ b/server/src/test/java/com/metamx/druid/master/LoadQueuePeonTester.java @@ -10,7 +10,7 @@ public class LoadQueuePeonTester extends LoadQueuePeon public LoadQueuePeonTester() { - super(null, null, null, null); + super(null, null, null, null, null, null); } @Override From af6db21264857017508f8c6308c050e6693a6b7d Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Jul 2013 17:04:27 -0700 Subject: [PATCH 6/8] fix configs based on code review --- .../java/com/metamx/druid/initialization/CuratorConfig.java | 2 +- .../com/metamx/druid/initialization/ServiceDiscoveryConfig.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java index 741bc59d3d9..be46aea41f6 100644 --- a/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/CuratorConfig.java @@ -33,7 +33,7 @@ public abstract class CuratorConfig @Default("30000") public abstract int getZkSessionTimeoutMs(); - @Config("druid.curator.compression.enable") + @Config("druid.curator.compress") @Default("false") public abstract boolean enableCompression(); } diff --git a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java index 2d828cac9fd..7d0c20cd7ef 100644 --- a/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java +++ b/client/src/main/java/com/metamx/druid/initialization/ServiceDiscoveryConfig.java @@ -38,7 +38,7 @@ public abstract class ServiceDiscoveryConfig extends CuratorConfig public abstract String getDiscoveryPath(); @Override - @Config("druid.service.discovery.curator.compression.enable") + @Config("druid.curator.discovery.compress") public boolean enableCompression() { return false; From 6a96c1fb76c577b30bc86de97a75d85e2a61560b Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 29 Jul 2013 17:56:19 -0700 Subject: [PATCH 7/8] fix according to code review comments --- .../com/metamx/druid/http/MasterMain.java | 6 +- .../com/metamx/druid/master/DruidMaster.java | 9 +-- .../druid/master/DruidMasterLogger.java | 6 ++ .../metamx/druid/master/LoadQueuePeon.java | 57 ++++++++++++------- .../druid/master/LoadQueueTaskMaster.java | 9 +-- 5 files changed, 52 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index 426660ee799..1c63eee2ef7 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -227,12 +227,10 @@ public class MasterMain new ConfigManager(dbi, configManagerConfig), jsonMapper ); - final ScheduledExecutorService scheduledExecutorService = scheduledExecutorFactory.create(1, "Master-Exec--%d"); final LoadQueueTaskMaster taskMaster = new LoadQueueTaskMaster( curatorFramework, jsonMapper, - 
Execs.singleThreaded("Master-PeonExec--%d"), - scheduledExecutorService, + scheduledExecutorFactory.create(1, "Master-PeonExec--%d"), druidMasterConfig ); @@ -245,7 +243,7 @@ public class MasterMain databaseRuleManager, curatorFramework, emitter, - scheduledExecutorService, + scheduledExecutorFactory, indexingServiceClient, taskMaster ); diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 924096cc10c..592e76f0d06 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -30,6 +30,7 @@ import com.google.common.collect.Sets; import com.google.common.io.Closeables; import com.metamx.common.IAE; import com.metamx.common.Pair; +import com.metamx.common.concurrent.ScheduledExecutorFactory; import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import com.metamx.common.guava.FunctionalIterable; @@ -104,7 +105,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster ) @@ -118,7 +119,7 @@ public class DruidMaster databaseRuleManager, curator, emitter, - scheduledExecutorService, + scheduledExecutorFactory, indexingServiceClient, taskMaster, Maps.newConcurrentMap() @@ -134,7 +135,7 @@ public class DruidMaster DatabaseRuleManager databaseRuleManager, CuratorFramework curator, ServiceEmitter emitter, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorFactory scheduledExecutorFactory, IndexingServiceClient indexingServiceClient, LoadQueueTaskMaster taskMaster, ConcurrentMap loadQueuePeonMap @@ -152,7 +153,7 @@ public class DruidMaster this.indexingServiceClient = indexingServiceClient; this.taskMaster = taskMaster; - this.exec = scheduledExecutorService; + this.exec = scheduledExecutorFactory.create(1, "Master-Exec--%d"); this.leaderLatch = new AtomicReference(null); this.loadManagementPeons = loadQueuePeonMap; diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index 382d03966b6..c2992921f30 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -185,6 +185,12 @@ public class DruidMasterLogger implements DruidMasterHelper "master/loadQueue/size", queuePeon.getLoadQueueSize() ) ); + emitter.emit( + new ServiceMetricEvent.Builder() + .setUser1(serverName).build( + "master/loadQueue/failed", queuePeon.getFailedAssignCount() + ) + ); emitter.emit( new ServiceMetricEvent.Builder() .setUser1(serverName).build( diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java index d75e1f39959..189a48efc30 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java @@ -24,7 +24,6 @@ import com.google.common.base.Function; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.metamx.common.ISE; -import com.metamx.common.concurrent.ScheduledExecutors; import com.metamx.common.guava.Comparators; import 
com.metamx.druid.client.DataSegment; import com.metamx.druid.coordination.DataSegmentChangeRequest; @@ -44,10 +43,10 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -72,11 +71,11 @@ public class LoadQueuePeon private final CuratorFramework curator; private final String basePath; private final ObjectMapper jsonMapper; - private final ExecutorService zkWritingExecutor; - private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService zkWritingExecutor; private final DruidMasterConfig config; private final AtomicLong queuedSize = new AtomicLong(0); + private final AtomicInteger failedAssignCount = new AtomicInteger(0); private final ConcurrentSkipListSet segmentsToLoad = new ConcurrentSkipListSet( segmentHolderComparator @@ -93,8 +92,7 @@ public class LoadQueuePeon CuratorFramework curator, String basePath, ObjectMapper jsonMapper, - ExecutorService zkWritingExecutor, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorService zkWritingExecutor, DruidMasterConfig config ) { @@ -102,7 +100,6 @@ public class LoadQueuePeon this.basePath = basePath; this.jsonMapper = jsonMapper; this.zkWritingExecutor = zkWritingExecutor; - this.scheduledExecutorService = scheduledExecutorService; this.config = config; } @@ -145,6 +142,11 @@ public class LoadQueuePeon return queuedSize.get(); } + public int getFailedAssignCount() + { + return failedAssignCount.get(); + } + public void loadSegment( DataSegment segment, LoadPeonCallback callback @@ -242,20 +244,24 @@ public class LoadQueuePeon final byte[] payload = jsonMapper.writeValueAsBytes(currentlyLoading.getChangeRequest()); curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload); - ScheduledExecutors.scheduleWithFixedDelay( - scheduledExecutorService, - config.getLoadTimeoutDelay(), - new Callable() + zkWritingExecutor.schedule( + new Runnable() { @Override - public ScheduledExecutors.Signal call() throws Exception + public void run() { - if (curator.checkExists().forPath(path) != null) { - throw new ISE("%s was never removed! Failing this assign!", path); + try { + if (curator.checkExists().forPath(path) != null) { + failAssign(new ISE("%s was never removed! 
Failing this assign!", path)); + } + } + catch (Exception e) { + failAssign(e); } - return ScheduledExecutors.Signal.STOP; } - } + }, + config.getLoadTimeoutDelay().getMillis(), + TimeUnit.MILLISECONDS ); final Stat stat = curator.checkExists().usingWatcher( @@ -294,10 +300,7 @@ public class LoadQueuePeon } } catch (Exception e) { - log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading); - // Act like it was completed so that the master gives it to someone else - actionCompleted(); - doNext(); + failAssign(e); } } } @@ -353,6 +356,7 @@ public class LoadQueuePeon segmentsToLoad.clear(); queuedSize.set(0L); + failedAssignCount.set(0); } } @@ -377,6 +381,17 @@ public class LoadQueuePeon doNext(); } + private void failAssign(Exception e) + { + synchronized (lock) { + log.error(e, "Server[%s], throwable caught when submitting [%s].", basePath, currentlyLoading); + failedAssignCount.getAndIncrement(); + // Act like it was completed so that the master gives it to someone else + actionCompleted(); + doNext(); + } + } + private class SegmentHolder { private final DataSegment segment; diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java index 254b83e32cd..2547127bc5a 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueueTaskMaster.java @@ -32,27 +32,24 @@ public class LoadQueueTaskMaster { private final CuratorFramework curator; private final ObjectMapper jsonMapper; - private final ExecutorService peonExec; - private final ScheduledExecutorService scheduledExecutorService; + private final ScheduledExecutorService peonExec; private final DruidMasterConfig config; public LoadQueueTaskMaster( CuratorFramework curator, ObjectMapper jsonMapper, - ExecutorService peonExec, - ScheduledExecutorService scheduledExecutorService, + ScheduledExecutorService peonExec, DruidMasterConfig config ) { this.curator = curator; this.jsonMapper = jsonMapper; this.peonExec = peonExec; - this.scheduledExecutorService = scheduledExecutorService; this.config = config; } public LoadQueuePeon giveMePeon(String basePath) { - return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, scheduledExecutorService, config); + return new LoadQueuePeon(curator, basePath, jsonMapper, peonExec, config); } } From e7c6dd8112adb1ae13de5f697509facc0a392135 Mon Sep 17 00:00:00 2001 From: fjy Date: Tue, 30 Jul 2013 14:50:24 -0700 Subject: [PATCH 8/8] get and reset failed assign count --- .../main/java/com/metamx/druid/master/DruidMasterLogger.java | 2 +- .../src/main/java/com/metamx/druid/master/LoadQueuePeon.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java index c2992921f30..1b226933a52 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMasterLogger.java @@ -188,7 +188,7 @@ public class DruidMasterLogger implements DruidMasterHelper emitter.emit( new ServiceMetricEvent.Builder() .setUser1(serverName).build( - "master/loadQueue/failed", queuePeon.getFailedAssignCount() + "master/loadQueue/failed", queuePeon.getAndResetFailedAssignCount() ) ); emitter.emit( diff --git a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java 
b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java index 189a48efc30..2b5c96cb187 100644 --- a/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java +++ b/server/src/main/java/com/metamx/druid/master/LoadQueuePeon.java @@ -142,9 +142,9 @@ public class LoadQueuePeon return queuedSize.get(); } - public int getFailedAssignCount() + public int getAndResetFailedAssignCount() { - return failedAssignCount.get(); + return failedAssignCount.getAndSet(0); } public void loadSegment(
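
Taken together, patches 5, 7 and 8 implement one small pattern: when the master
hands a segment to a server it schedules a one-shot check, and if the ZooKeeper
node for that request still exists once druid.master.load.timeout elapses, the
assign is counted as failed and the master acts as if it had completed so the
segment can be given to someone else. The sketch below shows that pattern in
isolation, using only java.util.concurrent; AssignTimeoutSketch and
PendingAssign are hypothetical names invented here, and isStillPending() stands
in for the curator.checkExists().forPath(path) call in the real LoadQueuePeon,
so treat it as an illustration of the mechanism rather than Druid's actual API.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AssignTimeoutSketch
{
  /** Hypothetical stand-in for the ephemeral ZooKeeper node tracking one assign. */
  public interface PendingAssign
  {
    boolean isStillPending();

    void fail();
  }

  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger failedAssignCount = new AtomicInteger(0);

  public void assignWithTimeout(final PendingAssign assign, long timeoutMillis)
  {
    // One-shot timeout check, analogous to the zkWritingExecutor.schedule(...)
    // call patch 7 adds to LoadQueuePeon.
    exec.schedule(
        new Runnable()
        {
          @Override
          public void run()
          {
            if (assign.isStillPending()) {
              // The node "was never removed": count the failure, then act like
              // the assign completed so the work can be handed to someone else.
              failedAssignCount.incrementAndGet();
              assign.fail();
            }
          }
        },
        timeoutMillis,
        TimeUnit.MILLISECONDS
    );
  }

  /** Read-and-reset accessor, mirroring getAndResetFailedAssignCount() in patch 8. */
  public int getAndResetFailedAssignCount()
  {
    return failedAssignCount.getAndSet(0);
  }
}

In the real code the equivalent of fail() is LoadQueuePeon.failAssign(), which
also logs the error and calls actionCompleted() and doNext(), and
DruidMasterLogger periodically emits the counter as the "master/loadQueue/failed"
metric.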