First cut of provider-independent PlatformServices to supply Cron and TaskQueue functionality where not provided by the platform

This commit is contained in:
Andrew Phillips 2012-02-05 01:06:33 -05:00
parent dc49957304
commit 8e74a1e029
18 changed files with 1106 additions and 143 deletions

View File

@ -46,7 +46,18 @@
<artifactId>guice-servlet</artifactId>
<version>3.0</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.1.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- RUN@cloud API -->
<dependency>
<groupId>net.stax</groupId>

View File

@ -18,21 +18,47 @@
*/
package org.jclouds.demo.paas;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.demo.paas.config.PlatformServicesInitializer.PLATFORM_SERVICES_ATTRIBUTE_NAME;
import java.util.Map;
import javax.servlet.ServletContext;
import org.jclouds.demo.paas.service.scheduler.Scheduler;
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
import org.jclouds.javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
/**
* @author Andrew Phillips
*/
public class PlatformServices implements ServletContextListener {
public class PlatformServices {
protected final String baseUrl;
protected final Scheduler scheduler;
private ImmutableMap<String, TaskQueue> taskQueues;
@Override
public void contextInitialized(ServletContextEvent arg0) {
throw new UnsupportedOperationException("TODO Auto-generated method stub");
public PlatformServices(String baseUrl, Scheduler scheduler, Map<String, TaskQueue> taskQueues) {
this.baseUrl = baseUrl;
this.scheduler = scheduler;
this.taskQueues = ImmutableMap.copyOf(taskQueues);
}
@Override
public void contextDestroyed(ServletContextEvent arg0) {
throw new UnsupportedOperationException("TODO Auto-generated method stub");
public String getBaseUrl() {
return baseUrl;
}
public Scheduler getScheduler() {
return scheduler;
}
public @Nullable TaskQueue getTaskQueue(String name) {
return taskQueues.get(name);
}
public static PlatformServices get(ServletContext context) {
return (PlatformServices) checkNotNull(context.getAttribute(
PLATFORM_SERVICES_ATTRIBUTE_NAME), PLATFORM_SERVICES_ATTRIBUTE_NAME);
}
}

View File

@ -16,93 +16,70 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.paas.service.taskqueue;
package org.jclouds.demo.paas;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.inject.name.Names.bindProperties;
import static java.lang.String.format;
import static org.jclouds.demo.tweetstore.controller.StoreTweetsController.AUTHORIZED_REQUEST_ORIGINATOR_HEADER;
import java.util.Properties;
import javax.ws.rs.core.UriBuilder;
import org.jclouds.PropertiesBuilder;
import org.jclouds.concurrent.config.ExecutorServiceModule;
import org.jclouds.http.HttpCommand;
import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.http.HttpRequest;
import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.sun.jersey.api.uri.UriBuilderImpl;
public class HttpRequestTask implements Runnable {
public static Factory factory(Properties props) {
return factory(props, format("%s@%d", Factory.class.getName(), System.currentTimeMillis()));
public class RunnableHttpRequest implements Runnable {
public static final String PLATFORM_REQUEST_ORIGINATOR_HEADER = "X-Platform-Originator";
public static Factory factory(HttpCommandExecutorService httpClient) {
return factory(httpClient, format("%s@%d", Factory.class.getName(), System.currentTimeMillis()));
}
public static Factory factory(Properties props, String originator) {
return new Factory(props, originator);
public static Factory factory(HttpCommandExecutorService httpClient, String originator) {
return new Factory(httpClient, originator);
}
public static class Factory {
protected final HttpCommandExecutorService httpClient;
protected final String originator;
private Factory(final Properties props, String originator) {
private Factory(HttpCommandExecutorService httpClient, String originator) {
this.httpClient = httpClient;
this.originator = originator;
httpClient = Guice.createInjector(new ExecutorServiceModule(),
new JavaUrlHttpCommandExecutorServiceModule(),
new AbstractModule() {
@Override
protected void configure() {
// URL connection defaults
Properties toBind = new PropertiesBuilder().build();
toBind.putAll(checkNotNull(props, "properties"));
toBind.putAll(System.getProperties());
bindProperties(binder(), toBind);
bind(UriBuilder.class).to(UriBuilderImpl.class);
}
}).getInstance(HttpCommandExecutorService.class);
}
public HttpRequestTask create(HttpRequest request) {
HttpRequest requestWithSubmitter = request.toBuilder().headers(
copyOfWithEntry(request.getHeaders(),
AUTHORIZED_REQUEST_ORIGINATOR_HEADER, originator)).build();
return new HttpRequestTask(httpClient, requestWithSubmitter);
public RunnableHttpRequest create(HttpRequest request) {
HttpRequest requestWithSubmitter = request.toBuilder()
.headers(copyOfWithEntry(request.getHeaders(),
PLATFORM_REQUEST_ORIGINATOR_HEADER, originator)).build();
return new RunnableHttpRequest(httpClient, requestWithSubmitter);
}
private static <K, V> Multimap<K, V> copyOfWithEntry(
Multimap<? extends K, ? extends V> multimap, K k1, V v1) {
return ImmutableMultimap.<K, V>builder().putAll(multimap).put(k1, v1).build();
}
}
private final HttpCommandExecutorService httpClient;
private final HttpRequest request;
private HttpRequestTask(HttpCommandExecutorService httpClient, HttpRequest request) {
private RunnableHttpRequest(HttpCommandExecutorService httpClient, HttpRequest request) {
this.httpClient = httpClient;
this.request = request;
}
@Override
public void run() {
httpClient.submit(new ImmutableHttpCommand(request));
}
private class ImmutableHttpCommand implements HttpCommand {
private final HttpRequest request;
public ImmutableHttpCommand(HttpRequest request) {
this.request = request;
}
@Override
public void setException(Exception exception) {
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.paas.config;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.inject.name.Names.bindProperties;
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Properties;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.ws.rs.core.UriBuilder;
import org.jclouds.PropertiesBuilder;
import org.jclouds.concurrent.config.ExecutorServiceModule;
import org.jclouds.demo.paas.PlatformServices;
import org.jclouds.demo.paas.service.scheduler.Scheduler;
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
import org.jclouds.demo.tweetstore.config.util.PropertiesLoader;
import org.jclouds.http.HttpCommandExecutorService;
import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.sun.jersey.api.uri.UriBuilderImpl;
/**
* @author Andrew Phillips
*/
public class PlatformServicesInitializer implements ServletContextListener {
public static final String PLATFORM_SERVICES_ATTRIBUTE_NAME = PlatformServicesInitializer.class.getName();
// keep in sync with cloudbees-web.xml
protected static final String HOST_PARAMETER = "application.host";
@Override
public void contextInitialized(ServletContextEvent contextEvent) {
ServletContext context = contextEvent.getServletContext();
context.setAttribute(PLATFORM_SERVICES_ATTRIBUTE_NAME, createServices(context));
}
protected static PlatformServices createServices(ServletContext context) {
HttpCommandExecutorService httpClient = createHttpClient(context);
return new PlatformServices(getBaseUrl(context), new Scheduler(httpClient),
createTaskQueues(httpClient));
}
protected static HttpCommandExecutorService createHttpClient(
final ServletContext context) {
return Guice.createInjector(new ExecutorServiceModule(),
new JavaUrlHttpCommandExecutorServiceModule(),
new AbstractModule() {
@Override
protected void configure() {
// URL connection defaults
Properties toBind = new PropertiesBuilder().build();
toBind.putAll(checkNotNull(new PropertiesLoader(context).get(), "properties"));
toBind.putAll(System.getProperties());
bindProperties(binder(), toBind);
bind(UriBuilder.class).to(UriBuilderImpl.class);
}
}).getInstance(HttpCommandExecutorService.class);
}
protected static String getBaseUrl(ServletContext context) {
return "http://" + checkNotNull(context.getInitParameter(HOST_PARAMETER), HOST_PARAMETER)
+ context.getContextPath();
}
// TODO: make the number and names of queues configurable
protected static ImmutableMap<String, TaskQueue> createTaskQueues(HttpCommandExecutorService httpClient) {
Builder<String, TaskQueue> taskQueues = ImmutableMap.builder();
taskQueues.put("twitter", TaskQueue.builder(httpClient)
.name("twitter").period(SECONDS.toMillis(30))
.build());
return taskQueues.build();
}
@Override
public void contextDestroyed(ServletContextEvent servletContextEvent) {
ServletContext context = servletContextEvent.getServletContext();
context.removeAttribute(PLATFORM_SERVICES_ATTRIBUTE_NAME);
}
}

View File

@ -0,0 +1,28 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.paas.reference;
/**
* Configuration properties and constants used in PaaS applications.
*
* @author Andrew Phillips
*/
public interface PaasConstants {
static final String PROPERTY_PLATFORM_BASE_URL = "jclouds.paas.baseurl";
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.paas.service.scheduler;
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.URI;
import javax.servlet.ServletContext;
import org.jclouds.demo.paas.PlatformServices;
import org.jclouds.demo.paas.RunnableHttpRequest;
import org.jclouds.http.HttpRequest;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
/**
* @author Andrew Phillips
*/
public class HttpRequestJob implements Job {
protected static final String URL_ATTRIBUTE_NAME = "url";
// keep in sync with "quartz:scheduler-context-servlet-context-key" param in web.xml
protected static final String SERVLET_CONTEXT_KEY = "servlet-context";
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
PlatformServices platform = getPlatform(context);
RunnableHttpRequest request = platform.getScheduler().getHttpRequestFactory().create(
HttpRequest.builder()
.endpoint(getTargetUrl(platform.getBaseUrl(), context))
.method("GET").build());
request.run();
}
private static URI getTargetUrl(String baseUrl, JobExecutionContext context) {
return URI.create(baseUrl + (String) checkNotNull(
context.getMergedJobDataMap().get(URL_ATTRIBUTE_NAME), URL_ATTRIBUTE_NAME));
}
private static PlatformServices getPlatform(JobExecutionContext jobContext) throws JobExecutionException {
try {
return PlatformServices.get((ServletContext) checkNotNull(
jobContext.getScheduler().getContext().get(SERVLET_CONTEXT_KEY), SERVLET_CONTEXT_KEY));
} catch (SchedulerException exception) {
throw new JobExecutionException("Unable to get platform services from the job execution context", exception);
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.paas.service.scheduler;
import org.jclouds.demo.paas.RunnableHttpRequest;
import org.jclouds.demo.paas.RunnableHttpRequest.Factory;
import org.jclouds.http.HttpCommandExecutorService;
/**
* @author Andrew Phillips
*/
public class Scheduler {
protected static final String SCHEDULER_ORIGINATOR_NAME = "scheduler";
protected final Factory httpRequestFactory;
public Scheduler(HttpCommandExecutorService httpClient) {
httpRequestFactory =
RunnableHttpRequest.factory(httpClient, SCHEDULER_ORIGINATOR_NAME);
}
public Factory getHttpRequestFactory() {
return httpRequestFactory;
}
}

View File

@ -0,0 +1,401 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.paas.service.scheduler.quartz.plugins;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import org.jclouds.logging.Logger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerKey;
import org.quartz.jobs.FileScanJob;
import org.quartz.jobs.FileScanListener;
import org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin;
import org.quartz.simpl.CascadingClassLoadHelper;
import org.quartz.spi.ClassLoadHelper;
import org.quartz.spi.SchedulerPlugin;
import org.quartz.xml.XMLSchedulingDataProcessor;
/**
* A copy of {@link XMLSchedulingDataProcessorPlugin} that does not reference
* {@code javax.transaction.UserTransaction} as so does not require a dependency
* on JTA.
*
* @author Andrew Phillips
* @see XMLSchedulingDataProcessorPlugin
*/
public class TransactionlessXmlSchedulingDataProcessorPlugin implements
FileScanListener, SchedulerPlugin {
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Data members.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
private static final int MAX_JOB_TRIGGER_NAME_LEN = 80;
private static final String JOB_INITIALIZATION_PLUGIN_NAME = "JobSchedulingDataLoaderPlugin";
private static final String FILE_NAME_DELIMITERS = ",";
private String name;
private Scheduler scheduler;
private final Logger log = Logger.CONSOLE;
private boolean failOnFileNotFound = true;
private String fileNames = XMLSchedulingDataProcessor.QUARTZ_XML_DEFAULT_FILE_NAME;
// Populated by initialization
private Map<String, JobFile> jobFiles = new LinkedHashMap<String, JobFile>();
private long scanInterval = 0;
boolean started = false;
protected ClassLoadHelper classLoadHelper = null;
private Set<String> jobTriggerNameSet = new HashSet<String>();
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* Interface.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* Comma separated list of file names (with paths) to the XML files that should be read.
*/
public String getFileNames() {
return fileNames;
}
/**
* The file name (and path) to the XML file that should be read.
*/
public void setFileNames(String fileNames) {
this.fileNames = fileNames;
}
/**
* The interval (in seconds) at which to scan for changes to the file.
* If the file has been changed, it is re-loaded and parsed. The default
* value for the interval is 0, which disables scanning.
*
* @return Returns the scanInterval.
*/
public long getScanInterval() {
return scanInterval / 1000;
}
/**
* The interval (in seconds) at which to scan for changes to the file.
* If the file has been changed, it is re-loaded and parsed. The default
* value for the interval is 0, which disables scanning.
*
* @param scanInterval The scanInterval to set.
*/
public void setScanInterval(long scanInterval) {
this.scanInterval = scanInterval * 1000;
}
/**
* Whether or not initialization of the plugin should fail (throw an
* exception) if the file cannot be found. Default is <code>true</code>.
*/
public boolean isFailOnFileNotFound() {
return failOnFileNotFound;
}
/**
* Whether or not initialization of the plugin should fail (throw an
* exception) if the file cannot be found. Default is <code>true</code>.
*/
public void setFailOnFileNotFound(boolean failOnFileNotFound) {
this.failOnFileNotFound = failOnFileNotFound;
}
/*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*
* SchedulerPlugin Interface.
*
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
*/
/**
* <p>
* Called during creation of the <code>Scheduler</code> in order to give
* the <code>SchedulerPlugin</code> a chance to initialize.
* </p>
*
* @throws org.quartz.SchedulerConfigException
* if there is an error initializing.
*/
@Override
public void initialize(String name, Scheduler scheduler)
throws SchedulerException {
this.name = name;
this.scheduler = scheduler;
classLoadHelper = new CascadingClassLoadHelper();
classLoadHelper.initialize();
log.info("Registering Quartz Job Initialization Plug-in.");
// Create JobFile objects
StringTokenizer stok = new StringTokenizer(fileNames, FILE_NAME_DELIMITERS);
while (stok.hasMoreTokens()) {
final String fileName = stok.nextToken();
final JobFile jobFile = new JobFile(fileName);
jobFiles.put(fileName, jobFile);
}
}
@Override
public void start() {
try {
if (jobFiles.isEmpty() == false) {
if (scanInterval > 0) {
scheduler.getContext().put(JOB_INITIALIZATION_PLUGIN_NAME + '_' + name, this);
}
Iterator<JobFile> iterator = jobFiles.values().iterator();
while (iterator.hasNext()) {
JobFile jobFile = iterator.next();
if (scanInterval > 0) {
String jobTriggerName = buildJobTriggerName(jobFile.getFileBasename());
TriggerKey tKey = new TriggerKey(jobTriggerName, JOB_INITIALIZATION_PLUGIN_NAME);
// remove pre-existing job/trigger, if any
scheduler.unscheduleJob(tKey);
// TODO: convert to use builder
SimpleTrigger trig = newTrigger()
.withIdentity(jobTriggerName, JOB_INITIALIZATION_PLUGIN_NAME)
.startNow()
.endAt(null)
.withSchedule(simpleSchedule()
.repeatForever()
.withIntervalInMilliseconds(scanInterval))
.build();
JobDetail job = JobBuilder.newJob(FileScanJob.class)
.withIdentity(jobTriggerName, JOB_INITIALIZATION_PLUGIN_NAME)
.build();
job.getJobDataMap().put(FileScanJob.FILE_NAME, jobFile.getFileName());
job.getJobDataMap().put(FileScanJob.FILE_SCAN_LISTENER_NAME, JOB_INITIALIZATION_PLUGIN_NAME + '_' + name);
scheduler.scheduleJob(job, trig);
log.debug("Scheduled file scan job for data file: {}, at interval: {}", jobFile.getFileName(), scanInterval);
}
processFile(jobFile);
}
}
} catch(SchedulerException se) {
log.error("Error starting background-task for watching jobs file.", se);
} finally {
started = true;
}
}
/**
* Helper method for generating unique job/trigger name for the
* file scanning jobs (one per FileJob). The unique names are saved
* in jobTriggerNameSet.
*/
private String buildJobTriggerName(
String fileBasename) {
// Name w/o collisions will be prefix + _ + filename (with '.' of filename replaced with '_')
// For example: JobInitializationPlugin_jobInitializer_myjobs_xml
String jobTriggerName = JOB_INITIALIZATION_PLUGIN_NAME + '_' + name + '_' + fileBasename.replace('.', '_');
// If name is too long (DB column is 80 chars), then truncate to max length
if (jobTriggerName.length() > MAX_JOB_TRIGGER_NAME_LEN) {
jobTriggerName = jobTriggerName.substring(0, MAX_JOB_TRIGGER_NAME_LEN);
}
// Make sure this name is unique in case the same file name under different
// directories is being checked, or had a naming collision due to length truncation.
// If there is a conflict, keep incrementing a _# suffix on the name (being sure
// not to get too long), until we find a unique name.
int currentIndex = 1;
while (jobTriggerNameSet.add(jobTriggerName) == false) {
// If not our first time through, then strip off old numeric suffix
if (currentIndex > 1) {
jobTriggerName = jobTriggerName.substring(0, jobTriggerName.lastIndexOf('_'));
}
String numericSuffix = "_" + currentIndex++;
// If the numeric suffix would make the name too long, then make room for it.
if (jobTriggerName.length() > (MAX_JOB_TRIGGER_NAME_LEN - numericSuffix.length())) {
jobTriggerName = jobTriggerName.substring(0, (MAX_JOB_TRIGGER_NAME_LEN - numericSuffix.length()));
}
jobTriggerName += numericSuffix;
}
return jobTriggerName;
}
@Override
public void shutdown() {
// nothing to do
}
private void processFile(JobFile jobFile) {
if (jobFile == null || !jobFile.getFileFound()) {
return;
}
try {
XMLSchedulingDataProcessor processor =
new XMLSchedulingDataProcessor(this.classLoadHelper);
processor.addJobGroupToNeverDelete(JOB_INITIALIZATION_PLUGIN_NAME);
processor.addTriggerGroupToNeverDelete(JOB_INITIALIZATION_PLUGIN_NAME);
processor.processFileAndScheduleJobs(
jobFile.getFileName(),
jobFile.getFileName(), // systemId
scheduler);
} catch (Exception e) {
log.error("Error scheduling jobs: " + e.getMessage(), e);
}
}
public void processFile(String filePath) {
processFile((JobFile)jobFiles.get(filePath));
}
/**
* @see org.quartz.jobs.FileScanListener#fileUpdated(java.lang.String)
*/
public void fileUpdated(String fileName) {
if (started) {
processFile(fileName);
}
}
class JobFile {
private String fileName;
// These are set by initialize()
private String filePath;
private String fileBasename;
private boolean fileFound;
protected JobFile(String fileName) throws SchedulerException {
this.fileName = fileName;
initialize();
}
protected String getFileName() {
return fileName;
}
protected boolean getFileFound() {
return fileFound;
}
protected String getFilePath() {
return filePath;
}
protected String getFileBasename() {
return fileBasename;
}
private void initialize() throws SchedulerException {
InputStream f = null;
try {
String furl = null;
File file = new File(getFileName()); // files in filesystem
if (!file.exists()) {
URL url = classLoadHelper.getResource(getFileName());
if(url != null) {
try {
furl = URLDecoder.decode(url.getPath(), "UTF-8");
} catch (UnsupportedEncodingException e) {
furl = url.getPath();
}
file = new File(furl);
try {
f = url.openStream();
} catch (IOException ignor) {
// Swallow the exception
}
}
} else {
try {
f = new java.io.FileInputStream(file);
}catch (FileNotFoundException e) {
// ignore
}
}
if (f == null) {
if (isFailOnFileNotFound()) {
throw new SchedulerException(
"File named '" + getFileName() + "' does not exist.");
} else {
log.warn("File named '" + getFileName() + "' does not exist.");
}
} else {
fileFound = true;
}
filePath = (furl != null) ? furl : file.getAbsolutePath();
fileBasename = file.getName();
} finally {
try {
if (f != null) {
f.close();
}
} catch (IOException ioe) {
log.warn("Error closing jobs file " + getFileName(), ioe);
}
}
}
}
}

View File

@ -20,21 +20,63 @@ package org.jclouds.demo.paas.service.taskqueue;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import org.jclouds.demo.paas.RunnableHttpRequest;
import org.jclouds.demo.paas.RunnableHttpRequest.Factory;
import org.jclouds.http.HttpCommandExecutorService;
import com.google.inject.Provider;
public class TaskQueue {
public static Builder builder() {
return new Builder();
protected final Factory httpRequestFactory;
private final Timer timer;
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
private TaskQueue(String name, long pollingIntervalMillis, Factory httpRequestFactory) {
this.httpRequestFactory = httpRequestFactory;
timer = new Timer(name);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
}
}, 0, pollingIntervalMillis);
}
public void add(final Runnable task) {
tasks.add(task);
}
public Factory getHttpRequestFactory() {
return httpRequestFactory;
}
public void destroy() {
timer.cancel();
tasks.clear();
}
public static Builder builder(HttpCommandExecutorService httpClient) {
return new Builder(httpClient);
}
public static class Builder implements Provider<TaskQueue> {
protected final HttpCommandExecutorService httpClient;
protected String name = "default";
protected long taskPeriodMillis = TimeUnit.SECONDS.toMillis(1);
protected long pollingIntervalMillis = TimeUnit.SECONDS.toMillis(1);
private Builder(HttpCommandExecutorService httpClient) {
this.httpClient = checkNotNull(httpClient, "httpClient");
}
public Builder name(String name) {
this.name = checkNotNull(name, "name");
@ -42,18 +84,19 @@ public class TaskQueue {
}
public Builder period(TimeUnit period) {
this.taskPeriodMillis = checkNotNull(period, "period").toMillis(1);
this.pollingIntervalMillis = checkNotNull(period, "period").toMillis(1);
return this;
}
public Builder period(long taskPeriodMillis) {
checkArgument(taskPeriodMillis > 0, "taskPeriodMillis");
this.taskPeriodMillis = taskPeriodMillis;
public Builder period(long pollingIntervalMillis) {
checkArgument(pollingIntervalMillis > 0, "pollingIntervalMillis");
this.pollingIntervalMillis = pollingIntervalMillis;
return this;
}
public TaskQueue build() {
return new TaskQueue(name, taskPeriodMillis);
return new TaskQueue(name, pollingIntervalMillis,
RunnableHttpRequest.factory(httpClient, format("taskqueue-%s", name)));
}
@Override
@ -61,25 +104,4 @@ public class TaskQueue {
return build();
}
}
private final Timer timer;
private final long taskPeriodMillis;
private TaskQueue(String name, long taskPeriodMillis) {
timer = new Timer(name);
this.taskPeriodMillis = taskPeriodMillis;
}
public void add(final Runnable task) {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
task.run();
}
}, 0, taskPeriodMillis);
}
public void destroy() {
timer.cancel();
}
}

View File

@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Predicates.in;
import static com.google.common.collect.ImmutableSet.copyOf;
import static com.google.common.collect.Sets.filter;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.jclouds.demo.paas.reference.PaasConstants.PROPERTY_PLATFORM_BASE_URL;
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_BLOBSTORES;
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER;
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_ACCESSTOKEN;
@ -31,9 +31,6 @@ import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TW
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_CONSUMER_KEY;
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_CONSUMER_SECRET;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@ -43,13 +40,13 @@ import javax.servlet.ServletContextEvent;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.demo.paas.service.taskqueue.HttpRequestTask;
import org.jclouds.demo.paas.PlatformServices;
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
import org.jclouds.demo.paas.service.taskqueue.HttpRequestTask.Factory;
import org.jclouds.demo.tweetstore.config.util.CredentialsCollector;
import org.jclouds.demo.tweetstore.config.util.PropertiesLoader;
import org.jclouds.demo.tweetstore.controller.AddTweetsController;
import org.jclouds.demo.tweetstore.controller.EnqueueStoresController;
import org.jclouds.demo.tweetstore.controller.StoreTweetsController;
import org.jclouds.http.HttpRequest;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
@ -57,10 +54,8 @@ import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
@ -79,13 +74,14 @@ public class GuiceServletConfig extends GuiceServletContextListener {
private Twitter twitterClient;
private String container;
private TaskQueue queue;
private String baseUrl;
@Override
public void contextInitialized(ServletContextEvent servletContextEvent) {
BlobStoreContextFactory blobStoreContextFactory = new BlobStoreContextFactory();
ServletContext servletContext = servletContextEvent.getServletContext();
Properties props = loadJCloudsProperties(servletContextEvent);
Properties props = new PropertiesLoader(servletContext).get();
Set<Module> modules = ImmutableSet.<Module>of();
// shared across all blobstores and used to retrieve tweets
try {
@ -108,16 +104,10 @@ public class GuiceServletConfig extends GuiceServletContextListener {
providerTypeToBlobStoreMap.put(hint, blobStoreContextFactory.createContext(hint, modules, props));
}
// get a queue for submitting store tweet requests
queue = TaskQueue.builder().name("twitter").period(MINUTES).build();
Factory taskFactory = HttpRequestTask.factory(props, "twitter");
// submit a job to store tweets for each configured blobstore
for (String name : providerTypeToBlobStoreMap.keySet()) {
queue.add(taskFactory.create(HttpRequest.builder()
.endpoint(withUrl(servletContextEvent.getServletContext(), "/store/do"))
.headers(ImmutableMultimap.of("context", name))
.method("GET").build()));
}
// get a queue for submitting store tweet requests and the application's base URL
PlatformServices platform = PlatformServices.get(servletContext);
queue = platform.getTaskQueue("twitter");
baseUrl = platform.getBaseUrl();
super.contextInitialized(servletContextEvent);
}
@ -131,40 +121,23 @@ public class GuiceServletConfig extends GuiceServletContextListener {
checkState(!contexts.isEmpty(), "no credentials available for any requested context");
return contexts;
}
private static URI withUrl(ServletContext servletContext, String url) {
return URI.create("http://" + checkNotNull(servletContext.getInitParameter("application.host"), "application.host")
+ servletContext.getContextPath() + url);
}
private Properties loadJCloudsProperties(
ServletContextEvent servletContextEvent) {
InputStream input = servletContextEvent.getServletContext()
.getResourceAsStream("/WEB-INF/jclouds.properties");
Properties props = new Properties();
try {
props.load(input);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
Closeables.closeQuietly(input);
}
return props;
}
@Override
protected Injector getInjector() {
return Guice.createInjector(new ServletModule() {
@Override
protected void configureServlets() {
bind(new TypeLiteral<Map<String, BlobStoreContext>>() {
}).toInstance(providerTypeToBlobStoreMap);
bind(new TypeLiteral<Map<String, BlobStoreContext>>() {})
.toInstance(providerTypeToBlobStoreMap);
bind(Twitter.class).toInstance(twitterClient);
bindConstant().annotatedWith(
Names.named(PROPERTY_TWEETSTORE_CONTAINER)).to(
container);
bind(TaskQueue.class).toInstance(queue);
bindConstant().annotatedWith(Names.named(PROPERTY_PLATFORM_BASE_URL))
.to(baseUrl);
bindConstant().annotatedWith(Names.named(PROPERTY_TWEETSTORE_CONTAINER))
.to(container);
serve("/store/*").with(StoreTweetsController.class);
serve("/tweets/*").with(AddTweetsController.class);
serve("/stores/*").with(EnqueueStoresController.class);
}
});
}

View File

@ -0,0 +1,59 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.tweetstore.config.util;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import javax.servlet.ServletContext;
import com.google.common.io.Closeables;
import com.google.inject.Provider;
/**
* @author Andrew Phillips
*/
public class PropertiesLoader implements Provider<Properties>{
private static final String PROPERTIES_FILE = "/WEB-INF/jclouds.properties";
private final Properties properties;
public PropertiesLoader(ServletContext context) {
properties = loadJcloudsProperties(context);
}
private static Properties loadJcloudsProperties(ServletContext context) {
InputStream input = context.getResourceAsStream(PROPERTIES_FILE);
Properties props = new Properties();
try {
props.load(input);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
Closeables.closeQuietly(input);
}
return props;
}
@Override
public Properties get() {
return properties;
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.tweetstore.controller;
import static com.google.common.base.Strings.nullToEmpty;
import static org.jclouds.demo.paas.RunnableHttpRequest.PLATFORM_REQUEST_ORIGINATOR_HEADER;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import javax.annotation.Resource;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.demo.paas.reference.PaasConstants;
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
import org.jclouds.http.HttpRequest;
import org.jclouds.logging.Logger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMultimap;
/**
* Adds tasks to retrieve and store tweets in all registered contexts to an async
* task queue.
*
* @author Andrew Phillips
* @see StoreTweetsController
*/
@Singleton
public class EnqueueStoresController extends HttpServlet {
/** The serialVersionUID */
private static final long serialVersionUID = 7215420527854203714L;
private final Set<String> contextNames;
private final TaskQueue taskQueue;
private final String baseUrl;
@Resource
protected Logger logger = Logger.NULL;
@Inject
public EnqueueStoresController(Map<String, BlobStoreContext> contexts, TaskQueue taskQueue,
@Named(PaasConstants.PROPERTY_PLATFORM_BASE_URL) String baseUrl) {
contextNames = contexts.keySet();
this.taskQueue = taskQueue;
this.baseUrl = baseUrl;
}
@VisibleForTesting
void enqueueStoreTweetTasks() {
for (String contextName : contextNames) {
logger.debug("enqueuing task to store tweets in blobstore '%s'", contextName);
taskQueue.add(taskQueue.getHttpRequestFactory().create(HttpRequest.builder()
.endpoint(URI.create(baseUrl + "/store/do"))
.headers(ImmutableMultimap.of("context", contextName))
.method("GET").build()));
}
}
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (!nullToEmpty(request.getHeader(PLATFORM_REQUEST_ORIGINATOR_HEADER)).equals("scheduler")) {
response.sendError(401);
}
try {
enqueueStoreTweetTasks();
response.setContentType(MediaType.TEXT_PLAIN);
response.getWriter().println("Done!");
} catch (Exception e) {
logger.error(e, "Error storing tweets");
throw new ServletException(e);
}
}
}

View File

@ -19,6 +19,8 @@
package org.jclouds.demo.tweetstore.controller;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.nullToEmpty;
import static org.jclouds.demo.paas.RunnableHttpRequest.PLATFORM_REQUEST_ORIGINATOR_HEADER;
import java.io.IOException;
import java.util.Map;
@ -53,7 +55,6 @@ import com.google.common.base.Function;
*/
@Singleton
public class StoreTweetsController extends HttpServlet {
public static final String AUTHORIZED_REQUEST_ORIGINATOR_HEADER = "X-RUNatcloud-Originator";
private static final class StatusToBlob implements Function<Status, Blob> {
private final BlobMap map;
@ -110,8 +111,7 @@ public class StoreTweetsController extends HttpServlet {
@Override
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (request.getHeader(AUTHORIZED_REQUEST_ORIGINATOR_HEADER) != null
&& request.getHeader(AUTHORIZED_REQUEST_ORIGINATOR_HEADER).equals("twitter")) {
if (nullToEmpty(request.getHeader(PLATFORM_REQUEST_ORIGINATOR_HEADER)).equals("taskqueue-twitter")) {
try {
String contextName = checkNotNull(request.getHeader("context"), "missing header context");
logger.info("retrieving tweets");

View File

@ -0,0 +1,29 @@
<?xml version='1.0' encoding='utf-8'?>
<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_2_0.xsd"
version="2.0">
<schedule>
<job>
<name>enqueue-store-tweet-tasks</name>
<description>Enqueue 'store tweet' tasks for all contexts</description>
<job-class>org.jclouds.demo.paas.service.scheduler.HttpRequestJob</job-class>
<job-data-map>
<entry>
<key>url</key>
<value>/stores/do</value>
</entry>
</job-data-map>
</job>
<trigger>
<calendar-interval>
<name>submit-recurring-job</name>
<job-name>enqueue-store-tweet-tasks</job-name>
<repeat-interval>10</repeat-interval>
<repeat-interval-unit>MINUTE</repeat-interval-unit>
</calendar-interval>
</trigger>
</schedule>
</job-scheduling-data>

View File

@ -0,0 +1,28 @@
#============================================================================
# Configure Main Scheduler Properties
#============================================================================
org.quartz.scheduler.skipUpdateCheck: true
#============================================================================
# Configure ThreadPool
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 1
#============================================================================
# Configure JobStore
#============================================================================
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
#============================================================================
# Configure the Job Initialization Plugin
#============================================================================
org.quartz.plugin.jobInitializer.class: org.jclouds.demo.paas.service.scheduler.quartz.plugins.TransactionlessXmlSchedulingDataProcessorPlugin
org.quartz.plugin.jobInitializer.fileNames: jobs.xml
org.quartz.plugin.jobInitializer.failOnFileNotFound: true
org.quartz.plugin.jobInitializer.scanInterval: 0
#org.quartz.plugin.jobInitializer.wrapInUserTransaction: false

View File

@ -23,7 +23,12 @@
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_5.xsd"
version="2.5">
<display-name>jclouds-tweetstore</display-name>
<context-param>
<param-name>quartz:scheduler-context-servlet-context-key</param-name>
<param-value>servlet-context</param-value>
</context-param>
<!-- Servlets -->
<filter>
<filter-name>guiceFilter</filter-name>
@ -34,10 +39,18 @@
<filter-name>guiceFilter</filter-name>
<url-pattern>/*</url-pattern>
</filter-mapping>
<!-- must be started first -->
<listener>
<listener-class>org.jclouds.demo.tweetstore.config.GuiceServletConfig
</listener-class>
<listener-class>org.jclouds.demo.paas.config.PlatformServicesInitializer</listener-class>
</listener>
<listener>
<listener-class>org.quartz.ee.servlet.QuartzInitializerListener</listener-class>
</listener>
<listener>
<listener-class>org.jclouds.demo.tweetstore.config.GuiceServletConfig</listener-class>
</listener>
<welcome-file-list>

View File

@ -0,0 +1,83 @@
/**
* Licensed to jclouds, Inc. (jclouds) under one or more
* contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. jclouds licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jclouds.demo.tweetstore.controller;
import static org.easymock.EasyMock.*;
import java.net.URI;
import java.util.Map;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.BlobStoreContextFactory;
import org.jclouds.demo.paas.RunnableHttpRequest;
import org.jclouds.demo.paas.RunnableHttpRequest.Factory;
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
import org.jclouds.http.HttpRequest;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
/**
* Tests behavior of {@code EnqueueStoresController}
*
* @author Andrew Phillips
*/
@Test(groups = "unit")
public class EnqueueStoresControllerTest {
Map<String, BlobStoreContext> createBlobStores() {
Map<String, BlobStoreContext> contexts = ImmutableMap.of(
"test1", new BlobStoreContextFactory().createContext("transient", "dummy", "dummy"),
"test2", new BlobStoreContextFactory().createContext("transient", "dummy", "dummy"));
return contexts;
}
public void testEnqueueStores() {
Map<String, BlobStoreContext> stores = createBlobStores();
TaskQueue taskQueue = createMock(TaskQueue.class);
Factory httpRequestFactory = createMock(Factory.class);
EnqueueStoresController function = new EnqueueStoresController(stores,
taskQueue, "http://localhost:8080");
expect(taskQueue.getHttpRequestFactory()).andStubReturn(httpRequestFactory);
HttpRequest storeInTest1Request = HttpRequest.builder().endpoint(
URI.create("http://localhost:8080/store/do"))
.headers(ImmutableMultimap.of("context", "test1")).method("GET").build();
RunnableHttpRequest storeInTest1Task = null;
expect(httpRequestFactory.create(eq(storeInTest1Request))).andReturn(storeInTest1Task);
HttpRequest storeInTest2Request = HttpRequest.builder().endpoint(
URI.create("http://localhost:8080/store/do"))
.headers(ImmutableMultimap.of("context", "test2")).method("GET").build();
RunnableHttpRequest storeInTest2Task = null;
expect(httpRequestFactory.create(eq(storeInTest2Request))).andReturn(storeInTest2Task);
taskQueue.add(storeInTest1Task);
expectLastCall();
taskQueue.add(storeInTest2Task);
expectLastCall();
replay(httpRequestFactory, taskQueue);
function.enqueueStoreTweetTasks();
verify(taskQueue);
}
}

View File

@ -19,7 +19,7 @@
package org.jclouds.demo.tweetstore.integration;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.jclouds.demo.tweetstore.controller.StoreTweetsController.AUTHORIZED_REQUEST_ORIGINATOR_HEADER;
import static org.jclouds.demo.paas.RunnableHttpRequest.PLATFORM_REQUEST_ORIGINATOR_HEADER;
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_BLOBSTORES;
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER;
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_ACCESSTOKEN;
@ -199,7 +199,7 @@ public class TweetStoreLiveTest {
for (String context : blobstores) {
System.out.println("storing at context: " + context);
HttpURLConnection connection = (HttpURLConnection) gurl.openConnection();
connection.addRequestProperty(AUTHORIZED_REQUEST_ORIGINATOR_HEADER, "twitter");
connection.addRequestProperty(PLATFORM_REQUEST_ORIGINATOR_HEADER, "taskqueue-twitter");
connection.addRequestProperty("context", context);
InputStream i = connection.getInputStream();
String string = Strings2.toStringAndClose(i);