diff --git a/demos/gae-tweetstore/pom.xml b/demos/gae-tweetstore/pom.xml index c2cbc1a943..b3c5ba82bc 100755 --- a/demos/gae-tweetstore/pom.xml +++ b/demos/gae-tweetstore/pom.xml @@ -59,6 +59,11 @@ jclouds-twitter ${project.version} + + ${project.groupId} + jclouds-emcsaas + ${project.version} + ${project.groupId} jclouds-s3 @@ -133,6 +138,12 @@ provided + + com.google.appengine + appengine-api-labs + 1.2.5 + + com.google.appengine appengine-tools-api @@ -245,6 +256,14 @@ jclouds.rackspace.key ${jclouds.rackspace.key} + + jclouds.emcsaas.uid + ${jclouds.emcsaas.uid} + + + jclouds.emcsaas.key + ${jclouds.emcsaas.key} + jclouds.aws.accesskeyid ${jclouds.aws.accesskeyid} diff --git a/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/config/GuiceServletConfig.java b/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/config/GuiceServletConfig.java index d065134fa3..8d7e7659a2 100755 --- a/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/config/GuiceServletConfig.java +++ b/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/config/GuiceServletConfig.java @@ -23,6 +23,7 @@ */ package org.jclouds.demo.tweetstore.config; +import static com.google.appengine.api.labs.taskqueue.TaskOptions.Builder.url; import static com.google.common.base.Preconditions.checkNotNull; import static org.jclouds.blobstore.reference.BlobStoreConstants.PROPERTY_BLOBSTORE_CONTEXTBUILDERS; import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER; @@ -44,6 +45,9 @@ import org.jclouds.gae.config.GaeHttpCommandExecutorServiceModule; import org.jclouds.twitter.TwitterClient; import org.jclouds.twitter.TwitterContextFactory; +import com.google.appengine.api.labs.taskqueue.Queue; +import com.google.appengine.api.labs.taskqueue.QueueFactory; +import com.google.appengine.api.labs.taskqueue.TaskOptions.Method; import com.google.appengine.repackaged.com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.inject.Guice; @@ -68,6 +72,8 @@ public class GuiceServletConfig extends GuiceServletContextListener { @Override public void contextInitialized(ServletContextEvent servletContextEvent) { Properties props = loadJCloudsProperties(servletContextEvent); + Queue queue = QueueFactory.getQueue("twitter"); + container = checkNotNull(props.getProperty(PROPERTY_TWEETSTORE_CONTAINER), PROPERTY_TWEETSTORE_CONTAINER); ImmutableList list = ImmutableList. of(checkNotNull( @@ -84,6 +90,7 @@ public class GuiceServletConfig extends GuiceServletContextListener { try { builderClass = (Class>) Class.forName(className); name = builderClass.getSimpleName().replaceAll("BlobStoreContextBuilder", ""); + queue.add(url("/store/do").header("context", name).method(Method.GET)); constructor = builderClass.getConstructor(Properties.class); context = constructor.newInstance(props).withModules( new GaeHttpCommandExecutorServiceModule()).buildContext(); @@ -119,7 +126,7 @@ public class GuiceServletConfig extends GuiceServletContextListener { }).toInstance(GuiceServletConfig.this.contexts); bind(TwitterClient.class).toInstance(client); bindConstant().annotatedWith(Jsr330.named(PROPERTY_TWEETSTORE_CONTAINER)).to(container); - serve("/cron/*").with(StoreTweetsController.class); + serve("/store/*").with(StoreTweetsController.class); serve("/tweets/*").with(AddTweetsController.class); requestInjection(this); } diff --git a/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/controller/StoreTweetsController.java b/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/controller/StoreTweetsController.java index 348b2d2069..b6e97cefa6 100644 --- a/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/controller/StoreTweetsController.java +++ b/demos/gae-tweetstore/src/main/java/org/jclouds/demo/tweetstore/controller/StoreTweetsController.java @@ -23,9 +23,10 @@ */ package org.jclouds.demo.tweetstore.controller; +import static com.google.common.base.Preconditions.checkNotNull; + import java.io.IOException; import java.util.Map; -import java.util.Set; import java.util.SortedSet; import java.util.concurrent.TimeUnit; @@ -49,8 +50,6 @@ import org.jclouds.twitter.domain.Status; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; /** * Grab tweets related to me and store them into blobstores @@ -81,35 +80,34 @@ public class StoreTweetsController extends HttpServlet { /** The serialVersionUID */ private static final long serialVersionUID = 7215420527854203714L; - private final Set maps; + private final Map> contexts; private final TwitterClient client; + private final String container; @Resource protected Logger logger = Logger.NULL; @Inject + @VisibleForTesting StoreTweetsController(Map> contexts, @Named(TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER) final String container, TwitterClient client) { - this(Sets.newHashSet(Iterables.transform(contexts.values(), - new Function, BlobMap>() { - public BlobMap apply(BlobStoreContext from) { - return from.createBlobMap(container); - } - })), client); - } - - @VisibleForTesting - StoreTweetsController(Set maps, TwitterClient client) { - this.maps = maps; + this.container = container; + this.contexts = contexts; this.client = client; } @VisibleForTesting - void addMyTweets(SortedSet allAboutMe) { - for (BlobMap map : maps) { - for (Status status : allAboutMe) { + void addMyTweets(String contextName, SortedSet allAboutMe) { + BlobStoreContext context = checkNotNull(contexts.get(contextName), "no context for " + + contextName + " in " + contexts.keySet()); + BlobMap map = context.createBlobMap(container); + for (Status status : allAboutMe) { + try { map.put(status.getId() + "", new StatusToBlob(map).apply(status)); + } catch (Exception e) { + logger.error(e, "Error storing tweet %s on map %s/%s", status.getId(), context, + container); } } } @@ -117,11 +115,13 @@ public class StoreTweetsController extends HttpServlet { @Override protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { - if (request.getHeader("X-AppEngine-Cron") != null - && request.getHeader("X-AppEngine-Cron").equals("true")) { + if (request.getHeader("X-AppEngine-QueueName") != null + && request.getHeader("X-AppEngine-QueueName").equals("twitter")) { try { + String contextName = checkNotNull(request.getHeader("context"), + "missing header context"); logger.info("retrieving tweets"); - addMyTweets(client.getMyMentions().get(1, TimeUnit.SECONDS)); + addMyTweets(contextName, client.getMyMentions().get(1, TimeUnit.SECONDS)); logger.debug("done storing tweets"); response.setContentType(MediaType.TEXT_PLAIN); response.getWriter().println("Done!"); @@ -133,5 +133,4 @@ public class StoreTweetsController extends HttpServlet { response.sendError(401); } } - } \ No newline at end of file diff --git a/demos/gae-tweetstore/src/main/webapp/WEB-INF/cron.xml b/demos/gae-tweetstore/src/main/webapp/WEB-INF/cron.xml deleted file mode 100644 index 18f866aa34..0000000000 --- a/demos/gae-tweetstore/src/main/webapp/WEB-INF/cron.xml +++ /dev/null @@ -1,9 +0,0 @@ - - - - /cron/do - store twitter messages into cache stores - - every 2 minutes - - \ No newline at end of file diff --git a/demos/gae-tweetstore/src/main/webapp/WEB-INF/queue.xml b/demos/gae-tweetstore/src/main/webapp/WEB-INF/queue.xml new file mode 100644 index 0000000000..0b815a2f5a --- /dev/null +++ b/demos/gae-tweetstore/src/main/webapp/WEB-INF/queue.xml @@ -0,0 +1,7 @@ + + + + twitter + 1/m + + diff --git a/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/controller/StoreTweetsControllerTest.java b/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/controller/StoreTweetsControllerTest.java index 3e6bfcf3fb..29ca35df4e 100644 --- a/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/controller/StoreTweetsControllerTest.java +++ b/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/controller/StoreTweetsControllerTest.java @@ -5,13 +5,13 @@ import static org.testng.Assert.assertEquals; import java.io.IOException; import java.io.InputStream; -import java.util.Set; +import java.util.Map; import java.util.SortedSet; +import java.util.Map.Entry; import java.util.concurrent.ExecutionException; import org.apache.commons.io.IOUtils; import org.jclouds.blobstore.BlobMap; -import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; import org.jclouds.blobstore.integration.StubBlobStoreContextBuilder; @@ -21,7 +21,7 @@ import org.jclouds.twitter.domain.Status; import org.jclouds.twitter.domain.User; import org.testng.annotations.Test; -import com.google.common.collect.ImmutableSet; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; /** @@ -32,21 +32,25 @@ import com.google.common.collect.Sets; @Test(groups = "unit", testName = "tweetstore.StoreTweetsControllerTest") public class StoreTweetsControllerTest { - TwitterClient createTwitterClient() { return createMock(TwitterClient.class); } - Set createMaps() throws InterruptedException, ExecutionException { - BlobStoreContext context = new StubBlobStoreContextBuilder().buildContext(); - context.getBlobStore().createContainer("test1").get(); - context.getBlobStore().createContainer("test2").get(); - return ImmutableSet.of(context.createBlobMap("test1"), context.createBlobMap("test2")); + Map> createBlobStores() throws InterruptedException, + ExecutionException { + Map> contexts = ImmutableMap.> of( + "test1", new StubBlobStoreContextBuilder().buildContext(), "test2", + new StubBlobStoreContextBuilder().buildContext()); + for (BlobStoreContext blobstore : contexts.values()) { + blobstore.getBlobStore().createContainer("favo").get(); + } + return contexts; } public void testStoreTweets() throws IOException, InterruptedException, ExecutionException { - Set maps = createMaps(); - StoreTweetsController function = new StoreTweetsController(maps, createTwitterClient()); + Map> stores = createBlobStores(); + StoreTweetsController function = new StoreTweetsController(stores, "favo", + createTwitterClient()); SortedSet allAboutMe = Sets.newTreeSet(); User frank = new User(); @@ -66,16 +70,18 @@ public class StoreTweetsControllerTest { allAboutMe.add(frankStatus); allAboutMe.add(jimmyStatus); - function.addMyTweets(allAboutMe); + function.addMyTweets("test1", allAboutMe); + function.addMyTweets("test2", allAboutMe); - for (BlobMap map : maps) { + for (Entry> entry : stores.entrySet()) { + BlobMap map = entry.getValue().createBlobMap("favo"); Blob frankBlob = map.get("1"); assertEquals(frankBlob.getMetadata().getName(), "1"); assertEquals(frankBlob.getMetadata().getUserMetadata() .get(TweetStoreConstants.SENDER_NAME), "frank"); assertEquals(frankBlob.getMetadata().getContentType(), "text/plain"); assertEquals(IOUtils.toString((InputStream) frankBlob.getData()), "I love beans!"); - + Blob jimmyBlob = map.get("2"); assertEquals(jimmyBlob.getMetadata().getName(), "2"); assertEquals(jimmyBlob.getMetadata().getUserMetadata() diff --git a/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/integration/TweetStoreLiveTest.java b/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/integration/TweetStoreLiveTest.java index 6363dbc467..b86b370801 100755 --- a/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/integration/TweetStoreLiveTest.java +++ b/demos/gae-tweetstore/src/test/java/org/jclouds/demo/tweetstore/integration/TweetStoreLiveTest.java @@ -24,6 +24,8 @@ package org.jclouds.demo.tweetstore.integration; import static com.google.common.base.Preconditions.checkNotNull; +import static org.jclouds.atmosonline.saas.reference.AtmosStorageConstants.PROPERTY_EMCSAAS_KEY; +import static org.jclouds.atmosonline.saas.reference.AtmosStorageConstants.PROPERTY_EMCSAAS_UID; import static org.jclouds.aws.reference.AWSConstants.PROPERTY_AWS_ACCESSKEYID; import static org.jclouds.aws.reference.AWSConstants.PROPERTY_AWS_SECRETACCESSKEY; import static org.jclouds.azure.storage.reference.AzureStorageConstants.PROPERTY_AZURESTORAGE_ACCOUNT; @@ -45,6 +47,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.io.IOUtils; +import org.jclouds.atmosonline.saas.AtmosStoragePropertiesBuilder; +import org.jclouds.atmosonline.saas.blobstore.AtmosBlobStoreContextBuilder; +import org.jclouds.atmosonline.saas.blobstore.AtmosBlobStoreContextFactory; import org.jclouds.aws.s3.S3PropertiesBuilder; import org.jclouds.aws.s3.blobstore.S3BlobStoreContextBuilder; import org.jclouds.aws.s3.blobstore.S3BlobStoreContextFactory; @@ -52,6 +57,7 @@ import org.jclouds.azure.storage.blob.AzureBlobPropertiesBuilder; import org.jclouds.azure.storage.blob.blobstore.AzureBlobStoreContextBuilder; import org.jclouds.azure.storage.blob.blobstore.AzureBlobStoreContextFactory; import org.jclouds.blobstore.BlobStoreContext; +import org.jclouds.logging.log4j.config.Log4JLoggingModule; import org.jclouds.rackspace.cloudfiles.CloudFilesPropertiesBuilder; import org.jclouds.rackspace.cloudfiles.blobstore.CloudFilesBlobStoreContextBuilder; import org.jclouds.rackspace.cloudfiles.blobstore.CloudFilesBlobStoreContextFactory; @@ -85,14 +91,20 @@ public class TweetStoreLiveTest { Properties props = new Properties(); props.setProperty(PROPERTY_TWEETSTORE_CONTAINER, checkNotNull(System .getProperty(PROPERTY_TWEETSTORE_CONTAINER))); - props.setProperty(PROPERTY_BLOBSTORE_CONTEXTBUILDERS, String.format("%s,%s,%s", + props.setProperty(PROPERTY_BLOBSTORE_CONTEXTBUILDERS, String.format( + "%s,%s,%s,%s", // WATCH THIS.. when adding a new context, you must update the string S3BlobStoreContextBuilder.class.getName(), CloudFilesBlobStoreContextBuilder.class - .getName(), AzureBlobStoreContextBuilder.class.getName())); + .getName(), AzureBlobStoreContextBuilder.class.getName(), + AtmosBlobStoreContextBuilder.class.getName())); props = new TwitterPropertiesBuilder(props).withCredentials( checkNotNull(System.getProperty(PROPERTY_TWITTER_USER), PROPERTY_TWITTER_USER), System.getProperty(PROPERTY_TWITTER_PASSWORD, PROPERTY_TWITTER_PASSWORD)).build(); + props = new AtmosStoragePropertiesBuilder(props).withCredentials( + checkNotNull(System.getProperty(PROPERTY_EMCSAAS_UID), PROPERTY_EMCSAAS_UID), + System.getProperty(PROPERTY_EMCSAAS_KEY, PROPERTY_EMCSAAS_KEY)).build(); + props = new S3PropertiesBuilder(props) .withCredentials( checkNotNull(System.getProperty(PROPERTY_AWS_ACCESSKEYID), @@ -128,7 +140,11 @@ public class TweetStoreLiveTest { BlobStoreContext azContext = AzureBlobStoreContextFactory.createContext(checkNotNull( System.getProperty(PROPERTY_AZURESTORAGE_ACCOUNT), PROPERTY_AZURESTORAGE_ACCOUNT), System.getProperty(PROPERTY_AZURESTORAGE_KEY, PROPERTY_AZURESTORAGE_KEY)); - this.contexts = ImmutableSet.of(s3Context, cfContext, azContext); + + BlobStoreContext emcContext = AtmosBlobStoreContextFactory.createContext(checkNotNull( + System.getProperty(PROPERTY_EMCSAAS_UID), PROPERTY_EMCSAAS_UID), System.getProperty( + PROPERTY_EMCSAAS_KEY, PROPERTY_EMCSAAS_KEY), new Log4JLoggingModule()); + this.contexts = ImmutableSet.of(s3Context, cfContext, azContext, emcContext); boolean deleted = false; for (BlobStoreContext context : contexts) { if (context.getBlobStore().exists(container)) { @@ -145,6 +161,10 @@ public class TweetStoreLiveTest { System.err.printf("creating container %s at %s%n", container, context.getEndPoint()); context.getBlobStore().createContainer(container).get(30, TimeUnit.SECONDS); } + if (deleted) { + System.err.println("sleeping 5 seconds to allow containers to create"); + Thread.sleep(30000); + } } @Test @@ -156,18 +176,28 @@ public class TweetStoreLiveTest { @Test(dependsOnMethods = "shouldPass", expectedExceptions = IOException.class) public void shouldFail() throws InterruptedException, IOException { - new URL(url, "/cron/do").openStream(); + new URL(url, "/store/do").openStream(); } @Test(dependsOnMethods = "shouldFail") - public void testPrimeContainers() throws IOException { - URL gurl = new URL(url, "/cron/do"); - HttpURLConnection connection = (HttpURLConnection) gurl.openConnection(); - connection.addRequestProperty("X-AppEngine-Cron", "true"); - InputStream i = connection.getInputStream(); - String string = IOUtils.toString(i); - assert string.indexOf("Done!") >= 0 : string; + public void testPrimeContainers() throws IOException, InterruptedException { + URL gurl = new URL(url, "/store/do"); + + for (String context : new String[] { "S3", "Azure", "CloudFiles", "Atmos" }) { + System.out.println("storing at context: " + context); + HttpURLConnection connection = (HttpURLConnection) gurl.openConnection(); + connection.addRequestProperty("X-AppEngine-QueueName", "twitter"); + connection.addRequestProperty("context", context); + InputStream i = connection.getInputStream(); + String string = IOUtils.toString(i); + assert string.indexOf("Done!") >= 0 : string; + connection.disconnect(); + } + + System.err.println("sleeping 10 seconds to allow for eventual consistency delay"); + Thread.sleep(10000); for (BlobStoreContext context : contexts) { + assert context.createInputStreamMap(container).size() > 0 : context.getEndPoint(); } } diff --git a/demos/gae-tweetstore/src/test/resources/log4j.xml b/demos/gae-tweetstore/src/test/resources/log4j.xml new file mode 100755 index 0000000000..f48c023c44 --- /dev/null +++ b/demos/gae-tweetstore/src/test/resources/log4j.xml @@ -0,0 +1,119 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +