From ceed0119071a879416d4336eb83d8a4385e80ba5 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Thu, 18 Feb 2010 14:20:03 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2617 - refactor ProtocolConverter so we can reuse it git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@911413 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/stomp/ProtocolConverter.java | 24 ++++++------ .../transport/stomp/StompSubscription.java | 4 +- .../transport/stomp/StompTransport.java | 38 +++++++++++++++++++ .../transport/stomp/StompTransportFilter.java | 2 +- 4 files changed, 53 insertions(+), 15 deletions(-) create mode 100644 activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index b2a01f30d9..c553846c21 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -82,7 +82,7 @@ public class ProtocolConverter { private final ConcurrentHashMap tempDestinations = new ConcurrentHashMap(); private final ConcurrentHashMap tempDestinationAmqToStompMap = new ConcurrentHashMap(); private final Map transactions = new ConcurrentHashMap(); - private final StompTransportFilter transportFilter; + private final StompTransport stompTransport; private final Object commnadIdMutex = new Object(); private int lastCommandId; @@ -91,8 +91,8 @@ public class ProtocolConverter { private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); private final ApplicationContext applicationContext; - public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator, ApplicationContext applicationContext) { - this.transportFilter = stompTransportFilter; + public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, ApplicationContext applicationContext) { + this.stompTransport = stompTransport; this.frameTranslator = translator; this.applicationContext = applicationContext; } @@ -118,7 +118,7 @@ public class ProtocolConverter { sc.setAction(Stomp.Responses.RECEIPT); sc.setHeaders(new HashMap(1)); sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); - transportFilter.sendToStomp(sc); + stompTransport.sendToStomp(sc); } } }; @@ -132,11 +132,11 @@ public class ProtocolConverter { command.setResponseRequired(true); resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); } - transportFilter.sendToActiveMQ(command); + stompTransport.sendToActiveMQ(command); } protected void sendToStomp(StompFrame command) throws IOException { - transportFilter.sendToStomp(command); + stompTransport.sendToStomp(command); } protected FrameTranslator findTranslator(String header) { @@ -195,7 +195,7 @@ public class ProtocolConverter { handleException(e, command); // Some protocol errors can cause the connection to get closed. if( e.isFatal() ) { - getTransportFilter().onException(e); + getStompTransport().onException(e); } } } @@ -492,7 +492,7 @@ public class ProtocolConverter { connectionInfo.setResponseRequired(true); connectionInfo.setUserName(login); connectionInfo.setPassword(passcode); - connectionInfo.setTransportContext(transportFilter.getPeerCertificates()); + connectionInfo.setTransportContext(stompTransport.getPeerCertificates()); sendToActiveMQ(connectionInfo, new ResponseHandler() { public void onResponse(ProtocolConverter converter, Response response) throws IOException { @@ -501,7 +501,7 @@ public class ProtocolConverter { // If the connection attempt fails we close the socket. Throwable exception = ((ExceptionResponse)response).getException(); handleException(exception, command); - getTransportFilter().onException(IOExceptionSupport.create(exception)); + getStompTransport().onException(IOExceptionSupport.create(exception)); return; } @@ -516,7 +516,7 @@ public class ProtocolConverter { // If the connection attempt fails we close the socket. Throwable exception = ((ExceptionResponse)response).getException(); handleException(exception, command); - getTransportFilter().onException(IOExceptionSupport.create(exception)); + getStompTransport().onException(IOExceptionSupport.create(exception)); } connected.set(true); @@ -605,8 +605,8 @@ public class ProtocolConverter { } } - public StompTransportFilter getTransportFilter() { - return transportFilter; + public StompTransport getStompTransport() { + return stompTransport; } public ActiveMQDestination createTempQueue(String name) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 4fe7e5d1da..56e872e6fe 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -75,7 +75,7 @@ public class StompSubscription { } } else if (ackMode == AUTO_ACK) { MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); - protocolConverter.getTransportFilter().sendToActiveMQ(ack); + protocolConverter.getStompTransport().sendToActiveMQ(ack); } boolean ignoreTransformation = false; @@ -96,7 +96,7 @@ public class StompSubscription { command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); } - protocolConverter.getTransportFilter().sendToStomp(command); + protocolConverter.getStompTransport().sendToStomp(command); } synchronized void onStompAbort(TransactionId transactionId) { diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java new file mode 100644 index 0000000000..04c499be14 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.stomp; + +import java.io.IOException; +import java.security.cert.X509Certificate; + +import org.apache.activemq.command.Command; + +/** + * Basic interface that mediates between protocol converter and transport + * + */ +public interface StompTransport { + + public void sendToActiveMQ(Command command); + + public void sendToStomp(StompFrame command) throws IOException; + + public X509Certificate[] getPeerCertificates(); + + public void onException(IOException error); + +} diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java index c07112a8e8..4af2e27f56 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java @@ -40,7 +40,7 @@ import org.springframework.context.ApplicationContext; * * @author chirino */ -public class StompTransportFilter extends TransportFilter { +public class StompTransportFilter extends TransportFilter implements StompTransport { private static final Log LOG = LogFactory.getLog(StompTransportFilter.class); private final ProtocolConverter protocolConverter; private final FrameTranslator frameTranslator;