git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1461133 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2013-03-26 13:58:26 +00:00
parent 8a49bbe4b6
commit 9c909b5975
8 changed files with 0 additions and 522 deletions

View File

@ -1,175 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An agent which listens to commands on a JMS destination
*
*
* @org.apache.xbean.XBean
*/
public class CommandAgent implements Service, ExceptionListener {
private static final Logger LOG = LoggerFactory.getLogger(CommandAgent.class);
private String brokerUrl = "vm://localhost";
private String username;
private String password;
private ConnectionFactory connectionFactory;
private Connection connection;
private Destination commandDestination;
private CommandMessageListener listener;
private Session session;
private MessageConsumer consumer;
/**
*
* @throws Exception
* @org.apache.xbean.InitMethod
*/
@PostConstruct
public void start() throws Exception {
session = getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
listener = new CommandMessageListener(session);
Destination destination = getCommandDestination();
if (LOG.isDebugEnabled()) {
LOG.debug("Agent subscribing to control destination: " + destination);
}
consumer = session.createConsumer(destination);
consumer.setMessageListener(listener);
}
/**
*
* @throws Exception
* @org.apache.xbean.DestroyMethod
*/
@PreDestroy
public void stop() throws Exception {
if (consumer != null) {
try {
consumer.close();
consumer = null;
} catch (JMSException ignored) {
}
}
if (session != null) {
try {
session.close();
session = null;
} catch (JMSException ignored) {
}
}
if (connection != null) {
try {
connection.close();
connection = null;
} catch (JMSException ignored) {
}
}
}
// Properties
// -------------------------------------------------------------------------
public String getBrokerUrl() {
return brokerUrl;
}
public void setBrokerUrl(String brokerUrl) {
this.brokerUrl = brokerUrl;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
}
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public Connection getConnection() throws JMSException {
if (connection == null) {
connection = createConnection();
connection.setExceptionListener(this);
connection.start();
}
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public Destination getCommandDestination() {
if (commandDestination == null) {
commandDestination = createCommandDestination();
}
return commandDestination;
}
public void setCommandDestination(Destination commandDestination) {
this.commandDestination = commandDestination;
}
protected Connection createConnection() throws JMSException {
return getConnectionFactory().createConnection(username, password);
}
protected Destination createCommandDestination() {
return AdvisorySupport.getAgentDestination();
}
public void onException(JMSException exception) {
try {
stop();
} catch (Exception e) {
}
}
}

View File

@ -1,28 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import javax.jms.TextMessage;
/**
* Represents a processor of text based commands
*
*
*/
public interface CommandHandler {
void processCommand(TextMessage request, TextMessage response) throws Exception;
}

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.util;
import java.io.IOException;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.FactoryFinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class CommandMessageListener implements MessageListener {
private static final Logger LOG = LoggerFactory.getLogger(CommandMessageListener.class);
private Session session;
private MessageProducer producer;
private CommandHandler handler;
public CommandMessageListener(Session session) {
this.session = session;
}
public void onMessage(Message message) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received command: " + message);
}
if (message instanceof TextMessage) {
TextMessage request = (TextMessage)message;
try {
Destination replyTo = message.getJMSReplyTo();
if (replyTo == null) {
LOG.warn("Ignored message as no JMSReplyTo set: " + message);
return;
}
Message response = processCommand(request);
addReplyHeaders(request, response);
getProducer().send(replyTo, response);
} catch (Exception e) {
LOG.error("Failed to process message due to: " + e + ". Message: " + message, e);
}
} else {
LOG.warn("Ignoring invalid message: " + message);
}
}
protected void addReplyHeaders(TextMessage request, Message response) throws JMSException {
String correlationID = request.getJMSCorrelationID();
if (correlationID != null) {
response.setJMSCorrelationID(correlationID);
}
}
/**
* Processes an incoming JMS message returning the response message
*/
public Message processCommand(TextMessage request) throws Exception {
TextMessage response = session.createTextMessage();
getHandler().processCommand(request, response);
return response;
}
/**
* Processes an incoming command from a console and returning the text to
* output
*/
public String processCommandText(String line) throws Exception {
TextMessage request = new ActiveMQTextMessage();
request.setText(line);
TextMessage response = new ActiveMQTextMessage();
getHandler().processCommand(request, response);
return response.getText();
}
public Session getSession() {
return session;
}
public MessageProducer getProducer() throws JMSException {
if (producer == null) {
producer = getSession().createProducer(null);
}
return producer;
}
public CommandHandler getHandler() throws IllegalAccessException, IOException, InstantiationException, ClassNotFoundException {
if (handler == null) {
handler = createHandler();
}
return handler;
}
private CommandHandler createHandler() throws IllegalAccessException, IOException, ClassNotFoundException, InstantiationException {
FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/broker/");
return (CommandHandler)factoryFinder.newInstance("agent");
}
}

View File

@ -1,68 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import javax.jms.TextMessage;
import org.apache.activemq.broker.util.CommandHandler;
import org.apache.activemq.console.command.ShellCommand;
import org.apache.activemq.console.formatter.CommandShellOutputFormatter;
/**
* A default implementation of the @{link CommandHandler} interface
*
*
*/
public class ConsoleCommandHandler implements CommandHandler {
private ShellCommand command = new ShellCommand(true);
public void processCommand(TextMessage request, TextMessage response) throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
CommandContext ctx = new CommandContext();
ctx.setFormatter(new CommandShellOutputFormatter(out));
// lets turn the text into a list of arguments
String requestText = request.getText();
List<String> tokens = tokenize(requestText);
command.setCommandContext(ctx);
command.execute(tokens);
out.flush();
byte[] bytes = out.toByteArray();
String answer = new String(bytes);
response.setText(answer);
}
protected List<String> tokenize(String text) {
List<String> answer = new ArrayList<String>();
StringTokenizer iter = new StringTokenizer(text);
while (iter.hasMoreTokens()) {
answer.add(iter.nextToken());
}
return answer;
}
}

View File

@ -1,57 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.console.util;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.activemq.broker.util.CommandMessageListener;
/**
* A simple interactive console which can be used to communicate with a running
* broker assuming that the classpath is fully setup
*
*
*/
public final class SimpleConsole {
private SimpleConsole() {
}
public static void main(String[] args) {
CommandMessageListener listener = new CommandMessageListener(null);
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String line = reader.readLine();
if (line == null || "quit".equalsIgnoreCase(line)) {
break;
}
line = line.trim();
if (line.length() == 0) {
continue;
}
System.out.println(listener.processCommandText(line));
}
} catch (Exception e) {
System.out.println("Caught: " + e);
e.printStackTrace();
}
}
}

View File

@ -1,17 +0,0 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.console.ConsoleCommandHandler

View File

@ -1,50 +0,0 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!--
Communicate with a broker using command agent over XMPP
For more information, see:
http://activemq.apache.org/command-agent.html and
http://activemq.apache.org/xmpp.html
To run ActiveMQ with this configuration add xbean:conf/activemq-command.xml to your command
e.g. $ bin/activemq console xbean:conf/activemq-command.xml
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="command-broker" dataDirectory="${activemq.data}">
<managementContext>
<managementContext createConnector="true"/>
</managementContext>
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616"/>
</transportConnectors>
</broker>
<!-- Create a command agent -->
<commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost"/>
</beans>

View File

@ -124,12 +124,6 @@
</broker>
<!--
Configure command agent to be used in secured broker environment
Notice how we used ${activemq.username} and ${activemq.password} configured in credential.properties
-->
<commandAgent xmlns="http://activemq.apache.org/schema/core" brokerUrl="vm://localhost" username="${activemq.username}" password="${activemq.password}"/>
<!-- Use Web applications and Camel in secured broker environment -->
<import resource="jetty.xml"/>