WebSocket - More concurrency fixes found by FindBugs

This commit is contained in:
Joakim Erdfelt 2013-08-01 13:51:18 -07:00
parent 07041f6439
commit 833aa6723d
8 changed files with 171 additions and 114 deletions

View File

@ -448,4 +448,30 @@ public class QuoteUtil
}
return ret.toString();
}
public static String join(Object[] objs, String delim)
{
if (objs == null)
{
return "";
}
StringBuilder ret = new StringBuilder();
int len = objs.length;
for (int i = 0; i < len; i++)
{
if (i > 0)
{
ret.append(delim);
}
if (objs[i] instanceof String)
{
ret.append('"').append(objs[i]).append('"');
}
else
{
ret.append(objs[i]);
}
}
return ret.toString();
}
}

View File

@ -31,7 +31,7 @@ public class JettyAnnotatedImpl implements EventDriverImpl
public EventDriver create(Object websocket, WebSocketPolicy policy)
{
Class<?> websocketClass = websocket.getClass();
synchronized (cache)
synchronized (this)
{
JettyAnnotatedMetadata metadata = cache.get(websocketClass);
if (metadata == null)

View File

@ -27,6 +27,7 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.util.QuoteUtil;
public class EventMethod
{
@ -103,7 +104,7 @@ public class EventMethod
}
catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e)
{
String err = String.format("Cannot call method %s on %s with args: %s",method,pojo,args);
String err = String.format("Cannot call method %s on %s with args: %s",method,pojo, QuoteUtil.join(args,","));
throw new WebSocketException(err,e);
}
}

View File

@ -21,8 +21,6 @@ package org.eclipse.jetty.websocket.common.io;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -64,12 +62,11 @@ public class IOState
private ConnectionState state;
private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
private final AtomicBoolean inputAvailable;
private final AtomicBoolean outputAvailable;
private final AtomicReference<CloseHandshakeSource> closeHandshakeSource;
private final AtomicReference<CloseInfo> closeInfo;
private final AtomicBoolean cleanClose;
private boolean inputAvailable;
private boolean outputAvailable;
private CloseHandshakeSource closeHandshakeSource;
private CloseInfo closeInfo;
private boolean cleanClose;
/**
* Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
@ -77,11 +74,11 @@ public class IOState
public IOState()
{
this.state = ConnectionState.CONNECTING;
this.inputAvailable = new AtomicBoolean(false);
this.outputAvailable = new AtomicBoolean(false);
this.closeHandshakeSource = new AtomicReference<>(CloseHandshakeSource.NONE);
this.closeInfo = new AtomicReference<>();
this.cleanClose = new AtomicBoolean(false);
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.NONE;
this.closeInfo = null;
this.cleanClose = false;
}
public void addListener(ConnectionStateListener listener)
@ -107,7 +104,7 @@ public class IOState
public CloseInfo getCloseInfo()
{
return closeInfo.get();
return closeInfo;
}
public ConnectionState getConnectionState()
@ -125,7 +122,7 @@ public class IOState
public boolean isInputAvailable()
{
return inputAvailable.get();
return inputAvailable;
}
public boolean isOpen()
@ -135,7 +132,7 @@ public class IOState
public boolean isOutputAvailable()
{
return outputAvailable.get();
return outputAvailable;
}
private void notifyStateListeners(ConnectionState state)
@ -154,7 +151,7 @@ public class IOState
public void onAbnormalClose(CloseInfo close)
{
ConnectionState event = null;
synchronized (this.state)
synchronized (this)
{
if (this.state == ConnectionState.CLOSED)
{
@ -164,14 +161,15 @@ public class IOState
if (this.state == ConnectionState.OPEN)
{
this.cleanClose.set(false);
this.cleanClose = false;
}
this.state = ConnectionState.CLOSED;
this.closeInfo.compareAndSet(null,close);
this.inputAvailable.set(false);
this.outputAvailable.set(false);
this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
if (closeInfo == null)
this.closeInfo = close;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
event = this.state;
}
notifyStateListeners(event);
@ -200,22 +198,26 @@ public class IOState
onOpened();
}
synchronized (this.state)
synchronized (this)
{
closeInfo.compareAndSet(null,close);
if (closeInfo == null)
closeInfo = close;
boolean in = inputAvailable.get();
boolean out = outputAvailable.get();
closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.LOCAL);
boolean in = inputAvailable;
boolean out = outputAvailable;
if (closeHandshakeSource == CloseHandshakeSource.NONE)
{
closeHandshakeSource = CloseHandshakeSource.LOCAL;
}
out = false;
outputAvailable.set(false);
outputAvailable = false;
LOG.debug("onCloseLocal(), input={}, output={}",in,out);
if (!in && !out)
{
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose.set(true);
cleanClose = true;
this.state = ConnectionState.CLOSED;
event = this.state;
}
@ -226,7 +228,7 @@ public class IOState
event = this.state;
}
}
LOG.debug("event = {}",event);
// Only notify on state change events
@ -239,13 +241,13 @@ public class IOState
if (close.isHarsh())
{
LOG.debug("Harsh close, disconnecting");
synchronized (this.state)
synchronized (this)
{
this.state = ConnectionState.CLOSED;
cleanClose.set(false);
outputAvailable.set(false);
inputAvailable.set(false);
this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
state = ConnectionState.CLOSED;
cleanClose = false;
outputAvailable = false;
inputAvailable = false;
closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
event = this.state;
}
notifyStateListeners(event);
@ -261,7 +263,7 @@ public class IOState
{
LOG.debug("onCloseRemote({})",close);
ConnectionState event = null;
synchronized (this.state)
synchronized (this)
{
if (this.state == ConnectionState.CLOSED)
{
@ -269,21 +271,25 @@ public class IOState
return;
}
closeInfo.compareAndSet(null,close);
if (closeInfo == null)
closeInfo = close;
boolean in = inputAvailable.get();
boolean out = outputAvailable.get();
closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.REMOTE);
boolean in = inputAvailable;
boolean out = outputAvailable;
if (closeHandshakeSource == CloseHandshakeSource.NONE)
{
closeHandshakeSource = CloseHandshakeSource.REMOTE;
}
in = false;
inputAvailable.set(false);
inputAvailable = false;
LOG.debug("onCloseRemote(), input={}, output={}",in,out);
if (!in && !out)
{
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose.set(true);
this.state = ConnectionState.CLOSED;
cleanClose = true;
state = ConnectionState.CLOSED;
event = this.state;
}
else if (this.state == ConnectionState.OPEN)
@ -315,11 +321,11 @@ public class IOState
}
ConnectionState event = null;
synchronized (this.state)
synchronized (this)
{
this.state = ConnectionState.CONNECTED;
this.inputAvailable.set(false); // cannot read (yet)
this.outputAvailable.set(true); // write allowed
inputAvailable = false; // cannot read (yet)
outputAvailable = true; // write allowed
event = this.state;
}
notifyStateListeners(event);
@ -332,12 +338,12 @@ public class IOState
{
assert (this.state == ConnectionState.CONNECTING);
ConnectionState event = null;
synchronized (this.state)
synchronized (this)
{
this.state = ConnectionState.CLOSED;
this.cleanClose.set(false);
this.inputAvailable.set(false);
this.outputAvailable.set(false);
cleanClose = false;
inputAvailable = false;
outputAvailable = false;
event = this.state;
}
notifyStateListeners(event);
@ -357,11 +363,11 @@ public class IOState
assert (this.state == ConnectionState.CONNECTED);
ConnectionState event = null;
synchronized (this.state)
synchronized (this)
{
this.state = ConnectionState.OPEN;
this.inputAvailable.set(true);
this.outputAvailable.set(true);
this.inputAvailable = true;
this.outputAvailable = true;
event = this.state;
}
notifyStateListeners(event);
@ -375,7 +381,7 @@ public class IOState
public void onReadEOF()
{
ConnectionState event = null;
synchronized (this.state)
synchronized (this)
{
if (this.state == ConnectionState.CLOSED)
{
@ -385,12 +391,13 @@ public class IOState
CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF");
this.cleanClose.set(false);
this.cleanClose = false;
this.state = ConnectionState.CLOSED;
this.closeInfo.compareAndSet(null,close);
this.inputAvailable.set(false);
this.outputAvailable.set(false);
this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
if (closeInfo == null)
this.closeInfo = close;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
event = this.state;
}
notifyStateListeners(event);
@ -398,21 +405,21 @@ public class IOState
public boolean wasAbnormalClose()
{
return closeHandshakeSource.get() == CloseHandshakeSource.ABNORMAL;
return closeHandshakeSource == CloseHandshakeSource.ABNORMAL;
}
public boolean wasCleanClose()
{
return cleanClose.get();
return cleanClose;
}
public boolean wasLocalCloseInitiated()
{
return closeHandshakeSource.get() == CloseHandshakeSource.LOCAL;
return closeHandshakeSource == CloseHandshakeSource.LOCAL;
}
public boolean wasRemoteCloseInitiated()
{
return closeHandshakeSource.get() == CloseHandshakeSource.REMOTE;
return closeHandshakeSource == CloseHandshakeSource.REMOTE;
}
}

View File

@ -35,7 +35,8 @@ public class AnnotatedBinaryStreamSocket
@OnWebSocketMessage
public void onBinary(InputStream stream)
{
if(stream == null) {
if (stream == null)
{
new RuntimeException("Stream cannot be null").printStackTrace(System.err);
}
capture.add("onBinary(%s)",stream);

View File

@ -18,31 +18,56 @@
package org.eclipse.jetty.websocket.common.events;
import java.util.ArrayList;
import static org.hamcrest.Matchers.*;
import java.util.regex.Pattern;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.junit.Assert;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
@SuppressWarnings("serial")
public class EventCapture extends ArrayList<String>
public class EventCapture extends EventQueue<String>
{
public static class Assertable
{
private final String event;
public Assertable(String event)
{
this.event = event;
}
public void assertEventContains(String expected)
{
Assert.assertThat("Event",event,containsString(expected));
}
public void assertEventRegex(String regex)
{
Assert.assertTrue("Event: regex:[" + regex + "] in [" + event + "]",Pattern.matches(regex,event));
}
public void assertEventStartsWith(String expected)
{
Assert.assertThat("Event",event,startsWith(expected));
}
public void assertEvent(String expected)
{
Assert.assertThat("Event",event,is(expected));
}
}
public void add(String format, Object... args)
{
super.add(String.format(format,args));
String msg = String.format(format,args);
System.err.printf("### EVENT: %s%n",msg);
super.offer(msg);
}
public void assertEvent(int eventNum, String expected)
public Assertable pop()
{
Assert.assertThat("Event[" + eventNum + "]",get(eventNum),is(expected));
}
public void assertEventContains(int eventNum, String expected)
{
Assert.assertThat("Event[" + eventNum + "]",get(eventNum),containsString(expected));
return new Assertable(super.poll());
}
public void assertEventCount(int expectedCount)
@ -50,17 +75,6 @@ public class EventCapture extends ArrayList<String>
Assert.assertThat("Event Count",size(),is(expectedCount));
}
public void assertEventRegex(int eventNum, String regex)
{
String event = get(eventNum);
Assert.assertTrue("Event[" + eventNum + "]: regex:[" + regex + "] in [" + event + "]",Pattern.matches(regex,event));
}
public void assertEventStartsWith(int eventNum, String expected)
{
Assert.assertThat("Event[" + eventNum + "]",get(eventNum),startsWith(expected));
}
public String q(String str)
{
if (str == null)

View File

@ -19,6 +19,8 @@
package org.eclipse.jetty.websocket.common.events;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -61,8 +63,8 @@ public class EventDriverTest
driver.incomingFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(2);
socket.capture.assertEventStartsWith(0,"onWebSocketConnect");
socket.capture.assertEventStartsWith(1,"onWebSocketClose");
socket.capture.pop().assertEventStartsWith("onWebSocketConnect");
socket.capture.pop().assertEventStartsWith("onWebSocketClose");
}
}
@ -79,9 +81,9 @@ public class EventDriverTest
driver.incomingFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onConnect");
socket.capture.assertEvent(1,"onBinary([11],0,11)");
socket.capture.assertEventStartsWith(2,"onClose(1000,");
socket.capture.pop().assertEventStartsWith("onConnect");
socket.capture.pop().assertEvent("onBinary([11],0,11)");
socket.capture.pop().assertEventStartsWith("onClose(1000,");
}
}
@ -98,9 +100,9 @@ public class EventDriverTest
driver.incomingFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onConnect");
socket.capture.assertEventStartsWith(1,"onError(WebSocketException: oof)");
socket.capture.assertEventStartsWith(2,"onClose(1000,");
socket.capture.pop().assertEventStartsWith("onConnect");
socket.capture.pop().assertEventStartsWith("onError(WebSocketException: oof)");
socket.capture.pop().assertEventStartsWith("onClose(1000,");
}
}
@ -119,17 +121,17 @@ public class EventDriverTest
driver.incomingFrame(new CloseInfo(StatusCode.SHUTDOWN,"testcase").asFrame());
socket.capture.assertEventCount(6);
socket.capture.assertEventStartsWith(0,"onConnect(");
socket.capture.assertEventStartsWith(1,"onFrame(PING[");
socket.capture.assertEventStartsWith(2,"onFrame(TEXT[");
socket.capture.assertEventStartsWith(3,"onFrame(BINARY[");
socket.capture.assertEventStartsWith(4,"onFrame(CLOSE[");
socket.capture.assertEventStartsWith(5,"onClose(1001,");
socket.capture.pop().assertEventStartsWith("onConnect(");
socket.capture.pop().assertEventStartsWith("onFrame(PING[");
socket.capture.pop().assertEventStartsWith("onFrame(TEXT[");
socket.capture.pop().assertEventStartsWith("onFrame(BINARY[");
socket.capture.pop().assertEventStartsWith("onFrame(CLOSE[");
socket.capture.pop().assertEventStartsWith("onClose(1001,");
}
}
@Test
public void testAnnotated_InputStream() throws IOException
public void testAnnotated_InputStream() throws IOException, TimeoutException, InterruptedException
{
AnnotatedBinaryStreamSocket socket = new AnnotatedBinaryStreamSocket();
EventDriver driver = wrap(socket);
@ -139,11 +141,11 @@ public class EventDriverTest
conn.open();
driver.incomingFrame(makeBinaryFrame("Hello World",true));
driver.incomingFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onConnect");
socket.capture.assertEventRegex(1,"^onBinary\\(.*InputStream.*");
socket.capture.assertEventStartsWith(2,"onClose(1000,");
socket.capture.pop().assertEventStartsWith("onConnect");
socket.capture.pop().assertEventRegex("^onBinary\\(.*InputStream.*");
socket.capture.pop().assertEventStartsWith("onClose(1000,");
}
}
@ -161,9 +163,9 @@ public class EventDriverTest
driver.incomingFrame(new CloseInfo(StatusCode.NORMAL).asFrame());
socket.capture.assertEventCount(3);
socket.capture.assertEventStartsWith(0,"onWebSocketConnect");
socket.capture.assertEventStartsWith(1,"onWebSocketText(\"Hello World\")");
socket.capture.assertEventStartsWith(2,"onWebSocketClose(1000,");
socket.capture.pop().assertEventStartsWith("onWebSocketConnect");
socket.capture.pop().assertEventStartsWith("onWebSocketText(\"Hello World\")");
socket.capture.pop().assertEventStartsWith("onWebSocketClose(1000,");
}
}

View File

@ -38,6 +38,12 @@ public class LocalWebSocketSession extends WebSocketSession
setOutgoingHandler(outgoingCapture);
}
@Override
public void dispatch(Runnable runnable)
{
runnable.run();
}
public OutgoingFramesCapture getOutgoingCapture()
{
return outgoingCapture;