mirror of https://github.com/apache/druid.git
Merge branch 'master' of github.com:metamx/druid into kafka-protobuf
This commit is contained in:
commit d3412c851b
@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.coordination.DruidServerMetadata;

+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -116,7 +117,8 @@ public class DruidServer implements Comparable
   @JsonProperty
   public Map<String, DataSegment> getSegments()
   {
-    return ImmutableMap.copyOf(segments);
+    // Copying the map slows things down a lot here, don't use Immutable Map here
+    return Collections.unmodifiableMap(segments);
   }

   public DataSegment getSegment(String segmentName)
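For context (not part of the commit): Collections.unmodifiableMap returns a constant-time, read-only view over the live ConcurrentHashMap, while ImmutableMap.copyOf walks and copies every entry on each call, which is the cost the new inline comment refers to. A minimal sketch of the difference:

import com.google.common.collect.ImmutableMap;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ViewVsCopy
{
  public static void main(String[] args)
  {
    final ConcurrentMap<String, String> segments = new ConcurrentHashMap<String, String>();

    // O(1) read-only view: later writes to the backing map remain visible through it.
    final Map<String, String> view = Collections.unmodifiableMap(segments);
    // O(n) snapshot: frozen at the moment of the call.
    final Map<String, String> copy = ImmutableMap.copyOf(segments);

    segments.put("segment-1", "loaded");

    System.out.println(view.containsKey("segment-1")); // true
    System.out.println(copy.containsKey("segment-1")); // false: snapshot predates the put
  }
}

The trade-off is that callers of getSegments() now see a live view rather than a stable snapshot, which is safe here only because the backing map is concurrent.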
@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
@@ -0,0 +1,12 @@
+{
+  "queryType": "groupBy",
+  "dataSource": "webstream",
+  "granularity": "all",
+  "dimensions": ["country"],
+  "aggregations":[
+    { "type": "count", "name": "rows"},
+    { "type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
+  ],
+  "filter": { "type": "selector", "dimension": "geo_region", "value": "CA" },
+  "intervals":["2012-10-01T00:00/2020-01-01T00"]
+}
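A hedged usage sketch, not part of the commit: a query like the one above would be POSTed as JSON to a broker or realtime node. The localhost:8080 address and the /druid/v2/ path are assumptions about a local example setup.

import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;

public class PostGroupBy
{
  public static void main(String[] args) throws Exception
  {
    // Assumes the query above was saved locally as group_by_query.json.
    final byte[] body = Files.readAllBytes(Paths.get("group_by_query.json"));

    final HttpURLConnection conn =
        (HttpURLConnection) new URL("http://localhost:8080/druid/v2/?pretty").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);

    final OutputStream out = conn.getOutputStream();
    out.write(body);
    out.close();

    // Echo the node's JSON response.
    final InputStream in = conn.getInputStream();
    int c;
    while ((c = in.read()) != -1) {
      System.out.write(c);
    }
    System.out.flush();
    in.close();
  }
}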
@@ -0,0 +1,47 @@
+[{
+  "schema": {
+    "dataSource": "webstream",
+    "aggregators": [
+      {"type": "count", "name": "rows"},
+      {"type": "doubleSum", "fieldName": "known_users", "name": "known_users"}
+    ],
+    "indexGranularity": "minute",
+    "shardSpec": {"type": "none"}
+  },
+
+  "config": {
+    "maxRowsInMemory": 50000,
+    "intermediatePersistPeriod": "PT2m"
+  },
+
+  "firehose": {
+    "type": "webstream",
+    "url":"http://developer.usa.gov/1usagov",
+    "renamedDimensions": {
+      "g":"bitly_hash",
+      "c":"country",
+      "a":"user",
+      "cy":"city",
+      "l":"encoding_user_login",
+      "hh":"short_url",
+      "hc":"timestamp_hash",
+      "h":"user_bitly_hash",
+      "u":"url",
+      "tz":"timezone",
+      "t":"time",
+      "r":"referring_url",
+      "gr":"geo_region",
+      "nk":"known_users",
+      "al":"accept_language"
+    },
+    "timeDimension":"t",
+    "timeFormat":"posix"
+  },
+
+  "plumber": {
+    "type": "realtime",
+    "windowPeriod": "PT3m",
+    "segmentGranularity": "hour",
+    "basePersistDirectory": "/tmp/example/usagov_realtime/basePersist"
+  }
+}]
@@ -55,7 +55,7 @@ sleep 60
 INSTANCE_ADDRESS=`ec2-describe-instances|grep 'INSTANCE'|grep $INSTANCE_ID|cut -f4`
 echo "Connecting to $INSTANCE_ADDRESS to prepare environment for druid..."
 scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/env.sh ubuntu@${INSTANCE_ADDRESS}:
-ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'
+ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./env.sh;./env.sh'

 echo "Prepared $INSTANCE_ADDRESS for druid."
@@ -70,7 +70,7 @@ fi

 # Now boot druid parts
 scp -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ./ec2/run.sh ubuntu@${INSTANCE_ADDRESS}:
-ssh -q -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'
+ssh -q -f -i ~/.ssh/druid-keypair -o StrictHostKeyChecking=no ubuntu@${INSTANCE_ADDRESS} 'chmod +x ./run.sh;./run.sh'

 echo "Druid booting complete!"
 echo "ssh -i ~/.ssh/druid-keypair ubuntu@${INSTANCE_ADDRESS} #to connect"
@@ -1,6 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <groupId>com.metamx.druid</groupId>
 <artifactId>druid-examples</artifactId>
@@ -10,7 +9,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
@@ -19,15 +19,14 @@

 package com.metamx.druid.common.s3;

+import com.google.common.base.Throwables;
 import com.metamx.common.logger.Logger;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.ServiceException;
 import org.jets3t.service.model.S3Object;

-import java.io.File;
 import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+import java.util.concurrent.Callable;

 /**
  *
@@ -36,37 +35,6 @@ public class S3Utils
 {
   private static final Logger log = new Logger(S3Utils.class);

-  public static void putFileToS3(
-      File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
-  )
-      throws S3ServiceException, IOException, NoSuchAlgorithmException
-  {
-    S3Object s3Obj = new S3Object(localFile);
-    s3Obj.setBucketName(outputS3Bucket);
-    s3Obj.setKey(outputS3Path);
-
-    log.info("Uploading file[%s] to [s3://%s/%s]", localFile, s3Obj.getBucketName(), s3Obj.getKey());
-    s3Client.putObject(new S3Bucket(outputS3Bucket), s3Obj);
-  }
-
-  public static void putFileToS3WrapExceptions(
-      File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
-  )
-  {
-    try {
-      putFileToS3(localFile, s3Client, outputS3Bucket, outputS3Path);
-    }
-    catch (S3ServiceException e) {
-      throw new RuntimeException(e);
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    catch (NoSuchAlgorithmException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   public static void closeStreamsQuietly(S3Object s3Obj)
   {
     if (s3Obj == null) {
@@ -80,4 +48,52 @@ public class S3Utils

     }
   }
+
+  /**
+   * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
+   * found, etc) are not retried.
+   */
+  public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+  {
+    int nTry = 0;
+    final int maxTries = 3;
+    while (true) {
+      try {
+        nTry++;
+        return f.call();
+      }
+      catch (IOException e) {
+        if (nTry <= maxTries) {
+          awaitNextRetry(e, nTry);
+        } else {
+          throw Throwables.propagate(e);
+        }
+      }
+      catch (ServiceException e) {
+        if (nTry <= maxTries &&
+            (e.getCause() instanceof IOException ||
+             (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
+          awaitNextRetry(e, nTry);
+        } else {
+          throw e;
+        }
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
+  {
+    final long baseSleepMillis = 1000;
+    final double fuzziness = 0.2;
+    final long sleepMillis = Math.max(
+        baseSleepMillis,
+        (long) (baseSleepMillis * Math.pow(2, nTry) *
+                (1 + new Random().nextGaussian() * fuzziness))
+    );
+    log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
+    Thread.sleep(sleepMillis);
+  }
 }
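For context: any jets3t call can now be funneled through retryS3Operation, which retries only transient failures (IOExceptions, plus ServiceExceptions caused by an IOException or carrying the RequestTimeout error code) and sleeps roughly max(1s, 1s * 2^nTry) with about 20% Gaussian fuzz between attempts, i.e. around 2s, 4s, 8s for the three retries. A usage sketch; the bucket and key are placeholders, not values from this commit:

package com.metamx.druid.common.s3;

import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;

import java.util.concurrent.Callable;

public class RetryExample
{
  // Fetches one object, retrying transient S3 failures up to three times.
  public static S3Object fetchWithRetries(final RestS3Service s3Client) throws Exception
  {
    return S3Utils.retryS3Operation(
        new Callable<S3Object>()
        {
          @Override
          public S3Object call() throws Exception
          {
            return s3Client.getObject("example-bucket", "example/key");
          }
        }
    );
  }
}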
@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
pom.xml
@@ -23,7 +23,7 @@
   <groupId>com.metamx</groupId>
   <artifactId>druid</artifactId>
   <packaging>pom</packaging>
-  <version>0.5.6-SNAPSHOT</version>
+  <version>0.5.12-SNAPSHOT</version>
   <name>druid</name>
   <description>druid</description>
   <scm>
@@ -65,7 +65,7 @@
     <dependency>
       <groupId>com.metamx</groupId>
       <artifactId>http-client</artifactId>
-      <version>0.7.1</version>
+      <version>0.8.1</version>
     </dependency>
     <dependency>
       <groupId>com.metamx</groupId>
@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>
@@ -19,6 +19,7 @@

 package com.metamx.druid.loading;

+import com.google.common.base.Throwables;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
@@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.common.s3.S3Utils;
 import com.metamx.druid.utils.CompressionUtils;
 import org.apache.commons.io.FileUtils;
-import org.jets3t.service.S3ServiceException;
+import org.jets3t.service.ServiceException;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;

 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
-import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.zip.GZIPInputStream;

 /**
@@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
   }

   @Override
-  public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
+  public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
   {
-    S3Coords s3Coords = new S3Coords(segment);
+    final S3Coords s3Coords = new S3Coords(segment);

     log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@@ -80,41 +82,52 @@ public class S3DataSegmentPuller implements DataSegmentPuller
       throw new ISE("outDir[%s] must be a directory.", outDir);
     }

-    long startTime = System.currentTimeMillis();
-    S3Object s3Obj = null;
-
     try {
-      s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
-
-      InputStream in = null;
-      try {
-        in = s3Obj.getDataInputStream();
-        final String key = s3Obj.getKey();
-        if (key.endsWith(".zip")) {
-          CompressionUtils.unzip(in, outDir);
-        } else if (key.endsWith(".gz")) {
-          final File outFile = new File(outDir, toFilename(key, ".gz"));
-          ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
-        } else {
-          ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
-        }
-        log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
-      }
-      catch (IOException e) {
-        FileUtils.deleteDirectory(outDir);
-        throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
-      }
-      finally {
-        Closeables.closeQuietly(in);
-      }
+      S3Utils.retryS3Operation(
+          new Callable<Void>()
+          {
+            @Override
+            public Void call() throws Exception
+            {
+              long startTime = System.currentTimeMillis();
+              S3Object s3Obj = null;
+
+              try {
+                s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);
+
+                InputStream in = null;
+                try {
+                  in = s3Obj.getDataInputStream();
+                  final String key = s3Obj.getKey();
+                  if (key.endsWith(".zip")) {
+                    CompressionUtils.unzip(in, outDir);
+                  } else if (key.endsWith(".gz")) {
+                    final File outFile = new File(outDir, toFilename(key, ".gz"));
+                    ByteStreams.copy(new GZIPInputStream(in), Files.newOutputStreamSupplier(outFile));
+                  } else {
+                    ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
+                  }
+                  log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
+                  return null;
+                }
+                catch (IOException e) {
+                  FileUtils.deleteDirectory(outDir);
+                  throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
+                }
+                finally {
+                  Closeables.closeQuietly(in);
+                }
+              }
+              finally {
+                S3Utils.closeStreamsQuietly(s3Obj);
+              }
+            }
+          }
+      );
     }
-    finally {
-      S3Utils.closeStreamsQuietly(s3Obj);
+    catch (Exception e) {
+      throw new SegmentLoadingException(e, e.getMessage());
     }
-
   }

   private String toFilename(String key, final String suffix)
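Since getSegmentFiles returns nothing, the retried body is typed Callable<Void> and ends with return null. Note also that the inner failure is rethrown as a plain IOException rather than a SegmentLoadingException: that is what lets retryS3Operation recognize a failed pull as retryable, with the outDir cleanup running before each new attempt. A minimal, self-contained sketch of the Callable<Void> idiom (illustration only):

import java.util.concurrent.Callable;

public class VoidCallable
{
  public static void main(String[] args) throws Exception
  {
    final Callable<Void> sideEffect = new Callable<Void>()
    {
      @Override
      public Void call() throws Exception
      {
        System.out.println("side-effecting work that a generic retry helper can re-run");
        return null; // Void has no instances, so null is the only possible return value
      }
    };

    sideEffect.call();
  }
}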
@@ -124,25 +137,49 @@ public class S3DataSegmentPuller implements DataSegmentPuller
     return filename;
   }

-  private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
+  private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
   {
     try {
-      return s3Client.isObjectInBucket(coords.bucket, coords.path);
+      return S3Utils.retryS3Operation(
+          new Callable<Boolean>()
+          {
+            @Override
+            public Boolean call() throws Exception
+            {
+              return s3Client.isObjectInBucket(coords.bucket, coords.path);
+            }
+          }
+      );
+    }
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
     }
     catch (ServiceException e) {
       throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
     }
   }

   @Override
   public long getLastModified(DataSegment segment) throws SegmentLoadingException
   {
-    S3Coords coords = new S3Coords(segment);
+    final S3Coords coords = new S3Coords(segment);
     try {
-      S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
+      final StorageObject objDetails = S3Utils.retryS3Operation(
+          new Callable<StorageObject>()
+          {
+            @Override
+            public StorageObject call() throws Exception
+            {
+              return s3Client.getObjectDetails(coords.bucket, coords.path);
+            }
+          }
+      );
       return objDetails.getLastModifiedDate().getTime();
     }
-    catch (S3ServiceException e) {
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
+    }
+    catch (ServiceException e) {
       throw new SegmentLoadingException(e, e.getMessage());
     }
   }
@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
+import com.metamx.common.IAE;
 import com.metamx.common.Pair;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.concurrent.ScheduledExecutors;
@@ -247,144 +248,106 @@ public class DruidMaster

   public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
   {
-    final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
-    if (fromServer == null) {
-      throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
-    }
+    try {
+      final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
+      if (fromServer == null) {
+        throw new IAE("Unable to find server [%s]", from);
+      }

-    final DruidServer toServer = serverInventoryView.getInventoryValue(to);
-    if (toServer == null) {
-      throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
-    }
+      final DruidServer toServer = serverInventoryView.getInventoryValue(to);
+      if (toServer == null) {
+        throw new IAE("Unable to find server [%s]", to);
+      }

-    if (to.equalsIgnoreCase(from)) {
-      throw new IllegalArgumentException(
-          String.format("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to)
-      );
-    }
+      if (to.equalsIgnoreCase(from)) {
+        throw new IAE("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to);
+      }

-    final DataSegment segment = fromServer.getSegment(segmentName);
-    if (segment == null) {
-      throw new IllegalArgumentException(
-          String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
-      );
-    }
+      final DataSegment segment = fromServer.getSegment(segmentName);
+      if (segment == null) {
+        throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
+      }

-    final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
-    if (loadPeon == null) {
-      throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
-    }
+      final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
+      if (loadPeon == null) {
+        throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", to);
+      }

-    final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
-    if (dropPeon == null) {
-      throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
-    }
+      final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
+      if (dropPeon == null) {
+        throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
+      }

-    final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
-    if (toHolder.getAvailableSize() < segment.getSize()) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
-              to,
-              segment,
-              segment.getSize(),
-              toHolder.getAvailableSize()
-          )
-      );
-    }
+      final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
+      if (toHolder.getAvailableSize() < segment.getSize()) {
+        throw new IAE(
+            "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
+            to,
+            segment,
+            segment.getSize(),
+            toHolder.getAvailableSize()
+        );
+      }

-    final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
-    final String toServedSegPath = ZKPaths.makePath(
-        ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
-    );
+      final String toLoadQueueSegPath = ZKPaths.makePath(ZKPaths.makePath(zkPaths.getLoadQueuePath(), to), segmentName);
+      final String toServedSegPath = ZKPaths.makePath(
+          ZKPaths.makePath(zkPaths.getServedSegmentsPath(), to), segmentName
+      );

-    loadPeon.loadSegment(
-        segment,
-        new LoadPeonCallback()
-        {
-          @Override
-          protected void execute()
-          {
-            try {
-              if (curator.checkExists().forPath(toServedSegPath) != null &&
-                  curator.checkExists().forPath(toLoadQueueSegPath) == null &&
-                  !dropPeon.getSegmentsToDrop().contains(segment)) {
-                dropPeon.dropSegment(segment, callback);
-              } else if (callback != null) {
-                callback.execute();
-              }
-            }
-            catch (Exception e) {
-              throw Throwables.propagate(e);
-            }
-          }
-        }
-    );
-  }
-
-  public void cloneSegment(String from, String to, String segmentName, LoadPeonCallback callback)
-  {
-    final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
-    if (fromServer == null) {
-      throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
-    }
-
-    final DruidServer toServer = serverInventoryView.getInventoryValue(to);
-    if (toServer == null) {
-      throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
-    }
-
-    final DataSegment segment = fromServer.getSegment(segmentName);
-    if (segment == null) {
-      throw new IllegalArgumentException(
-          String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
-      );
-    }
-
-    final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
-    if (loadPeon == null) {
-      throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
-    }
-
-    final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
-    if (toHolder.getAvailableSize() < segment.getSize()) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
-              to,
-              segment,
-              segment.getSize(),
-              toHolder.getAvailableSize()
-          )
-      );
-    }
-
-    if (!loadPeon.getSegmentsToLoad().contains(segment)) {
-      loadPeon.loadSegment(segment, callback);
+      loadPeon.loadSegment(
+          segment,
+          new LoadPeonCallback()
+          {
+            @Override
+            protected void execute()
+            {
+              try {
+                if (curator.checkExists().forPath(toServedSegPath) != null &&
+                    curator.checkExists().forPath(toLoadQueueSegPath) == null &&
+                    !dropPeon.getSegmentsToDrop().contains(segment)) {
+                  dropPeon.dropSegment(segment, callback);
+                } else if (callback != null) {
+                  callback.execute();
+                }
+              }
+              catch (Exception e) {
+                throw Throwables.propagate(e);
+              }
+            }
+          }
+      );
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Exception moving segment %s", segmentName).emit();
+      callback.execute();
     }
   }

   public void dropSegment(String from, String segmentName, final LoadPeonCallback callback)
   {
-    final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
-    if (fromServer == null) {
-      throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
-    }
+    try {
+      final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
+      if (fromServer == null) {
+        throw new IAE("Unable to find server [%s]", from);
+      }

-    final DataSegment segment = fromServer.getSegment(segmentName);
-    if (segment == null) {
-      throw new IllegalArgumentException(
-          String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
-      );
-    }
+      final DataSegment segment = fromServer.getSegment(segmentName);
+      if (segment == null) {
+        throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
+      }

-    final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
-    if (dropPeon == null) {
-      throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
-    }
+      final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
+      if (dropPeon == null) {
+        throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
+      }

-    if (!dropPeon.getSegmentsToDrop().contains(segment)) {
-      dropPeon.dropSegment(segment, callback);
+      if (!dropPeon.getSegmentsToDrop().contains(segment)) {
+        dropPeon.dropSegment(segment, callback);
+      }
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Exception dropping segment %s", segmentName).emit();
+      callback.execute();
     }
   }
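For context: com.metamx.common.IAE is, as far as I know, an IllegalArgumentException subclass whose constructor applies String.format to its arguments, so each validation collapses to a single line while still throwing an equivalent exception. A small sketch (illustration only):

import com.metamx.common.IAE;

public class IaeSketch
{
  // Before: explicit String.format wrapped in IllegalArgumentException.
  static void requireFoundVerbose(Object server, String name)
  {
    if (server == null) {
      throw new IllegalArgumentException(String.format("Unable to find server [%s]", name));
    }
  }

  // After: IAE formats the message itself and extends IllegalArgumentException.
  static void requireFound(Object server, String name)
  {
    if (server == null) {
      throw new IAE("Unable to find server [%s]", name);
    }
  }
}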
@@ -543,7 +506,7 @@ public class DruidMaster
       }
       catch (Exception e) {
         log.makeAlert(e, "Unable to become master")
-           .emit();
+            .emit();
         final LeaderLatch oldLatch = createNewLeaderLatch();
         Closeables.closeQuietly(oldLatch);
         try {
@@ -92,6 +92,12 @@ public abstract class LoadRule implements Rule
     final MasterStats stats = new MasterStats();

     while (totalReplicants < expectedReplicants) {
+      boolean replicate = totalReplicants > 0;
+
+      if (replicate && !replicationManager.canAddReplicant(getTier())) {
+        break;
+      }
+
       final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);

       if (holder == null) {
@@ -104,15 +110,10 @@ public abstract class LoadRule implements Rule
         break;
       }

-      if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
-        if (!replicationManager.canAddReplicant(getTier()) ||
-            !replicationManager.registerReplicantCreation(
-                getTier(),
-                segment.getIdentifier(),
-                holder.getServer().getHost()
-            )) {
-          break;
-        }
+      if (replicate && !replicationManager.registerReplicantCreation(
+          getTier(), segment.getIdentifier(), holder.getServer().getHost()
+      )) {
+        break;
       }

       holder.getPeon().loadSegment(
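Taken together, the two LoadRule hunks hoist the throttle decision to the top of the loop as a replicate flag: an exhausted replication quota now breaks out before the assignment search runs at all, and the first copy of a segment is never throttled. A toy, self-contained simulation of that control flow (the quota model and names are simplifications, not Druid APIs):

public class ThrottleSketch
{
  public static void main(String[] args)
  {
    final int expectedReplicants = 3;
    int totalReplicants = 0;
    int tierQuota = 1; // pretend the replication manager allows one in-flight replica

    while (totalReplicants < expectedReplicants) {
      final boolean replicate = totalReplicants > 0;

      if (replicate && tierQuota == 0) {
        System.out.println("throttled before searching for a destination server");
        break;
      }

      // ...findNewSegmentHomeAssign would run here, only when loading may proceed...

      if (replicate) {
        tierQuota--; // registerReplicantCreation claims a throttle slot
      }
      totalReplicants++;
      System.out.println("assigned copy #" + totalReplicants);
    }
  }
}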
@@ -24,11 +24,11 @@
   <artifactId>druid-services</artifactId>
   <name>druid-services</name>
   <description>druid-services</description>
-  <version>0.5.6-SNAPSHOT</version>
+  <version>0.5.12-SNAPSHOT</version>
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>