diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java index cb4eff363d..39dadde902 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import javax.net.ServerSocketFactory; +import org.apache.activemq.Service; import org.apache.activemq.ThreadPriorities; import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.openwire.OpenWireFormatFactory; @@ -41,7 +42,9 @@ import org.apache.activemq.transport.TransportLoggerFactory; import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.util.IOExceptionSupport; +import org.apache.activemq.util.ServiceListener; import org.apache.activemq.util.ServiceStopper; +import org.apache.activemq.util.ServiceSupport; import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormatFactory; import org.apache.commons.logging.Log; @@ -54,7 +57,7 @@ import org.apache.commons.logging.LogFactory; * @version $Revision: 1.1 $ */ -public class TcpTransportServer extends TransportServerThreadSupport { +public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{ private static final Log LOG = LogFactory.getLog(TcpTransportServer.class); protected ServerSocket serverSocket; @@ -64,7 +67,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { protected long maxInactivityDuration = 30000; protected int minmumWireFormatVersion; protected boolean useQueueForAccept=true; - + /** * trace=true -> the Transport stack where this TcpTransport * object will be, will have a TransportLogger layer @@ -104,6 +107,11 @@ public class TcpTransportServer extends TransportServerThreadSupport { protected final ServerSocketFactory serverSocketFactory; protected BlockingQueue socketQueue = new LinkedBlockingQueue(); protected Thread socketHandlerThread; + /** + * The maximum number of sockets allowed for this server + */ + protected int maximumConnections = Integer.MAX_VALUE; + protected int currentTransportCount=0; public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { super(location); @@ -287,7 +295,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { * @return * @throws IOException */ - protected Transport createTransport(Socket socket, WireFormat format) throws IOException { + protected Transport createTransport(Socket socket, WireFormat format) throws IOException { return new TcpTransport(format, socket); } @@ -360,8 +368,11 @@ public class TcpTransportServer extends TransportServerThreadSupport { this.transportOptions = transportOptions; } - protected void handleSocket(Socket socket) { + protected final void handleSocket(Socket socket) { try { + if (this.currentTransportCount >= this.maximumConnections) { + + }else { HashMap options = new HashMap(); options.put("maxInactivityDuration", Long .valueOf(maxInactivityDuration)); @@ -380,9 +391,13 @@ public class TcpTransportServer extends TransportServerThreadSupport { options.putAll(transportOptions); WireFormat format = wireFormatFactory.createWireFormat(); Transport transport = createTransport(socket, format); + if (transport instanceof ServiceSupport) { + ((ServiceSupport) transport).addServiceListener(this); + } Transport configuredTransport = transportFactory.serverConfigure( transport, format, options); getAcceptListener().onAccept(configuredTransport); + } } catch (SocketTimeoutException ste) { // expect this to happen } catch (Exception e) { @@ -393,6 +408,7 @@ public class TcpTransportServer extends TransportServerThreadSupport { onAcceptError(e); } } + } public int getSoTimeout() { @@ -418,4 +434,27 @@ public class TcpTransportServer extends TransportServerThreadSupport { public void setConnectionTimeout(int connectionTimeout) { this.connectionTimeout = connectionTimeout; } + + /** + * @return the maximumConnections + */ + public int getMaximumConnections() { + return maximumConnections; + } + + /** + * @param maximumConnections the maximumConnections to set + */ + public void setMaximumConnections(int maximumConnections) { + this.maximumConnections = maximumConnections; + } + + + public void started(Service service) { + this.currentTransportCount++; + } + + public void stopped(Service service) { + this.currentTransportCount--; + } } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java b/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java new file mode 100644 index 0000000000..e8cce15f8f --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/util/ServiceListener.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.util; + +import org.apache.activemq.Service; + +/** + * A listener for service start, stop events + * + * @version $Revision: 1.1 $ + */ +public interface ServiceListener{ + + public void started(Service service); + + public void stopped(Service service); +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java index d7969ac351..aa246fd43e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/util/ServiceSupport.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.util; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.Service; @@ -34,6 +36,7 @@ public abstract class ServiceSupport implements Service { private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean stopping = new AtomicBoolean(false); private AtomicBoolean stopped = new AtomicBoolean(false); + private ListserviceListeners = new CopyOnWriteArrayList(); public static void dispose(Service service) { try { @@ -52,6 +55,9 @@ public abstract class ServiceSupport implements Service { } finally { started.set(success); } + for(ServiceListener l:this.serviceListeners) { + l.started(this); + } } } @@ -67,6 +73,9 @@ public abstract class ServiceSupport implements Service { stopped.set(true); started.set(false); stopping.set(false); + for(ServiceListener l:this.serviceListeners) { + l.stopped(this); + } stopper.throwFirstException(); } } @@ -91,6 +100,14 @@ public abstract class ServiceSupport implements Service { public boolean isStopped() { return stopped.get(); } + + public void addServiceListener(ServiceListener l) { + this.serviceListeners.add(l); + } + + public void removeServiceListener(ServiceListener l) { + this.serviceListeners.remove(l); + } protected abstract void doStop(ServiceStopper stopper) throws Exception;