updated to use task queue and atmos support

git-svn-id: http://jclouds.googlecode.com/svn/trunk@2041 3d8758e0-26b5-11de-8745-db77d3ebf521
This commit is contained in:
adrian.f.cole 2009-11-03 01:54:11 +00:00
parent 197801204e
commit 8673d2e31e
8 changed files with 235 additions and 57 deletions

View File

@ -59,6 +59,11 @@
<artifactId>jclouds-twitter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jclouds-emcsaas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>jclouds-s3</artifactId>
@ -133,6 +138,12 @@
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.appengine</groupId>
<artifactId>appengine-api-labs</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>com.google.appengine</groupId>
<artifactId>appengine-tools-api</artifactId>
@ -245,6 +256,14 @@
<name>jclouds.rackspace.key</name>
<value>${jclouds.rackspace.key}</value>
</property>
<property>
<name>jclouds.emcsaas.uid</name>
<value>${jclouds.emcsaas.uid}</value>
</property>
<property>
<name>jclouds.emcsaas.key</name>
<value>${jclouds.emcsaas.key}</value>
</property>
<property>
<name>jclouds.aws.accesskeyid</name>
<value>${jclouds.aws.accesskeyid}</value>

View File

@ -23,6 +23,7 @@
*/
package org.jclouds.demo.tweetstore.config;
import static com.google.appengine.api.labs.taskqueue.TaskOptions.Builder.url;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.blobstore.reference.BlobStoreConstants.PROPERTY_BLOBSTORE_CONTEXTBUILDERS;
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER;
@ -44,6 +45,9 @@ import org.jclouds.gae.config.GaeHttpCommandExecutorServiceModule;
import org.jclouds.twitter.TwitterClient;
import org.jclouds.twitter.TwitterContextFactory;
import com.google.appengine.api.labs.taskqueue.Queue;
import com.google.appengine.api.labs.taskqueue.QueueFactory;
import com.google.appengine.api.labs.taskqueue.TaskOptions.Method;
import com.google.appengine.repackaged.com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.inject.Guice;
@ -68,6 +72,8 @@ public class GuiceServletConfig extends GuiceServletContextListener {
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
Properties props = loadJCloudsProperties(servletContextEvent);
Queue queue = QueueFactory.getQueue("twitter");
container = checkNotNull(props.getProperty(PROPERTY_TWEETSTORE_CONTAINER),
PROPERTY_TWEETSTORE_CONTAINER);
ImmutableList<String> list = ImmutableList.<String> of(checkNotNull(
@ -84,6 +90,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
try {
builderClass = (Class<BlobStoreContextBuilder<?>>) Class.forName(className);
name = builderClass.getSimpleName().replaceAll("BlobStoreContextBuilder", "");
queue.add(url("/store/do").header("context", name).method(Method.GET));
constructor = builderClass.getConstructor(Properties.class);
context = constructor.newInstance(props).withModules(
new GaeHttpCommandExecutorServiceModule()).buildContext();
@ -119,7 +126,7 @@ public class GuiceServletConfig extends GuiceServletContextListener {
}).toInstance(GuiceServletConfig.this.contexts);
bind(TwitterClient.class).toInstance(client);
bindConstant().annotatedWith(Jsr330.named(PROPERTY_TWEETSTORE_CONTAINER)).to(container);
serve("/cron/*").with(StoreTweetsController.class);
serve("/store/*").with(StoreTweetsController.class);
serve("/tweets/*").with(AddTweetsController.class);
requestInjection(this);
}

View File

@ -23,9 +23,10 @@
*/
package org.jclouds.demo.tweetstore.controller;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.TimeUnit;
@ -49,8 +50,6 @@ import org.jclouds.twitter.domain.Status;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
/**
* Grab tweets related to me and store them into blobstores
@ -81,35 +80,34 @@ public class StoreTweetsController extends HttpServlet {
/** The serialVersionUID */
private static final long serialVersionUID = 7215420527854203714L;
private final Set<BlobMap> maps;
private final Map<String, BlobStoreContext<?>> contexts;
private final TwitterClient client;
private final String container;
@Resource
protected Logger logger = Logger.NULL;
@Inject
@VisibleForTesting
StoreTweetsController(Map<String, BlobStoreContext<?>> contexts,
@Named(TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER) final String container,
TwitterClient client) {
this(Sets.newHashSet(Iterables.transform(contexts.values(),
new Function<BlobStoreContext<?>, BlobMap>() {
public BlobMap apply(BlobStoreContext<?> from) {
return from.createBlobMap(container);
}
})), client);
}
@VisibleForTesting
StoreTweetsController(Set<BlobMap> maps, TwitterClient client) {
this.maps = maps;
this.container = container;
this.contexts = contexts;
this.client = client;
}
@VisibleForTesting
void addMyTweets(SortedSet<Status> allAboutMe) {
for (BlobMap map : maps) {
void addMyTweets(String contextName, SortedSet<Status> allAboutMe) {
BlobStoreContext<?> context = checkNotNull(contexts.get(contextName), "no context for "
+ contextName + " in " + contexts.keySet());
BlobMap map = context.createBlobMap(container);
for (Status status : allAboutMe) {
try {
map.put(status.getId() + "", new StatusToBlob(map).apply(status));
} catch (Exception e) {
logger.error(e, "Error storing tweet %s on map %s/%s", status.getId(), context,
container);
}
}
}
@ -117,11 +115,13 @@ public class StoreTweetsController extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
if (request.getHeader("X-AppEngine-Cron") != null
&& request.getHeader("X-AppEngine-Cron").equals("true")) {
if (request.getHeader("X-AppEngine-QueueName") != null
&& request.getHeader("X-AppEngine-QueueName").equals("twitter")) {
try {
String contextName = checkNotNull(request.getHeader("context"),
"missing header context");
logger.info("retrieving tweets");
addMyTweets(client.getMyMentions().get(1, TimeUnit.SECONDS));
addMyTweets(contextName, client.getMyMentions().get(1, TimeUnit.SECONDS));
logger.debug("done storing tweets");
response.setContentType(MediaType.TEXT_PLAIN);
response.getWriter().println("Done!");
@ -133,5 +133,4 @@ public class StoreTweetsController extends HttpServlet {
response.sendError(401);
}
}
}

View File

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<cronentries>
<cron>
<url>/cron/do</url>
<description>store twitter messages into cache stores
</description>
<schedule>every 2 minutes</schedule>
</cron>
</cronentries>

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<queue-entries>
<queue>
<name>twitter</name>
<rate>1/m</rate>
</queue>
</queue-entries>

View File

@ -5,13 +5,13 @@ import static org.testng.Assert.assertEquals;
import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.Map;
import java.util.SortedSet;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.IOUtils;
import org.jclouds.blobstore.BlobMap;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.integration.StubBlobStoreContextBuilder;
@ -21,7 +21,7 @@ import org.jclouds.twitter.domain.Status;
import org.jclouds.twitter.domain.User;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
/**
@ -32,21 +32,25 @@ import com.google.common.collect.Sets;
@Test(groups = "unit", testName = "tweetstore.StoreTweetsControllerTest")
public class StoreTweetsControllerTest {
TwitterClient createTwitterClient() {
return createMock(TwitterClient.class);
}
Set<BlobMap> createMaps() throws InterruptedException, ExecutionException {
BlobStoreContext<BlobStore> context = new StubBlobStoreContextBuilder().buildContext();
context.getBlobStore().createContainer("test1").get();
context.getBlobStore().createContainer("test2").get();
return ImmutableSet.of(context.createBlobMap("test1"), context.createBlobMap("test2"));
Map<String, BlobStoreContext<?>> createBlobStores() throws InterruptedException,
ExecutionException {
Map<String, BlobStoreContext<?>> contexts = ImmutableMap.<String, BlobStoreContext<?>> of(
"test1", new StubBlobStoreContextBuilder().buildContext(), "test2",
new StubBlobStoreContextBuilder().buildContext());
for (BlobStoreContext<?> blobstore : contexts.values()) {
blobstore.getBlobStore().createContainer("favo").get();
}
return contexts;
}
public void testStoreTweets() throws IOException, InterruptedException, ExecutionException {
Set<BlobMap> maps = createMaps();
StoreTweetsController function = new StoreTweetsController(maps, createTwitterClient());
Map<String, BlobStoreContext<?>> stores = createBlobStores();
StoreTweetsController function = new StoreTweetsController(stores, "favo",
createTwitterClient());
SortedSet<Status> allAboutMe = Sets.newTreeSet();
User frank = new User();
@ -66,9 +70,11 @@ public class StoreTweetsControllerTest {
allAboutMe.add(frankStatus);
allAboutMe.add(jimmyStatus);
function.addMyTweets(allAboutMe);
function.addMyTweets("test1", allAboutMe);
function.addMyTweets("test2", allAboutMe);
for (BlobMap map : maps) {
for (Entry<String, BlobStoreContext<?>> entry : stores.entrySet()) {
BlobMap map = entry.getValue().createBlobMap("favo");
Blob frankBlob = map.get("1");
assertEquals(frankBlob.getMetadata().getName(), "1");
assertEquals(frankBlob.getMetadata().getUserMetadata()

View File

@ -24,6 +24,8 @@
package org.jclouds.demo.tweetstore.integration;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.atmosonline.saas.reference.AtmosStorageConstants.PROPERTY_EMCSAAS_KEY;
import static org.jclouds.atmosonline.saas.reference.AtmosStorageConstants.PROPERTY_EMCSAAS_UID;
import static org.jclouds.aws.reference.AWSConstants.PROPERTY_AWS_ACCESSKEYID;
import static org.jclouds.aws.reference.AWSConstants.PROPERTY_AWS_SECRETACCESSKEY;
import static org.jclouds.azure.storage.reference.AzureStorageConstants.PROPERTY_AZURESTORAGE_ACCOUNT;
@ -45,6 +47,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.IOUtils;
import org.jclouds.atmosonline.saas.AtmosStoragePropertiesBuilder;
import org.jclouds.atmosonline.saas.blobstore.AtmosBlobStoreContextBuilder;
import org.jclouds.atmosonline.saas.blobstore.AtmosBlobStoreContextFactory;
import org.jclouds.aws.s3.S3PropertiesBuilder;
import org.jclouds.aws.s3.blobstore.S3BlobStoreContextBuilder;
import org.jclouds.aws.s3.blobstore.S3BlobStoreContextFactory;
@ -52,6 +57,7 @@ import org.jclouds.azure.storage.blob.AzureBlobPropertiesBuilder;
import org.jclouds.azure.storage.blob.blobstore.AzureBlobStoreContextBuilder;
import org.jclouds.azure.storage.blob.blobstore.AzureBlobStoreContextFactory;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.logging.log4j.config.Log4JLoggingModule;
import org.jclouds.rackspace.cloudfiles.CloudFilesPropertiesBuilder;
import org.jclouds.rackspace.cloudfiles.blobstore.CloudFilesBlobStoreContextBuilder;
import org.jclouds.rackspace.cloudfiles.blobstore.CloudFilesBlobStoreContextFactory;
@ -85,14 +91,20 @@ public class TweetStoreLiveTest {
Properties props = new Properties();
props.setProperty(PROPERTY_TWEETSTORE_CONTAINER, checkNotNull(System
.getProperty(PROPERTY_TWEETSTORE_CONTAINER)));
props.setProperty(PROPERTY_BLOBSTORE_CONTEXTBUILDERS, String.format("%s,%s,%s",
props.setProperty(PROPERTY_BLOBSTORE_CONTEXTBUILDERS, String.format(
"%s,%s,%s,%s", // WATCH THIS.. when adding a new context, you must update the string
S3BlobStoreContextBuilder.class.getName(), CloudFilesBlobStoreContextBuilder.class
.getName(), AzureBlobStoreContextBuilder.class.getName()));
.getName(), AzureBlobStoreContextBuilder.class.getName(),
AtmosBlobStoreContextBuilder.class.getName()));
props = new TwitterPropertiesBuilder(props).withCredentials(
checkNotNull(System.getProperty(PROPERTY_TWITTER_USER), PROPERTY_TWITTER_USER),
System.getProperty(PROPERTY_TWITTER_PASSWORD, PROPERTY_TWITTER_PASSWORD)).build();
props = new AtmosStoragePropertiesBuilder(props).withCredentials(
checkNotNull(System.getProperty(PROPERTY_EMCSAAS_UID), PROPERTY_EMCSAAS_UID),
System.getProperty(PROPERTY_EMCSAAS_KEY, PROPERTY_EMCSAAS_KEY)).build();
props = new S3PropertiesBuilder(props)
.withCredentials(
checkNotNull(System.getProperty(PROPERTY_AWS_ACCESSKEYID),
@ -128,7 +140,11 @@ public class TweetStoreLiveTest {
BlobStoreContext<?> azContext = AzureBlobStoreContextFactory.createContext(checkNotNull(
System.getProperty(PROPERTY_AZURESTORAGE_ACCOUNT), PROPERTY_AZURESTORAGE_ACCOUNT),
System.getProperty(PROPERTY_AZURESTORAGE_KEY, PROPERTY_AZURESTORAGE_KEY));
this.contexts = ImmutableSet.of(s3Context, cfContext, azContext);
BlobStoreContext<?> emcContext = AtmosBlobStoreContextFactory.createContext(checkNotNull(
System.getProperty(PROPERTY_EMCSAAS_UID), PROPERTY_EMCSAAS_UID), System.getProperty(
PROPERTY_EMCSAAS_KEY, PROPERTY_EMCSAAS_KEY), new Log4JLoggingModule());
this.contexts = ImmutableSet.of(s3Context, cfContext, azContext, emcContext);
boolean deleted = false;
for (BlobStoreContext<?> context : contexts) {
if (context.getBlobStore().exists(container)) {
@ -145,6 +161,10 @@ public class TweetStoreLiveTest {
System.err.printf("creating container %s at %s%n", container, context.getEndPoint());
context.getBlobStore().createContainer(container).get(30, TimeUnit.SECONDS);
}
if (deleted) {
System.err.println("sleeping 5 seconds to allow containers to create");
Thread.sleep(30000);
}
}
@Test
@ -156,18 +176,28 @@ public class TweetStoreLiveTest {
@Test(dependsOnMethods = "shouldPass", expectedExceptions = IOException.class)
public void shouldFail() throws InterruptedException, IOException {
new URL(url, "/cron/do").openStream();
new URL(url, "/store/do").openStream();
}
@Test(dependsOnMethods = "shouldFail")
public void testPrimeContainers() throws IOException {
URL gurl = new URL(url, "/cron/do");
public void testPrimeContainers() throws IOException, InterruptedException {
URL gurl = new URL(url, "/store/do");
for (String context : new String[] { "S3", "Azure", "CloudFiles", "Atmos" }) {
System.out.println("storing at context: " + context);
HttpURLConnection connection = (HttpURLConnection) gurl.openConnection();
connection.addRequestProperty("X-AppEngine-Cron", "true");
connection.addRequestProperty("X-AppEngine-QueueName", "twitter");
connection.addRequestProperty("context", context);
InputStream i = connection.getInputStream();
String string = IOUtils.toString(i);
assert string.indexOf("Done!") >= 0 : string;
connection.disconnect();
}
System.err.println("sleeping 10 seconds to allow for eventual consistency delay");
Thread.sleep(10000);
for (BlobStoreContext<?> context : contexts) {
assert context.createInputStreamMap(container).size() > 0 : context.getEndPoint();
}
}

View File

@ -0,0 +1,119 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (C) 2009 Cloud Conscious, LLC.
<info@cloudconscious.com>
====================================================================
Licensed to the Apache Software Foundation (ASF) under one or
more contributor license agreements. See the NOTICE file
distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file to you under the
Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain
a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 Unless required by
applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions
and limitations under the License.
====================================================================
-->
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<!--
For more configuration infromation and examples see the Apache
Log4j website: http://logging.apache.org/log4j/
-->
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"
debug="false">
<!-- A time/date based rolling appender -->
<appender name="WIREFILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="target/test-data/jclouds-wire.log" />
<param name="Append" value="true" />
<!-- Rollover at midnight each day -->
<param name="DatePattern" value="'.'yyyy-MM-dd" />
<param name="Threshold" value="TRACE" />
<layout class="org.apache.log4j.PatternLayout">
<!-- The default pattern: Date Priority [Category] Message\n -->
<param name="ConversionPattern" value="%d %-5p [%c] (%t) %m%n" />
<!--
The full pattern: Date MS Priority [Category]
(Thread:NDC) Message\n <param name="ConversionPattern"
value="%d %-5r %-5p [%c] (%t:%x) %m%n"/>
-->
</layout>
</appender>
<!-- A time/date based rolling appender -->
<appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender">
<param name="File" value="target/test-data/jclouds.log" />
<param name="Append" value="true" />
<!-- Rollover at midnight each day -->
<param name="DatePattern" value="'.'yyyy-MM-dd" />
<param name="Threshold" value="TRACE" />
<layout class="org.apache.log4j.PatternLayout">
<!-- The default pattern: Date Priority [Category] Message\n -->
<param name="ConversionPattern" value="%d %-5p [%c] (%t) %m%n" />
<!--
The full pattern: Date MS Priority [Category]
(Thread:NDC) Message\n <param name="ConversionPattern"
value="%d %-5r %-5p [%c] (%t:%x) %m%n"/>
-->
</layout>
</appender>
<appender name="ASYNC" class="org.apache.log4j.AsyncAppender">
<appender-ref ref="FILE" />
</appender>
<appender name="ASYNCWIRE" class="org.apache.log4j.AsyncAppender">
<appender-ref ref="WIREFILE" />
</appender>
<!-- ================ -->
<!-- Limit categories -->
<!-- ================ -->
<category name="org.jclouds">
<priority value="DEBUG" />
<appender-ref ref="ASYNC" />
</category>
<category name="jclouds.http.headers">
<priority value="DEBUG" />
<appender-ref ref="ASYNCWIRE" />
</category>
<category name="jclouds.http.wire">
<priority value="DEBUG" />
<appender-ref ref="ASYNCWIRE" />
</category>
<!--
<category name="jclouds.http.wire"> <priority value="DEBUG" />
<appender-ref ref="ASYNCWIRE" /> </category> <category
name="jclouds.signature"> <priority value="DEBUG" />
<appender-ref ref="ASYNCWIRE" /> </category>
-->
<!-- ======================= -->
<!-- Setup the Root category -->
<!-- ======================= -->
<root>
<priority value="WARN" />
</root>
</log4j:configuration>