Now using a cron job to enqueue 'store tweet' tasks every 5min since the GAE task queue actually removes tasks once they have completed successfully. This meant that the blobstores were only populated *once*.

This commit is contained in:
Andrew Phillips 2012-01-11 01:47:20 -05:00
parent 2f08d13fd6
commit 69f1fb5749
6 changed files with 213 additions and 11 deletions

View File

@ -18,7 +18,6 @@
*/ */
package org.jclouds.demo.tweetstore.config; package org.jclouds.demo.tweetstore.config;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.in; import static com.google.common.base.Predicates.in;
@ -47,6 +46,7 @@ import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.BlobStoreContextFactory; import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.demo.tweetstore.config.util.CredentialsCollector; import org.jclouds.demo.tweetstore.config.util.CredentialsCollector;
import org.jclouds.demo.tweetstore.controller.AddTweetsController; import org.jclouds.demo.tweetstore.controller.AddTweetsController;
import org.jclouds.demo.tweetstore.controller.EnqueueStoresController;
import org.jclouds.demo.tweetstore.controller.StoreTweetsController; import org.jclouds.demo.tweetstore.controller.StoreTweetsController;
import org.jclouds.demo.tweetstore.functions.ServiceToStoredTweetStatuses; import org.jclouds.demo.tweetstore.functions.ServiceToStoredTweetStatuses;
import org.jclouds.gae.config.GoogleAppEngineConfigurationModule; import org.jclouds.gae.config.GoogleAppEngineConfigurationModule;
@ -65,7 +65,6 @@ import twitter4j.conf.ConfigurationBuilder;
import com.google.appengine.api.taskqueue.Queue; import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory; import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
@ -121,10 +120,7 @@ public class SpringServletConfig extends LoggingConfig implements ServletConfigA
// get a queue for submitting store tweet requests // get a queue for submitting store tweet requests
queue = QueueFactory.getQueue("twitter"); queue = QueueFactory.getQueue("twitter");
// submit a job to store tweets for each configured blobstore
for (String name : providerTypeToBlobStoreMap.keySet()) {
queue.add(withUrl("/store/do").header("context", name).method(Method.GET));
}
logger.trace("Members initialized. Twitter: '%s', container: '%s', provider types: '%s'", twitterClient, logger.trace("Members initialized. Twitter: '%s', container: '%s', provider types: '%s'", twitterClient,
container, providerTypeToBlobStoreMap.keySet()); container, providerTypeToBlobStoreMap.keySet());
} }
@ -169,6 +165,11 @@ public class SpringServletConfig extends LoggingConfig implements ServletConfigA
return controller; return controller;
} }
@Bean
public EnqueueStoresController enqueueStoresController() {
return new EnqueueStoresController(providerTypeToBlobStoreMap, queue);
}
private void injectServletConfig(Servlet servlet) { private void injectServletConfig(Servlet servlet) {
logger.trace("About to inject servlet config '%s'", servletConfig); logger.trace("About to inject servlet config '%s'", servletConfig);
try { try {
@ -190,10 +191,11 @@ public class SpringServletConfig extends LoggingConfig implements ServletConfigA
Map<String, Object> urlMap = Maps.newHashMapWithExpectedSize(2); Map<String, Object> urlMap = Maps.newHashMapWithExpectedSize(2);
urlMap.put("/store/*", storeTweetsController()); urlMap.put("/store/*", storeTweetsController());
urlMap.put("/tweets/*", addTweetsController()); urlMap.put("/tweets/*", addTweetsController());
urlMap.put("/stores/*", enqueueStoresController());
mapping.setUrlMap(urlMap); mapping.setUrlMap(urlMap);
/* /*
* "/store" and "/tweets" are part of the servlet mapping and thus stripped by the mapping if * "/store", "/tweets" and "/stores" are part of the servlet mapping and thus
* using default settings. * stripped by the mapping if using default settings.
*/ */
mapping.setAlwaysUseFullPath(true); mapping.setAlwaysUseFullPath(true);
return mapping; return mapping;

View File

@ -0,0 +1,92 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.tweetstore.controller;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static com.google.appengine.repackaged.com.google.common.base.Strings.nullToEmpty;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Singleton;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.logging.Logger;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import com.google.common.annotations.VisibleForTesting;
/**
* Adds tasks to retrieve and store tweets in all registered contexts to an async
* task queue.
*
* @author Andrew Phillips
* @see StoreTweetsController
*/
@Singleton
public class EnqueueStoresController extends HttpServlet {
/** The serialVersionUID */
private static final long serialVersionUID = 7215420527854203714L;
private final Set<String> contextNames;
private final Queue taskQueue;
@Resource
protected Logger logger = Logger.NULL;
@Inject
public EnqueueStoresController(Map<String, BlobStoreContext> contexts,
Queue taskQueue) {
contextNames = contexts.keySet();
this.taskQueue = taskQueue;
}
@VisibleForTesting
void enqueueStoreTweetTasks() {
for (String contextName : contextNames) {
logger.debug("enqueuing task to store tweets in blobstore '%s'", contextName);
taskQueue.add(withUrl("/store/do").header("context", contextName).method(Method.GET));
}
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (!nullToEmpty(request.getHeader("X-AppEngine-Cron")).equals("true")) {
response.sendError(401);
}
try {
enqueueStoreTweetTasks();
response.setContentType(MediaType.TEXT_PLAIN);
response.getWriter().println("Done!");
} catch (Exception e) {
logger.error(e, "Error storing tweets");
throw new ServletException(e);
}
}
}

View File

@ -0,0 +1,28 @@
<?xml version="1.0" encoding="utf-8"?>
<!--
Licensed to jclouds, Inc. (jclouds) under one or more
contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. jclouds licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<cronentries>
<cron>
<url>/stores/do</url>
<description>Enqueue 'store tweet' tasks for all contexts</description>
<schedule>every 10 minutes</schedule>
</cron>
</cronentries>

View File

@ -22,6 +22,8 @@
<queue-entries> <queue-entries>
<queue> <queue>
<name>twitter</name> <name>twitter</name>
<rate>1/m</rate> <!-- poll every 30s, but only allow one request at a time to save CPU -->
<rate>2/m</rate>
<max-concurrent-requests>1</max-concurrent-requests>
</queue> </queue>
</queue-entries> </queue-entries>

View File

@ -41,9 +41,22 @@
<servlet-name>dispatcher</servlet-name> <servlet-name>dispatcher</servlet-name>
<url-pattern>/tweets/*</url-pattern> <url-pattern>/tweets/*</url-pattern>
</servlet-mapping> </servlet-mapping>
<servlet-mapping>
<servlet-name>dispatcher</servlet-name>
<url-pattern>/stores/*</url-pattern>
</servlet-mapping>
<!-- limit submission of storage tasks to the cron job -->
<security-constraint>
<web-resource-collection>
<url-pattern>/stores/*</url-pattern>
</web-resource-collection>
<auth-constraint>
<role-name>admin</role-name>
</auth-constraint>
</security-constraint>
<welcome-file-list> <welcome-file-list>
<welcome-file>index.jsp</welcome-file> <welcome-file>index.jsp</welcome-file>
</welcome-file-list> </welcome-file-list>
</web-app> </web-app>

View File

@ -0,0 +1,65 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.tweetstore.controller;
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import static org.easymock.EasyMock.expect;
import static org.easymock.classextension.EasyMock.createMock;
import static org.easymock.classextension.EasyMock.replay;
import static org.easymock.classextension.EasyMock.verify;
import java.util.Map;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.BlobStoreContextFactory;
import org.testng.annotations.Test;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
import com.google.common.collect.ImmutableMap;
/**
* Tests behavior of {@code StoreTweetsController}
*
* @author Adrian Cole
*/
@Test(groups = "unit")
public class EnqueueStoresControllerTest {
Map<String, BlobStoreContext> createBlobStores() {
Map<String, BlobStoreContext> contexts = ImmutableMap.of(
"test1", new BlobStoreContextFactory().createContext("transient", "dummy", "dummy"),
"test2", new BlobStoreContextFactory().createContext("transient", "dummy", "dummy"));
return contexts;
}
public void testEnqueueStores() {
Map<String, BlobStoreContext> stores = createBlobStores();
Queue taskQueue = createMock(Queue.class);
EnqueueStoresController function = new EnqueueStoresController(stores, taskQueue);
expect(taskQueue.add(withUrl("/store/do").header("context", "test1").method(Method.GET))).andReturn(null);
expect(taskQueue.add(withUrl("/store/do").header("context", "test2").method(Method.GET))).andReturn(null);
replay(taskQueue);
function.enqueueStoreTweetTasks();
verify(taskQueue);
}
}