From 8c5c33bfc2e581edcba0c164fb0cb7f892e65b3f Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 6 Dec 2012 21:22:21 +0000 Subject: [PATCH] AMQP: Clean up old commented code, pick up the trace setting from the transport filter. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1418089 13f79535-47bb-0310-9956-ffa450edef68 --- .../transport/amqp/AmqpProtocolConverter.java | 219 +----------------- .../transport/amqp/AmqpTransport.java | 2 + 2 files changed, 6 insertions(+), 215 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 37d41fc895..2fdafa3a4a 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -65,39 +65,16 @@ class AmqpProtocolConverter { private static final UnsignedInteger DURABLE = new UnsignedInteger(2); private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED"); - public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) { - this.amqpTransport = amqpTransport; - } + int prefetch = 100; ReentrantLock lock = new ReentrantLock(); - -// -// private static final Buffer PING_RESP_FRAME = new PINGRESP().encode(); -// -// -// private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); -// private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); -// -// private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); -// private final ConcurrentHashMap amqpSubscriptionByTopic = new ConcurrentHashMap(); -// private final Map activeMQTopicMap = new LRUCache(); -// private final Map amqpTopicMap = new LRUCache(); -// private final Map consumerAcks = new LRUCache(); -// private final Map publisherRecs = new LRUCache(); -// -// private final AtomicBoolean connected = new AtomicBoolean(false); -// private CONNECT connect; -// private String clientId; -// private final String QOS_PROPERTY_NAME = "QoSPropertyName"; - int prefetch = 100; - boolean trace = true; - TransportImpl protonTransport = new TransportImpl(); ConnectionImpl protonConnection = new ConnectionImpl(); - { + public AmqpProtocolConverter(AmqpTransport transport, BrokerContext brokerContext) { + this.amqpTransport = transport; this.protonTransport.bind(this.protonConnection); - if( trace ) { + if( transport.isTrace() ) { this.protonTransport.setProtocolTracer(new ProtocolTracer() { @Override public void receivedFrame(TransportFrame transportFrame) { @@ -351,18 +328,6 @@ class AmqpProtocolConverter { connectionInfo.setClientId(clientId); } - -// String userName = ""; -// if (connect.userName() != null) { -// userName = connect.userName().toString(); -// } -// String passswd = ""; -// if (connect.password() != null) { -// passswd = connect.password().toString(); -// } -// connectionInfo.setUserName(userName); -// connectionInfo.setPassword(passswd); - connectionInfo.setTransportContext(amqpTransport.getPeerCertificates()); sendToActiveMQ(connectionInfo, new ResponseHandler() { @@ -1020,182 +985,6 @@ class AmqpProtocolConverter { return rc; } -// void onUnSubscribe(UNSUBSCRIBE command) { -// UTF8Buffer[] topics = command.topics(); -// if (topics != null) { -// for (int i = 0; i < topics.length; i++) { -// onUnSubscribe(topics[i]); -// } -// } -// UNSUBACK ack = new UNSUBACK(); -// ack.messageId(command.messageId()); -// pumpOut(ack.encode()); -// -// } -// -// void onUnSubscribe(UTF8Buffer topicName) { -// AmqpSubscription subs = amqpSubscriptionByTopic.remove(topicName); -// if (subs != null) { -// ConsumerInfo info = subs.getConsumerInfo(); -// if (info != null) { -// subscriptionsByConsumerId.remove(info.getConsumerId()); -// } -// RemoveInfo removeInfo = info.createRemoveCommand(); -// sendToActiveMQ(removeInfo, null); -// } -// } -// -// -// /** -// * Dispatch a ActiveMQ command -// */ -// -// -// -// void onAMQPPublish(PUBLISH command) throws IOException, JMSException { -// checkConnected(); -// } -// -// void onAMQPPubAck(PUBACK command) { -// short messageId = command.messageId(); -// MessageAck ack; -// synchronized (consumerAcks) { -// ack = consumerAcks.remove(messageId); -// } -// if (ack != null) { -// amqpTransport.sendToActiveMQ(ack); -// } -// } -// -// void onAMQPPubRec(PUBREC commnand) { -// //from a subscriber - send a PUBREL in response -// PUBREL pubrel = new PUBREL(); -// pubrel.messageId(commnand.messageId()); -// pumpOut(pubrel.encode()); -// } -// -// void onAMQPPubRel(PUBREL command) { -// PUBREC ack; -// synchronized (publisherRecs) { -// ack = publisherRecs.remove(command.messageId()); -// } -// if (ack == null) { -// LOG.warn("Unknown PUBREL: " + command.messageId() + " received"); -// } -// PUBCOMP pubcomp = new PUBCOMP(); -// pubcomp.messageId(command.messageId()); -// pumpOut(pubcomp.encode()); -// } -// -// void onAMQPPubComp(PUBCOMP command) { -// short messageId = command.messageId(); -// MessageAck ack; -// synchronized (consumerAcks) { -// ack = consumerAcks.remove(messageId); -// } -// if (ack != null) { -// amqpTransport.sendToActiveMQ(ack); -// } -// } -// -// -// -// -// public AmqpTransport amqpTransport { -// return amqpTransport; -// } -// -// -// -// void configureInactivityMonitor(short heartBeat) { -// try { -// -// int heartBeatMS = heartBeat * 1000; -// AmqpInactivityMonitor monitor = amqpTransport.getInactivityMonitor(); -// monitor.setProtocolConverter(this); -// monitor.setReadCheckTime(heartBeatMS); -// monitor.setInitialDelayTime(heartBeatMS); -// monitor.startMonitorThread(); -// -// } catch (Exception ex) { -// LOG.warn("Failed to start AMQP InactivityMonitor ", ex); -// } -// -// LOG.debug(getClientId() + " AMQP Connection using heart beat of " + heartBeat + " secs"); -// } -// -// -// -// void checkConnected() throws AmqpProtocolException { -// if (!connected.get()) { -// throw new AmqpProtocolException("Not connected."); -// } -// } -// -// private String getClientId() { -// if (clientId == null) { -// if (connect != null && connect.clientId() != null) { -// clientId = connect.clientId().toString(); -// } -// } else { -// clientId = ""; -// } -// return clientId; -// } -// -// private void stopTransport() { -// try { -// amqpTransport.stop(); -// } catch (Throwable e) { -// LOG.debug("Failed to stop AMQP transport ", e); -// } -// } -// -// ResponseHandler createResponseHandler(final PUBLISH command) { -// -// if (command != null) { -// switch (command.qos()) { -// case AT_LEAST_ONCE: -// return new ResponseHandler() { -// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { -// if (response.isException()) { -// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException()); -// } else { -// PUBACK ack = new PUBACK(); -// ack.messageId(command.messageId()); -// converter.amqpTransport.sendToAmqp(ack.encode()); -// } -// } -// }; -// case EXACTLY_ONCE: -// return new ResponseHandler() { -// public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException { -// if (response.isException()) { -// LOG.warn("Failed to send AMQP Publish: ", command, ((ExceptionResponse) response).getException()); -// } else { -// PUBREC ack = new PUBREC(); -// ack.messageId(command.messageId()); -// synchronized (publisherRecs) { -// publisherRecs.put(command.messageId(), ack); -// } -// converter.amqpTransport.sendToAmqp(ack.encode()); -// } -// } -// }; -// case AT_MOST_ONCE: -// break; -// } -// } -// return null; -// } -// -// private String convertAMQPToActiveMQ(String name) { -// String result = name.replace('#', '>'); -// result = result.replace('+', '*'); -// result = result.replace('/', '.'); -// return result; -// } - //////////////////////////////////////////////////////////////////////////// // // Implementation methods diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java index 3ba38245dc..50d50853ec 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransport.java @@ -45,4 +45,6 @@ public interface AmqpTransport { public String getRemoteAddress(); + public boolean isTrace(); + }