diff --git a/activemq-rest/src/main/java/org/apache/activemq/rest/ActiveMQRestLogger.java b/activemq-rest/src/main/java/org/apache/activemq/rest/ActiveMQRestLogger.java index bd7bb5c95d..e14b6813a8 100644 --- a/activemq-rest/src/main/java/org/apache/activemq/rest/ActiveMQRestLogger.java +++ b/activemq-rest/src/main/java/org/apache/activemq/rest/ActiveMQRestLogger.java @@ -45,9 +45,6 @@ import org.jboss.logging.annotations.MessageLogger; @MessageLogger(projectCode = "AMQ") public interface ActiveMQRestLogger extends BasicLogger { - /** - * The twitter logger. - */ ActiveMQRestLogger LOGGER = Logger.getMessageLogger(ActiveMQRestLogger.class, ActiveMQRestLogger.class.getPackage().getName()); @LogMessage(level = Logger.Level.INFO) diff --git a/examples/core/twitter-connector/pom.xml b/examples/core/twitter-connector/pom.xml deleted file mode 100644 index 8299070a40..0000000000 --- a/examples/core/twitter-connector/pom.xml +++ /dev/null @@ -1,184 +0,0 @@ - - 4.0.0 - - - org.apache.activemq.examples.core - core-examples - 6.0.0-SNAPSHOT - - - activemq-twitter-example - jar - ActiveMQ6 Twitter Example - - - consumerKey - consumerSecret - twitterAccess - twitterToken - - - - org.apache.activemq - activemq-server - ${project.version} - - - org.apache.activemq - activemq-core-client - ${project.version} - - - org.apache.activemq - activemq-commons - ${project.version} - - - io.netty - netty-all - ${netty.version} - - - org.jboss.javaee - jboss-jms-api - 1.1.0.GA - - - org.jboss.naming - jnp-client - 5.0.5.Final - - - org.jboss.spec.javax.jms - jboss-jms-api_2.0_spec - - - - - - default - - true - - - - - org.apache.activemq - activemq-maven-plugin - - - start - - start - - - true - ${basedir}/target/classes/server0 - - - build.directory - ${basedir}/target/ - - - TWITTER_CONSUMER_KEY - ${TWITTER_CONSUMER_KEY} - - - TWITTER_CONSUMER_SECRET - ${TWITTER_CONSUMER_SECRET} - - - TWITTER_ACCESS_TOKEN - ${TWITTER_ACCESS_TOKEN} - - - TWITTER_ACCESS_TOKEN_SECRET - ${TWITTER_ACCESS_TOKEN_SECRET} - - - - - - - false - ${basedir}/target/classes/server0 - - - - org.apache.activemq.examples.core - activemq-twitter-example - ${project.version} - - - org.apache.activemq - activemq-twitter-integration - ${project.version} - - - org.apache.activemq - activemq-core-client - ${project.version} - - - org.apache.activemq - activemq-server - ${project.version} - - - org.apache.activemq - activemq-jms-client - ${project.version} - - - org.apache.activemq - activemq-jms-server - ${project.version} - - - io.netty - netty-all - ${netty.version} - - - org.jboss.javaee - jboss-jms-api - 1.1.0.GA - - - org.jboss.naming - jnpserver - 5.0.3.GA - - - - - - - - example - - - - org.codehaus.mojo - exec-maven-plugin - 1.1 - - - package - - java - - - - - org.apache.activemq.core.example.TwitterConnectorExample - - - - - - - - - \ No newline at end of file diff --git a/examples/core/twitter-connector/readme.html b/examples/core/twitter-connector/readme.html deleted file mode 100644 index bfd8c3f53f..0000000000 --- a/examples/core/twitter-connector/readme.html +++ /dev/null @@ -1,96 +0,0 @@ - - - ActiveMQ Twitter Connector Service Example - - - - - -

Twitter Connector Service Example

- -

This example shows you how to configure ActiveMQ to use the Twitter Connector Service.

- -

ActiveMQ supports 2 types of Twitter connector, incoming and outgoing. - Incoming connector consumes from twitter and forwards to a configurable address. - Outgoing connector consumes from a configurable address and forwards to twitter. -

- -

In this example, incoming connector and outgoing connector is related to same twitter account. - So if you send a message to an outgoing address, outgoing connector forwards it to twitter, - and then incoming connector consumes it and forwards to incoming address.

- -

Example step-by-step

-

To run the server, simply type mvn-Dtwitter.consumerKey=consumer -Dtwitter.consumerSecret=secret -Dtwitter.accessToken=token -Dtwitter.accessTokenSecret=secret verify - from this directory but replacing the system properties with those of the twitter account you want to use. Then run the example - by using the command mvn -Pexample package

- - -
    -
  1. First we need to create a ClientSessionFactory with Netty transport configuration
  2. -
    -           csf = ActiveMQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
    -        
    - -
  3. We create a core session with auto-commit mode
  4. -
    -           session = csf.createSession(true,true);
    -        
    - -
  5. We Create a core producer for queue.outgoingQueue
  6. -
    -           ClientProducer cp = session.createProducer(OUTGOING_QUEUE);
    -        
    - -
  7. We create a core consumer for queue.incomingQueue
  8. -
    -           ClientConsumer cc = session.createConsumer(INCOMING_QUEUE);
    -        
    - -
  9. We create a core message that we are going to send
  10. -
    -           ClientMessage cm = session.createMessage(org.apache.activemq.api.core.Message.TEXT_TYPE,true);
    -String testMessage = System.currentTimeMillis() + ": twitter connector test example";
    -cm.getBodyBuffer().writeString(testMessage);
    -        
    - -
  11. We send the message to queue.outgoingQueue
  12. -
    -          cp.send(cm);
    -       
    - -
  13. We start the session
  14. -
    -           session.start();
    -        
    - -
  15. We will receive a message from queue.incomingQueue. - Outgoing connector forwards a message(we sent before) to twitter immediately. - Since incoming connector consumes from twitter and forwards to queue.incomingQueue - every 60 seconds, It will be received in 60+x seconds.
  16. -
    -           ClientMessage received = cc.receive(70 * 1000);
    -received.acknowledge();
    -String receivedText = received.getBodyBuffer().readString();
    -        
    - -
  17. And finally, remember to close core session and ClientSessionFactory in a finally block.
  18. - -
    -           finally
    -{
    -    if(session != null)
    -    {
    -       session.close();
    -    }
    -    if(csf != null)
    -    {
    -       csf.close();
    -    }
    -}
    -        
    - - - -
- - diff --git a/examples/core/twitter-connector/src/main/java/org/apache/activemq/core/example/TwitterConnectorExample.java b/examples/core/twitter-connector/src/main/java/org/apache/activemq/core/example/TwitterConnectorExample.java deleted file mode 100644 index cede6a01c5..0000000000 --- a/examples/core/twitter-connector/src/main/java/org/apache/activemq/core/example/TwitterConnectorExample.java +++ /dev/null @@ -1,121 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.core.example; - -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ClientConsumer; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientProducer; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.core.client.ClientSessionFactory; -import org.apache.activemq.api.core.client.ActiveMQClient; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory; - -/** - * A simple example of using twitter connector service. - * - * @author Tomohisa Igarashi - */ -public class TwitterConnectorExample -{ - private static final String INCOMING_QUEUE = "queue.incomingQueue"; - private static final String OUTGOING_QUEUE = "queue.outgoingQueue"; - - public static void main(final String[] args) throws Exception - { - ServerLocator locator = null; - ClientSessionFactory csf = null; - ClientSession session = null; - try - { - String testMessage = System.currentTimeMillis() + ": " + System.getProperty("twitter.example.alternativeMessage"); - if(testMessage == null || testMessage.trim().equals("")) { - testMessage = System.currentTimeMillis() + ": ### Hello, ActiveMQ fans!! We are now experiencing so fast, so reliable and so exciting messaging never seen before ;-) ###"; - } - - // Step 1. Create a ClientSessionFactory - - - locator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName())); - - csf = locator.createSessionFactory(); - - // Step 2. Create a core session. - session = csf.createSession(true,true); - - // Step 3. Create a core producer for queue.outgoingQueue. - ClientProducer cp = session.createProducer(OUTGOING_QUEUE); - - // Step 4. Create a core consumer for queue.incomingQueue. - ClientConsumer cc = session.createConsumer(INCOMING_QUEUE); - - // Step 5. Create a core message. - ClientMessage cm = session.createMessage(org.apache.activemq.api.core.Message.TEXT_TYPE,true); - cm.getBodyBuffer().writeString(testMessage); - - // Step 6. Send a message to queue.outgoingQueue. - cp.send(cm); - System.out.println("#### Sent a message to " + OUTGOING_QUEUE + ": " + testMessage); - - // Step 7. Start the session. - session.start(); - - // Step 8. Receive a message from queue.incomingQueue. - // Outgoing connector forwards a message(sent at Step 6.) to twitter immediately. - // Since incoming connector consumes from twitter and forwards to queue.incomingQueue - // every 60 seconds, It will be received in 60+x seconds. - System.out.println("#### A message will be received in 60 seconds. Please wait..."); - ClientMessage received = cc.receive(70 * 1000); - received.acknowledge(); - String receivedText = received.getBodyBuffer().readString(); - - while(!receivedText.equals(testMessage)) - { - // ignoring other tweets - received = cc.receiveImmediate(); - if(received == null) { - // no other tweets. test message has gone... - return; - } - - received.acknowledge(); - receivedText = received.getBodyBuffer().readString(); - } - - System.out.println("#### Received a message from " + INCOMING_QUEUE + ": " + receivedText); - } - finally - { - // Step 9. Be sure to close some resources. - if(session != null) - { - session.close(); - } - if(csf != null) - { - csf.close(); - } - - if (locator != null) - { - locator.close(); - } - } - } - -} diff --git a/examples/core/twitter-connector/src/main/resources/server0/activemq-configuration.xml b/examples/core/twitter-connector/src/main/resources/server0/activemq-configuration.xml deleted file mode 100644 index 2db5f4666f..0000000000 --- a/examples/core/twitter-connector/src/main/resources/server0/activemq-configuration.xml +++ /dev/null @@ -1,71 +0,0 @@ - - - - target/server0/data/messaging/bindings - - target/server0/data/messaging/journal - - target/server0/data/messaging/largemessages - - target/server0/data/messaging/paging - - - - - org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory - - - - - - - org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory - - - - - - - - - - - - - - - - - - - -
queue.incomingQueue
-
- -
queue.outgoingQueue
-
-
- - - - org.apache.activemq.integration.twitter.TwitterIncomingConnectorServiceFactory - - - - - - - - - org.apache.activemq.integration.twitter.TwitterOutgoingConnectorServiceFactory - - - - - - - - -
diff --git a/examples/core/twitter-connector/src/main/resources/server0/activemq-jms.xml b/examples/core/twitter-connector/src/main/resources/server0/activemq-jms.xml deleted file mode 100644 index 452b958e9e..0000000000 --- a/examples/core/twitter-connector/src/main/resources/server0/activemq-jms.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/examples/core/twitter-connector/src/main/resources/server0/activemq-users.xml b/examples/core/twitter-connector/src/main/resources/server0/activemq-users.xml deleted file mode 100644 index ae30546490..0000000000 --- a/examples/core/twitter-connector/src/main/resources/server0/activemq-users.xml +++ /dev/null @@ -1,7 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/integration/activemq-twitter-integration/pom.xml b/integration/activemq-twitter-integration/pom.xml deleted file mode 100644 index 8248b0c08f..0000000000 --- a/integration/activemq-twitter-integration/pom.xml +++ /dev/null @@ -1,44 +0,0 @@ - - 4.0.0 - - - org.apache.activemq - activemq-pom - 6.0.0-SNAPSHOT - ../../pom.xml - - - activemq-twitter-integration - jar - ActiveMQ6 Twitter Integration - - - ${project.basedir}/../.. - - - - - org.jboss.logging - jboss-logging-processor - - - - - org.jboss.logging - jboss-logging - - - org.apache.activemq - activemq-server - ${project.version} - - - org.twitter4j - twitter4j-core - - - - diff --git a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterConstants.java b/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterConstants.java deleted file mode 100644 index e1713294d3..0000000000 --- a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterConstants.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.integration.twitter; - -import java.util.HashSet; -import java.util.Set; - -/** - * A TwitterConstants - * - * @author Tomohisa Igarashi - */ -public final class TwitterConstants -{ - public static final String KEY_ID = "id"; - public static final String KEY_SOURCE = "source"; - public static final String KEY_CREATED_AT = "createdAt"; - public static final String KEY_IS_TRUNCATED = "isTruncated"; - public static final String KEY_IN_REPLY_TO_STATUS_ID = "inReplyToStatusId"; - public static final String KEY_IN_REPLY_TO_USER_ID = "inReplyToUserId"; - public static final String KEY_IN_REPLY_TO_SCREEN_NAME = "inReplyToScreenName"; - public static final String KEY_IS_FAVORITED = "isFavorited"; - public static final String KEY_IS_RETWEET = "isRetweet"; - public static final String KEY_CONTRIBUTORS = "contributors"; - public static final String KEY_GEO_LOCATION_LATITUDE = "geoLocation.latitude"; - public static final String KEY_GEO_LOCATION_LONGITUDE = "geoLocation.longitude"; - public static final String KEY_PLACE_ID = "place.id"; - public static final String KEY_DISPLAY_COODINATES = "displayCoodinates"; - - public static final int DEFAULT_POLLING_INTERVAL_SECS = 10; - public static final int DEFAULT_PAGE_SIZE = 100; - public static final int FIRST_ATTEMPT_PAGE_SIZE = 1; - public static final int START_SINCE_ID = 1; - public static final int INITIAL_MESSAGE_BUFFER_SIZE = 50; - - public static final Set ALLOWABLE_INCOMING_CONNECTOR_KEYS; - public static final Set REQUIRED_INCOMING_CONNECTOR_KEYS; - - public static final Set ALLOWABLE_OUTGOING_CONNECTOR_KEYS; - public static final Set REQUIRED_OUTGOING_CONNECTOR_KEYS; - - public static final String CONSUMER_KEY = "consumerKey"; - public static final String CONSUMER_SECRET = "consumerSecret"; - public static final String ACCESS_TOKEN = "accessToken"; - public static final String ACCESS_TOKEN_SECRET = "accessTokenSecret"; - public static final String QUEUE_NAME = "queue"; - public static final String INCOMING_INTERVAL = "interval"; - - static - { - ALLOWABLE_INCOMING_CONNECTOR_KEYS = new HashSet(); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME); - ALLOWABLE_INCOMING_CONNECTOR_KEYS.add(INCOMING_INTERVAL); - - REQUIRED_INCOMING_CONNECTOR_KEYS = new HashSet(); - REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_KEY); - REQUIRED_INCOMING_CONNECTOR_KEYS.add(CONSUMER_SECRET); - REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN); - REQUIRED_INCOMING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET); - REQUIRED_INCOMING_CONNECTOR_KEYS.add(QUEUE_NAME); - - ALLOWABLE_OUTGOING_CONNECTOR_KEYS = new HashSet(); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET); - ALLOWABLE_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME); - - REQUIRED_OUTGOING_CONNECTOR_KEYS = new HashSet(); - REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_KEY); - REQUIRED_OUTGOING_CONNECTOR_KEYS.add(CONSUMER_SECRET); - REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN); - REQUIRED_OUTGOING_CONNECTOR_KEYS.add(ACCESS_TOKEN_SECRET); - REQUIRED_OUTGOING_CONNECTOR_KEYS.add(QUEUE_NAME); - } - - private TwitterConstants() - { - // utility class - } -} diff --git a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterIncomingConnectorServiceFactory.java b/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterIncomingConnectorServiceFactory.java deleted file mode 100644 index 0934dad6e6..0000000000 --- a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterIncomingConnectorServiceFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.integration.twitter; - -import org.apache.activemq.core.persistence.StorageManager; -import org.apache.activemq.core.postoffice.PostOffice; -import org.apache.activemq.core.server.ConnectorService; -import org.apache.activemq.core.server.ConnectorServiceFactory; -import org.apache.activemq.integration.twitter.impl.IncomingTweetsHandler; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; - -/** - * @author Andy Taylor - * Created Jun 29, 2010 - */ -public class TwitterIncomingConnectorServiceFactory implements ConnectorServiceFactory -{ - public ConnectorService createConnectorService(String connectorName, final Map configuration, - final StorageManager storageManager, - final PostOffice postOffice, - final ScheduledExecutorService scheduledThreadPool) - { - return new IncomingTweetsHandler(connectorName, configuration, storageManager, postOffice, scheduledThreadPool); - } - - public Set getAllowableProperties() - { - return TwitterConstants.ALLOWABLE_INCOMING_CONNECTOR_KEYS; - } - - public Set getRequiredProperties() - { - return TwitterConstants.REQUIRED_INCOMING_CONNECTOR_KEYS; - } -} diff --git a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java b/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java deleted file mode 100644 index 1ceb3c90bd..0000000000 --- a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/TwitterOutgoingConnectorServiceFactory.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.integration.twitter; - -import org.apache.activemq.core.persistence.StorageManager; -import org.apache.activemq.core.postoffice.PostOffice; -import org.apache.activemq.core.server.ConnectorService; -import org.apache.activemq.core.server.ConnectorServiceFactory; -import org.apache.activemq.integration.twitter.impl.OutgoingTweetsHandler; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; - -/** - * @author Andy Taylor - * Created Jun 29, 2010 - */ -public class TwitterOutgoingConnectorServiceFactory implements ConnectorServiceFactory -{ - public ConnectorService createConnectorService(String connectorName, Map configuration, StorageManager storageManager, PostOffice postOffice, ScheduledExecutorService scheduledThreadPool) - { - return new OutgoingTweetsHandler(connectorName, configuration, postOffice); - } - - public Set getAllowableProperties() - { - return TwitterConstants.ALLOWABLE_OUTGOING_CONNECTOR_KEYS; - } - - public Set getRequiredProperties() - { - return TwitterConstants.REQUIRED_OUTGOING_CONNECTOR_KEYS; - } -} diff --git a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/impl/IncomingTweetsHandler.java b/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/impl/IncomingTweetsHandler.java deleted file mode 100644 index 15279a5ee2..0000000000 --- a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/impl/IncomingTweetsHandler.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.integration.twitter.impl; - -import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.core.persistence.StorageManager; -import org.apache.activemq.core.postoffice.Binding; -import org.apache.activemq.core.postoffice.PostOffice; -import org.apache.activemq.core.server.ConnectorService; -import org.apache.activemq.core.server.ServerMessage; -import org.apache.activemq.core.server.impl.ServerMessageImpl; -import org.apache.activemq.integration.twitter.TwitterConstants; -import org.apache.activemq.twitter.ActiveMQTwitterLogger; -import org.apache.activemq.utils.ConfigurationHelper; -import twitter4j.GeoLocation; -import twitter4j.Paging; -import twitter4j.Place; -import twitter4j.ResponseList; -import twitter4j.Status; -import twitter4j.Twitter; -import twitter4j.TwitterFactory; -import twitter4j.http.AccessToken; - -/** - * IncomingTweetsHandler consumes from twitter and forwards to the - * configured ActiveMQ address. - */ -public class IncomingTweetsHandler implements ConnectorService -{ - private final String connectorName; - - private final String consumerKey; - - private final String consumerSecret; - - private final String accessToken; - - private final String accessTokenSecret; - - private final String queueName; - - private final int intervalSeconds; - - private final StorageManager storageManager; - - private final PostOffice postOffice; - - private Paging paging; - - private Twitter twitter; - - private boolean isStarted = false; - - private final ScheduledExecutorService scheduledPool; - - private ScheduledFuture scheduledFuture; - - public IncomingTweetsHandler(final String connectorName, - final Map configuration, - final StorageManager storageManager, - final PostOffice postOffice, - final ScheduledExecutorService scheduledThreadPool) - { - this.connectorName = connectorName; - this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration); - this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration); - this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration); - this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration); - this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration); - Integer intervalSeconds = ConfigurationHelper.getIntProperty(TwitterConstants.INCOMING_INTERVAL, 0, configuration); - if (intervalSeconds > 0) - { - this.intervalSeconds = intervalSeconds; - } - else - { - this.intervalSeconds = TwitterConstants.DEFAULT_POLLING_INTERVAL_SECS; - } - this.storageManager = storageManager; - this.postOffice = postOffice; - this.scheduledPool = scheduledThreadPool; - } - - public void start() throws Exception - { - Binding b = postOffice.getBinding(new SimpleString(queueName)); - if (b == null) - { - throw new Exception(connectorName + ": queue " + queueName + " not found"); - } - - paging = new Paging(); - TwitterFactory tf = new TwitterFactory(); - this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey, - this.consumerSecret, - new AccessToken(this.accessToken, - this.accessTokenSecret)); - this.twitter.verifyCredentials(); - - // getting latest ID - this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE); - - // If I used annotations here, it won't compile under JDK 1.7 - ResponseList res = this.twitter.getHomeTimeline(paging); - this.paging.setSinceId(((Status) res.get(0)).getId()); - ActiveMQTwitterLogger.LOGGER.debug(connectorName + " initialise(): got latest ID: " + this.paging.getSinceId()); - - // TODO make page size configurable - this.paging.setCount(TwitterConstants.DEFAULT_PAGE_SIZE); - - scheduledFuture = this.scheduledPool.scheduleWithFixedDelay(new TweetsRunnable(), - intervalSeconds, - intervalSeconds, - TimeUnit.SECONDS); - isStarted = true; - } - - public void stop() throws Exception - { - if (!isStarted) - { - return; - } - scheduledFuture.cancel(true); - paging = null; - isStarted = false; - } - - public boolean isStarted() - { - return isStarted; - } - - private void poll() throws Exception - { - // get new tweets - // If I used annotations here, it won't compile under JDK 1.7 - ResponseList res = this.twitter.getHomeTimeline(paging); - - if (res == null || res.size() == 0) - { - return; - } - - for (int i = res.size() - 1; i >= 0; i--) - { - Status status = (Status) res.get(i); - - ServerMessage msg = new ServerMessageImpl(this.storageManager.generateID(), - TwitterConstants.INITIAL_MESSAGE_BUFFER_SIZE); - msg.setAddress(new SimpleString(this.queueName)); - msg.setDurable(true); - msg.encodeMessageIDToBuffer(); - - putTweetIntoMessage(status, msg); - - this.postOffice.route(msg, false); - ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": routed: " + status.toString()); - } - - this.paging.setSinceId(((Status) res.get(0)).getId()); - ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": update latest ID: " + this.paging.getSinceId()); - } - - private void putTweetIntoMessage(final Status status, final ServerMessage msg) - { - msg.getBodyBuffer().writeString(status.getText()); - msg.putLongProperty(TwitterConstants.KEY_ID, status.getId()); - msg.putStringProperty(TwitterConstants.KEY_SOURCE, status.getSource()); - - msg.putLongProperty(TwitterConstants.KEY_CREATED_AT, status.getCreatedAt().getTime()); - msg.putBooleanProperty(TwitterConstants.KEY_IS_TRUNCATED, status.isTruncated()); - msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, status.getInReplyToStatusId()); - msg.putIntProperty(TwitterConstants.KEY_IN_REPLY_TO_USER_ID, status.getInReplyToUserId()); - msg.putBooleanProperty(TwitterConstants.KEY_IS_FAVORITED, status.isFavorited()); - msg.putBooleanProperty(TwitterConstants.KEY_IS_RETWEET, status.isRetweet()); - msg.putObjectProperty(TwitterConstants.KEY_CONTRIBUTORS, status.getContributors()); - GeoLocation gl; - if ((gl = status.getGeoLocation()) != null) - { - msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE, gl.getLatitude()); - msg.putDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE, gl.getLongitude()); - } - Place place; - if ((place = status.getPlace()) != null) - { - msg.putStringProperty(TwitterConstants.KEY_PLACE_ID, place.getId()); - } - } - - public String getName() - { - return connectorName; - } - - private final class TweetsRunnable implements Runnable - { - /** - * TODO streaming API support - * TODO rate limit support - */ - public void run() - { - // Avoid canceling the task with RuntimeException - try - { - poll(); - } - catch (Throwable t) - { - ActiveMQTwitterLogger.LOGGER.errorPollingTwitter(t); - } - } - } -} diff --git a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/impl/OutgoingTweetsHandler.java b/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/impl/OutgoingTweetsHandler.java deleted file mode 100644 index 10213a7171..0000000000 --- a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/integration/twitter/impl/OutgoingTweetsHandler.java +++ /dev/null @@ -1,266 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.integration.twitter.impl; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import org.apache.activemq.api.core.SimpleString; -import org.apache.activemq.core.filter.Filter; -import org.apache.activemq.core.postoffice.Binding; -import org.apache.activemq.core.postoffice.PostOffice; -import org.apache.activemq.core.server.ConnectorService; -import org.apache.activemq.core.server.Consumer; -import org.apache.activemq.core.server.HandleStatus; -import org.apache.activemq.core.server.ActiveMQServerLogger; -import org.apache.activemq.core.server.MessageReference; -import org.apache.activemq.core.server.Queue; -import org.apache.activemq.core.server.ServerMessage; -import org.apache.activemq.integration.twitter.TwitterConstants; -import org.apache.activemq.twitter.ActiveMQTwitterLogger; -import org.apache.activemq.utils.ConfigurationHelper; -import twitter4j.GeoLocation; -import twitter4j.StatusUpdate; -import twitter4j.Twitter; -import twitter4j.TwitterException; -import twitter4j.TwitterFactory; -import twitter4j.http.AccessToken; - -/** - * OutgoingTweetsHandler consumes from configured ActiveMQ address - * and forwards to the twitter. - */ -public class OutgoingTweetsHandler implements Consumer, ConnectorService -{ - private final String connectorName; - - private final String consumerKey; - - private final String consumerSecret; - - private final String accessToken; - - private final String accessTokenSecret; - - private final String queueName; - - private final PostOffice postOffice; - - private Twitter twitter = null; - - private Queue queue = null; - - private Filter filter = null; - - private boolean isStarted = false; - - - public String debug() - { - return toString(); - } - - public OutgoingTweetsHandler(final String connectorName, - final Map configuration, - final PostOffice postOffice) - { - this.connectorName = connectorName; - this.consumerKey = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_KEY, null, configuration); - this.consumerSecret = ConfigurationHelper.getStringProperty(TwitterConstants.CONSUMER_SECRET, null, configuration); - this.accessToken = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN, null, configuration); - this.accessTokenSecret = ConfigurationHelper.getStringProperty(TwitterConstants.ACCESS_TOKEN_SECRET, null, configuration); - this.queueName = ConfigurationHelper.getStringProperty(TwitterConstants.QUEUE_NAME, null, configuration); - this.postOffice = postOffice; - } - - /** - * TODO streaming API support - * TODO rate limit support - */ - public synchronized void start() throws Exception - { - if (this.isStarted) - { - return; - } - - if (this.connectorName == null || this.connectorName.trim().equals("")) - { - throw new Exception("invalid connector name: " + this.connectorName); - } - - if (this.queueName == null || this.queueName.trim().equals("")) - { - throw new Exception("invalid queue name: " + queueName); - } - - SimpleString name = new SimpleString(this.queueName); - Binding b = this.postOffice.getBinding(name); - if (b == null) - { - throw new Exception(connectorName + ": queue " + queueName + " not found"); - } - this.queue = (Queue) b.getBindable(); - - TwitterFactory tf = new TwitterFactory(); - this.twitter = tf.getOAuthAuthorizedInstance(this.consumerKey, - this.consumerSecret, - new AccessToken(this.accessToken, - this.accessTokenSecret)); - this.twitter.verifyCredentials(); - - // TODO make filter-string configurable - // this.filter = FilterImpl.createFilter(filterString); - this.filter = null; - - this.queue.addConsumer(this); - - this.queue.deliverAsync(); - this.isStarted = true; - ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": started"); - } - - public boolean isStarted() - { - return isStarted; //To change body of implemented methods use File | Settings | File Templates. - } - - public synchronized void stop() throws Exception - { - if (!this.isStarted) - { - return; - } - - ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": receive shutdown request"); - - this.queue.removeConsumer(this); - - this.isStarted = false; - ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": shutdown"); - } - - public String getName() - { - return connectorName; - } - - public Filter getFilter() - { - return filter; - } - - /* (non-Javadoc) - * @see org.apache.activemq.core.server.Consumer#getDeliveringMessages() - */ - @Override - public List getDeliveringMessages() - { - return Collections.emptyList(); - } - - public HandleStatus handle(final MessageReference ref) throws Exception - { - if (filter != null && !filter.match(ref.getMessage())) - { - return HandleStatus.NO_MATCH; - } - - synchronized (this) - { - ref.handled(); - - ServerMessage message = ref.getMessage(); - - StatusUpdate status = new StatusUpdate(message.getBodyBuffer().readString()); - - // set optional property - - if (message.containsProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID)) - { - status.setInReplyToStatusId(message.getLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID)); - } - - if (message.containsProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE)) - { - double geolat = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LATITUDE); - double geolong = message.getDoubleProperty(TwitterConstants.KEY_GEO_LOCATION_LONGITUDE); - status.setLocation(new GeoLocation(geolat, geolong)); - } - - if (message.containsProperty(TwitterConstants.KEY_PLACE_ID)) - { - status.setPlaceId(message.getStringProperty(TwitterConstants.KEY_PLACE_ID)); - } - - if (message.containsProperty(TwitterConstants.KEY_DISPLAY_COODINATES)) - { - status.setDisplayCoordinates(message.getBooleanProperty(TwitterConstants.KEY_DISPLAY_COODINATES)); - } - - // send to Twitter - try - { - this.twitter.updateStatus(status); - } - catch (TwitterException e) - { - if (e.getStatusCode() == 403) - { - // duplicated message - ActiveMQTwitterLogger.LOGGER.error403(connectorName); - queue.acknowledge(ref); - - return HandleStatus.HANDLED; - } - else - { - throw e; - } - } - - queue.acknowledge(ref); - ActiveMQTwitterLogger.LOGGER.debug(connectorName + ": forwarded to twitter: " + message.getMessageID()); - return HandleStatus.HANDLED; - } - } - - public void proceedDeliver(MessageReference ref) - { - // no op - } - - @Override - public String toManagementString() - { - return toString(); - } - - @Override - public void disconnect() - { - try - { - stop(); - } - catch (Exception e) - { - ActiveMQServerLogger.LOGGER.errorStoppingConnectorService(e, getName()); - } - } -} diff --git a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/twitter/ActiveMQTwitterBundle.java b/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/twitter/ActiveMQTwitterBundle.java deleted file mode 100644 index d6a7718cc9..0000000000 --- a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/twitter/ActiveMQTwitterBundle.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.twitter; - - -import org.jboss.logging.annotations.MessageBundle; - -/** - * @author Andy Taylor - * 3/12/12 - * - * Logger Code 18 - * - * each message id must be 6 digits long starting with 18, the 3rd digit should be 9 - * - * so 189000 to 189999 - */ -@MessageBundle(projectCode = "AMQ") -public class ActiveMQTwitterBundle -{ -} diff --git a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/twitter/ActiveMQTwitterLogger.java b/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/twitter/ActiveMQTwitterLogger.java deleted file mode 100644 index 9c664076e4..0000000000 --- a/integration/activemq-twitter-integration/src/main/java/org/apache/activemq/twitter/ActiveMQTwitterLogger.java +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.twitter; - -import org.jboss.logging.BasicLogger; -import org.jboss.logging.Logger; -import org.jboss.logging.annotations.Cause; -import org.jboss.logging.annotations.LogMessage; -import org.jboss.logging.annotations.Message; -import org.jboss.logging.annotations.MessageLogger; - -/** - * @author Andy Taylor - * 3/15/12 - * - * Logger Code 18 - * - * each message id must be 6 digits long starting with 18, the 3rd digit donates the level so - * - * INF0 1 - * WARN 2 - * DEBUG 3 - * ERROR 4 - * TRACE 5 - * FATAL 6 - * - * so an INFO message would be 181000 to 181999 - */ -@MessageLogger(projectCode = "AMQ") -public interface ActiveMQTwitterLogger extends BasicLogger -{ - /** - * The twitter logger. - */ - ActiveMQTwitterLogger LOGGER = Logger.getMessageLogger(ActiveMQTwitterLogger.class, ActiveMQTwitterLogger.class.getPackage().getName()); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 182001, value = "{0}: HTTP status code = 403: Ignore duplicated message", format = Message.Format.MESSAGE_FORMAT) - void error403(String connectorName); - - @LogMessage(level = Logger.Level.WARN) - @Message(id = 182002, value = "Error polling Twitter", format = Message.Format.MESSAGE_FORMAT) - void errorPollingTwitter(@Cause Throwable t); -} diff --git a/pom.xml b/pom.xml index 580ce49bf0..912fa6ab58 100644 --- a/pom.xml +++ b/pom.xml @@ -281,14 +281,6 @@ jboss-logging-spi 2.1.0.GA - - - org.twitter4j - twitter4j-core - - 2.1.2 - - org.apache.qpid @@ -505,7 +497,6 @@ activemq-service-extensions integration/activemq-spring-integration - integration/activemq-twitter-integration integration/activemq-aerogear-integration integration/activemq-vertx-integration tests @@ -532,7 +523,6 @@ activemq-service-extensions integration/activemq-jboss-as-integration integration/activemq-spring-integration - integration/activemq-twitter-integration integration/activemq-aerogear-integration integration/activemq-vertx-integration examples @@ -559,7 +549,6 @@ activemq-service-extensions integration/activemq-jboss-as-integration integration/activemq-spring-integration - integration/activemq-twitter-integration integration/activemq-aerogear-integration integration/activemq-vertx-integration examples @@ -588,7 +577,6 @@ activemq-service-extensions integration/activemq-jboss-as-integration integration/activemq-spring-integration - integration/activemq-twitter-integration integration/activemq-aerogear-integration integration/activemq-vertx-integration tests @@ -627,7 +615,6 @@ activemq-service-extensions integration/activemq-jboss-as-integration integration/activemq-spring-integration - integration/activemq-twitter-integration integration/activemq-aerogear-integration integration/activemq-vertx-integration tests @@ -662,7 +649,6 @@ activemq-service-extensions integration/activemq-jboss-as-integration integration/activemq-spring-integration - integration/activemq-twitter-integration integration/activemq-aerogear-integration integration/activemq-vertx-integration tests diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 42a31ba039..d09a5751a9 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -69,11 +69,6 @@ activemq-tools ${project.version} - - org.apache.activemq - activemq-twitter-integration - ${project.version} - org.apache.activemq activemq-spring-integration @@ -121,10 +116,6 @@ org.jboss.security jbosssx - - org.twitter4j - twitter4j-core - junit junit diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/twitter/TwitterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/twitter/TwitterTest.java deleted file mode 100644 index 4091073597..0000000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/twitter/TwitterTest.java +++ /dev/null @@ -1,637 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.tests.integration.twitter; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Set; - -import org.apache.activemq.api.core.TransportConfiguration; -import org.apache.activemq.api.core.client.ClientConsumer; -import org.apache.activemq.api.core.client.ClientMessage; -import org.apache.activemq.api.core.client.ClientProducer; -import org.apache.activemq.api.core.client.ClientSession; -import org.apache.activemq.api.core.client.ClientSessionFactory; -import org.apache.activemq.api.core.client.ActiveMQClient; -import org.apache.activemq.api.core.client.ServerLocator; -import org.apache.activemq.core.config.Configuration; -import org.apache.activemq.core.config.ConnectorServiceConfiguration; -import org.apache.activemq.core.config.CoreQueueConfiguration; -import org.apache.activemq.core.server.ConnectorService; -import org.apache.activemq.core.server.ActiveMQServer; -import org.apache.activemq.integration.twitter.TwitterConstants; -import org.apache.activemq.integration.twitter.TwitterIncomingConnectorServiceFactory; -import org.apache.activemq.integration.twitter.TwitterOutgoingConnectorServiceFactory; -import org.apache.activemq.tests.integration.IntegrationTestLogger; -import org.apache.activemq.tests.util.ServiceTestBase; -import org.apache.activemq.tests.util.UnitTestCase; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; -import twitter4j.Paging; -import twitter4j.ResponseList; -import twitter4j.Status; -import twitter4j.Twitter; -import twitter4j.TwitterFactory; -import twitter4j.http.AccessToken; - -/** - * A TwitterTest - * - * @author tm.igarashi@gmail.com - */ -public class TwitterTest extends ServiceTestBase -{ - private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; - private static final String KEY_CONNECTOR_NAME = "connector.name"; - private static final String KEY_CONSUMER_KEY = "consumerKey"; - private static final String KEY_CONSUMER_SECRET = "consumerSecret"; - private static final String KEY_ACCESS_TOKEN = "accessToken"; - private static final String KEY_ACCESS_TOKEN_SECRET = "accessTokenSecret"; - private static final String KEY_QUEUE_NAME = "queue.name"; - - private static final String TWITTER_CONSUMER_KEY = System.getProperty("twitter.consumerKey"); - private static final String TWITTER_CONSUMER_SECRET = System.getProperty("twitter.consumerSecret"); - private static final String TWITTER_ACCESS_TOKEN = System.getProperty("twitter.accessToken"); - private static final String TWITTER_ACCESS_TOKEN_SECRET = System.getProperty("twitter.accessTokenSecret"); - - // incoming - - @Override - @Before - public void setUp() throws Exception - { - super.setUp(); - } - - @BeforeClass - public static void hasCredentials() - { - Assume.assumeNotNull(TWITTER_CONSUMER_KEY); - Assume.assumeFalse("null".equals(TWITTER_CONSUMER_KEY)); - } - - @Test - public void testSimpleIncoming() throws Exception - { - internalTestIncoming(true, false); - } - - @Test - public void testIncomingNoQueue() throws Exception - { - internalTestIncoming(false, false); - } - - @Test - public void testIncomingWithRestart() throws Exception - { - internalTestIncoming(true, true); - } - - @Test - public void testIncomingWithEmptyConnectorName() throws Exception - { - HashMap params = new HashMap(); - params.put(KEY_CONNECTOR_NAME, ""); - internalTestIncomingFailedToInitialize(params); - } - - @Test - public void testIncomingWithEmptyQueueName() throws Exception - { - HashMap params = new HashMap(); - params.put(KEY_QUEUE_NAME, ""); - internalTestIncomingFailedToInitialize(params); - } - - @Test - public void testIncomingWithInvalidCredentials() throws Exception - { - HashMap params = new HashMap(); - params.put(KEY_CONSUMER_KEY, "invalidConsumerKey"); - params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret"); - params.put(KEY_ACCESS_TOKEN, "invalidAccessToken"); - params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret"); - internalTestIncomingFailedToInitialize(params); - } - - //outgoing - - @Test - public void testSimpleOutgoing() throws Exception - { - internalTestOutgoing(true, false); - } - - @Test - public void testOutgoingNoQueue() throws Exception - { - internalTestOutgoing(false, false); - } - - @Test - public void testOutgoingWithRestart() throws Exception - { - internalTestOutgoing(true, true); - } - - @Test - public void testOutgoingWithEmptyConnectorName() throws Exception - { - HashMap params = new HashMap(); - params.put(KEY_CONNECTOR_NAME, ""); - internalTestOutgoingFailedToInitialize(params); - } - - @Test - public void testOutgoingWithEmptyQueueName() throws Exception - { - HashMap params = new HashMap(); - params.put(KEY_QUEUE_NAME, ""); - internalTestOutgoingFailedToInitialize(params); - } - - @Test - public void testOutgoingWithInvalidCredentials() throws Exception - { - HashMap params = new HashMap(); - params.put(KEY_CONSUMER_KEY, "invalidConsumerKey"); - params.put(KEY_CONSUMER_SECRET, "invalidConsumerSecret"); - params.put(KEY_ACCESS_TOKEN, "invalidAccessToken"); - params.put(KEY_ACCESS_TOKEN_SECRET, "invalidAcccessTokenSecret"); - internalTestOutgoingFailedToInitialize(params); - } - - @Test - public void testOutgoingWithInReplyTo() throws Exception - { - internalTestOutgoingWithInReplyTo(); - } - - protected void internalTestIncoming(boolean createQueue, boolean restart) throws Exception - { - ActiveMQServer server0 = null; - ClientSession session = null; - ServerLocator locator = null; - String queue = "TwitterTestQueue"; - int interval = 5; - Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY, - TWITTER_CONSUMER_SECRET, - new AccessToken(TWITTER_ACCESS_TOKEN, - TWITTER_ACCESS_TOKEN_SECRET)); - String testMessage = "TwitterTest/incoming: " + System.currentTimeMillis(); - log.debug("test incoming: " + testMessage); - - try - { - HashMap config = new HashMap(); - config.put(TwitterConstants.INCOMING_INTERVAL, interval); - config.put(TwitterConstants.QUEUE_NAME, queue); - config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY); - config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET); - config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN); - config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET); - ConnectorServiceConfiguration inconf = new ConnectorServiceConfiguration() - .setFactoryClassName(TwitterIncomingConnectorServiceFactory.class.getName()) - .setParams(config) - .setName("test-incoming-connector"); - - Configuration configuration = createDefaultConfig(false) - .addConnectorServiceConfiguration(inconf); - - if (createQueue) - { - CoreQueueConfiguration qc = new CoreQueueConfiguration() - .setAddress(queue) - .setName(queue); - configuration.getQueueConfigurations().add(qc); - } - - server0 = createServer(false, configuration); - server0.start(); - - if (restart) - { - server0.getConnectorsService().stop(); - server0.getConnectorsService().start(); - } - - assertEquals(1, server0.getConnectorsService().getConnectors().size()); - Iterator connectorServiceIterator = server0.getConnectorsService().getConnectors().iterator(); - if (createQueue) - { - Assert.assertTrue(connectorServiceIterator.next().isStarted()); - } - else - { - Assert.assertFalse(connectorServiceIterator.next().isStarted()); - return; - } - - twitter.updateStatus(testMessage); - - TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY); - locator = ActiveMQClient.createServerLocatorWithoutHA(tpconf); - ClientSessionFactory sf = createSessionFactory(locator); - session = sf.createSession(false, true, true); - ClientConsumer consumer = session.createConsumer(queue); - session.start(); - ClientMessage msg = consumer.receive(60 * 1000); - - Assert.assertNotNull(msg); - Assert.assertEquals(testMessage, msg.getBodyBuffer().readString()); - - msg.acknowledge(); - } - finally - { - try - { - session.close(); - } - catch (Throwable t) - { - } - - try - { - locator.close(); - } - catch (Throwable ignored) - { - } - - try - { - server0.stop(); - } - catch (Throwable ignored) - { - } - } - } - - protected void internalTestIncomingFailedToInitialize(HashMap params) throws Exception - { - ActiveMQServer server0 = null; - String connectorName = "test-incoming-connector"; - String queue = "TwitterTestQueue"; - String consumerKey = "invalidConsumerKey"; - String consumerSecret = "invalidConsumerSecret"; - String accessToken = "invalidAccessToken"; - String accessTokenSecret = "invalidAccessTokenSecret"; - int interval = 5; - - if (params.containsKey(KEY_CONNECTOR_NAME)) - { - connectorName = params.get(KEY_CONNECTOR_NAME); - } - if (params.containsKey(KEY_CONSUMER_KEY)) - { - consumerKey = params.get(KEY_CONSUMER_KEY); - } - if (params.containsKey(KEY_CONSUMER_SECRET)) - { - consumerSecret = params.get(KEY_CONSUMER_SECRET); - } - if (params.containsKey(KEY_ACCESS_TOKEN)) - { - accessToken = params.get(KEY_ACCESS_TOKEN); - } - if (params.containsKey(KEY_ACCESS_TOKEN_SECRET)) - { - accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET); - } - if (params.containsKey(KEY_QUEUE_NAME)) - { - queue = params.get(KEY_QUEUE_NAME); - } - - try - { - HashMap config = new HashMap(); - config.put(TwitterConstants.INCOMING_INTERVAL, interval); - config.put(TwitterConstants.QUEUE_NAME, queue); - config.put(TwitterConstants.CONSUMER_KEY, consumerKey); - config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret); - config.put(TwitterConstants.ACCESS_TOKEN, accessToken); - config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret); - - ConnectorServiceConfiguration inconf = new ConnectorServiceConfiguration() - .setFactoryClassName(TwitterIncomingConnectorServiceFactory.class.getName()) - .setParams(config) - .setName(connectorName); - - CoreQueueConfiguration qc = new CoreQueueConfiguration() - .setAddress(queue) - .setName(queue); - - Configuration configuration = createDefaultConfig(false) - .addConnectorServiceConfiguration(inconf) - .addQueueConfiguration(qc); - - server0 = createServer(false, configuration); - server0.start(); - - Set conns = server0.getConnectorsService().getConnectors(); - Assert.assertEquals(1, conns.size()); - Iterator it = conns.iterator(); - Assert.assertFalse(it.next().isStarted()); - } - finally - { - try - { - server0.stop(); - } - catch (Throwable ignored) - { - } - } - } - - protected void internalTestOutgoing(boolean createQueue, boolean restart) throws Exception - { - ActiveMQServer server0 = null; - ServerLocator locator = null; - ClientSession session = null; - String queue = "TwitterTestQueue"; - Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY, - TWITTER_CONSUMER_SECRET, - new AccessToken(TWITTER_ACCESS_TOKEN, - TWITTER_ACCESS_TOKEN_SECRET)); - String testMessage = "TwitterTest/outgoing: " + System.currentTimeMillis(); - log.debug("test outgoing: " + testMessage); - - try - { - HashMap config = new HashMap(); - config.put(TwitterConstants.QUEUE_NAME, queue); - config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY); - config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET); - config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN); - config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET); - ConnectorServiceConfiguration outconf = new ConnectorServiceConfiguration() - .setFactoryClassName(TwitterOutgoingConnectorServiceFactory.class.getName()) - .setParams(config) - .setName("test-outgoing-connector"); - - Configuration configuration = createDefaultConfig(false) - .addConnectorServiceConfiguration(outconf); - - if (createQueue) - { - CoreQueueConfiguration qc = new CoreQueueConfiguration() - .setAddress(queue) - .setName(queue) - .setDurable(false); - configuration.getQueueConfigurations().add(qc); - } - - server0 = createServer(false, configuration); - server0.start(); - - if (restart) - { - server0.getConnectorsService().stop(); - server0.getConnectorsService().start(); - } - - assertEquals(1, server0.getConnectorsService().getConnectors().size()); - Iterator connectorServiceIterator = server0.getConnectorsService().getConnectors().iterator(); - if (createQueue) - { - Assert.assertTrue(connectorServiceIterator.next().isStarted()); - } - else - { - Assert.assertFalse(connectorServiceIterator.next().isStarted()); - return; - } - - TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY); - locator = ActiveMQClient.createServerLocatorWithoutHA(tpconf); - ClientSessionFactory sf = createSessionFactory(locator); - session = sf.createSession(false, true, true); - ClientProducer producer = session.createProducer(queue); - ClientMessage msg = session.createMessage(false); - msg.getBodyBuffer().writeString(testMessage); - session.start(); - producer.send(msg); - - Thread.sleep(3000); - - Paging page = new Paging(); - page.setCount(1); - ResponseList res = twitter.getHomeTimeline(page); - - Assert.assertEquals(testMessage, ((Status) (res.get(0))).getText()); - } - finally - { - try - { - session.close(); - } - catch (Throwable t) - { - } - - try - { - locator.close(); - } - catch (Throwable t) - { - } - - try - { - server0.stop(); - } - catch (Throwable ignored) - { - } - } - } - - protected void internalTestOutgoingFailedToInitialize(HashMap params) throws Exception - { - ActiveMQServer server0 = null; - String connectorName = "test-outgoing-connector"; - String queue = "TwitterTestQueue"; - String consumerKey = TWITTER_CONSUMER_KEY; - String consumerSecret = TWITTER_CONSUMER_SECRET; - String accessToken = TWITTER_ACCESS_TOKEN; - String accessTokenSecret = TWITTER_ACCESS_TOKEN_SECRET; - - if (params.containsKey(KEY_CONNECTOR_NAME)) - { - connectorName = params.get(KEY_CONNECTOR_NAME); - } - if (params.containsKey(KEY_CONSUMER_KEY)) - { - consumerKey = params.get(KEY_CONSUMER_KEY); - } - if (params.containsKey(KEY_CONSUMER_SECRET)) - { - consumerSecret = params.get(KEY_CONSUMER_SECRET); - } - if (params.containsKey(KEY_ACCESS_TOKEN)) - { - accessToken = params.get(KEY_ACCESS_TOKEN); - } - if (params.containsKey(KEY_ACCESS_TOKEN_SECRET)) - { - accessTokenSecret = params.get(KEY_ACCESS_TOKEN_SECRET); - } - if (params.containsKey(KEY_QUEUE_NAME)) - { - queue = params.get(KEY_QUEUE_NAME); - } - - try - { - HashMap config = new HashMap(); - config.put(TwitterConstants.QUEUE_NAME, queue); - config.put(TwitterConstants.CONSUMER_KEY, consumerKey); - config.put(TwitterConstants.CONSUMER_SECRET, consumerSecret); - config.put(TwitterConstants.ACCESS_TOKEN, accessToken); - config.put(TwitterConstants.ACCESS_TOKEN_SECRET, accessTokenSecret); - - ConnectorServiceConfiguration outconf = new ConnectorServiceConfiguration() - .setFactoryClassName(TwitterOutgoingConnectorServiceFactory.class.getName()) - .setParams(config) - .setName(connectorName); - - CoreQueueConfiguration qc = new CoreQueueConfiguration() - .setAddress(queue) - .setName(queue) - .setDurable(false); - - Configuration configuration = createDefaultConfig(false) - .addConnectorServiceConfiguration(outconf) - .addQueueConfiguration(qc); - - server0 = createServer(false, configuration); - server0.start(); - - } - finally - { - try - { - server0.stop(); - } - catch (Throwable ignored) - { - } - } - } - - protected void internalTestOutgoingWithInReplyTo() throws Exception - { - ActiveMQServer server0 = null; - ClientSession session = null; - ServerLocator locator = null; - String queue = "TwitterTestQueue"; - Twitter twitter = new TwitterFactory().getOAuthAuthorizedInstance(TWITTER_CONSUMER_KEY, - TWITTER_CONSUMER_SECRET, - new AccessToken(TWITTER_ACCESS_TOKEN, - TWITTER_ACCESS_TOKEN_SECRET)); - String testMessage = "TwitterTest/outgoing with in_reply_to: " + System.currentTimeMillis(); - String replyMessage = "@" + twitter.getScreenName() + " TwitterTest/outgoing reply: " + System.currentTimeMillis(); - try - { - HashMap config = new HashMap(); - config.put(TwitterConstants.QUEUE_NAME, queue); - config.put(TwitterConstants.CONSUMER_KEY, TWITTER_CONSUMER_KEY); - config.put(TwitterConstants.CONSUMER_SECRET, TWITTER_CONSUMER_SECRET); - config.put(TwitterConstants.ACCESS_TOKEN, TWITTER_ACCESS_TOKEN); - config.put(TwitterConstants.ACCESS_TOKEN_SECRET, TWITTER_ACCESS_TOKEN_SECRET); - - ConnectorServiceConfiguration outconf = new ConnectorServiceConfiguration() - .setFactoryClassName(TwitterOutgoingConnectorServiceFactory.class.getName()) - .setParams(config) - .setName("test-outgoing-with-in-reply-to"); - - CoreQueueConfiguration qc = new CoreQueueConfiguration() - .setAddress(queue) - .setName(queue) - .setDurable(false); - - Configuration configuration = createDefaultConfig(false) - .addConnectorServiceConfiguration(outconf) - .addQueueConfiguration(qc); - - Status s = twitter.updateStatus(testMessage); - - server0 = createServer(false, configuration); - server0.start(); - - TransportConfiguration tpconf = new TransportConfiguration(UnitTestCase.INVM_CONNECTOR_FACTORY); - locator = ActiveMQClient.createServerLocatorWithoutHA(tpconf); - - ClientSessionFactory sf = createSessionFactory(locator); - session = sf.createSession(false, true, true); - ClientProducer producer = session.createProducer(queue); - ClientMessage msg = session.createMessage(false); - msg.getBodyBuffer().writeString(replyMessage); - msg.putLongProperty(TwitterConstants.KEY_IN_REPLY_TO_STATUS_ID, s.getId()); - session.start(); - producer.send(msg); - - Thread.sleep(3000); - - Paging page = new Paging(); - page.setCount(2); - ResponseList res = twitter.getHomeTimeline(page); - - Assert.assertEquals(testMessage, ((Status) (res.get(1))).getText()); - Assert.assertEquals(-1, ((Status) (res.get(1))).getInReplyToStatusId()); - Assert.assertEquals(replyMessage, ((Status) (res.get(0))).getText()); - Assert.assertEquals(s.getId(), ((Status) (res.get(0))).getInReplyToStatusId()); - } - finally - { - try - { - session.close(); - } - catch (Throwable t) - { - } - try - { - locator.close(); - } - catch (Throwable t) - { - } - try - { - server0.stop(); - } - catch (Throwable ignored) - { - } - } - } -}