doing 3.x reorg

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@357725 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2005-12-19 16:50:54 +00:00
parent 298ccf2a77
commit 06bfecb6ed
179 changed files with 808 additions and 1958 deletions

View File

@ -19,6 +19,7 @@
<pomVersion>3</pomVersion> <pomVersion>3</pomVersion>
<extend>${basedir}/../etc/project.xml</extend> <extend>${basedir}/../etc/project.xml</extend>
<currentVersion>3.0-SNAPSHOT</currentVersion>
<name>ActiveIO</name> <name>ActiveIO</name>
<artifactId>activeio</artifactId> <artifactId>activeio</artifactId>
@ -254,6 +255,9 @@
<includes> <includes>
<include>**/*Test.java</include> <include>**/*Test.java</include>
</includes> </includes>
<excludes>
<exclude>**/OpenORBOnePortSocketFactoryTest.*</exclude>
</excludes>
<resources> <resources>
<resource> <resource>
<directory>src/test</directory> <directory>src/test</directory>

View File

@ -1,24 +0,0 @@
/**
*
* Copyright 2005 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
/**
* @deprecated Use AsyncChannel instead. This class will be removed very soon.
*/
public interface AsynchChannel extends AsyncChannel{
}

View File

@ -1,24 +0,0 @@
/**
*
* Copyright 2005 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
/**
* @deprecated Use AsyncChannelListener instead. This class will be removed very soon.
*/
public interface AsynchChannelListener extends AsyncChannelListener {
}

View File

@ -1,24 +0,0 @@
/**
*
* Copyright 2005 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
/**
* @deprecated Use AsyncChannelServer instead. This class will be removed very soon.
*/
public interface AsynchChannelServer extends AsyncChannelServer {
}

View File

@ -27,5 +27,5 @@ package org.activeio;
* *
* @version $Revision$ * @version $Revision$
*/ */
public interface Channel extends Disposable, Service, Adaptable { public interface Channel extends Service, Adaptable {
} }

View File

@ -17,20 +17,26 @@
**/ **/
package org.activeio; package org.activeio;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import org.activeio.adapter.AsyncToSyncChannelFactory;
import org.activeio.adapter.SyncToAsyncChannelFactory;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelFactory;
import org.activeio.packet.async.AsyncChannelServer;
import org.activeio.packet.sync.SyncChannel;
import org.activeio.packet.sync.SyncChannelFactory;
import org.activeio.packet.sync.SyncChannelServer;
import org.activeio.util.FactoryFinder;
import edu.emory.mathcs.backport.java.util.concurrent.Executor; import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue; import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import org.activeio.adapter.AsyncToSyncChannelFactory;
import org.activeio.adapter.SyncToAsyncChannelFactory;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
/** /**
* A {@see ChannelFactory}uses the requested URI's scheme to determine the * A {@see ChannelFactory}uses the requested URI's scheme to determine the
* actual {@see org.activeio.SynchChannelFactory}or * actual {@see org.activeio.SynchChannelFactory}or

View File

@ -27,7 +27,7 @@ import java.net.URI;
* *
* @version $Revision$ * @version $Revision$
*/ */
public interface ChannelServer extends Channel { public interface ChannelServer extends Service, Adaptable {
/** /**
* The URI that was used when the channel was bound. This could be different * The URI that was used when the channel was bound. This could be different

View File

@ -1,34 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
/**
* The Disposable interface is implemented by objects the aquire resources whoes life cycle must be
* managed. Once a Disposable has been disposed, it cannot be un-disposed.
*
* @version $Revision$
*/
public interface Disposable {
/**
* This method should not throw any exceptions. Cleaning up a Disposable object
* should be easy of an end user therefore do not make him have to handle an Exception.
*/
void dispose();
}

View File

@ -1,41 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
/**
* InputAsyncChannel objects asynchronously push 'up' {@see org.activeio.Packet} objects
* to a registered {@see org.activeio.AsyncChannelListener}.
*
* @version $Revision$
*/
public interface InputAsyncChannel extends Channel {
/**
* Registers the {@see ChannelConsumer} that the protcol will use to deliver packets
* coming 'up' the channel.
*
* @param packetListener
*/
void setAsyncChannelListener(AsyncChannelListener channelListener);
/**
* @return the registered Packet consumer
*/
AsyncChannelListener getAsyncChannelListener();
}

View File

@ -1,37 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
import java.io.IOException;
/**
* @version $Revision$
*/
public interface InputStreamChannel extends Channel {
public int available() throws IOException;
public void mark(int arg0);
public boolean markSupported();
public int read(byte[] arg0, int arg1, int arg2) throws IOException;
public int read(byte[] arg0) throws IOException;
public void reset() throws IOException;
public long skip(long arg0) throws IOException;
public int read() throws IOException;
}

View File

@ -1,40 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
import java.io.IOException;
/**
* SynchChannel objects allow threaded to synchronously block on the <code>read</code>
* method to get {@see org.activeio.Packet} objects when they arrive from the peer.
*
* @version $Revision$
*/
public interface InputSyncChannel extends Channel {
/**
* Used to synchronously receive a packet of information going 'up' the channel.
* This method blocks until a packet is received or the operation experiences timeout.
*
* @param timeout
* @return the packet received or null if the timeout occurred.
* @throws IOException
*/
Packet read(long timeout) throws IOException;
}

View File

@ -1,37 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
import java.io.IOException;
/**
* @version $Revision$
*/
public interface OutputStreamChannel extends Channel {
public void write(byte[] data, int pos, int length) throws IOException;
public void write(byte[] data) throws IOException;
public void write(int data) throws IOException;
/**
* Some channels may buffer data which may be sent down if flush() is called.
*
* @throws IOException
*/
void flush() throws IOException;
}

View File

@ -1,59 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
import java.io.IOException;
/**
* RequestChannel are used to model the request/reponse exchange that is used
* by higher level protcols such as HTTP and RMI.
*
* @version $Revision$
*/
public interface RequestChannel extends Channel {
/**
* Used to send a packet of information going 'down' the channel and wait for
* it's reponse 'up' packet.
*
* This method blocks until the response packet is received or the operation
* experiences a timeout.
*
* @param request
* @param timeout
* @return the respnse packet or null if the timeout occured.
* @throws IOException
*/
Packet request(Packet request, long timeout) throws IOException;
/**
* Registers the {@see RequestListener} that the protcol will use to deliver request packets
* comming 'up' the channel.
*
* @param packetListener
* @throws IOException
*/
void setRequestListener(RequestListener requestListener) throws IOException;
/**
* @return the registered RequestListener
*/
RequestListener getRequestListener();
}

View File

@ -1,42 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
import java.io.IOException;
/**
* An RequestListener object is used to receive remote requests from a a {@see org.activeio.RequestChannel}
*
* @version $Revision$
*/
public interface RequestListener {
/**
* A {@see RequestChannel} will call this method when a new request arrives.
*
* @param packet
*/
Packet onRequest(Packet request);
/**
* A {@see RequestChannel} will call this method when a async failure occurs when receiving a request.
*
* @param error the exception that describes the failure.
*/
void onRquestError(IOException error);
}

View File

@ -43,11 +43,14 @@ public interface Service {
/** /**
* Stops the channel. Once stopped, the channel is in the stopped state. * Stops the channel. Once stopped, the channel is in the stopped state.
* *
* @param timeout The amount of time the channel is allowed to take to gracefully stop. If the timeout
* is exceeded, the channel should do a forcefull stop.
*
* @throws IOException * @throws IOException
*/ */
void stop(long timeout) throws IOException; void stop() throws IOException;
/**
* Disposes the channel. Once disposed, the channel cannot be used anymore.
*
* @throws IOException
*/
void dispose();
} }

View File

@ -1,24 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
/**
* @version $Revision$
*/
public interface StreamChannel extends OutputStreamChannel, InputStreamChannel {
}

View File

@ -1,27 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio;
/**
* SyncChannel objets allow threadd to synchronously block on the <code>receiveUpPacket</code>
* method to get 'up' {@see org.activeio.Packet} objects when they arrive.
*
* @version $Revision$
*/
public interface SyncChannel extends OutputChannel, InputSyncChannel {
}

View File

@ -1,70 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio.adapter;
import java.io.IOException;
import org.activeio.Packet;
import org.activeio.RequestChannel;
import org.activeio.RequestListener;
import org.activeio.SyncChannel;
/**
* Creates a {@see org.activeio.RequestChannel} out of a {@see org.activeio.SyncChannel}.
* Does not support handing requests. It can only be used to send requests.
*
* @version $Revision$
*/
final public class AsyncChannelToClientRequestChannel implements RequestChannel {
private final SyncChannel next;
public AsyncChannelToClientRequestChannel(SyncChannel next) {
this.next = next;
}
public Packet request(Packet request, long timeout) throws IOException {
next.write(request);
next.flush();
return next.read(timeout);
}
public void setRequestListener(RequestListener requestListener) throws IOException {
throw new IOException("Operation not supported.");
}
public RequestListener getRequestListener() {
return null;
}
public Object getAdapter(Class target) {
return next.getAdapter(target);
}
public void dispose() {
next.dispose();
}
public void start() throws IOException {
next.start();
}
public void stop(long timeout) throws IOException {
next.stop(timeout);
}
}

View File

@ -1,189 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio.adapter;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.activeio.AsyncChannel;
import org.activeio.ChannelFactory;
import org.activeio.FilterAsyncChannel;
import org.activeio.Packet;
import org.activeio.PacketData;
import org.activeio.RequestChannel;
import org.activeio.RequestListener;
import org.activeio.packet.AppendedPacket;
import org.activeio.packet.ByteArrayPacket;
import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
/**
* Creates a {@see org.activeio.RequestChannel} out of a {@see org.activeio.AsyncChannel}. This
* {@see org.activeio.RequestChannel} is thread safe and mutiplexes concurrent requests and responses over
* the underlying {@see org.activeio.AsyncChannel}.
*
* @version $Revision$
*/
final public class AsyncChannelToConcurrentRequestChannel extends FilterAsyncChannel implements RequestChannel {
private static final byte PASSTHROUGH = 0x00;
private static final byte REQUEST = 0x01;
private static final byte RESPONSE = 0x02;
private static final ByteArrayPacket PASSTHROUGH_PACKET = new ByteArrayPacket(new byte[]{PASSTHROUGH});
private final ConcurrentHashMap requestMap = new ConcurrentHashMap();
private final Executor requestExecutor;
private short nextRequestId = 0;
private final Object writeMutex = new Object();
private RequestListener requestListener;
public AsyncChannelToConcurrentRequestChannel(AsyncChannel next) {
this(next, ChannelFactory.DEFAULT_EXECUTOR);
}
public AsyncChannelToConcurrentRequestChannel(AsyncChannel next, Executor requestExecutor) {
super(next);
this.requestExecutor=requestExecutor;
}
synchronized short getNextRequestId() {
return nextRequestId++;
}
/**
* @see org.activeio.FilterAsyncChannel#write(org.activeio.Packet)
*/
public void write(Packet packet) throws IOException {
Packet passThrough = AppendedPacket.join(PASSTHROUGH_PACKET.duplicate(), packet);
synchronized(writeMutex) {
super.write(passThrough);
}
}
/**
* @see org.activeio.FilterAsyncChannel#onPacket(org.activeio.Packet)
*/
public void onPacket(final Packet packet) {
switch( packet.read() ) {
case PASSTHROUGH:
super.onPacket(packet);
break;
case REQUEST:
requestExecutor.execute(new Runnable(){
public void run() {
serviceRequest(packet);
}
});
break;
case RESPONSE:
serviceReponse(packet);
break;
}
}
private void serviceRequest(Packet packet) {
try {
if( requestListener ==null )
throw new IOException("The RequestListener has not been set.");
PacketData data = new PacketData(packet);
short requestId = data.readShort();
Packet reponse = requestListener.onRequest(packet);
// Send the response...
Packet header = createHeaderPacket(RESPONSE, requestId);
Packet rc = AppendedPacket.join(header, packet);
synchronized(writeMutex) {
super.write(rc);
}
} catch (IOException e) {
super.onPacketError(e);
}
}
private void serviceReponse(Packet packet) {
try {
PacketData data = new PacketData(packet);
short requestId = data.readShort();
ArrayBlockingQueue responseSlot = (ArrayBlockingQueue) requestMap.get(new Short(requestId));
responseSlot.put(packet);
} catch (IOException e) {
super.onPacketError(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public Packet request(Packet request, long timeout) throws IOException {
Short requestId = new Short(getNextRequestId());
ArrayBlockingQueue responseSlot = new ArrayBlockingQueue(1);
requestMap.put(requestId, responseSlot);
Packet header = createHeaderPacket(REQUEST, requestId.shortValue());
Packet packet = AppendedPacket.join(header, request);
synchronized(writeMutex) {
super.write(packet);
}
try {
if( timeout == WAIT_FOREVER_TIMEOUT ) {
return (Packet) responseSlot.take();
} else if (timeout == NO_WAIT_TIMEOUT ) {
return (Packet) responseSlot.poll(1, TimeUnit.MILLISECONDS);
} else {
return (Packet) responseSlot.poll(timeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} finally {
requestMap.remove(requestId);
}
}
private Packet createHeaderPacket(byte type, short requestId) throws IOException {
ByteArrayPacket header = new ByteArrayPacket(new byte[]{3});
PacketData data = new PacketData(header);
data.writeByte(type);
data.writeShort(requestId);
header.flip();
return header;
}
public void setRequestListener(RequestListener requestListener) throws IOException {
this.requestListener = requestListener;
}
public RequestListener getRequestListener() {
return requestListener;
}
}

View File

@ -19,9 +19,9 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.AsyncChannel;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.BytePacket; import org.activeio.packet.BytePacket;
import org.activeio.packet.async.AsyncChannel;
/** /**
*/ */

View File

@ -1,89 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio.adapter;
import java.io.IOException;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelListener;
import org.activeio.Packet;
import org.activeio.RequestChannel;
import org.activeio.RequestListener;
import org.activeio.packet.EOSPacket;
/**
* Creates a {@see org.activeio.RequestChannel} out of a {@see org.activeio.AsyncChannel}.
* Does not support sending requests. It can only be used to handle requests.
*
* @version $Revision$
*/
public class AsyncChannelToServerRequestChannel implements RequestChannel, AsyncChannelListener {
private final AsyncChannel next;
private RequestListener requestListener;
public AsyncChannelToServerRequestChannel(AsyncChannel next) throws IOException {
this.next = next;
next.setAsyncChannelListener(this);
}
public Packet request(Packet request, long timeout) throws IOException {
throw new IOException("Operation not supported.");
}
public void setRequestListener(RequestListener requestListener) throws IOException {
this.requestListener = requestListener;
}
public RequestListener getRequestListener() {
return requestListener;
}
public Object getAdapter(Class target) {
return next.getAdapter(target);
}
public void dispose() {
next.dispose();
}
public void start() throws IOException {
next.start();
}
public void stop(long timeout) throws IOException {
next.stop(timeout);
}
public void onPacket(Packet packet) {
if( packet == EOSPacket.EOS_PACKET ) {
return;
}
try {
Packet response = requestListener.onRequest(packet);
next.write(response);
next.flush();
} catch (IOException e) {
requestListener.onRquestError(e);
}
}
public void onPacketError(IOException error) {
requestListener.onRquestError(error);
}
}

View File

@ -20,10 +20,10 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import org.activeio.AsyncChannel; import org.activeio.packet.Packet;
import org.activeio.AsyncChannelListener; import org.activeio.packet.async.AsyncChannel;
import org.activeio.Packet; import org.activeio.packet.async.AsyncChannelListener;
import org.activeio.SyncChannel; import org.activeio.packet.sync.SyncChannel;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@ -79,9 +79,9 @@ final public class AsyncToSyncChannel implements SyncChannel, AsyncChannelListen
} }
/** /**
* @see org.activeio.Channel#write(org.activeio.Packet) * @see org.activeio.Channel#write(org.activeio.packet.Packet)
*/ */
public void write(org.activeio.Packet packet) throws IOException { public void write(org.activeio.packet.Packet packet) throws IOException {
asyncChannel.write(packet); asyncChannel.write(packet);
} }
@ -93,7 +93,7 @@ final public class AsyncToSyncChannel implements SyncChannel, AsyncChannelListen
} }
/** /**
* @see org.activeio.SyncChannel#read(long) * @see org.activeio.packet.sync.SyncChannel#read(long)
*/ */
public Packet read(long timeout) throws IOException { public Packet read(long timeout) throws IOException {
try { try {
@ -136,14 +136,14 @@ final public class AsyncToSyncChannel implements SyncChannel, AsyncChannelListen
} }
/** /**
* @see org.activeio.Service#stop(long) * @see org.activeio.Service#stop()
*/ */
public void stop(long timeout) throws IOException { public void stop() throws IOException {
asyncChannel.stop(timeout); asyncChannel.stop();
} }
/** /**
* @see org.activeio.AsyncChannelListener#onPacket(org.activeio.Packet) * @see org.activeio.packet.async.AsyncChannelListener#onPacket(org.activeio.packet.Packet)
*/ */
public void onPacket(Packet packet) { public void onPacket(Packet packet) {
try { try {
@ -154,7 +154,7 @@ final public class AsyncToSyncChannel implements SyncChannel, AsyncChannelListen
} }
/** /**
* @see org.activeio.AsyncChannelListener#onPacketError(org.activeio.ChannelException) * @see org.activeio.packet.async.AsyncChannelListener#onPacketError(org.activeio.ChannelException)
*/ */
public void onPacketError(IOException error) { public void onPacketError(IOException error) {
try { try {

View File

@ -20,10 +20,10 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.activeio.AsyncChannelFactory; import org.activeio.packet.async.AsyncChannelFactory;
import org.activeio.SyncChannel; import org.activeio.packet.sync.SyncChannel;
import org.activeio.SyncChannelFactory; import org.activeio.packet.sync.SyncChannelFactory;
import org.activeio.SyncChannelServer; import org.activeio.packet.sync.SyncChannelServer;
/** /**
* @version $Revision$ * @version $Revision$

View File

@ -22,10 +22,10 @@ import java.io.InterruptedIOException;
import java.net.URI; import java.net.URI;
import org.activeio.AcceptListener; import org.activeio.AcceptListener;
import org.activeio.AsyncChannelServer;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.ChannelServer; import org.activeio.ChannelServer;
import org.activeio.SyncChannelServer; import org.activeio.packet.async.AsyncChannelServer;
import org.activeio.packet.sync.SyncChannelServer;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
@ -82,7 +82,7 @@ final public class AsyncToSyncChannelServer implements SyncChannelServer, Accept
} }
/** /**
* @see org.activeio.SyncChannelServer#accept(long) * @see org.activeio.packet.sync.SyncChannelServer#accept(long)
*/ */
public org.activeio.Channel accept(long timeout) throws IOException { public org.activeio.Channel accept(long timeout) throws IOException {
try { try {
@ -124,10 +124,10 @@ final public class AsyncToSyncChannelServer implements SyncChannelServer, Accept
} }
/** /**
* @see org.activeio.Service#stop(long) * @see org.activeio.Service#stop()
*/ */
public void stop(long timeout) throws IOException { public void stop() throws IOException {
asyncChannelServer.stop(timeout); asyncChannelServer.stop();
} }
public URI getBindURI() { public URI getBindURI() {

View File

@ -1,85 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio.adapter;
import java.io.IOException;
import java.io.InputStream;
import org.activeio.InputStreamChannel;
/**
* Provides an InputStream for a given InputStreamChannel.
*
* @version $Revision$
*/
public class InputStreamChannelToInputStream extends InputStream {
private final InputStreamChannel channel;
/**
* @param channel
*/
public InputStreamChannelToInputStream(final InputStreamChannel channel) {
this.channel = channel;
}
public int available() throws IOException {
return channel.available();
}
public synchronized void mark(int arg0) {
channel.mark(arg0);
}
public boolean markSupported() {
return channel.markSupported();
}
public int read(byte[] arg0) throws IOException {
return channel.read(arg0);
}
public synchronized void reset() throws IOException {
channel.reset();
}
public long skip(long arg0) throws IOException {
return channel.skip(arg0);
}
/**
* @see java.io.InputStream#read()
*/
public int read() throws IOException {
return channel.read();
}
/**
* @see java.io.InputStream#read(byte[], int, int)
*/
public int read(byte[] b, int off, int len) throws IOException {
return channel.read(b,off,len);
}
/**
* @see java.io.InputStream#close()
*/
public void close() throws IOException {
channel.dispose();
super.close();
}
}

View File

@ -1,70 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio.adapter;
import java.io.IOException;
import java.io.OutputStream;
import org.activeio.OutputStreamChannel;
/**
*/
public class OutputStreamChannelToOutputStream extends OutputStream {
private final OutputStreamChannel channel;
/**
* @param channel
*/
public OutputStreamChannelToOutputStream(OutputStreamChannel channel) {
this.channel = channel;
}
/**
* @see java.io.OutputStream#write(int)
*/
public void write(int b) throws IOException {
channel.write(b);
}
/**
* @see java.io.OutputStream#write(byte[], int, int)
*/
public void write(byte[] b, int off, int len) throws IOException {
channel.write(b, off, len);
}
/**
* @see java.io.OutputStream#flush()
*/
public void flush() throws IOException {
channel.flush();
}
public void write(byte[] b) throws IOException {
channel.write(b);
}
/**
* @see java.io.InputStream#close()
*/
public void close() throws IOException {
channel.dispose();
super.close();
}
}

View File

@ -20,9 +20,9 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.Packet;
import org.activeio.packet.AppendedPacket; import org.activeio.packet.AppendedPacket;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.Packet;
/** /**
* *

View File

@ -16,7 +16,7 @@
*/ */
package org.activeio.adapter; package org.activeio.adapter;
import org.activeio.Packet; import org.activeio.packet.Packet;
/** /**
* @deprecated Use PacketToInputStream instead. This class will be removed very soon. * @deprecated Use PacketToInputStream instead. This class will be removed very soon.

View File

@ -19,7 +19,7 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.Packet; import org.activeio.packet.Packet;
/** /**
* Provides an OutputStream for a given Packet. * Provides an OutputStream for a given Packet.

View File

@ -19,7 +19,7 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import org.activeio.Packet; import org.activeio.packet.Packet;
/** /**
* Provides an InputStream for a given Packet. * Provides an InputStream for a given Packet.

View File

@ -28,8 +28,8 @@ import java.net.URI;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.SyncChannel; import org.activeio.packet.sync.SyncChannel;
import org.activeio.SyncChannelServer; import org.activeio.packet.sync.SyncChannelServer;
/** /**
*/ */
@ -91,12 +91,6 @@ public class SyncChannelServerToServerSocket extends ServerSocket {
throw new SocketException("Already bound"); throw new SocketException("Already bound");
} }
public void close() throws IOException {
if (!isClosed()) {
channelServer.dispose();
}
}
public ServerSocketChannel getChannel() { public ServerSocketChannel getChannel() {
return null; return null;
} }

View File

@ -20,9 +20,9 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.Packet;
import org.activeio.SyncChannel;
import org.activeio.packet.EOSPacket; import org.activeio.packet.EOSPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.sync.SyncChannel;
/** /**
* Provides an InputStream for a given SynchChannel. * Provides an InputStream for a given SynchChannel.

View File

@ -19,9 +19,9 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.SyncChannel;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.BytePacket; import org.activeio.packet.BytePacket;
import org.activeio.packet.sync.SyncChannel;
/** /**
*/ */

View File

@ -25,10 +25,10 @@ import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.activeio.Packet;
import org.activeio.SyncChannel;
import org.activeio.net.SocketMetadata;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.sync.SyncChannel;
import org.activeio.stream.sync.socket.SocketMetadata;
/** /**
* Provides a {@see java.net.Socket} interface to a {@see org.activeio.SynchChannel}. * Provides a {@see java.net.Socket} interface to a {@see org.activeio.SynchChannel}.
@ -80,7 +80,7 @@ public class SyncChannelToSocket extends Socket {
closed = true; closed = true;
inputStream.close(); inputStream.close();
outputStream.close(); outputStream.close();
channel.dispose(); channel.stop();
} }
public void connect(SocketAddress endpoint) throws IOException { public void connect(SocketAddress endpoint) throws IOException {

View File

@ -22,14 +22,13 @@ import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelListener;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.ChannelFactory; import org.activeio.ChannelFactory;
import org.activeio.Packet;
import org.activeio.Service;
import org.activeio.SyncChannel;
import org.activeio.packet.EOSPacket; import org.activeio.packet.EOSPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelListener;
import org.activeio.packet.sync.SyncChannel;
import java.io.IOException; import java.io.IOException;
@ -100,34 +99,13 @@ public class SyncToAsyncChannel implements AsyncChannel, Runnable {
} }
} }
synchronized public void stop(long timeout) throws IOException { synchronized public void stop() throws IOException {
if (running.compareAndSet(true, false)) { if (running.compareAndSet(true, false)) {
try { try {
if( timeout == NO_WAIT_TIMEOUT ) { doneCountDownLatch.await(5, TimeUnit.SECONDS);
syncChannel.stop(NO_WAIT_TIMEOUT);
} else if( timeout == WAIT_FOREVER_TIMEOUT ) {
doneCountDownLatch.await();
syncChannel.stop(WAIT_FOREVER_TIMEOUT);
} else {
long start = System.currentTimeMillis();
if( doneCountDownLatch.await(timeout, TimeUnit.MILLISECONDS) ) {
timeout -= (System.currentTimeMillis() - start);
} else {
timeout=0;
}
if( timeout <= 0 ) {
syncChannel.stop(NO_WAIT_TIMEOUT);
} else {
syncChannel.stop(timeout);
}
}
} catch (IOException e) {
throw e;
} catch (Throwable e) { } catch (Throwable e) {
throw (IOException)new IOException("stop failed: " + e.getMessage()).initCause(e);
} }
syncChannel.stop();
} }
} }
@ -169,7 +147,7 @@ public class SyncToAsyncChannel implements AsyncChannel, Runnable {
} }
/** /**
* @see org.activeio.AsyncChannel#setAsyncChannelListener(org.activeio.UpPacketListener) * @see org.activeio.packet.async.AsyncChannel#setAsyncChannelListener(org.activeio.UpPacketListener)
*/ */
public void setAsyncChannelListener(AsyncChannelListener channelListener) { public void setAsyncChannelListener(AsyncChannelListener channelListener) {
if (running.get()) if (running.get())
@ -178,9 +156,9 @@ public class SyncToAsyncChannel implements AsyncChannel, Runnable {
} }
/** /**
* @see org.activeio.Channel#write(org.activeio.Packet) * @see org.activeio.Channel#write(org.activeio.packet.Packet)
*/ */
public void write(org.activeio.Packet packet) throws IOException { public void write(org.activeio.packet.Packet packet) throws IOException {
syncChannel.write(packet); syncChannel.write(packet);
} }
@ -196,7 +174,7 @@ public class SyncToAsyncChannel implements AsyncChannel, Runnable {
*/ */
public void dispose() { public void dispose() {
try { try {
stop(Service.NO_WAIT_TIMEOUT); stop();
} catch ( IOException ignore) { } catch ( IOException ignore) {
} }
syncChannel.dispose(); syncChannel.dispose();

View File

@ -20,11 +20,11 @@ package org.activeio.adapter;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelFactory;
import org.activeio.AsyncChannelServer;
import org.activeio.ChannelFactory; import org.activeio.ChannelFactory;
import org.activeio.SyncChannelFactory; import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelFactory;
import org.activeio.packet.async.AsyncChannelServer;
import org.activeio.packet.sync.SyncChannelFactory;
import edu.emory.mathcs.backport.java.util.concurrent.Executor; import edu.emory.mathcs.backport.java.util.concurrent.Executor;

View File

@ -18,23 +18,21 @@
package org.activeio.adapter; package org.activeio.adapter;
import java.io.IOException;
import java.net.URI;
import org.activeio.AcceptListener;
import org.activeio.Channel;
import org.activeio.ChannelFactory;
import org.activeio.ChannelServer;
import org.activeio.packet.async.AsyncChannelServer;
import org.activeio.packet.sync.SyncChannelServer;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.Executor; import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import org.activeio.AcceptListener;
import org.activeio.AsyncChannelServer;
import org.activeio.Channel;
import org.activeio.ChannelFactory;
import org.activeio.ChannelServer;
import org.activeio.Disposable;
import org.activeio.Service;
import org.activeio.SyncChannelServer;
import java.io.IOException;
import java.net.URI;
/** /**
* Adapts a {@see org.activeio,SynchChannelServer} so that it provides an * Adapts a {@see org.activeio,SynchChannelServer} so that it provides an
* {@see org.activeio.AsyncChannelServer} interface. When this channel * {@see org.activeio.AsyncChannelServer} interface. When this channel
@ -93,36 +91,13 @@ final public class SyncToAsyncChannelServer implements AsyncChannelServer, Runna
} }
} }
synchronized public void stop(long timeout) throws IOException { synchronized public void stop() throws IOException {
if (running.compareAndSet(true, false)) { if (running.compareAndSet(true, false)) {
try { try {
doneCountDownLatch.await(5, TimeUnit.SECONDS);
if( timeout == NO_WAIT_TIMEOUT ) {
syncChannelServer.stop(NO_WAIT_TIMEOUT);
} else if( timeout == WAIT_FOREVER_TIMEOUT ) {
doneCountDownLatch.await();
syncChannelServer.stop(WAIT_FOREVER_TIMEOUT);
} else {
long start = System.currentTimeMillis();
if( doneCountDownLatch.await(timeout, TimeUnit.MILLISECONDS) ) {
timeout -= (System.currentTimeMillis() - start);
} else {
timeout=0;
}
if( timeout <= 0 ) {
syncChannelServer.stop(NO_WAIT_TIMEOUT);
} else {
syncChannelServer.stop(timeout);
}
}
} catch (IOException e) {
throw e;
} catch (Throwable e) { } catch (Throwable e) {
throw (IOException)new IOException("stop failed: " + e.getMessage()).initCause(e);
} }
syncChannelServer.stop();
} }
} }
@ -153,7 +128,7 @@ final public class SyncToAsyncChannelServer implements AsyncChannelServer, Runna
} }
/** /**
* @see org.activeio.AsyncChannelServer#setAcceptListener(org.activeio.AcceptListener) * @see org.activeio.packet.async.AsyncChannelServer#setAcceptListener(org.activeio.AcceptListener)
*/ */
public void setAcceptListener(AcceptListener acceptListener) { public void setAcceptListener(AcceptListener acceptListener) {
if(running.get()) if(running.get())
@ -166,12 +141,10 @@ final public class SyncToAsyncChannelServer implements AsyncChannelServer, Runna
*/ */
public void dispose() { public void dispose() {
try { try {
stop(Service.NO_WAIT_TIMEOUT); stop();
} catch ( IOException ignore) { } catch ( IOException ignore) {
} }
if( syncChannelServer instanceof Disposable ) { syncChannelServer.dispose();
((Disposable)syncChannelServer).dispose();
}
} }
public URI getBindURI() { public URI getBindURI() {

View File

@ -16,9 +16,10 @@
*/ */
package org.activeio.adapter; package org.activeio.adapter;
import org.activeio.SyncChannel;
import org.activeio.AsyncChannel;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.sync.SyncChannel;
import edu.emory.mathcs.backport.java.util.concurrent.Executor; import edu.emory.mathcs.backport.java.util.concurrent.Executor;
/** /**

View File

@ -17,10 +17,10 @@
**/ **/
package org.activeio.command; package org.activeio.command;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelListener;
import org.activeio.Packet;
import org.activeio.packet.EOSPacket; import org.activeio.packet.EOSPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelListener;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
@ -54,8 +54,8 @@ public class AsyncChannelToAsyncCommandChannel implements AsyncCommandChannel {
channel.start(); channel.start();
} }
public void stop(long timeout) throws IOException { public void stop() throws IOException {
channel.stop(timeout); channel.stop();
} }
public void setCommandListener(final CommandListener listener) { public void setCommandListener(final CommandListener listener) {

View File

@ -22,10 +22,10 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import org.activeio.ByteArrayOutputStream;
import org.activeio.Packet;
import org.activeio.adapter.PacketToInputStream; import org.activeio.adapter.PacketToInputStream;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.Packet;
import org.activeio.util.ByteArrayOutputStream;
/** /**
* A default implementation which uses serialization * A default implementation which uses serialization

View File

@ -21,7 +21,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import org.activeio.Packet; import org.activeio.packet.Packet;
/** /**
* Provides a mechanism to marshal commands into and out of packets * Provides a mechanism to marshal commands into and out of packets

View File

@ -1,11 +0,0 @@
<html>
<head>
</head>
<body>
<p>
Some simple filters that may be added to your channels.
</p>
</body>
</html>

View File

@ -19,7 +19,7 @@ package org.activeio.journal;
import java.io.IOException; import java.io.IOException;
import org.activeio.Packet; import org.activeio.packet.Packet;
/** /**
* A Journal is a record logging Interface that can be used to implement * A Journal is a record logging Interface that can be used to implement

View File

@ -17,7 +17,7 @@
**/ **/
package org.activeio.journal.active; package org.activeio.journal.active;
import org.activeio.Packet; import org.activeio.packet.Packet;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;

View File

@ -27,9 +27,8 @@ import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
import java.util.Set; import java.util.Set;
import org.activeio.Disposable;
import org.activeio.Packet;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.Packet;
/** /**
* Control file holds the last known good state of the journal. It stores the state in * Control file holds the last known good state of the journal. It stores the state in
@ -39,7 +38,7 @@ import org.activeio.packet.ByteBufferPacket;
* *
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
final public class ControlFile implements Disposable { final public class ControlFile {
/** The File that holds the control data. */ /** The File that holds the control data. */
private final RandomAccessFile file; private final RandomAccessFile file;

View File

@ -21,14 +21,13 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import org.activeio.Disposable;
import org.activeio.Packet;
import org.activeio.journal.InvalidRecordLocationException; import org.activeio.journal.InvalidRecordLocationException;
import org.activeio.journal.Journal; import org.activeio.journal.Journal;
import org.activeio.journal.JournalEventListener; import org.activeio.journal.JournalEventListener;
import org.activeio.journal.RecordLocation; import org.activeio.journal.RecordLocation;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.ByteBufferPacketPool; import org.activeio.packet.ByteBufferPacketPool;
import org.activeio.packet.Packet;
import edu.emory.mathcs.backport.java.util.concurrent.Callable; import edu.emory.mathcs.backport.java.util.concurrent.Callable;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException; import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
@ -60,7 +59,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
* *
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
final public class JournalImpl implements Journal, Disposable { final public class JournalImpl implements Journal {
public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPoolSize", ""+(5))); public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPoolSize", ""+(5)));
public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4))); public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4)));

View File

@ -21,9 +21,9 @@ import java.io.DataInput;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import org.activeio.Packet;
import org.activeio.PacketData;
import org.activeio.journal.RecordLocation; import org.activeio.journal.RecordLocation;
import org.activeio.packet.Packet;
import org.activeio.packet.PacketData;
/** /**
* Defines a where a record can be located in the Journal. * Defines a where a record can be located in the Journal.

View File

@ -24,14 +24,12 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import org.activeio.Disposable;
/** /**
* Allows read/append access to a LogFile. * Allows read/append access to a LogFile.
* *
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
final public class LogFile implements Disposable { final public class LogFile {
private final RandomAccessFile file; private final RandomAccessFile file;
private final FileChannel channel; private final FileChannel channel;

View File

@ -27,12 +27,12 @@ import java.nio.ByteBuffer;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.util.HashMap; import java.util.HashMap;
import org.activeio.Packet;
import org.activeio.adapter.PacketOutputStream; import org.activeio.adapter.PacketOutputStream;
import org.activeio.adapter.PacketToInputStream; import org.activeio.adapter.PacketToInputStream;
import org.activeio.journal.InvalidRecordLocationException; import org.activeio.journal.InvalidRecordLocationException;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.Packet;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;

View File

@ -24,11 +24,10 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import org.activeio.Disposable;
import org.activeio.Packet;
import org.activeio.adapter.PacketToInputStream;
import org.activeio.adapter.PacketOutputStream; import org.activeio.adapter.PacketOutputStream;
import org.activeio.adapter.PacketToInputStream;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.Packet;
/** /**
@ -36,7 +35,7 @@ import org.activeio.packet.ByteArrayPacket;
* *
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
final public class Record implements Disposable { final public class Record {
static final public int RECORD_HEADER_SIZE=8+Location.SERIALIZED_SIZE; static final public int RECORD_HEADER_SIZE=8+Location.SERIALIZED_SIZE;
static final public int RECORD_FOOTER_SIZE=12+Location.SERIALIZED_SIZE; static final public int RECORD_FOOTER_SIZE=12+Location.SERIALIZED_SIZE;

View File

@ -20,12 +20,12 @@ package org.activeio.journal.howl;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import org.activeio.Packet;
import org.activeio.journal.InvalidRecordLocationException; import org.activeio.journal.InvalidRecordLocationException;
import org.activeio.journal.Journal; import org.activeio.journal.Journal;
import org.activeio.journal.JournalEventListener; import org.activeio.journal.JournalEventListener;
import org.activeio.journal.RecordLocation; import org.activeio.journal.RecordLocation;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.Packet;
import org.objectweb.howl.log.Configuration; import org.objectweb.howl.log.Configuration;
import org.objectweb.howl.log.InvalidFileSetException; import org.objectweb.howl.log.InvalidFileSetException;
import org.objectweb.howl.log.InvalidLogBufferException; import org.objectweb.howl.log.InvalidLogBufferException;

View File

@ -1,226 +0,0 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activeio.net;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import org.activeio.ByteSequence;
import org.activeio.Packet;
import org.activeio.SyncChannel;
import org.activeio.SyncChannelServer;
import org.activeio.adapter.OutputStreamChannelToOutputStream;
import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.EOSPacket;
import org.activeio.packet.EmptyPacket;
/**
* A {@see org.activeio.SynchChannel} implementation that uses a {@see java.net.Socket}
* to talk to the network.
*
* @version $Revision$
*/
public class SocketSyncChannel implements SyncChannel, SocketMetadata {
protected static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
private final SocketStreamChannel channel;
private Packet inputPacket;
private final OutputStreamChannelToOutputStream outputStream;
protected SocketSyncChannel(Socket socket) throws IOException {
this(new SocketStreamChannel(socket));
}
public SocketSyncChannel(SocketStreamChannel channel) throws IOException {
this.channel = channel;
outputStream = new OutputStreamChannelToOutputStream(channel);
setReceiveBufferSize(DEFAULT_BUFFER_SIZE);
setSendBufferSize(DEFAULT_BUFFER_SIZE);
}
/**
* @see org.activeio.SynchChannel#read(long)
*/
synchronized public org.activeio.Packet read(long timeout) throws IOException {
try {
if( timeout==SyncChannelServer.WAIT_FOREVER_TIMEOUT )
setSoTimeout( 0 );
else if( timeout==SyncChannelServer.NO_WAIT_TIMEOUT )
setSoTimeout( 1 );
else
setSoTimeout( (int)timeout );
if( inputPacket==null || !inputPacket.hasRemaining() ) {
inputPacket = allocatePacket();
}
ByteSequence sequence = inputPacket.asByteSequence();
int size = channel.read(sequence.getData(), sequence.getOffset(), sequence.getLength());
if( size == -1 )
return EOSPacket.EOS_PACKET;
if( size == 0 )
return EmptyPacket.EMPTY_PACKET;
inputPacket.position(size);
Packet remaining = inputPacket.slice();
inputPacket.flip();
Packet data = inputPacket.slice();
// Keep the remaining buffer around to fill with data.
inputPacket = remaining;
return data;
} catch (SocketTimeoutException e) {
return null;
}
}
private Packet allocatePacket() {
byte[] data = new byte[DEFAULT_BUFFER_SIZE];
return new ByteArrayPacket(data);
}
protected void setSoTimeout(int i) throws SocketException {
channel.setSoTimeout(i);
}
/**
* @see org.activeio.Channel#write(org.activeio.Packet)
*/
public void write(Packet packet) throws IOException {
packet.writeTo(outputStream);
}
/**
* @see org.activeio.Channel#flush()
*/
public void flush() throws IOException {
channel.flush();
}
/**
* @see org.activeio.Disposable#dispose()
*/
public void dispose() {
channel.dispose();
}
public void start() throws IOException {
channel.start();
}
public void stop(long timeout) throws IOException {
channel.stop(timeout);
}
public InetAddress getInetAddress() {
return channel.getInetAddress();
}
public boolean getKeepAlive() throws SocketException {
return channel.getKeepAlive();
}
public InetAddress getLocalAddress() {
return channel.getLocalAddress();
}
public int getLocalPort() {
return channel.getLocalPort();
}
public SocketAddress getLocalSocketAddress() {
return channel.getLocalSocketAddress();
}
public boolean getOOBInline() throws SocketException {
return channel.getOOBInline();
}
public int getPort() {
return channel.getPort();
}
public int getReceiveBufferSize() throws SocketException {
return channel.getReceiveBufferSize();
}
public SocketAddress getRemoteSocketAddress() {
return channel.getRemoteSocketAddress();
}
public boolean getReuseAddress() throws SocketException {
return channel.getReuseAddress();
}
public int getSendBufferSize() throws SocketException {
return channel.getSendBufferSize();
}
public int getSoLinger() throws SocketException {
return channel.getSoLinger();
}
public int getSoTimeout() throws SocketException {
return channel.getSoTimeout();
}
public boolean getTcpNoDelay() throws SocketException {
return channel.getTcpNoDelay();
}
public int getTrafficClass() throws SocketException {
return channel.getTrafficClass();
}
public boolean isBound() {
return channel.isBound();
}
public boolean isClosed() {
return channel.isClosed();
}
public boolean isConnected() {
return channel.isConnected();
}
public void setKeepAlive(boolean on) throws SocketException {
channel.setKeepAlive(on);
}
public void setOOBInline(boolean on) throws SocketException {
channel.setOOBInline(on);
}
public void setReceiveBufferSize(int size) throws SocketException {
channel.setReceiveBufferSize(size);
}
public void setReuseAddress(boolean on) throws SocketException {
channel.setReuseAddress(on);
}
public void setSendBufferSize(int size) throws SocketException {
channel.setSendBufferSize(size);
}
public void setSoLinger(boolean on, int linger) throws SocketException {
channel.setSoLinger(on, linger);
}
public void setTcpNoDelay(boolean on) throws SocketException {
channel.setTcpNoDelay(on);
}
public void setTrafficClass(int tc) throws SocketException {
channel.setTrafficClass(tc);
}
public Object getAdapter(Class target) {
if( target.isAssignableFrom(getClass()) ) {
return this;
}
return channel.getAdapter(target);
}
public String toString() {
return channel.toString();
}
}

View File

@ -1,11 +0,0 @@
<html>
<head>
</head>
<body>
<p>
A set of Channel implementations that are implemented using the Socket and DatagramSocket classes.
</p>
</body>
</html>

View File

@ -19,7 +19,7 @@ package org.activeio.oneport;
import java.util.HashSet; import java.util.HashSet;
import org.activeio.Packet; import org.activeio.packet.Packet;
public class HttpRecognizer implements ProtocolRecognizer { public class HttpRecognizer implements ProtocolRecognizer {

View File

@ -17,7 +17,7 @@
**/ **/
package org.activeio.oneport; package org.activeio.oneport;
import org.activeio.Packet; import org.activeio.packet.Packet;
public class IIOPRecognizer implements ProtocolRecognizer { public class IIOPRecognizer implements ProtocolRecognizer {

View File

@ -22,18 +22,18 @@ import java.net.URI;
import java.util.Iterator; import java.util.Iterator;
import org.activeio.AcceptListener; import org.activeio.AcceptListener;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelListener;
import org.activeio.AsyncChannelServer;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.FilterAsyncChannel;
import org.activeio.FilterAsyncChannelServer;
import org.activeio.Packet;
import org.activeio.SyncChannel;
import org.activeio.adapter.AsyncToSyncChannel; import org.activeio.adapter.AsyncToSyncChannel;
import org.activeio.adapter.SyncToAsyncChannel; import org.activeio.adapter.SyncToAsyncChannel;
import org.activeio.filter.PushbackSyncChannel;
import org.activeio.packet.AppendedPacket; import org.activeio.packet.AppendedPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelListener;
import org.activeio.packet.async.AsyncChannelServer;
import org.activeio.packet.async.FilterAsyncChannel;
import org.activeio.packet.async.FilterAsyncChannelServer;
import org.activeio.packet.sync.SyncChannel;
import org.activeio.packet.sync.filter.PushbackSyncChannel;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@ -121,7 +121,7 @@ final public class OnePortAsyncChannelServer extends FilterAsyncChannelServer {
// packets wiil get // packets wiil get
// delivered. // delivered.
try { try {
stop(NO_WAIT_TIMEOUT); stop();
setAsyncChannelListener(null); setAsyncChannelListener(null);
} catch (IOException e) { } catch (IOException e) {
getAsyncChannelListener().onPacketError(e); getAsyncChannelListener().onPacketError(e);
@ -176,7 +176,7 @@ final public class OnePortAsyncChannelServer extends FilterAsyncChannelServer {
public void start() throws IOException { public void start() throws IOException {
started = true; started = true;
} }
public void stop(long timeout) throws IOException { public void stop() throws IOException {
started = false; started = false;
} }

View File

@ -17,7 +17,7 @@
**/ **/
package org.activeio.oneport; package org.activeio.oneport;
import org.activeio.Packet; import org.activeio.packet.Packet;
/** /**
* *

View File

@ -17,7 +17,7 @@
**/ **/
package org.activeio.oneport; package org.activeio.oneport;
import org.activeio.Packet; import org.activeio.packet.Packet;
class UnknownRecognizer implements ProtocolRecognizer { class UnknownRecognizer implements ProtocolRecognizer {

View File

@ -15,14 +15,16 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio.oneport; package org.activeio.oneport.jetty;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket; import java.net.ServerSocket;
import org.activeio.SyncChannelServer;
import org.activeio.adapter.AsyncToSyncChannelServer; import org.activeio.adapter.AsyncToSyncChannelServer;
import org.activeio.adapter.SyncChannelServerToServerSocket; import org.activeio.adapter.SyncChannelServerToServerSocket;
import org.activeio.oneport.HttpRecognizer;
import org.activeio.oneport.OnePortAsyncChannelServer;
import org.activeio.packet.sync.SyncChannelServer;
import org.mortbay.http.SocketListener; import org.mortbay.http.SocketListener;
import org.mortbay.util.InetAddrPort; import org.mortbay.util.InetAddrPort;

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio.oneport; package org.activeio.oneport.openorb;
import org.omg.PortableInterceptor.ORBInitInfo; import org.omg.PortableInterceptor.ORBInitInfo;
import org.openorb.orb.pi.FeatureInitInfo; import org.openorb.orb.pi.FeatureInitInfo;

View File

@ -15,16 +15,18 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio.oneport; package org.activeio.oneport.openorb;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import org.activeio.SyncChannelServer;
import org.activeio.adapter.AsyncToSyncChannelServer; import org.activeio.adapter.AsyncToSyncChannelServer;
import org.activeio.adapter.SyncChannelServerToServerSocket; import org.activeio.adapter.SyncChannelServerToServerSocket;
import org.activeio.oneport.IIOPRecognizer;
import org.activeio.oneport.OnePortAsyncChannelServer;
import org.activeio.packet.sync.SyncChannelServer;
import org.openorb.orb.net.SocketFactory; import org.openorb.orb.net.SocketFactory;
/** /**

View File

@ -21,8 +21,6 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import org.activeio.ByteSequence;
import org.activeio.Packet;
/** /**
* Appends two packets together. * Appends two packets together.
@ -154,7 +152,7 @@ final public class AppendedPacket implements Packet {
/** /**
* @see org.activeio.Packet#read() * @see org.activeio.packet.Packet#read()
*/ */
public int read() { public int read() {
if( first.hasRemaining() ) { if( first.hasRemaining() ) {
@ -167,7 +165,7 @@ final public class AppendedPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read(byte[], int, int) * @see org.activeio.packet.Packet#read(byte[], int, int)
*/ */
public int read(byte[] data, int offset, int length) { public int read(byte[] data, int offset, int length) {
@ -183,7 +181,7 @@ final public class AppendedPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(int) * @see org.activeio.packet.Packet#write(int)
*/ */
public boolean write(int data) { public boolean write(int data) {
if( first.hasRemaining() ) { if( first.hasRemaining() ) {
@ -196,7 +194,7 @@ final public class AppendedPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(byte[], int, int) * @see org.activeio.packet.Packet#write(byte[], int, int)
*/ */
public int write(byte[] data, int offset, int length) { public int write(byte[] data, int offset, int length) {
int rc1 = first.write(data, offset, length); int rc1 = first.write(data, offset, length);

View File

@ -21,8 +21,6 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import org.activeio.ByteSequence;
import org.activeio.Packet;
/** /**
* Provides a Packet implementation that is directly backed by a <code>byte[]</code>. * Provides a Packet implementation that is directly backed by a <code>byte[]</code>.
@ -135,7 +133,7 @@ final public class ByteArrayPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read() * @see org.activeio.packet.Packet#read()
*/ */
public int read() { public int read() {
if( !(remaining > 0) ) if( !(remaining > 0) )
@ -147,7 +145,7 @@ final public class ByteArrayPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read(byte[], int, int) * @see org.activeio.packet.Packet#read(byte[], int, int)
*/ */
public int read(byte[] data, int offset, int length) { public int read(byte[] data, int offset, int length) {
if( !(remaining > 0) ) if( !(remaining > 0) )
@ -161,7 +159,7 @@ final public class ByteArrayPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(int) * @see org.activeio.packet.Packet#write(int)
*/ */
public boolean write(int data) { public boolean write(int data) {
if( !(remaining > 0) ) if( !(remaining > 0) )
@ -173,7 +171,7 @@ final public class ByteArrayPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(byte[], int, int) * @see org.activeio.packet.Packet#write(byte[], int, int)
*/ */
public int write(byte[] data, int offset, int length) { public int write(byte[] data, int offset, int length) {
if( !(remaining > 0) ) if( !(remaining > 0) )
@ -191,7 +189,7 @@ final public class ByteArrayPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#sliceAsBytes() * @see org.activeio.packet.Packet#sliceAsBytes()
*/ */
public byte[] sliceAsBytes() { public byte[] sliceAsBytes() {
if( buffer.length == remaining ) { if( buffer.length == remaining ) {

View File

@ -22,8 +22,6 @@ import java.io.OutputStream;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.activeio.ByteSequence;
import org.activeio.Packet;
/** /**
* Provides a Packet implementation that is backed by a {@see java.nio.ByteBuffer} * Provides a Packet implementation that is backed by a {@see java.nio.ByteBuffer}
@ -174,7 +172,7 @@ final public class ByteBufferPacket implements Packet {
/** /**
* @see org.activeio.Packet#read() * @see org.activeio.packet.Packet#read()
*/ */
public int read() { public int read() {
if( !buffer.hasRemaining() ) if( !buffer.hasRemaining() )
@ -183,7 +181,7 @@ final public class ByteBufferPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read(byte[], int, int) * @see org.activeio.packet.Packet#read(byte[], int, int)
*/ */
public int read(byte[] data, int offset, int length) { public int read(byte[] data, int offset, int length) {
if( !hasRemaining() ) if( !hasRemaining() )
@ -195,7 +193,7 @@ final public class ByteBufferPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(int) * @see org.activeio.packet.Packet#write(int)
*/ */
public boolean write(int data) { public boolean write(int data) {
if( !buffer.hasRemaining() ) if( !buffer.hasRemaining() )
@ -205,7 +203,7 @@ final public class ByteBufferPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(byte[], int, int) * @see org.activeio.packet.Packet#write(byte[], int, int)
*/ */
public int write(byte[] data, int offset, int length) { public int write(byte[] data, int offset, int length) {
if( !hasRemaining() ) if( !hasRemaining() )
@ -217,7 +215,7 @@ final public class ByteBufferPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#asByteSequence() * @see org.activeio.packet.Packet#asByteSequence()
*/ */
public ByteSequence asByteSequence() { public ByteSequence asByteSequence() {
if( buffer.hasArray() ) { if( buffer.hasArray() ) {
@ -229,7 +227,7 @@ final public class ByteBufferPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#sliceAsBytes() * @see org.activeio.packet.Packet#sliceAsBytes()
*/ */
public byte[] sliceAsBytes() { public byte[] sliceAsBytes() {
// TODO Auto-generated method stub // TODO Auto-generated method stub

View File

@ -17,7 +17,6 @@
**/ **/
package org.activeio.packet; package org.activeio.packet;
import org.activeio.Packet;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;

View File

@ -21,8 +21,6 @@ import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import org.activeio.ByteSequence;
import org.activeio.Packet;
/** /**
* Provides a Packet implementation that is directly backed by a <code>byte</code>. * Provides a Packet implementation that is directly backed by a <code>byte</code>.
@ -120,7 +118,7 @@ final public class BytePacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read() * @see org.activeio.packet.Packet#read()
*/ */
public int read() { public int read() {
if( !hasRemaining() ) if( !hasRemaining() )
@ -130,7 +128,7 @@ final public class BytePacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read(byte[], int, int) * @see org.activeio.packet.Packet#read(byte[], int, int)
*/ */
public int read(byte[] data, int offset, int length) { public int read(byte[] data, int offset, int length) {
if( !hasRemaining() ) if( !hasRemaining() )
@ -145,7 +143,7 @@ final public class BytePacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(int) * @see org.activeio.packet.Packet#write(int)
*/ */
public boolean write(int data) { public boolean write(int data) {
if( !hasRemaining() ) if( !hasRemaining() )
@ -157,7 +155,7 @@ final public class BytePacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#write(byte[], int, int) * @see org.activeio.packet.Packet#write(byte[], int, int)
*/ */
public int write(byte[] data, int offset, int length) { public int write(byte[] data, int offset, int length) {
if( !hasRemaining() ) if( !hasRemaining() )
@ -176,7 +174,7 @@ final public class BytePacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#sliceAsBytes() * @see org.activeio.packet.Packet#sliceAsBytes()
*/ */
public byte[] sliceAsBytes() { public byte[] sliceAsBytes() {
return null; return null;

View File

@ -17,7 +17,7 @@
* *
**/ **/
package org.activeio; package org.activeio.packet;
public class ByteSequence { public class ByteSequence {
final byte[] data; final byte[] data;

View File

@ -20,8 +20,6 @@ import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.ByteSequence;
import org.activeio.Packet;
/** /**
* Provides a Packet implementation that is used to represent the end of a stream. * Provides a Packet implementation that is used to represent the end of a stream.
@ -93,28 +91,28 @@ final public class EOSPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read() * @see org.activeio.packet.Packet#read()
*/ */
public int read() { public int read() {
return -1; return -1;
} }
/** /**
* @see org.activeio.Packet#read(byte[], int, int) * @see org.activeio.packet.Packet#read(byte[], int, int)
*/ */
public int read(byte[] data, int offset, int length) { public int read(byte[] data, int offset, int length) {
return -1; return -1;
} }
/** /**
* @see org.activeio.Packet#write(int) * @see org.activeio.packet.Packet#write(int)
*/ */
public boolean write(int data) { public boolean write(int data) {
return false; return false;
} }
/** /**
* @see org.activeio.Packet#write(byte[], int, int) * @see org.activeio.packet.Packet#write(byte[], int, int)
*/ */
public int write(byte[] data, int offset, int length) { public int write(byte[] data, int offset, int length) {
return -1; return -1;

View File

@ -20,8 +20,6 @@ import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.ByteSequence;
import org.activeio.Packet;
/** /**
* Provides a Packet implementation that is directly backed by a <code>byte[0]</code>. * Provides a Packet implementation that is directly backed by a <code>byte[0]</code>.
@ -95,28 +93,28 @@ final public class EmptyPacket implements Packet {
} }
/** /**
* @see org.activeio.Packet#read() * @see org.activeio.packet.Packet#read()
*/ */
public int read() { public int read() {
return -1; return -1;
} }
/** /**
* @see org.activeio.Packet#read(byte[], int, int) * @see org.activeio.packet.Packet#read(byte[], int, int)
*/ */
public int read(byte[] data, int offset, int length) { public int read(byte[] data, int offset, int length) {
return -1; return -1;
} }
/** /**
* @see org.activeio.Packet#write(int) * @see org.activeio.packet.Packet#write(int)
*/ */
public boolean write(int data) { public boolean write(int data) {
return false; return false;
} }
/** /**
* @see org.activeio.Packet#write(byte[], int, int) * @see org.activeio.packet.Packet#write(byte[], int, int)
*/ */
public int write(byte[] data, int offset, int length) { public int write(byte[] data, int offset, int length) {
return -1; return -1;

View File

@ -20,8 +20,6 @@ import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.ByteSequence;
import org.activeio.Packet;
/** /**
* Provides a Packet implementation that filters operations to another packet. * Provides a Packet implementation that filters operations to another packet.

View File

@ -14,18 +14,20 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet;
import java.io.DataOutput; import java.io.DataOutput;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import org.activeio.Adaptable;
/** /**
* Provides a ByteBuffer like interface to work with IO channel packets of data. * Provides a ByteBuffer like interface to work with IO channel packets of data.
* *
* @version $Revision$ * @version $Revision$
*/ */
public interface Packet extends Disposable, Adaptable { public interface Packet extends Adaptable {
public int position(); public int position();
public void position(int position); public void position(int position);
@ -40,6 +42,7 @@ public interface Packet extends Disposable, Adaptable {
public Packet duplicate(); public Packet duplicate();
public Object duplicate(ClassLoader cl) throws IOException; public Object duplicate(ClassLoader cl) throws IOException;
public int capacity(); public int capacity();
public void dispose();
public ByteSequence asByteSequence(); public ByteSequence asByteSequence();
public byte[] sliceAsBytes(); public byte[] sliceAsBytes();

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio; package org.activeio.packet;
import java.io.EOFException; import java.io.EOFException;
import java.io.IOException; import java.io.IOException;

View File

@ -19,9 +19,6 @@ package org.activeio.packet;
import java.util.ArrayList; import java.util.ArrayList;
import org.activeio.Disposable;
import org.activeio.Packet;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/** /**
@ -30,7 +27,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
* *
* @version $Revision: 1.1 $ * @version $Revision: 1.1 $
*/ */
abstract public class PacketPool implements Disposable { abstract public class PacketPool {
public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPoolSize", ""+(5))); public static final int DEFAULT_POOL_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPoolSize", ""+(5)));
public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4))); public static final int DEFAULT_PACKET_SIZE = Integer.parseInt(System.getProperty("org.activeio.journal.active.DefaultPacketSize", ""+(1024*1024*4)));

View File

@ -15,14 +15,34 @@
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.async;
import java.io.IOException; import java.io.IOException;
import org.activeio.Channel;
import org.activeio.packet.Packet;
/** /**
* AsyncChannel objects asynchronously push 'up' {@see org.activeio.Packet} objects
* to a registered {@see org.activeio.ChannelConsumer}.
*
* @version $Revision$ * @version $Revision$
*/ */
public interface OutputChannel extends Channel { public interface AsyncChannel extends Channel {
/**
* Registers the {@see ChannelConsumer} that the protcol will use to deliver packets
* coming 'up' the channel.
*
* @param packetListener
*/
void setAsyncChannelListener(AsyncChannelListener channelListener);
/**
* @return the registered Packet consumer
*/
AsyncChannelListener getAsyncChannelListener();
/** /**
* Sends a packet down the channel towards the media. * Sends a packet down the channel towards the media.
@ -38,4 +58,5 @@ public interface OutputChannel extends Channel {
* @throws IOException * @throws IOException
*/ */
void flush() throws IOException; void flush() throws IOException;
} }

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.async;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;

View File

@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.async;
import java.io.IOException; import java.io.IOException;
import org.activeio.packet.Packet;
/** /**
* A ChannelConsumer object is used to receive 'up' {@see org.activeio.Packet} objects. * A ChannelConsumer object is used to receive 'up' {@see org.activeio.Packet} objects.

View File

@ -14,7 +14,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.async;
import org.activeio.AcceptListener;
import org.activeio.ChannelServer;
/** /**

View File

@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.async;
import java.io.IOException; import java.io.IOException;
import org.activeio.packet.Packet;
/** /**
* A AsyncChannelFilter can be used as a filter between a {@see org.activeio.AsyncChannel} * A AsyncChannelFilter can be used as a filter between a {@see org.activeio.AsyncChannel}
@ -74,21 +76,21 @@ public class FilterAsyncChannel implements AsyncChannel, AsyncChannelListener {
} }
/** /**
* @see org.activeio.Service#stop(long) * @see org.activeio.Service#stop()
*/ */
public void stop(long timeout) throws IOException { public void stop() throws IOException {
next.stop(timeout); next.stop();
} }
/** /**
* @see org.activeio.AsyncChannelListener#onPacket(org.activeio.Packet) * @see org.activeio.packet.async.AsyncChannelListener#onPacket(org.activeio.packet.Packet)
*/ */
public void onPacket(Packet packet) { public void onPacket(Packet packet) {
channelListener.onPacket(packet); channelListener.onPacket(packet);
} }
/** /**
* @see org.activeio.AsyncChannelListener#onPacketError(org.activeio.ChannelException) * @see org.activeio.packet.async.AsyncChannelListener#onPacketError(org.activeio.ChannelException)
*/ */
public void onPacketError(IOException error) { public void onPacketError(IOException error) {
channelListener.onPacketError(error); channelListener.onPacketError(error);

View File

@ -14,11 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.async;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.activeio.AcceptListener;
import org.activeio.Channel;
/** /**
* A AsyncChannelFilter can be used as a filter between a {@see org.activeio.AsyncChannel} * A AsyncChannelFilter can be used as a filter between a {@see org.activeio.AsyncChannel}
@ -66,10 +69,10 @@ public class FilterAsyncChannelServer implements AsyncChannelServer, AcceptListe
} }
/** /**
* @see org.activeio.Service#stop(long) * @see org.activeio.Service#stop()
*/ */
public void stop(long timeout) throws IOException { public void stop() throws IOException {
next.stop(timeout); next.stop();
} }
public void onAccept(Channel channel) { public void onAccept(Channel channel) {

View File

@ -16,7 +16,7 @@
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.aio;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -26,11 +26,12 @@ import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelListener;
import org.activeio.Packet;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.EOSPacket; import org.activeio.packet.EOSPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelListener;
import org.activeio.stream.sync.socket.SocketMetadata;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
@ -89,7 +90,7 @@ final public class AIOAsyncChannel implements AsyncChannel, ICompletionListener,
channelListener.onPacketError(new SocketException("Socket closed.")); channelListener.onPacketError(new SocketException("Socket closed."));
} }
try { try {
stop(NO_WAIT_TIMEOUT); stop();
} catch (IOException e) { } catch (IOException e) {
} }
try { try {
@ -105,16 +106,10 @@ final public class AIOAsyncChannel implements AsyncChannel, ICompletionListener,
} }
} }
public void stop(long timeout) throws IOException { public void stop() throws IOException {
if( running.compareAndSet(true, false) ) { if( running.compareAndSet(true, false) ) {
try { try {
if( timeout == NO_WAIT_TIMEOUT ) { doneCountDownLatch.await(5, TimeUnit.SECONDS);
doneCountDownLatch.await(0, TimeUnit.MILLISECONDS);
} else if( timeout == WAIT_FOREVER_TIMEOUT ) {
doneCountDownLatch.await();
} else {
doneCountDownLatch.await(timeout, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new InterruptedIOException(); throw new InterruptedIOException();
} }
@ -259,7 +254,11 @@ final public class AIOAsyncChannel implements AsyncChannel, ICompletionListener,
public void setTrafficClass(int tc) throws SocketException { public void setTrafficClass(int tc) throws SocketException {
socket.setTrafficClass(tc); socket.setTrafficClass(tc);
} }
public void setSoTimeout(int i) throws SocketException {
socket.setSoTimeout(i);
}
public String toString() { public String toString() {
return "AIO Connection: "+getLocalSocketAddress()+" -> "+getRemoteSocketAddress(); return "AIO Connection: "+getLocalSocketAddress()+" -> "+getRemoteSocketAddress();
} }
} }

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.aio;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -23,12 +23,13 @@ import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelFactory;
import org.activeio.AsyncChannelServer;
import org.activeio.adapter.SyncToAsyncChannelServer; import org.activeio.adapter.SyncToAsyncChannelServer;
import org.activeio.filter.WriteBufferedAsyncChannel;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelFactory;
import org.activeio.packet.async.AsyncChannelServer;
import org.activeio.packet.async.filter.WriteBufferedAsyncChannel;
import org.activeio.util.URISupport;
import com.ibm.io.async.AsyncServerSocketChannel; import com.ibm.io.async.AsyncServerSocketChannel;
import com.ibm.io.async.AsyncSocketChannel; import com.ibm.io.async.AsyncSocketChannel;

View File

@ -16,18 +16,18 @@
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.aio;
import java.io.IOException; import java.io.IOException;
import java.net.SocketException; import java.net.SocketException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
import org.activeio.AsyncChannel;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.SyncChannelServer;
import org.activeio.filter.WriteBufferedAsyncChannel;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.filter.WriteBufferedAsyncChannel;
import org.activeio.packet.sync.SyncChannelServer;
import com.ibm.io.async.AsyncServerSocketChannel; import com.ibm.io.async.AsyncServerSocketChannel;
@ -66,7 +66,7 @@ public class AIOSyncChannelServer implements SyncChannelServer {
synchronized public void start() throws IOException { synchronized public void start() throws IOException {
} }
synchronized public void stop(long timeout) { synchronized public void stop() {
} }
public Channel accept(long timeout) throws IOException { public Channel accept(long timeout) throws IOException {

View File

@ -1,4 +1,4 @@
package org.activeio.filter; package org.activeio.packet.async.filter;
import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue; import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch; import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
@ -8,10 +8,10 @@ import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
import org.activeio.AsyncChannel;
import org.activeio.ChannelFactory; import org.activeio.ChannelFactory;
import org.activeio.FilterAsyncChannel; import org.activeio.packet.Packet;
import org.activeio.Packet; import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.FilterAsyncChannel;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -133,7 +133,7 @@ public class AsyncWriteAsyncChannel extends FilterAsyncChannel {
flush(NO_WAIT_TIMEOUT); flush(NO_WAIT_TIMEOUT);
} }
public void stop(long timeout) throws IOException { public void stop() throws IOException {
flush(WAIT_FOREVER_TIMEOUT); flush(WAIT_FOREVER_TIMEOUT);
} }

View File

@ -14,13 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio.filter; package org.activeio.packet.async.filter;
import java.io.IOException; import java.io.IOException;
import org.activeio.AsyncChannel; import org.activeio.packet.Packet;
import org.activeio.FilterAsyncChannel; import org.activeio.packet.async.AsyncChannel;
import org.activeio.Packet; import org.activeio.packet.async.FilterAsyncChannel;
/** /**
@ -48,7 +48,7 @@ final public class CounterAsyncChannel extends FilterAsyncChannel {
} }
/** /**
* @see org.activeio.FilterAsyncChannel#onPacket(org.activeio.Packet) * @see org.activeio.packet.async.FilterAsyncChannel#onPacket(org.activeio.packet.Packet)
*/ */
public void onPacket(Packet packet) { public void onPacket(Packet packet) {
inBoundCounter += packet.remaining(); inBoundCounter += packet.remaining();
@ -56,7 +56,7 @@ final public class CounterAsyncChannel extends FilterAsyncChannel {
} }
/** /**
* @see org.activeio.FilterAsyncChannel#write(org.activeio.Packet) * @see org.activeio.packet.async.FilterAsyncChannel#write(org.activeio.packet.Packet)
*/ */
public void write(Packet packet) throws IOException { public void write(Packet packet) throws IOException {
outBoundCounter += packet.position(); outBoundCounter += packet.position();

View File

@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio.filter; package org.activeio.packet.async.filter;
import java.io.IOException; import java.io.IOException;
import org.activeio.AsyncChannel; import org.activeio.packet.Packet;
import org.activeio.FilterAsyncChannel; import org.activeio.packet.async.AsyncChannel;
import org.activeio.Packet; import org.activeio.packet.async.FilterAsyncChannel;
import org.activeio.util.PacketAggregator;
/** /**
* This PacketAggregatingAsyncChannel can be used when the client is sending a * This PacketAggregatingAsyncChannel can be used when the client is sending a

View File

@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio.filter; package org.activeio.packet.async.filter;
import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock; import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
import org.activeio.AsyncChannel; import org.activeio.packet.Packet;
import org.activeio.FilterAsyncChannel; import org.activeio.packet.async.AsyncChannel;
import org.activeio.Packet; import org.activeio.packet.async.FilterAsyncChannel;
import java.io.IOException; import java.io.IOException;
@ -66,10 +66,6 @@ public class SynchornizedAsyncChannel extends FilterAsyncChannel {
} }
} }
synchronized public void dispose() {
super.dispose();
}
synchronized public Object getAdapter(Class target) { synchronized public Object getAdapter(Class target) {
return super.getAdapter(target); return super.getAdapter(target);
} }
@ -78,8 +74,8 @@ public class SynchornizedAsyncChannel extends FilterAsyncChannel {
super.start(); super.start();
} }
synchronized public void stop(long timeout) throws IOException { synchronized public void stop() throws IOException {
super.stop(timeout); super.stop();
} }
public Lock getWriteLock() { public Lock getWriteLock() {

View File

@ -14,14 +14,14 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio.filter; package org.activeio.packet.async.filter;
import java.io.IOException; import java.io.IOException;
import org.activeio.AsyncChannel;
import org.activeio.FilterAsyncChannel;
import org.activeio.Packet;
import org.activeio.packet.ByteArrayPacket; import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.FilterAsyncChannel;
/** /**
*/ */

View File

@ -16,7 +16,7 @@
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.nio;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
@ -25,14 +25,15 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelListener;
import org.activeio.ByteSequence;
import org.activeio.Packet;
import org.activeio.net.NIOAsyncChannelSelectorManager.SelectorManagerListener;
import org.activeio.net.NIOAsyncChannelSelectorManager.SocketChannelAsyncChannelSelection;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.ByteSequence;
import org.activeio.packet.EOSPacket; import org.activeio.packet.EOSPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelListener;
import org.activeio.packet.async.nio.NIOAsyncChannelSelectorManager.SelectorManagerListener;
import org.activeio.packet.async.nio.NIOAsyncChannelSelectorManager.SocketChannelAsyncChannelSelection;
import org.activeio.packet.sync.nio.NIOBaseChannel;
/** /**
* @version $Revision$ * @version $Revision$
@ -173,7 +174,7 @@ final public class NIOAsyncChannel extends NIOBaseChannel implements AsyncChanne
selection.setInterestOps(SelectionKey.OP_READ); selection.setInterestOps(SelectionKey.OP_READ);
} }
public void stop(long timeout) throws IOException { public void stop() throws IOException {
if( !running ) if( !running )
return; return;
running=false; running=false;

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.nio;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
@ -25,12 +25,13 @@ import java.net.URISyntaxException;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelFactory;
import org.activeio.AsyncChannelServer;
import org.activeio.adapter.SyncToAsyncChannelServer; import org.activeio.adapter.SyncToAsyncChannelServer;
import org.activeio.filter.WriteBufferedAsyncChannel;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelFactory;
import org.activeio.packet.async.AsyncChannelServer;
import org.activeio.packet.async.filter.WriteBufferedAsyncChannel;
import org.activeio.util.URISupport;
/** /**
* A TcpAsyncChannelFactory creates {@see org.activeio.net.TcpAsyncChannel} * A TcpAsyncChannelFactory creates {@see org.activeio.net.TcpAsyncChannel}

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.activeio.net; package org.activeio.packet.async.nio;
import edu.emory.mathcs.backport.java.util.concurrent.Executor; import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor; import edu.emory.mathcs.backport.java.util.concurrent.ScheduledThreadPoolExecutor;

View File

@ -16,16 +16,18 @@
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.nio;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import org.activeio.AsyncChannel;
import org.activeio.Channel; import org.activeio.Channel;
import org.activeio.filter.WriteBufferedAsyncChannel;
import org.activeio.packet.ByteBufferPacket; import org.activeio.packet.ByteBufferPacket;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.filter.WriteBufferedAsyncChannel;
import org.activeio.packet.sync.socket.SocketSyncChannelServer;
import org.activeio.stream.sync.socket.SocketStreamChannel;
/** /**
* A SynchChannelServer that creates * A SynchChannelServer that creates

View File

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.vmpipe;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
@ -27,11 +27,11 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.activeio.AsyncChannel; import org.activeio.packet.Packet;
import org.activeio.AsyncChannelFactory; import org.activeio.packet.async.AsyncChannel;
import org.activeio.AsyncChannelListener; import org.activeio.packet.async.AsyncChannelFactory;
import org.activeio.AsyncChannelServer; import org.activeio.packet.async.AsyncChannelListener;
import org.activeio.Packet; import org.activeio.packet.async.AsyncChannelServer;
/** /**
* *
@ -107,7 +107,7 @@ final public class VMPipeAsyncChannelFactory implements AsyncChannelFactory {
packetClazz = cl.loadClass(Packet.class.getName()); packetClazz = cl.loadClass(Packet.class.getName());
writeMethod = clazz.getMethod("write", new Class[] { packetClazz }); writeMethod = clazz.getMethod("write", new Class[] { packetClazz });
startMethod = clazz.getMethod("start", new Class[] { }); startMethod = clazz.getMethod("start", new Class[] { });
stopMethod = clazz.getMethod("stop", new Class[] { long.class }); stopMethod = clazz.getMethod("stop", new Class[] {});
disposeMethod = clazz.getMethod("dispose", new Class[] { }); disposeMethod = clazz.getMethod("dispose", new Class[] { });
setListenerMethod = clazz.getMethod("setAsyncChannelListener", new Class[] { listenerClazz }); setListenerMethod = clazz.getMethod("setAsyncChannelListener", new Class[] { listenerClazz });
@ -138,8 +138,8 @@ final public class VMPipeAsyncChannelFactory implements AsyncChannelFactory {
callIOExceptionMethod(startMethod, new Object[] {}); callIOExceptionMethod(startMethod, new Object[] {});
} }
public void stop(long timeout) throws IOException { public void stop() throws IOException {
callIOExceptionMethod(stopMethod, new Object[] {new Long(timeout)}); callIOExceptionMethod(stopMethod, new Object[] {});
} }
private void callMethod(Method method, Object[] args) { private void callMethod(Method method, Object[] args) {

View File

@ -15,15 +15,15 @@
* limitations under the License. * limitations under the License.
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.vmpipe;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import org.activeio.AsyncChannel;
import org.activeio.AsyncChannelListener;
import org.activeio.Packet;
import org.activeio.packet.EOSPacket; import org.activeio.packet.EOSPacket;
import org.activeio.packet.Packet;
import org.activeio.packet.async.AsyncChannel;
import org.activeio.packet.async.AsyncChannelListener;
import edu.emory.mathcs.backport.java.util.concurrent.Semaphore; import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
@ -97,19 +97,11 @@ final public class VMPipeAsyncChannelPipe {
runMutext.release(); runMutext.release();
} }
public void stop(long timeout) throws IOException { public void stop() throws IOException {
if(!running) if(!running)
return; return;
try { try {
if( timeout == NO_WAIT_TIMEOUT ) { runMutext.tryAcquire(5, TimeUnit.SECONDS);
if( !runMutext.tryAcquire(0, TimeUnit.MILLISECONDS) )
return;
} else if( timeout == WAIT_FOREVER_TIMEOUT ) {
runMutext.acquire();
} else {
if( !runMutext.tryAcquire(timeout, TimeUnit.MILLISECONDS) )
return;
}
running=false; running=false;
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new InterruptedIOException(); throw new InterruptedIOException();

View File

@ -16,14 +16,14 @@
* *
**/ **/
package org.activeio.net; package org.activeio.packet.async.vmpipe;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.activeio.AcceptListener; import org.activeio.AcceptListener;
import org.activeio.AsyncChannel; import org.activeio.packet.async.AsyncChannel;
import org.activeio.AsyncChannelServer; import org.activeio.packet.async.AsyncChannelServer;
/** /**
* @version $Revision$ * @version $Revision$
@ -60,7 +60,7 @@ final public class VMPipeAsyncChannelServer implements AsyncChannelServer {
throw new IOException("acceptListener has not been set."); throw new IOException("acceptListener has not been set.");
} }
public void stop(long timeout) { public void stop() {
} }
public Object getAdapter(Class target) { public Object getAdapter(Class target) {

View File

@ -14,10 +14,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.sync;
import java.io.IOException; import java.io.IOException;
import org.activeio.packet.Packet;
/** /**
* A SynchChannelFilter can be used as a filter another {@see org.activeio.SynchChannel} * A SynchChannelFilter can be used as a filter another {@see org.activeio.SynchChannel}
@ -35,7 +37,7 @@ public class FilterSyncChannel implements SyncChannel {
} }
/** /**
* @see org.activeio.Channel#write(org.activeio.Packet) * @see org.activeio.Channel#write(org.activeio.packet.Packet)
*/ */
public void write(Packet packet) throws IOException { public void write(Packet packet) throws IOException {
next.write(packet); next.write(packet);
@ -63,10 +65,10 @@ public class FilterSyncChannel implements SyncChannel {
} }
/** /**
* @see org.activeio.Service#stop(long) * @see org.activeio.Service#stop()
*/ */
public void stop(long timeout) throws IOException { public void stop() throws IOException {
next.stop(timeout); next.stop();
} }
/** /**
@ -77,7 +79,7 @@ public class FilterSyncChannel implements SyncChannel {
} }
/** /**
* @see org.activeio.SyncChannel#read(long) * @see org.activeio.packet.sync.SyncChannel#read(long)
*/ */
public Packet read(long timeout) throws IOException { public Packet read(long timeout) throws IOException {
return next.read(timeout); return next.read(timeout);

View File

@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.sync;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import org.activeio.Channel;
/** /**
* A SynchChannelFilter can be used as a filter another {@see org.activeio.SynchChannel} * A SynchChannelFilter can be used as a filter another {@see org.activeio.SynchChannel}
@ -50,10 +52,10 @@ public class FilterSyncChannelServer implements SyncChannelServer {
} }
/** /**
* @see org.activeio.Service#stop(long) * @see org.activeio.Service#stop()
*/ */
public void stop(long timeout) throws IOException { public void stop() throws IOException {
next.stop(timeout); next.stop();
} }
/** /**
@ -64,7 +66,7 @@ public class FilterSyncChannelServer implements SyncChannelServer {
} }
/** /**
* @see org.activeio.SyncChannelServer#accept(long) * @see org.activeio.packet.sync.SyncChannelServer#accept(long)
*/ */
public Channel accept(long timeout) throws IOException { public Channel accept(long timeout) throws IOException {
return next.accept(timeout); return next.accept(timeout);

View File

@ -0,0 +1,59 @@
/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.activeio.packet.sync;
import java.io.IOException;
import org.activeio.Channel;
import org.activeio.packet.Packet;
/**
* SyncChannel objects allow threaded to synchronously block on the <code>receiveUpPacket</code>
* method to get 'up' {@see org.activeio.Packet} objects when they arrive.
*
* @version $Revision$
*/
public interface SyncChannel extends Channel {
/**
* Used to synchronously receive a packet of information going 'up' the channel.
* This method blocks until a packet is received or the operation experiences timeout.
*
* @param timeout
* @return the packet received or null if the timeout occurred.
* @throws IOException
*/
Packet read(long timeout) throws IOException;
/**
* Sends a packet down the channel towards the media.
*
* @param packet
* @throws IOException
*/
void write(Packet packet) throws IOException;
/**
* Some channels may buffer data which may be sent down if flush() is called.
*
* @throws IOException
*/
void flush() throws IOException;
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.sync;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;

View File

@ -14,10 +14,13 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.activeio; package org.activeio.packet.sync;
import java.io.IOException; import java.io.IOException;
import org.activeio.Channel;
import org.activeio.ChannelServer;
/** /**

View File

@ -4,7 +4,7 @@
* TODO To change the template for this generated file go to * TODO To change the template for this generated file go to
* Window - Preferences - Java - Code Style - Code Templates * Window - Preferences - Java - Code Style - Code Templates
*/ */
package org.activeio.net; package org.activeio.packet.sync.datagram;
import java.net.DatagramPacket; import java.net.DatagramPacket;
import java.net.InetAddress; import java.net.InetAddress;

Some files were not shown because too many files have changed in this diff Show More