NO-JIRA: remove unused methods and related dependencies left from initial copy plus some other cleanup

This commit is contained in:
Robbie Gemmell 2022-06-20 16:37:04 +01:00
parent fd345f0a03
commit 78587f0a82
5 changed files with 38 additions and 115 deletions

View File

@ -35,10 +35,6 @@
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@ -67,28 +63,6 @@
<groupId>jakarta.jms</groupId>
<artifactId>jakarta.jms-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.management.j2ee</groupId>
<artifactId>jakarta.management.j2ee-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.fusesource.hawtbuf</groupId>
<artifactId>hawtbuf</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -16,17 +16,12 @@
*/
package org.apache.activemq.transport.amqp;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.Map;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.fusesource.hawtbuf.Buffer;
/**
* Set of useful methods and definitions used in the AMQP protocol handling
@ -118,84 +113,4 @@ public class AmqpSupport {
return null;
}
/**
* Conversion from Java ByteBuffer to a HawtBuf buffer.
*
* @param data the ByteBuffer instance to convert.
* @return a new HawtBuf buffer converted from the given ByteBuffer.
*/
public static Buffer toBuffer(ByteBuffer data) {
if (data == null) {
return null;
}
Buffer rc;
if (data.isDirect()) {
rc = new Buffer(data.remaining());
data.get(rc.data);
} else {
rc = new Buffer(data);
data.position(data.position() + data.remaining());
}
return rc;
}
/**
* Given a long value, convert it to a byte array for marshalling.
*
* @param value the value to convert.
* @return a new byte array that holds the big endian value of the long.
*/
public static byte[] toBytes(long value) {
Buffer buffer = new Buffer(8);
buffer.bigEndianEditor().writeLong(value);
return buffer.data;
}
/**
* Converts a Binary value to a long assuming that the contained value is
* stored in Big Endian encoding.
*
* @param value the Binary object whose payload is converted to a long.
* @return a long value constructed from the bytes of the Binary instance.
*/
public static long toLong(Binary value) {
Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
return buffer.bigEndianEditor().readLong();
}
/**
* Given an AMQP endpoint, deduce the appropriate ActiveMQDestination type and create
* a new instance. By default if the endpoint address does not carry the standard prefix
* value then we default to a Queue type destination. If the endpoint is null or is an
* AMQP Coordinator type endpoint this method returns null to indicate no destination
* can be mapped.
*
* @param endpoint the AMQP endpoint to construct an ActiveMQDestination from.
* @return a new ActiveMQDestination that best matches the address of the given endpoint
* @throws AmqpProtocolException if an error occurs while deducing the destination type.
*/
public static ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException {
if (endpoint == null) {
return null;
} else if (endpoint instanceof Coordinator) {
return null;
} else if (endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus) {
org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint;
if (terminus.getAddress() == null || terminus.getAddress().length() == 0) {
if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
} else {
throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
}
}
return ActiveMQDestination.createDestination(terminus.getAddress(), ActiveMQDestination.QUEUE_TYPE);
} else {
throw new RuntimeException("Unexpected terminus type: " + endpoint);
}
}
}

View File

@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
import org.apache.activemq.transport.netty.NettyTransportListener;
@ -614,7 +613,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
pumpToProtonTransport();
if (protonTransport.isClosed()) {
LOG.debug("Transport closed after inactivity check.");
throw new InactivityIOException("Channel was inactive for too long");
throw new IllegalStateException("Channel was inactive for too long");
} else {
if (deadline != 0) {
getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS);

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.client;
import javax.jms.JMSException;
/**
* Exception that indicates a blocking operation timed out while waiting
* for the remote to acknowledge or process it.
*/
public class AmqpOperationTimedOutException extends JMSException {
private static final long serialVersionUID = -2509921066407708297L;
public AmqpOperationTimedOutException(String reason) {
super(reason);
}
public AmqpOperationTimedOutException(String reason, String errorCode) {
super(reason, errorCode);
}
}

View File

@ -36,7 +36,6 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
import org.apache.qpid.jms.JmsOperationTimedOutException;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
@ -1043,7 +1042,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
@Override
public void run() {
LOG.trace("Consumer {} drain request timed out", this);
Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
Exception cause = new AmqpOperationTimedOutException("Remote did not respond to a drain request in time");
locallyClosed(session.getConnection(), cause);
stopRequest.onFailure(cause);
session.pumpToProtonTransport(stopRequest);