mirror of https://github.com/apache/activemq.git
Implements AMQ-5054: Display the number of active transactions and age of oldest transaction on a Connection's JMX info
This commit is contained in:
parent
ef82a4b8b4
commit
93ca04468b
|
@ -16,13 +16,14 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.broker;
|
package org.apache.activemq.broker;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.broker.region.ConnectionStatistics;
|
import org.apache.activemq.broker.region.ConnectionStatistics;
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.command.ConnectionControl;
|
import org.apache.activemq.command.ConnectionControl;
|
||||||
import org.apache.activemq.command.Response;
|
import org.apache.activemq.command.Response;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
@ -119,4 +120,19 @@ public interface Connection extends Service {
|
||||||
|
|
||||||
void updateClient(ConnectionControl control);
|
void updateClient(ConnectionControl control);
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of active transactions established on this Connection.
|
||||||
|
*
|
||||||
|
* @return the number of active transactions established on this Connection..
|
||||||
|
*/
|
||||||
|
public int getActiveTransactionCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of active transactions established on this Connection.
|
||||||
|
*
|
||||||
|
* @return the number of active transactions established on this Connection..
|
||||||
|
*/
|
||||||
|
public Long getOldestActiveTransactionDuration();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,12 +20,7 @@ import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.HashMap;
|
import java.util.*;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Properties;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -379,6 +374,33 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getActiveTransactionCount() {
|
||||||
|
int rc = 0;
|
||||||
|
for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
|
||||||
|
Collection<TransactionState> transactions = cs.getTransactionStates();
|
||||||
|
for (TransactionState transaction : transactions) {
|
||||||
|
rc++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getOldestActiveTransactionDuration() {
|
||||||
|
TransactionState oldestTX = null;
|
||||||
|
for (TransportConnectionState cs : connectionStateRegister.listConnectionStates()) {
|
||||||
|
Collection<TransactionState> transactions = cs.getTransactionStates();
|
||||||
|
for (TransactionState transaction : transactions) {
|
||||||
|
if( oldestTX ==null || oldestTX.getCreatedAt() < transaction.getCreatedAt() ) {
|
||||||
|
oldestTX = transaction;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if( oldestTX == null ) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return System.currentTimeMillis() - oldestTX.getCreatedAt();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Response processEndTransaction(TransactionInfo info) throws Exception {
|
public Response processEndTransaction(TransactionInfo info) throws Exception {
|
||||||
// No need to do anything. This packet is just sent by the client
|
// No need to do anything. This packet is just sent by the client
|
||||||
|
|
|
@ -166,4 +166,14 @@ public class ConnectionView implements ConnectionViewMBean {
|
||||||
throw IOExceptionSupport.create(e);
|
throw IOExceptionSupport.create(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveTransactionCount() {
|
||||||
|
return connection.getActiveTransactionCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getOldestActiveTransactionDuration() {
|
||||||
|
return connection.getOldestActiveTransactionDuration();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,10 @@ package org.apache.activemq.broker.jmx;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
|
import org.apache.activemq.broker.TransportConnectionState;
|
||||||
|
import org.apache.activemq.state.TransactionState;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
public interface ConnectionViewMBean extends Service {
|
public interface ConnectionViewMBean extends Service {
|
||||||
/**
|
/**
|
||||||
|
@ -99,4 +103,19 @@ public interface ConnectionViewMBean extends Service {
|
||||||
@MBeanInfo("The ObjectNames of all Producers created by this Connection")
|
@MBeanInfo("The ObjectNames of all Producers created by this Connection")
|
||||||
ObjectName[] getProducers();
|
ObjectName[] getProducers();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of active transactions established on this Connection.
|
||||||
|
*
|
||||||
|
* @return the number of active transactions established on this Connection..
|
||||||
|
*/
|
||||||
|
@MBeanInfo("The number of active transactions established on this Connection.")
|
||||||
|
public int getActiveTransactionCount();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the number of active transactions established on this Connection.
|
||||||
|
*
|
||||||
|
* @return the number of active transactions established on this Connection..
|
||||||
|
*/
|
||||||
|
@MBeanInfo("The age in ms of the oldest active transaction established on this Connection.")
|
||||||
|
public Long getOldestActiveTransactionDuration();
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ public class TransactionState {
|
||||||
private boolean prepared;
|
private boolean prepared;
|
||||||
private int preparedResult;
|
private int preparedResult;
|
||||||
private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
|
private final Map<ProducerId, ProducerState> producers = new ConcurrentHashMap<ProducerId, ProducerState>();
|
||||||
|
private final long createdAt = System.currentTimeMillis();
|
||||||
|
|
||||||
public TransactionState(TransactionId id) {
|
public TransactionState(TransactionId id) {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
|
@ -92,4 +93,7 @@ public class TransactionState {
|
||||||
return producers;
|
return producers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getCreatedAt() {
|
||||||
|
return createdAt;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -197,6 +197,16 @@ public class QueueOptimizedDispatchExceptionTest {
|
||||||
@Override
|
@Override
|
||||||
public void dispatchAsync(Command command) {
|
public void dispatchAsync(Command command) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getActiveTransactionCount() {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long getOldestActiveTransactionDuration() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
final DestinationStatistics destinationStatistics = new DestinationStatistics();
|
||||||
|
|
Loading…
Reference in New Issue