From 97f828f6b1adc9ad3bb6e8fb7885bf08fee4c979 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 11 Mar 2013 17:20:09 -0700 Subject: [PATCH 01/14] [maven-release-plugin] prepare release druid-0.3.19 --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index 809d02343b9..5bd2dabd9cd 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/common/pom.xml b/common/pom.xml index 340f8a6d3a2..e3e6c99f94c 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/druid-services/pom.xml b/druid-services/pom.xml index d1aab696a14..595b6d7ceff 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.19-SNAPSHOT + 0.3.19 com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/examples/pom.xml b/examples/pom.xml index 88151a158f5..98723a0c488 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 54d30ddd5a8..9151c027e67 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 3b7c2d3582d..a38e3e38bac 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/index-common/pom.xml b/index-common/pom.xml index d5db4e3be63..79e4890c9d4 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/indexer/pom.xml b/indexer/pom.xml index b1d065b9977..35e4f47b0b1 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/merger/pom.xml b/merger/pom.xml index fab94d85089..1866b60520f 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/pom.xml b/pom.xml index 27e639e7c18..3e06133348f 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.19-SNAPSHOT + 0.3.19 druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 24b19ccf813..3927803c38b 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 diff --git a/server/pom.xml b/server/pom.xml index edbc04a6806..a628f952901 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19-SNAPSHOT + 0.3.19 From a49d0c5e4c9f73b406ead966f2fa2855ed169827 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Mon, 11 Mar 2013 17:20:16 -0700 Subject: [PATCH 02/14] [maven-release-plugin] prepare for next development iteration --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index 5bd2dabd9cd..cd8bacf5dc0 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index e3e6c99f94c..7f47d7fffed 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 595b6d7ceff..ca691b5e84a 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.19 + 0.3.20-SNAPSHOT com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 98723a0c488..3e790ab4be6 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 9151c027e67..b4394161de6 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index a38e3e38bac..3f68c7c82a6 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index 79e4890c9d4..ebe57f00fb1 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index 35e4f47b0b1..a736362d25d 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index 1866b60520f..d478400f00c 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/pom.xml b/pom.xml index 3e06133348f..5d2635955a7 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.19 + 0.3.20-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 3927803c38b..0ded94e9d25 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index a628f952901..c8ec34d5f14 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.19 + 0.3.20-SNAPSHOT From ac2d4e52da21af91e8d68991a5b77f5dcf97cc27 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 13 Mar 2013 14:09:21 -0700 Subject: [PATCH 03/14] bug fix for indexer coordinator not alerting when worker provisioning taking too long --- .../SimpleResourceManagementStrategy.java | 9 +-- .../SimpleResourceManagementStrategyTest.java | 62 ++++++++++++++++++- 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java index 082870c83c8..05e51da314a 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -102,7 +102,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } } else { - Duration durSinceLastProvision = new Duration(new DateTime(), lastProvisionTime); + Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime()); if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { log.makeAlert("Worker node provisioning taking too long") .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) @@ -111,8 +111,9 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } log.info( - "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker.", - currentlyProvisioning + "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s", + currentlyProvisioning, + durSinceLastProvision ); } @@ -203,7 +204,7 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat return true; } } else { - Duration durSinceLastTerminate = new Duration(new DateTime(), lastTerminateTime); + Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime()); if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { log.makeAlert("Worker node termination taking too long") .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java index 2052ae014bb..186723ef7df 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategyTest.java @@ -30,6 +30,10 @@ import com.metamx.druid.merger.coordinator.TaskRunnerWorkItem; import com.metamx.druid.merger.coordinator.ZkWorker; import com.metamx.druid.merger.coordinator.setup.WorkerSetupData; import com.metamx.druid.merger.worker.Worker; +import com.metamx.emitter.EmittingLogger; +import com.metamx.emitter.core.Event; +import com.metamx.emitter.service.ServiceEmitter; +import com.metamx.emitter.service.ServiceEventBuilder; import junit.framework.Assert; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -90,7 +94,7 @@ public class SimpleResourceManagementStrategyTest @Override public Duration getMaxScalingDuration() { - return null; + return new Duration(1000); } @Override @@ -184,6 +188,62 @@ public class SimpleResourceManagementStrategyTest EasyMock.verify(autoScalingStrategy); } + @Test + public void testProvisionAlert() throws Exception + { + ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); + EmittingLogger.registerEmitter(emitter); + emitter.emit(EasyMock.anyObject()); + EasyMock.expectLastCall(); + EasyMock.replay(emitter); + + EasyMock.expect(autoScalingStrategy.ipToIdLookup(EasyMock.>anyObject())) + .andReturn(Lists.newArrayList()).times(2); + EasyMock.expect(autoScalingStrategy.provision()).andReturn( + new AutoScalingData(Lists.newArrayList("fake"), Lists.newArrayList("faker")) + ); + EasyMock.replay(autoScalingStrategy); + + boolean provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList( + new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(testTask) + ) + ); + + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(simpleResourceManagementStrategy.getStats().toList().size() == 1); + DateTime createdTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + + Thread.sleep(2000); + + provisionedSomething = simpleResourceManagementStrategy.doProvision( + Arrays.asList( + new TaskRunnerWorkItem(testTask, null, null, null).withQueueInsertionTime(new DateTime()) + ), + Arrays.asList( + new TestZkWorker(testTask) + ) + ); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + simpleResourceManagementStrategy.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = simpleResourceManagementStrategy.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScalingStrategy); + EasyMock.verify(emitter); + } + @Test public void testDoSuccessfulTerminate() throws Exception { From 86c7ebe1e22f68f332ed572525bcff2f384b61ef Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 13 Mar 2013 14:28:18 -0700 Subject: [PATCH 04/14] autoscaling will clear state if a node takes too long to create --- .../http/IndexerCoordinatorResource.java | 3 ++ .../SimpleResourceManagementStrategy.java | 30 +++++++++++-------- .../merger/coordinator/setup/EC2NodeData.java | 13 ++++++++ .../coordinator/setup/GalaxyUserData.java | 10 +++++++ .../coordinator/setup/WorkerSetupData.java | 11 +++++++ 5 files changed, 55 insertions(+), 12 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java index 21adae8b09b..ee84e777101 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorResource.java @@ -176,6 +176,9 @@ public class IndexerCoordinatorResource if (!configManager.set(WorkerSetupData.CONFIG_KEY, workerSetupData)) { return Response.status(Response.Status.BAD_REQUEST).build(); } + + log.info("Updating Worker Setup configs: %s", workerSetupData); + return Response.ok().build(); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java index 05e51da314a..1d9e5f9f52e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/SimpleResourceManagementStrategy.java @@ -103,18 +103,21 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } else { Duration durSinceLastProvision = new Duration(lastProvisionTime, new DateTime()); - if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { - log.makeAlert("Worker node provisioning taking too long") - .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) - .addData("provisioningCount", currentlyProvisioning.size()) - .emit(); - } log.info( "%s still provisioning. Wait for all provisioned nodes to complete before requesting new worker. Current wait time: %s", currentlyProvisioning, durSinceLastProvision ); + + if (durSinceLastProvision.isLongerThan(config.getMaxScalingDuration())) { + log.makeAlert("Worker node provisioning taking too long!") + .addData("millisSinceLastProvision", durSinceLastProvision.getMillis()) + .addData("provisioningCount", currentlyProvisioning.size()) + .emit(); + + currentlyProvisioning.clear(); + } } return false; @@ -205,17 +208,20 @@ public class SimpleResourceManagementStrategy implements ResourceManagementStrat } } else { Duration durSinceLastTerminate = new Duration(lastTerminateTime, new DateTime()); - if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { - log.makeAlert("Worker node termination taking too long") - .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) - .addData("terminatingCount", currentlyTerminating.size()) - .emit(); - } log.info( "%s still terminating. Wait for all nodes to terminate before trying again.", currentlyTerminating ); + + if (durSinceLastTerminate.isLongerThan(config.getMaxScalingDuration())) { + log.makeAlert("Worker node termination taking too long!") + .addData("millisSinceLastTerminate", durSinceLastTerminate.getMillis()) + .addData("terminatingCount", currentlyTerminating.size()) + .emit(); + + currentlyTerminating.clear(); + } } return false; diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java index 4c2b86f4f6f..6def924eb98 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/EC2NodeData.java @@ -91,4 +91,17 @@ public class EC2NodeData { return keyName; } + + @Override + public String toString() + { + return "EC2NodeData{" + + "amiId='" + amiId + '\'' + + ", instanceType='" + instanceType + '\'' + + ", minInstances=" + minInstances + + ", maxInstances=" + maxInstances + + ", securityGroupIds=" + securityGroupIds + + ", keyName='" + keyName + '\'' + + '}'; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java index 76061637312..262fe3ac66d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/GalaxyUserData.java @@ -60,4 +60,14 @@ public class GalaxyUserData { return type; } + + @Override + public String toString() + { + return "GalaxyUserData{" + + "env='" + env + '\'' + + ", version='" + version + '\'' + + ", type='" + type + '\'' + + '}'; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java index 18cd85e6962..7fc28437300 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/setup/WorkerSetupData.java @@ -75,4 +75,15 @@ public class WorkerSetupData { return userData; } + + @Override + public String toString() + { + return "WorkerSetupData{" + + "minVersion='" + minVersion + '\'' + + ", minNumWorkers=" + minNumWorkers + + ", nodeData=" + nodeData + + ", userData=" + userData + + '}'; + } } From fa66e1d2cc39d9e8436f32946bd1e4f2ff07b1bc Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 13 Mar 2013 16:29:34 -0500 Subject: [PATCH 05/14] 1) SQL is hard --- .../main/java/com/metamx/druid/config/ConfigManager.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/common/src/main/java/com/metamx/druid/config/ConfigManager.java b/common/src/main/java/com/metamx/druid/config/ConfigManager.java index 1ecfd24482c..4aa97e7ffc7 100644 --- a/common/src/main/java/com/metamx/druid/config/ConfigManager.java +++ b/common/src/main/java/com/metamx/druid/config/ConfigManager.java @@ -38,6 +38,7 @@ public class ConfigManager private final ScheduledExecutorService exec; private final ConcurrentMap watchedConfigs; private final String selectStatement; + private final String insertStatement; private volatile ConfigManager.PollingCallable poller; @@ -49,6 +50,10 @@ public class ConfigManager this.exec = ScheduledExecutors.fixed(1, "config-manager-%s"); this.watchedConfigs = Maps.newConcurrentMap(); this.selectStatement = String.format("SELECT payload FROM %s WHERE name = :name", config.getConfigTable()); + insertStatement = String.format( + "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload", + config.getConfigTable() + ); } @LifecycleStart @@ -192,9 +197,7 @@ public class ConfigManager @Override public Void withHandle(Handle handle) throws Exception { - handle.createStatement( - "INSERT INTO %s (name, payload) VALUES (:name, :payload) ON DUPLICATE KEY UPDATE payload = :payload" - ) + handle.createStatement(insertStatement) .bind("name", key) .bind("payload", newBytes) .execute(); From 8aac482618d3a215909899d847286d900367384f Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 13 Mar 2013 16:33:10 -0500 Subject: [PATCH 06/14] [maven-release-plugin] prepare release druid-0.3.20 --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index cd8bacf5dc0..f6e2d1bc8e7 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/common/pom.xml b/common/pom.xml index 7f47d7fffed..8db43bd87ff 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/druid-services/pom.xml b/druid-services/pom.xml index ca691b5e84a..5be8af0ee5c 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.20-SNAPSHOT + 0.3.20 com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/examples/pom.xml b/examples/pom.xml index 3e790ab4be6..ab99556ef2a 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index b4394161de6..69fd75ea590 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 3f68c7c82a6..01f606108e0 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/index-common/pom.xml b/index-common/pom.xml index ebe57f00fb1..20a5d9a28e1 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/indexer/pom.xml b/indexer/pom.xml index a736362d25d..ca3dfcc366f 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/merger/pom.xml b/merger/pom.xml index d478400f00c..3a02cf10581 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/pom.xml b/pom.xml index 5d2635955a7..ef260d207de 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.20-SNAPSHOT + 0.3.20 druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 0ded94e9d25..b43709c064b 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 diff --git a/server/pom.xml b/server/pom.xml index c8ec34d5f14..d72fc05ea1b 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20-SNAPSHOT + 0.3.20 From f1175389c4434502ee0dccf75c4f48676d6956fd Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 13 Mar 2013 16:33:17 -0500 Subject: [PATCH 07/14] [maven-release-plugin] prepare for next development iteration --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index f6e2d1bc8e7..bef5ae94e48 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index 8db43bd87ff..cbaa95f3f35 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 5be8af0ee5c..610dee6a6d8 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.20 + 0.3.21-SNAPSHOT com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index ab99556ef2a..68ff0659504 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index 69fd75ea590..eee585b036f 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 01f606108e0..160d2e882ea 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index 20a5d9a28e1..3f4905b3b93 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index ca3dfcc366f..4c2b714f84d 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index 3a02cf10581..c4ae6ce1b73 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/pom.xml b/pom.xml index ef260d207de..67e03a2d585 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.20 + 0.3.21-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index b43709c064b..a21b6d26a22 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index d72fc05ea1b..8c1ddddad0e 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.20 + 0.3.21-SNAPSHOT From 16de004fdc0be9eed56c0ebcb760177f8ed9541f Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 13 Mar 2013 15:45:07 -0700 Subject: [PATCH 08/14] fix json --- .../metamx/druid/merger/coordinator/RemoteTaskRunner.java | 5 +++++ .../druid/merger/coordinator/scaling/AutoScalingData.java | 1 - 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index b1ed92087bc..4fa01d22b71 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -566,4 +566,9 @@ public class RemoteTaskRunner implements TaskRunner throw Throwables.propagate(e); } } + + public static void main(String[] args) + { + System.out.println("2013-03-11".compareTo("0")); + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java index 89d8e00f29e..9eb3cb5a093 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/scaling/AutoScalingData.java @@ -42,7 +42,6 @@ public class AutoScalingData return nodeIds; } - @JsonProperty public List getNodes() { return nodes; From 688e5e7417941a7438f05f6ebe2010f75aadce7e Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 13 Mar 2013 18:29:47 -0500 Subject: [PATCH 09/14] 1) Serialization of Tasks is important --- .../merger/common/task/AbstractTask.java | 28 ++++++++ .../common/task/VersionConverterTask.java | 62 ++++++++++++++++- .../common/task/VersionConverterTaskTest.java | 66 +++++++++++++++++++ 3 files changed, 153 insertions(+), 3 deletions(-) create mode 100644 merger/src/test/java/com/metamx/druid/merger/common/task/VersionConverterTaskTest.java diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index 518fb04ab37..502c9838de2 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -111,4 +111,32 @@ public abstract class AbstractTask implements Task { return TaskStatus.success(getId()); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AbstractTask that = (AbstractTask) o; + + if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) { + return false; + } + if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != null) { + return false; + } + if (id != null ? !id.equals(that.id) : that.id != null) { + return false; + } + if (interval != null ? !interval.equals(that.interval) : that.interval != null) { + return false; + } + + return true; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index cebebd218cd..db5234dce5d 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.common.task; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -53,14 +54,37 @@ public class VersionConverterTask extends AbstractTask private static final Logger log = new Logger(VersionConverterTask.class); private final DataSegment segment; - public VersionConverterTask( + public static VersionConverterTask create(String dataSource, Interval interval) + { + final String id = makeId(dataSource, interval); + return new VersionConverterTask(id, id, dataSource, interval, null); + } + + public static VersionConverterTask create(DataSegment segment) + { + final Interval interval = segment.getInterval(); + final String dataSource = segment.getDataSource(); + final String id = makeId(dataSource, interval); + return new VersionConverterTask(id, id, dataSource, interval, segment); + } + + private static String makeId(String dataSource, Interval interval) + { + return joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()); + } + + @JsonCreator + private VersionConverterTask( + @JsonProperty("id") String id, + @JsonProperty("groupId") String groupId, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("segment") DataSegment segment ) { super( - joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), new DateTime()), + id, + groupId, dataSource, interval ); @@ -74,6 +98,12 @@ public class VersionConverterTask extends AbstractTask return TYPE; } + @JsonProperty + public DataSegment getSegment() + { + return segment; + } + @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { @@ -121,11 +151,31 @@ public class VersionConverterTask extends AbstractTask return TaskStatus.success(getId()); } + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + VersionConverterTask that = (VersionConverterTask) o; + + if (segment != null ? !segment.equals(that.segment) : that.segment != null) { + return false; + } + + return super.equals(o); + } + public static class SubTask extends AbstractTask { private final DataSegment segment; - protected SubTask( + @JsonCreator + public SubTask( @JsonProperty("groupId") String groupId, @JsonProperty("segment") DataSegment segment ) @@ -145,6 +195,12 @@ public class VersionConverterTask extends AbstractTask this.segment = segment; } + @JsonProperty + public DataSegment getSegment() + { + return segment; + } + @Override public String getType() { diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/VersionConverterTaskTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/VersionConverterTaskTest.java new file mode 100644 index 00000000000..8beeae6d411 --- /dev/null +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/VersionConverterTaskTest.java @@ -0,0 +1,66 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.merger.common.task; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.metamx.druid.client.DataSegment; +import com.metamx.druid.jackson.DefaultObjectMapper; +import com.metamx.druid.shard.NoneShardSpec; +import junit.framework.Assert; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Test; + +/** + */ +public class VersionConverterTaskTest +{ + @Test + public void testSerializationSimple() throws Exception + { + final String dataSource = "billy"; + final Interval interval = new Interval(new DateTime().minus(1000), new DateTime()); + + DefaultObjectMapper jsonMapper = new DefaultObjectMapper(); + + VersionConverterTask task = VersionConverterTask.create(dataSource, interval); + + Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); + Assert.assertEquals(task, task2); + + DataSegment segment = new DataSegment( + dataSource, + interval, + new DateTime().toString(), + ImmutableMap.of(), + ImmutableList.of(), + ImmutableList.of(), + new NoneShardSpec(), + 9, + 102937 + ); + + task = VersionConverterTask.create(segment); + + task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class); + Assert.assertEquals(task, task2); + } +} From b7960e202d1940d6b42628c0e41ff1a58f6895c0 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 13 Mar 2013 18:31:55 -0500 Subject: [PATCH 10/14] [maven-release-plugin] prepare release druid-0.3.21 --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index bef5ae94e48..a0ffb6ae511 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/common/pom.xml b/common/pom.xml index cbaa95f3f35..59ff0b9dacd 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/druid-services/pom.xml b/druid-services/pom.xml index 610dee6a6d8..af4a14265fb 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.21-SNAPSHOT + 0.3.21 com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/examples/pom.xml b/examples/pom.xml index 68ff0659504..83246575dbe 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index eee585b036f..e4880f5c898 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index 160d2e882ea..ac71730123f 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/index-common/pom.xml b/index-common/pom.xml index 3f4905b3b93..d1cc7863fb3 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/indexer/pom.xml b/indexer/pom.xml index 4c2b714f84d..968d409c47f 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/merger/pom.xml b/merger/pom.xml index c4ae6ce1b73..c40eab857bd 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/pom.xml b/pom.xml index 67e03a2d585..5f353ceb054 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.21-SNAPSHOT + 0.3.21 druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index a21b6d26a22..0ee0b190ac4 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 diff --git a/server/pom.xml b/server/pom.xml index 8c1ddddad0e..0b9232b0195 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21-SNAPSHOT + 0.3.21 From 2bd34f145461785431f61d2b6233f670459d84b8 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 13 Mar 2013 18:32:00 -0500 Subject: [PATCH 11/14] [maven-release-plugin] prepare for next development iteration --- client/pom.xml | 2 +- common/pom.xml | 2 +- druid-services/pom.xml | 4 ++-- examples/pom.xml | 2 +- examples/rand/pom.xml | 2 +- examples/twitter/pom.xml | 2 +- index-common/pom.xml | 2 +- indexer/pom.xml | 2 +- merger/pom.xml | 2 +- pom.xml | 2 +- realtime/pom.xml | 2 +- server/pom.xml | 2 +- 12 files changed, 13 insertions(+), 13 deletions(-) diff --git a/client/pom.xml b/client/pom.xml index a0ffb6ae511..b232092d027 100644 --- a/client/pom.xml +++ b/client/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/common/pom.xml b/common/pom.xml index 59ff0b9dacd..3111998cb7a 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/druid-services/pom.xml b/druid-services/pom.xml index af4a14265fb..58487a127a4 100644 --- a/druid-services/pom.xml +++ b/druid-services/pom.xml @@ -24,11 +24,11 @@ druid-services druid-services druid-services - 0.3.21 + 0.3.22-SNAPSHOT com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/examples/pom.xml b/examples/pom.xml index 83246575dbe..e3a508b7b1e 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/examples/rand/pom.xml b/examples/rand/pom.xml index e4880f5c898..63439d5f1c9 100644 --- a/examples/rand/pom.xml +++ b/examples/rand/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/examples/twitter/pom.xml b/examples/twitter/pom.xml index ac71730123f..6436a52a21f 100644 --- a/examples/twitter/pom.xml +++ b/examples/twitter/pom.xml @@ -9,7 +9,7 @@ com.metamx druid-examples - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/index-common/pom.xml b/index-common/pom.xml index d1cc7863fb3..d36ea5de375 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/indexer/pom.xml b/indexer/pom.xml index 968d409c47f..17573c6b6f4 100644 --- a/indexer/pom.xml +++ b/indexer/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/merger/pom.xml b/merger/pom.xml index c40eab857bd..a9fe0a84f31 100644 --- a/merger/pom.xml +++ b/merger/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/pom.xml b/pom.xml index 5f353ceb054..c31f8d8d485 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ com.metamx druid pom - 0.3.21 + 0.3.22-SNAPSHOT druid druid diff --git a/realtime/pom.xml b/realtime/pom.xml index 0ee0b190ac4..1d61bc68a8a 100644 --- a/realtime/pom.xml +++ b/realtime/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT diff --git a/server/pom.xml b/server/pom.xml index 0b9232b0195..3e986e60054 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -28,7 +28,7 @@ com.metamx druid - 0.3.21 + 0.3.22-SNAPSHOT From 4c165b4880955df03b7bbcb8f8021b0030aa47b8 Mon Sep 17 00:00:00 2001 From: Eric Tschetter Date: Wed, 13 Mar 2013 19:15:29 -0500 Subject: [PATCH 12/14] 1) Better logging of master doing version checking 2) Exception out when the scv cannot find its indexer --- .../druid/merger/common/actions/RemoteTaskActionClient.java | 5 +++++ .../src/main/java/com/metamx/druid/master/DruidMaster.java | 1 + 2 files changed, 6 insertions(+) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java index 5cebc6ee1ec..4ee65327451 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/actions/RemoteTaskActionClient.java @@ -3,6 +3,7 @@ package com.metamx.druid.merger.common.actions; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Charsets; import com.google.common.base.Throwables; +import com.metamx.common.ISE; import com.metamx.common.logger.Logger; import com.metamx.druid.merger.common.task.Task; import com.metamx.http.client.HttpClient; @@ -63,6 +64,10 @@ public class RemoteTaskActionClient implements TaskActionClient final int port; final String path = "/mmx/merger/v1/action"; + if (instance == null) { + throw new ISE("Cannot find instance of indexer to talk to!"); + } + host = instance.getAddress(); if (instance.getSslPort() != null && instance.getSslPort() > 0) { diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java index 74b9d17d57b..dff4d93d10c 100644 --- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java +++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java @@ -526,6 +526,7 @@ public class DruidMaster final Integer binaryVersion = dataSegment.getBinaryVersion(); if (binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID) { + log.info("Upgrading version on segment[%s]", dataSegment.getIdentifier()); indexingServiceClient.upgradeSegment(dataSegment); } } From cf470b1ed474e58be20f10ebc7524e4d64331299 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 13 Mar 2013 18:39:06 -0700 Subject: [PATCH 13/14] Merger: Task serde without relying on jackson private-final-setter magic --- .../merger/common/task/AbstractTask.java | 8 + .../druid/merger/common/task/AppendTask.java | 3 +- .../druid/merger/common/task/DeleteTask.java | 3 +- .../merger/common/task/HadoopIndexTask.java | 12 +- .../task/IndexDeterminePartitionsTask.java | 37 +++- .../common/task/IndexGeneratorTask.java | 28 ++- .../druid/merger/common/task/IndexTask.java | 55 +++++- .../druid/merger/common/task/KillTask.java | 3 +- .../druid/merger/common/task/MergeTask.java | 11 +- .../merger/common/task/MergeTaskBase.java | 8 +- .../common/task/VersionConverterTask.java | 4 + .../com/metamx/druid/merger/TestTask.java | 12 +- .../merger/common/task/MergeTaskBaseTest.java | 2 +- .../merger/common/task/TaskSerdeTest.java | 159 +++++++++++++++++- .../merger/coordinator/TaskLifecycleTest.java | 4 +- 15 files changed, 308 insertions(+), 41 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java index 502c9838de2..917b446e7b1 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AbstractTask.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.common.task; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Joiner; import com.google.common.base.Objects; @@ -33,9 +34,16 @@ public abstract class AbstractTask implements Task { private static final Joiner ID_JOINER = Joiner.on("_"); + @JsonIgnore private final String id; + + @JsonIgnore private final String groupId; + + @JsonIgnore private final String dataSource; + + @JsonIgnore private final Optional interval; protected AbstractTask(String id, String dataSource, Interval interval) diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java index 5d15269677a..b00c1c24399 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/AppendTask.java @@ -48,11 +48,12 @@ public class AppendTask extends MergeTaskBase { @JsonCreator public AppendTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments ) { - super(dataSource, segments); + super(id, dataSource, segments); } @Override diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java index 86fd2a7ec37..5d704b26b3f 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/DeleteTask.java @@ -50,12 +50,13 @@ public class DeleteTask extends AbstractTask @JsonCreator public DeleteTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) { super( - String.format( + id != null ? id : String.format( "delete_%s_%s_%s_%s", dataSource, interval.getStart(), diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java index 6e284557529..f3ce30c90cb 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/HadoopIndexTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -41,7 +42,7 @@ import java.util.List; public class HadoopIndexTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final HadoopDruidIndexerConfig config; private static final Logger log = new Logger(HadoopIndexTask.class); @@ -58,11 +59,12 @@ public class HadoopIndexTask extends AbstractTask @JsonCreator public HadoopIndexTask( + @JsonProperty("id") String id, @JsonProperty("config") HadoopDruidIndexerConfig config ) { super( - String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()), + id != null ? id : String.format("index_hadoop_%s_%s", config.getDataSource(), new DateTime()), config.getDataSource(), JodaUtils.umbrellaInterval(config.getIntervals()) ); @@ -133,4 +135,10 @@ public class HadoopIndexTask extends AbstractTask } } + + @JsonProperty + public HadoopDruidIndexerConfig getConfig() + { + return config; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java index 47f72b12501..675b1675072 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexDeterminePartitionsTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -48,22 +49,23 @@ import java.util.Set; public class IndexDeterminePartitionsTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final FirehoseFactory firehoseFactory; - @JsonProperty + @JsonIgnore private final Schema schema; - @JsonProperty + @JsonIgnore private final long targetPartitionSize; - @JsonProperty + @JsonIgnore private final int rowFlushBoundary; private static final Logger log = new Logger(IndexTask.class); @JsonCreator public IndexDeterminePartitionsTask( + @JsonProperty("id") String id, @JsonProperty("groupId") String groupId, @JsonProperty("interval") Interval interval, @JsonProperty("firehose") FirehoseFactory firehoseFactory, @@ -73,7 +75,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask ) { super( - String.format( + id != null ? id : String.format( "%s_partitions_%s_%s", groupId, interval.getStart(), @@ -243,6 +245,7 @@ public class IndexDeterminePartitionsTask extends AbstractTask public Task apply(ShardSpec shardSpec) { return new IndexGeneratorTask( + null, getGroupId(), getImplicitLockInterval().get(), firehoseFactory, @@ -262,4 +265,28 @@ public class IndexDeterminePartitionsTask extends AbstractTask return TaskStatus.success(getId()); } + + @JsonProperty + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + + @JsonProperty + public Schema getSchema() + { + return schema; + } + + @JsonProperty + public long getTargetPartitionSize() + { + return targetPartitionSize; + } + + @JsonProperty + public int getRowFlushBoundary() + { + return rowFlushBoundary; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java index dd928883232..6eb58ea91c6 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexGeneratorTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -52,19 +53,20 @@ import java.util.concurrent.CopyOnWriteArrayList; public class IndexGeneratorTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final FirehoseFactory firehoseFactory; - @JsonProperty + @JsonIgnore private final Schema schema; - @JsonProperty + @JsonIgnore private final int rowFlushBoundary; private static final Logger log = new Logger(IndexTask.class); @JsonCreator public IndexGeneratorTask( + @JsonProperty("id") String id, @JsonProperty("groupId") String groupId, @JsonProperty("interval") Interval interval, @JsonProperty("firehose") FirehoseFactory firehoseFactory, @@ -73,7 +75,7 @@ public class IndexGeneratorTask extends AbstractTask ) { super( - String.format( + id != null ? id : String.format( "%s_generator_%s_%s_%s", groupId, interval.getStart(), @@ -216,4 +218,22 @@ public class IndexGeneratorTask extends AbstractTask return schema.getShardSpec().isInChunk(eventDimensions); } + + @JsonProperty + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + + @JsonProperty + public Schema getSchema() + { + return schema; + } + + @JsonProperty + public int getRowFlushBoundary() + { + return rowFlushBoundary; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java index 35babcd6a22..a86c57d94f5 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/IndexTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -40,28 +41,29 @@ import java.util.List; public class IndexTask extends AbstractTask { - @JsonProperty + @JsonIgnore private final GranularitySpec granularitySpec; - @JsonProperty + @JsonIgnore private final AggregatorFactory[] aggregators; - @JsonProperty + @JsonIgnore private final QueryGranularity indexGranularity; - @JsonProperty + @JsonIgnore private final long targetPartitionSize; - @JsonProperty + @JsonIgnore private final FirehoseFactory firehoseFactory; - @JsonProperty + @JsonIgnore private final int rowFlushBoundary; private static final Logger log = new Logger(IndexTask.class); @JsonCreator public IndexTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("granularitySpec") GranularitySpec granularitySpec, @JsonProperty("aggregators") AggregatorFactory[] aggregators, @@ -73,7 +75,7 @@ public class IndexTask extends AbstractTask { super( // _not_ the version, just something uniqueish - String.format("index_%s_%s", dataSource, new DateTime().toString()), + id != null ? id : String.format("index_%s_%s", dataSource, new DateTime().toString()), dataSource, new Interval( granularitySpec.bucketIntervals().first().getStart(), @@ -98,6 +100,7 @@ public class IndexTask extends AbstractTask // Need to do one pass over the data before indexing in order to determine good partitions retVal.add( new IndexDeterminePartitionsTask( + null, getGroupId(), interval, firehoseFactory, @@ -115,6 +118,7 @@ public class IndexTask extends AbstractTask // Jump straight into indexing retVal.add( new IndexGeneratorTask( + null, getGroupId(), interval, firehoseFactory, @@ -151,4 +155,41 @@ public class IndexTask extends AbstractTask { throw new IllegalStateException("IndexTasks should not be run!"); } + + @JsonProperty + public GranularitySpec getGranularitySpec() + { + return granularitySpec; + } + + @JsonProperty + public AggregatorFactory[] getAggregators() + { + return aggregators; + } + + @JsonProperty + public QueryGranularity getIndexGranularity() + { + return indexGranularity; + } + + @JsonProperty + public long getTargetPartitionSize() + { + return targetPartitionSize; + } + + @JsonProperty + public FirehoseFactory getFirehoseFactory() + { + return firehoseFactory; + } + + @JsonProperty + public int getRowFlushBoundary() + { + return rowFlushBoundary; + } + } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java index f4476ffd858..e26a25fd038 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/KillTask.java @@ -45,12 +45,13 @@ public class KillTask extends AbstractTask @JsonCreator public KillTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) { super( - String.format( + id != null ? id : String.format( "kill_%s_%s_%s_%s", dataSource, interval.getStart(), diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java index 4e6102f666b..9867eec0c4c 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.base.Throwables; @@ -42,16 +43,18 @@ import java.util.Map; */ public class MergeTask extends MergeTaskBase { + @JsonIgnore private final List aggregators; @JsonCreator public MergeTask( + @JsonProperty("id") String id, @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, @JsonProperty("aggregations") List aggregators ) { - super(dataSource, segments); + super(id, dataSource, segments); this.aggregators = aggregators; } @@ -86,4 +89,10 @@ public class MergeTask extends MergeTaskBase { return "merge"; } + + @JsonProperty("aggregations") + public List getAggregators() + { + return aggregators; + } } diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java index 4bda0363941..7402d69e537 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/MergeTaskBase.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.common.task; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; import com.google.common.base.Function; @@ -56,15 +57,18 @@ import java.util.Set; */ public abstract class MergeTaskBase extends AbstractTask { + @JsonIgnore private final List segments; private static final EmittingLogger log = new EmittingLogger(MergeTaskBase.class); - protected MergeTaskBase(final String dataSource, final List segments) + protected MergeTaskBase(final String id, final String dataSource, final List segments) { super( // _not_ the version, just something uniqueish - String.format("merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString()), + id != null ? id : String.format( + "merge_%s_%s", computeProcessingID(dataSource, segments), new DateTime().toString() + ), dataSource, computeMergedInterval(segments) ); diff --git a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java index db5234dce5d..c5db8aba959 100644 --- a/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java +++ b/merger/src/main/java/com/metamx/druid/merger/common/task/VersionConverterTask.java @@ -20,6 +20,7 @@ package com.metamx.druid.merger.common.task; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Function; import com.google.common.collect.Lists; @@ -52,6 +53,8 @@ public class VersionConverterTask extends AbstractTask private static final Integer CURR_VERSION_INTEGER = new Integer(IndexIO.CURRENT_VERSION_ID); private static final Logger log = new Logger(VersionConverterTask.class); + + @JsonIgnore private final DataSegment segment; public static VersionConverterTask create(String dataSource, Interval interval) @@ -172,6 +175,7 @@ public class VersionConverterTask extends AbstractTask public static class SubTask extends AbstractTask { + @JsonIgnore private final DataSegment segment; @JsonCreator diff --git a/merger/src/test/java/com/metamx/druid/merger/TestTask.java b/merger/src/test/java/com/metamx/druid/merger/TestTask.java index d0a77cff447..2aa41dc031f 100644 --- a/merger/src/test/java/com/metamx/druid/merger/TestTask.java +++ b/merger/src/test/java/com/metamx/druid/merger/TestTask.java @@ -35,7 +35,6 @@ import java.util.List; @JsonTypeName("test") public class TestTask extends MergeTask { - private final String id; private final TaskStatus status; @JsonCreator @@ -47,19 +46,10 @@ public class TestTask extends MergeTask @JsonProperty("taskStatus") TaskStatus status ) { - super(dataSource, segments, aggregators); - - this.id = id; + super(id, dataSource, segments, aggregators); this.status = status; } - @Override - @JsonProperty - public String getId() - { - return id; - } - @Override @JsonProperty public String getType() diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java index a2f6e8175fb..e8c6622369a 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/MergeTaskBaseTest.java @@ -43,7 +43,7 @@ public class MergeTaskBaseTest .add(segmentBuilder.interval(new Interval("2012-01-03/2012-01-05")).build()) .build(); - final MergeTaskBase testMergeTaskBase = new MergeTaskBase("foo", segments) + final MergeTaskBase testMergeTaskBase = new MergeTaskBase(null, "foo", segments) { @Override protected File merge(Map segments, File outDir) throws Exception diff --git a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java index 701093209ea..1f1a8c41038 100644 --- a/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/common/task/TaskSerdeTest.java @@ -5,6 +5,7 @@ import com.google.common.collect.ImmutableList; import com.metamx.common.Granularity; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.client.DataSegment; import com.metamx.druid.indexer.HadoopDruidIndexerConfig; @@ -26,6 +27,7 @@ public class TaskSerdeTest public void testIndexTaskSerde() throws Exception { final Task task = new IndexTask( + null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, @@ -54,6 +56,7 @@ public class TaskSerdeTest public void testIndexGeneratorTaskSerde() throws Exception { final Task task = new IndexGeneratorTask( + null, "foo", new Interval("2010-01-01/P1D"), null, @@ -68,6 +71,8 @@ public class TaskSerdeTest final ObjectMapper jsonMapper = new DefaultObjectMapper(); final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final Task task2 = jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); @@ -80,17 +85,23 @@ public class TaskSerdeTest } @Test - public void testAppendTaskSerde() throws Exception + public void testMergeTaskSerde() throws Exception { - final Task task = new AppendTask( + final Task task = new MergeTask( + null, "foo", ImmutableList.of( DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ), + ImmutableList.of( + new CountAggregatorFactory("cnt") ) ); final ObjectMapper jsonMapper = new DefaultObjectMapper(); final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final Task task2 = jsonMapper.readValue(json, Task.class); Assert.assertEquals("foo", task.getDataSource()); @@ -100,20 +111,131 @@ public class TaskSerdeTest Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(((MergeTask) task).getSegments(), ((MergeTask) task2).getSegments()); + Assert.assertEquals( + ((MergeTask) task).getAggregators().get(0).getName(), + ((MergeTask) task2).getAggregators().get(0).getName() + ); } @Test - public void testDeleteTaskSerde() throws Exception + public void testKillTaskSerde() throws Exception { - final Task task = new DeleteTask( + final Task task = new KillTask( + null, "foo", new Interval("2010-01-01/P1D") ); final ObjectMapper jsonMapper = new DefaultObjectMapper(); final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change final Task task2 = jsonMapper.readValue(json, Task.class); + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + } + + @Test + public void testVersionConverterTaskSerde() throws Exception + { + final Task task = VersionConverterTask.create( + DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(((VersionConverterTask) task).getSegment(), ((VersionConverterTask) task).getSegment()); + } + + @Test + public void testVersionConverterSubTaskSerde() throws Exception + { + final Task task = new VersionConverterTask.SubTask( + "myGroupId", + DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + Assert.assertEquals("myGroupId", task.getGroupId()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals( + ((VersionConverterTask.SubTask) task).getSegment(), + ((VersionConverterTask.SubTask) task).getSegment() + ); + } + + @Test + public void testDeleteTaskSerde() throws Exception + { + final Task task = new DeleteTask( + null, + "foo", + new Interval("2010-01-01/P1D") + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get()); + } + + + @Test + public void testDeleteTaskFromJson() throws Exception + { + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final Task task = jsonMapper.readValue( + "{\"type\":\"delete\",\"dataSource\":\"foo\",\"interval\":\"2010-01-01/P1D\"}", + Task.class + ); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertNotNull(task.getId()); + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + Assert.assertEquals(task.getId(), task2.getId()); Assert.assertEquals(task.getGroupId(), task2.getGroupId()); Assert.assertEquals(task.getDataSource(), task2.getDataSource()); @@ -121,10 +243,39 @@ public class TaskSerdeTest Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get()); } + @Test + public void testAppendTaskSerde() throws Exception + { + final Task task = new AppendTask( + null, + "foo", + ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2010-01-01/P1D")).version("1234").build() + ) + ); + + final ObjectMapper jsonMapper = new DefaultObjectMapper(); + final String json = jsonMapper.writeValueAsString(task); + + Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change + final Task task2 = jsonMapper.readValue(json, Task.class); + + Assert.assertEquals("foo", task.getDataSource()); + Assert.assertEquals(Optional.of(new Interval("2010-01-01/P1D")), task.getImplicitLockInterval()); + + Assert.assertEquals(task.getId(), task2.getId()); + Assert.assertEquals(task.getGroupId(), task2.getGroupId()); + Assert.assertEquals(task.getDataSource(), task2.getDataSource()); + Assert.assertEquals(task.getImplicitLockInterval(), task2.getImplicitLockInterval()); + Assert.assertEquals(task.getImplicitLockInterval().get(), task2.getImplicitLockInterval().get()); + Assert.assertEquals(((AppendTask) task).getSegments(), ((AppendTask) task2).getSegments()); + } + @Test public void testHadoopIndexTaskSerde() throws Exception { final HadoopIndexTask task = new HadoopIndexTask( + null, new HadoopDruidIndexerConfig( null, "foo", diff --git a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java index c94369726e9..917a264237c 100644 --- a/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java +++ b/merger/src/test/java/com/metamx/druid/merger/coordinator/TaskLifecycleTest.java @@ -184,6 +184,7 @@ public class TaskLifecycleTest public void testIndexTask() throws Exception { final Task indexTask = new IndexTask( + null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P2D"))), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, @@ -226,6 +227,7 @@ public class TaskLifecycleTest public void testIndexTaskFailure() throws Exception { final Task indexTask = new IndexTask( + null, "foo", new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))), new AggregatorFactory[]{new DoubleSumAggregatorFactory("met", "met")}, @@ -249,7 +251,7 @@ public class TaskLifecycleTest { // This test doesn't actually do anything right now. We should actually put things into the Mocked coordinator // Such that this test can test things... - final Task killTask = new KillTask("foo", new Interval("2010-01-02/P2D")); + final Task killTask = new KillTask(null, "foo", new Interval("2010-01-02/P2D")); final TaskStatus status = runTask(killTask); Assert.assertEquals("merged statusCode", TaskStatus.Status.SUCCESS, status.getStatusCode()); From a1c823402bb6a2ff8dea893c7a017882e80407ba Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 13 Mar 2013 22:32:12 -0700 Subject: [PATCH 14/14] Merger: Make json exceptions while bootstrapping non-fatal --- .../merger/coordinator/DbTaskStorage.java | 21 ++++----- .../coordinator/HeapMemoryTaskStorage.java | 6 +-- .../druid/merger/coordinator/TaskQueue.java | 43 +++++++++++++------ .../druid/merger/coordinator/TaskStorage.java | 6 +-- 4 files changed, 44 insertions(+), 32 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java index b878885dd4a..98d16d671e9 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/DbTaskStorage.java @@ -36,6 +36,7 @@ import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.task.Task; import com.metamx.druid.merger.coordinator.config.IndexerDbConnectorConfig; +import com.metamx.emitter.EmittingLogger; import org.joda.time.DateTime; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; @@ -52,7 +53,7 @@ public class DbTaskStorage implements TaskStorage private final IndexerDbConnectorConfig dbConnectorConfig; private final DBI dbi; - private static final Logger log = new Logger(DbTaskStorage.class); + private static final EmittingLogger log = new EmittingLogger(DbTaskStorage.class); public DbTaskStorage(ObjectMapper jsonMapper, IndexerDbConnectorConfig dbConnectorConfig, DBI dbi) { @@ -203,18 +204,18 @@ public class DbTaskStorage implements TaskStorage } @Override - public List getRunningTasks() + public List getRunningTaskIds() { return dbi.withHandle( - new HandleCallback>() + new HandleCallback>() { @Override - public List withHandle(Handle handle) throws Exception + public List withHandle(Handle handle) throws Exception { final List> dbTasks = handle.createQuery( String.format( - "SELECT payload FROM %s WHERE status_code = :status_code", + "SELECT id FROM %s WHERE status_code = :status_code", dbConnectorConfig.getTaskTable() ) ) @@ -222,16 +223,12 @@ public class DbTaskStorage implements TaskStorage .list(); return Lists.transform( - dbTasks, new Function, Task>() + dbTasks, new Function, String>() { @Override - public Task apply(Map row) + public String apply(Map row) { - try { - return jsonMapper.readValue(row.get("payload").toString(), Task.class); - } catch(Exception e) { - throw Throwables.propagate(e); - } + return row.get("id").toString(); } } ); diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java index 895804bc7fd..8d372c29000 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/HeapMemoryTaskStorage.java @@ -128,15 +128,15 @@ public class HeapMemoryTaskStorage implements TaskStorage } @Override - public List getRunningTasks() + public List getRunningTaskIds() { giant.lock(); try { - final ImmutableList.Builder listBuilder = ImmutableList.builder(); + final ImmutableList.Builder listBuilder = ImmutableList.builder(); for(final TaskStuff taskStuff : tasks.values()) { if(taskStuff.getStatus().isRunnable()) { - listBuilder.add(taskStuff.getTask()); + listBuilder.add(taskStuff.getTask().getId()); } } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java index e16912b4c6e..6fd61b4a6d8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskQueue.java @@ -19,6 +19,7 @@ package com.metamx.druid.merger.coordinator; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -28,8 +29,8 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Ordering; import com.metamx.common.lifecycle.LifecycleStart; import com.metamx.common.lifecycle.LifecycleStop; -import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.TaskLock; +import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.task.Task; import com.metamx.emitter.EmittingLogger; @@ -89,18 +90,32 @@ public class TaskQueue queue.clear(); taskLockbox.clear(); - // Add running tasks to the queue - final List runningTasks = taskStorage.getRunningTasks(); - - for(final Task task : runningTasks) { - queue.add(task); - } - - // Get all locks, along with which tasks they belong to + // Get all running tasks and their locks final Multimap tasksByLock = ArrayListMultimap.create(); - for(final Task runningTask : runningTasks) { - for(final TaskLock taskLock : taskStorage.getLocks(runningTask.getId())) { - tasksByLock.put(taskLock, runningTask); + + for (final String taskId : taskStorage.getRunningTaskIds()) { + try { + // .get since TaskStorage semantics should mean this task is always found + final Task task = taskStorage.getTask(taskId).get(); + final List taskLocks = taskStorage.getLocks(task.getId()); + + queue.add(task); + + for (final TaskLock taskLock : taskLocks) { + tasksByLock.put(taskLock, task); + } + } + catch (Exception e) { + log.makeAlert("Failed to bootstrap task").addData("task", taskId).emit(); + + // A bit goofy to special-case JsonProcessingException, but we don't want to suppress bootstrap problems on + // any old Exception or even IOException... + if (e instanceof JsonProcessingException || e.getCause() instanceof JsonProcessingException) { + // Mark this task a failure, and continue bootstrapping + taskStorage.setStatus(TaskStatus.failure(taskId)); + } else { + throw Throwables.propagate(e); + } } } @@ -150,7 +165,7 @@ public class TaskQueue } } - log.info("Bootstrapped %,d tasks. Ready to go!", runningTasks.size()); + log.info("Bootstrapped %,d tasks with %,d locks. Ready to go!", queue.size(), tasksByLock.keySet().size()); } finally { giant.unlock(); } @@ -214,7 +229,7 @@ public class TaskQueue // insert the task into our queue. try { taskStorage.insert(task, TaskStatus.running(task.getId())); - } catch(TaskExistsException e) { + } catch (TaskExistsException e) { log.warn("Attempt to add task twice: %s", task.getId()); throw Throwables.propagate(e); } diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java index d6bfbfd889e..ee633efffb9 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/TaskStorage.java @@ -20,9 +20,9 @@ package com.metamx.druid.merger.coordinator; import com.google.common.base.Optional; +import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.actions.TaskAction; -import com.metamx.druid.merger.common.TaskLock; import com.metamx.druid.merger.common.task.Task; import java.util.List; @@ -77,9 +77,9 @@ public interface TaskStorage public List getAuditLogs(String taskid); /** - * Returns a list of currently-running tasks as stored in the storage facility, in no particular order. + * Returns a list of currently-running task IDs as stored in the storage facility, in no particular order. */ - public List getRunningTasks(); + public List getRunningTaskIds(); /** * Returns a list of locks for a particular task.