Merge pull request #512 from coheigea/AMQ-7446

AMQ-7446 - Remove some synchronized blocks in the MessageServlet
This commit is contained in:
Jean-Baptiste Onofré 2020-05-12 09:40:11 +02:00 committed by GitHub
commit 4e7d8cd824
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 15 additions and 33 deletions

View File

@ -32,8 +32,9 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* A servlet for sending and receiving messages to/from JMS destinations using
@ -68,8 +69,8 @@ public class MessageServlet extends MessageServletSupport {
private long requestTimeout = 1000;
private String defaultContentType;
private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
private final Map<String, WebClient> clients = new ConcurrentHashMap<>();
private final Set<MessageAvailableConsumer> activeConsumers = ConcurrentHashMap.newKeySet();
@Override
public void init() throws ServletException {
@ -107,7 +108,7 @@ public class MessageServlet extends MessageServletSupport {
String action = request.getParameter("action");
String clientId = request.getParameter("clientId");
if (action != null && clientId != null && action.equals("unsubscribe")) {
if (clientId != null && "unsubscribe".equals(action)) {
LOG.info("Unsubscribing client " + clientId);
WebClient client = getWebClient(request);
client.close();
@ -149,7 +150,7 @@ public class MessageServlet extends MessageServletSupport {
}
/**
* Supports a HTTP DELETE to be equivalent of consuming a singe message
* Supports a HTTP DELETE to be equivalent of consuming a single message
* from a queue
*/
@Override
@ -158,7 +159,7 @@ public class MessageServlet extends MessageServletSupport {
}
/**
* Supports a HTTP DELETE to be equivalent of consuming a singe message
* Supports a HTTP DELETE to be equivalent of consuming a single message
* from a queue
*/
@Override
@ -188,14 +189,8 @@ public class MessageServlet extends MessageServletSupport {
// Don't allow concurrent use of the consumer. Do make sure to allow
// subsequent calls on continuation to use the consumer.
if (continuation.isInitial()) {
synchronized (activeConsumers) {
if (activeConsumers.contains(consumer)) {
throw new ServletException("Concurrent access to consumer is not supported");
} else {
activeConsumers.add(consumer);
}
}
if (continuation.isInitial() && !activeConsumers.add(consumer)) {
throw new ServletException("Concurrent access to consumer is not supported");
}
Message message = null;
@ -234,9 +229,7 @@ public class MessageServlet extends MessageServletSupport {
writeResponse(request, response, message);
closeConsumerOnOneShot(request, client, destination);
synchronized (activeConsumers) {
activeConsumers.remove(consumer);
}
activeConsumers.remove(consumer);
}
} catch (JMSException e) {
throw new ServletException("Could not post JMS message: " + e, e);
@ -260,9 +253,7 @@ public class MessageServlet extends MessageServletSupport {
}
response.setStatus(HttpServletResponse.SC_NO_CONTENT);
closeConsumerOnOneShot(request, client, destination);
synchronized (activeConsumers) {
activeConsumers.remove(consumer);
}
activeConsumers.remove(consumer);
return;
}
@ -361,17 +352,8 @@ public class MessageServlet extends MessageServletSupport {
public WebClient getWebClient(HttpServletRequest request) {
String clientId = request.getParameter("clientId");
if (clientId != null) {
synchronized (this) {
LOG.debug("Getting local client [" + clientId + "]");
WebClient client = clients.get(clientId);
if (client == null) {
LOG.debug("Creating new client [" + clientId + "]");
client = new WebClient();
clients.put(clientId, client);
}
return client;
}
LOG.debug("Getting local client [" + clientId + "]");
return clients.computeIfAbsent(clientId, k -> new WebClient());
} else {
return WebClient.getWebClient(request);
}

View File

@ -345,7 +345,7 @@ public abstract class MessageServletSupport extends HttpServlet {
LOG.debug("Content-Type={}", contentType);
// lets read the message body instead
BufferedReader reader = request.getReader();
StringBuffer buffer = new StringBuffer();
StringBuilder buffer = new StringBuilder();
while (true) {
String line = reader.readLine();
if (line == null) {