480827 Implemented Unix Domain Socket Connector

Squashed commit of the following:

commit fbb680ba7cdb73495524ea9e5b0f49caee7f68d8
Merge: ed2550b 64ea0db
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Nov 6 10:44:00 2015 +1100

    Merge branch 'master' into unix-socket

commit ed2550b50f978e1984e19fbda642baf450dbe6a1
Merge: 88d7b35 de137ab
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Nov 5 18:41:25 2015 +1100

    Merge branch 'master' into unix-socket

commit 88d7b35f885de2da7dec836e6e4ae2b522d517f0
Merge: 7d4155f 3e2658a
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Nov 5 17:38:27 2015 +1100

    Merge branch 'master' into unix-socket

commit 7d4155fe4a407d493683b66709bc638879b0b422
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Nov 5 17:24:48 2015 +1100

    Unix socket configuration

commit 2737b19f73ad153c20e1762874558a5d62849f90
Merge: cd0cc2e 92cc44c
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Nov 4 15:13:40 2015 +1100

    Merge branch 'master' into unix-socket

    Conflicts:
    	jetty-server/src/main/java/org/eclipse/jetty/server/SecureRequestCustomizer.java

commit cd0cc2ef36a558d948bf26aff4f9e3519da2f823
Merge: 639753b 303f98e
Author: Greg Wilkins <gregw@webtide.com>
Date:   Mon Nov 2 12:21:19 2015 +1100

    Merge branch 'master' into unix-socket

commit 639753b5ed37778d7231acfe9d52039aed66351e
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 30 15:42:58 2015 +1100

    local connectors report NOIP address

commit 6d38c4195f145b99f775a06d546960b119094b0c
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 30 12:11:13 2015 +1100

    Gentler ssl close test

commit 40b46b66a738c9187f859d522a0165bb09b113c8
Merge: b7eb082 fa53b11
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 30 11:09:42 2015 +1100

    Merge branch 'master' into unix-socket

commit b7eb082be44864c058b6f01b10364013596d3650
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 30 08:34:30 2015 +1100

    SecureRequestCustomizer for UnixSockets

commit edbb3c080200b4c6aa2836eff6c81bf31a73a8c1
Merge: 90e8cc0 de7ac72
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Oct 29 19:47:05 2015 +1100

    Merge branch 'master' into unix-socket

commit 90e8cc060ff6dc4b249818db38334ffa543f002f
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Oct 29 19:31:50 2015 +1100

    use proxy connection

commit b1772ef5dcac9ddf9bb5ecda1cda6d038ca21755
Merge: e95a932 9fe7332
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Oct 29 16:46:29 2015 +1100

    Merge branch 'master' into unix-socket

commit e95a932bda5a96bf98ada4fb47664790af2aa0a2
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Oct 28 15:58:25 2015 +1100

    fixed config

commit f8963b2ed6b6b4b96f1d9403194c9d50ab1f12fd
Merge: 9c56b53 8b27484
Author: Greg Wilkins <gregw@webtide.com>
Date:   Wed Oct 28 15:34:28 2015 +1100

    Merge branch 'master' into unix-socket

commit 9c56b53cbec20d98e5cb05cf8d1f668fe84b95e0
Merge: 298a311 39768f8
Author: Greg Wilkins <gregw@webtide.com>
Date:   Mon Oct 26 13:04:34 2015 +1100

    Merge branch 'master' into unix-socket

commit 298a311af952ad3ef5d1c7635deabf47bddaa1c6
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 23 15:24:19 2015 +1100

    Async accepting

commit 8266753d124c04ec8bca8aa02be1ef3d826d6769
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 23 14:45:35 2015 +1100

    set acceptors

commit 6a56c5b9e19063fcb95cdc1228adf723c0d41362
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 23 11:44:54 2015 +1100

    licence

commit d80e5748e6c4327bae57de2af01983990dd2afe0
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 23 11:24:14 2015 +1100

    unixsocket work in progress adding modules

commit cf0c1153d9966cc7182ba29411527b4341c34da6
Merge: 645b671 c39bfa2
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 23 08:52:01 2015 +1100

    Merge branch 'master' into unix-socket

commit 645b6712d37282c26011a95c8c98e45c249b2e5f
Author: Greg Wilkins <gregw@webtide.com>
Date:   Fri Oct 23 08:49:52 2015 +1100

    IP headers

commit 67b210b9ea81af68ce3848a114bbbd1b80a8ca52
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Oct 22 23:48:57 2015 +1100

    working with haproxy

commit e4fce9cf5d896a9a29a7c6280fcaa0336dfcc0f2
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Oct 22 18:25:09 2015 +1100

    unixsocket working

commit 6f013788cad44e6641cd89720c5e7f3652cdc257
Author: Greg Wilkins <gregw@webtide.com>
Date:   Thu Oct 22 15:39:55 2015 +1100

    work in progress
This commit is contained in:
Greg Wilkins 2015-11-06 11:17:46 +11:00
parent 64ea0db9b3
commit 7843b7348e
69 changed files with 1819 additions and 699 deletions

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Map;
@ -31,6 +32,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
@ -173,13 +175,15 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key)
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
return new SelectChannelEndPoint(channel, selector, key, getScheduler(), client.getIdleTimeout());
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(client.getIdleTimeout());
return endp;
}
@Override
public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException
public org.eclipse.jetty.io.Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
@ -188,7 +192,7 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable x, Object attachment)
protected void connectionFailed(SelectableChannel channel, Throwable x, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;

View File

@ -54,10 +54,10 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.ssl.SslBytesTest.TLSRecord.Type;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConnection;
@ -173,9 +173,9 @@ public class SslBytesServerTest extends SslBytesTest
ServerConnector connector = new ServerConnector(server, null,null,null,1,1,sslFactory, httpFactory)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
SelectChannelEndPoint endp = super.newEndPoint(channel,selectSet,key);
ChannelEndPoint endp = super.newEndPoint(channel,selectSet,key);
serverEndPoint.set(endp);
return endp;
}
@ -367,11 +367,19 @@ public class SslBytesServerTest extends SslBytesTest
System.arraycopy(doneBytes, 0, chunk, recordBytes.length, doneBytes.length);
System.arraycopy(closeRecordBytes, 0, chunk, recordBytes.length + doneBytes.length, closeRecordBytes.length);
proxy.flushToServer(0, chunk);
// Close the raw socket
proxy.flushToServer(null);
// Expect the server to send a FIN as well
record = proxy.readFromServer();
if (record!=null)
{
// Close alert snuck out // TODO check if this is acceptable
Assert.assertEquals(Type.ALERT,record.getType());
record = proxy.readFromServer();
}
Assert.assertNull(record);
// Check that we did not spin

View File

@ -1,3 +1,4 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.io.ChannelEndPoint.LEVEL=DEBUG

View File

@ -710,6 +710,11 @@
<artifactId>jetty-proxy</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-unixsocket</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.fcgi</groupId>
<artifactId>fcgi-server</artifactId>

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
@ -36,8 +37,8 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -318,13 +319,15 @@ public class HTTP2Client extends ContainerLifeCycle
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout());
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
endp.setIdleTimeout(getIdleTimeout());
return endp;
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
@ -335,7 +338,7 @@ public class HTTP2Client extends ContainerLifeCycle
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable failure, Object attachment)
protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;

View File

@ -57,6 +57,7 @@ import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.HttpChannel;
@ -329,7 +330,7 @@ public class HTTP2ServerTest extends AbstractServerTest
ServerConnector connector2 = new ServerConnector(server, new HTTP2ServerConnectionFactory(new HttpConfiguration()))
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{

View File

@ -19,9 +19,9 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -31,10 +31,10 @@ import org.eclipse.jetty.util.thread.Scheduler;
public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
{
enum State {OPEN, ISHUTTING, ISHUT, OSHUTTING, OSHUT, CLOSED};
private static final Logger LOG = Log.getLogger(AbstractEndPoint.class);
private final AtomicReference<State> _state = new AtomicReference<>(State.OPEN);
private final long _created=System.currentTimeMillis();
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
private volatile Connection _connection;
private final FillInterest _fillInterest = new FillInterest()
@ -55,11 +55,231 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
}
};
protected AbstractEndPoint(Scheduler scheduler,InetSocketAddress local,InetSocketAddress remote)
protected AbstractEndPoint(Scheduler scheduler)
{
super(scheduler);
_local=local;
_remote=remote;
}
protected final void shutdownInput()
{
while(true)
{
State s = _state.get();
switch(s)
{
case OPEN:
if (!_state.compareAndSet(s,State.ISHUTTING))
continue;
try
{
doShutdownInput();
}
finally
{
if(!_state.compareAndSet(State.ISHUTTING,State.ISHUT))
{
// If somebody else switched to CLOSED while we were ishutting,
// then we do the close for them
if (_state.get()==State.CLOSED)
doOnClose();
else
throw new IllegalStateException();
}
}
return;
case ISHUTTING: // Somebody else ishutting
case ISHUT: // Already ishut
return;
case OSHUTTING:
if (!_state.compareAndSet(s,State.CLOSED))
continue;
// The thread doing the OSHUT will close
return;
case OSHUT:
if (!_state.compareAndSet(s,State.CLOSED))
continue;
// Already OSHUT so we close
doOnClose();
return;
case CLOSED: // already closed
return;
}
}
}
@Override
public final void shutdownOutput()
{
while(true)
{
State s = _state.get();
switch(s)
{
case OPEN:
if (!_state.compareAndSet(s,State.OSHUTTING))
continue;
try
{
doShutdownOutput();
}
finally
{
if(!_state.compareAndSet(State.OSHUTTING,State.OSHUT))
{
// If somebody else switched to CLOSED while we were oshutting,
// then we do the close for them
if (_state.get()==State.CLOSED)
doOnClose();
else
throw new IllegalStateException();
}
}
return;
case ISHUTTING:
if (!_state.compareAndSet(s,State.CLOSED))
continue;
// The thread doing the ISHUT will close
return;
case ISHUT:
if (!_state.compareAndSet(s,State.CLOSED))
continue;
// Already ISHUT so we close
doOnClose();
return;
case OSHUTTING: // Somebody else oshutting
case OSHUT: // Already oshut
return;
case CLOSED: // already closed
return;
}
}
}
@Override
public final void close()
{
while(true)
{
State s = _state.get();
switch(s)
{
case OPEN:
case ISHUT: // Already ishut
case OSHUT: // Already oshut
if (!_state.compareAndSet(s,State.CLOSED))
continue;
doOnClose();
return;
case ISHUTTING: // Somebody else ishutting
case OSHUTTING: // Somebody else oshutting
if (!_state.compareAndSet(s,State.CLOSED))
continue;
// The thread doing the IO SHUT will call doOnClose
return;
case CLOSED: // already closed
return;
}
}
}
protected void doShutdownInput()
{}
protected void doShutdownOutput()
{}
protected void doClose()
{}
private void doOnClose()
{
try
{
doClose();
}
finally
{
onClose();
}
}
@Override
public boolean isOutputShutdown()
{
switch(_state.get())
{
case CLOSED:
case OSHUT:
case OSHUTTING:
return true;
default:
return false;
}
}
@Override
public boolean isInputShutdown()
{
switch(_state.get())
{
case CLOSED:
case ISHUT:
case ISHUTTING:
return true;
default:
return false;
}
}
@Override
public boolean isOpen()
{
switch(_state.get())
{
case CLOSED:
return false;
default:
return true;
}
}
public void checkFlush() throws IOException
{
State s=_state.get();
switch(s)
{
case OSHUT:
case OSHUTTING:
case CLOSED:
throw new IOException(s.toString());
default:
break;
}
}
public void checkFill() throws IOException
{
State s=_state.get();
switch(s)
{
case ISHUT:
case ISHUTTING:
case CLOSED:
throw new IOException(s.toString());
default:
break;
}
}
@Override
@ -68,18 +288,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
return _created;
}
@Override
public InetSocketAddress getLocalAddress()
{
return _local;
}
@Override
public InetSocketAddress getRemoteAddress()
{
return _remote;
}
@Override
public Connection getConnection()
{
@ -98,12 +306,22 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
return false;
}
protected void reset()
{
_state.set(State.OPEN);
_writeFlusher.onClose();
_fillInterest.onClose();
}
@Override
public void onOpen()
{
if (LOG.isDebugEnabled())
LOG.debug("onOpen {}",this);
super.onOpen();
if (_state.get()!=State.OPEN)
throw new IllegalStateException();
}
@Override
@ -116,12 +334,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
_fillInterest.onClose();
}
@Override
public void close()
{
onClose();
}
@Override
public void fillInterested(Callback callback) throws IllegalStateException
{
@ -207,15 +419,13 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
c=c.getSuperclass();
name=c.getSimpleName();
}
return String.format("%s@%x{%s<->%d,%s,%s,%s,%s,%s,%d/%d,%s}",
return String.format("%s@%x{%s<->%s,%s,%s|%s,%d/%d,%s}",
name,
hashCode(),
getRemoteAddress(),
getLocalAddress().getPort(),
isOpen()?"Open":"CLOSED",
isInputShutdown()?"ISHUT":"in",
isOutputShutdown()?"OSHUT":"out",
getLocalAddress(),
_state.get(),
_fillInterest.toStateString(),
_writeFlusher.toStateString(),
getIdleFor(),

View File

@ -20,7 +20,10 @@ package org.eclipse.jetty.io;
import java.io.EOFException;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
@ -42,7 +45,28 @@ import org.eclipse.jetty.util.thread.Scheduler;
public class ByteArrayEndPoint extends AbstractEndPoint
{
static final Logger LOG = Log.getLogger(ByteArrayEndPoint.class);
public final static InetSocketAddress NOIP=new InetSocketAddress(0);
static final InetAddress NOIP;
static final InetSocketAddress NOIPPORT;
static
{
InetAddress noip=null;
try
{
noip = Inet4Address.getByName("0.0.0.0");
}
catch (UnknownHostException e)
{
LOG.warn(e);
}
finally
{
NOIP=noip;
NOIPPORT=new InetSocketAddress(NOIP,0);
}
}
private static final ByteBuffer EOF = BufferUtil.allocate(0);
private final Runnable _runFillable = new Runnable()
@ -57,9 +81,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint
private final Locker _locker = new Locker();
private final Queue<ByteBuffer> _inQ = new ArrayQueue<>();
private ByteBuffer _out;
private boolean _ishut;
private boolean _oshut;
private boolean _closed;
private boolean _growOutput;
/* ------------------------------------------------------------ */
@ -112,11 +133,26 @@ public class ByteArrayEndPoint extends AbstractEndPoint
/* ------------------------------------------------------------ */
public ByteArrayEndPoint(Scheduler timer, long idleTimeoutMs, ByteBuffer input, ByteBuffer output)
{
super(timer,NOIP,NOIP);
super(timer);
if (BufferUtil.hasContent(input))
addInput(input);
_out=output==null?BufferUtil.allocate(1024):output;
setIdleTimeout(idleTimeoutMs);
onOpen();
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getLocalAddress()
{
return NOIPPORT;
}
/* ------------------------------------------------------------ */
@Override
public InetSocketAddress getRemoteAddress()
{
return NOIPPORT;
}
/* ------------------------------------------------------------ */
@ -138,7 +174,7 @@ public class ByteArrayEndPoint extends AbstractEndPoint
{
try(Locker.Lock lock = _locker.lock())
{
if (_closed)
if (!isOpen())
throw new ClosedChannelException();
ByteBuffer in = _inQ.peek();
@ -287,92 +323,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint
getWriteFlusher().completeWrite();
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#isOpen()
*/
@Override
public boolean isOpen()
{
try(Locker.Lock lock = _locker.lock())
{
return !_closed;
}
}
/* ------------------------------------------------------------ */
/*
*/
@Override
public boolean isInputShutdown()
{
try(Locker.Lock lock = _locker.lock())
{
return _ishut||_closed;
}
}
/* ------------------------------------------------------------ */
/*
*/
@Override
public boolean isOutputShutdown()
{
try(Locker.Lock lock = _locker.lock())
{
return _oshut||_closed;
}
}
/* ------------------------------------------------------------ */
public void shutdownInput()
{
boolean close=false;
try(Locker.Lock lock = _locker.lock())
{
_ishut=true;
if (_oshut && !_closed)
close=_closed=true;
}
if (close)
super.close();
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#shutdownOutput()
*/
@Override
public void shutdownOutput()
{
boolean close=false;
try(Locker.Lock lock = _locker.lock())
{
_oshut=true;
if (_ishut && !_closed)
close=_closed=true;
}
if (close)
super.close();
}
/* ------------------------------------------------------------ */
/*
* @see org.eclipse.io.EndPoint#close()
*/
@Override
public void close()
{
boolean close=false;
try(Locker.Lock lock = _locker.lock())
{
if (!_closed)
close=_closed=_ishut=_oshut=true;
}
if (close)
super.close();
}
/* ------------------------------------------------------------ */
/**
* @return <code>true</code> if there are bytes remaining to be read from the encoded input
@ -390,15 +340,14 @@ public class ByteArrayEndPoint extends AbstractEndPoint
public int fill(ByteBuffer buffer) throws IOException
{
int filled=0;
boolean close=false;
try(Locker.Lock lock = _locker.lock())
{
while(true)
{
if (_closed)
if (!isOpen())
throw new EofException("CLOSED");
if (_ishut)
if (isInputShutdown())
return -1;
if (_inQ.isEmpty())
@ -407,9 +356,6 @@ public class ByteArrayEndPoint extends AbstractEndPoint
ByteBuffer in= _inQ.peek();
if (in==EOF)
{
_ishut=true;
if (_oshut)
close=_closed=true;
filled=-1;
break;
}
@ -425,10 +371,10 @@ public class ByteArrayEndPoint extends AbstractEndPoint
}
}
if (close)
super.close();
if (filled>0)
notIdle();
else if (filled<0)
shutdownInput();
return filled;
}
@ -439,9 +385,9 @@ public class ByteArrayEndPoint extends AbstractEndPoint
@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
if (_closed)
if (!isOpen())
throw new IOException("CLOSED");
if (_oshut)
if (isOutputShutdown())
throw new IOException("OSHUT");
boolean flushed=true;
@ -483,13 +429,12 @@ public class ByteArrayEndPoint extends AbstractEndPoint
*/
public void reset()
{
getFillInterest().onClose();
getWriteFlusher().onClose();
_ishut=false;
_oshut=false;
_closed=false;
_inQ.clear();
try(Locker.Lock lock = _locker.lock())
{
_inQ.clear();
}
BufferUtil.clear(_out);
super.reset();
}
/* ------------------------------------------------------------ */

View File

@ -19,37 +19,104 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.SelectionKey;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* Channel End Point.
* <p>Holds the channel and socket for an NIO endpoint.
*/
public class ChannelEndPoint extends AbstractEndPoint
public abstract class ChannelEndPoint extends AbstractEndPoint implements ManagedSelector.Selectable
{
private static final Logger LOG = Log.getLogger(ChannelEndPoint.class);
private final SocketChannel _channel;
private final Socket _socket;
private volatile boolean _ishut;
private volatile boolean _oshut;
private final Locker _locker = new Locker();
private final ByteChannel _channel;
private final GatheringByteChannel _gather;
protected final ManagedSelector _selector;
protected final SelectionKey _key;
public ChannelEndPoint(Scheduler scheduler,SocketChannel channel)
private boolean _updatePending;
/**
* The current value for {@link SelectionKey#interestOps()}.
*/
protected int _currentInterestOps;
/**
* The desired value for {@link SelectionKey#interestOps()}.
*/
protected int _desiredInterestOps;
private abstract class RunnableTask implements Runnable
{
super(scheduler,
(InetSocketAddress)channel.socket().getLocalSocketAddress(),
(InetSocketAddress)channel.socket().getRemoteSocketAddress());
final String _operation;
RunnableTask(String op)
{
_operation=op;
}
@Override
public String toString()
{
return ChannelEndPoint.this.toString()+":"+_operation;
}
}
private final Runnable _runUpdateKey = new RunnableTask("runUpdateKey")
{
@Override
public void run()
{
updateKey();
}
};
private final Runnable _runFillable = new RunnableTask("runFillable")
{
@Override
public void run()
{
getFillInterest().fillable();
}
};
private final Runnable _runCompleteWrite = new RunnableTask("runCompleteWrite")
{
@Override
public void run()
{
getWriteFlusher().completeWrite();
}
};
private final Runnable _runFillableCompleteWrite = new RunnableTask("runFillableCompleteWrite")
{
@Override
public void run()
{
getFillInterest().fillable();
getWriteFlusher().completeWrite();
}
};
public ChannelEndPoint(ByteChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(scheduler);
_channel=channel;
_socket=channel.socket();
_selector=selector;
_key=key;
_gather=(channel instanceof GatheringByteChannel)?(GatheringByteChannel)channel:null;
}
@Override
@ -64,27 +131,16 @@ public class ChannelEndPoint extends AbstractEndPoint
return _channel.isOpen();
}
protected void shutdownInput()
{
if (LOG.isDebugEnabled())
LOG.debug("ishut {}", this);
_ishut=true;
if (_oshut)
close();
}
@Override
public void shutdownOutput()
public void doClose()
{
if (LOG.isDebugEnabled())
LOG.debug("oshut {}", this);
_oshut = true;
if (_channel.isOpen())
LOG.debug("doClose {}", this);
try
{
try
{
if (!_socket.isOutputShutdown())
_socket.shutdownOutput();
_channel.close();
}
catch (IOException e)
{
@ -92,51 +148,20 @@ public class ChannelEndPoint extends AbstractEndPoint
}
finally
{
if (_ishut)
{
close();
}
super.doClose();
}
}
}
@Override
public boolean isOutputShutdown()
{
return _oshut || !_channel.isOpen() || _socket.isOutputShutdown();
}
@Override
public boolean isInputShutdown()
{
return _ishut || !_channel.isOpen() || _socket.isInputShutdown();
}
@Override
public void close()
{
super.close();
if (LOG.isDebugEnabled())
LOG.debug("close {}", this);
try
{
_channel.close();
}
catch (IOException e)
{
LOG.debug(e);
}
finally
{
_ishut=true;
_oshut=true;
if (_selector!=null)
_selector.onClose(this);
}
}
@Override
public int fill(ByteBuffer buffer) throws IOException
{
if (_ishut)
if (isInputShutdown())
return -1;
int pos=BufferUtil.flipToFill(buffer);
@ -173,8 +198,8 @@ public class ChannelEndPoint extends AbstractEndPoint
{
if (buffers.length==1)
flushed=_channel.write(buffers[0]);
else if (buffers.length>1)
flushed=_channel.write(buffers,0,buffers.length);
else if (_gather!=null && buffers.length>1)
flushed=_gather.write(buffers,0,buffers.length);
else
{
for (ByteBuffer b : buffers)
@ -218,20 +243,160 @@ public class ChannelEndPoint extends AbstractEndPoint
return _channel;
}
public Socket getSocket()
@Override
protected void needsFillInterest()
{
return _socket;
changeInterests(SelectionKey.OP_READ);
}
@Override
protected void onIncompleteFlush()
{
throw new UnsupportedOperationException();
changeInterests(SelectionKey.OP_WRITE);
}
@Override
protected void needsFillInterest() throws IOException
public Runnable onSelected()
{
throw new UnsupportedOperationException();
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (Locker.Lock lock = _locker.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this);
// Run non-blocking code immediately.
// This producer knows that this non-blocking code is special
// and that it must be run in this thread and not fed to the
// ExecutionStrategy, which could not have any thread to run these
// tasks (or it may starve forever just after having run them).
if (readable && getFillInterest().isCallbackNonBlocking())
{
if (LOG.isDebugEnabled())
LOG.debug("Direct readable run {}",this);
_runFillable.run();
readable = false;
}
if (writable && getWriteFlusher().isCallbackNonBlocking())
{
if (LOG.isDebugEnabled())
LOG.debug("Direct writable run {}",this);
_runCompleteWrite.run();
writable = false;
}
// return task to complete the job
Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable)
: (writable ? _runCompleteWrite : null);
if (LOG.isDebugEnabled())
LOG.debug("task {}",task);
return task;
}
@Override
public void updateKey()
{
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
try
{
int oldInterestOps;
int newInterestOps;
try (Locker.Lock lock = _locker.lock())
{
_updatePending = false;
oldInterestOps = _currentInterestOps;
newInterestOps = _desiredInterestOps;
if (oldInterestOps != newInterestOps)
{
_currentInterestOps = newInterestOps;
_key.interestOps(newInterestOps);
}
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
close();
}
catch (Throwable x)
{
LOG.warn("Ignoring key update for " + this, x);
close();
}
}
private void changeInterests(int operation)
{
/**
* This method may run concurrently with
* {@link #updateKey()} and {@link #onSelected()}.
*/
int oldInterestOps;
int newInterestOps;
boolean pending;
try (Locker.Lock lock = _locker.lock())
{
pending = _updatePending;
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
_desiredInterestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending && _selector!=null)
_selector.submit(_runUpdateKey);
}
@Override
public String toString()
{
// We do a best effort to print the right toString() and that's it.
try
{
boolean valid = _key != null && _key.isValid();
int keyInterests = valid ? _key.interestOps() : -1;
int keyReadiness = valid ? _key.readyOps() : -1;
return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
super.toString(),
_currentInterestOps,
_desiredInterestOps,
keyInterests,
keyReadiness);
}
catch (Throwable x)
{
return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _desiredInterestOps);
}
}
}

View File

@ -94,7 +94,7 @@ import org.eclipse.jetty.util.IteratingCallback;
* </pre></blockquote>
*/
public interface EndPoint extends Closeable
{
{
/* ------------------------------------------------------------ */
/**
* @return The local Inet address to which this <code>EndPoint</code> is bound, or <code>null</code>

View File

@ -23,10 +23,9 @@ import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@ -77,12 +76,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
protected void doStart() throws Exception
{
super.doStart();
_selector = newSelector();
}
protected Selector newSelector() throws IOException
{
return Selector.open();
_selector = _selectorManager.newSelector();
}
public int size()
@ -137,10 +131,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
/**
* A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
* A {@link Selectable} is an {@link EndPoint} that wish to be
* notified of non-blocking events by the {@link ManagedSelector}.
*/
public interface SelectableEndPoint extends EndPoint
public interface Selectable
{
/**
* Callback method invoked when a read or write events has been
@ -264,12 +258,14 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
if (key.isValid())
{
Object attachment = key.attachment();
if (LOG.isDebugEnabled())
LOG.debug("selected {} {} ",key,attachment);
try
{
if (attachment instanceof SelectableEndPoint)
if (attachment instanceof Selectable)
{
// Try to produce a task
Runnable task = ((SelectableEndPoint)attachment).onSelected();
Runnable task = ((Selectable)attachment).onSelected();
if (task != null)
return task;
}
@ -323,8 +319,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
if (attachment instanceof SelectableEndPoint)
((SelectableEndPoint)attachment).updateKey();
if (attachment instanceof Selectable)
((Selectable)attachment).updateKey();
}
}
@ -334,11 +330,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private Runnable processConnect(SelectionKey key, final Connect connect)
{
SocketChannel channel = (SocketChannel)key.channel();
SelectableChannel channel = (SelectableChannel)key.channel();
try
{
key.attach(connect.attachment);
boolean connected = _selectorManager.finishConnect(channel);
boolean connected = _selectorManager.doFinishConnect(channel);
if (LOG.isDebugEnabled())
LOG.debug("Connected {} {}", connected, channel);
if (connected)
@ -375,14 +371,13 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private void processAccept(SelectionKey key)
{
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel channel = null;
SelectableChannel server = key.channel();
SelectableChannel channel = null;
try
{
while ((channel = server.accept()) != null)
{
channel = _selectorManager.doAccept(server);
if (channel!=null)
_selectorManager.accepted(channel);
}
}
catch (Throwable x)
{
@ -404,7 +399,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
}
}
private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
private EndPoint createEndPoint(SelectableChannel channel, SelectionKey selectionKey) throws IOException
{
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
_selectorManager.endPointOpened(endPoint);
@ -417,7 +412,7 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
return endPoint;
}
public void destroyEndPoint(final EndPoint endPoint)
public void onClose(final EndPoint endPoint)
{
final Connection connection = endPoint.getConnection();
submit(new Product()
@ -517,9 +512,9 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
class Acceptor implements Runnable
{
private final ServerSocketChannel _channel;
private final SelectableChannel _channel;
public Acceptor(ServerSocketChannel channel)
public Acceptor(SelectableChannel channel)
{
this._channel = channel;
}
@ -543,10 +538,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
class Accept implements Runnable
{
private final SocketChannel channel;
private final SelectableChannel channel;
private final Object attachment;
Accept(SocketChannel channel, Object attachment)
Accept(SelectableChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
@ -570,10 +565,10 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
private class CreateEndPoint implements Product
{
private final SocketChannel channel;
private final SelectableChannel channel;
private final SelectionKey key;
public CreateEndPoint(SocketChannel channel, SelectionKey key)
public CreateEndPoint(SelectableChannel channel, SelectionKey key)
{
this.channel = channel;
this.key = key;
@ -603,11 +598,11 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
class Connect implements Runnable
{
private final AtomicBoolean failed = new AtomicBoolean();
private final SocketChannel channel;
private final SelectableChannel channel;
private final Object attachment;
private final Scheduler.Task timeout;
Connect(SocketChannel channel, Object attachment)
Connect(SelectableChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
@ -650,8 +645,8 @@ public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dump
@Override
public void run()
{
SocketChannel channel = connect.channel;
if (channel.isConnectionPending())
SelectableChannel channel = connect.channel;
if (_selectorManager.isConnectionPending(channel))
{
if (LOG.isDebugEnabled())
LOG.debug("Channel {} timed out while connecting, closing it", channel);

View File

@ -18,285 +18,24 @@
package org.eclipse.jetty.io;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* An ChannelEndpoint that can be scheduled by {@link SelectorManager}.
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements ManagedSelector.SelectableEndPoint
@Deprecated
public class SelectChannelEndPoint extends SocketChannelEndPoint implements ManagedSelector.Selectable
{
public static final Logger LOG = Log.getLogger(SelectChannelEndPoint.class);
private final Locker _locker = new Locker();
private boolean _updatePending;
/**
* true if {@link ManagedSelector#destroyEndPoint(EndPoint)} has not been called
*/
private final AtomicBoolean _open = new AtomicBoolean();
private final ManagedSelector _selector;
private final SelectionKey _key;
/**
* The current value for {@link SelectionKey#interestOps()}.
*/
private int _currentInterestOps;
/**
* The desired value for {@link SelectionKey#interestOps()}.
*/
private int _desiredInterestOps;
private final Runnable _runUpdateKey = new Runnable()
{
@Override
public void run()
{
updateKey();
}
@Override
public String toString()
{
return SelectChannelEndPoint.this.toString()+":runUpdateKey";
}
};
private final Runnable _runFillable = new Runnable()
{
@Override
public void run()
{
getFillInterest().fillable();
}
@Override
public String toString()
{
return SelectChannelEndPoint.this.toString()+":runFillable";
}
};
private final Runnable _runCompleteWrite = new Runnable()
{
@Override
public void run()
{
getWriteFlusher().completeWrite();
}
@Override
public String toString()
{
return SelectChannelEndPoint.this.toString()+":runCompleteWrite";
}
};
private final Runnable _runFillableCompleteWrite = new Runnable()
{
@Override
public void run()
{
getFillInterest().fillable();
getWriteFlusher().completeWrite();
}
@Override
public String toString()
{
return SelectChannelEndPoint.this.toString()+":runFillableCompleteWrite";
}
};
public SelectChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
{
super(scheduler, channel);
_selector = selector;
_key = key;
super(channel,selector,key,scheduler);
setIdleTimeout(idleTimeout);
}
@Override
protected void needsFillInterest()
{
changeInterests(SelectionKey.OP_READ);
}
@Override
protected void onIncompleteFlush()
{
changeInterests(SelectionKey.OP_WRITE);
}
@Override
public Runnable onSelected()
{
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
int readyOps = _key.readyOps();
int oldInterestOps;
int newInterestOps;
try (Locker.Lock lock = _locker.lock())
{
_updatePending = true;
// Remove the readyOps, that here can only be OP_READ or OP_WRITE (or both).
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps & ~readyOps;
_desiredInterestOps = newInterestOps;
}
boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
if (LOG.isDebugEnabled())
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, readable, writable, this);
// Run non-blocking code immediately.
// This producer knows that this non-blocking code is special
// and that it must be run in this thread and not fed to the
// ExecutionStrategy, which could not have any thread to run these
// tasks (or it may starve forever just after having run them).
if (readable && getFillInterest().isCallbackNonBlocking())
{
if (LOG.isDebugEnabled())
LOG.debug("Direct readable run {}",this);
_runFillable.run();
readable = false;
}
if (writable && getWriteFlusher().isCallbackNonBlocking())
{
if (LOG.isDebugEnabled())
LOG.debug("Direct writable run {}",this);
_runCompleteWrite.run();
writable = false;
}
// return task to complete the job
Runnable task= readable ? (writable ? _runFillableCompleteWrite : _runFillable)
: (writable ? _runCompleteWrite : null);
if (LOG.isDebugEnabled())
LOG.debug("task {}",task);
return task;
}
@Override
public void updateKey()
{
/**
* This method may run concurrently with {@link #changeInterests(int)}.
*/
try
{
int oldInterestOps;
int newInterestOps;
try (Locker.Lock lock = _locker.lock())
{
_updatePending = false;
oldInterestOps = _currentInterestOps;
newInterestOps = _desiredInterestOps;
if (oldInterestOps != newInterestOps)
{
_currentInterestOps = newInterestOps;
_key.interestOps(newInterestOps);
}
}
if (LOG.isDebugEnabled())
LOG.debug("Key interests updated {} -> {} on {}", oldInterestOps, newInterestOps, this);
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring key update for concurrently closed channel {}", this);
close();
}
catch (Throwable x)
{
LOG.warn("Ignoring key update for " + this, x);
close();
}
}
private void changeInterests(int operation)
{
/**
* This method may run concurrently with
* {@link #updateKey()} and {@link #onSelected()}.
*/
int oldInterestOps;
int newInterestOps;
boolean pending;
try (Locker.Lock lock = _locker.lock())
{
pending = _updatePending;
oldInterestOps = _desiredInterestOps;
newInterestOps = oldInterestOps | operation;
if (newInterestOps != oldInterestOps)
_desiredInterestOps = newInterestOps;
}
if (LOG.isDebugEnabled())
LOG.debug("changeInterests p={} {}->{} for {}", pending, oldInterestOps, newInterestOps, this);
if (!pending)
_selector.submit(_runUpdateKey);
}
@Override
public void close()
{
if (_open.compareAndSet(true, false))
{
super.close();
_selector.destroyEndPoint(this);
}
}
@Override
public boolean isOpen()
{
// We cannot rely on super.isOpen(), because there is a race between calls to close() and isOpen():
// a thread may call close(), which flips the boolean but has not yet called super.close(), and
// another thread calls isOpen() which would return true - wrong - if based on super.isOpen().
return _open.get();
}
@Override
public void onOpen()
{
if (_open.compareAndSet(false, true))
super.onOpen();
}
@Override
public String toString()
{
// We do a best effort to print the right toString() and that's it.
try
{
boolean valid = _key != null && _key.isValid();
int keyInterests = valid ? _key.interestOps() : -1;
int keyReadiness = valid ? _key.readyOps() : -1;
return String.format("%s{io=%d/%d,kio=%d,kro=%d}",
super.toString(),
_currentInterestOps,
_desiredInterestOps,
keyInterests,
keyReadiness);
}
catch (Throwable x)
{
return String.format("%s{io=%s,kio=-2,kro=-2}", super.toString(), _desiredInterestOps);
}
}
}

View File

@ -22,7 +22,9 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
@ -133,7 +135,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
return _selectors.length;
}
private ManagedSelector chooseSelector(SocketChannel channel)
private ManagedSelector chooseSelector(SelectableChannel channel)
{
// Ideally we would like to have all connections from the same client end
// up on the same selector (to try to avoid smearing the data from a single
@ -145,14 +147,17 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
{
try
{
SocketAddress remote = channel.getRemoteAddress();
if (remote instanceof InetSocketAddress)
if (channel instanceof SocketChannel)
{
byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress();
if (addr != null)
SocketAddress remote = ((SocketChannel)channel).getRemoteAddress();
if (remote instanceof InetSocketAddress)
{
int s = addr[addr.length - 1] & 0xFF;
candidate1 = _selectors[s % getSelectorCount()];
byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress();
if (addr != null)
{
int s = addr[addr.length - 1] & 0xFF;
candidate1 = _selectors[s % getSelectorCount()];
}
}
}
}
@ -184,7 +189,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @param attachment the attachment object
* @see #accept(SocketChannel, Object)
*/
public void connect(SocketChannel channel, Object attachment)
public void connect(SelectableChannel channel, Object attachment)
{
ManagedSelector set = chooseSelector(channel);
set.submit(set.new Connect(channel, attachment));
@ -194,7 +199,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @param channel the channel to accept
* @see #accept(SocketChannel, Object)
*/
public void accept(SocketChannel channel)
public void accept(SelectableChannel channel)
{
accept(channel, null);
}
@ -209,7 +214,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @param channel the channel to register
* @param attachment the attachment object
*/
public void accept(SocketChannel channel, Object attachment)
public void accept(SelectableChannel channel, Object attachment)
{
final ManagedSelector selector = chooseSelector(channel);
selector.submit(selector.new Accept(channel, attachment));
@ -223,7 +228,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
*
* @param server the server channel to register
*/
public void acceptor(ServerSocketChannel server)
public void acceptor(SelectableChannel server)
{
final ManagedSelector selector = chooseSelector(null);
selector.submit(selector.new Acceptor(server));
@ -238,7 +243,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @param channel the
* @throws IOException if unable to accept channel
*/
protected void accepted(SocketChannel channel) throws IOException
protected void accepted(SelectableChannel channel) throws IOException
{
throw new UnsupportedOperationException();
}
@ -332,10 +337,21 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
}
}
protected boolean finishConnect(SocketChannel channel) throws IOException
protected boolean doFinishConnect(SelectableChannel channel) throws IOException
{
return channel.finishConnect();
return ((SocketChannel)channel).finishConnect();
}
protected boolean isConnectionPending(SelectableChannel channel)
{
return ((SocketChannel)channel).isConnectionPending();
}
protected SelectableChannel doAccept(SelectableChannel server) throws IOException
{
return ((ServerSocketChannel)server).accept();
}
/**
* <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
@ -345,11 +361,16 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @param ex the exception that caused the connect to fail
* @param attachment the attachment object associated at registration
*/
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment)
{
LOG.warn(String.format("%s - %s", channel, attachment), ex);
}
protected Selector newSelector() throws IOException
{
return Selector.open();
}
/**
* <p>Factory method to create {@link EndPoint}.</p>
* <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
@ -362,7 +383,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @throws IOException if the endPoint cannot be created
* @see #newConnection(SocketChannel, EndPoint, Object)
*/
protected abstract EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException;
protected abstract EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException;
/**
* <p>Factory method to create {@link Connection}.</p>
@ -374,7 +395,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
* @throws IOException if unable to create new connection
* @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
*/
public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
public abstract Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException;
@Override
public String dump()
@ -388,4 +409,5 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, TypeUtil.asList(_selectors));
}
}

View File

@ -0,0 +1,81 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
public class SocketChannelEndPoint extends ChannelEndPoint
{
private static final Logger LOG = Log.getLogger(SocketChannelEndPoint.class);
private final Socket _socket;
private final InetSocketAddress _local;
private final InetSocketAddress _remote;
public SocketChannelEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
this((SocketChannel)channel,selector,key,scheduler);
}
public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel,selector,key,scheduler);
_socket=channel.socket();
_local=(InetSocketAddress)_socket.getLocalSocketAddress();
_remote=(InetSocketAddress)_socket.getRemoteSocketAddress();
}
public Socket getSocket()
{
return _socket;
}
public InetSocketAddress getLocalAddress()
{
return _local;
}
public InetSocketAddress getRemoteAddress()
{
return _remote;
}
@Override
protected void doShutdownOutput()
{
try
{
if (!_socket.isOutputShutdown())
_socket.shutdownOutput();
}
catch (IOException e)
{
LOG.debug(e);
}
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.io.ssl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Executor;
@ -328,10 +329,28 @@ public class SslConnection extends AbstractConnection
public DecryptedEndPoint()
{
super(null,getEndPoint().getLocalAddress(), getEndPoint().getRemoteAddress());
super(((AbstractEndPoint)getEndPoint()).getScheduler());
setIdleTimeout(getEndPoint().getIdleTimeout());
}
@Override
public InetSocketAddress getLocalAddress()
{
return getEndPoint().getLocalAddress();
}
@Override
public InetSocketAddress getRemoteAddress()
{
return getEndPoint().getRemoteAddress();
}
@Override
public void setIdleTimeout(long idleTimeout)
{
@ -868,12 +887,11 @@ public class SslConnection extends AbstractConnection
}
@Override
public void shutdownOutput()
public void doShutdownOutput()
{
boolean ishut = isInputShutdown();
boolean oshut = isOutputShutdown();
if (DEBUG)
LOG.debug("{} shutdownOutput: oshut={}, ishut={}", SslConnection.this, oshut, ishut);
LOG.debug("{} shutdownOutput: ishut={}", SslConnection.this, ishut);
if (ishut)
{
// Aggressively close, since inbound close alert has already been processed
@ -882,7 +900,7 @@ public class SslConnection extends AbstractConnection
// reply. If a TLS close reply is sent, most implementations send a RST.
getEndPoint().close();
}
else if (!oshut)
else
{
try
{
@ -914,12 +932,27 @@ public class SslConnection extends AbstractConnection
}
@Override
public void close()
public void doClose()
{
// First send the TLS Close Alert, then the FIN
shutdownOutput();
if (!_sslEngine.isOutboundDone())
{
try
{
synchronized (this) // TODO review synchronized boundary
{
_sslEngine.closeOutbound();
flush(BufferUtil.EMPTY_BUFFER); // Send close handshake
ensureFillInterested();
}
}
catch (Exception e)
{
LOG.ignore(e);
}
}
getEndPoint().close();
super.close();
super.doClose();
}
@Override

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.io;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
@ -45,11 +50,6 @@ import org.eclipse.jetty.util.IO;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class IOTest
{
@Test

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@ -62,10 +63,11 @@ public class SelectChannelEndPointInterestsTest
selectorManager = new SelectorManager(threadPool, scheduler)
{
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), 60000)
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler())
{
@Override
protected void onIncompleteFlush()
@ -74,10 +76,13 @@ public class SelectChannelEndPointInterestsTest
interested.onIncompleteFlush();
}
};
endp.setIdleTimeout(60000);
return endp;
}
@Override
public Connection newConnection(SocketChannel channel, final EndPoint endPoint, Object attachment)
public Connection newConnection(SelectableChannel channel, final EndPoint endPoint, Object attachment)
{
return new AbstractConnection(endPoint, getExecutor())
{
@ -136,7 +141,7 @@ public class SelectChannelEndPointInterestsTest
connection.fillInterested();
ByteBuffer output = ByteBuffer.allocate(size.get());
endPoint.write(new Callback.Adapter(), output);
endPoint.write(new Callback(){}, output);
latch1.countDown();
}

View File

@ -26,6 +26,7 @@ import java.io.File;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
@ -71,7 +72,7 @@ public class SelectChannelEndPointSslTest extends SelectChannelEndPointTest
}
@Override
protected Connection newConnection(SocketChannel channel, EndPoint endpoint)
protected Connection newConnection(SelectableChannel channel, EndPoint endpoint)
{
SSLEngine engine = __sslCtxFactory.newSSLEngine();
engine.setUseClientMode(false);

View File

@ -32,6 +32,7 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@ -64,19 +65,21 @@ public class SelectChannelEndPointTest
protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler)
{
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{
return SelectChannelEndPointTest.this.newConnection(channel, endpoint);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000);
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(60000);
_lastEndPoint = endp;
_lastEndPointLatch.countDown();
return endp;
}
};
// Must be volatile or the test may fail spuriously
@ -110,7 +113,7 @@ public class SelectChannelEndPointTest
return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort());
}
protected Connection newConnection(SocketChannel channel, EndPoint endpoint)
protected Connection newConnection(SelectableChannel channel, EndPoint endpoint)
{
return new TestConnection(endpoint);
}
@ -228,11 +231,11 @@ public class SelectChannelEndPointTest
}
catch (InterruptedException | EofException e)
{
SelectChannelEndPoint.LOG.ignore(e);
Log.getRootLogger().ignore(e);
}
catch (Exception e)
{
SelectChannelEndPoint.LOG.warn(e);
Log.getRootLogger().warn(e);
}
finally
{

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@ -69,20 +70,22 @@ public class SelectorManagerTest
SelectorManager selectorManager = new SelectorManager(executor, scheduler)
{
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), connectTimeout / 2);
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(connectTimeout/2);
return endp;
}
@Override
protected boolean finishConnect(SocketChannel channel) throws IOException
protected boolean doFinishConnect(SelectableChannel channel) throws IOException
{
try
{
long timeout = timeoutConnection.get();
if (timeout > 0)
TimeUnit.MILLISECONDS.sleep(timeout);
return super.finishConnect(channel);
return super.doFinishConnect(channel);
}
catch (InterruptedException e)
{
@ -91,7 +94,7 @@ public class SelectorManagerTest
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
((Callback)attachment).succeeded();
return new AbstractConnection(endpoint, executor)
@ -104,7 +107,7 @@ public class SelectorManagerTest
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment)
{
((Callback)attachment).failed(ex);
}

View File

@ -24,7 +24,7 @@ import java.nio.channels.SocketChannel;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public class ChannelEndPointTest extends EndPointTest<ChannelEndPoint>
public class SocketChannelEndPointTest extends EndPointTest<SocketChannelEndPoint>
{
static ServerSocketChannel connector;
@ -43,15 +43,21 @@ public class ChannelEndPointTest extends EndPointTest<ChannelEndPoint>
}
@Override
protected EndPointPair<ChannelEndPoint> newConnection() throws Exception
protected EndPointPair<SocketChannelEndPoint> newConnection() throws Exception
{
EndPointPair<ChannelEndPoint> c = new EndPointPair<>();
EndPointPair<SocketChannelEndPoint> c = new EndPointPair<>();
c.client=new ChannelEndPoint(null,SocketChannel.open(connector.socket().getLocalSocketAddress()));
c.server=new ChannelEndPoint(null,connector.accept());
c.client=new SocketChannelEndPoint(SocketChannel.open(connector.socket().getLocalSocketAddress()),null,null,null);
c.server=new SocketChannelEndPoint(connector.accept(),null,null,null);
return c;
}
@Override
public void testClientClose() throws Exception
{
super.testClientClose();
}
@Override
public void testClientServerExchange() throws Exception
{

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
@ -39,6 +40,7 @@ import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
@ -74,7 +76,7 @@ public class SslConnectionTest
protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler)
{
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment)
{
SSLEngine engine = __sslCtxFactory.newSSLEngine();
engine.setUseClientMode(false);
@ -85,10 +87,12 @@ public class SslConnectionTest
return sslConnection;
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new TestEP(channel,selectSet, selectionKey, getScheduler(), 60000);
SocketChannelEndPoint endp = new TestEP(channel, selector, selectionKey, getScheduler());
endp.setIdleTimeout(60000);
_lastEndp=endp;
return endp;
}
@ -96,12 +100,11 @@ public class SslConnectionTest
static final AtomicInteger __startBlocking = new AtomicInteger();
static final AtomicInteger __blockFor = new AtomicInteger();
private static class TestEP extends SelectChannelEndPoint
private static class TestEP extends SocketChannelEndPoint
{
public TestEP(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
public TestEP(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel,selector,key,scheduler,idleTimeout);
super((SocketChannel)channel,selector,key,scheduler);
}
@Override
@ -121,7 +124,6 @@ public class SslConnectionTest
return false;
}
}
String s=BufferUtil.toDetailString(buffers[0]);
boolean flushed=super.flush(buffers);
return flushed;
}
@ -235,11 +237,11 @@ public class SslConnectionTest
}
catch(InterruptedException|EofException e)
{
SelectChannelEndPoint.LOG.ignore(e);
Log.getRootLogger().ignore(e);
}
catch(Exception e)
{
SelectChannelEndPoint.LOG.warn(e);
Log.getRootLogger().warn(e);
}
finally
{

View File

@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.HashSet;
@ -45,6 +46,7 @@ import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.HttpTransport;
@ -502,16 +504,18 @@ public class ConnectHandler extends HandlerWrapper
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selector, selectionKey, getScheduler(), getIdleTimeout());
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, key, getScheduler());
endp.setIdleTimeout(getIdleTimeout());
return endp;
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
if (ConnectHandler.LOG.isDebugEnabled())
ConnectHandler.LOG.debug("Connected to {}", channel.getRemoteAddress());
ConnectHandler.LOG.debug("Connected to {}", ((SocketChannel)channel).getRemoteAddress());
ConnectContext connectContext = (ConnectContext)attachment;
UpstreamConnection connection = newUpstreamConnection(endpoint, connectContext);
connection.setInputBufferSize(getBufferSize());
@ -519,7 +523,7 @@ public class ConnectHandler extends HandlerWrapper
}
@Override
protected void connectionFailed(SocketChannel channel, final Throwable ex, final Object attachment)
protected void connectionFailed(SelectableChannel channel, final Throwable ex, final Object attachment)
{
close(channel);
ConnectContext connectContext = (ConnectContext)attachment;

View File

@ -253,9 +253,11 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
@Override
protected void doStart() throws Exception
{
if(_defaultProtocol==null)
throw new IllegalStateException("No default protocol for "+this);
_defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
if(_defaultConnectionFactory==null)
throw new IllegalStateException("No protocol factory for default protocol: "+_defaultProtocol);
throw new IllegalStateException("No protocol factory for default protocol '"+_defaultProtocol+"' in "+this);
super.doStart();
@ -298,7 +300,7 @@ public abstract class AbstractConnector extends ContainerLifeCycle implements Co
// If we have a stop timeout
long stopTimeout = getStopTimeout();
CountDownLatch stopping=_stopping;
if (stopTimeout > 0 && stopping!=null)
if (stopTimeout > 0 && stopping!=null && getAcceptors()>0)
stopping.await(stopTimeout,TimeUnit.MILLISECONDS);
_stopping=null;

View File

@ -33,7 +33,6 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletRequestEvent;
import javax.servlet.ServletRequestListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandler.Context;

View File

@ -215,6 +215,7 @@ public class ForwardedRequestCustomizer implements Customizer
{
request.setAttribute("javax.servlet.request.ssl_session_id", ssl_session_id);
request.setScheme(HttpScheme.HTTPS.asString());
request.setSecure(true);
}
}

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.server;
import static javax.servlet.RequestDispatcher.ERROR_EXCEPTION;
import static javax.servlet.RequestDispatcher.ERROR_MESSAGE;
import static javax.servlet.RequestDispatcher.ERROR_STATUS_CODE;
import java.io.IOException;
@ -32,8 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.DispatcherType;
import javax.servlet.RequestDispatcher;
import javax.servlet.UnavailableException;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpFields;

View File

@ -196,28 +196,17 @@ public class LocalConnector extends AbstractConnector
getExecutor().execute(task);
}
@Override
public void close()
{
boolean wasOpen=isOpen();
super.close();
if (wasOpen)
{
getConnection().onClose();
onClose();
}
}
@Override
public void onClose()
{
getConnection().onClose();
LocalConnector.this.onEndPointClosed(this);
super.onClose();
_closed.countDown();
}
@Override
public void shutdownOutput()
public void doShutdownOutput()
{
super.shutdownOutput();
close();

View File

@ -26,10 +26,10 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.NetworkTrafficListener;
import org.eclipse.jetty.io.NetworkTrafficSelectChannelEndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
@ -84,7 +84,7 @@ public class NetworkTrafficServerConnector extends ServerConnector
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
NetworkTrafficSelectChannelEndPoint endPoint = new NetworkTrafficSelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout(), listeners);
return endPoint;

View File

@ -938,22 +938,25 @@ public class Request implements HttpServletRequest
@Override
public String getLocalName()
{
if (_channel==null)
if (_channel!=null)
{
try
{
String name =InetAddress.getLocalHost().getHostName();
if (StringUtil.ALL_INTERFACES.equals(name))
return null;
return name;
}
catch (java.net.UnknownHostException e)
{
LOG.ignore(e);
}
InetSocketAddress local=_channel.getLocalAddress();
if (local!=null)
return local.getHostString();
}
InetSocketAddress local=_channel.getLocalAddress();
return local.getHostString();
try
{
String name =InetAddress.getLocalHost().getHostName();
if (StringUtil.ALL_INTERFACES.equals(name))
return null;
return name;
}
catch (java.net.UnknownHostException e)
{
LOG.ignore(e);
}
return null;
}
/* ------------------------------------------------------------ */
@ -966,7 +969,7 @@ public class Request implements HttpServletRequest
if (_channel==null)
return 0;
InetSocketAddress local=_channel.getLocalAddress();
return local.getPort();
return local==null?0:local.getPort();
}
/* ------------------------------------------------------------ */

View File

@ -18,10 +18,10 @@
package org.eclipse.jetty.server;
import java.util.ArrayList;
import static java.util.Arrays.asList;
import java.util.ArrayList;
class RequestLogCollection
implements RequestLog
{

View File

@ -27,6 +27,7 @@ import javax.servlet.ServletRequest;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint;
import org.eclipse.jetty.util.TypeUtil;
@ -66,14 +67,10 @@ public class SecureRequestCustomizer implements HttpConfiguration.Customizer
@Override
public void customize(Connector connector, HttpConfiguration channelConfig, Request request)
{
if (request.getHttpChannel().getEndPoint() instanceof DecryptedEndPoint)
EndPoint endp = request.getHttpChannel().getEndPoint();
if (endp instanceof DecryptedEndPoint)
{
request.setSecure(true);
if (request.getHttpURI().getScheme()==null)
request.getHttpURI().setScheme(HttpScheme.HTTPS.asString());
SslConnection.DecryptedEndPoint ssl_endp = (DecryptedEndPoint)request.getHttpChannel().getEndPoint();
SslConnection.DecryptedEndPoint ssl_endp = (DecryptedEndPoint)endp;
SslConnection sslConnection = ssl_endp.getSslConnection();
SSLEngine sslEngine=sslConnection.getSSLEngine();
customize(sslEngine,request);
@ -81,6 +78,12 @@ public class SecureRequestCustomizer implements HttpConfiguration.Customizer
if (request.getHttpURI().getScheme()==null)
request.setScheme(HttpScheme.HTTPS.asString());
}
else if (endp instanceof ProxyConnectionFactory.ProxyEndPoint)
{
ProxyConnectionFactory.ProxyEndPoint proxy = (ProxyConnectionFactory.ProxyEndPoint)endp;
if (request.getHttpURI().getScheme()==null && proxy.getAttribute(ProxyConnectionFactory.TLS_VERSION)!=null)
request.setScheme(HttpScheme.HTTPS.asString());
}
if (HttpScheme.HTTPS.is(request.getScheme()))
request.setSecure(true);

View File

@ -24,6 +24,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.Channel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@ -32,6 +33,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
@ -229,7 +231,6 @@ public class ServerConnector extends AbstractNetworkConnector
_manager = newSelectorManager(getExecutor(), getScheduler(),
selectors>0?selectors:Math.max(1,Math.min(4,Runtime.getRuntime().availableProcessors()/2)));
addBean(_manager, true);
setSelectorPriorityDelta(-1);
setAcceptorPriorityDelta(-2);
}
@ -426,7 +427,7 @@ public class ServerConnector extends AbstractNetworkConnector
return _localPort;
}
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout());
}
@ -493,19 +494,19 @@ public class ServerConnector extends AbstractNetworkConnector
}
@Override
protected void accepted(SocketChannel channel) throws IOException
protected void accepted(SelectableChannel channel) throws IOException
{
ServerConnector.this.accepted(channel);
ServerConnector.this.accepted((SocketChannel)channel);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
protected ChannelEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
return ServerConnector.this.newEndPoint(channel, selectSet, selectionKey);
return ServerConnector.this.newEndPoint((SocketChannel)channel, selectSet, selectionKey);
}
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
return getDefaultConnectionFactory().newConnection(ServerConnector.this, endpoint);
}

View File

@ -20,12 +20,12 @@ package org.eclipse.jetty.server;
import java.net.Socket;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.Connection.Listener;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ssl.SslConnection.DecryptedEndPoint;
import org.eclipse.jetty.io.EndPoint;
/* ------------------------------------------------------------ */
@ -70,9 +70,9 @@ public class SocketCustomizationListener implements Listener
ssl=true;
}
if (endp instanceof ChannelEndPoint)
if (endp instanceof SocketChannelEndPoint)
{
Socket socket = ((ChannelEndPoint)endp).getSocket();
Socket socket = ((SocketChannelEndPoint)endp).getSocket();
customize(socket,connection.getClass(),ssl);
}
}

View File

@ -27,7 +27,6 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.Connector;

View File

@ -28,7 +28,6 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.ArrayUtil;
import org.eclipse.jetty.util.MultiException;
import org.eclipse.jetty.util.annotation.ManagedAttribute;

View File

@ -25,10 +25,8 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.LifeCycle;

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
@ -26,6 +29,7 @@ import java.io.PrintWriter;
import java.net.Socket;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -41,9 +45,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
public abstract class AbstractHttpTest
{
@Rule

View File

@ -18,6 +18,11 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -42,11 +47,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class AsyncRequestReadTest
{
private static Server server;

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.server;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;

View File

@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
@ -60,7 +61,7 @@ public class ExtendedServerTest extends HttpServerTestBase
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new ExtendedEndPoint(channel,selectSet,key, getScheduler(), getIdleTimeout());
}

View File

@ -18,6 +18,15 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
@ -33,8 +42,8 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
@ -42,15 +51,6 @@ import org.eclipse.jetty.toolchain.test.OS;
import org.eclipse.jetty.util.IO;
import org.junit.Test;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
public class ServerConnectorTest
{
public static class ReuseInfoHandler extends AbstractHandler
@ -61,8 +61,8 @@ public class ServerConnectorTest
response.setContentType("text/plain");
EndPoint endPoint = baseRequest.getHttpChannel().getEndPoint();
assertThat("Endpoint",endPoint,instanceOf(ChannelEndPoint.class));
ChannelEndPoint channelEndPoint = (ChannelEndPoint)endPoint;
assertThat("Endpoint",endPoint,instanceOf(SocketChannelEndPoint.class));
SocketChannelEndPoint channelEndPoint = (SocketChannelEndPoint)endPoint;
Socket socket = channelEndPoint.getSocket();
ServerConnector connector = (ServerConnector)baseRequest.getHttpChannel().getConnector();

View File

@ -26,26 +26,16 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;

View File

@ -18,8 +18,9 @@
package org.eclipse.jetty.server.handler;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import java.io.ByteArrayOutputStream;
import java.io.File;

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.server.ssl;
import static org.junit.Assert.assertEquals;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

View File

@ -49,6 +49,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.server.HttpChannel;
@ -110,7 +111,7 @@ public class ThreadStarvationTest
ServerConnector connector = new ServerConnector(_server, 0, 1)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{
@ -264,7 +265,7 @@ public class ThreadStarvationTest
ServerConnector connector = new ServerConnector(_server, acceptors, selectors)
{
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
protected ChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey key) throws IOException
{
return new SelectChannelEndPoint(channel, selectSet, key, getScheduler(), getIdleTimeout())
{

1
jetty-unixsocket/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target/

43
jetty-unixsocket/pom.xml Normal file
View File

@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-project</artifactId>
<version>9.4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jetty-unixsocket</artifactId>
<name>Jetty :: UnixSocket</name>
<description>Jetty UnixSocket</description>
<url>http://www.eclipse.org/jetty</url>
<properties>
<bundle-symbolic-name>${project.groupId}.unixsocket</bundle-symbolic-name>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId>
<configuration>
<onlyAnalyze>org.eclipse.jetty.unixsocket.*</onlyAnalyze>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-unixsocket</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,17 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration">
<Call name="addCustomizer">
<Arg>
<New class="org.eclipse.jetty.server.ForwardedRequestCustomizer">
<Set name="forwardedHostHeader"><Property name="jetty.unixSocketHttpConfig.forwardedHostHeader" default="X-Forwarded-Host"/></Set>
<Set name="forwardedServerHeader"><Property name="jetty.unixSocketHttpConfig.forwardedServerHeader" default="X-Forwarded-Server"/></Set>
<Set name="forwardedProtoHeader"><Property name="jetty.unixSocketHttpConfig.forwardedProtoHeader" default="X-Forwarded-Proto"/></Set>
<Set name="forwardedForHeader"><Property name="jetty.unixSocketHttpConfig.forwardedForHeader" default="X-Forwarded-For"/></Set>
<Set name="forwardedSslSessionIdHeader"><Property name="jetty.unixSocketHttpConfig.forwardedSslSessionIdHeader" /></Set>
<Set name="forwardedCipherSuiteHeader"><Property name="jetty.unixSocketHttpConfig.forwardedCipherSuiteHeader" /></Set>
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,13 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector">
<Call name="addConnectionFactory">
<Arg>
<New class="org.eclipse.jetty.server.HttpConnectionFactory">
<Arg name="config"><Ref refid="unixSocketHttpConfig" /></Arg>
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<!-- ============================================================= -->
<!-- Configure a HTTP2 on the ssl connector. -->
<!-- ============================================================= -->
<Configure id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector">
<Call name="addConnectionFactory">
<Arg>
<New class="org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory">
<Arg name="config"><Ref refid="unixSocketHttpConfig"/></Arg>
<Set name="maxConcurrentStreams"><Property name="jetty.http2c.maxConcurrentStreams" default="1024"/></Set>
<Set name="initialStreamSendWindow"><Property name="jetty.http2c.initialStreamSendWindow" default="65535"/></Set>
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,10 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketConnector" class="org.eclipse.jetty.server.ServerConnector">
<Call name="addFirstConnectionFactory">
<Arg>
<New class="org.eclipse.jetty.server.ProxyConnectionFactory"/>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,11 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration">
<Call name="addCustomizer">
<Arg>
<New class="org.eclipse.jetty.server.SecureRequestCustomizer">
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,25 @@
<?xml version="1.0"?>
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<New id="unixSocketHttpConfig" class="org.eclipse.jetty.server.HttpConfiguration">
<Arg><Ref refid="httpConfig"/></Arg>
</New>
<Call name="addConnector">
<Arg>
<New id="unixSocketConnector" class="org.eclipse.jetty.unixsocket.UnixSocketConnector">
<Arg name="server"><Ref refid="Server" /></Arg>
<Arg name="selectors" type="int"><Property name="jetty.unixsocket.selectors" default="-1"/></Arg>
<Arg name="factories">
<Array type="org.eclipse.jetty.server.ConnectionFactory">
</Array>
</Arg>
<Set name="unixSocket"><Property name="jetty.unixsocket" default="/tmp/jetty.sock" /></Set>
<Set name="idleTimeout"><Property name="jetty.unixsocket.idleTimeout" default="30000"/></Set>
<Set name="acceptQueueSize"><Property name="jetty.unixsocket.acceptQueueSize" default="0"/></Set>
</New>
</Arg>
</Call>
</Configure>

View File

@ -0,0 +1,18 @@
[depend]
unixsocket-http
[xml]
etc/jetty-unixsocket-forwarded.xml
[ini-template]
### ForwardedRequestCustomizer Configuration
# jetty.unixSocketHttpConfig.forwardedHostHeader=X-Forwarded-Host
# jetty.unixSocketHttpConfig.forwardedServerHeader=X-Forwarded-Server
# jetty.unixSocketHttpConfig.forwardedProtoHeader=X-Forwarded-Proto
# jetty.unixSocketHttpConfig.forwardedForHeader=X-Forwarded-For
# jetty.unixSocketHttpConfig.forwardedSslSessionIdHeader=
# jetty.unixSocketHttpConfig.forwardedCipherSuiteHeader=

View File

@ -0,0 +1,8 @@
[depend]
unixsocket
[xml]
etc/jetty-unixsocket-http.xml

View File

@ -0,0 +1,16 @@
[depend]
unixsocket-http
[lib]
lib/http2/*.jar
[xml]
etc/jetty-unixsocket-http2c.xml
[ini-template]
## Max number of concurrent streams per connection
# jetty.http2.maxConcurrentStreams=1024
## Initial stream send (server to client) window
# jetty.http2.initialStreamSendWindow=65535

View File

@ -0,0 +1,9 @@
#
# PROXY Protocol Module - UnixSocket
#
[depend]
unixsocket
[xml]
etc/jetty-unixsocket-proxy-protocol.xml

View File

@ -0,0 +1,10 @@
[depend]
unixsocket-http
[xml]
etc/jetty-unixsocket-secure.xml
[ini-template]
### SecureRequestCustomizer Configuration

View File

@ -0,0 +1,49 @@
#
# Jetty UnixSocket Connector
#
[depend]
server
[xml]
etc/jetty-unixsocket.xml
[files]
maven://com.github.jnr/jnr-unixsocket/0.8|lib/jnr/jnr-unixsocket-0.8.jar
maven://com.github.jnr/jnr-ffi/2.0.3|lib/jnr/jnr-ffi-2.0.3.jar
maven://com.github.jnr/jffi/1.2.9|lib/jnr/jffi-1.2.9.jar
maven://com.github.jnr/jffi/1.2.9/jar/native|lib/jnr/jffi-1.2.9-native.jar
maven://org.ow2.asm/asm/5.0.1|lib/jnr/asm-5.0.1.jar
maven://org.ow2.asm/asm-commons/5.0.1|lib/jnr/asm-commons-5.0.1.jar
maven://org.ow2.asm/asm-analysis/5.0.3|lib/jnr/asm-analysis-5.0.3.jar
maven://org.ow2.asm/asm-tree/5.0.3|lib/jnr/asm-tree-5.0.3.jar
maven://org.ow2.asm/asm-util/5.0.3|lib/jnr/asm-util-5.0.3.jar
maven://com.github.jnr/jnr-x86asm/1.0.2|lib/jnr/jnr-x86asm-1.0.2.jar
maven://com.github.jnr/jnr-constants/0.8.7|lib/jnr/jnr-constants-0.8.7.jar
maven://com.github.jnr/jnr-enxio/0.9|lib/jnr/jnr-enxio-0.9.jar
maven://com.github.jnr/jnr-posix/3.0.12|lib/jnr/jnr-posix-3.0.12.jar
[lib]
lib/jetty-unixsocket-${jetty.version}.jar
lib/jnr/*.jar
[license]
Jetty UnixSockets is implmented using the Java Native Runtime, which is an
open source project hosted on Github and released under the Apache 2.0 license.
https://github.com/jnr/jnr-unixsocket
http://www.apache.org/licenses/LICENSE-2.0.html
[ini-template]
### HTTP Connector Configuration
## Connector host/address to bind to
# jetty.unixsocket=/tmp/jetty.sock
## Connector idle timeout in milliseconds
# jetty.unixsocket.idleTimeout=30000
## Number of selectors (-1 picks default 1)
# jetty.unixsocket.selectors=-1
## ServerSocketChannel backlog (0 picks platform default)
# jetty.unixsocket.acceptorQueueSize=0

View File

@ -0,0 +1,436 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.unixsocket;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.Name;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixServerSocketChannel;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
/**
*
*/
@ManagedObject("HTTP connector using NIO ByteChannels and Selectors")
public class UnixSocketConnector extends AbstractConnector
{
private static final Logger LOG = Log.getLogger(UnixSocketConnector.class);
private final SelectorManager _manager;
private String _unixSocket = "/tmp/jetty.sock";
private volatile UnixServerSocketChannel _acceptChannel;
private volatile int _acceptQueueSize = 0;
private volatile boolean _reuseAddress = true;
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
*/
public UnixSocketConnector( @Name("server") Server server)
{
this(server,null,null,null,-1,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value. Selectors notice and schedule established connection that can make IO progress.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("selectors") int selectors)
{
this(server,null,null,null,selectors,new HttpConnectionFactory());
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the only factory.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value. Selectors notice and schedule established connection that can make IO progress.
* @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("selectors") int selectors,
@Name("factories") ConnectionFactory... factories)
{
this(server,null,null,null,selectors,factories);
}
/* ------------------------------------------------------------ */
/** Generic Server Connection with default configuration.
* <p>Construct a Server Connector with the passed Connection factories.</p>
* @param server The {@link Server} this connector will accept connection for.
* @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("factories") ConnectionFactory... factories)
{
this(server,null,null,null,-1,factories);
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the primary protocol</p>.
* @param server The {@link Server} this connector will accept connection for.
* @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the
* list of HTTP Connection Factory.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("sslContextFactory") SslContextFactory sslContextFactory)
{
this(server,null,null,null,-1,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
}
/* ------------------------------------------------------------ */
/** HTTP Server Connection.
* <p>Construct a ServerConnector with a private instance of {@link HttpConnectionFactory} as the primary protocol</p>.
* @param server The {@link Server} this connector will accept connection for.
* @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the
* list of HTTP Connection Factory.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value. Selectors notice and schedule established connection that can make IO progress.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("selectors") int selectors,
@Name("sslContextFactory") SslContextFactory sslContextFactory)
{
this(server,null,null,null,selectors,AbstractConnectionFactory.getFactories(sslContextFactory,new HttpConnectionFactory()));
}
/* ------------------------------------------------------------ */
/** Generic SSL Server Connection.
* @param server The {@link Server} this connector will accept connection for.
* @param sslContextFactory If non null, then a {@link SslConnectionFactory} is instantiated and prepended to the
* list of ConnectionFactories, with the first factory being the default protocol for the SslConnectionFactory.
* @param factories Zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("sslContextFactory") SslContextFactory sslContextFactory,
@Name("factories") ConnectionFactory... factories)
{
this(server, null, null, null, -1, AbstractConnectionFactory.getFactories(sslContextFactory, factories));
}
/** Generic Server Connection.
* @param server
* The server this connector will be accept connection for.
* @param executor
* An executor used to run tasks for handling requests, acceptors and selectors.
* If null then use the servers executor
* @param scheduler
* A scheduler used to schedule timeouts. If null then use the servers scheduler
* @param bufferPool
* A ByteBuffer pool used to allocate buffers. If null then create a private pool with default configuration.
* @param selectors
* the number of selector threads, or &lt;=0 for a default value(1). Selectors notice and schedule established connection that can make IO progress.
* @param factories
* Zero or more {@link ConnectionFactory} instances used to create and configure connections.
*/
public UnixSocketConnector(
@Name("server") Server server,
@Name("executor") Executor executor,
@Name("scheduler") Scheduler scheduler,
@Name("bufferPool") ByteBufferPool bufferPool,
@Name("selectors") int selectors,
@Name("factories") ConnectionFactory... factories)
{
super(server,executor,scheduler,bufferPool,0,factories);
_manager = newSelectorManager(getExecutor(), getScheduler(),
selectors>0?selectors:1);
addBean(_manager, true);
setAcceptorPriorityDelta(-2);
}
@ManagedAttribute
public String getUnixSocket()
{
return _unixSocket;
}
public void setUnixSocket(String filename)
{
_unixSocket=filename;
}
protected SelectorManager newSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
return new UnixSocketConnectorManager(executor, scheduler, selectors);
}
@Override
protected void doStart() throws Exception
{
open();
super.doStart();
if (getAcceptors()==0)
_manager.acceptor(_acceptChannel);
}
@Override
protected void doStop() throws Exception
{
super.doStop();
close();
}
public boolean isOpen()
{
UnixServerSocketChannel channel = _acceptChannel;
return channel!=null && channel.isOpen();
}
public void open() throws IOException
{
if (_acceptChannel == null)
{
UnixServerSocketChannel serverChannel = UnixServerSocketChannel.open();
SocketAddress bindAddress = new UnixSocketAddress(new File(_unixSocket));
serverChannel.socket().bind(bindAddress, getAcceptQueueSize());
serverChannel.configureBlocking(getAcceptors()>0);
addBean(serverChannel);
LOG.debug("opened {}",serverChannel);
_acceptChannel = serverChannel;
}
}
@Override
public Future<Void> shutdown()
{
// shutdown all the connections
return super.shutdown();
}
public void close()
{
UnixServerSocketChannel serverChannel = _acceptChannel;
_acceptChannel = null;
if (serverChannel != null)
{
removeBean(serverChannel);
// If the interrupt did not close it, we should close it
if (serverChannel.isOpen())
{
try
{
serverChannel.close();
}
catch (IOException e)
{
LOG.warn(e);
}
}
new File(_unixSocket).delete();
}
}
@Override
public void accept(int acceptorID) throws IOException
{
LOG.warn("Blocking UnixSocket accept used. Cannot be interrupted!");
UnixServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
LOG.debug("accept {}",serverChannel);
UnixSocketChannel channel = serverChannel.accept();
LOG.debug("accepted {}",channel);
accepted(channel);
}
}
protected void accepted(UnixSocketChannel channel) throws IOException
{
channel.configureBlocking(false);
_manager.accept(channel);
}
public SelectorManager getSelectorManager()
{
return _manager;
}
@Override
public Object getTransport()
{
return _acceptChannel;
}
protected UnixSocketEndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) throws IOException
{
return new UnixSocketEndPoint((UnixSocketChannel)channel,selector,key,getScheduler());
}
/**
* @return the accept queue size
*/
@ManagedAttribute("Accept Queue size")
public int getAcceptQueueSize()
{
return _acceptQueueSize;
}
/**
* @param acceptQueueSize the accept queue size (also known as accept backlog)
*/
public void setAcceptQueueSize(int acceptQueueSize)
{
_acceptQueueSize = acceptQueueSize;
}
/**
* @return whether the server socket reuses addresses
* @see ServerSocket#getReuseAddress()
*/
public boolean getReuseAddress()
{
return _reuseAddress;
}
/**
* @param reuseAddress whether the server socket reuses addresses
* @see ServerSocket#setReuseAddress(boolean)
*/
public void setReuseAddress(boolean reuseAddress)
{
_reuseAddress = reuseAddress;
}
@Override
public String toString()
{
return String.format("%s{%s}",
super.toString(),
_unixSocket);
}
protected class UnixSocketConnectorManager extends SelectorManager
{
public UnixSocketConnectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}
@Override
protected void accepted(SelectableChannel channel) throws IOException
{
UnixSocketConnector.this.accepted((UnixSocketChannel)channel);
}
@Override
protected Selector newSelector() throws IOException
{
return NativeSelectorProvider.getInstance().openSelector();
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
UnixSocketEndPoint endp = UnixSocketConnector.this.newEndPoint(channel, selector, selectionKey);
endp.setIdleTimeout(getIdleTimeout());
return endp;
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endpoint, Object attachment) throws IOException
{
return getDefaultConnectionFactory().newConnection(UnixSocketConnector.this, endpoint);
}
@Override
protected void endPointOpened(EndPoint endpoint)
{
super.endPointOpened(endpoint);
onEndPointOpened(endpoint);
}
@Override
protected void endPointClosed(EndPoint endpoint)
{
onEndPointClosed(endpoint);
super.endPointClosed(endpoint);
}
@Override
protected boolean doFinishConnect(SelectableChannel channel) throws IOException
{
return ((UnixSocketChannel)channel).finishConnect();
}
@Override
protected boolean isConnectionPending(SelectableChannel channel)
{
return ((UnixSocketChannel)channel).isConnectionPending();
}
@Override
protected SelectableChannel doAccept(SelectableChannel server) throws IOException
{
LOG.debug("doAccept async {}",server);
UnixSocketChannel channel = ((UnixServerSocketChannel)server).accept();
LOG.debug("accepted async {}",channel);
return channel;
}
}
}

View File

@ -0,0 +1,74 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.unixsocket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import jnr.unixsocket.UnixSocketChannel;
public class UnixSocketEndPoint extends ChannelEndPoint
{
public final static InetSocketAddress NOIP=new InetSocketAddress(0);
private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class);
private final UnixSocketChannel _channel;
public UnixSocketEndPoint(UnixSocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel,selector,key,scheduler);
_channel=channel;
}
@Override
public InetSocketAddress getLocalAddress()
{
return null;
}
@Override
public InetSocketAddress getRemoteAddress()
{
return null;
}
@Override
protected void doShutdownOutput()
{
if (LOG.isDebugEnabled())
LOG.debug("oshut {}", this);
try
{
_channel.shutdownOutput();
super.doShutdownOutput();
}
catch (IOException e)
{
LOG.debug(e);
}
}
}

View File

@ -0,0 +1,57 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.unixsocket;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.util.Date;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;
public class UnixSocketClient
{
public static void main(String[] args) throws Exception
{
java.io.File path = new java.io.File("/tmp/jetty.sock");
String data = "GET / HTTP/1.1\r\nHost: unixsock\r\n\r\n";
UnixSocketAddress address = new UnixSocketAddress(path);
UnixSocketChannel channel = UnixSocketChannel.open(address);
System.out.println("connected to " + channel.getRemoteSocketAddress());
PrintWriter w = new PrintWriter(Channels.newOutputStream(channel));
InputStreamReader r = new InputStreamReader(Channels.newInputStream(channel));
while (true)
{
w.print(data);
w.flush();
CharBuffer result = CharBuffer.allocate(4096);
r.read(result);
result.flip();
System.out.println("read from server: " + result.toString());
Thread.sleep(1000);
}
}
}

View File

@ -0,0 +1,63 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.unixsocket;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.ProxyConnectionFactory;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
public class UnixSocketServer
{
public static void main (String... args) throws Exception
{
Server server = new Server();
HttpConnectionFactory http = new HttpConnectionFactory();
ProxyConnectionFactory proxy = new ProxyConnectionFactory(http.getProtocol());
UnixSocketConnector connector = new UnixSocketConnector(server,proxy,http);
server.addConnector(connector);
server.setHandler(new AbstractHandler()
{
@Override
protected void doHandle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(200);
response.getWriter().write("Hello World\r\n");
response.getWriter().write("remote="+request.getRemoteAddr()+":"+request.getRemotePort()+"\r\n");
response.getWriter().write("local ="+request.getLocalAddr()+":"+request.getLocalPort()+"\r\n");
}
});
server.start();
server.join();
}
}

Binary file not shown.

View File

@ -0,0 +1,7 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
#org.eclipse.jetty.LEVEL=DEBUG
#org.eclipse.jetty.client.LEVEL=DEBUG
#org.eclipse.jetty.proxy.LEVEL=DEBUG
org.eclipse.jetty.unixsocket.LEVEL=DEBUG
org.eclipse.jetty.io.LEVEL=DEBUG
org.eclipse.jetty.server.ProxyConnectionFactory.LEVEL=DEBUG

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.websocket.client.io;
import java.io.IOException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
@ -31,6 +32,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -53,7 +55,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection Failed",ex);
@ -67,7 +69,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
}
@Override
public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) throws IOException
public Connection newConnection(final SelectableChannel channel, EndPoint endPoint, final Object attachment) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("newConnection({},{},{})",channel,endPoint,attachment);
@ -114,24 +116,33 @@ public class WebSocketClientSelectorManager extends SelectorManager
}
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("newEndPoint({}, {}, {})",channel,selectSet,selectionKey);
return new SelectChannelEndPoint(channel,selectSet,selectionKey,getScheduler(),policy.getIdleTimeout());
LOG.debug("newEndPoint({}, {}, {})",channel,selector,selectionKey);
SocketChannelEndPoint endp = new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
endp.setIdleTimeout(policy.getIdleTimeout());
return endp;
}
public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SocketChannel channel)
public SSLEngine newSSLEngine(SslContextFactory sslContextFactory, SelectableChannel channel)
{
String peerHost = channel.socket().getInetAddress().getHostName();
int peerPort = channel.socket().getPort();
String peerHost = null;
int peerPort = 0;
if (channel instanceof SocketChannel)
{
SocketChannel sc = (SocketChannel)channel;
peerHost = sc.socket().getInetAddress().getHostName();
peerPort = sc.socket().getPort();
}
SSLEngine engine = sslContextFactory.newSSLEngine(peerHost,peerPort);
engine.setUseClientMode(true);
return engine;
}
public UpgradeConnection newUpgradeConnection(SocketChannel channel, EndPoint endPoint, ConnectPromise connectPromise)
public UpgradeConnection newUpgradeConnection(SelectableChannel channel, EndPoint endPoint, ConnectPromise connectPromise)
{
WebSocketClient client = connectPromise.getClient();
Executor executor = client.getExecutor();

View File

@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
@ -37,6 +38,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.BufferUtil;
@ -283,19 +285,21 @@ public class ClientCloseTest
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
return new TestEndPoint(channel,selectSet,selectionKey,getScheduler(),getPolicy().getIdleTimeout());
TestEndPoint endp = new TestEndPoint(channel,selectSet,selectionKey,getScheduler());
endp.setIdleTimeout(getPolicy().getIdleTimeout());
return endp;
}
}
public static class TestEndPoint extends SelectChannelEndPoint
public static class TestEndPoint extends SocketChannelEndPoint
{
public AtomicBoolean congestedFlush = new AtomicBoolean(false);
public TestEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler, long idleTimeout)
public TestEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
{
super(channel,selector,key,scheduler,idleTimeout);
super((SocketChannel)channel,selector,key,scheduler);
}
@Override

View File

@ -531,6 +531,7 @@
<module>jetty-nosql</module>
<module>jetty-infinispan</module>
<module>jetty-gcloud</module>
<module>jetty-unixsocket</module>
<module>tests</module>
<module>examples</module>
<module>jetty-quickstart</module>