Merge branch 'master' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project

This commit is contained in:
Joakim Erdfelt 2011-08-11 13:21:35 -07:00
commit f39ffad9a9
34 changed files with 1341 additions and 1391 deletions

View File

@ -16,6 +16,8 @@ jetty-7.5.0-SNAPSHOT
+ 353563 HttpDestinationQueueTest too slow
+ 353862 Improve performance of QuotedStringTokenizer.quote()
+ 354014 Content-Length is passed to wrapped response in GZipFilter
+ 354204 Charset encodings property file not used
+ 354466 Typo in example config of jetty-plus.xml
jetty-7.4.4.v20110707 July 7th 2011
+ 308851 Converted all jetty-client module tests to JUnit 4

View File

@ -103,14 +103,6 @@ public class WebSocketUpgradeTest
_results.add("clientWS.onMessage");
_results.add(data);
}
/* ------------------------------------------------------------ */
public void onError(String message, Throwable ex)
{
_results.add("clientWS.onError");
_results.add(message);
_results.add(ex);
}
};
@ -253,18 +245,10 @@ public class WebSocketUpgradeTest
_results.add(data);
}
/* ------------------------------------------------------------ */
public void onError(String message, Throwable ex)
{
_results.add("serverWS.onError");
_results.add(message);
_results.add(ex);
}
/* ------------------------------------------------------------ */
public void onClose(int code, String message)
{
_results.add("onDisconnect");
_results.add("onClose");
_webSockets.remove(this);
}

View File

@ -293,7 +293,6 @@ public class MimeTypes
case TEXT_XML_8859_1_ORDINAL:
return StringUtil.__ISO_8859_1;
case TEXT_JSON_ORDINAL:
case TEXT_HTML_UTF_8_ORDINAL:
case TEXT_PLAIN_UTF_8_ORDINAL:
case TEXT_XML_UTF_8_ORDINAL:
@ -363,6 +362,7 @@ public class MimeTypes
if (state==10)
return CACHE.lookup(value.peek(start,i-start)).toString();
return null;
return (String)__encodings.get(value);
}
}

View File

@ -1,3 +1,4 @@
text/html = ISO-8859-1
text/plain = US-ASCII
text/plain = ISO-8859-1
text/xml = UTF-8
text/json = UTF-8

View File

@ -77,6 +77,19 @@ public class ByteArrayBuffer extends AbstractBuffer
_string = value;
}
public ByteArrayBuffer(String value,boolean immutable)
{
super(READWRITE,NON_VOLATILE);
_bytes = StringUtil.getBytes(value);
setGetIndex(0);
setPutIndex(_bytes.length);
if (immutable)
{
_access=IMMUTABLE;
_string = value;
}
}
public ByteArrayBuffer(String value,String encoding) throws UnsupportedEncodingException
{
super(READWRITE,NON_VOLATILE);

View File

@ -15,9 +15,7 @@
package org.eclipse.jetty.monitor;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@ -34,19 +32,18 @@ import org.eclipse.jetty.util.log.Logger;
public class ThreadMonitor extends AbstractLifeCycle implements Runnable
{
private int _scanInterval;
private int _dumpInterval;
private int _logInterval;
private int _busyThreshold;
private int _dumpThreshold;
private int _logThreshold;
private int _stackDepth;
private ThreadMXBean _threadBean;
private Method findDeadlockedThreadsMethod;
private Thread _runner;
private Logger _logger;
private volatile boolean _done = true;
private Map<Long,ExtThreadInfo> _extInfo;
private Map<Long,ThreadMonitorInfo> _monitorInfo;
/* ------------------------------------------------------------ */
/**
@ -75,7 +72,7 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
_stackDepth = depth;
_logger = Log.getLogger(ThreadMonitor.class.getName());
_extInfo = new HashMap<Long, ExtThreadInfo>();
_monitorInfo = new HashMap<Long, ThreadMonitorInfo>();
init();
}
@ -93,15 +90,15 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
}
/* ------------------------------------------------------------ */
public int getDumpInterval()
public int getLogInterval()
{
return _dumpInterval;
return _logInterval;
}
/* ------------------------------------------------------------ */
public void setDumpInterval(int ms)
public void setLogInterval(int ms)
{
_dumpInterval = ms;
_logInterval = ms;
}
/* ------------------------------------------------------------ */
@ -117,15 +114,15 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
}
/* ------------------------------------------------------------ */
public int getDumpThreshold()
public int getLogThreshold()
{
return _dumpThreshold;
return _logThreshold;
}
/* ------------------------------------------------------------ */
public void setDumpThreshold(int percent)
public void setLogThreshold(int percent)
{
_dumpThreshold = percent;
_logThreshold = percent;
}
/* ------------------------------------------------------------ */
@ -141,10 +138,10 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
}
/* ------------------------------------------------------------ */
public void enableDumpAll(int ms, int percent)
public void logCpuUsage(int frequencyMs, int thresholdPercent)
{
setDumpInterval(ms);
setDumpThreshold(percent);
setLogInterval(frequencyMs);
setLogThreshold(thresholdPercent);
}
/* ------------------------------------------------------------ */
@ -156,6 +153,7 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
_done = false;
_runner = new Thread(this);
_runner.setDaemon(true);
_runner.start();
Log.info("Thread Monitor started successfully");
@ -178,49 +176,6 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
}
}
/* ------------------------------------------------------------ */
/**
* Initialize JMX objects.
*/
protected void init()
{
_threadBean = ManagementFactory.getThreadMXBean();
if (_threadBean.isThreadCpuTimeSupported())
{
_threadBean.setThreadCpuTimeEnabled(true);
}
String versionStr = System.getProperty("java.version");
float version = Float.valueOf(versionStr.substring(0,versionStr.lastIndexOf('.')));
try
{
if (version < 1.6)
{
findDeadlockedThreadsMethod = ThreadMXBean.class.getMethod("findMonitorDeadlockedThreads");
}
else
{
findDeadlockedThreadsMethod = ThreadMXBean.class.getMethod("findDeadlockedThreads");
}
}
catch (Exception ex)
{
Log.debug(ex);
}
}
/* ------------------------------------------------------------ */
/**
* Find deadlocked threads.
*
* @return array of the deadlocked thread ids
* @throws Exception the exception
*/
protected long[] findDeadlockedThreads() throws Exception
{
return (long[]) findDeadlockedThreadsMethod.invoke(_threadBean,(Object[])null);
}
/* ------------------------------------------------------------ */
/**
* Retrieve all avaliable thread ids
@ -246,79 +201,14 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
/* ------------------------------------------------------------ */
/**
* Retrieve thread info.
*
* @param id thread id
* @param maxDepth maximum stack depth
* @return thread info
* Initialize JMX objects.
*/
protected ThreadInfo getThreadInfo(long id, int maxDepth)
protected void init()
{
return _threadBean.getThreadInfo(id,maxDepth);
}
/* ------------------------------------------------------------ */
/**
* Output thread info to log.
*
* @param threads thread info list
*/
protected void dump(final List<ThreadInfo> threads)
_threadBean = ManagementFactory.getThreadMXBean();
if (_threadBean.isThreadCpuTimeSupported())
{
if (threads != null && threads.size() > 0)
{
for (ThreadInfo info : threads)
{
StringBuffer msg = new StringBuffer();
if (info.getLockOwnerId() < 0)
{
String state = info.isInNative() ? "IN_NATIVE" :
info.getThreadState().toString();
msg.append(String.format("Thread %s[id:%d,%s] is spinning",
info.getThreadName(), info.getThreadId(), state));
}
else
{
msg.append(String.format("Thread %s[id:%d,%s]",
info.getThreadName(), info.getThreadId(), info.getThreadState()));
msg.append(String.format(" on %s owned by %s[id:%d]",
info.getLockName(), info.getLockOwnerName(), info.getLockOwnerId()));
}
_logger.warn(new ThreadMonitorException(msg.toString(), info.getStackTrace()));
}
}
}
protected void dumpAll()
{
if (_extInfo.size() > 0)
{
List<ExtThreadInfo> sorted = new ArrayList<ExtThreadInfo>(_extInfo.values());
Collections.sort(sorted, new Comparator<ExtThreadInfo>() {
/* ------------------------------------------------------------ */
public int compare(ExtThreadInfo eti1, ExtThreadInfo eti2)
{
return (int)Math.signum(eti2.getCpuUtilization()-eti1.getCpuUtilization());
}
});
for (ExtThreadInfo info : sorted)
{
ThreadInfo threadInfo = getThreadInfo(info.getThreadId(), 0);
if (info.getCpuUtilization() > 1.0f)
{
String state = threadInfo.isInNative() ? "IN_NATIVE" :
threadInfo.getThreadState().toString();
_logger.info(String.format("Thread %s[id:%d,%s] is using %.2f%% of CPU",
threadInfo.getThreadName(), threadInfo.getThreadId(),
state, info.getCpuUtilization()));
}
info.setDumpCpuTime(info.getLastCpuTime());
info.setDumpSampleTime(info.getLastSampleTime());
}
_threadBean.setThreadCpuTimeEnabled(true);
}
}
@ -338,7 +228,7 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
{
try
{
Thread.sleep(50);
Thread.sleep(100);
}
catch (InterruptedException ex)
{
@ -347,90 +237,65 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
continue;
}
List<ThreadInfo> threadInfo = new ArrayList<ThreadInfo>();
findSpinningThreads(threadInfo);
findDeadlockedThreads(threadInfo);
collectThreadInfo();
lastTime = System.currentTimeMillis();
if (threadInfo.size() > 0)
{
dump(threadInfo);
}
if (_dumpInterval > 0 && lastTime > lastDumpTime + _dumpInterval)
if (_logInterval > 0 && lastTime > lastDumpTime + _logInterval)
{
logCpuUsage();
lastDumpTime = lastTime;
dumpAll();
}
logThreadState();
}
}
/* ------------------------------------------------------------ */
/**
* Find spinning threads.
*
* @param threadInfo thread info list to add the results
* @return thread info list
* Collect thread info.
*/
private List<ThreadInfo> findSpinningThreads(final List<ThreadInfo> threadInfo)
{
if (threadInfo != null)
private void collectThreadInfo()
{
try
{
long[] allThreadId = getAllThreadIds();
for (int idx=0; idx < allThreadId.length; idx++)
Map<Thread,StackTraceElement[]> all = Thread.getAllStackTraces();
for (Map.Entry<Thread,StackTraceElement[]> entry : all.entrySet())
{
long currId = allThreadId[idx];
Thread thread = entry.getKey();
long threadId = thread.getId();
if (currId == _runner.getId())
if (threadId == _runner.getId())
{
continue;
}
long currCpuTime = getThreadCpuTime(currId);
long currNanoTime = System.nanoTime();
ExtThreadInfo currExtInfo = _extInfo.get(Long.valueOf(currId));
if (currExtInfo != null)
ThreadMonitorInfo currMonitorInfo = _monitorInfo.get(Long.valueOf(threadId));
if (currMonitorInfo == null)
{
long elapsedCpuTime = currCpuTime - currExtInfo.getLastCpuTime();
long elapsedNanoTime = currNanoTime - currExtInfo.getLastSampleTime();
float cpuUtilization = Math.min((elapsedCpuTime * 100.0f) / elapsedNanoTime, 100.0f);
currExtInfo.setCpuUtilization(cpuUtilization);
if (cpuUtilization > _busyThreshold)
{
ThreadInfo currInfo = getThreadInfo(currId, Integer.MAX_VALUE);
if (currInfo != null)
{
StackTraceElement[] lastStackTrace = currExtInfo.getStackTrace();
currExtInfo.setStackTrace(currInfo.getStackTrace());
if (lastStackTrace != null
&& matchStackTraces(lastStackTrace, currInfo.getStackTrace())) {
threadInfo.add(currInfo);
}
}
}
currMonitorInfo = new ThreadMonitorInfo(thread);
currMonitorInfo.setStackTrace(entry.getValue());
currMonitorInfo.setCpuTime(getThreadCpuTime(threadId));
currMonitorInfo.setSampleTime(System.nanoTime());
_monitorInfo.put(Long.valueOf(threadId), currMonitorInfo);
}
else
{
currExtInfo = new ExtThreadInfo(getThreadInfo(currId, 0));
currExtInfo.setFirstCpuTime(currCpuTime);
currExtInfo.setFirstSampleTime(currNanoTime);
currExtInfo.setDumpCpuTime(currCpuTime);
currExtInfo.setDumpSampleTime(currNanoTime);
currMonitorInfo.setStackTrace(entry.getValue());
currMonitorInfo.setCpuTime(getThreadCpuTime(threadId));
currMonitorInfo.setSampleTime(System.nanoTime());
_extInfo.put(Long.valueOf(currId), currExtInfo);
currMonitorInfo.setSpinning(false);
if (currMonitorInfo.getCpuUtilization() > _busyThreshold)
{
StackTraceElement[] lastStackTrace = currMonitorInfo.getStackTrace();
if (lastStackTrace != null
&& matchStackTraces(lastStackTrace, entry.getValue()))
{
currMonitorInfo.setSpinning(true);
}
}
}
currExtInfo.setLastCpuTime(currCpuTime);
currExtInfo.setLastSampleTime(currNanoTime);
}
}
catch (Exception ex)
@ -439,44 +304,67 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
}
}
return threadInfo;
/* ------------------------------------------------------------ */
protected void logCpuUsage()
{
if (_monitorInfo.size() > 0)
{
long[] running = getAllThreadIds();
List<ThreadMonitorInfo> all = new ArrayList<ThreadMonitorInfo>();
for (int idx=0; idx<running.length; idx++)
{
ThreadMonitorInfo info = _monitorInfo.get(running[idx]);
if (info != null)
{
all.add(info);
}
}
Collections.sort(all, new Comparator<ThreadMonitorInfo>()
{
/* ------------------------------------------------------------ */
public int compare(ThreadMonitorInfo info1, ThreadMonitorInfo info2)
{
return (int)Math.signum(info2.getCpuUtilization()-info1.getCpuUtilization());
}
});
String format = "Thread %1$s[id:%2$d,%3$s] - %4$.2f%%";
for (ThreadMonitorInfo info : all)
{
if (info.getCpuUtilization() > _logThreshold)
{
String message = String.format(format, info.getThreadName(),
info.getThreadId(), info.getThreadState(), info.getCpuUtilization());
_logger.info(message);
}
}
}
}
/* ------------------------------------------------------------ */
/**
* Find deadlocked threads.
* Output thread info to log.
*
* @param threadInfo thread info list to add the results
* @return thread info list
* @param detected thread info list
*/
private List<ThreadInfo> findDeadlockedThreads(final List<ThreadInfo> threadInfo)
protected void logThreadState()
{
if (threadInfo != null)
if (_monitorInfo.size() > 0)
{
try
long[] all = getAllThreadIds();
for (int idx=0; idx<all.length; idx++)
{
long[] threads = findDeadlockedThreads();
if (threads != null && threads.length > 0)
ThreadMonitorInfo info = _monitorInfo.get(all[idx]);
if (info != null && info.isSpinning())
{
ThreadInfo currInfo;
for (int idx=0; idx < threads.length; idx++)
{
currInfo = getThreadInfo(threads[idx], Integer.MAX_VALUE);
if (currInfo != null)
{
threadInfo.add(currInfo);
String message = String.format("%1$s[id:%2$d,%3$s] is SPINNING",
info.getThreadName(), info.getThreadId(), info.getThreadState());
_logger.warn(new ThreadMonitorException(message, info.getStackTrace()));
}
}
}
}
catch (Exception ex)
{
Log.debug(ex);
}
}
return threadInfo;
}
/* ------------------------------------------------------------ */
/**
@ -501,208 +389,4 @@ public class ThreadMonitor extends AbstractLifeCycle implements Runnable
}
return match;
}
/* ------------------------------------------------------------ */
private class ExtThreadInfo
{
private ThreadInfo _threadInfo;
private long _firstCpuTime;
private long _firstSampleTime;
private long _lastCpuTime;
private long _lastSampleTime;
private long _dumpCpuTime;
private long _dumpSampleTime;
private float _cpuUtilization;
private StackTraceElement[] _stackTrace;
/* ------------------------------------------------------------ */
public ExtThreadInfo(ThreadInfo threadInfo)
{
_threadInfo = threadInfo;
}
/* ------------------------------------------------------------ */
/**
* @return thread id associated with the instance
*/
public ThreadInfo getThreadInfo()
{
return _threadInfo;
}
/**
* @return the thread Id
*/
public long getThreadId()
{
return _threadInfo.getThreadId();
}
/* ------------------------------------------------------------ */
/**
* @return the first CPU time of the thread
*/
public long getFirstCpuTime()
{
return _firstCpuTime;
}
/* ------------------------------------------------------------ */
/**
* Set the first CPU time.
*
* @param ns new last CPU time
*/
public void setFirstCpuTime(long ns)
{
_firstCpuTime = ns;
}
/* ------------------------------------------------------------ */
/**
* @return the time of first sample
*/
public long getFirstSampleTime()
{
return _firstSampleTime;
}
/* ------------------------------------------------------------ */
/**
* Sets the first sample time.
*
* @param ns the time of first sample
*/
public void setFirstSampleTime(long ns)
{
_firstSampleTime = ns;
}
/* ------------------------------------------------------------ */
/**
* @return the last CPU time of the thread
*/
public long getLastCpuTime()
{
return _lastCpuTime;
}
/* ------------------------------------------------------------ */
/**
* Set the last CPU time.
*
* @param ns new last CPU time
*/
public void setLastCpuTime(long ns)
{
_lastCpuTime = ns;
}
/* ------------------------------------------------------------ */
/**
* @return the time of last sample
*/
public long getLastSampleTime()
{
return _lastSampleTime;
}
/* ------------------------------------------------------------ */
/**
* Sets the last sample time.
*
* @param ns the time of last sample
*/
public void setLastSampleTime(long ns)
{
_lastSampleTime = ns;
}
/* ------------------------------------------------------------ */
/**
* @return the dump CPU time of the thread
*/
public long getDumpCpuTime()
{
return _dumpCpuTime;
}
/* ------------------------------------------------------------ */
/**
* Set the dump CPU time.
*
* @param ns new dump CPU time
*/
public void setDumpCpuTime(long ns)
{
_dumpCpuTime = ns;
}
/* ------------------------------------------------------------ */
/**
* @return the time of dump sample
*/
public long getDumpSampleTime()
{
return _dumpSampleTime;
}
/* ------------------------------------------------------------ */
/**
* Sets the dump sample time.
*
* @param ns the time of dump sample
*/
public void setDumpSampleTime(long ns)
{
_dumpSampleTime = ns;
}
/* ------------------------------------------------------------ */
/**
* Gets the CPU utilization.
*
* @return the CPU utilization percentage
*/
public float getCpuUtilization()
{
return _cpuUtilization;
}
/* ------------------------------------------------------------ */
/**
* Sets the CPU utilization.
*
* @param percentage the new CPU utilization percentage
*/
public void setCpuUtilization(float percentage)
{
_cpuUtilization = percentage;
}
/* ------------------------------------------------------------ */
/**
* Gets the stack trace.
*
* @return the stack trace
*/
public StackTraceElement[] getStackTrace()
{
return _stackTrace;
}
/* ------------------------------------------------------------ */
/**
* Sets the stack trace.
*
* @param stackTrace the new stack trace
*/
public void setStackTrace(StackTraceElement[] stackTrace)
{
_stackTrace = stackTrace;
}
}
}

View File

@ -0,0 +1,176 @@
// ========================================================================
// Copyright (c) Webtide LLC
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.monitor;
/* ------------------------------------------------------------ */
/**
*/
public class ThreadMonitorInfo
{
private Thread _thread;
private StackTraceElement[] _stackTrace;
private boolean _threadSpinning;
private long _prevCpuTime;
private long _prevSampleTime;
private long _currCpuTime;
private long _currSampleTime;
/* ------------------------------------------------------------ */
/**
* Instantiates a new thread monitor info.
*
* @param threadInfo the thread info
*/
public ThreadMonitorInfo(Thread thread)
{
_thread = thread;
}
/* ------------------------------------------------------------ */
/**
* @return Id of the thread
*/
public long getThreadId()
{
return _thread.getId();
}
/* ------------------------------------------------------------ */
/**
* Gets the thread name.
*
* @return the thread name
*/
public String getThreadName()
{
return _thread.getName();
}
/* ------------------------------------------------------------ */
/**
* Gets the thread state.
*
* @return the thread state
*/
public String getThreadState()
{
return _thread.getState().toString();
}
/* ------------------------------------------------------------ */
/**
* Gets the stack trace.
*
* @return the stack trace
*/
public StackTraceElement[] getStackTrace()
{
return _stackTrace;
}
/* ------------------------------------------------------------ */
/**
* Sets the stack trace.
*
* @param stackTrace the new stack trace
*/
public void setStackTrace(StackTraceElement[] stackTrace)
{
_stackTrace = stackTrace;
}
/* ------------------------------------------------------------ */
/**
* Checks if is spinning.
*
* @return true, if is spinning
*/
public boolean isSpinning()
{
return _threadSpinning;
}
/* ------------------------------------------------------------ */
/**
* Sets the spinning flag.
*
* @param value the new value
*/
public void setSpinning(boolean value)
{
_threadSpinning = value;
}
/* ------------------------------------------------------------ */
/**
* @return the CPU time of the thread
*/
public long getCpuTime()
{
return _currCpuTime;
}
/* ------------------------------------------------------------ */
/**
* Set the CPU time.
*
* @param ns new CPU time
*/
public void setCpuTime(long ns)
{
_prevCpuTime = _currCpuTime;
_currCpuTime = ns;
}
/* ------------------------------------------------------------ */
/**
* @return the time of sample
*/
public long getSampleTime()
{
return _currSampleTime;
}
/* ------------------------------------------------------------ */
/**
* Sets the sample time.
*
* @param ns the time of sample
*/
public void setSampleTime(long ns)
{
_prevSampleTime = _currSampleTime;
_currSampleTime = ns;
}
/* ------------------------------------------------------------ */
/**
* Gets the CPU utilization.
*
* @return the CPU utilization percentage
*/
public float getCpuUtilization()
{
long elapsedCpuTime = _currCpuTime - _prevCpuTime;
long elapsedNanoTime = _currSampleTime - _prevSampleTime;
return elapsedNanoTime > 0 ? Math.min((elapsedCpuTime * 100.0f) / elapsedNanoTime, 100.0f) : 0;
}
}

View File

@ -16,8 +16,6 @@ package org.eclipse.jetty.monitor;
import static org.junit.Assert.assertTrue;
import java.lang.management.ThreadInfo;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;
@ -29,29 +27,29 @@ import org.junit.Test;
public class ThreadMonitorTest
{
public final static int DURATION=9000;
private AtomicInteger count=new AtomicInteger(0);
private AtomicInteger countDump=new AtomicInteger(0);
@Test
public void monitorTest() throws Exception
{
final AtomicInteger countCpuLogs=new AtomicInteger(0);
final AtomicInteger countStateLogs=new AtomicInteger(0);
ThreadMonitor monitor = new ThreadMonitor(1000,50,2)
ThreadMonitor monitor = new ThreadMonitor(1000,50,1)
{
@Override
protected void dump(List<ThreadInfo> threads)
protected void logCpuUsage()
{
count.incrementAndGet();
super.dump(threads);
countCpuLogs.incrementAndGet();
super.logCpuUsage();
}
@Override
protected void dumpAll()
protected void logThreadState()
{
countDump.incrementAndGet();
super.dumpAll();
countStateLogs.incrementAndGet();
super.logThreadState();
}
};
monitor.enableDumpAll(2000,1);
monitor.logCpuUsage(2000,1);
monitor.start();
Spinner spinner = new Spinner();
@ -63,8 +61,8 @@ public class ThreadMonitorTest
spinner.setDone();
monitor.stop();
assertTrue(count.get() >= 2);
assertTrue(countDump.get() >= 1);
assertTrue(countCpuLogs.get() >= 1);
assertTrue(countStateLogs.get() >= 1);
}
@ -100,6 +98,5 @@ public class ThreadMonitorTest
if (result==42)
System.err.println("Bingo!");
}
}
}

View File

@ -26,7 +26,7 @@
<!-- the web.xml file. -->
<!-- =========================================================== -->
<!--
<Call name="addLoginService">
<Call name="addBean">
<Arg>
<New class="org.eclipse.jetty.plus.jaas.JAASLoginService">
<Set name="name">xyzrealm</Set>

View File

@ -657,8 +657,8 @@ public class Response implements HttpServletResponse
if (encoding==null)
{
/* implementation of educated defaults */
if(_mimeType!=null)
encoding = null; // TODO getHttpContext().getEncodingByMimeType(_mimeType);
if(_cachedMimeType != null)
encoding = MimeTypes.getCharsetFromContentType(_cachedMimeType);
if (encoding==null)
encoding = StringUtil.__ISO_8859_1;

View File

@ -65,7 +65,7 @@ public abstract class HttpServerTestBase extends HttpServerTestFixture
private static final String REQUEST2_HEADER=
"POST / HTTP/1.0\n"+
"Host: localhost\n"+
"Content-Type: text/xml\n"+
"Content-Type: text/xml;charset=ISO-8859-1\n"+
"Content-Length: ";
private static final String REQUEST2_CONTENT=
"<?xml version=\"1.0\" encoding=\"ISO-8859-1\"?>\n"+

View File

@ -114,11 +114,20 @@ public class ResponseTest
assertEquals("foo2/bar2;charset=ISO-8859-1",response.getContentType());
response.recycle();
response.setContentType("text/xml;charset=ISO-8859-7");
response.getWriter();
response.setContentType("text/html;charset=UTF-8");
assertEquals("text/html;charset=ISO-8859-7",response.getContentType());
response.recycle();
response.setContentType("text/html;charset=US-ASCII");
response.getWriter();
assertEquals("text/html;charset=US-ASCII",response.getContentType());
response.recycle();
response.setContentType("text/json");
response.getWriter();
assertEquals("text/json;charset=UTF-8", response.getContentType());
}
@Test

View File

@ -6,7 +6,6 @@ import java.util.Map;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.WebSocketParser.FrameHandler;
public class AbstractExtension implements Extension

View File

@ -8,7 +8,6 @@ import java.util.zip.Inflater;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.log.Log;
public class DeflateFrameExtension extends AbstractExtension

View File

@ -1,32 +1,18 @@
package org.eclipse.jetty.websocket;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
/**
* @version $Revision$ $Date$
@ -68,14 +54,6 @@ public class TestClient implements WebSocket.OnFrame
{
}
public void onError(String message, Throwable ex)
{
System.err.println("onError: "+message);
if (ex!=null)
ex.printStackTrace();
_handshook.countDown();
}
public void onClose(int closeCode, String message)
{
_handshook.countDown();
@ -141,8 +119,10 @@ public class TestClient implements WebSocket.OnFrame
private void open() throws Exception
{
__client.open(new URI("ws://"+_host+":"+_port+"/"),this,_protocol,_timeout);
_handshook.await(10,TimeUnit.SECONDS);
WebSocketClient client = new WebSocketClient(__client);
client.setProtocol(_protocol);
client.setMaxIdleTime(_timeout);
client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
}
public void ping(byte opcode,byte[] data,int fragment) throws Exception

View File

@ -107,12 +107,6 @@ public class TestServer extends Server
System.err.printf("%s#onOpen %s\n",this.getClass().getSimpleName(),connection);
}
public void onError(String message, Throwable ex)
{
if (_verbose)
System.err.printf("%s#onOpen %s\n",this.getClass().getSimpleName(),message);
}
public void onHandshake(FrameConnection connection)
{
if (_verbose)

View File

@ -29,13 +29,6 @@ public interface WebSocket
*/
void onOpen(Connection connection);
/**
* Called when a new websocket connection cannot be created
* @param message The error message
* @param ex The exception or null
*/
void onError(String message, Throwable ex);
/**
* Called when an established websocket connection closes
* @param closeCode
@ -155,7 +148,10 @@ public interface WebSocket
byte textOpcode();
byte continuationOpcode();
byte finMask();
String getProtocol();
void setFakeFragments(boolean fake);
boolean isFakeFragments();
boolean isControl(byte opcode);
boolean isText(byte opcode);
boolean isBinary(byte opcode);

View File

@ -15,11 +15,8 @@ package org.eclipse.jetty.websocket;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.BuffersFactory;
import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.ThreadLocalBuffers;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.BuffersFactory;
/* ------------------------------------------------------------ */

View File

@ -3,14 +3,21 @@ package org.eclipse.jetty.websocket;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
@ -29,97 +36,274 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.Timeout;
/* ------------------------------------------------------------ */
/** WebSocket Client
* <p>This WebSocket Client class can create multiple websocket connections to multiple destinations.
* It uses the same {@link WebSocket} endpoint API as the server.
* Simple usage is as follows: <pre>
* WebSocketClient client = new WebSocketClient();
* client.setMaxIdleTime(500);
* client.start();
*
* WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"),new WebSocket.OnTextMessage()
* {
* public void onOpen(Connection connection)
* {
* // open notification
* }
*
* public void onClose(int closeCode, String message)
* {
* // close notification
* }
*
* public void onMessage(String data)
* {
* // handle incoming message
* }
* }).get(5,TimeUnit.SECONDS);
*
* connection.sendMessage("Hello World");
* </pre>
*/
public class WebSocketClient extends AggregateLifeCycle
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getCanonicalName());
private final static Random __random = new Random();
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
private final WebSocketClient _root;
private final WebSocketClient _parent;
private final ThreadPool _threadPool;
private final Selector _selector=new Selector();
private final Timeout _connectQ=new Timeout();
private int _connectTimeout=30000;
private final WebSocketClientSelector _selector;
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
private int _bufferSize=64*1024;
private boolean _blockingConnect=false;
private String _protocol;
private int _maxIdleTime=-1;
private WebSocketBuffers _buffers;
public WebSocketClient(ThreadPool threadpool)
{
_threadPool=threadpool;
addBean(_selector);
addBean(_threadPool);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with default configuration.
*/
public WebSocketClient()
{
this(new QueuedThreadPool());
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with shared threadpool.
* @param threadpool
*/
public WebSocketClient(ThreadPool threadpool)
{
_root=this;
_parent=null;
_threadPool=threadpool;
_selector=new WebSocketClientSelector();
addBean(_selector);
addBean(_threadPool);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client from another.
* <p>If multiple clients are required so that connections created may have different
* configurations, then it is more efficient to create a client based on another, so
* that the thread pool and IO infrastructure may be shared.
*/
public WebSocketClient(WebSocketClient parent)
{
_root=parent._root;
_parent=parent;
_threadPool=parent._threadPool;
_selector=parent._selector;
_parent.addBean(this);
}
/* ------------------------------------------------------------ */
/**
* Get the selectorManager. Used to configure the manager.
* @return The {@link SelectorManager} instance.
*/
public SelectorManager getSelectorManager()
{
return _selector;
}
/* ------------------------------------------------------------ */
/** Get the ThreadPool.
* <p>Used to set/query the thread pool configuration.
* @return The {@link ThreadPool}
*/
public ThreadPool getThreadPool()
{
return _threadPool;
}
public int getConnectTimeout()
{
return _connectTimeout;
}
public void setConnectTimeout(int connectTimeout)
{
if (isRunning())
throw new IllegalStateException(getState());
_connectTimeout = connectTimeout;
}
/* ------------------------------------------------------------ */
/** Get the maxIdleTime for connections opened by this client.
* @return The maxIdleTime in ms, or -1 if the default from {@link #getSelectorManager()} is used.
*/
public int getMaxIdleTime()
{
return (int)_selector.getMaxIdleTime();
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/** Set the maxIdleTime for connections opened by this client.
* @param maxIdleTime max idle time in ms
*/
public void setMaxIdleTime(int maxIdleTime)
{
_selector.setMaxIdleTime(maxIdleTime);
_maxIdleTime=maxIdleTime;
}
/* ------------------------------------------------------------ */
/** Get the WebSocket Buffer size for connections opened by this client.
* @return the buffer size in bytes.
*/
public int getBufferSize()
{
return _bufferSize;
}
/* ------------------------------------------------------------ */
/** Set the WebSocket Buffer size for connections opened by this client.
* @param bufferSize the buffer size in bytes.
*/
public void setBufferSize(int bufferSize)
{
if (isRunning())
throw new IllegalStateException(getState());
_bufferSize = bufferSize;
}
public boolean isBlockingConnect()
/* ------------------------------------------------------------ */
/** Get the subprotocol string for connections opened by this client.
* @return The subprotocol
*/
public String getProtocol()
{
return _blockingConnect;
return _protocol;
}
public void setBlockingConnect(boolean blockingConnect)
/* ------------------------------------------------------------ */
/** Set the subprotocol string for connections opened by this client.
* @param protocol The subprotocol
*/
public void setProtocol(String protocol)
{
_blockingConnect = blockingConnect;
_protocol = protocol;
}
/* ------------------------------------------------------------ */
public Map<String,String> getCookies()
{
return _cookies;
}
/* ------------------------------------------------------------ */
public List<String> getExtensions()
{
return _extensions;
}
/* ------------------------------------------------------------ */
/** Open a WebSocket connection.
* Open a websocket connection to the URI and block until the connection is accepted or there is an error.
* @param uri The URI to connect to.
* @param websocket The {@link WebSocket} instance to handle incoming events.
* @param maxConnectTime The interval to wait for a successful connection
* @param units the units of the maxConnectTime
* @return A {@link WebSocket.Connection}
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
{
try
{
return open(uri,websocket).get(maxConnectTime,units);
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
if (cause instanceof Error)
throw (Error)cause;
if (cause instanceof RuntimeException)
throw (RuntimeException)cause;
throw new RuntimeException(cause);
}
}
/* ------------------------------------------------------------ */
/** Asynchronously open a websocket connection.
* Open a websocket connection and return a {@link Future} to obtain the connection.
* The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.
*
* @param uri The URI to connect to.
* @param websocket The {@link WebSocket} instance to handle incoming events.
* @return A {@link Future} to the {@link WebSocket.Connection}
* @throws IOException
*/
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
{
if (!isStarted())
throw new IllegalStateException("!started");
String scheme=uri.getScheme();
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
if ("wss".equalsIgnoreCase(scheme))
throw new IOException("wss not supported");
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
int maxIdleTime = getMaxIdleTime();
if (maxIdleTime<0)
maxIdleTime=(int)_selector.getMaxIdleTime();
if (maxIdleTime>0)
channel.socket().setSoTimeout(maxIdleTime);
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
channel.configureBlocking(false);
channel.connect(address);
_selector.register( channel, holder);
return holder;
}
@Override
protected void doStart() throws Exception
{
if (_parent!=null && !_parent.isRunning())
throw new IllegalStateException("parent:"+getState());
_buffers = new WebSocketBuffers(_bufferSize);
super.doStart();
// Start a selector and timer if this is the root client
if (_parent==null)
{
for (int i=0;i<_selector.getSelectSets();i++)
{
final int id=i;
_threadPool.dispatch(new Runnable(){
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
@ -136,88 +320,14 @@ public class WebSocketClient extends AggregateLifeCycle
}
});
}
_connectQ.setDuration(_connectTimeout);
_threadPool.dispatch(new Runnable(){
public void run()
{
while(isRunning())
{
try
{
Thread.sleep(200); // TODO configure?
_connectQ.tick(System.currentTimeMillis());
}
catch(Exception e)
{
__log.warn(e);
}
}
}
});
}
public void open(URI uri, WebSocket websocket) throws IOException
{
open(uri,websocket,null,(int)_selector.getMaxIdleTime(),null,null);
}
public void open(URI uri, WebSocket websocket, String protocol,int maxIdleTime) throws IOException
{
open(uri,websocket,protocol,(int)_selector.getMaxIdleTime(),null,null);
}
public void open(URI uri, WebSocket websocket, String protocol,int maxIdleTime,Map<String,String> cookies) throws IOException
{
open(uri,websocket,protocol,(int)_selector.getMaxIdleTime(),cookies,null);
}
public void open(URI uri, WebSocket websocket, String protocol,int maxIdleTime,Map<String,String> cookies,List<String> extensions) throws IOException
{
if (!isStarted())
throw new IllegalStateException("!started");
String scheme=uri.getScheme();
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
if ("wss".equalsIgnoreCase(scheme))
throw new IOException("wss not supported");
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
channel.socket().setSoTimeout(getMaxIdleTime());
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
WebSocketHolder holder=new WebSocketHolder(websocket,uri,protocol,maxIdleTime,cookies,extensions,channel);
_connectQ.schedule(holder);
boolean thrown=true;
try
{
if (isBlockingConnect())
{
channel.socket().connect(address,0);
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
channel.connect(address);
}
_selector.register( channel, holder);
thrown=false;
}
finally
{
if (thrown)
holder.cancel();
}
}
class Selector extends SelectorManager
/* ------------------------------------------------------------ */
/** WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
@ -234,18 +344,20 @@ public class WebSocketClient extends AggregateLifeCycle
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocketHolder holder = (WebSocketHolder) endpoint.getSelectionKey().attachment();
WebSocketFuture holder = (WebSocketFuture) endpoint.getSelectionKey().attachment();
return new HandshakeConnection(endpoint,holder);
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
throw new IllegalStateException();
}
@Override
@ -257,29 +369,34 @@ public class WebSocketClient extends AggregateLifeCycle
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof WebSocketHolder))
if (!(attachment instanceof WebSocketFuture))
super.connectionFailed(channel,ex,attachment);
else
{
__log.debug(ex);
WebSocketHolder holder = (WebSocketHolder)attachment;
holder.cancel();
holder.getWebSocket().onError(ex.toString(),ex);
WebSocketFuture holder = (WebSocketFuture)attachment;
holder.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/** Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketHolder _holder;
private final WebSocketFuture _holder;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketHolder holder)
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketFuture holder)
{
super(endpoint,System.currentTimeMillis());
_endp=endpoint;
@ -328,15 +445,18 @@ public class WebSocketClient extends AggregateLifeCycle
}
});
String path=_holder.getURI().getPath();
if (path==null || path.length()==0)
path="/";
String request=
"GET "+_holder.getURI().getPath()+" HTTP/1.1\r\n"+
"GET "+path+" HTTP/1.1\r\n"+
"Host: "+holder.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+_key+"\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Version: 8\r\n";
"Sec-WebSocket-Version: "+WebSocketConnectionD10.VERSION+"\r\n";
if (holder.getProtocol()!=null)
request+="Sec-WebSocket-Protocol: "+holder.getProtocol()+"\r\n";
@ -356,16 +476,16 @@ public class WebSocketClient extends AggregateLifeCycle
try
{
ByteArrayBuffer handshake = new ByteArrayBuffer(request);
Buffer handshake = new ByteArrayBuffer(request,false);
int len=handshake.length();
if (len!=_endp.flush(handshake))
throw new IOException("incomplete");
}
catch(IOException e)
{
__log.debug(e);
_holder.getWebSocket().onError("Handshake failed",e);
holder.handshakeFailed(e);
}
}
public Connection handle() throws IOException
@ -375,8 +495,7 @@ public class WebSocketClient extends AggregateLifeCycle
switch (_parser.parseAvailable())
{
case -1:
_holder.cancel();
_holder.getWebSocket().onError("EOF",new EOFException());
_holder.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
@ -384,10 +503,11 @@ public class WebSocketClient extends AggregateLifeCycle
break;
}
}
if (_error==null && _accept==null)
if (_error==null)
{
if (_accept==null)
_error="No Sec-WebSocket-Accept";
else if (_error==null && !WebSocketConnectionD10.hashKey(_key).equals(_accept))
else if (!WebSocketConnectionD10.hashKey(_key).equals(_accept))
_error="Bad Sec-WebSocket-Accept";
else
{
@ -398,12 +518,11 @@ public class WebSocketClient extends AggregateLifeCycle
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
if (_holder.getWebSocket() instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_holder.getWebSocket()).onHandshake((WebSocket.FrameConnection)connection.getConnection());
_holder.cancel();
_holder.getWebSocket().onOpen(connection.getConnection());
_holder.onConnection(connection);
return connection;
}
}
_endp.close();
return this;
@ -421,13 +540,18 @@ public class WebSocketClient extends AggregateLifeCycle
public void closed()
{
_holder.cancel();
_holder.getWebSocket().onError(_error==null?"EOF":_error,null);
if (_error!=null)
_holder.handshakeFailed(new ProtocolException(_error));
else
_holder.handshakeFailed(new EOFException());
}
}
class WebSocketHolder extends Timeout.Task
/* ------------------------------------------------------------ */
/** The Future Websocket Connection.
*/
class WebSocketFuture implements Future<WebSocket.Connection>
{
final WebSocket _websocket;;
final URI _uri;
@ -435,9 +559,13 @@ public class WebSocketClient extends AggregateLifeCycle
final int _maxIdleTime;
final Map<String,String> _cookies;
final List<String> _extensions;
final ByteChannel _channel;
final CountDownLatch _done = new CountDownLatch(1);
public WebSocketHolder(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
ByteChannel _channel;
WebSocketConnection _connection;
Throwable _exception;
public WebSocketFuture(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
{
_websocket=websocket;
_uri=uri;
@ -448,6 +576,60 @@ public class WebSocketClient extends AggregateLifeCycle
_channel=channel;
}
public void onConnection(WebSocketConnection connection)
{
try
{
synchronized (this)
{
if (_channel!=null)
_connection=connection;
}
if (_connection!=null)
{
if (_websocket instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)connection.getConnection());
_websocket.onOpen(connection.getConnection());
}
}
finally
{
_done.countDown();
}
}
public void handshakeFailed(Throwable ex)
{
try
{
ByteChannel channel=null;
synchronized (this)
{
if (_channel!=null)
{
channel=_channel;
_channel=null;
_exception=ex;
}
}
if (channel!=null)
{
if (ex instanceof ProtocolException)
closeChannel(channel,WebSocketConnectionD10.CLOSE_PROTOCOL,ex.getMessage());
else
closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,ex.getMessage());
}
}
finally
{
_done.countDown();
}
}
public Map<String,String> getCookies()
{
return _cookies;
@ -473,25 +655,115 @@ public class WebSocketClient extends AggregateLifeCycle
return _maxIdleTime;
}
@Override
public void expired()
{
try
{
__log.debug("expired "+this);
getWebSocket().onError("expired",null);
_channel.close();
}
catch(IOException e)
{
__log.ignore(e);
}
}
public String toString()
{
return "[" + _uri + ","+_websocket+"]@"+hashCode();
}
public boolean cancel(boolean mayInterruptIfRunning)
{
try
{
ByteChannel channel=null;
synchronized (this)
{
if (_connection==null && _exception==null && _channel!=null)
{
channel=_channel;
_channel=null;
}
}
if (channel!=null)
{
closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"cancelled");
return true;
}
return false;
}
finally
{
_done.countDown();
}
}
public boolean isCancelled()
{
synchronized (this)
{
return _channel==null && _connection==null;
}
}
public boolean isDone()
{
synchronized (this)
{
return _connection!=null && _exception==null;
}
}
public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
{
try
{
return get(Long.MAX_VALUE,TimeUnit.SECONDS);
}
catch(TimeoutException e)
{
throw new IllegalStateException("The universe has ended",e);
}
}
public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException
{
_done.await(timeout,unit);
ByteChannel channel=null;
org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
Throwable exception=null;
synchronized (this)
{
exception=_exception;
if (_connection==null)
{
exception=_exception;
channel=_channel;
_channel=null;
}
else
connection=_connection.getConnection();
}
if (channel!=null)
closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"timeout");
if (exception!=null)
throw new ExecutionException(exception);
if (connection!=null)
return connection;
throw new TimeoutException();
}
private void closeChannel(ByteChannel channel,int code, String message)
{
try
{
_websocket.onClose(code,message);
}
catch(Exception e)
{
__log.warn(e);
}
try
{
channel.close();
}
catch(IOException e)
{
__log.debug(e);
}
}
}
}

View File

@ -17,4 +17,6 @@ public interface WebSocketConnection extends Connection
void handshake(HttpServletRequest request, HttpServletResponse response, String origin, String subprotocol) throws IOException;
List<Extension> getExtensions();
WebSocket.Connection getConnection();
}

View File

@ -87,6 +87,13 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
}
}
/* ------------------------------------------------------------ */
public org.eclipse.jetty.websocket.WebSocket.Connection getConnection()
{
return this;
}
/* ------------------------------------------------------------ */
public void setHixieKeys(String key1,String key2)
{
@ -523,4 +530,16 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
{
return 0;
}
public void setFakeFragments(boolean fake)
{
// TODO Auto-generated method stub
}
public boolean isFakeFragments()
{
// TODO Auto-generated method stub
return false;
}
}

View File

@ -33,10 +33,10 @@ import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
{
@ -491,6 +491,15 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
{
return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
}
public void setFakeFragments(boolean fake)
{
}
public boolean isFakeFragments()
{
return false;
}
}
/* ------------------------------------------------------------ */

View File

@ -33,10 +33,10 @@ import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocketGeneratorD10.MaskGen;
public class WebSocketConnectionD10 extends AbstractConnection implements WebSocketConnection
@ -46,6 +46,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
final static byte OP_BINARY = 0x02;
final static byte OP_EXT_DATA = 0x03;
final static byte OP_CONTROL = 0x08;
final static byte OP_CLOSE = 0x08;
final static byte OP_PING = 0x09;
final static byte OP_PONG = 0x0A;
@ -60,14 +61,18 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
final static int CLOSE_NOCLOSE=1006;
final static int CLOSE_NOTUTF8=1007;
final static int FLAG_FIN=0x8;
final static int VERSION=8;
static boolean isLastFrame(byte flags)
{
return (flags&0x8)!=0;
return (flags&FLAG_FIN)!=0;
}
static boolean isControlFrame(byte opcode)
{
return (opcode&0x8)!=0;
return (opcode&OP_CONTROL)!=0;
}
private final static byte[] MAGIC;
@ -85,8 +90,8 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
private final String _protocol;
private final int _draft;
private int _close;
private boolean _closedIn;
private boolean _closedOut;
private volatile boolean _closedIn;
private volatile boolean _closedOut;
private int _maxTextMessageSize;
private int _maxBinaryMessageSize=-1;
@ -103,12 +108,12 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
}
}
private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD07();
private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler();
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private final WebSocket.FrameConnection _connection = new FrameConnectionD10();
private final WebSocket.FrameConnection _connection = new WSFrameConnection();
/* ------------------------------------------------------------ */
@ -339,7 +344,6 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
_close=code;
}
try
{
if (closed)
@ -358,7 +362,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
bytes[0]=(byte)(code/0x100);
bytes[1]=(byte)(code%0x100);
_outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_CLOSE,bytes,0,bytes.length);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_CLOSE,bytes,0,bytes.length);
}
_outbound.flush();
@ -388,29 +392,29 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameConnectionD10 implements WebSocket.FrameConnection
private class WSFrameConnection implements WebSocket.FrameConnection
{
volatile boolean _disconnecting;
int _maxTextMessage=WebSocketConnectionD10.this._maxTextMessageSize;
int _maxBinaryMessage=WebSocketConnectionD10.this._maxBinaryMessageSize;
/* ------------------------------------------------------------ */
public synchronized void sendMessage(String content) throws IOException
public void sendMessage(String content) throws IOException
{
if (_closedOut)
throw new IOException("closing");
byte[] data = content.getBytes(StringUtil.__UTF8);
_outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_TEXT,data,0,data.length);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_TEXT,data,0,data.length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
public void sendMessage(byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_BINARY,content,offset,length);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_BINARY,content,offset,length);
checkWriteable();
_idle.access(_endp);
}
@ -430,7 +434,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
{
if (_closedOut)
throw new IOException("closing");
_outbound.addFrame((byte)0x8,ctrl,data,offset,length);
_outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
checkWriteable();
_idle.access(_endp);
}
@ -507,7 +511,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
public byte finMask()
{
return 0x8;
return FLAG_FIN;
}
/* ------------------------------------------------------------ */
@ -558,6 +562,18 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
close(CLOSE_NORMAL,null);
}
/* ------------------------------------------------------------ */
public void setFakeFragments(boolean fake)
{
_parser.setFakeFragments(fake);
}
/* ------------------------------------------------------------ */
public boolean isFakeFragments()
{
return _parser.isFakeFragments();
}
/* ------------------------------------------------------------ */
public String toString()
{
@ -568,7 +584,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameHandlerD07 implements WebSocketParser.FrameHandler
private class WSFrameHandler implements WebSocketParser.FrameHandler
{
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
private ByteArrayBuffer _aggregate;
@ -583,7 +599,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
// Ignore incoming after a close
if (_closedIn)
return;
}
try
{
byte[] array=buffer.array();
@ -753,7 +769,6 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
Log.warn(th);
}
}
}
public void close(int code,String message)
{

View File

@ -20,7 +20,6 @@ import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.TypeUtil;
/* ------------------------------------------------------------ */

View File

@ -14,13 +14,11 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.TypeUtil;
/* ------------------------------------------------------------ */

View File

@ -18,8 +18,6 @@ import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;

View File

@ -18,8 +18,6 @@ import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
@ -48,7 +46,6 @@ public class WebSocketParserD10 implements WebSocketParser
}
};
private final WebSocketBuffers _buffers;
private final EndPoint _endp;
private final FrameHandler _handler;
@ -63,6 +60,7 @@ public class WebSocketParserD10 implements WebSocketParser
private final byte[] _mask = new byte[4];
private int _m;
private boolean _skip;
private boolean _fakeFragments=true;
/* ------------------------------------------------------------ */
/**
@ -81,6 +79,24 @@ public class WebSocketParserD10 implements WebSocketParser
_state=State.START;
}
/* ------------------------------------------------------------ */
/**
* @return True if fake fragments should be created for frames larger than the buffer.
*/
public boolean isFakeFragments()
{
return _fakeFragments;
}
/* ------------------------------------------------------------ */
/**
* @param fakeFragments True if fake fragments should be created for frames larger than the buffer.
*/
public void setFakeFragments(boolean fakeFragments)
{
_fakeFragments = fakeFragments;
}
/* ------------------------------------------------------------ */
public boolean isBufferEmpty()
{
@ -121,8 +137,34 @@ public class WebSocketParserD10 implements WebSocketParser
_buffer.compact();
// if no space, then the data is too big for buffer
if (_buffer.space() == 0)
{
// Can we send a fake frame?
if (_fakeFragments && _state==State.DATA)
{
Buffer data =_buffer.get(4*(available/4));
_buffer.compact();
if (_masked)
{
if (data.array()==null)
data=_buffer.asMutableBuffer();
byte[] array = data.array();
final int end=data.putIndex();
for (int i=data.getIndex();i<end;i++)
array[i]^=_mask[_m++%4];
}
// System.err.printf("%s %s %s >>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length());
events++;
_bytesNeeded-=data.length();
_handler.onFrame((byte)(_flags&(0xff^WebSocketConnectionD10.FLAG_FIN)), _opcode, data);
_opcode=WebSocketConnectionD10.OP_CONTINUATION;
}
if (_buffer.space() == 0)
throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity());
}
// catch IOExceptions (probably EOF) and try to parse what we have
try
@ -202,7 +244,7 @@ public class WebSocketParserD10 implements WebSocketParser
_length = _length*0x100 + (0xff&b);
if (--_bytesNeeded==0)
{
if (_length>_buffer.capacity())
if (_length>_buffer.capacity() && !_fakeFragments)
{
events++;
_handler.close(WebSocketConnectionD10.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity());

View File

@ -1,23 +1,22 @@
package org.eclipse.jetty.websocket;
import static org.hamcrest.CoreMatchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.IO;
@ -28,20 +27,20 @@ import org.junit.Test;
public class WebSocketClientTest
{
private ServerSocket server;
private int serverPort;
private ServerSocket _server;
private int _serverPort;
@Before
public void startServer() throws IOException {
server = new ServerSocket();
server.bind(null);
serverPort = server.getLocalPort();
_server = new ServerSocket();
_server.bind(null);
_serverPort = _server.getLocalPort();
}
@After
public void stopServer() throws IOException {
if(server != null) {
server.close();
if(_server != null) {
_server.close();
}
}
@ -62,10 +61,6 @@ public class WebSocketClientTest
open.set(true);
}
public void onError(String message, Throwable ex)
{
}
public void onClose(int closeCode, String message)
{}
});
@ -81,269 +76,124 @@ public class WebSocketClientTest
}
@Test
public void testBlockingConnectionRefused() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
client.setBlockingConnect(true);
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
try
{
client.open(new URI("ws://127.0.0.1:1"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
}
public void onError(String message, Throwable ex)
{
}
public void onClose(int closeCode, String message)
{}
});
Assert.fail();
}
catch(IOException e)
{
bad=true;
}
Assert.assertTrue(bad);
Assert.assertFalse(open.get());
}
@Test
public void testAsyncConnectionRefused() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setConnectTimeout(1000);
client.start();
client.setBlockingConnect(false);
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:1"),new WebSocket()
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:1"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
}
catch(IOException e)
Throwable error=null;
try
{
bad=true;
future.get(1,TimeUnit.SECONDS);
Assert.fail();
}
catch(ExecutionException e)
{
error=e.getCause();
}
Assert.assertFalse(bad);
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof ConnectException);
}
@Test
public void testBlockingConnectionNotAccepted() throws Exception
public void testConnectionNotAccepted() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setConnectTimeout(500);
client.setBlockingConnect(true);
client.start();
boolean bad=false;
final AtomicReference<String> error = new AtomicReference<String>(null);
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
latch.countDown();
}
});
}
catch(IOException e)
{
e.printStackTrace();
bad=true;
}
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertTrue(bad||error.get()!=null);
}
@Test
public void testAsyncConnectionNotAccepted() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(300);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
}
catch(IOException e)
Throwable error=null;
try
{
bad=true;
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(TimeoutException e)
{
error=e;
}
Assert.assertFalse(bad);
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException);
}
@Test
public void testBlockingConnectionTimeout() throws Exception
public void testConnectionTimeout() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setConnectTimeout(500);
client.setBlockingConnect(true);
client.start();
boolean bad=false;
final AtomicReference<String> error = new AtomicReference<String>(null);
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
latch.countDown();
}
});
}
catch(IOException e)
{
e.printStackTrace();
bad=true;
}
Assert.assertNotNull(server.accept());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertTrue(bad||error.get()!=null);
}
@Test
public void testAsyncConnectionTimeout() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(300);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
}
catch(IOException e)
{
bad=true;
}
Assert.assertNotNull(server.accept());
Assert.assertFalse(bad);
Assert.assertNotNull(_server.accept());
Throwable error=null;
try
{
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(TimeoutException e)
{
error=e;
}
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException);
}
@ -351,271 +201,160 @@ public class WebSocketClientTest
public void testBadHandshake() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(300);
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
public void onOpen(Connection connection)
{
System.out.printf("onOpen(%s)%n", connection);
System.out.flush();
// TODO I don't think we should be seeing onOpen called on the
// bad handshake because the error here should mean that there is no
// websocket, so no onOpen call
// what we are seeing is the onOpen is intermittently showing up before the
// onError which triggers the countdown latch and the error message is null
// at that point.
//open.set(true);
//latch.countDown();
}
public void onError(String message, Throwable ex)
{
System.out.printf("onError(%s, %s)%n", message, ex);
System.out.flush();
error.set(message);
latch.countDown();
open.set(true);
}
public void onClose(int closeCode, String message)
{
System.out.printf("onClose(%d, %s)%n", closeCode, message);
System.out.flush();
close.set(closeCode);
latch.countDown();
}
});
Socket connection = server.accept();
Socket connection = _server.accept();
respondToClient(connection, "HTTP/1.1 404 NOT FOUND\r\n\r\n");
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(10,TimeUnit.SECONDS));
Assert.assertThat("error.get()", error.get(), containsString("404 NOT FOUND"));
}
private void respondToClient(Socket connection, String serverResponse) throws IOException
Throwable error=null;
try
{
InputStream in = null;
InputStreamReader isr = null;
BufferedReader buf = null;
OutputStream out = null;
try {
in = connection.getInputStream();
isr = new InputStreamReader(in);
buf = new BufferedReader(isr);
String line;
while((line = buf.readLine())!=null) {
System.err.println(line);
if(line.length() == 0) {
// Got the "\r\n" line.
break;
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(ExecutionException e)
{
error=e.getCause();
}
// System.out.println("[Server-Out] " + serverResponse);
out = connection.getOutputStream();
out.write(serverResponse.getBytes());
out.flush();
} finally {
IO.close(buf);
IO.close(isr);
IO.close(in);
IO.close(out);
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_PROTOCOL,close.get());
Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("404 NOT FOUND")>0);
}
@Test
public void testBadUpgrade() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
}
catch(IOException e)
{
bad=true;
}
Socket connection = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
// System.err.println(line);
if (line.length()==0)
break;
}
connection.getOutputStream().write((
Socket connection = _server.accept();
respondToClient(connection,
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: rubbish\r\n" +
"\r\n").getBytes());
"\r\n" );
Assert.assertFalse(bad);
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
}
@Test
public void testUpgrade() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
Throwable error=null;
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(ExecutionException e)
{
error=e.getCause();
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_PROTOCOL,close.get());
Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("Bad Sec-WebSocket-Accept")>=0);
}
@Test
public void testUpgradeThenTCPClose() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
_latch.countDown();
}
});
}
catch(IOException e)
{
bad=true;
}
Assert.assertFalse(bad);
String key="not sent";
Socket connection = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
connection.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNull(error.get());
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
socket.close();
_latch.await(10,TimeUnit.SECONDS);
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
}
@Test
public void testIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.setMaxIdleTime(500);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(2);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
_latch.countDown();
}
});
}
catch(IOException e)
{
bad=true;
}
Assert.assertFalse(bad);
String key="not sent";
Socket connection = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
connection.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(latch.await(10,TimeUnit.SECONDS));
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
long start=System.currentTimeMillis();
_latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis()-start<5000);
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NORMAL,close.get());
}
@ -624,42 +363,24 @@ public class WebSocketClientTest
public void testNotIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.setMaxIdleTime(500);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final Exchanger<Integer> close = new Exchanger<Integer>();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<WebSocket.Connection> connection = new AtomicReference<WebSocket.Connection>();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
final BlockingQueue<String> queue = new BlockingArrayQueue<String>();
try
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket.OnTextMessage()
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket.OnTextMessage()
{
public void onOpen(Connection c)
public void onOpen(Connection connection)
{
open.set(true);
connection.set(c);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
latch.countDown();
}
public void onClose(int closeCode, String message)
{
try
{
close.exchange(closeCode);
}
catch(InterruptedException ex)
{}
latch.countDown();
close.set(closeCode);
_latch.countDown();
}
public void onMessage(String data)
@ -667,30 +388,16 @@ public class WebSocketClientTest
queue.add(data);
}
});
}
catch(IOException e)
{
bad=true;
}
Assert.assertFalse(bad);
String key="not sent";
Socket socket = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
socket.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(latch.await(10,TimeUnit.SECONDS));
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
// Send some messages client to server
byte[] recv = new byte[1024];
@ -698,7 +405,7 @@ public class WebSocketClientTest
for (int i=0;i<10;i++)
{
Thread.sleep(250);
connection.get().sendMessage("Hello");
connection.sendMessage("Hello");
len=socket.getInputStream().read(recv,0,recv.length);
Assert.assertTrue(len>0);
}
@ -714,9 +421,68 @@ public class WebSocketClientTest
Assert.assertEquals("Hi",queue.poll(1,TimeUnit.SECONDS));
}
// Close with code
long start=System.currentTimeMillis();
socket.getOutputStream().write(new byte[]{(byte)0x88, (byte) 0x02, (byte)4, (byte)87 },0,4);
socket.getOutputStream().flush();
Assert.assertEquals(new Integer(1111),close.exchange(null,1,TimeUnit.SECONDS));
_latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis()-start<5000);
Assert.assertEquals(1111,close.get());
}
private void respondToClient(Socket connection, String serverResponse) throws IOException
{
InputStream in = null;
InputStreamReader isr = null;
BufferedReader buf = null;
OutputStream out = null;
try {
in = connection.getInputStream();
isr = new InputStreamReader(in);
buf = new BufferedReader(isr);
String line;
while((line = buf.readLine())!=null)
{
// System.err.println(line);
if(line.length() == 0)
{
// Got the "\r\n" line.
break;
}
}
// System.out.println("[Server-Out] " + serverResponse);
out = connection.getOutputStream();
out.write(serverResponse.getBytes());
out.flush();
}
finally
{
IO.close(buf);
IO.close(isr);
IO.close(in);
IO.close(out);
}
}
private void accept(Socket connection) throws IOException
{
String key="not sent";
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
connection.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
}
}

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -117,10 +116,6 @@ public class WebSocketLoadD10Test
this.outbound = outbound;
}
public void onError(String message,Throwable ex)
{
}
public void onMessage(String data)
{
try

View File

@ -250,10 +250,6 @@ public class WebSocketMessageD00Test
return latch.await(time, TimeUnit.MILLISECONDS);
}
public void onError(String message,Throwable ex)
{
}
public void onClose(int code,String message)
{
}

View File

@ -22,7 +22,6 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -757,10 +756,6 @@ public class WebSocketMessageD06Test
this.connection = connection;
}
public void onError(String message,Throwable ex)
{
}
public void onOpen(Connection connection)
{
if (onConnect)

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -982,11 +981,6 @@ public class WebSocketMessageD10Test
return disconnected.await(time, TimeUnit.MILLISECONDS);
}
public void onError(String message,Throwable ex)
{
disconnected.countDown();
}
public void onClose(int code,String message)
{
disconnected.countDown();

View File

@ -24,7 +24,7 @@ public class WebSocketParserD10Test
{
private MaskedByteArrayBuffer _in;
private Handler _handler;
private WebSocketParser _parser;
private WebSocketParserD10 _parser;
private byte[] _mask = new byte[] {(byte)0x00,(byte)0xF0,(byte)0x0F,(byte)0xFF};
private int _m;
@ -88,6 +88,7 @@ public class WebSocketParserD10Test
endPoint.setNonBlocking(true);
_handler = new Handler();
_parser=new WebSocketParserD10(buffers, endPoint,_handler,true);
_parser.setFakeFragments(false);
_in = new MaskedByteArrayBuffer();
endPoint.setIn(_in);
@ -250,6 +251,7 @@ public class WebSocketParserD10Test
public void testFrameTooLarge() throws Exception
{
// Buffers are only 1024, so this frame is too large
_parser.setFakeFragments(false);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
@ -287,6 +289,27 @@ public class WebSocketParserD10Test
assertEquals(1024,_handler._data.get(0).length());
}
@Test
public void testFakeFragement() throws Exception
{
// Buffers are only 1024, so this frame will be fake fragmented
_parser.setFakeFragments(true);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
_in.putUnmasked((byte)(2048>>8));
_in.putUnmasked((byte)(2048&0xff));
_in.sendMask();
for (int i=0;i<2048;i++)
_in.put((byte)'a');
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(2,_handler._frames);
assertEquals(WebSocketConnectionD10.OP_CONTINUATION,_handler._opcode);
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
@ -295,9 +318,11 @@ public class WebSocketParserD10Test
private byte _opcode;
int _code;
String _message;
int _frames;
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
_frames++;
_flags=flags;
_opcode=opcode;
if ((flags&0x8)==0)

View File

@ -68,12 +68,6 @@ public class WebSocketChatServlet extends WebSocketServlet
}
}
public void onError(String message,Throwable ex)
{
Log.warn(this+" onError",ex);
_members.remove(this);
}
public void onClose(int code, String message)
{
// Log.info(this+" onDisconnect");