https://issues.apache.org/activemq/browse/AMQ-2617 - refactor ProtocolConverter so we can reuse it

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@911413 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-02-18 14:20:03 +00:00
parent c75001c5a4
commit ceed011907
4 changed files with 53 additions and 15 deletions

View File

@ -82,7 +82,7 @@ public class ProtocolConverter {
private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>(); private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>(); private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>(); private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
private final StompTransportFilter transportFilter; private final StompTransport stompTransport;
private final Object commnadIdMutex = new Object(); private final Object commnadIdMutex = new Object();
private int lastCommandId; private int lastCommandId;
@ -91,8 +91,8 @@ public class ProtocolConverter {
private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/"); private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
private final ApplicationContext applicationContext; private final ApplicationContext applicationContext;
public ProtocolConverter(StompTransportFilter stompTransportFilter, FrameTranslator translator, ApplicationContext applicationContext) { public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, ApplicationContext applicationContext) {
this.transportFilter = stompTransportFilter; this.stompTransport = stompTransport;
this.frameTranslator = translator; this.frameTranslator = translator;
this.applicationContext = applicationContext; this.applicationContext = applicationContext;
} }
@ -118,7 +118,7 @@ public class ProtocolConverter {
sc.setAction(Stomp.Responses.RECEIPT); sc.setAction(Stomp.Responses.RECEIPT);
sc.setHeaders(new HashMap<String, String>(1)); sc.setHeaders(new HashMap<String, String>(1));
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
transportFilter.sendToStomp(sc); stompTransport.sendToStomp(sc);
} }
} }
}; };
@ -132,11 +132,11 @@ public class ProtocolConverter {
command.setResponseRequired(true); command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);
} }
transportFilter.sendToActiveMQ(command); stompTransport.sendToActiveMQ(command);
} }
protected void sendToStomp(StompFrame command) throws IOException { protected void sendToStomp(StompFrame command) throws IOException {
transportFilter.sendToStomp(command); stompTransport.sendToStomp(command);
} }
protected FrameTranslator findTranslator(String header) { protected FrameTranslator findTranslator(String header) {
@ -195,7 +195,7 @@ public class ProtocolConverter {
handleException(e, command); handleException(e, command);
// Some protocol errors can cause the connection to get closed. // Some protocol errors can cause the connection to get closed.
if( e.isFatal() ) { if( e.isFatal() ) {
getTransportFilter().onException(e); getStompTransport().onException(e);
} }
} }
} }
@ -492,7 +492,7 @@ public class ProtocolConverter {
connectionInfo.setResponseRequired(true); connectionInfo.setResponseRequired(true);
connectionInfo.setUserName(login); connectionInfo.setUserName(login);
connectionInfo.setPassword(passcode); connectionInfo.setPassword(passcode);
connectionInfo.setTransportContext(transportFilter.getPeerCertificates()); connectionInfo.setTransportContext(stompTransport.getPeerCertificates());
sendToActiveMQ(connectionInfo, new ResponseHandler() { sendToActiveMQ(connectionInfo, new ResponseHandler() {
public void onResponse(ProtocolConverter converter, Response response) throws IOException { public void onResponse(ProtocolConverter converter, Response response) throws IOException {
@ -501,7 +501,7 @@ public class ProtocolConverter {
// If the connection attempt fails we close the socket. // If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse)response).getException(); Throwable exception = ((ExceptionResponse)response).getException();
handleException(exception, command); handleException(exception, command);
getTransportFilter().onException(IOExceptionSupport.create(exception)); getStompTransport().onException(IOExceptionSupport.create(exception));
return; return;
} }
@ -516,7 +516,7 @@ public class ProtocolConverter {
// If the connection attempt fails we close the socket. // If the connection attempt fails we close the socket.
Throwable exception = ((ExceptionResponse)response).getException(); Throwable exception = ((ExceptionResponse)response).getException();
handleException(exception, command); handleException(exception, command);
getTransportFilter().onException(IOExceptionSupport.create(exception)); getStompTransport().onException(IOExceptionSupport.create(exception));
} }
connected.set(true); connected.set(true);
@ -605,8 +605,8 @@ public class ProtocolConverter {
} }
} }
public StompTransportFilter getTransportFilter() { public StompTransport getStompTransport() {
return transportFilter; return stompTransport;
} }
public ActiveMQDestination createTempQueue(String name) { public ActiveMQDestination createTempQueue(String name) {

View File

@ -75,7 +75,7 @@ public class StompSubscription {
} }
} else if (ackMode == AUTO_ACK) { } else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1); MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
protocolConverter.getTransportFilter().sendToActiveMQ(ack); protocolConverter.getStompTransport().sendToActiveMQ(ack);
} }
boolean ignoreTransformation = false; boolean ignoreTransformation = false;
@ -96,7 +96,7 @@ public class StompSubscription {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId); command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
} }
protocolConverter.getTransportFilter().sendToStomp(command); protocolConverter.getStompTransport().sendToStomp(command);
} }
synchronized void onStompAbort(TransactionId transactionId) { synchronized void onStompAbort(TransactionId transactionId) {

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import java.io.IOException;
import java.security.cert.X509Certificate;
import org.apache.activemq.command.Command;
/**
* Basic interface that mediates between protocol converter and transport
*
*/
public interface StompTransport {
public void sendToActiveMQ(Command command);
public void sendToStomp(StompFrame command) throws IOException;
public X509Certificate[] getPeerCertificates();
public void onException(IOException error);
}

View File

@ -40,7 +40,7 @@ import org.springframework.context.ApplicationContext;
* *
* @author <a href="http://hiramchirino.com">chirino</a> * @author <a href="http://hiramchirino.com">chirino</a>
*/ */
public class StompTransportFilter extends TransportFilter { public class StompTransportFilter extends TransportFilter implements StompTransport {
private static final Log LOG = LogFactory.getLog(StompTransportFilter.class); private static final Log LOG = LogFactory.getLog(StompTransportFilter.class);
private final ProtocolConverter protocolConverter; private final ProtocolConverter protocolConverter;
private final FrameTranslator frameTranslator; private final FrameTranslator frameTranslator;