From 0c789412bb2ad7189f5a8d0defb52d4b4ed9ae86 Mon Sep 17 00:00:00 2001 From: fjy Date: Sat, 25 Jan 2014 07:07:27 +0800 Subject: [PATCH 1/2] add a workaround for jackson bug where jacksoninject fails when a null value is passed through json creator annotated constructor --- docs/content/Plumber.md | 2 ++ .../common/task/RealtimeIndexTask.java | 12 ++++++++-- .../indexing/common/TestRealtimeTask.java | 1 + .../indexing/common/task/TaskSerdeTest.java | 1 + .../indexing/worker/TaskAnnouncementTest.java | 1 + pom.xml | 18 +++++++------- .../realtime/FireDepartmentConfig.java | 14 +---------- .../plumber/FlushingPlumberSchool.java | 7 +++--- .../plumber/RealtimePlumberSchool.java | 24 ++++++++++++++----- .../segment/realtime/RealtimeManagerTest.java | 2 +- .../plumber/RealtimePlumberSchoolTest.java | 2 +- 11 files changed, 48 insertions(+), 36 deletions(-) diff --git a/docs/content/Plumber.md b/docs/content/Plumber.md index dfbb3b6b3bf..bbf6b79cbc4 100644 --- a/docs/content/Plumber.md +++ b/docs/content/Plumber.md @@ -13,6 +13,8 @@ We provide a brief description of the example to exemplify the types of things t * `windowPeriod` is the amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server. * `basePersistDirectory` is the directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists. +* `maxPendingPersists` is how many persists a plumber can do concurrently without starting to block. + Available Plumbers ------------------ diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 5947be6ede6..7a40035c3e6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -86,6 +86,9 @@ public class RealtimeIndexTask extends AbstractTask @JsonIgnore private final Period windowPeriod; + @JsonIgnore + private final int maxPendingPersists; + @JsonIgnore private final IndexGranularity segmentGranularity; @@ -106,6 +109,7 @@ public class RealtimeIndexTask extends AbstractTask @JsonProperty("firehose") FirehoseFactory firehoseFactory, @JsonProperty("fireDepartmentConfig") FireDepartmentConfig fireDepartmentConfig, @JsonProperty("windowPeriod") Period windowPeriod, + @JsonProperty("maxPendingPersists") int maxPendingPersists, @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, @JsonProperty("rejectionPolicy") RejectionPolicyFactory rejectionPolicyFactory ) @@ -113,7 +117,7 @@ public class RealtimeIndexTask extends AbstractTask super( id == null ? makeTaskId(schema.getDataSource(), schema.getShardSpec().getPartitionNum(), new DateTime().toString()) - :id, + : id, String.format( "index_realtime_%s", @@ -135,6 +139,9 @@ public class RealtimeIndexTask extends AbstractTask this.firehoseFactory = firehoseFactory; this.fireDepartmentConfig = fireDepartmentConfig; this.windowPeriod = windowPeriod; + this.maxPendingPersists = (maxPendingPersists == 0) + ? RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS + : maxPendingPersists; this.segmentGranularity = segmentGranularity; this.rejectionPolicyFactory = rejectionPolicyFactory; } @@ -194,8 +201,9 @@ public class RealtimeIndexTask extends AbstractTask final RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool( windowPeriod, new File(toolbox.getTaskWorkDir(), "persist"), - segmentGranularity, fireDepartmentConfig.getMaxPendingPersists() + segmentGranularity ); + realtimePlumberSchool.setDefaultMaxPendingPersists(maxPendingPersists); final SegmentPublisher segmentPublisher = new TaskActionSegmentPublisher(this, toolbox); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 178cae10513..ba3a7787289 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -51,6 +51,7 @@ public class TestRealtimeTask extends RealtimeIndexTask null, null, null, + 1, null, null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 77ae0af4b52..e9ace7ac18a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -198,6 +198,7 @@ public class TaskSerdeTest null, null, new Period("PT10M"), + 1, IndexGranularity.HOUR, null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index e4c16b11c28..303780de3b2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -46,6 +46,7 @@ public class TaskAnnouncementTest null, null, new Period("PT10M"), + 1, IndexGranularity.HOUR, null ); diff --git a/pom.xml b/pom.xml index 127a80413b9..990a1a75e5c 100644 --- a/pom.xml +++ b/pom.xml @@ -218,47 +218,47 @@ com.fasterxml.jackson.core jackson-annotations - 2.2.2 + 2.2.3 com.fasterxml.jackson.core jackson-core - 2.2.2 + 2.2.3 com.fasterxml.jackson.core jackson-databind - 2.2.2 + 2.2.3 com.fasterxml.jackson.datatype jackson-datatype-guava - 2.2.2 + 2.2.3 com.fasterxml.jackson.datatype jackson-datatype-joda - 2.2.2 + 2.2.3 com.fasterxml.jackson.dataformat jackson-dataformat-smile - 2.2.2 + 2.2.3 com.fasterxml.jackson.jaxrs jackson-jaxrs-json-provider - 2.2.2 + 2.2.3 org.codehaus.jackson jackson-core-asl - 1.9.11 + 1.9.13 org.codehaus.jackson jackson-mapper-asl - 1.9.11 + 1.9.13 org.hibernate diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartmentConfig.java b/server/src/main/java/io/druid/segment/realtime/FireDepartmentConfig.java index c664da3edbc..512cbe00cb5 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartmentConfig.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartmentConfig.java @@ -28,23 +28,17 @@ import org.joda.time.Period; */ public class FireDepartmentConfig { - private static int MAX_PENDING_PERSIST_BATCHES_DEFAULT = 2; private final int maxRowsInMemory; private final Period intermediatePersistPeriod; - private final int maxPendingPersists; @JsonCreator public FireDepartmentConfig( @JsonProperty("maxRowsInMemory") int maxRowsInMemory, - @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, - @JsonProperty("maxPendingPersists") int maxPendingPersists + @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod ) { this.maxRowsInMemory = maxRowsInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod; - this.maxPendingPersists = maxPendingPersists > 0 - ? maxPendingPersists - : MAX_PENDING_PERSIST_BATCHES_DEFAULT; Preconditions.checkArgument(maxRowsInMemory > 0, "maxRowsInMemory[%s] should be greater than 0", maxRowsInMemory); Preconditions.checkNotNull(intermediatePersistPeriod, "intermediatePersistPeriod"); @@ -61,10 +55,4 @@ public class FireDepartmentConfig { return intermediatePersistPeriod; } - - @JsonProperty - public int getMaxPendingPersists() - { - return maxPendingPersists; - } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index a441df8cb03..eeb0b3c03ce 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -77,8 +77,7 @@ public class FlushingPlumberSchool implements PlumberSchool @JsonProperty("flushDuration") Duration flushDuration, @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, - @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, - @JsonProperty("maxPendingPersists") int maxPendingPersists + @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity ) { this.flushDuration = flushDuration; @@ -87,9 +86,9 @@ public class FlushingPlumberSchool implements PlumberSchool this.segmentGranularity = segmentGranularity; this.versioningPolicy = new IntervalStartVersioningPolicy(); this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory(); - this.maxPendingPersists = maxPendingPersists; + // Workaround for Jackson issue where if maxPendingPersists is null, all JacksonInjects fail + this.maxPendingPersists = RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS; - Preconditions.checkArgument(maxPendingPersists > 0, "FlushingPlumberSchool requires maxPendingPersists > 0"); Preconditions.checkNotNull(flushDuration, "FlushingPlumberSchool requires a flushDuration."); Preconditions.checkNotNull(windowPeriod, "FlushingPlumberSchool requires a windowPeriod."); Preconditions.checkNotNull(basePersistDirectory, "FlushingPlumberSchool requires a basePersistDirectory."); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 2a8a096771d..f7d6398a194 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -44,36 +44,44 @@ import java.util.concurrent.ExecutorService; */ public class RealtimePlumberSchool implements PlumberSchool { + public static final int DEFAULT_MAX_PENDING_PERSISTS = 2; + private static final EmittingLogger log = new EmittingLogger(RealtimePlumberSchool.class); - private static final int defaultPending = 2; private final Period windowPeriod; private final File basePersistDirectory; private final IndexGranularity segmentGranularity; - private final int maxPendingPersists; @JacksonInject @NotNull private volatile ServiceEmitter emitter; + @JacksonInject @NotNull private volatile QueryRunnerFactoryConglomerate conglomerate = null; + @JacksonInject @NotNull private volatile DataSegmentPusher dataSegmentPusher = null; + @JacksonInject @NotNull private volatile DataSegmentAnnouncer segmentAnnouncer = null; + @JacksonInject @NotNull private volatile SegmentPublisher segmentPublisher = null; + @JacksonInject @NotNull private volatile ServerView serverView = null; + @JacksonInject @NotNull @Processing private volatile ExecutorService queryExecutorService = null; + + private volatile int maxPendingPersists; private volatile VersioningPolicy versioningPolicy = null; private volatile RejectionPolicyFactory rejectionPolicyFactory = null; @@ -81,8 +89,7 @@ public class RealtimePlumberSchool implements PlumberSchool public RealtimePlumberSchool( @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, - @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity, - @JsonProperty("maxPendingPersists") int maxPendingPersists + @JsonProperty("segmentGranularity") IndexGranularity segmentGranularity ) { this.windowPeriod = windowPeriod; @@ -90,9 +97,9 @@ public class RealtimePlumberSchool implements PlumberSchool this.segmentGranularity = segmentGranularity; this.versioningPolicy = new IntervalStartVersioningPolicy(); this.rejectionPolicyFactory = new ServerTimeRejectionPolicyFactory(); - this.maxPendingPersists = (maxPendingPersists > 0) ? maxPendingPersists : defaultPending; + // Workaround for Jackson issue where if maxPendingPersists is null, all JacksonInjects fail + this.maxPendingPersists = RealtimePlumberSchool.DEFAULT_MAX_PENDING_PERSISTS; - Preconditions.checkArgument(maxPendingPersists <= 0, "RealtimePlumberSchool requires maxPendingPersists > 0"); Preconditions.checkNotNull(windowPeriod, "RealtimePlumberSchool requires a windowPeriod."); Preconditions.checkNotNull(basePersistDirectory, "RealtimePlumberSchool requires a basePersistDirectory."); Preconditions.checkNotNull(segmentGranularity, "RealtimePlumberSchool requires a segmentGranularity."); @@ -145,6 +152,11 @@ public class RealtimePlumberSchool implements PlumberSchool this.queryExecutorService = executorService; } + public void setDefaultMaxPendingPersists(int maxPendingPersists) + { + this.maxPendingPersists = maxPendingPersists; + } + @Override public Plumber findPlumber(final Schema schema, final FireDepartmentMetrics metrics) { diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 50e8edd0e06..ed4aad7f12c 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -78,7 +78,7 @@ public class RealtimeManagerTest Arrays.asList( new FireDepartment( schema, - new FireDepartmentConfig(1, new Period("P1Y"), 1), + new FireDepartmentConfig(1, new Period("P1Y")), new FirehoseFactory() { @Override diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index f8c04a9c8f4..39400f30cbf 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -86,7 +86,7 @@ public class RealtimePlumberSchoolTest RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool( new Period("PT10m"), tmpDir, - IndexGranularity.HOUR, 1 + IndexGranularity.HOUR ); announcer = EasyMock.createMock(DataSegmentAnnouncer.class); From 6581436c0c49b623c5993f542417fd301de6387e Mon Sep 17 00:00:00 2001 From: fjy Date: Sat, 25 Jan 2014 07:26:17 +0800 Subject: [PATCH 2/2] update kafka 7 dep to be able to actually run it locally --- kafka-seven/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka-seven/pom.xml b/kafka-seven/pom.xml index 6b1904c6fac..6a26af7e73a 100644 --- a/kafka-seven/pom.xml +++ b/kafka-seven/pom.xml @@ -39,7 +39,7 @@ kafka core-kafka - 0.7.2-mmx1 + 0.7.2-mmx4 log4j