mirror of https://github.com/apache/activemq.git
Refactorings of the transport/service base classes
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@384222 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
436fe42778
commit
1ccd7bdbf0
|
@ -21,6 +21,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
|
||||||
|
|
||||||
import org.apache.activemq.command.Command;
|
import org.apache.activemq.command.Command;
|
||||||
import org.apache.activemq.transport.TransportSupport;
|
import org.apache.activemq.transport.TransportSupport;
|
||||||
|
import org.apache.activemq.util.ServiceStopper;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
|
@ -44,13 +45,13 @@ public class BlockingQueueTransport extends TransportSupport {
|
||||||
return queue;
|
return queue;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws JMSException {
|
|
||||||
}
|
|
||||||
|
|
||||||
public void oneway(Command command) throws IOException {
|
public void oneway(Command command) throws IOException {
|
||||||
queue.add(command);
|
queue.add(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void stop() throws Exception {
|
protected void doStart() throws Exception {
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -94,7 +94,7 @@ public class HttpClientTransport extends HttpTransportSupport {
|
||||||
log.trace("HTTP GET consumer thread starting: " + this);
|
log.trace("HTTP GET consumer thread starting: " + this);
|
||||||
HttpClient httpClient = getReceiveHttpClient();
|
HttpClient httpClient = getReceiveHttpClient();
|
||||||
URI remoteUrl = getRemoteUrl();
|
URI remoteUrl = getRemoteUrl();
|
||||||
while (!isClosed()) {
|
while (!isStopped()) {
|
||||||
|
|
||||||
GetMethod httpMethod = new GetMethod(remoteUrl.toString());
|
GetMethod httpMethod = new GetMethod(remoteUrl.toString());
|
||||||
configureMethod(httpMethod);
|
configureMethod(httpMethod);
|
||||||
|
|
|
@ -75,7 +75,7 @@ public class HttpTransport extends HttpTransportSupport {
|
||||||
public void run() {
|
public void run() {
|
||||||
log.trace("HTTP GET consumer thread starting for transport: " + this);
|
log.trace("HTTP GET consumer thread starting for transport: " + this);
|
||||||
URI remoteUrl = getRemoteUrl();
|
URI remoteUrl = getRemoteUrl();
|
||||||
while (!isClosed()) {
|
while (!isStopped()) {
|
||||||
try {
|
try {
|
||||||
HttpURLConnection connection = getReceiveConnection();
|
HttpURLConnection connection = getReceiveConnection();
|
||||||
int answer = connection.getResponseCode();
|
int answer = connection.getResponseCode();
|
||||||
|
@ -100,7 +100,7 @@ public class HttpTransport extends HttpTransportSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
if (!isClosed()) {
|
if (!isStopped()) {
|
||||||
log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
|
log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.command.BrokerInfo;
|
||||||
import org.apache.activemq.transport.TransportServerSupport;
|
import org.apache.activemq.transport.TransportServerSupport;
|
||||||
import org.apache.activemq.transport.util.TextWireFormat;
|
import org.apache.activemq.transport.util.TextWireFormat;
|
||||||
import org.apache.activemq.transport.xstream.XStreamWireFormat;
|
import org.apache.activemq.transport.xstream.XStreamWireFormat;
|
||||||
|
import org.apache.activemq.util.ServiceStopper;
|
||||||
import org.mortbay.jetty.Connector;
|
import org.mortbay.jetty.Connector;
|
||||||
import org.mortbay.jetty.Server;
|
import org.mortbay.jetty.Server;
|
||||||
import org.mortbay.jetty.bio.SocketConnector;
|
import org.mortbay.jetty.bio.SocketConnector;
|
||||||
|
@ -46,7 +47,33 @@ public class HttpTransportServer extends TransportServerSupport {
|
||||||
this.bindAddress = uri;
|
this.bindAddress = uri;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws Exception {
|
public void setBrokerInfo(BrokerInfo brokerInfo) {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Properties
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
public TextWireFormat getWireFormat() {
|
||||||
|
if (wireFormat == null) {
|
||||||
|
wireFormat = createWireFormat();
|
||||||
|
}
|
||||||
|
return wireFormat;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWireFormat(TextWireFormat wireFormat) {
|
||||||
|
this.wireFormat = wireFormat;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Implementation methods
|
||||||
|
// -------------------------------------------------------------------------
|
||||||
|
protected TextWireFormat createWireFormat() {
|
||||||
|
return new XStreamWireFormat();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setConnector(Connector connector) {
|
||||||
|
this.connector = connector;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void doStart() throws Exception {
|
||||||
server = new Server();
|
server = new Server();
|
||||||
if (connector==null)
|
if (connector==null)
|
||||||
connector = new SocketConnector();
|
connector = new SocketConnector();
|
||||||
|
@ -81,7 +108,7 @@ public class HttpTransportServer extends TransportServerSupport {
|
||||||
server.start();
|
server.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void stop() throws Exception {
|
protected void doStop(ServiceStopper stopper) throws Exception {
|
||||||
Server temp = server;
|
Server temp = server;
|
||||||
server = null;
|
server = null;
|
||||||
if (temp != null) {
|
if (temp != null) {
|
||||||
|
@ -89,29 +116,4 @@ public class HttpTransportServer extends TransportServerSupport {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Properties
|
|
||||||
// -------------------------------------------------------------------------
|
|
||||||
public TextWireFormat getWireFormat() {
|
|
||||||
if (wireFormat == null) {
|
|
||||||
wireFormat = createWireFormat();
|
|
||||||
}
|
|
||||||
return wireFormat;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setWireFormat(TextWireFormat wireFormat) {
|
|
||||||
this.wireFormat = wireFormat;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Implementation methods
|
|
||||||
// -------------------------------------------------------------------------
|
|
||||||
protected TextWireFormat createWireFormat() {
|
|
||||||
return new XStreamWireFormat();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void setConnector(Connector connector) {
|
|
||||||
this.connector = connector;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setBrokerInfo(BrokerInfo brokerInfo) {
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue