mirror of https://github.com/apache/activemq.git
AMQ-7446 - Remove some synchronized blocks in the MessageServlet
This commit is contained in:
parent
05c43fe347
commit
8f6cfa4190
|
@ -32,8 +32,9 @@ import javax.servlet.http.HttpServletResponse;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
import java.util.HashMap;
|
import java.util.Map;
|
||||||
import java.util.HashSet;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A servlet for sending and receiving messages to/from JMS destinations using
|
* A servlet for sending and receiving messages to/from JMS destinations using
|
||||||
|
@ -68,8 +69,8 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
private long requestTimeout = 1000;
|
private long requestTimeout = 1000;
|
||||||
private String defaultContentType;
|
private String defaultContentType;
|
||||||
|
|
||||||
private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
|
private final Map<String, WebClient> clients = new ConcurrentHashMap<>();
|
||||||
private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
|
private final Set<MessageAvailableConsumer> activeConsumers = ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() throws ServletException {
|
public void init() throws ServletException {
|
||||||
|
@ -107,7 +108,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
|
|
||||||
String action = request.getParameter("action");
|
String action = request.getParameter("action");
|
||||||
String clientId = request.getParameter("clientId");
|
String clientId = request.getParameter("clientId");
|
||||||
if (action != null && clientId != null && action.equals("unsubscribe")) {
|
if (clientId != null && "unsubscribe".equals(action)) {
|
||||||
LOG.info("Unsubscribing client " + clientId);
|
LOG.info("Unsubscribing client " + clientId);
|
||||||
WebClient client = getWebClient(request);
|
WebClient client = getWebClient(request);
|
||||||
client.close();
|
client.close();
|
||||||
|
@ -149,7 +150,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Supports a HTTP DELETE to be equivalent of consuming a singe message
|
* Supports a HTTP DELETE to be equivalent of consuming a single message
|
||||||
* from a queue
|
* from a queue
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -158,7 +159,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Supports a HTTP DELETE to be equivalent of consuming a singe message
|
* Supports a HTTP DELETE to be equivalent of consuming a single message
|
||||||
* from a queue
|
* from a queue
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
|
@ -188,14 +189,8 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
|
|
||||||
// Don't allow concurrent use of the consumer. Do make sure to allow
|
// Don't allow concurrent use of the consumer. Do make sure to allow
|
||||||
// subsequent calls on continuation to use the consumer.
|
// subsequent calls on continuation to use the consumer.
|
||||||
if (continuation.isInitial()) {
|
if (continuation.isInitial() && !activeConsumers.add(consumer)) {
|
||||||
synchronized (activeConsumers) {
|
throw new ServletException("Concurrent access to consumer is not supported");
|
||||||
if (activeConsumers.contains(consumer)) {
|
|
||||||
throw new ServletException("Concurrent access to consumer is not supported");
|
|
||||||
} else {
|
|
||||||
activeConsumers.add(consumer);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Message message = null;
|
Message message = null;
|
||||||
|
@ -234,9 +229,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
writeResponse(request, response, message);
|
writeResponse(request, response, message);
|
||||||
closeConsumerOnOneShot(request, client, destination);
|
closeConsumerOnOneShot(request, client, destination);
|
||||||
|
|
||||||
synchronized (activeConsumers) {
|
activeConsumers.remove(consumer);
|
||||||
activeConsumers.remove(consumer);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (JMSException e) {
|
} catch (JMSException e) {
|
||||||
throw new ServletException("Could not post JMS message: " + e, e);
|
throw new ServletException("Could not post JMS message: " + e, e);
|
||||||
|
@ -260,9 +253,7 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
}
|
}
|
||||||
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
|
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
|
||||||
closeConsumerOnOneShot(request, client, destination);
|
closeConsumerOnOneShot(request, client, destination);
|
||||||
synchronized (activeConsumers) {
|
activeConsumers.remove(consumer);
|
||||||
activeConsumers.remove(consumer);
|
|
||||||
}
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,17 +352,8 @@ public class MessageServlet extends MessageServletSupport {
|
||||||
public WebClient getWebClient(HttpServletRequest request) {
|
public WebClient getWebClient(HttpServletRequest request) {
|
||||||
String clientId = request.getParameter("clientId");
|
String clientId = request.getParameter("clientId");
|
||||||
if (clientId != null) {
|
if (clientId != null) {
|
||||||
synchronized (this) {
|
LOG.debug("Getting local client [" + clientId + "]");
|
||||||
LOG.debug("Getting local client [" + clientId + "]");
|
return clients.computeIfAbsent(clientId, k -> new WebClient());
|
||||||
WebClient client = clients.get(clientId);
|
|
||||||
if (client == null) {
|
|
||||||
LOG.debug("Creating new client [" + clientId + "]");
|
|
||||||
client = new WebClient();
|
|
||||||
clients.put(clientId, client);
|
|
||||||
}
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
return WebClient.getWebClient(request);
|
return WebClient.getWebClient(request);
|
||||||
}
|
}
|
||||||
|
|
|
@ -345,7 +345,7 @@ public abstract class MessageServletSupport extends HttpServlet {
|
||||||
LOG.debug("Content-Type={}", contentType);
|
LOG.debug("Content-Type={}", contentType);
|
||||||
// lets read the message body instead
|
// lets read the message body instead
|
||||||
BufferedReader reader = request.getReader();
|
BufferedReader reader = request.getReader();
|
||||||
StringBuffer buffer = new StringBuffer();
|
StringBuilder buffer = new StringBuilder();
|
||||||
while (true) {
|
while (true) {
|
||||||
String line = reader.readLine();
|
String line = reader.readLine();
|
||||||
if (line == null) {
|
if (line == null) {
|
||||||
|
|
Loading…
Reference in New Issue