mirror of https://github.com/apache/jclouds.git
Merge pull request #361 from jclouds/tweetstore-generic-paas
Starting on "generic PaaS" for TweetStore
This commit is contained in:
commit
e0c2090fca
|
@ -46,7 +46,18 @@
|
||||||
<artifactId>guice-servlet</artifactId>
|
<artifactId>guice-servlet</artifactId>
|
||||||
<version>3.0</version>
|
<version>3.0</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.quartz-scheduler</groupId>
|
||||||
|
<artifactId>quartz</artifactId>
|
||||||
|
<version>2.1.3</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>slf4j-api</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<!-- RUN@cloud API -->
|
<!-- RUN@cloud API -->
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>net.stax</groupId>
|
<groupId>net.stax</groupId>
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.paas;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static org.jclouds.demo.paas.config.PlatformServicesInitializer.PLATFORM_SERVICES_ATTRIBUTE_NAME;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import javax.servlet.ServletContext;
|
||||||
|
|
||||||
|
import org.jclouds.demo.paas.service.scheduler.Scheduler;
|
||||||
|
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
|
||||||
|
import org.jclouds.javax.annotation.Nullable;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Andrew Phillips
|
||||||
|
*/
|
||||||
|
public class PlatformServices {
|
||||||
|
protected final String baseUrl;
|
||||||
|
protected final Scheduler scheduler;
|
||||||
|
private ImmutableMap<String, TaskQueue> taskQueues;
|
||||||
|
|
||||||
|
public PlatformServices(String baseUrl, Scheduler scheduler, Map<String, TaskQueue> taskQueues) {
|
||||||
|
this.baseUrl = baseUrl;
|
||||||
|
this.scheduler = scheduler;
|
||||||
|
this.taskQueues = ImmutableMap.copyOf(taskQueues);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getBaseUrl() {
|
||||||
|
return baseUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Scheduler getScheduler() {
|
||||||
|
return scheduler;
|
||||||
|
}
|
||||||
|
|
||||||
|
public @Nullable TaskQueue getTaskQueue(String name) {
|
||||||
|
return taskQueues.get(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static PlatformServices get(ServletContext context) {
|
||||||
|
return (PlatformServices) checkNotNull(context.getAttribute(
|
||||||
|
PLATFORM_SERVICES_ATTRIBUTE_NAME), PLATFORM_SERVICES_ATTRIBUTE_NAME);
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,93 +16,70 @@
|
||||||
* specific language governing permissions and limitations
|
* specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
package org.jclouds.demo.tweetstore.taskqueue;
|
package org.jclouds.demo.paas;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
|
||||||
import static com.google.inject.name.Names.bindProperties;
|
|
||||||
import static java.lang.String.format;
|
import static java.lang.String.format;
|
||||||
import static org.jclouds.demo.tweetstore.controller.StoreTweetsController.AUTHORIZED_REQUEST_ORIGINATOR_HEADER;
|
|
||||||
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
import javax.ws.rs.core.UriBuilder;
|
|
||||||
|
|
||||||
import org.jclouds.PropertiesBuilder;
|
|
||||||
import org.jclouds.concurrent.config.ExecutorServiceModule;
|
|
||||||
import org.jclouds.http.HttpCommand;
|
import org.jclouds.http.HttpCommand;
|
||||||
import org.jclouds.http.HttpCommandExecutorService;
|
import org.jclouds.http.HttpCommandExecutorService;
|
||||||
import org.jclouds.http.HttpRequest;
|
import org.jclouds.http.HttpRequest;
|
||||||
import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
|
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableMultimap;
|
import com.google.common.collect.ImmutableMultimap;
|
||||||
import com.google.common.collect.Multimap;
|
import com.google.common.collect.Multimap;
|
||||||
import com.google.inject.AbstractModule;
|
|
||||||
import com.google.inject.Guice;
|
|
||||||
import com.sun.jersey.api.uri.UriBuilderImpl;
|
|
||||||
|
|
||||||
public class HttpRequestTask implements Runnable {
|
public class RunnableHttpRequest implements Runnable {
|
||||||
public static Factory factory(Properties props) {
|
public static final String PLATFORM_REQUEST_ORIGINATOR_HEADER = "X-Platform-Originator";
|
||||||
return factory(props, format("%s@%d", Factory.class.getName(), System.currentTimeMillis()));
|
|
||||||
|
public static Factory factory(HttpCommandExecutorService httpClient) {
|
||||||
|
return factory(httpClient, format("%s@%d", Factory.class.getName(), System.currentTimeMillis()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Factory factory(Properties props, String originator) {
|
public static Factory factory(HttpCommandExecutorService httpClient, String originator) {
|
||||||
return new Factory(props, originator);
|
return new Factory(httpClient, originator);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Factory {
|
public static class Factory {
|
||||||
protected final HttpCommandExecutorService httpClient;
|
protected final HttpCommandExecutorService httpClient;
|
||||||
protected final String originator;
|
protected final String originator;
|
||||||
|
|
||||||
private Factory(final Properties props, String originator) {
|
private Factory(HttpCommandExecutorService httpClient, String originator) {
|
||||||
|
this.httpClient = httpClient;
|
||||||
this.originator = originator;
|
this.originator = originator;
|
||||||
httpClient = Guice.createInjector(new ExecutorServiceModule(),
|
|
||||||
new JavaUrlHttpCommandExecutorServiceModule(),
|
|
||||||
new AbstractModule() {
|
|
||||||
@Override
|
|
||||||
protected void configure() {
|
|
||||||
// URL connection defaults
|
|
||||||
Properties toBind = new PropertiesBuilder().build();
|
|
||||||
toBind.putAll(checkNotNull(props, "properties"));
|
|
||||||
toBind.putAll(System.getProperties());
|
|
||||||
bindProperties(binder(), toBind);
|
|
||||||
bind(UriBuilder.class).to(UriBuilderImpl.class);
|
|
||||||
}
|
|
||||||
}).getInstance(HttpCommandExecutorService.class);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public HttpRequestTask create(HttpRequest request) {
|
public RunnableHttpRequest create(HttpRequest request) {
|
||||||
HttpRequest requestWithSubmitter = request.toBuilder().headers(
|
HttpRequest requestWithSubmitter = request.toBuilder()
|
||||||
copyOfWithEntry(request.getHeaders(),
|
.headers(copyOfWithEntry(request.getHeaders(),
|
||||||
AUTHORIZED_REQUEST_ORIGINATOR_HEADER, originator)).build();
|
PLATFORM_REQUEST_ORIGINATOR_HEADER, originator)).build();
|
||||||
return new HttpRequestTask(httpClient, requestWithSubmitter);
|
return new RunnableHttpRequest(httpClient, requestWithSubmitter);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <K, V> Multimap<K, V> copyOfWithEntry(
|
private static <K, V> Multimap<K, V> copyOfWithEntry(
|
||||||
Multimap<? extends K, ? extends V> multimap, K k1, V v1) {
|
Multimap<? extends K, ? extends V> multimap, K k1, V v1) {
|
||||||
return ImmutableMultimap.<K, V>builder().putAll(multimap).put(k1, v1).build();
|
return ImmutableMultimap.<K, V>builder().putAll(multimap).put(k1, v1).build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final HttpCommandExecutorService httpClient;
|
private final HttpCommandExecutorService httpClient;
|
||||||
private final HttpRequest request;
|
private final HttpRequest request;
|
||||||
|
|
||||||
private HttpRequestTask(HttpCommandExecutorService httpClient, HttpRequest request) {
|
private RunnableHttpRequest(HttpCommandExecutorService httpClient, HttpRequest request) {
|
||||||
this.httpClient = httpClient;
|
this.httpClient = httpClient;
|
||||||
this.request = request;
|
this.request = request;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
httpClient.submit(new ImmutableHttpCommand(request));
|
httpClient.submit(new ImmutableHttpCommand(request));
|
||||||
}
|
}
|
||||||
|
|
||||||
private class ImmutableHttpCommand implements HttpCommand {
|
private class ImmutableHttpCommand implements HttpCommand {
|
||||||
private final HttpRequest request;
|
private final HttpRequest request;
|
||||||
|
|
||||||
public ImmutableHttpCommand(HttpRequest request) {
|
public ImmutableHttpCommand(HttpRequest request) {
|
||||||
this.request = request;
|
this.request = request;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setException(Exception exception) {
|
public void setException(Exception exception) {
|
||||||
}
|
}
|
|
@ -0,0 +1,104 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.paas.config;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static com.google.inject.name.Names.bindProperties;
|
||||||
|
import static java.util.concurrent.TimeUnit.SECONDS;
|
||||||
|
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import javax.servlet.ServletContext;
|
||||||
|
import javax.servlet.ServletContextEvent;
|
||||||
|
import javax.servlet.ServletContextListener;
|
||||||
|
import javax.ws.rs.core.UriBuilder;
|
||||||
|
|
||||||
|
import org.jclouds.PropertiesBuilder;
|
||||||
|
import org.jclouds.concurrent.config.ExecutorServiceModule;
|
||||||
|
import org.jclouds.demo.paas.PlatformServices;
|
||||||
|
import org.jclouds.demo.paas.service.scheduler.Scheduler;
|
||||||
|
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
|
||||||
|
import org.jclouds.demo.tweetstore.config.util.PropertiesLoader;
|
||||||
|
import org.jclouds.http.HttpCommandExecutorService;
|
||||||
|
import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.ImmutableMap.Builder;
|
||||||
|
import com.google.inject.AbstractModule;
|
||||||
|
import com.google.inject.Guice;
|
||||||
|
import com.sun.jersey.api.uri.UriBuilderImpl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Andrew Phillips
|
||||||
|
*/
|
||||||
|
public class PlatformServicesInitializer implements ServletContextListener {
|
||||||
|
public static final String PLATFORM_SERVICES_ATTRIBUTE_NAME = PlatformServicesInitializer.class.getName();
|
||||||
|
|
||||||
|
// keep in sync with cloudbees-web.xml
|
||||||
|
protected static final String HOST_PARAMETER = "application.host";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void contextInitialized(ServletContextEvent contextEvent) {
|
||||||
|
ServletContext context = contextEvent.getServletContext();
|
||||||
|
context.setAttribute(PLATFORM_SERVICES_ATTRIBUTE_NAME, createServices(context));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static PlatformServices createServices(ServletContext context) {
|
||||||
|
HttpCommandExecutorService httpClient = createHttpClient(context);
|
||||||
|
return new PlatformServices(getBaseUrl(context), new Scheduler(httpClient),
|
||||||
|
createTaskQueues(httpClient));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static HttpCommandExecutorService createHttpClient(
|
||||||
|
final ServletContext context) {
|
||||||
|
return Guice.createInjector(new ExecutorServiceModule(),
|
||||||
|
new JavaUrlHttpCommandExecutorServiceModule(),
|
||||||
|
new AbstractModule() {
|
||||||
|
@Override
|
||||||
|
protected void configure() {
|
||||||
|
// URL connection defaults
|
||||||
|
Properties toBind = new PropertiesBuilder().build();
|
||||||
|
toBind.putAll(checkNotNull(new PropertiesLoader(context).get(), "properties"));
|
||||||
|
toBind.putAll(System.getProperties());
|
||||||
|
bindProperties(binder(), toBind);
|
||||||
|
bind(UriBuilder.class).to(UriBuilderImpl.class);
|
||||||
|
}
|
||||||
|
}).getInstance(HttpCommandExecutorService.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected static String getBaseUrl(ServletContext context) {
|
||||||
|
return "http://" + checkNotNull(context.getInitParameter(HOST_PARAMETER), HOST_PARAMETER)
|
||||||
|
+ context.getContextPath();
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: make the number and names of queues configurable
|
||||||
|
protected static ImmutableMap<String, TaskQueue> createTaskQueues(HttpCommandExecutorService httpClient) {
|
||||||
|
Builder<String, TaskQueue> taskQueues = ImmutableMap.builder();
|
||||||
|
taskQueues.put("twitter", TaskQueue.builder(httpClient)
|
||||||
|
.name("twitter").period(SECONDS.toMillis(30))
|
||||||
|
.build());
|
||||||
|
return taskQueues.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void contextDestroyed(ServletContextEvent servletContextEvent) {
|
||||||
|
ServletContext context = servletContextEvent.getServletContext();
|
||||||
|
context.removeAttribute(PLATFORM_SERVICES_ATTRIBUTE_NAME);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,28 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.paas.reference;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configuration properties and constants used in PaaS applications.
|
||||||
|
*
|
||||||
|
* @author Andrew Phillips
|
||||||
|
*/
|
||||||
|
public interface PaasConstants {
|
||||||
|
static final String PROPERTY_PLATFORM_BASE_URL = "jclouds.paas.baseurl";
|
||||||
|
}
|
|
@ -0,0 +1,69 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.paas.service.scheduler;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
|
||||||
|
import javax.servlet.ServletContext;
|
||||||
|
|
||||||
|
import org.jclouds.demo.paas.PlatformServices;
|
||||||
|
import org.jclouds.demo.paas.RunnableHttpRequest;
|
||||||
|
import org.jclouds.http.HttpRequest;
|
||||||
|
import org.quartz.Job;
|
||||||
|
import org.quartz.JobExecutionContext;
|
||||||
|
import org.quartz.JobExecutionException;
|
||||||
|
import org.quartz.SchedulerException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Andrew Phillips
|
||||||
|
*/
|
||||||
|
public class HttpRequestJob implements Job {
|
||||||
|
protected static final String URL_ATTRIBUTE_NAME = "url";
|
||||||
|
|
||||||
|
// keep in sync with "quartz:scheduler-context-servlet-context-key" param in web.xml
|
||||||
|
protected static final String SERVLET_CONTEXT_KEY = "servlet-context";
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(JobExecutionContext context) throws JobExecutionException {
|
||||||
|
PlatformServices platform = JobContexts.getPlatform(context);
|
||||||
|
RunnableHttpRequest request = platform.getScheduler().getHttpRequestFactory().create(
|
||||||
|
HttpRequest.builder()
|
||||||
|
.endpoint(JobContexts.getTargetUrl(platform.getBaseUrl(), context))
|
||||||
|
.method("GET").build());
|
||||||
|
request.run();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class JobContexts {
|
||||||
|
private static URI getTargetUrl(String baseUrl, JobExecutionContext context) {
|
||||||
|
return URI.create(baseUrl + (String) checkNotNull(
|
||||||
|
context.getMergedJobDataMap().get(URL_ATTRIBUTE_NAME), URL_ATTRIBUTE_NAME));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static PlatformServices getPlatform(JobExecutionContext jobContext) throws JobExecutionException {
|
||||||
|
try {
|
||||||
|
return PlatformServices.get((ServletContext) checkNotNull(
|
||||||
|
jobContext.getScheduler().getContext().get(SERVLET_CONTEXT_KEY), SERVLET_CONTEXT_KEY));
|
||||||
|
} catch (SchedulerException exception) {
|
||||||
|
throw new JobExecutionException("Unable to get platform services from the job execution context", exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.paas.service.scheduler;
|
||||||
|
|
||||||
|
import org.jclouds.demo.paas.RunnableHttpRequest;
|
||||||
|
import org.jclouds.demo.paas.RunnableHttpRequest.Factory;
|
||||||
|
import org.jclouds.http.HttpCommandExecutorService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Andrew Phillips
|
||||||
|
*/
|
||||||
|
public class Scheduler {
|
||||||
|
protected static final String SCHEDULER_ORIGINATOR_NAME = "scheduler";
|
||||||
|
|
||||||
|
protected final Factory httpRequestFactory;
|
||||||
|
|
||||||
|
public Scheduler(HttpCommandExecutorService httpClient) {
|
||||||
|
httpRequestFactory =
|
||||||
|
RunnableHttpRequest.factory(httpClient, SCHEDULER_ORIGINATOR_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Factory getHttpRequestFactory() {
|
||||||
|
return httpRequestFactory;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,401 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.paas.service.scheduler.quartz.plugins;
|
||||||
|
|
||||||
|
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
|
||||||
|
import static org.quartz.TriggerBuilder.newTrigger;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileNotFoundException;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.UnsupportedEncodingException;
|
||||||
|
import java.net.URL;
|
||||||
|
import java.net.URLDecoder;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.StringTokenizer;
|
||||||
|
|
||||||
|
import org.jclouds.logging.Logger;
|
||||||
|
import org.quartz.JobBuilder;
|
||||||
|
import org.quartz.JobDetail;
|
||||||
|
import org.quartz.Scheduler;
|
||||||
|
import org.quartz.SchedulerException;
|
||||||
|
import org.quartz.SimpleTrigger;
|
||||||
|
import org.quartz.TriggerKey;
|
||||||
|
import org.quartz.jobs.FileScanJob;
|
||||||
|
import org.quartz.jobs.FileScanListener;
|
||||||
|
import org.quartz.plugins.xml.XMLSchedulingDataProcessorPlugin;
|
||||||
|
import org.quartz.simpl.CascadingClassLoadHelper;
|
||||||
|
import org.quartz.spi.ClassLoadHelper;
|
||||||
|
import org.quartz.spi.SchedulerPlugin;
|
||||||
|
import org.quartz.xml.XMLSchedulingDataProcessor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A copy of {@link XMLSchedulingDataProcessorPlugin} that does not reference
|
||||||
|
* {@code javax.transaction.UserTransaction} as so does not require a dependency
|
||||||
|
* on JTA.
|
||||||
|
*
|
||||||
|
* @author Andrew Phillips
|
||||||
|
* @see XMLSchedulingDataProcessorPlugin
|
||||||
|
*/
|
||||||
|
public class TransactionlessXmlSchedulingDataProcessorPlugin implements
|
||||||
|
FileScanListener, SchedulerPlugin {
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
*
|
||||||
|
* Data members.
|
||||||
|
*
|
||||||
|
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
*/
|
||||||
|
private static final int MAX_JOB_TRIGGER_NAME_LEN = 80;
|
||||||
|
private static final String JOB_INITIALIZATION_PLUGIN_NAME = "JobSchedulingDataLoaderPlugin";
|
||||||
|
private static final String FILE_NAME_DELIMITERS = ",";
|
||||||
|
|
||||||
|
private String name;
|
||||||
|
private Scheduler scheduler;
|
||||||
|
private final Logger log = Logger.CONSOLE;
|
||||||
|
|
||||||
|
private boolean failOnFileNotFound = true;
|
||||||
|
|
||||||
|
private String fileNames = XMLSchedulingDataProcessor.QUARTZ_XML_DEFAULT_FILE_NAME;
|
||||||
|
|
||||||
|
// Populated by initialization
|
||||||
|
private Map<String, JobFile> jobFiles = new LinkedHashMap<String, JobFile>();
|
||||||
|
|
||||||
|
private long scanInterval = 0;
|
||||||
|
|
||||||
|
boolean started = false;
|
||||||
|
|
||||||
|
protected ClassLoadHelper classLoadHelper = null;
|
||||||
|
|
||||||
|
private Set<String> jobTriggerNameSet = new HashSet<String>();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
*
|
||||||
|
* Interface.
|
||||||
|
*
|
||||||
|
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Comma separated list of file names (with paths) to the XML files that should be read.
|
||||||
|
*/
|
||||||
|
public String getFileNames() {
|
||||||
|
return fileNames;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The file name (and path) to the XML file that should be read.
|
||||||
|
*/
|
||||||
|
public void setFileNames(String fileNames) {
|
||||||
|
this.fileNames = fileNames;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The interval (in seconds) at which to scan for changes to the file.
|
||||||
|
* If the file has been changed, it is re-loaded and parsed. The default
|
||||||
|
* value for the interval is 0, which disables scanning.
|
||||||
|
*
|
||||||
|
* @return Returns the scanInterval.
|
||||||
|
*/
|
||||||
|
public long getScanInterval() {
|
||||||
|
return scanInterval / 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The interval (in seconds) at which to scan for changes to the file.
|
||||||
|
* If the file has been changed, it is re-loaded and parsed. The default
|
||||||
|
* value for the interval is 0, which disables scanning.
|
||||||
|
*
|
||||||
|
* @param scanInterval The scanInterval to set.
|
||||||
|
*/
|
||||||
|
public void setScanInterval(long scanInterval) {
|
||||||
|
this.scanInterval = scanInterval * 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether or not initialization of the plugin should fail (throw an
|
||||||
|
* exception) if the file cannot be found. Default is <code>true</code>.
|
||||||
|
*/
|
||||||
|
public boolean isFailOnFileNotFound() {
|
||||||
|
return failOnFileNotFound;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether or not initialization of the plugin should fail (throw an
|
||||||
|
* exception) if the file cannot be found. Default is <code>true</code>.
|
||||||
|
*/
|
||||||
|
public void setFailOnFileNotFound(boolean failOnFileNotFound) {
|
||||||
|
this.failOnFileNotFound = failOnFileNotFound;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
*
|
||||||
|
* SchedulerPlugin Interface.
|
||||||
|
*
|
||||||
|
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Called during creation of the <code>Scheduler</code> in order to give
|
||||||
|
* the <code>SchedulerPlugin</code> a chance to initialize.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @throws org.quartz.SchedulerConfigException
|
||||||
|
* if there is an error initializing.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void initialize(String name, Scheduler scheduler)
|
||||||
|
throws SchedulerException {
|
||||||
|
this.name = name;
|
||||||
|
this.scheduler = scheduler;
|
||||||
|
|
||||||
|
classLoadHelper = new CascadingClassLoadHelper();
|
||||||
|
classLoadHelper.initialize();
|
||||||
|
|
||||||
|
log.info("Registering Quartz Job Initialization Plug-in.");
|
||||||
|
|
||||||
|
// Create JobFile objects
|
||||||
|
StringTokenizer stok = new StringTokenizer(fileNames, FILE_NAME_DELIMITERS);
|
||||||
|
while (stok.hasMoreTokens()) {
|
||||||
|
final String fileName = stok.nextToken();
|
||||||
|
final JobFile jobFile = new JobFile(fileName);
|
||||||
|
jobFiles.put(fileName, jobFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
try {
|
||||||
|
if (jobFiles.isEmpty() == false) {
|
||||||
|
|
||||||
|
if (scanInterval > 0) {
|
||||||
|
scheduler.getContext().put(JOB_INITIALIZATION_PLUGIN_NAME + '_' + name, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
Iterator<JobFile> iterator = jobFiles.values().iterator();
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
JobFile jobFile = iterator.next();
|
||||||
|
|
||||||
|
if (scanInterval > 0) {
|
||||||
|
String jobTriggerName = buildJobTriggerName(jobFile.getFileBasename());
|
||||||
|
TriggerKey tKey = new TriggerKey(jobTriggerName, JOB_INITIALIZATION_PLUGIN_NAME);
|
||||||
|
|
||||||
|
// remove pre-existing job/trigger, if any
|
||||||
|
scheduler.unscheduleJob(tKey);
|
||||||
|
|
||||||
|
// TODO: convert to use builder
|
||||||
|
SimpleTrigger trig = newTrigger()
|
||||||
|
.withIdentity(jobTriggerName, JOB_INITIALIZATION_PLUGIN_NAME)
|
||||||
|
.startNow()
|
||||||
|
.endAt(null)
|
||||||
|
.withSchedule(simpleSchedule()
|
||||||
|
.repeatForever()
|
||||||
|
.withIntervalInMilliseconds(scanInterval))
|
||||||
|
.build();
|
||||||
|
|
||||||
|
JobDetail job = JobBuilder.newJob(FileScanJob.class)
|
||||||
|
.withIdentity(jobTriggerName, JOB_INITIALIZATION_PLUGIN_NAME)
|
||||||
|
.build();
|
||||||
|
job.getJobDataMap().put(FileScanJob.FILE_NAME, jobFile.getFileName());
|
||||||
|
job.getJobDataMap().put(FileScanJob.FILE_SCAN_LISTENER_NAME, JOB_INITIALIZATION_PLUGIN_NAME + '_' + name);
|
||||||
|
|
||||||
|
scheduler.scheduleJob(job, trig);
|
||||||
|
log.debug("Scheduled file scan job for data file: {}, at interval: {}", jobFile.getFileName(), scanInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
processFile(jobFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch(SchedulerException se) {
|
||||||
|
log.error("Error starting background-task for watching jobs file.", se);
|
||||||
|
} finally {
|
||||||
|
started = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method for generating unique job/trigger name for the
|
||||||
|
* file scanning jobs (one per FileJob). The unique names are saved
|
||||||
|
* in jobTriggerNameSet.
|
||||||
|
*/
|
||||||
|
private String buildJobTriggerName(
|
||||||
|
String fileBasename) {
|
||||||
|
// Name w/o collisions will be prefix + _ + filename (with '.' of filename replaced with '_')
|
||||||
|
// For example: JobInitializationPlugin_jobInitializer_myjobs_xml
|
||||||
|
String jobTriggerName = JOB_INITIALIZATION_PLUGIN_NAME + '_' + name + '_' + fileBasename.replace('.', '_');
|
||||||
|
|
||||||
|
// If name is too long (DB column is 80 chars), then truncate to max length
|
||||||
|
if (jobTriggerName.length() > MAX_JOB_TRIGGER_NAME_LEN) {
|
||||||
|
jobTriggerName = jobTriggerName.substring(0, MAX_JOB_TRIGGER_NAME_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure this name is unique in case the same file name under different
|
||||||
|
// directories is being checked, or had a naming collision due to length truncation.
|
||||||
|
// If there is a conflict, keep incrementing a _# suffix on the name (being sure
|
||||||
|
// not to get too long), until we find a unique name.
|
||||||
|
int currentIndex = 1;
|
||||||
|
while (jobTriggerNameSet.add(jobTriggerName) == false) {
|
||||||
|
// If not our first time through, then strip off old numeric suffix
|
||||||
|
if (currentIndex > 1) {
|
||||||
|
jobTriggerName = jobTriggerName.substring(0, jobTriggerName.lastIndexOf('_'));
|
||||||
|
}
|
||||||
|
|
||||||
|
String numericSuffix = "_" + currentIndex++;
|
||||||
|
|
||||||
|
// If the numeric suffix would make the name too long, then make room for it.
|
||||||
|
if (jobTriggerName.length() > (MAX_JOB_TRIGGER_NAME_LEN - numericSuffix.length())) {
|
||||||
|
jobTriggerName = jobTriggerName.substring(0, (MAX_JOB_TRIGGER_NAME_LEN - numericSuffix.length()));
|
||||||
|
}
|
||||||
|
|
||||||
|
jobTriggerName += numericSuffix;
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobTriggerName;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
// nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processFile(JobFile jobFile) {
|
||||||
|
if (jobFile == null || !jobFile.getFileFound()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
XMLSchedulingDataProcessor processor =
|
||||||
|
new XMLSchedulingDataProcessor(this.classLoadHelper);
|
||||||
|
|
||||||
|
processor.addJobGroupToNeverDelete(JOB_INITIALIZATION_PLUGIN_NAME);
|
||||||
|
processor.addTriggerGroupToNeverDelete(JOB_INITIALIZATION_PLUGIN_NAME);
|
||||||
|
|
||||||
|
processor.processFileAndScheduleJobs(
|
||||||
|
jobFile.getFileName(),
|
||||||
|
jobFile.getFileName(), // systemId
|
||||||
|
scheduler);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Error scheduling jobs: " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processFile(String filePath) {
|
||||||
|
processFile((JobFile)jobFiles.get(filePath));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see org.quartz.jobs.FileScanListener#fileUpdated(java.lang.String)
|
||||||
|
*/
|
||||||
|
public void fileUpdated(String fileName) {
|
||||||
|
if (started) {
|
||||||
|
processFile(fileName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobFile {
|
||||||
|
private String fileName;
|
||||||
|
|
||||||
|
// These are set by initialize()
|
||||||
|
private String filePath;
|
||||||
|
private String fileBasename;
|
||||||
|
private boolean fileFound;
|
||||||
|
|
||||||
|
protected JobFile(String fileName) throws SchedulerException {
|
||||||
|
this.fileName = fileName;
|
||||||
|
initialize();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getFileName() {
|
||||||
|
return fileName;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean getFileFound() {
|
||||||
|
return fileFound;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getFilePath() {
|
||||||
|
return filePath;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected String getFileBasename() {
|
||||||
|
return fileBasename;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initialize() throws SchedulerException {
|
||||||
|
InputStream f = null;
|
||||||
|
try {
|
||||||
|
String furl = null;
|
||||||
|
|
||||||
|
File file = new File(getFileName()); // files in filesystem
|
||||||
|
if (!file.exists()) {
|
||||||
|
URL url = classLoadHelper.getResource(getFileName());
|
||||||
|
if(url != null) {
|
||||||
|
try {
|
||||||
|
furl = URLDecoder.decode(url.getPath(), "UTF-8");
|
||||||
|
} catch (UnsupportedEncodingException e) {
|
||||||
|
furl = url.getPath();
|
||||||
|
}
|
||||||
|
file = new File(furl);
|
||||||
|
try {
|
||||||
|
f = url.openStream();
|
||||||
|
} catch (IOException ignor) {
|
||||||
|
// Swallow the exception
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
f = new java.io.FileInputStream(file);
|
||||||
|
}catch (FileNotFoundException e) {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (f == null) {
|
||||||
|
if (isFailOnFileNotFound()) {
|
||||||
|
throw new SchedulerException(
|
||||||
|
"File named '" + getFileName() + "' does not exist.");
|
||||||
|
} else {
|
||||||
|
log.warn("File named '" + getFileName() + "' does not exist.");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
fileFound = true;
|
||||||
|
}
|
||||||
|
filePath = (furl != null) ? furl : file.getAbsolutePath();
|
||||||
|
fileBasename = file.getName();
|
||||||
|
} finally {
|
||||||
|
try {
|
||||||
|
if (f != null) {
|
||||||
|
f.close();
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
log.warn("Error closing jobs file " + getFileName(), ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -16,25 +16,67 @@
|
||||||
* specific language governing permissions and limitations
|
* specific language governing permissions and limitations
|
||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
package org.jclouds.demo.tweetstore.taskqueue;
|
package org.jclouds.demo.paas.service.taskqueue;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import static com.google.common.base.Preconditions.checkArgument;
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static java.lang.String.format;
|
||||||
|
|
||||||
import java.util.Timer;
|
import java.util.Timer;
|
||||||
import java.util.TimerTask;
|
import java.util.TimerTask;
|
||||||
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.jclouds.demo.paas.RunnableHttpRequest;
|
||||||
|
import org.jclouds.demo.paas.RunnableHttpRequest.Factory;
|
||||||
|
import org.jclouds.http.HttpCommandExecutorService;
|
||||||
|
|
||||||
import com.google.inject.Provider;
|
import com.google.inject.Provider;
|
||||||
|
|
||||||
public class TaskQueue {
|
public class TaskQueue {
|
||||||
public static Builder builder() {
|
protected final Factory httpRequestFactory;
|
||||||
return new Builder();
|
private final Timer timer;
|
||||||
|
private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<Runnable>();
|
||||||
|
|
||||||
|
private TaskQueue(String name, long pollingIntervalMillis, Factory httpRequestFactory) {
|
||||||
|
this.httpRequestFactory = httpRequestFactory;
|
||||||
|
timer = new Timer(name);
|
||||||
|
timer.scheduleAtFixedRate(new TimerTask() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
Runnable task = tasks.poll();
|
||||||
|
if (task != null) {
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, 0, pollingIntervalMillis);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(final Runnable task) {
|
||||||
|
tasks.add(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Factory getHttpRequestFactory() {
|
||||||
|
return httpRequestFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void destroy() {
|
||||||
|
timer.cancel();
|
||||||
|
tasks.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Builder builder(HttpCommandExecutorService httpClient) {
|
||||||
|
return new Builder(httpClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Builder implements Provider<TaskQueue> {
|
public static class Builder implements Provider<TaskQueue> {
|
||||||
|
protected final HttpCommandExecutorService httpClient;
|
||||||
protected String name = "default";
|
protected String name = "default";
|
||||||
protected long taskPeriodMillis = TimeUnit.SECONDS.toMillis(1);
|
protected long pollingIntervalMillis = TimeUnit.SECONDS.toMillis(1);
|
||||||
|
|
||||||
|
private Builder(HttpCommandExecutorService httpClient) {
|
||||||
|
this.httpClient = checkNotNull(httpClient, "httpClient");
|
||||||
|
}
|
||||||
|
|
||||||
public Builder name(String name) {
|
public Builder name(String name) {
|
||||||
this.name = checkNotNull(name, "name");
|
this.name = checkNotNull(name, "name");
|
||||||
|
@ -42,18 +84,19 @@ public class TaskQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder period(TimeUnit period) {
|
public Builder period(TimeUnit period) {
|
||||||
this.taskPeriodMillis = checkNotNull(period, "period").toMillis(1);
|
this.pollingIntervalMillis = checkNotNull(period, "period").toMillis(1);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Builder period(long taskPeriodMillis) {
|
public Builder period(long pollingIntervalMillis) {
|
||||||
checkArgument(taskPeriodMillis > 0, "taskPeriodMillis");
|
checkArgument(pollingIntervalMillis > 0, "pollingIntervalMillis");
|
||||||
this.taskPeriodMillis = taskPeriodMillis;
|
this.pollingIntervalMillis = pollingIntervalMillis;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TaskQueue build() {
|
public TaskQueue build() {
|
||||||
return new TaskQueue(name, taskPeriodMillis);
|
return new TaskQueue(name, pollingIntervalMillis,
|
||||||
|
RunnableHttpRequest.factory(httpClient, format("taskqueue-%s", name)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -61,25 +104,4 @@ public class TaskQueue {
|
||||||
return build();
|
return build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Timer timer;
|
|
||||||
private final long taskPeriodMillis;
|
|
||||||
|
|
||||||
private TaskQueue(String name, long taskPeriodMillis) {
|
|
||||||
timer = new Timer(name);
|
|
||||||
this.taskPeriodMillis = taskPeriodMillis;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void add(final Runnable task) {
|
|
||||||
timer.scheduleAtFixedRate(new TimerTask() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
task.run();
|
|
||||||
}
|
|
||||||
}, 0, taskPeriodMillis);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void destroy() {
|
|
||||||
timer.cancel();
|
|
||||||
}
|
|
||||||
}
|
}
|
|
@ -23,7 +23,7 @@ import static com.google.common.base.Preconditions.checkState;
|
||||||
import static com.google.common.base.Predicates.in;
|
import static com.google.common.base.Predicates.in;
|
||||||
import static com.google.common.collect.ImmutableSet.copyOf;
|
import static com.google.common.collect.ImmutableSet.copyOf;
|
||||||
import static com.google.common.collect.Sets.filter;
|
import static com.google.common.collect.Sets.filter;
|
||||||
import static java.util.concurrent.TimeUnit.MINUTES;
|
import static org.jclouds.demo.paas.reference.PaasConstants.PROPERTY_PLATFORM_BASE_URL;
|
||||||
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_BLOBSTORES;
|
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_BLOBSTORES;
|
||||||
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER;
|
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER;
|
||||||
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_ACCESSTOKEN;
|
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_ACCESSTOKEN;
|
||||||
|
@ -31,9 +31,6 @@ import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TW
|
||||||
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_CONSUMER_KEY;
|
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_CONSUMER_KEY;
|
||||||
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_CONSUMER_SECRET;
|
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_CONSUMER_SECRET;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.InputStream;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -43,13 +40,13 @@ import javax.servlet.ServletContextEvent;
|
||||||
|
|
||||||
import org.jclouds.blobstore.BlobStoreContext;
|
import org.jclouds.blobstore.BlobStoreContext;
|
||||||
import org.jclouds.blobstore.BlobStoreContextFactory;
|
import org.jclouds.blobstore.BlobStoreContextFactory;
|
||||||
|
import org.jclouds.demo.paas.PlatformServices;
|
||||||
|
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
|
||||||
import org.jclouds.demo.tweetstore.config.util.CredentialsCollector;
|
import org.jclouds.demo.tweetstore.config.util.CredentialsCollector;
|
||||||
|
import org.jclouds.demo.tweetstore.config.util.PropertiesLoader;
|
||||||
import org.jclouds.demo.tweetstore.controller.AddTweetsController;
|
import org.jclouds.demo.tweetstore.controller.AddTweetsController;
|
||||||
|
import org.jclouds.demo.tweetstore.controller.EnqueueStoresController;
|
||||||
import org.jclouds.demo.tweetstore.controller.StoreTweetsController;
|
import org.jclouds.demo.tweetstore.controller.StoreTweetsController;
|
||||||
import org.jclouds.demo.tweetstore.taskqueue.HttpRequestTask;
|
|
||||||
import org.jclouds.demo.tweetstore.taskqueue.TaskQueue;
|
|
||||||
import org.jclouds.demo.tweetstore.taskqueue.HttpRequestTask.Factory;
|
|
||||||
import org.jclouds.http.HttpRequest;
|
|
||||||
|
|
||||||
import twitter4j.Twitter;
|
import twitter4j.Twitter;
|
||||||
import twitter4j.TwitterFactory;
|
import twitter4j.TwitterFactory;
|
||||||
|
@ -57,10 +54,8 @@ import twitter4j.conf.Configuration;
|
||||||
import twitter4j.conf.ConfigurationBuilder;
|
import twitter4j.conf.ConfigurationBuilder;
|
||||||
|
|
||||||
import com.google.common.base.Splitter;
|
import com.google.common.base.Splitter;
|
||||||
import com.google.common.collect.ImmutableMultimap;
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Maps;
|
import com.google.common.collect.Maps;
|
||||||
import com.google.common.io.Closeables;
|
|
||||||
import com.google.inject.Guice;
|
import com.google.inject.Guice;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
|
@ -79,13 +74,14 @@ public class GuiceServletConfig extends GuiceServletContextListener {
|
||||||
private Twitter twitterClient;
|
private Twitter twitterClient;
|
||||||
private String container;
|
private String container;
|
||||||
private TaskQueue queue;
|
private TaskQueue queue;
|
||||||
|
private String baseUrl;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void contextInitialized(ServletContextEvent servletContextEvent) {
|
public void contextInitialized(ServletContextEvent servletContextEvent) {
|
||||||
BlobStoreContextFactory blobStoreContextFactory = new BlobStoreContextFactory();
|
BlobStoreContextFactory blobStoreContextFactory = new BlobStoreContextFactory();
|
||||||
|
ServletContext servletContext = servletContextEvent.getServletContext();
|
||||||
|
|
||||||
Properties props = loadJCloudsProperties(servletContextEvent);
|
Properties props = new PropertiesLoader(servletContext).get();
|
||||||
|
|
||||||
Set<Module> modules = ImmutableSet.<Module>of();
|
Set<Module> modules = ImmutableSet.<Module>of();
|
||||||
// shared across all blobstores and used to retrieve tweets
|
// shared across all blobstores and used to retrieve tweets
|
||||||
try {
|
try {
|
||||||
|
@ -108,16 +104,10 @@ public class GuiceServletConfig extends GuiceServletContextListener {
|
||||||
providerTypeToBlobStoreMap.put(hint, blobStoreContextFactory.createContext(hint, modules, props));
|
providerTypeToBlobStoreMap.put(hint, blobStoreContextFactory.createContext(hint, modules, props));
|
||||||
}
|
}
|
||||||
|
|
||||||
// get a queue for submitting store tweet requests
|
// get a queue for submitting store tweet requests and the application's base URL
|
||||||
queue = TaskQueue.builder().name("twitter").period(MINUTES).build();
|
PlatformServices platform = PlatformServices.get(servletContext);
|
||||||
Factory taskFactory = HttpRequestTask.factory(props, "twitter");
|
queue = platform.getTaskQueue("twitter");
|
||||||
// submit a job to store tweets for each configured blobstore
|
baseUrl = platform.getBaseUrl();
|
||||||
for (String name : providerTypeToBlobStoreMap.keySet()) {
|
|
||||||
queue.add(taskFactory.create(HttpRequest.builder()
|
|
||||||
.endpoint(withUrl(servletContextEvent.getServletContext(), "/store/do"))
|
|
||||||
.headers(ImmutableMultimap.of("context", name))
|
|
||||||
.method("GET").build()));
|
|
||||||
}
|
|
||||||
|
|
||||||
super.contextInitialized(servletContextEvent);
|
super.contextInitialized(servletContextEvent);
|
||||||
}
|
}
|
||||||
|
@ -131,40 +121,23 @@ public class GuiceServletConfig extends GuiceServletContextListener {
|
||||||
checkState(!contexts.isEmpty(), "no credentials available for any requested context");
|
checkState(!contexts.isEmpty(), "no credentials available for any requested context");
|
||||||
return contexts;
|
return contexts;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static URI withUrl(ServletContext servletContext, String url) {
|
|
||||||
return URI.create("http://" + checkNotNull(servletContext.getInitParameter("application.host"), "application.host")
|
|
||||||
+ servletContext.getContextPath() + url);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Properties loadJCloudsProperties(
|
|
||||||
ServletContextEvent servletContextEvent) {
|
|
||||||
InputStream input = servletContextEvent.getServletContext()
|
|
||||||
.getResourceAsStream("/WEB-INF/jclouds.properties");
|
|
||||||
Properties props = new Properties();
|
|
||||||
try {
|
|
||||||
props.load(input);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
} finally {
|
|
||||||
Closeables.closeQuietly(input);
|
|
||||||
}
|
|
||||||
return props;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Injector getInjector() {
|
protected Injector getInjector() {
|
||||||
return Guice.createInjector(new ServletModule() {
|
return Guice.createInjector(new ServletModule() {
|
||||||
@Override
|
@Override
|
||||||
protected void configureServlets() {
|
protected void configureServlets() {
|
||||||
bind(new TypeLiteral<Map<String, BlobStoreContext>>() {
|
bind(new TypeLiteral<Map<String, BlobStoreContext>>() {})
|
||||||
}).toInstance(providerTypeToBlobStoreMap);
|
.toInstance(providerTypeToBlobStoreMap);
|
||||||
bind(Twitter.class).toInstance(twitterClient);
|
bind(Twitter.class).toInstance(twitterClient);
|
||||||
bindConstant().annotatedWith(
|
bind(TaskQueue.class).toInstance(queue);
|
||||||
Names.named(PROPERTY_TWEETSTORE_CONTAINER)).to(
|
bindConstant().annotatedWith(Names.named(PROPERTY_PLATFORM_BASE_URL))
|
||||||
container);
|
.to(baseUrl);
|
||||||
|
bindConstant().annotatedWith(Names.named(PROPERTY_TWEETSTORE_CONTAINER))
|
||||||
|
.to(container);
|
||||||
serve("/store/*").with(StoreTweetsController.class);
|
serve("/store/*").with(StoreTweetsController.class);
|
||||||
serve("/tweets/*").with(AddTweetsController.class);
|
serve("/tweets/*").with(AddTweetsController.class);
|
||||||
|
serve("/stores/*").with(EnqueueStoresController.class);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.tweetstore.config.util;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import javax.servlet.ServletContext;
|
||||||
|
|
||||||
|
import com.google.common.io.Closeables;
|
||||||
|
import com.google.inject.Provider;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Andrew Phillips
|
||||||
|
*/
|
||||||
|
public class PropertiesLoader implements Provider<Properties>{
|
||||||
|
private static final String PROPERTIES_FILE = "/WEB-INF/jclouds.properties";
|
||||||
|
|
||||||
|
private final Properties properties;
|
||||||
|
|
||||||
|
public PropertiesLoader(ServletContext context) {
|
||||||
|
properties = loadJcloudsProperties(context);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Properties loadJcloudsProperties(ServletContext context) {
|
||||||
|
InputStream input = context.getResourceAsStream(PROPERTIES_FILE);
|
||||||
|
Properties props = new Properties();
|
||||||
|
try {
|
||||||
|
props.load(input);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} finally {
|
||||||
|
Closeables.closeQuietly(input);
|
||||||
|
}
|
||||||
|
return props;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Properties get() {
|
||||||
|
return properties;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,101 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.tweetstore.controller;
|
||||||
|
|
||||||
|
import static com.google.common.base.Strings.nullToEmpty;
|
||||||
|
import static org.jclouds.demo.paas.RunnableHttpRequest.PLATFORM_REQUEST_ORIGINATOR_HEADER;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import javax.annotation.Resource;
|
||||||
|
import javax.inject.Inject;
|
||||||
|
import javax.inject.Named;
|
||||||
|
import javax.inject.Singleton;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServlet;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
import javax.ws.rs.core.MediaType;
|
||||||
|
|
||||||
|
import org.jclouds.blobstore.BlobStoreContext;
|
||||||
|
import org.jclouds.demo.paas.reference.PaasConstants;
|
||||||
|
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
|
||||||
|
import org.jclouds.http.HttpRequest;
|
||||||
|
import org.jclouds.logging.Logger;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.collect.ImmutableMultimap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds tasks to retrieve and store tweets in all registered contexts to an async
|
||||||
|
* task queue.
|
||||||
|
*
|
||||||
|
* @author Andrew Phillips
|
||||||
|
* @see StoreTweetsController
|
||||||
|
*/
|
||||||
|
@Singleton
|
||||||
|
public class EnqueueStoresController extends HttpServlet {
|
||||||
|
/** The serialVersionUID */
|
||||||
|
private static final long serialVersionUID = 7215420527854203714L;
|
||||||
|
|
||||||
|
private final Set<String> contextNames;
|
||||||
|
private final TaskQueue taskQueue;
|
||||||
|
private final String baseUrl;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
protected Logger logger = Logger.NULL;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public EnqueueStoresController(Map<String, BlobStoreContext> contexts, TaskQueue taskQueue,
|
||||||
|
@Named(PaasConstants.PROPERTY_PLATFORM_BASE_URL) String baseUrl) {
|
||||||
|
contextNames = contexts.keySet();
|
||||||
|
this.taskQueue = taskQueue;
|
||||||
|
this.baseUrl = baseUrl;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void enqueueStoreTweetTasks() {
|
||||||
|
for (String contextName : contextNames) {
|
||||||
|
logger.debug("enqueuing task to store tweets in blobstore '%s'", contextName);
|
||||||
|
taskQueue.add(taskQueue.getHttpRequestFactory().create(HttpRequest.builder()
|
||||||
|
.endpoint(URI.create(baseUrl + "/store/do"))
|
||||||
|
.headers(ImmutableMultimap.of("context", contextName))
|
||||||
|
.method("GET").build()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||||
|
if (!nullToEmpty(request.getHeader(PLATFORM_REQUEST_ORIGINATOR_HEADER)).equals("scheduler")) {
|
||||||
|
response.sendError(401);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
enqueueStoreTweetTasks();
|
||||||
|
response.setContentType(MediaType.TEXT_PLAIN);
|
||||||
|
response.getWriter().println("Done!");
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error(e, "Error storing tweets");
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,6 +19,8 @@
|
||||||
package org.jclouds.demo.tweetstore.controller;
|
package org.jclouds.demo.tweetstore.controller;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
import static com.google.common.base.Strings.nullToEmpty;
|
||||||
|
import static org.jclouds.demo.paas.RunnableHttpRequest.PLATFORM_REQUEST_ORIGINATOR_HEADER;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -53,7 +55,6 @@ import com.google.common.base.Function;
|
||||||
*/
|
*/
|
||||||
@Singleton
|
@Singleton
|
||||||
public class StoreTweetsController extends HttpServlet {
|
public class StoreTweetsController extends HttpServlet {
|
||||||
public static final String AUTHORIZED_REQUEST_ORIGINATOR_HEADER = "X-RUNatcloud-Originator";
|
|
||||||
|
|
||||||
private static final class StatusToBlob implements Function<Status, Blob> {
|
private static final class StatusToBlob implements Function<Status, Blob> {
|
||||||
private final BlobMap map;
|
private final BlobMap map;
|
||||||
|
@ -110,8 +111,7 @@ public class StoreTweetsController extends HttpServlet {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
||||||
if (request.getHeader(AUTHORIZED_REQUEST_ORIGINATOR_HEADER) != null
|
if (nullToEmpty(request.getHeader(PLATFORM_REQUEST_ORIGINATOR_HEADER)).equals("taskqueue-twitter")) {
|
||||||
&& request.getHeader(AUTHORIZED_REQUEST_ORIGINATOR_HEADER).equals("twitter")) {
|
|
||||||
try {
|
try {
|
||||||
String contextName = checkNotNull(request.getHeader("context"), "missing header context");
|
String contextName = checkNotNull(request.getHeader("context"), "missing header context");
|
||||||
logger.info("retrieving tweets");
|
logger.info("retrieving tweets");
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
<?xml version='1.0' encoding='utf-8'?>
|
||||||
|
<job-scheduling-data xmlns="http://www.quartz-scheduler.org/xml/JobSchedulingData"
|
||||||
|
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
|
xsi:schemaLocation="http://www.quartz-scheduler.org/xml/JobSchedulingData http://www.quartz-scheduler.org/xml/job_scheduling_data_2_0.xsd"
|
||||||
|
version="2.0">
|
||||||
|
|
||||||
|
<schedule>
|
||||||
|
<job>
|
||||||
|
<name>enqueue-store-tweet-tasks</name>
|
||||||
|
<description>Enqueue 'store tweet' tasks for all contexts</description>
|
||||||
|
<job-class>org.jclouds.demo.paas.service.scheduler.HttpRequestJob</job-class>
|
||||||
|
<job-data-map>
|
||||||
|
<entry>
|
||||||
|
<key>url</key>
|
||||||
|
<value>/stores/do</value>
|
||||||
|
</entry>
|
||||||
|
</job-data-map>
|
||||||
|
</job>
|
||||||
|
|
||||||
|
<trigger>
|
||||||
|
<calendar-interval>
|
||||||
|
<name>submit-recurring-job</name>
|
||||||
|
<job-name>enqueue-store-tweet-tasks</job-name>
|
||||||
|
<repeat-interval>10</repeat-interval>
|
||||||
|
<repeat-interval-unit>MINUTE</repeat-interval-unit>
|
||||||
|
</calendar-interval>
|
||||||
|
</trigger>
|
||||||
|
</schedule>
|
||||||
|
</job-scheduling-data>
|
|
@ -0,0 +1,28 @@
|
||||||
|
#============================================================================
|
||||||
|
# Configure Main Scheduler Properties
|
||||||
|
#============================================================================
|
||||||
|
|
||||||
|
org.quartz.scheduler.skipUpdateCheck: true
|
||||||
|
|
||||||
|
#============================================================================
|
||||||
|
# Configure ThreadPool
|
||||||
|
#============================================================================
|
||||||
|
|
||||||
|
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
|
||||||
|
org.quartz.threadPool.threadCount: 1
|
||||||
|
|
||||||
|
#============================================================================
|
||||||
|
# Configure JobStore
|
||||||
|
#============================================================================
|
||||||
|
|
||||||
|
org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore
|
||||||
|
|
||||||
|
#============================================================================
|
||||||
|
# Configure the Job Initialization Plugin
|
||||||
|
#============================================================================
|
||||||
|
|
||||||
|
org.quartz.plugin.jobInitializer.class: org.jclouds.demo.paas.service.scheduler.quartz.plugins.TransactionlessXmlSchedulingDataProcessorPlugin
|
||||||
|
org.quartz.plugin.jobInitializer.fileNames: jobs.xml
|
||||||
|
org.quartz.plugin.jobInitializer.failOnFileNotFound: true
|
||||||
|
org.quartz.plugin.jobInitializer.scanInterval: 0
|
||||||
|
#org.quartz.plugin.jobInitializer.wrapInUserTransaction: false
|
|
@ -23,7 +23,12 @@
|
||||||
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_5.xsd"
|
xsi:schemaLocation="http://java.sun.com/xml/ns/j2ee http://java.sun.com/xml/ns/j2ee/web-app_2_5.xsd"
|
||||||
version="2.5">
|
version="2.5">
|
||||||
<display-name>jclouds-tweetstore</display-name>
|
<display-name>jclouds-tweetstore</display-name>
|
||||||
|
|
||||||
|
<context-param>
|
||||||
|
<param-name>quartz:scheduler-context-servlet-context-key</param-name>
|
||||||
|
<param-value>servlet-context</param-value>
|
||||||
|
</context-param>
|
||||||
|
|
||||||
<!-- Servlets -->
|
<!-- Servlets -->
|
||||||
<filter>
|
<filter>
|
||||||
<filter-name>guiceFilter</filter-name>
|
<filter-name>guiceFilter</filter-name>
|
||||||
|
@ -34,10 +39,18 @@
|
||||||
<filter-name>guiceFilter</filter-name>
|
<filter-name>guiceFilter</filter-name>
|
||||||
<url-pattern>/*</url-pattern>
|
<url-pattern>/*</url-pattern>
|
||||||
</filter-mapping>
|
</filter-mapping>
|
||||||
|
|
||||||
|
<!-- must be started first -->
|
||||||
<listener>
|
<listener>
|
||||||
<listener-class>org.jclouds.demo.tweetstore.config.GuiceServletConfig
|
<listener-class>org.jclouds.demo.paas.config.PlatformServicesInitializer</listener-class>
|
||||||
</listener-class>
|
</listener>
|
||||||
|
|
||||||
|
<listener>
|
||||||
|
<listener-class>org.quartz.ee.servlet.QuartzInitializerListener</listener-class>
|
||||||
|
</listener>
|
||||||
|
|
||||||
|
<listener>
|
||||||
|
<listener-class>org.jclouds.demo.tweetstore.config.GuiceServletConfig</listener-class>
|
||||||
</listener>
|
</listener>
|
||||||
|
|
||||||
<welcome-file-list>
|
<welcome-file-list>
|
||||||
|
|
|
@ -0,0 +1,83 @@
|
||||||
|
/**
|
||||||
|
* Licensed to jclouds, Inc. (jclouds) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. jclouds licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jclouds.demo.tweetstore.controller;
|
||||||
|
|
||||||
|
import static org.easymock.EasyMock.*;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.jclouds.blobstore.BlobStoreContext;
|
||||||
|
import org.jclouds.blobstore.BlobStoreContextFactory;
|
||||||
|
import org.jclouds.demo.paas.RunnableHttpRequest;
|
||||||
|
import org.jclouds.demo.paas.RunnableHttpRequest.Factory;
|
||||||
|
import org.jclouds.demo.paas.service.taskqueue.TaskQueue;
|
||||||
|
import org.jclouds.http.HttpRequest;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.ImmutableMultimap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests behavior of {@code EnqueueStoresController}
|
||||||
|
*
|
||||||
|
* @author Andrew Phillips
|
||||||
|
*/
|
||||||
|
@Test(groups = "unit")
|
||||||
|
public class EnqueueStoresControllerTest {
|
||||||
|
|
||||||
|
Map<String, BlobStoreContext> createBlobStores() {
|
||||||
|
Map<String, BlobStoreContext> contexts = ImmutableMap.of(
|
||||||
|
"test1", new BlobStoreContextFactory().createContext("transient", "dummy", "dummy"),
|
||||||
|
"test2", new BlobStoreContextFactory().createContext("transient", "dummy", "dummy"));
|
||||||
|
return contexts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testEnqueueStores() {
|
||||||
|
Map<String, BlobStoreContext> stores = createBlobStores();
|
||||||
|
TaskQueue taskQueue = createMock(TaskQueue.class);
|
||||||
|
Factory httpRequestFactory = createMock(Factory.class);
|
||||||
|
EnqueueStoresController function = new EnqueueStoresController(stores,
|
||||||
|
taskQueue, "http://localhost:8080");
|
||||||
|
|
||||||
|
expect(taskQueue.getHttpRequestFactory()).andStubReturn(httpRequestFactory);
|
||||||
|
|
||||||
|
HttpRequest storeInTest1Request = HttpRequest.builder().endpoint(
|
||||||
|
URI.create("http://localhost:8080/store/do"))
|
||||||
|
.headers(ImmutableMultimap.of("context", "test1")).method("GET").build();
|
||||||
|
RunnableHttpRequest storeInTest1Task = null;
|
||||||
|
expect(httpRequestFactory.create(eq(storeInTest1Request))).andReturn(storeInTest1Task);
|
||||||
|
|
||||||
|
HttpRequest storeInTest2Request = HttpRequest.builder().endpoint(
|
||||||
|
URI.create("http://localhost:8080/store/do"))
|
||||||
|
.headers(ImmutableMultimap.of("context", "test2")).method("GET").build();
|
||||||
|
RunnableHttpRequest storeInTest2Task = null;
|
||||||
|
expect(httpRequestFactory.create(eq(storeInTest2Request))).andReturn(storeInTest2Task);
|
||||||
|
|
||||||
|
taskQueue.add(storeInTest1Task);
|
||||||
|
expectLastCall();
|
||||||
|
taskQueue.add(storeInTest2Task);
|
||||||
|
expectLastCall();
|
||||||
|
replay(httpRequestFactory, taskQueue);
|
||||||
|
|
||||||
|
function.enqueueStoreTweetTasks();
|
||||||
|
|
||||||
|
verify(taskQueue);
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,7 +19,7 @@
|
||||||
package org.jclouds.demo.tweetstore.integration;
|
package org.jclouds.demo.tweetstore.integration;
|
||||||
|
|
||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
import static org.jclouds.demo.tweetstore.controller.StoreTweetsController.AUTHORIZED_REQUEST_ORIGINATOR_HEADER;
|
import static org.jclouds.demo.paas.RunnableHttpRequest.PLATFORM_REQUEST_ORIGINATOR_HEADER;
|
||||||
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_BLOBSTORES;
|
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_BLOBSTORES;
|
||||||
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER;
|
import static org.jclouds.demo.tweetstore.reference.TweetStoreConstants.PROPERTY_TWEETSTORE_CONTAINER;
|
||||||
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_ACCESSTOKEN;
|
import static org.jclouds.demo.tweetstore.reference.TwitterConstants.PROPERTY_TWITTER_ACCESSTOKEN;
|
||||||
|
@ -199,7 +199,7 @@ public class TweetStoreLiveTest {
|
||||||
for (String context : blobstores) {
|
for (String context : blobstores) {
|
||||||
System.out.println("storing at context: " + context);
|
System.out.println("storing at context: " + context);
|
||||||
HttpURLConnection connection = (HttpURLConnection) gurl.openConnection();
|
HttpURLConnection connection = (HttpURLConnection) gurl.openConnection();
|
||||||
connection.addRequestProperty(AUTHORIZED_REQUEST_ORIGINATOR_HEADER, "twitter");
|
connection.addRequestProperty(PLATFORM_REQUEST_ORIGINATOR_HEADER, "taskqueue-twitter");
|
||||||
connection.addRequestProperty("context", context);
|
connection.addRequestProperty("context", context);
|
||||||
InputStream i = connection.getInputStream();
|
InputStream i = connection.getInputStream();
|
||||||
String string = Strings2.toStringAndClose(i);
|
String string = Strings2.toStringAndClose(i);
|
||||||
|
|
Loading…
Reference in New Issue