Issue #3374 - comments, name changes and fixes to timeout algorithm

Signed-off-by: lachan-roberts <lachlan@webtide.com>
This commit is contained in:
lachan-roberts 2019-05-06 15:50:34 +10:00 committed by Greg Wilkins
parent 1ef191ffcd
commit 5f4c6c3b94
5 changed files with 59 additions and 31 deletions

View File

@ -99,12 +99,12 @@ public class JavaxWebSocketRemoteEndpoint implements javax.websocket.RemoteEndpo
public long getWriteIdleTimeout()
{
return channel.getWriteIdleTimeout().toMillis();
return channel.getWriteTimeout().toMillis();
}
public void setWriteIdleTimeout(long ms)
{
channel.setWriteIdleTimeout(Duration.ofMillis(ms));
channel.setWriteTimeout(Duration.ofMillis(ms));
}
@Override

View File

@ -141,16 +141,26 @@ public interface FrameHandler extends IncomingFrames
*/
Duration getIdleTimeout();
Duration getWriteIdleTimeout();
/**
* Get the Write Timeout
*
* @return the write timeout
*/
Duration getWriteTimeout();
/**
* Set the Idle Timeout.
*
* @param timeout the timeout duration
* @param timeout the timeout duration (timeout &lt;= 0 implies an infinite timeout)
*/
void setIdleTimeout(Duration timeout);
void setWriteIdleTimeout(Duration timeout);
/**
* Set the Write Timeout.
*
* @param timeout the timeout duration (timeout &lt;= 0 implies an infinite timeout)
*/
void setWriteTimeout(Duration timeout);
boolean isAutoFragment();
@ -396,7 +406,7 @@ public interface FrameHandler extends IncomingFrames
}
@Override
public Duration getWriteIdleTimeout()
public Duration getWriteTimeout()
{
return Duration.ZERO;
}
@ -407,7 +417,7 @@ public interface FrameHandler extends IncomingFrames
}
@Override
public void setWriteIdleTimeout(Duration timeout)
public void setWriteTimeout(Duration timeout)
{
}
@ -511,7 +521,7 @@ public interface FrameHandler extends IncomingFrames
class ConfigurationCustomizer implements Customizer, Configuration
{
private Duration timeout;
private Duration idleTimeout;
private Duration writeTimeout;
private Boolean autoFragment;
private Long maxFrameSize;
@ -523,25 +533,25 @@ public interface FrameHandler extends IncomingFrames
@Override
public Duration getIdleTimeout()
{
return timeout==null ? Duration.ZERO : timeout;
return idleTimeout;
}
@Override
public Duration getWriteIdleTimeout()
public Duration getWriteTimeout()
{
return timeout==null ? Duration.ZERO : timeout;
return writeTimeout;
}
@Override
public void setIdleTimeout(Duration timeout)
{
this.timeout = timeout;
this.idleTimeout = timeout==null ? Duration.ZERO : timeout;
}
@Override
public void setWriteIdleTimeout(Duration timeout)
public void setWriteTimeout(Duration timeout)
{
this.writeTimeout = timeout;
this.writeTimeout = timeout==null ? Duration.ZERO : timeout;
}
@Override
@ -619,10 +629,10 @@ public interface FrameHandler extends IncomingFrames
@Override
public void customize(CoreSession session)
{
if (timeout!=null)
session.setIdleTimeout(timeout);
if (idleTimeout !=null)
session.setIdleTimeout(idleTimeout);
if (writeTimeout!=null)
session.setWriteIdleTimeout(timeout);
session.setWriteTimeout(idleTimeout);
if (autoFragment!=null)
session.setAutoFragment(autoFragment);
if (maxFrameSize!=null)

View File

@ -62,6 +62,7 @@ public class FrameFlusher extends IteratingCallback
private ByteBuffer batchBuffer = null;
private boolean canEnqueue = true;
private boolean flushed = true;
private Throwable closedCause;
private LongAdder messagesOut = new LongAdder();
private LongAdder bytesOut = new LongAdder();
@ -108,6 +109,10 @@ public class FrameFlusher extends IteratingCallback
else
queue.offerLast(entry);
/* If the queue was empty then no timeout has been set, so we set a timeout to check the current
entry when it expires. When the timeout expires we will go over entries in the queue and
entries list to see if any of them have expired, it will then reset the timeout for the frame
with the soonest expiry time. */
if ((idleTimeout > 0) && (queue.size()==1) && entries.isEmpty())
timeoutScheduler.schedule(this::timeoutExpired, idleTimeout, TimeUnit.MILLISECONDS);
@ -159,6 +164,9 @@ public class FrameFlusher extends IteratingCallback
previousEntries.addAll(entries);
entries.clear();
if (flushed && batchBuffer!=null)
BufferUtil.clear(batchBuffer);
while (!queue.isEmpty() && entries.size() <= maxGather)
{
Entry entry = queue.poll();
@ -202,10 +210,7 @@ public class FrameFlusher extends IteratingCallback
// Add the payload to the list of buffers
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
{
buffers.add(payload);
break;
}
}
else
{
@ -216,6 +221,8 @@ public class FrameFlusher extends IteratingCallback
if (BufferUtil.hasContent(payload))
buffers.add(payload);
}
flushed = flush;
}
}
@ -265,7 +272,6 @@ public class FrameFlusher extends IteratingCallback
}
return Action.SCHEDULED;
}
private int getQueueSize()
@ -284,17 +290,19 @@ public class FrameFlusher extends IteratingCallback
if (closedCause != null)
return;
long time = System.currentTimeMillis();
long nextTimeout = -1;
long currentTime = System.currentTimeMillis();
long expiredIfCreatedBefore = currentTime - idleTimeout;
long earliestEntry = currentTime;
/* Iterate through entries in both the queue and entries list.
If any entry has expired then we fail the FrameFlusher.
Otherwise we will try to schedule a new timeout. */
Iterator<Entry> iterator = TypeUtil.concat(entries.iterator(), queue.iterator());
while (iterator.hasNext())
{
Entry entry = iterator.next();
long timeSinceCreation = time - entry.getTimeOfCreation();
if (timeSinceCreation >= idleTimeout)
if (entry.getTimeOfCreation() <= expiredIfCreatedBefore)
{
LOG.warn("FrameFlusher write timeout on entry: {}", entry);
failed = true;
@ -307,12 +315,16 @@ public class FrameFlusher extends IteratingCallback
break;
}
if (timeSinceCreation > nextTimeout)
nextTimeout = timeSinceCreation;
if (entry.getTimeOfCreation() < earliestEntry)
earliestEntry = entry.getTimeOfCreation();
}
if (!failed && idleTimeout>0 && !entries.isEmpty())
// if a timeout is set schedule a new timeout if we haven't failed and still have entries
if (!failed && idleTimeout>0 && !(entries.isEmpty() && queue.isEmpty()))
{
long nextTimeout = earliestEntry + idleTimeout - currentTime;
timeoutScheduler.schedule(this::timeoutExpired, nextTimeout, TimeUnit.MILLISECONDS);
}
}
if (failed)

View File

@ -241,7 +241,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
@Override
public Duration getWriteIdleTimeout()
public Duration getWriteTimeout()
{
if (getConnection() == null)
return idleWriteTimeout;
@ -250,7 +250,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
@Override
public void setWriteIdleTimeout(Duration timeout)
public void setWriteTimeout(Duration timeout)
{
if (getConnection() == null)
idleWriteTimeout = timeout;

View File

@ -35,6 +35,12 @@ public interface WebSocketServletFactory extends FrameHandler.Configuration
@Override
void setIdleTimeout(Duration duration);
@Override
Duration getWriteTimeout();
@Override
void setWriteTimeout(Duration duration);
@Override
int getInputBufferSize();