Hiram R. Chirino 2006-10-15 03:31:28 +00:00
parent 171b108944
commit 9ca56a0049
40 changed files with 191 additions and 162 deletions

View File

@ -1143,7 +1143,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} else {
try {
Response response = this.transport.request(command);
Response response = (Response) this.transport.request(command);
if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
if (er.getException() instanceof JMSException)
@ -1171,7 +1171,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} else {
try {
Response response = this.transport.request(command,timeout);
Response response = (Response) this.transport.request(command,timeout);
if (response!=null && response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
if (er.getException() instanceof JMSException)
@ -1427,7 +1427,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
/**
* @param command - the command to consume
*/
public void onCommand(final Command command) {
public void onCommand(final Object o) {
final Command command = (Command) o;
if (!closed.get() && command != null) {
if (command.isMessageDispatch()) {
MessageDispatch md = (MessageDispatch) command;

View File

@ -58,7 +58,8 @@ public class TransportConnection extends AbstractConnection {
connector.setBrokerName(broker.getBrokerName());
this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
Response response = service(command);
if (response != null) {
dispatch(response);

View File

@ -360,7 +360,7 @@ public class MasterBroker extends InsertableMutableBrokerFilter{
protected void sendSyncToSlave(Command command){
try{
Response response=slave.request(command);
Response response=(Response) slave.request(command);
if (response.isException()){
ExceptionResponse er=(ExceptionResponse)response;
log.error("Slave Failed",er.getException());

View File

@ -109,7 +109,7 @@ public class MasterConnector implements Service, BrokerServiceAware {
log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
localBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
}
public void onException(IOException error) {
@ -120,7 +120,8 @@ public class MasterConnector implements Service, BrokerServiceAware {
});
remoteBroker.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
if (started.get()) {
serviceRemoteCommand(command);
}

View File

@ -116,7 +116,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
public void start() throws Exception {
localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){
public void onCommand(Object o){
Command command = (Command) o;
serviceLocalCommand(command);
}
@ -125,7 +126,8 @@ public abstract class DemandForwardingBridgeSupport implements Bridge {
}
});
remoteBroker.setTransportListener(new TransportListener(){
public void onCommand(Command command){
public void onCommand(Object o){
Command command = (Command) o;
serviceRemoteCommand(command);
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.activemq.network;
import java.io.IOException;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
@ -43,8 +45,6 @@ import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
/**
* Forwards all messages from the local broker to the remote broker.
*
@ -86,7 +86,8 @@ public class ForwardingBridge implements Bridge {
log.info("Starting a network connection between " + localBroker + " and " + remoteBroker + " has been established.");
localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
public void onException(IOException error) {
@ -95,7 +96,8 @@ public class ForwardingBridge implements Bridge {
});
remoteBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
serviceRemoteCommand(command);
}
public void onException(IOException error) {

View File

@ -19,7 +19,6 @@ package org.apache.activemq.proxy;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
@ -60,7 +59,7 @@ class ProxyConnection implements Service {
}
this.localTransport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
boolean shutdown=false;
if( command.getClass() == ShutdownInfo.class ) {
shuttingDown.set(true);
@ -82,7 +81,7 @@ class ProxyConnection implements Service {
});
this.remoteTransport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
try {
localTransport.oneway(command);
} catch (IOException error) {

View File

@ -46,7 +46,8 @@ public class CommandJoiner extends TransportFilter {
this.wireFormat = wireFormat;
}
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
byte type = command.getDataStructureType();
if (type == PartialCommand.DATA_STRUCTURE_TYPE || type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
PartialCommand header = (PartialCommand) command;

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
/**
* An asynchronous listener of commands
@ -32,7 +31,7 @@ public class DefaultTransportListener implements TransportListener {
* called to process a command
* @param command
*/
public void onCommand(Command command){
public void onCommand(Object command){
}
/**
* An unrecoverable exception has occured on the transport

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.thread.Scheduler;
@ -107,10 +106,10 @@ public class InactivityMonitor extends TransportFilter {
commandReceived.set(false);
}
public void onCommand(Command command) {
public void onCommand(Object command) {
inReceive.set(true);
try {
if( command.isWireFormatInfo() ) {
if( command.getClass() == WireFormatInfo.class ) {
synchronized( this ) {
remoteWireFormatInfo = (WireFormatInfo) command;
try {
@ -120,7 +119,7 @@ public class InactivityMonitor extends TransportFilter {
}
}
}
getTransportListener().onCommand(command);
transportListener.onCommand(command);
} finally {
inReceive.set(false);
commandReceived.set(true);
@ -128,18 +127,18 @@ public class InactivityMonitor extends TransportFilter {
}
public void oneway(Command command) throws IOException {
public void oneway(Object o) throws IOException {
// Disable inactivity monitoring while processing a command.
inSend.set(true);
commandSent.set(true);
try {
if( command.isWireFormatInfo() ) {
if( o.getClass() == WireFormatInfo.class ) {
synchronized( this ) {
localWireFormatInfo = (WireFormatInfo) command;
localWireFormatInfo = (WireFormatInfo) o;
startMonitorThreads();
}
}
next.oneway(command);
next.oneway(o);
} finally {
inSend.set(false);
}
@ -147,7 +146,7 @@ public class InactivityMonitor extends TransportFilter {
public void onException(IOException error) {
stopMonitorThreads();
getTransportListener().onException(error);
transportListener.onException(error);
}

View File

@ -33,11 +33,11 @@ public class MarshallingTransportFilter extends TransportFilter {
this.remoteWireFormat = remoteWireFormat;
}
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
next.oneway((Command) remoteWireFormat.unmarshal(localWireFormat.marshal(command)));
}
public void onCommand(Command command) {
public void onCommand(Object command) {
try {
getTransportListener().onCommand((Command)localWireFormat.unmarshal(remoteWireFormat.marshal(command)));
} catch (IOException e) {

View File

@ -19,8 +19,6 @@ package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
/**
@ -34,25 +32,25 @@ public class MutexTransport extends TransportFilter {
super(next);
}
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
synchronized(writeMutex) {
return next.asyncRequest(command, null);
}
}
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
synchronized(writeMutex) {
next.oneway(command);
}
}
public Response request(Command command) throws IOException {
public Object request(Object command) throws IOException {
synchronized(writeMutex) {
return next.request(command);
}
}
public Response request(Command command,int timeout) throws IOException {
public Object request(Object command,int timeout) throws IOException {
synchronized(writeMutex){
return next.request(command,timeout);
}

View File

@ -53,13 +53,15 @@ public class ResponseCorrelator extends TransportFilter {
this.sequenceGenerator = sequenceGenerator;
}
public void oneway(Command command) throws IOException {
public void oneway(Object o) throws IOException {
Command command = (Command) o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(false);
next.oneway(command);
}
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
Command command = (Command) o;
command.setCommandId(sequenceGenerator.getNextSequenceId());
command.setResponseRequired(true);
FutureResponse future = new FutureResponse(responseCallback);
@ -68,17 +70,18 @@ public class ResponseCorrelator extends TransportFilter {
return future;
}
public Response request(Command command) throws IOException {
public Object request(Object command) throws IOException {
FutureResponse response = asyncRequest(command, null);
return response.getResult();
}
public Response request(Command command,int timeout) throws IOException {
public Object request(Object command,int timeout) throws IOException {
FutureResponse response = asyncRequest(command, null);
return response.getResult(timeout);
}
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
boolean debug = log.isDebugEnabled();
if( command.isResponse() ) {
Response response = (Response) command;

View File

@ -18,8 +18,6 @@
package org.apache.activemq.transport;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import java.io.IOException;
@ -36,7 +34,7 @@ public interface Transport extends Service {
* @param command
* @throws IOException
*/
public void oneway(Command command) throws IOException;
public void oneway(Object command) throws IOException;
/**
* An asynchronous request response where the Receipt will be returned
@ -48,7 +46,7 @@ public interface Transport extends Service {
* @return the FutureResponse
* @throws IOException
*/
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException;
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException;
/**
* A synchronous request response
@ -56,7 +54,7 @@ public interface Transport extends Service {
* @return the response
* @throws IOException
*/
public Response request(Command command) throws IOException;
public Object request(Object command) throws IOException;
/**
* A synchronous request response
@ -65,8 +63,45 @@ public interface Transport extends Service {
* @return the repsonse or null if timeout
* @throws IOException
*/
public Response request(Command command, int timeout) throws IOException;
public Object request(Object command, int timeout) throws IOException;
// /**
// * A one way asynchronous send
// * @param command
// * @throws IOException
// */
// public void oneway(Command command) throws IOException;
//
// /**
// * An asynchronous request response where the Receipt will be returned
// * in the future. If responseCallback is not null, then it will be called
// * when the response has been completed.
// *
// * @param command
// * @param responseCallback TODO
// * @return the FutureResponse
// * @throws IOException
// */
// public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException;
//
// /**
// * A synchronous request response
// * @param command
// * @return the response
// * @throws IOException
// */
// public Response request(Command command) throws IOException;
//
// /**
// * A synchronous request response
// * @param command
// * @param timeout
// * @return the repsonse or null if timeout
// * @throws IOException
// */
// public Response request(Command command, int timeout) throws IOException;
/**
* Returns the current transport listener
* @return

View File

@ -18,8 +18,6 @@
package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
/**
* @version $Revision: 1.5 $
*/
@ -63,7 +61,7 @@ public class TransportFilter implements TransportListener,Transport{
next.stop();
}
public void onCommand(Command command){
public void onCommand(Object command){
transportListener.onCommand(command);
}
@ -78,19 +76,19 @@ public class TransportFilter implements TransportListener,Transport{
return next.toString();
}
public void oneway(Command command) throws IOException{
public void oneway(Object command) throws IOException{
next.oneway(command);
}
public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
return next.asyncRequest(command,null);
}
public Response request(Command command) throws IOException{
public Object request(Object command) throws IOException{
return next.request(command);
}
public Response request(Command command,int timeout) throws IOException{
public Object request(Object command,int timeout) throws IOException{
return next.request(command,timeout);
}

View File

@ -19,7 +19,6 @@ package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
/**
* An asynchronous listener of commands
@ -32,7 +31,7 @@ public interface TransportListener {
* called to process a command
* @param command
*/
public void onCommand(Command command);
public void onCommand(Object command);
/**
* An unrecoverable exception has occured on the transport
* @param error

View File

@ -19,8 +19,6 @@ package org.apache.activemq.transport;
import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -46,40 +44,36 @@ public class TransportLogger extends TransportFilter {
this.log = log;
}
public Response request(Command command) throws IOException {
public Object request(Object command) throws IOException {
log.debug("SENDING REQUEST: "+command);
Response rc = super.request(command);
Object rc = super.request(command);
log.debug("GOT RESPONSE: "+rc);
return rc;
}
public Response request(Command command, int timeout) throws IOException {
public Object request(Object command, int timeout) throws IOException {
log.debug("SENDING REQUEST: "+command);
Response rc = super.request(command, timeout);
Object rc = super.request(command, timeout);
log.debug("GOT RESPONSE: "+rc);
return rc;
}
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
log.debug("SENDING ASNYC REQUEST: "+command);
FutureResponse rc = next.asyncRequest(command, responseCallback);
return rc;
}
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
if( log.isDebugEnabled() ) {
log.debug("SENDING: "+command);
}
next.oneway(command);
}
public void onCommand(Command command) {
public void onCommand(Object command) {
if( log.isDebugEnabled() ) {
if( command.getFrom()!=null ) {
log.debug("RECEIVED: from: "+ command.getFrom() + " : " + command);
} else {
log.debug("RECEIVED: " + command);
}
log.debug("RECEIVED: " + command);
}
getTransportListener().onCommand(command);
}

View File

@ -17,16 +17,12 @@
*/
package org.apache.activemq.transport;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.ShutdownInfo;
import java.io.IOException;
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
/**
* A useful base class for transport implementations.
*
@ -67,22 +63,22 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
return null;
}
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Response request(Command command) throws IOException {
public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Response request(Command command,int timeout) throws IOException {
public Object request(Object command,int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
/**
* Process the inbound command
*/
public void doConsume(Command command) {
public void doConsume(Object command) {
if (command != null) {
if (transportListener != null) {
transportListener.onCommand(command);
@ -102,7 +98,7 @@ public abstract class TransportSupport extends ServiceSupport implements Transpo
}
}
protected void checkStarted(Command command) throws IOException {
protected void checkStarted() throws IOException {
if (!isStarted()) {
throw new IOException("The transport is not running.");
}

View File

@ -22,7 +22,6 @@ import java.io.InterruptedIOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
@ -83,7 +82,7 @@ public class WireFormatNegotiator extends TransportFilter {
readyCountDownLatch.countDown();
}
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
try {
if( !readyCountDownLatch.await(negotiateTimeout, TimeUnit.MILLISECONDS) )
throw new IOException("Wire format negociation timeout: peer did not send his wire format.");
@ -95,7 +94,8 @@ public class WireFormatNegotiator extends TransportFilter {
}
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
if( command.isWireFormatInfo() ) {
WireFormatInfo info = (WireFormatInfo) command;
if (log.isDebugEnabled()) {

View File

@ -85,7 +85,8 @@ public class FailoverTransport implements CompositeTransport {
TransportListener createTransportListener() {
return new TransportListener() {
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
if (command == null) {
return;
}
@ -340,7 +341,8 @@ public class FailoverTransport implements CompositeTransport {
this.randomize=randomize;
}
public void oneway(Command command) throws IOException {
public void oneway(Object o) throws IOException {
Command command = (Command) o;
Exception error = null;
try {
@ -429,15 +431,15 @@ public class FailoverTransport implements CompositeTransport {
}
}
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Response request(Command command) throws IOException {
public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Response request(Command command,int timeout) throws IOException {
public Object request(Object command,int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}

View File

@ -107,7 +107,8 @@ public class FanoutTransport implements CompositeTransport {
this.uri=uri;
}
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
if (command.isResponse()) {
Integer id = new Integer(((Response) command).getCorrelationId());
RequestCounter rc = (RequestCounter) requestMap.get(id);
@ -331,7 +332,8 @@ public class FanoutTransport implements CompositeTransport {
this.maxReconnectAttempts = maxReconnectAttempts;
}
public void oneway(Command command) throws IOException {
public void oneway(Object o) throws IOException {
final Command command = (Command) o;
try {
synchronized (reconnectMutex) {
@ -414,15 +416,15 @@ public class FanoutTransport implements CompositeTransport {
return true;
}
public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Response request(Command command) throws IOException {
public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method");
}
public Response request(Command command,int timeout) throws IOException {
public Object request(Object command,int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}

View File

@ -19,8 +19,6 @@ package org.apache.activemq.transport.mock;
import java.io.IOException;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
@ -71,7 +69,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
next.stop();
}
synchronized public void onCommand(Command command) {
synchronized public void onCommand(Object command) {
transportListener.onCommand(command);
}
@ -93,19 +91,19 @@ public class MockTransport extends DefaultTransportListener implements Transport
return next.toString();
}
synchronized public void oneway(Command command) throws IOException {
synchronized public void oneway(Object command) throws IOException {
next.oneway(command);
}
synchronized public FutureResponse asyncRequest(Command command, ResponseCallback responseCallback) throws IOException {
synchronized public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
return next.asyncRequest(command, null);
}
synchronized public Response request(Command command) throws IOException {
synchronized public Object request(Object command) throws IOException {
return next.request(command);
}
public Response request(Command command,int timeout) throws IOException {
public Object request(Object command,int timeout) throws IOException {
return next.request(command, timeout);
}

View File

@ -17,6 +17,10 @@
*/
package org.apache.activemq.transport.reliable;
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ReplayCommand;
import org.apache.activemq.command.Response;
@ -25,14 +29,9 @@ import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCorrelator;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.udp.UdpTransport;
import org.apache.activemq.util.IntSequenceGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* This interceptor deals with out of order commands together with being able to
* handle dropped commands and the re-requesting dropped commands.
@ -78,7 +77,8 @@ public class ReliableTransport extends ResponseCorrelator {
}
}
public Response request(Command command) throws IOException {
public Object request(Object o) throws IOException {
final Command command = (Command) o;
FutureResponse response = asyncRequest(command, null);
while (true) {
Response result = response.getResult(requestTimeout);
@ -89,7 +89,8 @@ public class ReliableTransport extends ResponseCorrelator {
}
}
public Response request(Command command, int timeout) throws IOException {
public Object request(Object o, int timeout) throws IOException {
final Command command = (Command) o;
FutureResponse response = asyncRequest(command, null);
while (timeout > 0) {
int time = timeout;
@ -106,7 +107,8 @@ public class ReliableTransport extends ResponseCorrelator {
return response.getResult(0);
}
public void onCommand(Command command) {
public void onCommand(Object o) {
Command command = (Command) o;
// lets pass wireformat through
if (command.isWireFormatInfo()) {
super.onCommand(command);

View File

@ -50,15 +50,16 @@ public class StompTransportFilter extends TransportFilter {
this.protocolConverter = new ProtocolConverter(this, translator);
}
public void oneway(Command command) throws IOException {
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
protocolConverter.onActiveMQCommad(command);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
}
public void onCommand(Command command) {
public void onCommand(Object command) {
try {
protocolConverter.onStompCommad((StompFrame) command);
} catch (IOException e) {

View File

@ -34,7 +34,6 @@ import java.util.Map;
import javax.net.SocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportThreadSupport;
import org.apache.activemq.util.IntrospectionSupport;
@ -113,8 +112,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
/**
* A one way asynchronous send
*/
public void oneway(Command command) throws IOException {
checkStarted(command);
public void oneway(Object command) throws IOException {
checkStarted();
wireFormat.marshal(command, dataOut);
dataOut.flush();
}
@ -133,7 +132,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
log.trace("TCP consumer thread starting");
while (!isStopped()) {
try {
Command command = (Command) wireFormat.unmarshal(dataIn);
Object command = wireFormat.unmarshal(dataIn);
doConsume(command);
}
catch (SocketTimeoutException e) {

View File

@ -34,7 +34,8 @@ public class ResponseRedirectInterceptor extends TransportFilter {
this.transport = transport;
}
public void onCommand(Command command) {
public void onCommand(Object o) {
final Command command = (Command) o;
// redirect to the endpoint that the last response came from
Endpoint from = command.getFrom();
transport.setTargetEndpoint(from);

View File

@ -113,19 +113,19 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* A one way asynchronous send
*/
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
oneway(command, targetAddress);
}
/**
* A one way asynchronous send to a given address
*/
public void oneway(Command command, SocketAddress address) throws IOException {
public void oneway(Object command, SocketAddress address) throws IOException {
if (log.isDebugEnabled()) {
log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
}
checkStarted(command);
commandChannel.write(command, address);
checkStarted();
commandChannel.write((Command) command, address);
}
/**

View File

@ -17,6 +17,13 @@
*/
package org.apache.activemq.transport.udp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.openwire.OpenWireFormat;
@ -33,13 +40,6 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
* A UDP based implementation of {@link TransportServer}
*
@ -81,7 +81,8 @@ public class UdpTransportServer extends TransportServerSupport {
log.info("Starting " + this);
configuredTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
public void onCommand(Object o) {
final Command command = (Command) o;
processInboundConnection(command);
}

View File

@ -20,7 +20,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -70,7 +69,7 @@ public class VMTransport implements Transport,Task{
this.peer=peer;
}
public void oneway(Command command) throws IOException{
public void oneway(Object command) throws IOException{
if(disposed){
throw new TransportDisposedIOException("Transport disposed.");
}
@ -88,7 +87,7 @@ public class VMTransport implements Transport,Task{
}
}
protected void syncOneWay(Command command){
protected void syncOneWay(Object command){
final TransportListener tl=peer.transportListener;
prePeerSetQueue=peer.prePeerSetQueue;
if(tl==null){
@ -98,7 +97,7 @@ public class VMTransport implements Transport,Task{
}
}
protected void asyncOneWay(Command command) throws IOException{
protected void asyncOneWay(Object command) throws IOException{
messageQueue=getMessageQueue();
try{
messageQueue.put(command);
@ -109,15 +108,15 @@ public class VMTransport implements Transport,Task{
}
}
public FutureResponse asyncRequest(Command command,ResponseCallback responseCallback) throws IOException{
public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
throw new AssertionError("Unsupported Method");
}
public Response request(Command command) throws IOException{
public Object request(Object command) throws IOException{
throw new AssertionError("Unsupported Method");
}
public Response request(Command command,int timeout) throws IOException{
public Object request(Object command,int timeout) throws IOException{
throw new AssertionError("Unsupported Method");
}

View File

@ -18,6 +18,7 @@
package org.apache.activemq.broker;
import java.io.IOException;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ExceptionResponse;
@ -30,6 +31,7 @@ import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.ServiceSupport;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@ -66,7 +68,7 @@ public class StubConnection implements Service {
};
}
protected void dispatch(Command command) throws InterruptedException, IOException {
protected void dispatch(Object command) throws InterruptedException, IOException {
dispatchQueue.put(command);
}
@ -77,7 +79,7 @@ public class StubConnection implements Service {
public StubConnection(Transport transport) throws Exception {
this.transport = transport;
transport.setTransportListener(new DefaultTransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
try {
if (command.getClass() == ShutdownInfo.class) {
shuttingDown = true;
@ -135,7 +137,7 @@ public class StubConnection implements Service {
return response;
}
else if (transport != null) {
Response response = transport.request(command);
Response response = (Response) transport.request(command);
if (response != null && response.isException()) {
ExceptionResponse er = (ExceptionResponse) response;
throw JMSExceptionSupport.create(er.getException());

View File

@ -20,7 +20,6 @@ package org.apache.activemq.transport;
import edu.emory.mathcs.backport.java.util.Queue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.activemq.command.Command;
import org.apache.activemq.util.ServiceStopper;
import java.io.IOException;
@ -39,7 +38,7 @@ public class StubTransport extends TransportSupport {
protected void doStart() throws Exception {
}
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
queue.add(command);
}

View File

@ -20,7 +20,6 @@ package org.apache.activemq.transport;
import edu.emory.mathcs.backport.java.util.Queue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.activemq.command.Command;
import java.io.IOException;
@ -41,7 +40,7 @@ public class StubTransportListener implements TransportListener {
return exceptions;
}
public void onCommand(Command command) {
public void onCommand(Object command) {
commands.add(command);
}

View File

@ -23,7 +23,6 @@ import java.net.URI;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@ -58,7 +57,7 @@ public class BadConnectionTest extends TestCase {
transport = createTransport();
transport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
}
public void onException(IOException error) {

View File

@ -28,7 +28,6 @@ import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ProducerInfo;
@ -159,7 +158,7 @@ public class FanoutTransportBrokerTest extends NetworkTestSupport {
// Slip in a new transport filter after the MockTransport
MockTransport mt = (MockTransport) connection3.getTransport().narrow(MockTransport.class);
mt.install(new TransportFilter(mt.getNext()) {
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
log.info("Dropping: "+command);
// just eat it! to simulate a recent failure.
}

View File

@ -21,7 +21,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.Transport;
@ -65,7 +64,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
private void startClient() throws Exception, URISyntaxException {
clientTransport = TransportFactory.connect(new URI("tcp://localhost:61616?trace=true&wireFormat.maxInactivityDuration=1000"));
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
clientReceiveCount.incrementAndGet();
if( clientRunOnCommand !=null ) {
clientRunOnCommand.run();
@ -117,7 +116,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
log.info("["+getName()+"] Server Accepted a Connection");
serverTransport = transport;
serverTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
serverReceiveCount.incrementAndGet();
if( serverRunOnCommand !=null ) {
serverRunOnCommand.run();
@ -151,7 +150,7 @@ public class InactivityMonitorTest extends CombinationTestSupport implements Tra
// this should simulate a client hang.
clientTransport = new TcpTransport(new OpenWireFormat(), SocketFactory.getDefault(), new URI("tcp://localhost:61616"), null);
clientTransport.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
clientReceiveCount.incrementAndGet();
if( clientRunOnCommand !=null ) {
clientRunOnCommand.run();

View File

@ -169,7 +169,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
producer = createProducer();
producer.setTransportListener(new TransportListener() {
public void onCommand(Command command) {
public void onCommand(Object command) {
log.info("Producer received: " + command);
}
@ -200,7 +200,8 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen
}
}
public void onCommand(Command command) {
public void onCommand(Object o) {
final Command command = (Command) o;
if (command instanceof WireFormatInfo) {
log.info("Got WireFormatInfo: " + command);
}

View File

@ -40,7 +40,7 @@ public class UdpTransportUsingServerTest extends UdpTestSupport {
expected.setSelector("Edam");
expected.setResponseRequired(true);
log.info("About to send: " + expected);
Response response = producer.request(expected, 2000);
Response response = (Response) producer.request(expected, 2000);
log.info("Received: " + response);
assertNotNull("Received a response", response);

View File

@ -21,7 +21,6 @@ import edu.emory.mathcs.backport.java.util.Queue;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.apache.activemq.command.Command;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.util.ServiceStopper;
@ -47,7 +46,7 @@ public class BlockingQueueTransport extends TransportSupport {
return queue;
}
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
try {
boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS);
if (!success)

View File

@ -22,8 +22,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Response;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayInputStream;
@ -62,11 +60,11 @@ public class HttpClientTransport extends HttpTransportSupport {
super(wireFormat, remoteUrl);
}
public FutureResponse asyncRequest(Command command) throws IOException {
public FutureResponse asyncRequest(Object command) throws IOException {
return null;
}
public void oneway(Command command) throws IOException {
public void oneway(Object command) throws IOException {
if( isStopped() ) {
throw new IOException("stopped.");
@ -96,7 +94,7 @@ public class HttpClientTransport extends HttpTransportSupport {
}
}
public Response request(Command command) throws IOException {
public Object request(Object command) throws IOException {
return null;
}
@ -131,7 +129,7 @@ public class HttpClientTransport extends HttpTransportSupport {
else {
// checkSession(httpMethod);
DataInputStream stream = new DataInputStream(httpMethod.getResponseBodyAsStream());
Command command = (Command) getTextWireFormat().unmarshal(stream);
Object command = (Object) getTextWireFormat().unmarshal(stream);
if (command == null) {
log.warn("Received null command from url: " + remoteUrl);
} else {

View File

@ -53,7 +53,8 @@ public class HttpTransport extends HttpTransportSupport {
url = new URL(remoteUrl.toString());
}
public void oneway(Command command) throws IOException {
public void oneway(Object o) throws IOException {
final Command command = (Command) o;
try {
if (command.getDataStructureType()==ConnectionInfo.DATA_STRUCTURE_TYPE) {
boolean startGetThread = clientID==null;