git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692009 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-09-04 13:34:39 +00:00
parent ff2bc927df
commit 391077ee15
3 changed files with 91 additions and 4 deletions

View File

@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
import javax.net.ServerSocketFactory; import javax.net.ServerSocketFactory;
import org.apache.activemq.Service;
import org.apache.activemq.ThreadPriorities; import org.apache.activemq.ThreadPriorities;
import org.apache.activemq.command.BrokerInfo; import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory; import org.apache.activemq.openwire.OpenWireFormatFactory;
@ -41,7 +42,9 @@ import org.apache.activemq.transport.TransportLoggerFactory;
import org.apache.activemq.transport.TransportServer; import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport; import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceListener;
import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.apache.activemq.wireformat.WireFormatFactory; import org.apache.activemq.wireformat.WireFormatFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -54,7 +57,7 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
public class TcpTransportServer extends TransportServerThreadSupport { public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
private static final Log LOG = LogFactory.getLog(TcpTransportServer.class); private static final Log LOG = LogFactory.getLog(TcpTransportServer.class);
protected ServerSocket serverSocket; protected ServerSocket serverSocket;
@ -104,6 +107,11 @@ public class TcpTransportServer extends TransportServerThreadSupport {
protected final ServerSocketFactory serverSocketFactory; protected final ServerSocketFactory serverSocketFactory;
protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>(); protected BlockingQueue<Socket> socketQueue = new LinkedBlockingQueue<Socket>();
protected Thread socketHandlerThread; protected Thread socketHandlerThread;
/**
* The maximum number of sockets allowed for this server
*/
protected int maximumConnections = Integer.MAX_VALUE;
protected int currentTransportCount=0;
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException { public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location); super(location);
@ -360,8 +368,11 @@ public class TcpTransportServer extends TransportServerThreadSupport {
this.transportOptions = transportOptions; this.transportOptions = transportOptions;
} }
protected void handleSocket(Socket socket) { protected final void handleSocket(Socket socket) {
try { try {
if (this.currentTransportCount >= this.maximumConnections) {
}else {
HashMap<String, Object> options = new HashMap<String, Object>(); HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long options.put("maxInactivityDuration", Long
.valueOf(maxInactivityDuration)); .valueOf(maxInactivityDuration));
@ -380,9 +391,13 @@ public class TcpTransportServer extends TransportServerThreadSupport {
options.putAll(transportOptions); options.putAll(transportOptions);
WireFormat format = wireFormatFactory.createWireFormat(); WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format); Transport transport = createTransport(socket, format);
if (transport instanceof ServiceSupport) {
((ServiceSupport) transport).addServiceListener(this);
}
Transport configuredTransport = transportFactory.serverConfigure( Transport configuredTransport = transportFactory.serverConfigure(
transport, format, options); transport, format, options);
getAcceptListener().onAccept(configuredTransport); getAcceptListener().onAccept(configuredTransport);
}
} catch (SocketTimeoutException ste) { } catch (SocketTimeoutException ste) {
// expect this to happen // expect this to happen
} catch (Exception e) { } catch (Exception e) {
@ -393,6 +408,7 @@ public class TcpTransportServer extends TransportServerThreadSupport {
onAcceptError(e); onAcceptError(e);
} }
} }
} }
public int getSoTimeout() { public int getSoTimeout() {
@ -418,4 +434,27 @@ public class TcpTransportServer extends TransportServerThreadSupport {
public void setConnectionTimeout(int connectionTimeout) { public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout; this.connectionTimeout = connectionTimeout;
} }
/**
* @return the maximumConnections
*/
public int getMaximumConnections() {
return maximumConnections;
}
/**
* @param maximumConnections the maximumConnections to set
*/
public void setMaximumConnections(int maximumConnections) {
this.maximumConnections = maximumConnections;
}
public void started(Service service) {
this.currentTransportCount++;
}
public void stopped(Service service) {
this.currentTransportCount--;
}
} }

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util;
import org.apache.activemq.Service;
/**
* A listener for service start, stop events
*
* @version $Revision: 1.1 $
*/
public interface ServiceListener{
public void started(Service service);
public void stopped(Service service);
}

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.util; package org.apache.activemq.util;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service; import org.apache.activemq.Service;
@ -34,6 +36,7 @@ public abstract class ServiceSupport implements Service {
private AtomicBoolean started = new AtomicBoolean(false); private AtomicBoolean started = new AtomicBoolean(false);
private AtomicBoolean stopping = new AtomicBoolean(false); private AtomicBoolean stopping = new AtomicBoolean(false);
private AtomicBoolean stopped = new AtomicBoolean(false); private AtomicBoolean stopped = new AtomicBoolean(false);
private List<ServiceListener>serviceListeners = new CopyOnWriteArrayList<ServiceListener>();
public static void dispose(Service service) { public static void dispose(Service service) {
try { try {
@ -52,6 +55,9 @@ public abstract class ServiceSupport implements Service {
} finally { } finally {
started.set(success); started.set(success);
} }
for(ServiceListener l:this.serviceListeners) {
l.started(this);
}
} }
} }
@ -67,6 +73,9 @@ public abstract class ServiceSupport implements Service {
stopped.set(true); stopped.set(true);
started.set(false); started.set(false);
stopping.set(false); stopping.set(false);
for(ServiceListener l:this.serviceListeners) {
l.stopped(this);
}
stopper.throwFirstException(); stopper.throwFirstException();
} }
} }
@ -92,6 +101,14 @@ public abstract class ServiceSupport implements Service {
return stopped.get(); return stopped.get();
} }
public void addServiceListener(ServiceListener l) {
this.serviceListeners.add(l);
}
public void removeServiceListener(ServiceListener l) {
this.serviceListeners.remove(l);
}
protected abstract void doStop(ServiceStopper stopper) throws Exception; protected abstract void doStop(ServiceStopper stopper) throws Exception;
protected abstract void doStart() throws Exception; protected abstract void doStart() throws Exception;