Merge branch 'master' of github.com:metamx/druid

cheddar 2013-07-18 11:37:11 -07:00
commit 861542cc3c
15 changed files with 237 additions and 219 deletions

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
 import com.metamx.common.logger.Logger;
 import com.metamx.druid.coordination.DruidServerMetadata;

+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -116,7 +117,8 @@ public class DruidServer implements Comparable
   @JsonProperty
   public Map<String, DataSegment> getSegments()
   {
-    return ImmutableMap.copyOf(segments);
+    // Copying the map slows things down a lot here, don't use Immutable Map here
+    return Collections.unmodifiableMap(segments);
   }

   public DataSegment getSegment(String segmentName)
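
The getSegments() change trades a per-call copy for a constant-time view: ImmutableMap.copyOf walks every entry of the backing ConcurrentHashMap on each call, while Collections.unmodifiableMap only wraps it, so callers now see live updates rather than a snapshot. A minimal standalone sketch of the tradeoff (the MapViewSketch class and timing harness are illustrative, not part of the commit; Guava is assumed on the classpath):

import com.google.common.collect.ImmutableMap;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MapViewSketch
{
  public static void main(String[] args)
  {
    final ConcurrentHashMap<String, Integer> segments = new ConcurrentHashMap<String, Integer>();
    for (int i = 0; i < 100000; i++) {
      segments.put("segment-" + i, i);
    }

    // ImmutableMap.copyOf walks all entries: O(n) work on every getSegments() call.
    long start = System.nanoTime();
    final Map<String, Integer> copied = ImmutableMap.copyOf(segments);
    System.out.println(String.format("copyOf:          %,d ns", System.nanoTime() - start));

    // Collections.unmodifiableMap only allocates a wrapper: O(1), but it is a live view.
    start = System.nanoTime();
    final Map<String, Integer> view = Collections.unmodifiableMap(segments);
    System.out.println(String.format("unmodifiableMap: %,d ns", System.nanoTime() - start));

    // Both reject mutation through the returned reference.
    try {
      view.put("extra", -1);
    }
    catch (UnsupportedOperationException expected) {
      System.out.println("view is read-only, as expected");
    }
  }
}

The wrapped view still rejects mutation, but unlike the copy it reflects concurrent changes, which is safe here only because the backing map is a ConcurrentHashMap with weakly consistent iterators.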

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -1,6 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.metamx.druid</groupId>
   <artifactId>druid-examples</artifactId>
@@ -10,7 +9,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -19,15 +19,14 @@
 package com.metamx.druid.common.s3;

+import com.google.common.base.Throwables;
 import com.metamx.common.logger.Logger;
-import org.jets3t.service.S3ServiceException;
-import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
+import org.jets3t.service.ServiceException;
 import org.jets3t.service.model.S3Object;

-import java.io.File;
 import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
+import java.util.Random;
+import java.util.concurrent.Callable;

 /**
  *
@@ -36,37 +35,6 @@ public class S3Utils
 {
   private static final Logger log = new Logger(S3Utils.class);

-  public static void putFileToS3(
-      File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
-  )
-      throws S3ServiceException, IOException, NoSuchAlgorithmException
-  {
-    S3Object s3Obj = new S3Object(localFile);
-    s3Obj.setBucketName(outputS3Bucket);
-    s3Obj.setKey(outputS3Path);
-
-    log.info("Uploading file[%s] to [s3://%s/%s]", localFile, s3Obj.getBucketName(), s3Obj.getKey());
-    s3Client.putObject(new S3Bucket(outputS3Bucket), s3Obj);
-  }
-
-  public static void putFileToS3WrapExceptions(
-      File localFile, RestS3Service s3Client, String outputS3Bucket, String outputS3Path
-  )
-  {
-    try {
-      putFileToS3(localFile, s3Client, outputS3Bucket, outputS3Path);
-    }
-    catch (S3ServiceException e) {
-      throw new RuntimeException(e);
-    }
-    catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    catch (NoSuchAlgorithmException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   public static void closeStreamsQuietly(S3Object s3Obj)
   {
     if (s3Obj == null) {
@@ -80,4 +48,52 @@ public class S3Utils
       }
     }
   }
+
+  /**
+   * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not
+   * found, etc) are not retried.
+   */
+  public static <T> T retryS3Operation(Callable<T> f) throws ServiceException, InterruptedException
+  {
+    int nTry = 0;
+    final int maxTries = 3;
+    while (true) {
+      try {
+        nTry++;
+        return f.call();
+      }
+      catch (IOException e) {
+        if (nTry <= maxTries) {
+          awaitNextRetry(e, nTry);
+        } else {
+          throw Throwables.propagate(e);
+        }
+      }
+      catch (ServiceException e) {
+        if (nTry <= maxTries &&
+            (e.getCause() instanceof IOException ||
+             (e.getErrorCode() != null && e.getErrorCode().equals("RequestTimeout")))) {
+          awaitNextRetry(e, nTry);
+        } else {
+          throw e;
+        }
+      }
+      catch (Exception e) {
+        throw Throwables.propagate(e);
+      }
+    }
+  }
+
+  private static void awaitNextRetry(Exception e, int nTry) throws InterruptedException
+  {
+    final long baseSleepMillis = 1000;
+    final double fuzziness = 0.2;
+    final long sleepMillis = Math.max(
+        baseSleepMillis,
+        (long) (baseSleepMillis * Math.pow(2, nTry) * (1 + new Random().nextGaussian() * fuzziness))
+    );
+    log.info(e, "S3 fail on try %d, retrying in %,dms.", nTry, sleepMillis);
+    Thread.sleep(sleepMillis);
+  }
 }
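
The new retryS3Operation helper retries only failures that look transient: raw IOExceptions, ServiceExceptions caused by an IOException, and S3 "RequestTimeout" errors; everything else propagates on the first attempt. The backoff is exponential with Gaussian fuzz: try n sleeps roughly 1000 * 2^n ms, i.e. about 2s, 4s and 8s for tries 1 through 3, fuzzed by about plus or minus 20% and floored at one second. A minimal sketch of the call-site semantics, exercised without touching real S3 (RetrySketch and the simulated failure are hypothetical; only the updated S3Utils is assumed on the classpath):

import com.metamx.druid.common.s3.S3Utils;

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class RetrySketch
{
  public static void main(String[] args) throws Exception
  {
    final AtomicInteger attempts = new AtomicInteger();

    // Fails twice with an IOException (retried with backoff), then succeeds on try 3.
    final String result = S3Utils.retryS3Operation(
        new Callable<String>()
        {
          @Override
          public String call() throws Exception
          {
            if (attempts.incrementAndGet() < 3) {
              throw new IOException("simulated transient failure");
            }
            return "ok after " + attempts.get() + " tries";
          }
        }
    );
    System.out.println(result);
  }
}

Here the first two calls fail and sleep for roughly 2s and 4s before retrying; a fourth consecutive failure (nTry greater than maxTries) would propagate instead of being retried.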

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -23,7 +23,7 @@
   <groupId>com.metamx</groupId>
   <artifactId>druid</artifactId>
   <packaging>pom</packaging>
-  <version>0.5.6-SNAPSHOT</version>
+  <version>0.5.12-SNAPSHOT</version>
   <name>druid</name>
   <description>druid</description>
   <scm>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -28,7 +28,7 @@
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>

View File

@@ -19,6 +19,7 @@
 package com.metamx.druid.loading;

+import com.google.common.base.Throwables;
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 import com.google.common.io.Files;
@@ -30,16 +31,17 @@ import com.metamx.druid.client.DataSegment;
 import com.metamx.druid.common.s3.S3Utils;
 import com.metamx.druid.utils.CompressionUtils;
 import org.apache.commons.io.FileUtils;
-import org.jets3t.service.S3ServiceException;
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
-import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
+import org.jets3t.service.model.StorageObject;

 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
 import java.util.zip.GZIPInputStream;

 /**
@@ -62,9 +64,9 @@ public class S3DataSegmentPuller implements DataSegmentPuller
   }

   @Override
-  public void getSegmentFiles(DataSegment segment, File outDir) throws SegmentLoadingException
+  public void getSegmentFiles(final DataSegment segment, final File outDir) throws SegmentLoadingException
   {
-    S3Coords s3Coords = new S3Coords(segment);
+    final S3Coords s3Coords = new S3Coords(segment);

     log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
@@ -80,11 +82,18 @@ public class S3DataSegmentPuller implements DataSegmentPuller
       throw new ISE("outDir[%s] must be a directory.", outDir);
     }

+    try {
+      S3Utils.retryS3Operation(
+          new Callable<Void>()
+          {
+            @Override
+            public Void call() throws Exception
+            {
               long startTime = System.currentTimeMillis();
               S3Object s3Obj = null;

               try {
-                s3Obj = s3Client.getObject(new S3Bucket(s3Coords.bucket), s3Coords.path);
+                s3Obj = s3Client.getObject(s3Coords.bucket, s3Coords.path);

                 InputStream in = null;
                 try {
@@ -99,22 +108,26 @@ public class S3DataSegmentPuller implements DataSegmentPuller
                   ByteStreams.copy(in, Files.newOutputStreamSupplier(new File(outDir, toFilename(key, ""))));
                 }
                 log.info("Pull of file[%s] completed in %,d millis", s3Obj, System.currentTimeMillis() - startTime);
+                return null;
               }
               catch (IOException e) {
                 FileUtils.deleteDirectory(outDir);
-                throw new SegmentLoadingException(e, "Problem decompressing object[%s]", s3Obj);
+                throw new IOException(String.format("Problem decompressing object[%s]", s3Obj), e);
               }
               finally {
                 Closeables.closeQuietly(in);
               }
             }
-            catch (Exception e) {
-              throw new SegmentLoadingException(e, e.getMessage());
-            }
             finally {
               S3Utils.closeStreamsQuietly(s3Obj);
             }
+          }
+        }
+      );
+    }
+    catch (Exception e) {
+      throw new SegmentLoadingException(e, e.getMessage());
+    }
   }

   private String toFilename(String key, final String suffix)
@@ -124,11 +137,23 @@ public class S3DataSegmentPuller implements DataSegmentPuller
     return filename;
   }

-  private boolean isObjectInBucket(S3Coords coords) throws SegmentLoadingException
+  private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException
   {
     try {
-      return s3Client.isObjectInBucket(coords.bucket, coords.path);
+      return S3Utils.retryS3Operation(
+          new Callable<Boolean>()
+          {
+            @Override
+            public Boolean call() throws Exception
+            {
+              return s3Client.isObjectInBucket(coords.bucket, coords.path);
+            }
+          }
+      );
+    }
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
     }
     catch (ServiceException e) {
       throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
     }
@@ -137,12 +162,24 @@ public class S3DataSegmentPuller implements DataSegmentPuller
   @Override
   public long getLastModified(DataSegment segment) throws SegmentLoadingException
   {
-    S3Coords coords = new S3Coords(segment);
+    final S3Coords coords = new S3Coords(segment);
     try {
-      S3Object objDetails = s3Client.getObjectDetails(new S3Bucket(coords.bucket), coords.path);
+      final StorageObject objDetails = S3Utils.retryS3Operation(
+          new Callable<StorageObject>()
+          {
+            @Override
+            public StorageObject call() throws Exception
+            {
+              return s3Client.getObjectDetails(coords.bucket, coords.path);
+            }
+          }
+      );
       return objDetails.getLastModifiedDate().getTime();
     }
-    catch (S3ServiceException e) {
+    catch (InterruptedException e) {
+      throw Throwables.propagate(e);
+    }
+    catch (ServiceException e) {
       throw new SegmentLoadingException(e, e.getMessage());
     }
   }
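
Two details of the getSegmentFiles rewrite are easy to miss. The void method fits the Callable-based helper by using Callable<Void> and returning null, and the decompression failure is rethrown as a plain IOException rather than a SegmentLoadingException, which (given retryS3Operation's catch clauses) makes it retriable instead of fatal on the first attempt. A stripped-down, S3-free sketch of the Callable<Void> pattern (the retry stand-in and all names here are hypothetical):

import java.util.concurrent.Callable;

public class VoidCallableSketch
{
  // Simplified stand-in for S3Utils.retryS3Operation: invokes the callable once.
  static <T> T retry(Callable<T> f) throws Exception
  {
    return f.call();
  }

  public static void main(String[] args) throws Exception
  {
    // A void operation fits a Callable<T> API by parameterizing on Void and returning null.
    retry(
        new Callable<Void>()
        {
          @Override
          public Void call() throws Exception
          {
            System.out.println("pulling segment files...");
            return null; // Void has no instances, so null is the only possible return value.
          }
        }
    );
  }
}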

View File

@@ -28,6 +28,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.io.Closeables;
+import com.metamx.common.IAE;
 import com.metamx.common.Pair;
 import com.metamx.common.concurrent.ScheduledExecutorFactory;
 import com.metamx.common.concurrent.ScheduledExecutors;
@@ -247,49 +248,44 @@ public class DruidMaster
   public void moveSegment(String from, String to, String segmentName, final LoadPeonCallback callback)
   {
+    try {
       final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
       if (fromServer == null) {
-        throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
+        throw new IAE("Unable to find server [%s]", from);
       }

       final DruidServer toServer = serverInventoryView.getInventoryValue(to);
       if (toServer == null) {
-        throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
+        throw new IAE("Unable to find server [%s]", to);
       }

       if (to.equalsIgnoreCase(from)) {
-        throw new IllegalArgumentException(
-            String.format("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to)
-        );
+        throw new IAE("Redundant command to move segment [%s] from [%s] to [%s]", segmentName, from, to);
       }

       final DataSegment segment = fromServer.getSegment(segmentName);
       if (segment == null) {
-        throw new IllegalArgumentException(
-            String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
-        );
+        throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
       }

       final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
       if (loadPeon == null) {
-        throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
+        throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", to);
       }

       final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
       if (dropPeon == null) {
-        throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
+        throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
       }

       final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
       if (toHolder.getAvailableSize() < segment.getSize()) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
-                to,
-                segment,
-                segment.getSize(),
-                toHolder.getAvailableSize()
-            )
-        );
+        throw new IAE(
+            "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
+            to,
+            segment,
+            segment.getSize(),
+            toHolder.getAvailableSize()
+        );
       }
@@ -321,72 +317,39 @@ public class DruidMaster
         }
       );
     }
-  }
-
-  public void cloneSegment(String from, String to, String segmentName, LoadPeonCallback callback)
-  {
-    final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
-    if (fromServer == null) {
-      throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
-    }
-
-    final DruidServer toServer = serverInventoryView.getInventoryValue(to);
-    if (toServer == null) {
-      throw new IllegalArgumentException(String.format("Unable to find server [%s]", to));
-    }
-
-    final DataSegment segment = fromServer.getSegment(segmentName);
-    if (segment == null) {
-      throw new IllegalArgumentException(
-          String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
-      );
-    }
-
-    final LoadQueuePeon loadPeon = loadManagementPeons.get(to);
-    if (loadPeon == null) {
-      throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", to));
-    }
-
-    final ServerHolder toHolder = new ServerHolder(toServer, loadPeon);
-    if (toHolder.getAvailableSize() < segment.getSize()) {
-      throw new IllegalArgumentException(
-          String.format(
-              "Not enough capacity on server [%s] for segment [%s]. Required: %,d, available: %,d.",
-              to,
-              segment,
-              segment.getSize(),
-              toHolder.getAvailableSize()
-          )
-      );
-    }
-
-    if (!loadPeon.getSegmentsToLoad().contains(segment)) {
-      loadPeon.loadSegment(segment, callback);
+    catch (Exception e) {
+      log.makeAlert(e, "Exception moving segment %s", segmentName).emit();
+      callback.execute();
     }
   }

   public void dropSegment(String from, String segmentName, final LoadPeonCallback callback)
   {
+    try {
       final DruidServer fromServer = serverInventoryView.getInventoryValue(from);
       if (fromServer == null) {
-        throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
+        throw new IAE("Unable to find server [%s]", from);
       }

       final DataSegment segment = fromServer.getSegment(segmentName);
       if (segment == null) {
-        throw new IllegalArgumentException(
-            String.format("Unable to find segment [%s] on server [%s]", segmentName, from)
-        );
+        throw new IAE("Unable to find segment [%s] on server [%s]", segmentName, from);
       }

       final LoadQueuePeon dropPeon = loadManagementPeons.get(from);
       if (dropPeon == null) {
-        throw new IllegalArgumentException(String.format("LoadQueuePeon hasn't been created yet for path [%s]", from));
+        throw new IAE("LoadQueuePeon hasn't been created yet for path [%s]", from);
       }

       if (!dropPeon.getSegmentsToDrop().contains(segment)) {
         dropPeon.dropSegment(segment, callback);
       }
     }
+    catch (Exception e) {
+      log.makeAlert(e, "Exception dropping segment %s", segmentName).emit();
+      callback.execute();
+    }
   }

   public Set<DataSegment> getAvailableDataSegments()
   {
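
Throughout this file, IllegalArgumentException(String.format(...)) call sites collapse to IAE, metamx-common's IllegalArgumentException subclass whose constructor takes a format string plus varargs. A minimal sketch of the equivalence (assuming com.metamx.common.IAE extends IllegalArgumentException and formats like String.format, as the diff implies):

import com.metamx.common.IAE;

public class IaeSketch
{
  public static void main(String[] args)
  {
    final String from = "historical-01"; // hypothetical server name

    try {
      // Before the patch:
      throw new IllegalArgumentException(String.format("Unable to find server [%s]", from));
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }

    try {
      // After the patch: IAE formats its arguments itself, keeping call sites on one line.
      throw new IAE("Unable to find server [%s]", from);
    }
    catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}

Note also that moveSegment and dropSegment are now wrapped in try/catch, so these argument errors no longer escape to the caller; they are emitted as alerts and the callback still executes.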

View File

@@ -92,6 +92,12 @@ public abstract class LoadRule implements Rule
     final MasterStats stats = new MasterStats();

     while (totalReplicants < expectedReplicants) {
+      boolean replicate = totalReplicants > 0;
+
+      if (replicate && !replicationManager.canAddReplicant(getTier())) {
+        break;
+      }
+
       final ServerHolder holder = analyzer.findNewSegmentHomeAssign(segment, serverHolderList);

       if (holder == null) {
@@ -104,16 +110,11 @@ public abstract class LoadRule implements Rule
         break;
       }

-      if (totalReplicants > 0) { // don't throttle if there's only 1 copy of this segment in the cluster
-        if (!replicationManager.canAddReplicant(getTier()) ||
-            !replicationManager.registerReplicantCreation(
-                getTier(),
-                segment.getIdentifier(),
-                holder.getServer().getHost()
-            )) {
-          break;
-        }
-      }
+      if (replicate && !replicationManager.registerReplicantCreation(
+          getTier(), segment.getIdentifier(), holder.getServer().getHost()
+      )) {
+        break;
+      }

       holder.getPeon().loadSegment(
           segment,
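
This refactor hoists the cheap canAddReplicant throttle check above the expensive findNewSegmentHomeAssign placement step, so a throttled tier skips the balancing analysis entirely, while registerReplicantCreation still runs only once a server has been chosen. As before, the first copy of a segment (totalReplicants == 0) is never throttled. A toy model of the reordered loop (the in-flight counter and all names are illustrative, not Druid's API):

import java.util.concurrent.atomic.AtomicInteger;

public class ThrottleSketch
{
  static final int MAX_IN_FLIGHT = 2;
  static final AtomicInteger inFlight = new AtomicInteger();

  static boolean canAddReplicant()
  {
    return inFlight.get() < MAX_IN_FLIGHT;
  }

  static String findNewSegmentHome()
  {
    // Stands in for the expensive placement step, analyzer.findNewSegmentHomeAssign(...).
    return "server-" + inFlight.get();
  }

  public static void main(String[] args)
  {
    int totalReplicants = 0;
    final int expectedReplicants = 5;

    while (totalReplicants < expectedReplicants) {
      final boolean replicate = totalReplicants > 0; // the first copy is never throttled

      // After the patch: the throttle check runs before any placement work.
      if (replicate && !canAddReplicant()) {
        System.out.println("throttled after " + totalReplicants + " replicants");
        break;
      }

      final String server = findNewSegmentHome();
      if (replicate) {
        inFlight.incrementAndGet(); // stands in for registerReplicantCreation(...)
      }
      System.out.println("assigned replicant " + totalReplicants + " to " + server);
      totalReplicants++;
    }
  }
}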

View File

@@ -24,11 +24,11 @@
   <artifactId>druid-services</artifactId>
   <name>druid-services</name>
   <description>druid-services</description>
-  <version>0.5.6-SNAPSHOT</version>
+  <version>0.5.12-SNAPSHOT</version>
   <parent>
     <groupId>com.metamx</groupId>
     <artifactId>druid</artifactId>
-    <version>0.5.6-SNAPSHOT</version>
+    <version>0.5.12-SNAPSHOT</version>
   </parent>

   <dependencies>