diff --git a/client/pom.xml b/client/pom.xml
index 6979f20000c..68b5e99d8a4 100644
--- a/client/pom.xml
+++ b/client/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/client/src/main/java/com/metamx/druid/client/DruidServer.java b/client/src/main/java/com/metamx/druid/client/DruidServer.java
index ebd078e9383..e86efdad816 100644
--- a/client/src/main/java/com/metamx/druid/client/DruidServer.java
+++ b/client/src/main/java/com/metamx/druid/client/DruidServer.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import com.metamx.common.logger.Logger;
import com.metamx.druid.coordination.DruidServerMetadata;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -116,7 +117,8 @@ public class DruidServer implements Comparable
@JsonProperty
public Map<String, DataSegment> getSegments()
{
- return ImmutableMap.copyOf(segments);
+ // Copying the map slows things down a lot here; don't use ImmutableMap here
+ return Collections.unmodifiableMap(segments);
}
public DataSegment getSegment(String segmentName)
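
Note on the change above: `Collections.unmodifiableMap` is an O(1) read-only view of the live `ConcurrentHashMap`, whereas `ImmutableMap.copyOf` copies every entry on each call, which is what made the old version slow for servers with many segments. A minimal standalone sketch of the behavioral difference (illustrative only; the `segments` map here is a stand-in, not Druid code):

```java
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ViewVsCopy
{
  public static void main(String[] args)
  {
    Map<String, String> segments = new ConcurrentHashMap<String, String>();
    segments.put("segment-1", "loaded");

    // O(1) wrapper: reads pass through to the live map
    Map<String, String> view = Collections.unmodifiableMap(segments);
    // O(n) snapshot: entries are copied at call time
    Map<String, String> copy = ImmutableMap.copyOf(segments);

    segments.put("segment-2", "loaded");

    System.out.println(view.size()); // 2 -- the view reflects the later write
    System.out.println(copy.size()); // 1 -- the snapshot does not
  }
}
```

The trade-off is that callers of `getSegments()` now observe concurrent updates; iteration stays safe because `ConcurrentHashMap` iterators are weakly consistent.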
diff --git a/common/pom.xml b/common/pom.xml
index 1964f5658ff..c48c0fe20c8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/examples/bin/examples/webstream/query.body b/examples/bin/examples/webstream/query.body
new file mode 100644
index 00000000000..f1103b406de
--- /dev/null
+++ b/examples/bin/examples/webstream/query.body
@@ -0,0 +1,12 @@
+{
+ "queryType": "groupBy",
+ "dataSource": "webstream",
+ "granularity": "all",
+ "dimensions": ["country"],
+ "aggregations":[
+ { "type": "count", "name": "rows"},
+ { "type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
+ ],
+ "filter": { "type": "selector", "dimension": "geo_region", "value": "CA" },
+ "intervals":["2012-10-01T00:00/2020-01-01T00"]
+}
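
The query above counts rows and sums `known_users` per `country`, restricted to events whose `geo_region` is CA. A hedged sketch of posting it once the realtime node from the spec below is running; the endpoint `http://localhost:8083/druid/v2/` is an assumption based on the example setups of this era, not something this patch specifies:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

public class PostQueryBody
{
  public static void main(String[] args) throws Exception
  {
    byte[] body = Files.readAllBytes(Paths.get("examples/bin/examples/webstream/query.body"));

    // Assumed local realtime-node endpoint; adjust host/port for your setup
    HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8083/druid/v2/").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);

    OutputStream out = conn.getOutputStream();
    out.write(body);
    out.close();

    BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
    String line;
    while ((line = in.readLine()) != null) {
      System.out.println(line);
    }
    in.close();
  }
}
```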
diff --git a/examples/bin/examples/webstream/webstream_realtime.spec b/examples/bin/examples/webstream/webstream_realtime.spec
new file mode 100644
index 00000000000..92a0cf42ce9
--- /dev/null
+++ b/examples/bin/examples/webstream/webstream_realtime.spec
@@ -0,0 +1,47 @@
+[{
+ "schema": {
+ "dataSource": "webstream",
+ "aggregators": [
+ {"type": "count", "name": "rows"},
+ {"type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
+ ],
+ "indexGranularity": "minute",
+ "shardSpec": {"type": "none"}
+ },
+
+ "config": {
+ "maxRowsInMemory": 50000,
+ "intermediatePersistPeriod": "PT2m"
+ },
+
+ "firehose": {
+ "type": "webstream",
+ "url":"http://developer.usa.gov/1usagov",
+ "renamedDimensions": {
+ "g":"bitly_hash",
+ "c":"country",
+ "a":"user",
+ "cy":"city",
+ "l":"encoding_user_login",
+ "hh":"short_url",
+ "hc":"timestamp_hash",
+ "h":"user_bitly_hash",
+ "u":"url",
+ "tz":"timezone",
+ "t":"time",
+ "r":"referring_url",
+ "gr":"geo_region",
+ "nk":"known_users",
+ "al":"accept_language"
+ },
+ "timeDimension":"t",
+ "timeFormat":"posix"
+ },
+
+ "plumber": {
+ "type": "realtime",
+ "windowPeriod": "PT3m",
+ "segmentGranularity": "hour",
+ "basePersistDirectory": "/tmp/example/usagov_realtime/basePersist"
+ }
+}]
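
For readers of the spec: the `webstream` firehose tails the 1usagov click stream, renames the terse source keys via `renamedDimensions`, and parses the `t` field as POSIX seconds (`"timeFormat": "posix"`). A self-contained sketch of that mapping under those assumptions (illustrative only, not the firehose implementation; the raw event is made up):

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class RenameSketch
{
  public static void main(String[] args)
  {
    // Subset of the renamedDimensions table from the spec
    Map<String, String> renames = new HashMap<String, String>();
    renames.put("c", "country");
    renames.put("gr", "geo_region");
    renames.put("nk", "known_users");
    renames.put("t", "time");

    // A made-up raw event in the 1usagov key style
    Map<String, Object> raw = new HashMap<String, Object>();
    raw.put("c", "US");
    raw.put("gr", "CA");
    raw.put("nk", 1);
    raw.put("t", 1372636800L); // POSIX seconds

    // Rename keys; unknown keys pass through unchanged
    Map<String, Object> row = new HashMap<String, Object>();
    for (Map.Entry<String, Object> e : raw.entrySet()) {
      String key = renames.containsKey(e.getKey()) ? renames.get(e.getKey()) : e.getKey();
      row.put(key, e.getValue());
    }

    // timeFormat "posix" means seconds since epoch, so scale to millis
    long timestampMillis = ((Number) row.get("time")).longValue() * 1000L;
    System.out.println(row + " @ " + new Date(timestampMillis));
  }
}
```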
diff --git a/examples/bin/run_ec2.sh b/examples/bin/run_ec2.sh
index 4acdde9cc8d..c66965af512 100755
--- a/examples/bin/run_ec2.sh
+++ b/examples/bin/run_ec2.sh
@@ -55,7 +55,7 @@ sleep 60
INSTANCE_ADDRESS=`ec2-describe-instances|grep 'INSTANCE'|grep $INSTANCE_ID|cut -f4`
echo "Connecting to $INSTANCE_ADDRESS to prepare environment for druid..."
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/env.sh ubuntu@${INSTANCE_ADDRESS}:
-ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'
+ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'
echo "Prepared $INSTANCE_ADDRESS for druid."
@@ -70,7 +70,7 @@ fi
# Now boot druid parts
scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/run.sh ubuntu@${INSTANCE_ADDRESS}:
-ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'
+ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'
echo "Druid booting complete!"
echo "ssh -i ~/.ssh/druid-keypair ubuntu@${INSTANCE_ADDRESS} #to connect"
diff --git a/examples/pom.xml b/examples/pom.xml
index 9acca31f12d..39e52cf5b51 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -1,6 +1,5 @@
-
+
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-examples</artifactId>
@@ -10,7 +9,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/indexing-common/pom.xml b/indexing-common/pom.xml
index d09ea33bd64..2249e107f10 100644
--- a/indexing-common/pom.xml
+++ b/indexing-common/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
index 079f46676ca..15fa7c88240 100644
--- a/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
+++ b/indexing-common/src/main/java/com/metamx/druid/common/s3/S3Utils.java
@@ -19,15 +19,14 @@
package com.metamx.druid.common.s3;
+import com.google.common.base.Throwables;
import com.metamx.common.logger.Logger;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.ServiceException;
import org.jets3t.service.model.S3Object;
-import java.io.File;
import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+import java.util.concurrent.Callable;
/**
*
@@ -36,37 +35,6 @@ public class S3Utils
{
private static final Logger log = new Logger(S3Utils.class);
- public static void putFileToS3(
- File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
- )
- throws S3ServiceException, IOException, NoSuchAlgorithmException
- {
- S3Object s3Obj = new S3Object(localFile);
- s3Obj.setBucketName(outputS3Bucket);
- s3Obj.setKey(outputS3Path);
-
- log.info("Uploading file[%s] to [s3://%s/%s]", localFile, s3Obj.getBucketName(), s3Obj.getKey());
- s3Client.putObject(new S3Bucket(outputS3Bucket), s3Obj);
- }
-
- public static void putFileToS3WrapExceptions(
- File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
- )
- {
- try {
- putFileToS3(localFile, s3Client, outputS3Bucket, outputS3Path);
- }
- catch (S3ServiceException e) {
- throw new RuntimeException(e);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
- catch (NoSuchAlgorithmException e) {
- throw new RuntimeException(e);
- }
- }
-
public static void closeStreamsQuietly(S3Object s3Obj)
{
if (s3Obj == null) {
@@ -80,4 +48,52 @@ public class S3Utils
}
}
+
+ /**
+ * Retries S3 operations that fail due to IO-related exceptions. Service-level exceptions (access denied, file not
+ * found, etc.) are not retried.
+ */
+ public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+ {
+ int nTry = 0;
+ final int maxTries = 3;
+ while (true) {
+ try {
+ nTry++;
+ return f.call();
+ }
+ catch (IOException e) {
+ if (nTry <= maxTries) {
+ awaitNextRetry(e, nTry);
+ } else {
+ throw Throwables.propagate(e);
+ }
+ }
+ catch (ServiceException e) {
+ if (nTry <= maxTries &&
+ (e.getCause() instanceof IOException ||
+ (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
+ awaitNextRetry(e, nTry);
+ } else {
+ throw e;
+ }
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
+ }
+ }
+ }
+
+ private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
+ {
+ final long baseSleepMillis = 1000;
+ final double fuzziness = 0.2;
+ final long sleepMillis = Math.max(
+ baseSleepMillis,
+ (long) (baseSleepMillis * Math.pow(2, nTry) *
+ (1 + new Random().nextGaussian() * fuzziness))
+ );
+ log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
+ Thread.sleep(sleepMillis);
+ }
}
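
A hypothetical call site for the new helper (the bucket, key, and `fetch` wrapper are stand-ins, not part of this patch); with the constants above, the three retries sleep roughly 2s, 4s, and 8s, jittered ±20% and floored at one second:

```java
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.util.concurrent.Callable;

public class RetrySketch
{
  // Wraps a jets3t fetch so transient IO failures are retried by retryS3Operation
  public static S3Object fetch(final RestS3Service s3Client) throws Exception
  {
    return S3Utils.retryS3Operation(
        new Callable<S3Object>()
        {
          @Override
          public S3Object call() throws Exception
          {
            return s3Client.getObject("some-bucket", "some/key"); // stand-in coordinates
          }
        }
    );
  }
}
```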
diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml
index a8b8d429c79..4f130f14fa8 100644
--- a/indexing-hadoop/pom.xml
+++ b/indexing-hadoop/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index 37cb5b9c2e4..d290229eb8e 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/pom.xml b/pom.xml
index dd042d33c33..9c3cc0f3231 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
<packaging>pom</packaging>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
<name>druid</name>
<description>druid</description>
@@ -65,7 +65,7 @@
<groupId>com.metamx</groupId>
<artifactId>http-client</artifactId>
- <version>0.7.1</version>
+ <version>0.8.1</version>
<groupId>com.metamx</groupId>
diff --git a/realtime/pom.xml b/realtime/pom.xml
index abe217c1af2..9e8c5d96983 100644
--- a/realtime/pom.xml
+++ b/realtime/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/server/pom.xml b/server/pom.xml
index e32b027219a..f5aa2785d8e 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -28,7 +28,7 @@
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
index 011e1633ca1..75d212886da 100644
--- a/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
+++ b/server/src/main/java/com/metamx/druid/loading/S3DataSegmentPuller.java
@@ -19,6 +19,7 @@
package com.metamx.druid.loading;
+import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
@@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment;
import com.metamx.druid.common.s3.S3Utils;
import com.metamx.druid.utils.CompressionUtils;
import org.apache.commons.io.FileUtils;
-import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.zip.GZIPInputStream;
/**
@@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
}
@Override
- public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
+ public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
{
- S3Coords s3Coords = new S3Coords(segment);
+ final S3Coords s3Coords = new S3Coords(segment);
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@@ -80,41 +82,52 @@ public class S3DataSegmentPuller implements DataSegmentPuller
throw new ISE("outDir[%s] must be a directory.", outDir);
}
- long startTime = System.currentTimeMillis();
- S3Object s3Obj = null;
-
try {
- s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
+ S3Utils.retryS3Operation(
+ new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
+ {
+ long startTime = System.currentTimeMillis();
+ S3Object s3Obj = null;
- InputStream in = null;
- try {
- in = s3Obj.getDataInputStream();
- final String key = s3Obj.getKey();
- if (key.endsWith(".zip")) {
- CompressionUtils.unzip(in, outDir);
- } else if (key.endsWith(".gz")) {
- final File outFile = new File(outDir, toFilename(key, ".gz"));
- ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
- } else {
- ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
- }
- log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
- }
- catch (IOException e) {
- FileUtils.deleteDirectory(outDir);
- throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
- }
- finally {
- Closeables.closeQuietly(in);
- }
+ try {
+ s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
+
+ InputStream in = null;
+ try {
+ in = s3Obj.getDataInputStream();
+ final String key = s3Obj.getKey();
+ if (key.endsWith(".zip")) {
+ CompressionUtils.unzip(in, outDir);
+ } else if (key.endsWith(".gz")) {
+ final File outFile = new File(outDir, toFilename(key, ".gz"));
+ ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
+ } else {
+ ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
+ }
+ log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
+ return null;
+ }
+ catch (IOException e) {
+ FileUtils.deleteDirectory(outDir);
+ throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
+ }
+ finally {
+ Closeables.closeQuietly(in);
+ }
+ }
+ finally {
+ S3Utils.closeStreamsQuietly(s3Obj);
+ }
+ }
+ }
+ );
}
catch (Exception e) {
throw new SegmentLoadingException(e, e.getMessage());
}
- finally {
- S3Utils.closeStreamsQuietly(s3Obj);
- }
-
}
private String toFilename(String key, final String suffix)
@@ -124,25 +137,49 @@ public class S3DataSegmentPuller implements DataSegmentPuller
return filename;
}
- private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
+ private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
{
try {
- return s3Client.isObjectInBucket(coords.bucket, coords.path);
+ return S3Utils.retryS3Operation(
+ new Callable<Boolean>()
+ {
+ @Override
+ public Boolean call() throws Exception
+ {
+ return s3Client.isObjectInBucket(coords.bucket, coords.path);
+ }
+ }
+ );
+ }
+ catch (InterruptedException e) {
+ throw Throwables.propagate(e);
}
catch (ServiceException e) {
- throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
+ throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
}
@Override
public long getLastModified(DataSegment segment) throws SegmentLoadingException
{
- S3Coords coords = new S3Coords(segment);
+ final S3Coords coords = new S3Coords(segment);
try {
- S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
+ final StorageObject objDetails = S3Utils.retryS3Operation(
+ new Callable<StorageObject>()
+ {
+ @Override
+ public StorageObject call() throws Exception
+ {
+ return s3Client.getObjectDetails(coords.bucket, coords.path);
+ }
+ }
+ );
return objDetails.getLastModifiedDate().getTime();
}
- catch (S3ServiceException e) {
+ catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ catch (ServiceException e) {
throw new SegmentLoadingException(e, e.getMessage());
}
}
diff --git a/server/src/main/java/com/metamx/druid/master/DruidMaster.java b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
index c7c39d3fdd3..e0f7b5841ae 100644
--- a/server/src/main/java/com/metamx/druid/master/DruidMaster.java
+++ b/server/src/main/java/com/metamx/druid/master/DruidMaster.java
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
+import com.metamx.common.IAE;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutorFactory;
import com.metamx.common.concurrent.ScheduledExecutors;
@@ -247,144 +248,106 @@ public class DruidMaster
public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
{
- final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
- if (fromServer == null) {
- throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
- }
+ try {
+ final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
+ if (fromServer == null) {
+ throw new IAE("Unable to find server [%s]", from);
+ }
- final DruidServer toServer = serverInventoryView.getInventoryValue(to);
- if (toServer == null) {
- throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
- }
+ final DruidServer toServer = serverInventoryView.getInventoryValue(to);
+ if (toServer == null) {
+ throw new IAE("Unable to find server [%s]", to);
+ }
- if (to.equalsIgnoreCase(from)) {
- throw new IllegalArgumentException(
- String.format("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to)
+ if (to.equalsIgnoreCase(from)) {
+ throw new IAE("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to);
+ }
+
+ final DataSegment segment = fromServer.getSegment(segmentName);
+ if (segment == null) {
+ throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
+ }
+
+ final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
+ if (loadPeon == null) {
+ throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", to);
+ }
+
+ final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
+ if (dropPeon == null) {
+ throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
+ }
+
+ final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
+ if (toHolder.getAvailableSize() < segment.getSize()) {
+ throw new IAE(
+ "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
+ to,
+ segment,
+ segment.getSize(),
+ toHolder.getAvailableSize()
+ );
+ }
+
+ final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
+ final String toServedSegPath = ZKPaths.makePath(
+ ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
);
- }
- final DataSegment segment = fromServer.getSegment(segmentName);
- if (segment == null) {
- throw new IllegalArgumentException(
- String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
- );
- }
-
- final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
- if (loadPeon == null) {
- throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
- }
-
- final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
- if (dropPeon == null) {
- throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
- }
-
- final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
- if (toHolder.getAvailableSize() < segment.getSize()) {
- throw new IllegalArgumentException(
- String.format(
- "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
- to,
- segment,
- segment.getSize(),
- toHolder.getAvailableSize()
- )
- );
- }
-
- final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
- final String toServedSegPath = ZKPaths.makePath(
- ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
- );
-
- loadPeon.loadSegment(
- segment,
- new LoadPeonCallback()
- {
- @Override
- protected void execute()
+ loadPeon.loadSegment(
+ segment,
+ new LoadPeonCallback()
{
- try {
- if (curator.checkExists().forPath(toServedSegPath) != null &&
- curator.checkExists().forPath(toLoadQueueSegPath) == null &&
- !dropPeon.getSegmentsToDrop().contains(segment)) {
- dropPeon.dropSegment(segment, callback);
- } else if (callback != null) {
- callback.execute();
+ @Override
+ protected void execute()
+ {
+ try {
+ if (curator.checkExists().forPath(toServedSegPath) != null &&
+ curator.checkExists().forPath(toLoadQueueSegPath) == null &&
+ !dropPeon.getSegmentsToDrop().contains(segment)) {
+ dropPeon.dropSegment(segment, callback);
+ } else if (callback != null) {
+ callback.execute();
+ }
+ }
+ catch (Exception e) {
+ throw Throwables.propagate(e);
}
}
- catch (Exception e) {
- throw Throwables.propagate(e);
- }
}
- }
- );
- }
-
- public void cloneSegment(String from, String to, String segmentName, LoadPeonCallback callback)
- {
- final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
- if (fromServer == null) {
- throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
- }
-
- final DruidServer toServer = serverInventoryView.getInventoryValue(to);
- if (toServer == null) {
- throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
- }
-
- final DataSegment segment = fromServer.getSegment(segmentName);
- if (segment == null) {
- throw new IllegalArgumentException(
- String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
);
}
-
- final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
- if (loadPeon == null) {
- throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
- }
-
- final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
- if (toHolder.getAvailableSize() < segment.getSize()) {
- throw new IllegalArgumentException(
- String.format(
- "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
- to,
- segment,
- segment.getSize(),
- toHolder.getAvailableSize()
- )
- );
- }
-
- if (!loadPeon.getSegmentsToLoad().contains(segment)) {
- loadPeon.loadSegment(segment, callback);
+ catch (Exception e) {
+ log.makeAlert(e, "Exception moving segment %s", segmentName).emit();
+ callback.execute();
}
}
public void dropSegment(String from, String segmentName, final LoadPeonCallback callback)
{
- final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
- if (fromServer == null) {
- throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
- }
+ try {
+ final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
+ if (fromServer == null) {
+ throw new IAE("Unable to find server [%s]", from);
+ }
- final DataSegment segment = fromServer.getSegment(segmentName);
- if (segment == null) {
- throw new IllegalArgumentException(
- String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
- );
- }
+ final DataSegment segment = fromServer.getSegment(segmentName);
+ if (segment == null) {
+ throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
+ }
- final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
- if (dropPeon == null) {
- throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
- }
+ final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
+ if (dropPeon == null) {
+ throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
+ }
- if (!dropPeon.getSegmentsToDrop().contains(segment)) {
- dropPeon.dropSegment(segment, callback);
+ if (!dropPeon.getSegmentsToDrop().contains(segment)) {
+ dropPeon.dropSegment(segment, callback);
+ }
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Exception dropping segment %s", segmentName).emit();
+ callback.execute();
}
}
@@ -543,7 +506,7 @@ public class DruidMaster
}
catch (Exception e) {
log.makeAlert(e, "Unable to become master")
- .emit();
+ .emit();
final LeaderLatch oldLatch = createNewLeaderLatch();
Closeables.closeQuietly(oldLatch);
try {
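
Context for the exception swap in this file: the patch replaces `IllegalArgumentException` plus `String.format` with `com.metamx.common.IAE`, which is what lets the call sites above drop their explicit formatting. Inferring from those call sites, IAE is an `IllegalArgumentException` that formats its own message, roughly:

```java
// Assumed shape of com.metamx.common.IAE, inferred from its usage in this patch
public class IAE extends IllegalArgumentException
{
  public IAE(String formatText, Object... arguments)
  {
    super(String.format(formatText, arguments));
  }
}
```

The other behavioral change here is that `moveSegment` and `dropSegment` now catch any exception, emit an alert, and still invoke the callback instead of letting the failure escape the master.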
diff --git a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
index 9677b5028bc..3c6c58593e7 100644
--- a/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
+++ b/server/src/main/java/com/metamx/druid/master/rules/LoadRule.java
@@ -92,6 +92,12 @@ public abstract class LoadRule implements Rule
final MasterStats stats = new MasterStats();
while (totalReplicants < expectedReplicants) {
+ boolean replicate = totalReplicants > 0;
+
+ if (replicate && !replicationManager.canAddReplicant(getTier())) {
+ break;
+ }
+
final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);
if (holder == null) {
@@ -104,15 +110,10 @@ public abstract class LoadRule implements Rule
break;
}
- if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
- if (!replicationManager.canAddReplicant(getTier()) ||
- !replicationManager.registerReplicantCreation(
- getTier(),
- segment.getIdentifier(),
- holder.getServer().getHost()
- )) {
- break;
- }
+ if (replicate && !replicationManager.registerReplicantCreation(
+ getTier(), segment.getIdentifier(), holder.getServer().getHost()
+ )) {
+ break;
}
holder.getPeon().loadSegment(
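
The LoadRule change hoists the throttle check to the top of the loop: the first copy of a segment (`totalReplicants == 0`) is always assigned, and only additional replicas consult the replication manager. A toy, self-contained model of that control flow (`ReplicationThrottle` and all numbers are invented stand-ins for the real `ReplicationThrottler`):

```java
public class ThrottleDemo
{
  static class ReplicationThrottle
  {
    private final int maxInFlight;
    private int inFlight = 0;

    ReplicationThrottle(int maxInFlight) { this.maxInFlight = maxInFlight; }

    boolean canAddReplicant() { return inFlight < maxInFlight; }

    boolean registerReplicantCreation() { inFlight++; return inFlight <= maxInFlight; }
  }

  public static void main(String[] args)
  {
    ReplicationThrottle throttler = new ReplicationThrottle(2);
    int totalReplicants = 0;
    final int expectedReplicants = 5;

    while (totalReplicants < expectedReplicants) {
      boolean replicate = totalReplicants > 0; // the first copy is never throttled

      if (replicate && !throttler.canAddReplicant()) {
        System.out.println("throttled after " + totalReplicants + " copies");
        break;
      }
      if (replicate && !throttler.registerReplicantCreation()) {
        break;
      }
      totalReplicants++;
      System.out.println("assigned copy " + totalReplicants);
    }
  }
}
```

Running this prints three assignments and then a throttle, showing that the initial copy bypasses the throttle while later replicas are bounded by it.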
diff --git a/services/pom.xml b/services/pom.xml
index d01c1f44706..9cc6497f2d2 100644
--- a/services/pom.xml
+++ b/services/pom.xml
@@ -24,11 +24,11 @@
<artifactId>druid-services</artifactId>
<name>druid-services</name>
<description>druid-services</description>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
<groupId>com.metamx</groupId>
<artifactId>druid</artifactId>
- <version>0.5.6-SNAPSHOT</version>
+ <version>0.5.12-SNAPSHOT</version>
diff --git a/upload.sh b/upload.sh
old mode 100644
new mode 100755