fixed demos

This commit is contained in:
Adrian Cole 2010-05-24 22:44:43 -07:00
parent a5abb14d20
commit 6a7b9a0a86
7 changed files with 120 additions and 82 deletions

View File

@ -21,4 +21,4 @@
# 1. execute 'mvn install' to build the sample
# 2. invoke the jar, passing your aws credentials and the bucket you wish to create
# ex.
# java -jar target/jclouds-speedtest-sqs-jar-with-dependencies.jar accesskey secretkey queueName messageCount
# java -jar target/jclouds-speedtest-sqs-jar-with-dependencies.jar $AWS_USER $AWS_PWD testqueue 1000

View File

@ -18,21 +18,24 @@
*/
package org.jclouds.aws.sqs;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.jclouds.aws.sqs.options.ListQueuesOptions.Builder.queuePrefix;
import static org.jclouds.concurrent.ConcurrentUtils.awaitCompletion;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jclouds.aws.domain.Region;
import org.jclouds.aws.sqs.domain.Queue;
import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
import org.jclouds.logging.ConsoleLogger;
import org.jclouds.logging.Logger;
import org.jclouds.logging.config.NullLoggingModule;
import org.jclouds.rest.RestContext;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
@ -45,9 +48,26 @@ import com.google.common.util.concurrent.ListenableFuture;
* @author Adrian Cole
*/
public class SpeedTest {
private static final ImmutableSet<String> REGIONS = ImmutableSet.of(Region.EU_WEST_1,
Region.US_EAST_1, Region.US_WEST_1, Region.AP_SOUTHEAST_1);
public static final int PARAMETERS = 4;
public static final String INVALID_SYNTAX = "Invalid number of parameters. Syntax is: \"accesskeyid\" \"secretkey\" \"queueName\" \"messageCount\" ";
public static int PARAMETERS = 4;
public static String INVALID_SYNTAX = "Invalid number of parameters. Syntax is: \"accesskeyid\" \"secretkey\" \"queueName\" \"messageCount\" ";
private static final Logger logger = Logger.CONSOLE;
private static final Logger traceLogger = new ConsoleLogger() {
@Override
public boolean isTraceEnabled() {
return true;
}
@Override
public void trace(String message, Object... args) {
super.info(message, args);
}
};
public static void main(String[] args) throws InterruptedException {
@ -69,7 +89,7 @@ public class SpeedTest {
try {
Set<Queue> queues = Sets.newHashSet();
if (purgeQueues(queueName, context)) {
System.out.printf("pausing 60 seconds before recreating queues%n");
logger.info("pausing 60 seconds before recreating queues");
Thread.sleep(60 * 1000);
}
createQueues(queueName, context, queues);
@ -83,6 +103,21 @@ public class SpeedTest {
}
private static class QueueMessage {
final Queue queue;
final String message;
QueueMessage(Queue queue, String message) {
this.queue = queue;
this.message = message;
}
@Override
public String toString() {
return "[queue=" + queue + ", message=" + message + "]";
}
}
private static void runTests(int messageCount, String contextName,
RestContext<SQSAsyncClient, SQSClient> context, Set<Queue> queues)
throws InterruptedException {
@ -90,48 +125,34 @@ public class SpeedTest {
long timeOut = messageCount * 200; // minimum rate should be at least 5/second
for (Queue queue : queues) {
int complete = 0;
int errors = 0;
long start = System.currentTimeMillis();
logger.info("context: %s, region: %s, queueName: %s", contextName, queue.getRegion(),
queue.getName());
// fire off all the messages for the test
Set<ListenableFuture<byte[]>> responses = Sets.newHashSet();
Map<QueueMessage, ListenableFuture<byte[]>> responses = Maps.newHashMap();
for (int i = 0; i < messageCount; i++) {
responses.add(context.getAsyncApi().sendMessage(queue, message));
responses.put(new QueueMessage(queue, message), context.getAsyncApi().sendMessage(
queue, message));
}
do {
Set<ListenableFuture<byte[]>> retries = Sets.newHashSet();
for (ListenableFuture<byte[]> response : responses) {
try {
response.get(100, TimeUnit.MILLISECONDS);
complete++;
} catch (ExecutionException e) {
System.err.println(e.getMessage());
errors++;
} catch (TimeoutException e) {
retries.add(response);
}
}
responses = Sets.newHashSet(retries);
} while (responses.size() > 0 && System.currentTimeMillis() < start + timeOut);
long duration = System.currentTimeMillis() - start;
if (duration > timeOut)
System.out.printf("TIMEOUT: context: %s, region: %s, rate: %f messages/second%n",
contextName, queue.getRegion(), ((double) complete) / (duration / 1000.0));
else
System.out.printf("COMPLETE: context: %s, region: %s, rate: %f messages/second%n",
contextName, queue.getRegion(), ((double) complete) / (duration / 1000.0));
Map<QueueMessage, Exception> exceptions = awaitCompletion(responses, sameThreadExecutor(),
timeOut, traceLogger, String.format("context: %s, region: %s", contextName, queue
.getRegion()));
if (exceptions.size() > 0)
logger.error("problems in context: %s, region: %s: %s", contextName, queue.getRegion(),
exceptions);
System.gc();
System.out.println("pausing 5 seconds before the next run");
logger.info("pausing 5 seconds before the next run");
Thread.sleep(5000);// let the network quiet down
}
}
private static void createQueues(String queueName,
RestContext<SQSAsyncClient, SQSClient> nullLoggingDefaultContext, Set<Queue> queues) {
for (String region : ImmutableSet.of(Region.EU_WEST_1, Region.US_EAST_1, Region.US_WEST_1)) {
System.out.printf("creating queue: %s in region %s%n", queueName, region);
for (String region : REGIONS) {
logger.info("creating queue: %s in region %s", queueName, region);
queues.add(nullLoggingDefaultContext.getApi().createQueueInRegion(region, queueName));
}
}
@ -139,13 +160,13 @@ public class SpeedTest {
private static boolean purgeQueues(String queueName,
RestContext<SQSAsyncClient, SQSClient> nullLoggingDefaultContext) {
boolean deleted = false;
for (String region : ImmutableSet.of(Region.EU_WEST_1, Region.US_EAST_1, Region.US_WEST_1)) {
for (String region : REGIONS) {
try {
SortedSet<Queue> result = Sets.newTreeSet(nullLoggingDefaultContext.getApi()
.listQueuesInRegion(region, queuePrefix(queueName)));
if (result.size() >= 1) {
nullLoggingDefaultContext.getApi().deleteQueue(result.last());
System.out.printf("deleted queue: %s in region %s%n", queueName, region);
logger.info("deleted queue: %s in region %s", queueName, region);
deleted = true;
}
} catch (Exception e) {

View File

@ -43,7 +43,7 @@ public class ConsoleLogger implements Logger {
*/
@Override
public void error(String message, Object... args) {
System.err.printf("ERROR: %s%n", message, args);
System.err.printf(String.format("ERROR: %s%n", message), args);
}
/**
@ -51,8 +51,8 @@ public class ConsoleLogger implements Logger {
*/
@Override
public void error(Throwable throwable, String message, Object... args) {
System.err
.printf("ERROR: %s%n%s", message, args, Throwables.getStackTraceAsString(throwable));
System.err.printf(String.format("ERROR: %s%n%s", message, Throwables
.getStackTraceAsString(throwable)), args);
}
/**
@ -68,7 +68,7 @@ public class ConsoleLogger implements Logger {
*/
@Override
public void info(String message, Object... args) {
System.err.printf("INFO: %s%n", message, args);
System.err.printf(String.format("INFO: %s%n", message), args);
}
/**
@ -128,7 +128,7 @@ public class ConsoleLogger implements Logger {
*/
@Override
public void warn(String message, Object... args) {
System.err.printf("WARN: %s%n", message, args);
System.err.printf(String.format("WARN: %s%n", message), args);
}
/**
@ -136,7 +136,8 @@ public class ConsoleLogger implements Logger {
*/
@Override
public void warn(Throwable throwable, String message, Object... args) {
System.err.printf("WARN: %s%n%s", message, args, Throwables.getStackTraceAsString(throwable));
System.err.printf(String.format("WARN: %s%n%s", message, Throwables
.getStackTraceAsString(throwable)), args);
}
}

View File

@ -140,7 +140,7 @@ public class TweetStoreLiveTest {
for (BlobStoreContext context : contexts) {
System.err.printf("creating container %s at %s%n", container, context
.getProviderSpecificContext().getEndPoint());
context.getBlobStore().createContainerInLocation("default", container);
context.getBlobStore().createContainerInLocation(null, container);
}
if (deleted) {
System.err.println("sleeping 5 seconds to allow containers to create");

View File

@ -141,7 +141,7 @@ public class TweetStoreLiveTest {
for (BlobStoreContext context : contexts) {
System.err.printf("creating container %s at %s%n", container, context
.getProviderSpecificContext().getEndPoint());
context.getBlobStore().createContainerInLocation("default", container);
context.getBlobStore().createContainerInLocation(null, container);
}
if (deleted) {
System.err.println("sleeping 5 seconds to allow containers to create");

View File

@ -21,4 +21,4 @@
# 1. execute 'mvn install' to build the sample
# 2. invoke the jar, passing your azure credentials and the bucket you wish to create
# ex.
# java -jar target/jclouds-speedtest-azurequeue-jar-with-dependencies.jar accountName encodedKey queueName messageCount
# java -jar target/jclouds-demo-speedtest-azurequeue-jar-with-dependencies.jar $AZURE_USER $AZURE_PWD testqueue 1000

View File

@ -18,11 +18,11 @@
*/
package org.jclouds.azure.azurequeue;
import java.util.Set;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static org.jclouds.concurrent.ConcurrentUtils.awaitCompletion;
import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jclouds.azure.storage.options.ListOptions;
import org.jclouds.azure.storage.queue.AzureQueueAsyncClient;
@ -30,9 +30,12 @@ import org.jclouds.azure.storage.queue.AzureQueueClient;
import org.jclouds.azure.storage.queue.AzureQueueContextFactory;
import org.jclouds.azure.storage.queue.domain.QueueMetadata;
import org.jclouds.enterprise.config.EnterpriseConfigurationModule;
import org.jclouds.logging.ConsoleLogger;
import org.jclouds.logging.Logger;
import org.jclouds.logging.config.NullLoggingModule;
import org.jclouds.rest.RestContext;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
@ -48,6 +51,21 @@ public class SpeedTest {
public static int PARAMETERS = 4;
public static String INVALID_SYNTAX = "Invalid number of parameters. Syntax is: \"account\" \"encodedKey\" \"queueName\" \"messageCount\" ";
private static final Logger logger = Logger.CONSOLE;
private static final Logger traceLogger = new ConsoleLogger() {
@Override
public boolean isTraceEnabled() {
return true;
}
@Override
public void trace(String message, Object... args) {
super.info(message, args);
}
};
public static void main(String[] args) throws InterruptedException {
@ -69,7 +87,7 @@ public class SpeedTest {
try {
if (purgeQueues(queueName, context)) {
System.out.printf("pausing 60 seconds before recreating queues%n");
logger.info("pausing 60 seconds before recreating queues");
Thread.sleep(60 * 1000);
}
createQueue(queueName, context);
@ -83,52 +101,50 @@ public class SpeedTest {
}
private static class QueueMessage {
final String queue;
final String message;
QueueMessage(String queue, String message) {
this.queue = queue;
this.message = message;
}
@Override
public String toString() {
return "[queue=" + queue + ", message=" + message + "]";
}
}
private static void runTests(String queueName, int messageCount, String contextName,
RestContext<AzureQueueAsyncClient, AzureQueueClient> context)
throws InterruptedException {
String message = "1";
long timeOut = messageCount * 200; // minimum rate should be at least 5/second
int complete = 0;
int errors = 0;
long start = System.currentTimeMillis();
logger.info("context: %s, queueName: %s", contextName, queueName);
// fire off all the messages for the test
Set<ListenableFuture<Void>> responses = Sets.newHashSet();
Map<QueueMessage, ListenableFuture<Void>> responses = Maps.newHashMap();
for (int i = 0; i < messageCount; i++) {
responses.add(context.getAsyncApi().putMessage(queueName, message));
responses.put(new QueueMessage(queueName, message), context.getAsyncApi().putMessage(
queueName, message));
}
do {
Set<ListenableFuture<Void>> retries = Sets.newHashSet();
for (ListenableFuture<Void> response : responses) {
try {
response.get(100, TimeUnit.MILLISECONDS);
complete++;
} catch (ExecutionException e) {
System.err.println(e.getMessage());
errors++;
} catch (TimeoutException e) {
retries.add(response);
}
}
responses = Sets.newHashSet(retries);
} while (responses.size() > 0 && System.currentTimeMillis() < start + timeOut);
long duration = System.currentTimeMillis() - start;
if (duration > timeOut)
System.out.printf("TIMEOUT: context: %s, rate: %f messages/second%n", contextName,
((double) complete) / (duration / 1000.0));
else
System.out.printf("COMPLETE: context: %s, rate: %f messages/second%n", contextName,
((double) complete) / (duration / 1000.0));
Map<QueueMessage, Exception> exceptions = awaitCompletion(responses, sameThreadExecutor(),
timeOut, traceLogger, String.format("context: %s", contextName));
if (exceptions.size() > 0)
logger.error("problems in context: %s: %s", contextName, exceptions);
System.gc();
System.out.println("pausing 5 seconds before the next run");
logger.info("pausing 5 seconds before the next run");
Thread.sleep(5000);// let the network quiet down
}
private static void createQueue(String queueName,
RestContext<AzureQueueAsyncClient, AzureQueueClient> nullLoggingDefaultContext) {
System.out.printf("creating queue: %s%n", queueName);
logger.info("creating queue: %s", queueName);
nullLoggingDefaultContext.getApi().createQueue(queueName);
}