ACTIVEMQ6-20 removing twitter integration

https://issues.apache.org/jira/browse/ACTIVEMQ6-20

this has been identified as a dead feature.
It was only useful for presentations...
it had its time and we have better examples we can use...
It's time for this feature to go!
This commit is contained in:
Clebert Suconic 2014-12-02 12:15:25 -05:00
parent cb7774cfea
commit a3afd62575
18 changed files with 0 additions and 1998 deletions

View File

@ -45,9 +45,6 @@ import org.jboss.logging.annotations.MessageLogger;
@MessageLogger(projectCode = "AMQ")
public interface ActiveMQRestLogger extends BasicLogger
{
/**
* The twitter logger.
*/
ActiveMQRestLogger LOGGER = Logger.getMessageLogger(ActiveMQRestLogger.class, ActiveMQRestLogger.class.getPackage().getName());
@LogMessage(level = Logger.Level.INFO)

View File

@ -1,184 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq.examples.core</groupId>
<artifactId>core-examples</artifactId>
<version>6.0.0-SNAPSHOT</version>
</parent>
<artifactId>activemq-twitter-example</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ6 Twitter Example</name>
<properties>
<TWITTER_CONSUMER_KEY>consumerKey</TWITTER_CONSUMER_KEY>
<TWITTER_CONSUMER_SECRET>consumerSecret</TWITTER_CONSUMER_SECRET>
<TWITTER_ACCESS_TOKEN>twitterAccess</TWITTER_ACCESS_TOKEN>
<TWITTER_ACCESS_TOKEN_SECRET>twitterToken</TWITTER_ACCESS_TOKEN_SECRET>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.javaee</groupId>
<artifactId>jboss-jms-api</artifactId>
<version>1.1.0.GA</version>
</dependency>
<dependency>
<groupId>org.jboss.naming</groupId>
<artifactId>jnp-client</artifactId>
<version>5.0.5.Final</version>
</dependency>
<dependency>
<groupId>org.jboss.spec.javax.jms</groupId>
<artifactId>jboss-jms-api_2.0_spec</artifactId>
</dependency>
</dependencies>
<profiles>
<profile>
<id>default</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<build>
<plugins>
<plugin>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-maven-plugin</artifactId>
<executions>
<execution>
<id>start</id>
<goals>
<goal>start</goal>
</goals>
<configuration>
<waitOnStart>true</waitOnStart>
<hornetqConfigurationDir>${basedir}/target/classes/server0</hornetqConfigurationDir>
<systemProperties>
<property>
<name>build.directory</name>
<value>${basedir}/target/</value>
</property>
<property>
<name>TWITTER_CONSUMER_KEY</name>
<value>${TWITTER_CONSUMER_KEY}</value>
</property>
<property>
<name>TWITTER_CONSUMER_SECRET</name>
<value>${TWITTER_CONSUMER_SECRET}</value>
</property>
<property>
<name>TWITTER_ACCESS_TOKEN</name>
<value>${TWITTER_ACCESS_TOKEN}</value>
</property>
<property>
<name>TWITTER_ACCESS_TOKEN_SECRET</name>
<value>${TWITTER_ACCESS_TOKEN_SECRET}</value>
</property>
</systemProperties>
</configuration>
</execution>
</executions>
<configuration>
<waitOnStart>false</waitOnStart>
<hornetqConfigurationDir>${basedir}/target/classes/server0</hornetqConfigurationDir>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.activemq.examples.core</groupId>
<artifactId>activemq-twitter-example</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-twitter-integration</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.javaee</groupId>
<artifactId>jboss-jms-api</artifactId>
<version>1.1.0.GA</version>
</dependency>
<dependency>
<groupId>org.jboss.naming</groupId>
<artifactId>jnpserver</artifactId>
<version>5.0.3.GA</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>example</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>org.apache.activemq.core.example.TwitterConnectorExample</mainClass>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>

View File

@ -1,96 +0,0 @@
<html>
<head>
<title>ActiveMQ Twitter Connector Service Example</title>
<link rel="stylesheet" type="text/css" href="../../common/common.css" />
<link rel="stylesheet" type="text/css" href="../../common/prettify.css" />
<script type="text/javascript" src="../../common/prettify.js"></script>
</head>
<body onload="prettyPrint()">
<h1>Twitter Connector Service Example</h1>
<p>This example shows you how to configure ActiveMQ to use the Twitter Connector Service.</p>
<p>ActiveMQ supports 2 types of Twitter connector, incoming and outgoing.
Incoming connector consumes from twitter and forwards to a configurable address.
Outgoing connector consumes from a configurable address and forwards to twitter.
</p>
<p>In this example, incoming connector and outgoing connector is related to same twitter account.
So if you send a message to an outgoing address, outgoing connector forwards it to twitter,
and then incoming connector consumes it and forwards to incoming address.</p>
<h2>Example step-by-step</h2>
<p><i>To run the server, simply type <code>mvn-Dtwitter.consumerKey=consumer -Dtwitter.consumerSecret=secret -Dtwitter.accessToken=token -Dtwitter.accessTokenSecret=secret verify</code>
from this directory but replacing the system properties with those of the twitter account you want to use. Then run the example
by using the command <code>mvn -Pexample package</code></p>
<ol>
<li>First we need to create a ClientSessionFactory with Netty transport configuration</li>
<pre class="prettyprint">
<code>csf = ActiveMQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));</code>
</pre>
<li>We create a core session with auto-commit mode</li>
<pre class="prettyprint">
<code>session = csf.createSession(true,true);</code>
</pre>
<li>We Create a core producer for queue.outgoingQueue</li>
<pre class="prettyprint">
<code>ClientProducer cp = session.createProducer(OUTGOING_QUEUE);</code>
</pre>
<li>We create a core consumer for queue.incomingQueue</li>
<pre class="prettyprint">
<code>ClientConsumer cc = session.createConsumer(INCOMING_QUEUE);</code>
</pre>
<li>We create a core message that we are going to send</li>
<pre class="prettyprint">
<code>ClientMessage cm = session.createMessage(org.apache.activemq.api.core.Message.TEXT_TYPE,true);
String testMessage = System.currentTimeMillis() + ": twitter connector test example";
cm.getBodyBuffer().writeString(testMessage);</code>
</pre>
<li>We send the message to queue.outgoingQueue</li>
<pre class="prettyprint">
<code>cp.send(cm);</code>
</pre>
<li>We start the session</li>
<pre class="prettyprint">
<code>session.start();</code>
</pre>
<li>We will receive a message from queue.incomingQueue.
Outgoing connector forwards a message(we sent before) to twitter immediately.
Since incoming connector consumes from twitter and forwards to queue.incomingQueue
every 60 seconds, It will be received in 60+x seconds.</li>
<pre class="prettyprint">
<code>ClientMessage received = cc.receive(70 * 1000);
received.acknowledge();
String receivedText = received.getBodyBuffer().readString();</code>
</pre>
<li>And finally, remember to close core session and ClientSessionFactory in a <code>finally</code> block.</li>
<pre class="prettyprint">
<code>finally
{
if(session != null)
{
session.close();
}
if(csf != null)
{
csf.close();
}
}</code>
</pre>
</ol>
</body>
</html>

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.core.example;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
/**
* A simple example of using twitter connector service.
*
* @author <a href="tm.igarashi@gmail.com">Tomohisa Igarashi</a>
*/
public class TwitterConnectorExample
{
private static final String INCOMING_QUEUE = "queue.incomingQueue";
private static final String OUTGOING_QUEUE = "queue.outgoingQueue";
public static void main(final String[] args) throws Exception
{
ServerLocator locator = null;
ClientSessionFactory csf = null;
ClientSession session = null;
try
{
String testMessage = System.currentTimeMillis() + ": " + System.getProperty("twitter.example.alternativeMessage");
if(testMessage == null || testMessage.trim().equals("")) {
testMessage = System.currentTimeMillis() + ": ### Hello, ActiveMQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###";
}
// Step 1. Create a ClientSessionFactory
locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
csf = locator.createSessionFactory();
// Step 2. Create a core session.
session = csf.createSession(true,true);
// Step 3. Create a core producer for queue.outgoingQueue.
ClientProducer cp = session.createProducer(OUTGOING_QUEUE);
// Step 4. Create a core consumer for queue.incomingQueue.
ClientConsumer cc = session.createConsumer(INCOMING_QUEUE);
// Step 5. Create a core message.
ClientMessage cm = session.createMessage(org.apache.activemq.api.core.Message.TEXT_TYPE,true);
cm.getBodyBuffer().writeString(testMessage);
// Step 6. Send a message to queue.outgoingQueue.
cp.send(cm);
System.out.println("#### Sent a message to " + OUTGOING_QUEUE + ": " + testMessage);
// Step 7. Start the session.
session.start();
// Step 8. Receive a message from queue.incomingQueue.
// Outgoing connector forwards a message(sent at Step 6.) to twitter immediately.
// Since incoming connector consumes from twitter and forwards to queue.incomingQueue
// every 60 seconds, It will be received in 60+x seconds.
System.out.println("#### A message will be received in 60 seconds. Please wait...");
ClientMessage received = cc.receive(70 * 1000);
received.acknowledge();
String receivedText = received.getBodyBuffer().readString();
while(!receivedText.equals(testMessage))
{
// ignoring other tweets
received = cc.receiveImmediate();
if(received == null) {
// no other tweets. test message has gone...
return;
}
received.acknowledge();
receivedText = received.getBodyBuffer().readString();
}
System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText);
}
finally
{
// Step 9. Be sure to close some resources.
if(session != null)
{
session.close();
}
if(csf != null)
{
csf.close();
}
if (locator != null)
{
locator.close();
}
}
}
}

View File

@ -1,71 +0,0 @@
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/activemq-configuration.xsd">
<bindings-directory>target/server0/data/messaging/bindings</bindings-directory>
<journal-directory>target/server0/data/messaging/journal</journal-directory>
<large-messages-directory>target/server0/data/messaging/largemessages</large-messages-directory>
<paging-directory>target/server0/data/messaging/paging</paging-directory>
<!-- Connectors -->
<connectors>
<connector name="netty-connector">
<factory-class>org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
</connector>
</connectors>
<!-- Acceptors -->
<acceptors>
<acceptor name="netty-acceptor">
<factory-class>org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
</acceptor>
</acceptors>
<!-- Other config -->
<security-settings>
<!--security for example queue-->
<security-setting match="queue.incomingQueue">
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
<security-setting match="queue.outgoingQueue">
<permission type="consume" roles="guest"/>
<permission type="send" roles="guest"/>
</security-setting>
</security-settings>
<queues>
<queue name="queue.incomingQueue">
<address>queue.incomingQueue</address>
</queue>
<queue name="queue.outgoingQueue">
<address>queue.outgoingQueue</address>
</queue>
</queues>
<connector-services>
<connector-service name="my-incoming-tweets">
<factory-class>org.apache.activemq.integration.twitter.TwitterIncomingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.incomingQueue"/>
<param key="consumerKey" value="${twitter.consumerKey}"/>
<param key="consumerSecret" value="${twitter.consumerSecret}"/>
<param key="accessToken" value="${twitter.accessToken}"/>
<param key="accessTokenSecret" value="${twitter.accessTokenSecret}"/>
<param key="interval" value="60"/>
</connector-service>
<connector-service name="my-outgoing-tweets">
<factory-class>org.apache.activemq.integration.twitter.TwitterOutgoingConnectorServiceFactory</factory-class>
<param key="queue" value="queue.outgoingQueue"/>
<param key="consumerKey" value="${twitter.consumerKey}"/>
<param key="consumerSecret" value="${twitter.consumerSecret}"/>
<param key="accessToken" value="${twitter.accessToken}"/>
<param key="accessTokenSecret" value="${twitter.accessTokenSecret}"/>
</connector-service>
</connector-services>
</configuration>

View File

@ -1,19 +0,0 @@
<configuration xmlns="urn:activemq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/activemq-jms.xsd">
<!--the connection factory used by the example-->
<connection-factory name="ConnectionFactory">
<connectors>
<connector-ref connector-name="netty-connector"/>
</connectors>
<entries>
<entry name="ConnectionFactory"/>
</entries>
</connection-factory>
<!--the queue used by the example-->
<queue name="exampleQueue">
<entry name="/queue/exampleQueue"/>
</queue>
</configuration>

View File

@ -1,7 +0,0 @@
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:activemq /schema/activemq-users.xsd">
<!-- the default user. this is used where username is null-->
<defaultuser name="guest" password="guest">
<role name="guest"/>
</defaultuser>
</configuration>

View File

@ -1,44 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pom</artifactId>
<version>6.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>activemq-twitter-integration</artifactId>
<packaging>jar</packaging>
<name>ActiveMQ6 Twitter Integration</name>
<properties>
<activemq.basedir>${project.basedir}/../..</activemq.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging-processor</artifactId>
</dependency>
<!--
JBoss Logging
-->
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -1,99 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.integration.twitter;
import java.util.HashSet;
import java.util.Set;
/**
* A TwitterConstants
*
* @author <a href="tm.igarashi@gmail.com">Tomohisa Igarashi</a>
*/
public final class TwitterConstants
{
public static final String KEY_ID = "id";
public static final String KEY_SOURCE = "source";
public static final String KEY_CREATED_AT = "createdAt";
public static final String KEY_IS_TRUNCATED = "isTruncated";
public static final String KEY_IN_REPLY_TO_STATUS_ID = "inReplyToStatusId";
public static final String KEY_IN_REPLY_TO_USER_ID = "inReplyToUserId";
public static final String KEY_IN_REPLY_TO_SCREEN_NAME = "inReplyToScreenName";
public static final String KEY_IS_FAVORITED = "isFavorited";
public static final String KEY_IS_RETWEET = "isRetweet";
public static final String KEY_CONTRIBUTORS = "contributors";
public static final String KEY_GEO_LOCATION_LATITUDE = "geoLocation.latitude";
public static final String KEY_GEO_LOCATION_LONGITUDE = "geoLocation.longitude";
public static final String KEY_PLACE_ID = "place.id";
public static final String KEY_DISPLAY_COODINATES = "displayCoodinates";
public static final int DEFAULT_POLLING_INTERVAL_SECS = 10;
public static final int DEFAULT_PAGE_SIZE = 100;
public static final int FIRST_ATTEMPT_PAGE_SIZE = 1;
public static final int START_SINCE_ID = 1;
public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50;
public static final Set<String> ALLOWABLE_INCOMING_CONNECTOR_KEYS;
public static final Set<String> REQUIRED_INCOMING_CONNECTOR_KEYS;
public static final Set<String> ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
public static final Set<String> REQUIRED_OUTGOING_CONNECTOR_KEYS;
public static final String CONSUMER_KEY = "consumerKey";
public static final String CONSUMER_SECRET = "consumerSecret";
public static final String ACCESS_TOKEN = "accessToken";
public static final String ACCESS_TOKEN_SECRET = "accessTokenSecret";
public static final String QUEUE_NAME = "queue";
public static final String INCOMING_INTERVAL = "interval";
static
{
ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(INCOMING_INTERVAL);
REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet<String>();
REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY);
REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet<String>();
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY);
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET);
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN);
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET);
REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME);
}
private TwitterConstants()
{
// utility class
}
}

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.integration.twitter;
import org.apache.activemq.core.persistence.StorageManager;
import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.server.ConnectorService;
import org.apache.activemq.core.server.ConnectorServiceFactory;
import org.apache.activemq.integration.twitter.impl.IncomingTweetsHandler;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jun 29, 2010
*/
public class TwitterIncomingConnectorServiceFactory implements ConnectorServiceFactory
{
public ConnectorService createConnectorService(String connectorName, final Map<String, Object> configuration,
final StorageManager storageManager,
final PostOffice postOffice,
final ScheduledExecutorService scheduledThreadPool)
{
return new IncomingTweetsHandler(connectorName, configuration, storageManager, postOffice, scheduledThreadPool);
}
public Set<String> getAllowableProperties()
{
return TwitterConstants.ALLOWABLE_INCOMING_CONNECTOR_KEYS;
}
public Set<String> getRequiredProperties()
{
return TwitterConstants.REQUIRED_INCOMING_CONNECTOR_KEYS;
}
}

View File

@ -1,49 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.integration.twitter;
import org.apache.activemq.core.persistence.StorageManager;
import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.server.ConnectorService;
import org.apache.activemq.core.server.ConnectorServiceFactory;
import org.apache.activemq.integration.twitter.impl.OutgoingTweetsHandler;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jun 29, 2010
*/
public class TwitterOutgoingConnectorServiceFactory implements ConnectorServiceFactory
{
public ConnectorService createConnectorService(String connectorName, Map<String, Object> configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledThreadPool)
{
return new OutgoingTweetsHandler(connectorName, configuration, postOffice);
}
public Set<String> getAllowableProperties()
{
return TwitterConstants.ALLOWABLE_OUTGOING_CONNECTOR_KEYS;
}
public Set<String> getRequiredProperties()
{
return TwitterConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS;
}
}

View File

@ -1,234 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.integration.twitter.impl;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.persistence.StorageManager;
import org.apache.activemq.core.postoffice.Binding;
import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.server.ConnectorService;
import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.core.server.impl.ServerMessageImpl;
import org.apache.activemq.integration.twitter.TwitterConstants;
import org.apache.activemq.twitter.ActiveMQTwitterLogger;
import org.apache.activemq.utils.ConfigurationHelper;
import twitter4j.GeoLocation;
import twitter4j.Paging;
import twitter4j.Place;
import twitter4j.ResponseList;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.http.AccessToken;
/**
* IncomingTweetsHandler consumes from twitter and forwards to the
* configured ActiveMQ address.
*/
public class IncomingTweetsHandler implements ConnectorService
{
private final String connectorName;
private final String consumerKey;
private final String consumerSecret;
private final String accessToken;
private final String accessTokenSecret;
private final String queueName;
private final int intervalSeconds;
private final StorageManager storageManager;
private final PostOffice postOffice;
private Paging paging;
private Twitter twitter;
private boolean isStarted = false;
private final ScheduledExecutorService scheduledPool;
private ScheduledFuture<?> scheduledFuture;
public IncomingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
final StorageManager storageManager,
final PostOffice postOffice,
final ScheduledExecutorService scheduledThreadPool)
{
this.connectorName = connectorName;
this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration);
this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration);
this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration);
this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration);
this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
Integer intervalSeconds = ConfigurationHelper.getIntProperty(TwitterConstants.INCOMING_INTERVAL, 0, configuration);
if (intervalSeconds > 0)
{
this.intervalSeconds = intervalSeconds;
}
else
{
this.intervalSeconds = TwitterConstants.DEFAULT_POLLING_INTERVAL_SECS;
}
this.storageManager = storageManager;
this.postOffice = postOffice;
this.scheduledPool = scheduledThreadPool;
}
public void start() throws Exception
{
Binding b = postOffice.getBinding(new SimpleString(queueName));
if (b == null)
{
throw new Exception(connectorName + ": queue " + queueName + " not found");
}
paging = new Paging();
TwitterFactory tf = new TwitterFactory();
this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
this.consumerSecret,
new AccessToken(this.accessToken,
this.accessTokenSecret));
this.twitter.verifyCredentials();
// getting latest ID
this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
// If I used annotations here, it won't compile under JDK 1.7
ResponseList res = this.twitter.getHomeTimeline(paging);
this.paging.setSinceId(((Status) res.get(0)).getId());
ActiveMQTwitterLogger.LOGGER.debug(connectorName + " initialise(): got latest ID: " + this.paging.getSinceId());
// TODO make page size configurable
this.paging.setCount(TwitterConstants.DEFAULT_PAGE_SIZE);
scheduledFuture = this.scheduledPool.scheduleWithFixedDelay(new TweetsRunnable(),
intervalSeconds,
intervalSeconds,
TimeUnit.SECONDS);
isStarted = true;
}
public void stop() throws Exception
{
if (!isStarted)
{
return;
}
scheduledFuture.cancel(true);
paging = null;
isStarted = false;
}
public boolean isStarted()
{
return isStarted;
}
private void poll() throws Exception
{
// get new tweets
// If I used annotations here, it won't compile under JDK 1.7
ResponseList res = this.twitter.getHomeTimeline(paging);
if (res == null || res.size() == 0)
{
return;
}
for (int i = res.size() - 1; i >= 0; i--)
{
Status status = (Status) res.get(i);
ServerMessage msg = new ServerMessageImpl(this.storageManager.generateID(),
TwitterConstants.INITIAL_MESSAGE_BUFFER_SIZE);
msg.setAddress(new SimpleString(this.queueName));
msg.setDurable(true);
msg.encodeMessageIDToBuffer();
putTweetIntoMessage(status, msg);
this.postOffice.route(msg, false);
ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": routed: " + status.toString());
}
this.paging.setSinceId(((Status) res.get(0)).getId());
ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": update latest ID: " + this.paging.getSinceId());
}
private void putTweetIntoMessage(final Status status, final ServerMessage msg)
{
msg.getBodyBuffer().writeString(status.getText());
msg.putLongProperty(TwitterConstants.KEY_ID, status.getId());
msg.putStringProperty(TwitterConstants.KEY_SOURCE, status.getSource());
msg.putLongProperty(TwitterConstants.KEY_CREATED_AT, status.getCreatedAt().getTime());
msg.putBooleanProperty(TwitterConstants.KEY_IS_TRUNCATED, status.isTruncated());
msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, status.getInReplyToStatusId());
msg.putIntProperty(TwitterConstants.KEY_IN_REPLY_TO_USER_ID, status.getInReplyToUserId());
msg.putBooleanProperty(TwitterConstants.KEY_IS_FAVORITED, status.isFavorited());
msg.putBooleanProperty(TwitterConstants.KEY_IS_RETWEET, status.isRetweet());
msg.putObjectProperty(TwitterConstants.KEY_CONTRIBUTORS, status.getContributors());
GeoLocation gl;
if ((gl = status.getGeoLocation()) != null)
{
msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE, gl.getLatitude());
msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE, gl.getLongitude());
}
Place place;
if ((place = status.getPlace()) != null)
{
msg.putStringProperty(TwitterConstants.KEY_PLACE_ID, place.getId());
}
}
public String getName()
{
return connectorName;
}
private final class TweetsRunnable implements Runnable
{
/**
* TODO streaming API support
* TODO rate limit support
*/
public void run()
{
// Avoid canceling the task with RuntimeException
try
{
poll();
}
catch (Throwable t)
{
ActiveMQTwitterLogger.LOGGER.errorPollingTwitter(t);
}
}
}
}

View File

@ -1,266 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.integration.twitter.impl;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.core.filter.Filter;
import org.apache.activemq.core.postoffice.Binding;
import org.apache.activemq.core.postoffice.PostOffice;
import org.apache.activemq.core.server.ConnectorService;
import org.apache.activemq.core.server.Consumer;
import org.apache.activemq.core.server.HandleStatus;
import org.apache.activemq.core.server.ActiveMQServerLogger;
import org.apache.activemq.core.server.MessageReference;
import org.apache.activemq.core.server.Queue;
import org.apache.activemq.core.server.ServerMessage;
import org.apache.activemq.integration.twitter.TwitterConstants;
import org.apache.activemq.twitter.ActiveMQTwitterLogger;
import org.apache.activemq.utils.ConfigurationHelper;
import twitter4j.GeoLocation;
import twitter4j.StatusUpdate;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.http.AccessToken;
/**
* OutgoingTweetsHandler consumes from configured ActiveMQ address
* and forwards to the twitter.
*/
public class OutgoingTweetsHandler implements Consumer, ConnectorService
{
private final String connectorName;
private final String consumerKey;
private final String consumerSecret;
private final String accessToken;
private final String accessTokenSecret;
private final String queueName;
private final PostOffice postOffice;
private Twitter twitter = null;
private Queue queue = null;
private Filter filter = null;
private boolean isStarted = false;
public String debug()
{
return toString();
}
public OutgoingTweetsHandler(final String connectorName,
final Map<String, Object> configuration,
final PostOffice postOffice)
{
this.connectorName = connectorName;
this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration);
this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration);
this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration);
this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration);
this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration);
this.postOffice = postOffice;
}
/**
* TODO streaming API support
* TODO rate limit support
*/
public synchronized void start() throws Exception
{
if (this.isStarted)
{
return;
}
if (this.connectorName == null || this.connectorName.trim().equals(""))
{
throw new Exception("invalid connector name: " + this.connectorName);
}
if (this.queueName == null || this.queueName.trim().equals(""))
{
throw new Exception("invalid queue name: " + queueName);
}
SimpleString name = new SimpleString(this.queueName);
Binding b = this.postOffice.getBinding(name);
if (b == null)
{
throw new Exception(connectorName + ": queue " + queueName + " not found");
}
this.queue = (Queue) b.getBindable();
TwitterFactory tf = new TwitterFactory();
this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey,
this.consumerSecret,
new AccessToken(this.accessToken,
this.accessTokenSecret));
this.twitter.verifyCredentials();
// TODO make filter-string configurable
// this.filter = FilterImpl.createFilter(filterString);
this.filter = null;
this.queue.addConsumer(this);
this.queue.deliverAsync();
this.isStarted = true;
ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": started");
}
public boolean isStarted()
{
return isStarted; //To change body of implemented methods use File | Settings | File Templates.
}
public synchronized void stop() throws Exception
{
if (!this.isStarted)
{
return;
}
ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": receive shutdown request");
this.queue.removeConsumer(this);
this.isStarted = false;
ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": shutdown");
}
public String getName()
{
return connectorName;
}
public Filter getFilter()
{
return filter;
}
/* (non-Javadoc)
* @see org.apache.activemq.core.server.Consumer#getDeliveringMessages()
*/
@Override
public List<MessageReference> getDeliveringMessages()
{
return Collections.emptyList();
}
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (filter != null && !filter.match(ref.getMessage()))
{
return HandleStatus.NO_MATCH;
}
synchronized (this)
{
ref.handled();
ServerMessage message = ref.getMessage();
StatusUpdate status = new StatusUpdate(message.getBodyBuffer().readString());
// set optional property
if (message.containsProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID))
{
status.setInReplyToStatusId(message.getLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID));
}
if (message.containsProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE))
{
double geolat = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE);
double geolong = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE);
status.setLocation(new GeoLocation(geolat, geolong));
}
if (message.containsProperty(TwitterConstants.KEY_PLACE_ID))
{
status.setPlaceId(message.getStringProperty(TwitterConstants.KEY_PLACE_ID));
}
if (message.containsProperty(TwitterConstants.KEY_DISPLAY_COODINATES))
{
status.setDisplayCoordinates(message.getBooleanProperty(TwitterConstants.KEY_DISPLAY_COODINATES));
}
// send to Twitter
try
{
this.twitter.updateStatus(status);
}
catch (TwitterException e)
{
if (e.getStatusCode() == 403)
{
// duplicated message
ActiveMQTwitterLogger.LOGGER.error403(connectorName);
queue.acknowledge(ref);
return HandleStatus.HANDLED;
}
else
{
throw e;
}
}
queue.acknowledge(ref);
ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": forwarded to twitter: " + message.getMessageID());
return HandleStatus.HANDLED;
}
}
public void proceedDeliver(MessageReference ref)
{
// no op
}
@Override
public String toManagementString()
{
return toString();
}
@Override
public void disconnect()
{
try
{
stop();
}
catch (Exception e)
{
ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, getName());
}
}
}

View File

@ -1,35 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.twitter;
import org.jboss.logging.annotations.MessageBundle;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* 3/12/12
*
* Logger Code 18
*
* each message id must be 6 digits long starting with 18, the 3rd digit should be 9
*
* so 189000 to 189999
*/
@MessageBundle(projectCode = "AMQ")
public class ActiveMQTwitterBundle
{
}

View File

@ -1,58 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.twitter;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.LogMessage;
import org.jboss.logging.annotations.Message;
import org.jboss.logging.annotations.MessageLogger;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* 3/15/12
*
* Logger Code 18
*
* each message id must be 6 digits long starting with 18, the 3rd digit donates the level so
*
* INF0 1
* WARN 2
* DEBUG 3
* ERROR 4
* TRACE 5
* FATAL 6
*
* so an INFO message would be 181000 to 181999
*/
@MessageLogger(projectCode = "AMQ")
public interface ActiveMQTwitterLogger extends BasicLogger
{
/**
* The twitter logger.
*/
ActiveMQTwitterLogger LOGGER = Logger.getMessageLogger(ActiveMQTwitterLogger.class, ActiveMQTwitterLogger.class.getPackage().getName());
@LogMessage(level = Logger.Level.WARN)
@Message(id = 182001, value = "{0}: HTTP status code = 403: Ignore duplicated message", format = Message.Format.MESSAGE_FORMAT)
void error403(String connectorName);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 182002, value = "Error polling Twitter", format = Message.Format.MESSAGE_FORMAT)
void errorPollingTwitter(@Cause Throwable t);
}

14
pom.xml
View File

@ -281,14 +281,6 @@
<artifactId>jboss-logging-spi</artifactId>
<version>2.1.0.GA</version>
</dependency>
<!--needed to compile twitter support-->
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
<!-- there is a new version of this JAR but it breaks our usage of it -->
<version>2.1.2</version>
</dependency>
<!---->
<dependency>
<groupId>org.apache.qpid</groupId>
@ -505,7 +497,6 @@
<module>activemq-service-extensions</module>
<!-- <module>integration/activemq-jboss-as-integration</module> -->
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-twitter-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>
@ -532,7 +523,6 @@
<module>activemq-service-extensions</module>
<module>integration/activemq-jboss-as-integration</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-twitter-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>examples</module>
@ -559,7 +549,6 @@
<module>activemq-service-extensions</module>
<module>integration/activemq-jboss-as-integration</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-twitter-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>examples</module>
@ -588,7 +577,6 @@
<module>activemq-service-extensions</module>
<module>integration/activemq-jboss-as-integration</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-twitter-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>
@ -627,7 +615,6 @@
<module>activemq-service-extensions</module>
<module>integration/activemq-jboss-as-integration</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-twitter-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>
@ -662,7 +649,6 @@
<module>activemq-service-extensions</module>
<module>integration/activemq-jboss-as-integration</module>
<module>integration/activemq-spring-integration</module>
<module>integration/activemq-twitter-integration</module>
<module>integration/activemq-aerogear-integration</module>
<module>integration/activemq-vertx-integration</module>
<module>tests</module>

View File

@ -69,11 +69,6 @@
<artifactId>activemq-tools</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-twitter-integration</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring-integration</artifactId>
@ -121,10 +116,6 @@
<groupId>org.jboss.security</groupId>
<artifactId>jbosssx</artifactId>
</dependency>
<dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -1,637 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.twitter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.ConnectorServiceConfiguration;
import org.apache.activemq.core.config.CoreQueueConfiguration;
import org.apache.activemq.core.server.ConnectorService;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.integration.twitter.TwitterConstants;
import org.apache.activemq.integration.twitter.TwitterIncomingConnectorServiceFactory;
import org.apache.activemq.integration.twitter.TwitterOutgoingConnectorServiceFactory;
import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.util.ServiceTestBase;
import org.apache.activemq.tests.util.UnitTestCase;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import twitter4j.Paging;
import twitter4j.ResponseList;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterFactory;
import twitter4j.http.AccessToken;
/**
* A TwitterTest
*
* @author tm.igarashi@gmail.com
*/
public class TwitterTest extends ServiceTestBase
{
private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
private static final String KEY_CONNECTOR_NAME = "connector.name";
private static final String KEY_CONSUMER_KEY = "consumerKey";
private static final String KEY_CONSUMER_SECRET = "consumerSecret";
private static final String KEY_ACCESS_TOKEN = "accessToken";
private static final String KEY_ACCESS_TOKEN_SECRET = "accessTokenSecret";
private static final String KEY_QUEUE_NAME = "queue.name";
private static final String TWITTER_CONSUMER_KEY = System.getProperty("twitter.consumerKey");
private static final String TWITTER_CONSUMER_SECRET = System.getProperty("twitter.consumerSecret");
private static final String TWITTER_ACCESS_TOKEN = System.getProperty("twitter.accessToken");
private static final String TWITTER_ACCESS_TOKEN_SECRET = System.getProperty("twitter.accessTokenSecret");
// incoming
@Override
@Before
public void setUp() throws Exception
{
super.setUp();
}
@BeforeClass
public static void hasCredentials()
{
Assume.assumeNotNull(TWITTER_CONSUMER_KEY);
Assume.assumeFalse("null".equals(TWITTER_CONSUMER_KEY));
}
@Test
public void testSimpleIncoming() throws Exception
{
internalTestIncoming(true, false);
}
@Test
public void testIncomingNoQueue() throws Exception
{
internalTestIncoming(false, false);
}
@Test
public void testIncomingWithRestart() throws Exception
{
internalTestIncoming(true, true);
}
@Test
public void testIncomingWithEmptyConnectorName() throws Exception
{
HashMap<String, String> params = new HashMap<String, String>();
params.put(KEY_CONNECTOR_NAME, "");
internalTestIncomingFailedToInitialize(params);
}
@Test
public void testIncomingWithEmptyQueueName() throws Exception
{
HashMap<String, String> params = new HashMap<String, String>();
params.put(KEY_QUEUE_NAME, "");
internalTestIncomingFailedToInitialize(params);
}
@Test
public void testIncomingWithInvalidCredentials() throws Exception
{
HashMap<String, String> params = new HashMap<String, String>();
params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret");
internalTestIncomingFailedToInitialize(params);
}
//outgoing
@Test
public void testSimpleOutgoing() throws Exception
{
internalTestOutgoing(true, false);
}
@Test
public void testOutgoingNoQueue() throws Exception
{
internalTestOutgoing(false, false);
}
@Test
public void testOutgoingWithRestart() throws Exception
{
internalTestOutgoing(true, true);
}
@Test
public void testOutgoingWithEmptyConnectorName() throws Exception
{
HashMap<String, String> params = new HashMap<String, String>();
params.put(KEY_CONNECTOR_NAME, "");
internalTestOutgoingFailedToInitialize(params);
}
@Test
public void testOutgoingWithEmptyQueueName() throws Exception
{
HashMap<String, String> params = new HashMap<String, String>();
params.put(KEY_QUEUE_NAME, "");
internalTestOutgoingFailedToInitialize(params);
}
@Test
public void testOutgoingWithInvalidCredentials() throws Exception
{
HashMap<String, String> params = new HashMap<String, String>();
params.put(KEY_CONSUMER_KEY, "invalidConsumerKey");
params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret");
params.put(KEY_ACCESS_TOKEN, "invalidAccessToken");
params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret");
internalTestOutgoingFailedToInitialize(params);
}
@Test
public void testOutgoingWithInReplyTo() throws Exception
{
internalTestOutgoingWithInReplyTo();
}
protected void internalTestIncoming(boolean createQueue, boolean restart) throws Exception
{
ActiveMQServer server0 = null;
ClientSession session = null;
ServerLocator locator = null;
String queue = "TwitterTestQueue";
int interval = 5;
Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
TWITTER_CONSUMER_SECRET,
new AccessToken(TWITTER_ACCESS_TOKEN,
TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/incoming: " + System.currentTimeMillis();
log.debug("test incoming: " + testMessage);
try
{
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.INCOMING_INTERVAL, interval);
config.put(TwitterConstants.QUEUE_NAME, queue);
config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration inconf = new ConnectorServiceConfiguration()
.setFactoryClassName(TwitterIncomingConnectorServiceFactory.class.getName())
.setParams(config)
.setName("test-incoming-connector");
Configuration configuration = createDefaultConfig(false)
.addConnectorServiceConfiguration(inconf);
if (createQueue)
{
CoreQueueConfiguration qc = new CoreQueueConfiguration()
.setAddress(queue)
.setName(queue);
configuration.getQueueConfigurations().add(qc);
}
server0 = createServer(false, configuration);
server0.start();
if (restart)
{
server0.getConnectorsService().stop();
server0.getConnectorsService().start();
}
assertEquals(1, server0.getConnectorsService().getConnectors().size());
Iterator<ConnectorService> connectorServiceIterator = server0.getConnectorsService().getConnectors().iterator();
if (createQueue)
{
Assert.assertTrue(connectorServiceIterator.next().isStarted());
}
else
{
Assert.assertFalse(connectorServiceIterator.next().isStarted());
return;
}
twitter.updateStatus(testMessage);
TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
locator = ActiveMQClient.createServerLocatorWithoutHA(tpconf);
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
ClientConsumer consumer = session.createConsumer(queue);
session.start();
ClientMessage msg = consumer.receive(60 * 1000);
Assert.assertNotNull(msg);
Assert.assertEquals(testMessage, msg.getBodyBuffer().readString());
msg.acknowledge();
}
finally
{
try
{
session.close();
}
catch (Throwable t)
{
}
try
{
locator.close();
}
catch (Throwable ignored)
{
}
try
{
server0.stop();
}
catch (Throwable ignored)
{
}
}
}
protected void internalTestIncomingFailedToInitialize(HashMap<String, String> params) throws Exception
{
ActiveMQServer server0 = null;
String connectorName = "test-incoming-connector";
String queue = "TwitterTestQueue";
String consumerKey = "invalidConsumerKey";
String consumerSecret = "invalidConsumerSecret";
String accessToken = "invalidAccessToken";
String accessTokenSecret = "invalidAccessTokenSecret";
int interval = 5;
if (params.containsKey(KEY_CONNECTOR_NAME))
{
connectorName = params.get(KEY_CONNECTOR_NAME);
}
if (params.containsKey(KEY_CONSUMER_KEY))
{
consumerKey = params.get(KEY_CONSUMER_KEY);
}
if (params.containsKey(KEY_CONSUMER_SECRET))
{
consumerSecret = params.get(KEY_CONSUMER_SECRET);
}
if (params.containsKey(KEY_ACCESS_TOKEN))
{
accessToken = params.get(KEY_ACCESS_TOKEN);
}
if (params.containsKey(KEY_ACCESS_TOKEN_SECRET))
{
accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
}
if (params.containsKey(KEY_QUEUE_NAME))
{
queue = params.get(KEY_QUEUE_NAME);
}
try
{
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.INCOMING_INTERVAL, interval);
config.put(TwitterConstants.QUEUE_NAME, queue);
config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
ConnectorServiceConfiguration inconf = new ConnectorServiceConfiguration()
.setFactoryClassName(TwitterIncomingConnectorServiceFactory.class.getName())
.setParams(config)
.setName(connectorName);
CoreQueueConfiguration qc = new CoreQueueConfiguration()
.setAddress(queue)
.setName(queue);
Configuration configuration = createDefaultConfig(false)
.addConnectorServiceConfiguration(inconf)
.addQueueConfiguration(qc);
server0 = createServer(false, configuration);
server0.start();
Set<ConnectorService> conns = server0.getConnectorsService().getConnectors();
Assert.assertEquals(1, conns.size());
Iterator<ConnectorService> it = conns.iterator();
Assert.assertFalse(it.next().isStarted());
}
finally
{
try
{
server0.stop();
}
catch (Throwable ignored)
{
}
}
}
protected void internalTestOutgoing(boolean createQueue, boolean restart) throws Exception
{
ActiveMQServer server0 = null;
ServerLocator locator = null;
ClientSession session = null;
String queue = "TwitterTestQueue";
Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
TWITTER_CONSUMER_SECRET,
new AccessToken(TWITTER_ACCESS_TOKEN,
TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/outgoing: " + System.currentTimeMillis();
log.debug("test outgoing: " + testMessage);
try
{
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration outconf = new ConnectorServiceConfiguration()
.setFactoryClassName(TwitterOutgoingConnectorServiceFactory.class.getName())
.setParams(config)
.setName("test-outgoing-connector");
Configuration configuration = createDefaultConfig(false)
.addConnectorServiceConfiguration(outconf);
if (createQueue)
{
CoreQueueConfiguration qc = new CoreQueueConfiguration()
.setAddress(queue)
.setName(queue)
.setDurable(false);
configuration.getQueueConfigurations().add(qc);
}
server0 = createServer(false, configuration);
server0.start();
if (restart)
{
server0.getConnectorsService().stop();
server0.getConnectorsService().start();
}
assertEquals(1, server0.getConnectorsService().getConnectors().size());
Iterator<ConnectorService> connectorServiceIterator = server0.getConnectorsService().getConnectors().iterator();
if (createQueue)
{
Assert.assertTrue(connectorServiceIterator.next().isStarted());
}
else
{
Assert.assertFalse(connectorServiceIterator.next().isStarted());
return;
}
TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
locator = ActiveMQClient.createServerLocatorWithoutHA(tpconf);
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
ClientProducer producer = session.createProducer(queue);
ClientMessage msg = session.createMessage(false);
msg.getBodyBuffer().writeString(testMessage);
session.start();
producer.send(msg);
Thread.sleep(3000);
Paging page = new Paging();
page.setCount(1);
ResponseList res = twitter.getHomeTimeline(page);
Assert.assertEquals(testMessage, ((Status) (res.get(0))).getText());
}
finally
{
try
{
session.close();
}
catch (Throwable t)
{
}
try
{
locator.close();
}
catch (Throwable t)
{
}
try
{
server0.stop();
}
catch (Throwable ignored)
{
}
}
}
protected void internalTestOutgoingFailedToInitialize(HashMap<String, String> params) throws Exception
{
ActiveMQServer server0 = null;
String connectorName = "test-outgoing-connector";
String queue = "TwitterTestQueue";
String consumerKey = TWITTER_CONSUMER_KEY;
String consumerSecret = TWITTER_CONSUMER_SECRET;
String accessToken = TWITTER_ACCESS_TOKEN;
String accessTokenSecret = TWITTER_ACCESS_TOKEN_SECRET;
if (params.containsKey(KEY_CONNECTOR_NAME))
{
connectorName = params.get(KEY_CONNECTOR_NAME);
}
if (params.containsKey(KEY_CONSUMER_KEY))
{
consumerKey = params.get(KEY_CONSUMER_KEY);
}
if (params.containsKey(KEY_CONSUMER_SECRET))
{
consumerSecret = params.get(KEY_CONSUMER_SECRET);
}
if (params.containsKey(KEY_ACCESS_TOKEN))
{
accessToken = params.get(KEY_ACCESS_TOKEN);
}
if (params.containsKey(KEY_ACCESS_TOKEN_SECRET))
{
accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET);
}
if (params.containsKey(KEY_QUEUE_NAME))
{
queue = params.get(KEY_QUEUE_NAME);
}
try
{
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
config.put(TwitterConstants.CONSUMER_KEY, consumerKey);
config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret);
config.put(TwitterConstants.ACCESS_TOKEN, accessToken);
config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret);
ConnectorServiceConfiguration outconf = new ConnectorServiceConfiguration()
.setFactoryClassName(TwitterOutgoingConnectorServiceFactory.class.getName())
.setParams(config)
.setName(connectorName);
CoreQueueConfiguration qc = new CoreQueueConfiguration()
.setAddress(queue)
.setName(queue)
.setDurable(false);
Configuration configuration = createDefaultConfig(false)
.addConnectorServiceConfiguration(outconf)
.addQueueConfiguration(qc);
server0 = createServer(false, configuration);
server0.start();
}
finally
{
try
{
server0.stop();
}
catch (Throwable ignored)
{
}
}
}
protected void internalTestOutgoingWithInReplyTo() throws Exception
{
ActiveMQServer server0 = null;
ClientSession session = null;
ServerLocator locator = null;
String queue = "TwitterTestQueue";
Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY,
TWITTER_CONSUMER_SECRET,
new AccessToken(TWITTER_ACCESS_TOKEN,
TWITTER_ACCESS_TOKEN_SECRET));
String testMessage = "TwitterTest/outgoing with in_reply_to: " + System.currentTimeMillis();
String replyMessage = "@" + twitter.getScreenName() + " TwitterTest/outgoing reply: " + System.currentTimeMillis();
try
{
HashMap<String, Object> config = new HashMap<String, Object>();
config.put(TwitterConstants.QUEUE_NAME, queue);
config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY);
config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET);
config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN);
config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET);
ConnectorServiceConfiguration outconf = new ConnectorServiceConfiguration()
.setFactoryClassName(TwitterOutgoingConnectorServiceFactory.class.getName())
.setParams(config)
.setName("test-outgoing-with-in-reply-to");
CoreQueueConfiguration qc = new CoreQueueConfiguration()
.setAddress(queue)
.setName(queue)
.setDurable(false);
Configuration configuration = createDefaultConfig(false)
.addConnectorServiceConfiguration(outconf)
.addQueueConfiguration(qc);
Status s = twitter.updateStatus(testMessage);
server0 = createServer(false, configuration);
server0.start();
TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY);
locator = ActiveMQClient.createServerLocatorWithoutHA(tpconf);
ClientSessionFactory sf = createSessionFactory(locator);
session = sf.createSession(false, true, true);
ClientProducer producer = session.createProducer(queue);
ClientMessage msg = session.createMessage(false);
msg.getBodyBuffer().writeString(replyMessage);
msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, s.getId());
session.start();
producer.send(msg);
Thread.sleep(3000);
Paging page = new Paging();
page.setCount(2);
ResponseList res = twitter.getHomeTimeline(page);
Assert.assertEquals(testMessage, ((Status) (res.get(1))).getText());
Assert.assertEquals(-1, ((Status) (res.get(1))).getInReplyToStatusId());
Assert.assertEquals(replyMessage, ((Status) (res.get(0))).getText());
Assert.assertEquals(s.getId(), ((Status) (res.get(0))).getInReplyToStatusId());
}
finally
{
try
{
session.close();
}
catch (Throwable t)
{
}
try
{
locator.close();
}
catch (Throwable t)
{
}
try
{
server0.stop();
}
catch (Throwable ignored)
{
}
}
}
}