mirror of https://github.com/apache/activemq.git
apply patch for https://issues.apache.org/jira/browse/AMQ-3074
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1143535 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f232ceced0
commit
1ec990575d
|
@ -201,7 +201,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the number of messages to be dispatched to this connection
|
* Returns the number of messages to be dispatched to this connection
|
||||||
*
|
*
|
||||||
* @return size of dispatch queue
|
* @return size of dispatch queue
|
||||||
*/
|
*/
|
||||||
public int getDispatchQueueSize() {
|
public int getDispatchQueueSize() {
|
||||||
|
@ -249,7 +249,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
* Calls the serviceException method in an async thread. Since handling a
|
* Calls the serviceException method in an async thread. Since handling a
|
||||||
* service exception closes a socket, we should not tie up broker threads
|
* service exception closes a socket, we should not tie up broker threads
|
||||||
* since client sockets may hang or cause deadlocks.
|
* since client sockets may hang or cause deadlocks.
|
||||||
*
|
*
|
||||||
* @param e
|
* @param e
|
||||||
*/
|
*/
|
||||||
public void serviceExceptionAsync(final IOException e) {
|
public void serviceExceptionAsync(final IOException e) {
|
||||||
|
@ -700,7 +700,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
if (info.getClientIp() == null) {
|
if (info.getClientIp() == null) {
|
||||||
info.setClientIp(getRemoteAddress());
|
info.setClientIp(getRemoteAddress());
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
broker.addConnection(context, info);
|
broker.addConnection(context, info);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -912,7 +912,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
info.setPeerBrokerInfos(null);
|
info.setPeerBrokerInfos(null);
|
||||||
}
|
}
|
||||||
dispatchAsync(info);
|
dispatchAsync(info);
|
||||||
|
|
||||||
connector.onStarted(this);
|
connector.onStarted(this);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -1160,7 +1160,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
public synchronized boolean isNetworkConnection() {
|
public synchronized boolean isNetworkConnection() {
|
||||||
return networkConnection;
|
return networkConnection;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isFaultTolerantConnection() {
|
public boolean isFaultTolerantConnection() {
|
||||||
return this.faultTolerantConnection;
|
return this.faultTolerantConnection;
|
||||||
}
|
}
|
||||||
|
@ -1187,7 +1187,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
// passive ?
|
// passive ?
|
||||||
boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
|
boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
|
||||||
if (passive == false) {
|
if (passive == false) {
|
||||||
|
|
||||||
// stream messages from this broker (the master) to
|
// stream messages from this broker (the master) to
|
||||||
// the slave
|
// the slave
|
||||||
MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
|
MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
|
||||||
|
@ -1212,7 +1212,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
// It's possible in case of brief network fault to have this transport connector side of the connection always active
|
// It's possible in case of brief network fault to have this transport connector side of the connection always active
|
||||||
// and the duplex network connector side wanting to open a new one
|
// and the duplex network connector side wanting to open a new one
|
||||||
// In this case, the old connection must be broken
|
// In this case, the old connection must be broken
|
||||||
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
|
String duplexNetworkConnectorId = config.getName() + "@" + info.getBrokerId();
|
||||||
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
|
CopyOnWriteArrayList<TransportConnection> connections = this.connector.getConnections();
|
||||||
synchronized (connections) {
|
synchronized (connections) {
|
||||||
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
|
for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
|
||||||
|
@ -1296,7 +1296,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void updateClient(ConnectionControl control) {
|
public void updateClient(ConnectionControl control) {
|
||||||
if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
|
if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
|
||||||
&& this.wireFormatInfo.getVersion() >= 6) {
|
&& this.wireFormatInfo.getVersion() >= 6) {
|
||||||
|
@ -1309,7 +1309,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
synchronized (producerExchanges) {
|
synchronized (producerExchanges) {
|
||||||
result = new ProducerBrokerExchange();
|
result = new ProducerBrokerExchange();
|
||||||
TransportConnectionState state = lookupConnectionState(id);
|
TransportConnectionState state = lookupConnectionState(id);
|
||||||
context = state.getContext();
|
context = state.getContext();
|
||||||
if (context.isReconnect()) {
|
if (context.isReconnect()) {
|
||||||
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
|
result.setLastStoredSequenceId(broker.getBrokerService().getPersistenceAdapter().getLastProducerSequenceId(id));
|
||||||
|
@ -1451,7 +1451,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
|
||||||
protected synchronized String getDuplexNetworkConnectorId() {
|
protected synchronized String getDuplexNetworkConnectorId() {
|
||||||
return this.duplexNetworkConnectorId;
|
return this.duplexNetworkConnectorId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isStopping() {
|
||||||
|
return stopping.get();
|
||||||
|
}
|
||||||
|
|
||||||
protected CountDownLatch getStopped() {
|
protected CountDownLatch getStopped() {
|
||||||
return stopped;
|
return stopped;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue