git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@660906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-05-28 11:11:35 +00:00
parent 7d87837e08
commit 92604947a5
1 changed files with 62 additions and 27 deletions

View File

@ -63,6 +63,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
protected final TcpTransportFactory transportFactory;
protected long maxInactivityDuration = 30000;
protected int minmumWireFormatVersion;
protected boolean useQueueForAccept=true;
/**
* trace=true -> the Transport stack where this TcpTransport
@ -210,6 +211,35 @@ public class TcpTransportServer extends TransportServerThreadSupport {
public void setStartLogging(boolean startLogging) {
this.startLogging = startLogging;
}
/**
* @return the backlog
*/
public int getBacklog() {
return backlog;
}
/**
* @param backlog the backlog to set
*/
public void setBacklog(int backlog) {
this.backlog = backlog;
}
/**
* @return the useQueueForAccept
*/
public boolean isUseQueueForAccept() {
return useQueueForAccept;
}
/**
* @param useQueueForAccept the useQueueForAccept to set
*/
public void setUseQueueForAccept(boolean useQueueForAccept) {
this.useQueueForAccept = useQueueForAccept;
}
/**
* pull Sockets from the ServerSocket
@ -223,7 +253,11 @@ public class TcpTransportServer extends TransportServerThreadSupport {
if (isStopped() || getAcceptListener() == null) {
socket.close();
} else {
socketQueue.put(socket);
if (useQueueForAccept) {
socketQueue.put(socket);
}else {
handleSocket(socket);
}
}
}
} catch (SocketTimeoutException ste) {
@ -274,33 +308,36 @@ public class TcpTransportServer extends TransportServerThreadSupport {
}
protected void doStart() throws Exception {
Runnable run = new Runnable() {
public void run() {
try {
while (!isStopped() && !isStopping()) {
Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
if (sock != null) {
handleSocket(sock);
if(useQueueForAccept) {
Runnable run = new Runnable() {
public void run() {
try {
while (!isStopped() && !isStopping()) {
Socket sock = socketQueue.poll(1, TimeUnit.SECONDS);
if (sock != null) {
handleSocket(sock);
}
}
} catch (InterruptedException e) {
LOG.info("socketQueue interuppted - stopping");
if (!isStopping()) {
onAcceptError(e);
}
}
} catch (InterruptedException e) {
LOG.info("socketQueue interuppted - stopping");
if (!isStopping()) {
onAcceptError(e);
}
}
}
};
socketHandlerThread = new Thread(null, run,
"ActiveMQ Transport Server Thread Handler: " + toString(),
getStackSize());
socketHandlerThread.setDaemon(true);
socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
};
socketHandlerThread = new Thread(null, run,
"ActiveMQ Transport Server Thread Handler: " + toString(),
getStackSize());
socketHandlerThread.setDaemon(true);
socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
socketHandlerThread.start();
}
super.doStart();
socketHandlerThread.start();
}
protected void doStop(ServiceStopper stopper) throws Exception {
@ -348,7 +385,5 @@ public class TcpTransportServer extends TransportServerThreadSupport {
onAcceptError(e);
}
}
}
}
}