mirror of https://github.com/apache/activemq.git
http://jira.activemq.org/jira/browse/AMQ-613 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382934 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
44b00e9309
commit
202b3efb96
|
@ -29,14 +29,14 @@ import javax.resource.spi.endpoint.MessageEndpoint;
|
||||||
public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
|
public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
|
||||||
|
|
||||||
private static final MessageEndpointState ALIVE = new MessageEndpointAlive();
|
private static final MessageEndpointState ALIVE = new MessageEndpointAlive();
|
||||||
private static final MessageEndpointState GOING_TO_DIE = new MessageEndpointInTheElectricChair();
|
|
||||||
private static final MessageEndpointState DEAD = new MessageEndpointDead();
|
private static final MessageEndpointState DEAD = new MessageEndpointDead();
|
||||||
|
|
||||||
|
|
||||||
private static int proxyCount = 0;
|
private static int proxyCount = 0;
|
||||||
private final int proxyID;
|
private final int proxyID;
|
||||||
|
|
||||||
private MessageEndpoint endpoint;
|
private final MessageEndpoint endpoint;
|
||||||
|
private final MessageListener messageListener;
|
||||||
private MessageEndpointState state = ALIVE;
|
private MessageEndpointState state = ALIVE;
|
||||||
|
|
||||||
private static int getID() {
|
private static int getID() {
|
||||||
|
@ -47,6 +47,7 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
|
||||||
if (!(endpoint instanceof MessageListener)) {
|
if (!(endpoint instanceof MessageListener)) {
|
||||||
throw new IllegalArgumentException("MessageEndpoint is not a MessageListener");
|
throw new IllegalArgumentException("MessageEndpoint is not a MessageListener");
|
||||||
}
|
}
|
||||||
|
messageListener = (MessageListener) endpoint;
|
||||||
proxyID = getID();
|
proxyID = getID();
|
||||||
this.endpoint = endpoint;
|
this.endpoint = endpoint;
|
||||||
}
|
}
|
||||||
|
@ -56,7 +57,6 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onMessage(Message message) {
|
public void onMessage(Message message) {
|
||||||
// log.warn("Delivery Count: " + getNextDeliveryCount() );
|
|
||||||
state.onMessage(this, message);
|
state.onMessage(this, message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,12 +117,7 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void onMessage(MessageEndpointProxy proxy, Message message) {
|
public void onMessage(MessageEndpointProxy proxy, Message message) {
|
||||||
try {
|
proxy.messageListener.onMessage(message);
|
||||||
((MessageListener) proxy.endpoint).onMessage(message);
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
transition(proxy, GOING_TO_DIE);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void afterDelivery(MessageEndpointProxy proxy) throws ResourceException {
|
public void afterDelivery(MessageEndpointProxy proxy) throws ResourceException {
|
||||||
|
@ -139,28 +134,10 @@ public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class MessageEndpointInTheElectricChair extends MessageEndpointState {
|
|
||||||
|
|
||||||
public void afterDelivery(MessageEndpointProxy proxy) throws ResourceException {
|
|
||||||
try {
|
|
||||||
proxy.endpoint.afterDelivery();
|
|
||||||
} catch (ResourceException e) {
|
|
||||||
throw e;
|
|
||||||
} finally {
|
|
||||||
transition(proxy, DEAD);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void release(MessageEndpointProxy proxy) {
|
|
||||||
transition(proxy, DEAD);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class MessageEndpointDead extends MessageEndpointState {
|
private static class MessageEndpointDead extends MessageEndpointState {
|
||||||
|
|
||||||
protected void enter(MessageEndpointProxy proxy) {
|
protected void enter(MessageEndpointProxy proxy) {
|
||||||
proxy.endpoint.release();
|
proxy.endpoint.release();
|
||||||
proxy.endpoint = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void beforeDelivery(MessageEndpointProxy proxy, Method method) throws NoSuchMethodException, ResourceException {
|
public void beforeDelivery(MessageEndpointProxy proxy, Method method) throws NoSuchMethodException, ResourceException {
|
||||||
|
|
|
@ -76,6 +76,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
|
||||||
} catch (UnavailableException e) {
|
} catch (UnavailableException e) {
|
||||||
// The container could be limiting us on the number of endpoints
|
// The container could be limiting us on the number of endpoints
|
||||||
// that are being created.
|
// that are being created.
|
||||||
|
log.debug("Could not create an endpoint.", e);
|
||||||
session.close();
|
session.close();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -104,15 +105,19 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
|
||||||
} else {
|
} else {
|
||||||
// Are we at the upper limit?
|
// Are we at the upper limit?
|
||||||
if (activeSessions.size() >= maxSessions) {
|
if (activeSessions.size() >= maxSessions) {
|
||||||
// then reuse the allready created sessions..
|
// then reuse the already created sessions..
|
||||||
// This is going to queue up messages into a session for
|
// This is going to queue up messages into a session for
|
||||||
// processing.
|
// processing.
|
||||||
return getExistingServerSession();
|
return getExistingServerSession();
|
||||||
}
|
}
|
||||||
ServerSessionImpl ss = createServerSessionImpl();
|
ServerSessionImpl ss = createServerSessionImpl();
|
||||||
// We may not be able to create a session due to the conatiner
|
// We may not be able to create a session due to the container
|
||||||
// restricting us.
|
// restricting us.
|
||||||
if (ss == null) {
|
if (ss == null) {
|
||||||
|
if (idleSessions.size() == 0) {
|
||||||
|
throw new JMSException("Endpoint factory did not allows to any endpoints.");
|
||||||
|
}
|
||||||
|
|
||||||
return getExistingServerSession();
|
return getExistingServerSession();
|
||||||
}
|
}
|
||||||
activeSessions.addLast(ss);
|
activeSessions.addLast(ss);
|
||||||
|
|
Loading…
Reference in New Issue