Add elasticsearch-nio jar for base nio classes (#27801)

This is related to #27802. This commit adds a jar called
elasticsearch-nio that contains the base nio classes that will be used
for the tcp nio transport and eventually the http nio transport.

The jar does not depend on elasticsearch:core, so all references to core
have been removed.
This commit is contained in:
Tim Brooks 2017-12-20 16:29:16 -06:00 committed by GitHub
parent 8bd7a19d61
commit 06b313025c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 694 additions and 346 deletions

View File

@ -177,6 +177,7 @@ subprojects {
"org.elasticsearch:rest-api-spec:${version}": ':rest-api-spec', "org.elasticsearch:rest-api-spec:${version}": ':rest-api-spec',
"org.elasticsearch:elasticsearch:${version}": ':core', "org.elasticsearch:elasticsearch:${version}": ':core',
"org.elasticsearch:elasticsearch-cli:${version}": ':core:cli', "org.elasticsearch:elasticsearch-cli:${version}": ':core:cli',
"org.elasticsearch:elasticsearch-nio:${version}": ':libs:elasticsearch-nio',
"org.elasticsearch.client:elasticsearch-rest-client:${version}": ':client:rest', "org.elasticsearch.client:elasticsearch-rest-client:${version}": ':client:rest',
"org.elasticsearch.client:elasticsearch-rest-client-sniffer:${version}": ':client:sniffer', "org.elasticsearch.client:elasticsearch-rest-client-sniffer:${version}": ':client:sniffer',
"org.elasticsearch.client:elasticsearch-rest-high-level-client:${version}": ':client:rest-high-level', "org.elasticsearch.client:elasticsearch-rest-high-level-client:${version}": ':client:rest-high-level',

View File

@ -19,13 +19,12 @@
package org.elasticsearch.action.support; package org.elasticsearch.action.support;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.BaseFuture; import org.elasticsearch.common.util.concurrent.BaseFuture;
import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.common.util.concurrent.FutureUtils;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,14 +34,7 @@ public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements
@Override @Override
public T actionGet() { public T actionGet() {
try { return FutureUtils.get(this);
return get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw rethrowExecutionException(e);
}
} }
@Override @Override
@ -62,33 +54,7 @@ public abstract class AdapterActionFuture<T, L> extends BaseFuture<T> implements
@Override @Override
public T actionGet(long timeout, TimeUnit unit) { public T actionGet(long timeout, TimeUnit unit) {
try { return FutureUtils.get(this, timeout, unit);
return get(timeout, unit);
} catch (TimeoutException e) {
throw new ElasticsearchTimeoutException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw rethrowExecutionException(e);
}
}
static RuntimeException rethrowExecutionException(ExecutionException e) {
if (e.getCause() instanceof ElasticsearchException) {
ElasticsearchException esEx = (ElasticsearchException) e.getCause();
Throwable root = esEx.unwrapCause();
if (root instanceof ElasticsearchException) {
return (ElasticsearchException) root;
} else if (root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
} else if (e.getCause() instanceof RuntimeException) {
return (RuntimeException) e.getCause();
} else {
return new UncategorizedExecutionException("Failed execution", e);
}
} }
@Override @Override

View File

@ -19,9 +19,15 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.support.AdapterActionFuture;
import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.SuppressForbidden;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureUtils { public class FutureUtils {
@ -33,4 +39,60 @@ public class FutureUtils {
return false; return false;
} }
/**
* Calls {@link Future#get()} without the checked exceptions.
*
* @param future to dereference
* @param <T> the type returned
* @return the value of the future
*/
public static <T> T get(Future<T> future) {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw rethrowExecutionException(e);
}
}
/**
* Calls {@link Future#get(long, TimeUnit)} without the checked exceptions.
*
* @param future to dereference
* @param timeout to wait
* @param unit for timeout
* @param <T> the type returned
* @return the value of the future
*/
public static <T> T get(Future<T> future, long timeout, TimeUnit unit) {
try {
return future.get(timeout, unit);
} catch (TimeoutException e) {
throw new ElasticsearchTimeoutException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Future got interrupted", e);
} catch (ExecutionException e) {
throw FutureUtils.rethrowExecutionException(e);
}
}
public static RuntimeException rethrowExecutionException(ExecutionException e) {
if (e.getCause() instanceof ElasticsearchException) {
ElasticsearchException esEx = (ElasticsearchException) e.getCause();
Throwable root = esEx.unwrapCause();
if (root instanceof ElasticsearchException) {
return (ElasticsearchException) root;
} else if (root instanceof RuntimeException) {
return (RuntimeException) root;
}
return new UncategorizedExecutionException("Failed execution", root);
} else if (e.getCause() instanceof RuntimeException) {
return (RuntimeException) e.getCause();
} else {
return new UncategorizedExecutionException("Failed execution", e);
}
}
} }

View File

@ -63,6 +63,11 @@ grant codeBase "${codebase.mocksocket}" {
permission java.net.SocketPermission "*", "accept,connect"; permission java.net.SocketPermission "*", "accept,connect";
}; };
grant codeBase "${codebase.elasticsearch-nio}" {
// elasticsearch-nio makes and accepts socket connections
permission java.net.SocketPermission "*", "accept,connect";
};
grant codeBase "${codebase.elasticsearch-rest-client}" { grant codeBase "${codebase.elasticsearch-rest-client}" {
// rest makes socket connections for rest tests // rest makes socket connections for rest tests
permission java.net.SocketPermission "*", "connect"; permission java.net.SocketPermission "*", "connect";

View File

@ -0,0 +1,66 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.precommit.PrecommitTasks
apply plugin: 'elasticsearch.build'
apply plugin: 'nebula.maven-base-publish'
apply plugin: 'nebula.maven-scm'
archivesBaseName = 'elasticsearch-nio'
publishing {
publications {
nebula {
artifactId = archivesBaseName
}
}
}
dependencies {
compile "org.apache.logging.log4j:log4j-api:${versions.log4j}"
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
testCompile "org.hamcrest:hamcrest-all:${versions.hamcrest}"
testCompile("org.elasticsearch.test:framework:${version}") {
exclude group: 'org.elasticsearch', module: 'elasticsearch-nio'
}
}
forbiddenApisMain {
// elasticsearch-nio does not depend on core, so only jdk signatures should be checked
// es-all is not checked as we connect and accept sockets
signaturesURLs = [PrecommitTasks.getResource('/forbidden/jdk-signatures.txt')]
}
//JarHell is part of es core, which we don't want to pull in
jarHell.enabled=false
thirdPartyAudit.excludes = [
'org/osgi/framework/AdaptPermission',
'org/osgi/framework/AdminPermission',
'org/osgi/framework/Bundle',
'org/osgi/framework/BundleActivator',
'org/osgi/framework/BundleContext',
'org/osgi/framework/BundleEvent',
'org/osgi/framework/SynchronousBundleListener',
'org/osgi/framework/wiring/BundleWire',
'org/osgi/framework/wiring/BundleWiring'
]

View File

@ -0,0 +1 @@
7a2999229464e7a324aa503c0a52ec0f05efe7bd

View File

@ -0,0 +1,202 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright 1999-2005 The Apache Software Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,5 @@
Apache log4j
Copyright 2007 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

View File

@ -17,23 +17,20 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.nio.ESSelector;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.NetworkChannel; import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
/** /**
* This is a basic channel abstraction used by the {@link org.elasticsearch.transport.nio.NioTransport}. * This is a basic channel abstraction used by the {@link ESSelector}.
* <p> * <p>
* A channel is open once it is constructed. The channel remains open and {@link #isOpen()} will return * A channel is open once it is constructed. The channel remains open and {@link #isOpen()} will return
* true until the channel is explicitly closed. * true until the channel is explicitly closed.
@ -138,8 +135,8 @@ public abstract class AbstractNioChannel<S extends SelectableChannel & NetworkCh
} }
@Override @Override
public void addCloseListener(ActionListener<Void> listener) { public void addCloseListener(BiConsumer<Void, Throwable> listener) {
closeContext.whenComplete(ActionListener.toBiConsumer(listener)); closeContext.whenComplete(listener);
} }
// Package visibility for testing // Package visibility for testing

View File

@ -17,9 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;

View File

@ -17,14 +17,10 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
import java.io.IOException; import java.io.IOException;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -47,7 +43,7 @@ public class AcceptorEventHandler extends EventHandler {
* *
* @param nioServerSocketChannel that was registered * @param nioServerSocketChannel that was registered
*/ */
void serverChannelRegistered(NioServerSocketChannel nioServerSocketChannel) { protected void serverChannelRegistered(NioServerSocketChannel nioServerSocketChannel) {
SelectionKeyUtils.setAcceptInterested(nioServerSocketChannel); SelectionKeyUtils.setAcceptInterested(nioServerSocketChannel);
} }
@ -57,7 +53,7 @@ public class AcceptorEventHandler extends EventHandler {
* @param channel that was registered * @param channel that was registered
* @param exception that occurred * @param exception that occurred
*/ */
void registrationException(NioServerSocketChannel channel, Exception exception) { protected void registrationException(NioServerSocketChannel channel, Exception exception) {
logger.error(new ParameterizedMessage("failed to register server channel: {}", channel), exception); logger.error(new ParameterizedMessage("failed to register server channel: {}", channel), exception);
} }
@ -67,8 +63,8 @@ public class AcceptorEventHandler extends EventHandler {
* *
* @param nioServerChannel that can accept a connection * @param nioServerChannel that can accept a connection
*/ */
void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOException { protected void acceptChannel(NioServerSocketChannel nioServerChannel) throws IOException {
ChannelFactory channelFactory = nioServerChannel.getChannelFactory(); ChannelFactory<?, ?> channelFactory = nioServerChannel.getChannelFactory();
SocketSelector selector = selectorSupplier.get(); SocketSelector selector = selectorSupplier.get();
NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector); NioSocketChannel nioSocketChannel = channelFactory.acceptNioChannel(nioServerChannel, selector);
nioServerChannel.getAcceptContext().accept(nioSocketChannel); nioServerChannel.getAcceptContext().accept(nioSocketChannel);
@ -80,7 +76,7 @@ public class AcceptorEventHandler extends EventHandler {
* @param nioServerChannel that accepting a connection * @param nioServerChannel that accepting a connection
* @param exception that occurred * @param exception that occurred
*/ */
void acceptException(NioServerSocketChannel nioServerChannel, Exception exception) { protected void acceptException(NioServerSocketChannel nioServerChannel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while accepting new channel from server channel: {}", logger.debug(() -> new ParameterizedMessage("exception while accepting new channel from server channel: {}",
nioServerChannel), exception); nioServerChannel), exception);
} }

View File

@ -17,17 +17,16 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.mocksocket.PrivilegedSocketAccess;
import org.elasticsearch.transport.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.SocketSelector;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> { public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {
@ -38,7 +37,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
* *
* @param rawChannelFactory a factory that will construct the raw socket channels * @param rawChannelFactory a factory that will construct the raw socket channels
*/ */
ChannelFactory(RawChannelFactory rawChannelFactory) { protected ChannelFactory(RawChannelFactory rawChannelFactory) {
this.rawChannelFactory = rawChannelFactory; this.rawChannelFactory = rawChannelFactory;
} }
@ -134,7 +133,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
} }
} }
static class RawChannelFactory { protected static class RawChannelFactory {
private final boolean tcpNoDelay; private final boolean tcpNoDelay;
private final boolean tcpKeepAlive; private final boolean tcpKeepAlive;
@ -142,8 +141,8 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
private final int tcpSendBufferSize; private final int tcpSendBufferSize;
private final int tcpReceiveBufferSize; private final int tcpReceiveBufferSize;
RawChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReusedAddress, int tcpSendBufferSize, public RawChannelFactory(boolean tcpNoDelay, boolean tcpKeepAlive, boolean tcpReusedAddress, int tcpSendBufferSize,
int tcpReceiveBufferSize) { int tcpReceiveBufferSize) {
this.tcpNoDelay = tcpNoDelay; this.tcpNoDelay = tcpNoDelay;
this.tcpKeepAlive = tcpKeepAlive; this.tcpKeepAlive = tcpKeepAlive;
this.tcpReusedAddress = tcpReusedAddress; this.tcpReusedAddress = tcpReusedAddress;
@ -155,7 +154,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
SocketChannel socketChannel = SocketChannel.open(); SocketChannel socketChannel = SocketChannel.open();
try { try {
configureSocketChannel(socketChannel); configureSocketChannel(socketChannel);
PrivilegedSocketAccess.connect(socketChannel, remoteAddress); connect(socketChannel, remoteAddress);
} catch (IOException e) { } catch (IOException e) {
closeRawChannel(socketChannel, e); closeRawChannel(socketChannel, e);
throw e; throw e;
@ -165,7 +164,7 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
SocketChannel acceptNioChannel(NioServerSocketChannel serverChannel) throws IOException { SocketChannel acceptNioChannel(NioServerSocketChannel serverChannel) throws IOException {
ServerSocketChannel serverSocketChannel = serverChannel.getRawChannel(); ServerSocketChannel serverSocketChannel = serverChannel.getRawChannel();
SocketChannel socketChannel = PrivilegedSocketAccess.accept(serverSocketChannel); SocketChannel socketChannel = accept(serverSocketChannel);
try { try {
configureSocketChannel(socketChannel); configureSocketChannel(socketChannel);
} catch (IOException e) { } catch (IOException e) {
@ -202,5 +201,21 @@ public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel
socket.setSendBufferSize(tcpReceiveBufferSize); socket.setSendBufferSize(tcpReceiveBufferSize);
} }
} }
public static SocketChannel accept(ServerSocketChannel serverSocketChannel) throws IOException {
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<SocketChannel>) serverSocketChannel::accept);
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
private static void connect(SocketChannel socketChannel, InetSocketAddress remoteAddress) throws IOException {
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () -> socketChannel.connect(remoteAddress));
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
} }
} }

View File

@ -17,10 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.transport.nio.channel.NioChannel;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -30,17 +27,19 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* This is a basic selector abstraction used by {@link org.elasticsearch.transport.nio.NioTransport}. This * This is a basic selector abstraction. This selector wraps a raw nio {@link Selector}. When you call
* selector wraps a raw nio {@link Selector}. When you call {@link #runLoop()}, the selector will run until * {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
* {@link #close()} is called. This instance handles closing of channels. Users should call * of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
* {@link #queueChannelClose(NioChannel)} to schedule a channel for close by this selector. * this selector.
* <p> * <p>
* Children of this class should implement the specific {@link #processKey(SelectionKey)}, * Children of this class should implement the specific {@link #processKey(SelectionKey)},
* {@link #preSelect()}, and {@link #cleanup()} functionality. * {@link #preSelect()}, and {@link #cleanup()} functionality.
@ -54,7 +53,7 @@ public abstract class ESSelector implements Closeable {
private final ReentrantLock runLock = new ReentrantLock(); private final ReentrantLock runLock = new ReentrantLock();
private final CountDownLatch exitedLoop = new CountDownLatch(1); private final CountDownLatch exitedLoop = new CountDownLatch(1);
private final AtomicBoolean isClosed = new AtomicBoolean(false); private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final PlainActionFuture<Boolean> isRunningFuture = PlainActionFuture.newFuture(); private final CompletableFuture<Void> isRunningFuture = new CompletableFuture<>();
private volatile Thread thread; private volatile Thread thread;
ESSelector(EventHandler eventHandler) throws IOException { ESSelector(EventHandler eventHandler) throws IOException {
@ -71,7 +70,7 @@ public abstract class ESSelector implements Closeable {
*/ */
public void runLoop() { public void runLoop() {
if (runLock.tryLock()) { if (runLock.tryLock()) {
isRunningFuture.onResponse(true); isRunningFuture.complete(null);
try { try {
setThread(); setThread();
while (isOpen()) { while (isOpen()) {
@ -205,7 +204,7 @@ public abstract class ESSelector implements Closeable {
return runLock.isLocked(); return runLock.isLocked();
} }
public PlainActionFuture<Boolean> isRunningFuture() { public Future<Void> isRunningFuture() {
return isRunningFuture; return isRunningFuture;
} }

View File

@ -17,11 +17,10 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.transport.nio.channel.NioChannel;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.Selector; import java.nio.channels.Selector;
@ -30,7 +29,7 @@ public abstract class EventHandler {
protected final Logger logger; protected final Logger logger;
public EventHandler(Logger logger) { EventHandler(Logger logger) {
this.logger = logger; this.logger = logger;
} }
@ -39,7 +38,7 @@ public abstract class EventHandler {
* *
* @param exception the exception * @param exception the exception
*/ */
void selectException(IOException exception) { protected void selectException(IOException exception) {
logger.warn(new ParameterizedMessage("io exception during select [thread={}]", Thread.currentThread().getName()), exception); logger.warn(new ParameterizedMessage("io exception during select [thread={}]", Thread.currentThread().getName()), exception);
} }
@ -48,7 +47,7 @@ public abstract class EventHandler {
* *
* @param exception the exception * @param exception the exception
*/ */
void closeSelectorException(IOException exception) { protected void closeSelectorException(IOException exception) {
logger.warn(new ParameterizedMessage("io exception while closing selector [thread={}]", Thread.currentThread().getName()), logger.warn(new ParameterizedMessage("io exception while closing selector [thread={}]", Thread.currentThread().getName()),
exception); exception);
} }
@ -58,7 +57,7 @@ public abstract class EventHandler {
* *
* @param exception that was uncaught * @param exception that was uncaught
*/ */
void uncaughtException(Exception exception) { protected void uncaughtException(Exception exception) {
Thread thread = Thread.currentThread(); Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, exception); thread.getUncaughtExceptionHandler().uncaughtException(thread, exception);
} }
@ -68,7 +67,7 @@ public abstract class EventHandler {
* *
* @param channel that should be closed * @param channel that should be closed
*/ */
void handleClose(NioChannel channel) { protected void handleClose(NioChannel channel) {
try { try {
channel.closeFromSelector(); channel.closeFromSelector();
} catch (IOException e) { } catch (IOException e) {
@ -83,7 +82,7 @@ public abstract class EventHandler {
* @param channel that was being closed * @param channel that was being closed
* @param exception that occurred * @param exception that occurred
*/ */
void closeException(NioChannel channel, Exception exception) { protected void closeException(NioChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", channel), exception); logger.debug(() -> new ParameterizedMessage("exception while closing channel: {}", channel), exception);
} }
@ -95,7 +94,7 @@ public abstract class EventHandler {
* @param channel that caused the exception * @param channel that caused the exception
* @param exception that was thrown * @param exception that was thrown
*/ */
void genericChannelException(NioChannel channel, Exception exception) { protected void genericChannelException(NioChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while handling event for channel: {}", channel), exception); logger.debug(() -> new ParameterizedMessage("exception while handling event for channel: {}", channel), exception);
} }
} }

View File

@ -17,11 +17,9 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.nio.utils.ExceptionsHelper;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.BigArrays;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -37,9 +35,9 @@ import java.util.function.Supplier;
* the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can * the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can
* be called and the buffer will expand using the supplier provided. * be called and the buffer will expand using the supplier provided.
*/ */
public final class InboundChannelBuffer implements Releasable { public final class InboundChannelBuffer implements AutoCloseable {
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE; private static final int PAGE_SIZE = 1 << 14;
private static final int PAGE_MASK = PAGE_SIZE - 1; private static final int PAGE_MASK = PAGE_SIZE - 1;
private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE); private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0]; private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
@ -226,20 +224,19 @@ public final class InboundChannelBuffer implements Releasable {
return (int) (index & PAGE_MASK); return (int) (index & PAGE_MASK);
} }
public static class Page implements Releasable { public static class Page implements AutoCloseable {
private final ByteBuffer byteBuffer; private final ByteBuffer byteBuffer;
private final Releasable releasable; private final Runnable closeable;
public Page(ByteBuffer byteBuffer, Releasable releasable) { public Page(ByteBuffer byteBuffer, Runnable closeable) {
this.byteBuffer = byteBuffer; this.byteBuffer = byteBuffer;
this.releasable = releasable; this.closeable = closeable;
} }
@Override @Override
public void close() { public void close() {
releasable.close(); closeable.run();
} }
} }
} }

View File

@ -17,16 +17,14 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.nio.ESSelector;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.NetworkChannel; import java.nio.channels.NetworkChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.util.function.BiConsumer;
public interface NioChannel { public interface NioChannel {
@ -53,5 +51,5 @@ public interface NioChannel {
* *
* @param listener to be called at close * @param listener to be called at close
*/ */
void addCloseListener(ActionListener<Void> listener); void addCloseListener(BiConsumer<Void, Throwable> listener);
} }

View File

@ -17,17 +17,16 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils; import org.elasticsearch.nio.utils.ExceptionsHelper;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiFunction;
@ -108,7 +107,16 @@ public class NioGroup implements AutoCloseable {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (isOpen.compareAndSet(true, false)) { if (isOpen.compareAndSet(true, false)) {
IOUtils.close(Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList())); List<ESSelector> toClose = Stream.concat(acceptors.stream(), socketSelectors.stream()).collect(Collectors.toList());
List<IOException> closingExceptions = new ArrayList<>();
for (ESSelector selector : toClose) {
try {
selector.close();
} catch (IOException e) {
closingExceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(closingExceptions);
} }
} }
@ -116,7 +124,18 @@ public class NioGroup implements AutoCloseable {
for (ESSelector acceptor : selectors) { for (ESSelector acceptor : selectors) {
if (acceptor.isRunning() == false) { if (acceptor.isRunning() == false) {
threadFactory.newThread(acceptor::runLoop).start(); threadFactory.newThread(acceptor::runLoop).start();
acceptor.isRunningFuture().actionGet(); try {
acceptor.isRunningFuture().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Interrupted while waiting for selector to start.", e);
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException("Exception during selector start.", e);
}
}
} }
} }
} }

View File

@ -17,9 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.transport.nio.AcceptingSelector;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
@ -27,16 +25,16 @@ import java.util.function.Consumer;
public class NioServerSocketChannel extends AbstractNioChannel<ServerSocketChannel> { public class NioServerSocketChannel extends AbstractNioChannel<ServerSocketChannel> {
private final ChannelFactory channelFactory; private final ChannelFactory<?, ?> channelFactory;
private Consumer<NioSocketChannel> acceptContext; private Consumer<NioSocketChannel> acceptContext;
public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory channelFactory, AcceptingSelector selector) public NioServerSocketChannel(ServerSocketChannel socketChannel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
throws IOException { throws IOException {
super(socketChannel, selector); super(socketChannel, selector);
this.channelFactory = channelFactory; this.channelFactory = channelFactory;
} }
public ChannelFactory getChannelFactory() { public ChannelFactory<?, ?> getChannelFactory() {
return channelFactory; return channelFactory;
} }

View File

@ -17,11 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.nio.InboundChannelBuffer;
import org.elasticsearch.transport.nio.SocketSelector;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -157,8 +153,8 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
return isConnected; return isConnected;
} }
public void addConnectListener(ActionListener<Void> listener) { public void addConnectListener(BiConsumer<Void, Throwable> listener) {
connectContext.whenComplete(ActionListener.toBiConsumer(listener)); connectContext.whenComplete(listener);
} }
@Override @Override

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import java.io.IOException; import java.io.IOException;

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -28,7 +28,7 @@ public class RoundRobinSupplier<S> implements Supplier<S> {
private final int count; private final int count;
private AtomicInteger counter = new AtomicInteger(0); private AtomicInteger counter = new AtomicInteger(0);
public RoundRobinSupplier(S[] selectors) { RoundRobinSupplier(S[] selectors) {
this.count = selectors.length; this.count = selectors.length;
this.selectors = selectors; this.selectors = selectors;
} }

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;

View File

@ -17,17 +17,13 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
import org.elasticsearch.transport.nio.channel.WriteContext;
import java.io.IOException; import java.io.IOException;
import java.util.function.BiConsumer;
/** /**
* Event handler designed to handle events from non-server sockets * Event handler designed to handle events from non-server sockets
@ -47,7 +43,7 @@ public class SocketEventHandler extends EventHandler {
* *
* @param channel that was registered * @param channel that was registered
*/ */
void handleRegistration(NioSocketChannel channel) { protected void handleRegistration(NioSocketChannel channel) {
SelectionKeyUtils.setConnectAndReadInterested(channel); SelectionKeyUtils.setConnectAndReadInterested(channel);
} }
@ -57,7 +53,7 @@ public class SocketEventHandler extends EventHandler {
* @param channel that was registered * @param channel that was registered
* @param exception that occurred * @param exception that occurred
*/ */
void registrationException(NioSocketChannel channel, Exception exception) { protected void registrationException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", channel), exception); logger.debug(() -> new ParameterizedMessage("failed to register socket channel: {}", channel), exception);
exceptionCaught(channel, exception); exceptionCaught(channel, exception);
} }
@ -68,7 +64,7 @@ public class SocketEventHandler extends EventHandler {
* *
* @param channel that was registered * @param channel that was registered
*/ */
void handleConnect(NioSocketChannel channel) { protected void handleConnect(NioSocketChannel channel) {
SelectionKeyUtils.removeConnectInterested(channel); SelectionKeyUtils.removeConnectInterested(channel);
} }
@ -78,7 +74,7 @@ public class SocketEventHandler extends EventHandler {
* @param channel that was connecting * @param channel that was connecting
* @param exception that occurred * @param exception that occurred
*/ */
void connectException(NioSocketChannel channel, Exception exception) { protected void connectException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", channel), exception); logger.debug(() -> new ParameterizedMessage("failed to connect to socket channel: {}", channel), exception);
exceptionCaught(channel, exception); exceptionCaught(channel, exception);
} }
@ -89,7 +85,7 @@ public class SocketEventHandler extends EventHandler {
* *
* @param channel that can be read * @param channel that can be read
*/ */
void handleRead(NioSocketChannel channel) throws IOException { protected void handleRead(NioSocketChannel channel) throws IOException {
int bytesRead = channel.getReadContext().read(); int bytesRead = channel.getReadContext().read();
if (bytesRead == -1) { if (bytesRead == -1) {
handleClose(channel); handleClose(channel);
@ -102,7 +98,7 @@ public class SocketEventHandler extends EventHandler {
* @param channel that was being read * @param channel that was being read
* @param exception that occurred * @param exception that occurred
*/ */
void readException(NioSocketChannel channel, Exception exception) { protected void readException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", channel), exception); logger.debug(() -> new ParameterizedMessage("exception while reading from socket channel: {}", channel), exception);
exceptionCaught(channel, exception); exceptionCaught(channel, exception);
} }
@ -113,7 +109,7 @@ public class SocketEventHandler extends EventHandler {
* *
* @param channel that can be read * @param channel that can be read
*/ */
void handleWrite(NioSocketChannel channel) throws IOException { protected void handleWrite(NioSocketChannel channel) throws IOException {
WriteContext channelContext = channel.getWriteContext(); WriteContext channelContext = channel.getWriteContext();
channelContext.flushChannel(); channelContext.flushChannel();
if (channelContext.hasQueuedWriteOps()) { if (channelContext.hasQueuedWriteOps()) {
@ -129,7 +125,7 @@ public class SocketEventHandler extends EventHandler {
* @param channel that was being written to * @param channel that was being written to
* @param exception that occurred * @param exception that occurred
*/ */
void writeException(NioSocketChannel channel, Exception exception) { protected void writeException(NioSocketChannel channel, Exception exception) {
logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", channel), exception); logger.debug(() -> new ParameterizedMessage("exception while writing to socket channel: {}", channel), exception);
exceptionCaught(channel, exception); exceptionCaught(channel, exception);
} }
@ -142,7 +138,7 @@ public class SocketEventHandler extends EventHandler {
* @param channel that caused the exception * @param channel that caused the exception
* @param exception that was thrown * @param exception that was thrown
*/ */
void genericChannelException(NioChannel channel, Exception exception) { protected void genericChannelException(NioChannel channel, Exception exception) {
super.genericChannelException(channel, exception); super.genericChannelException(channel, exception);
exceptionCaught((NioSocketChannel) channel, exception); exceptionCaught((NioSocketChannel) channel, exception);
} }
@ -153,7 +149,7 @@ public class SocketEventHandler extends EventHandler {
* @param listener that was called * @param listener that was called
* @param exception that occurred * @param exception that occurred
*/ */
<V> void listenerException(ActionListener<V> listener, Exception exception) { protected <V> void listenerException(BiConsumer<V, Throwable> listener, Exception exception) {
logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception); logger.warn(new ParameterizedMessage("exception while executing listener: {}", listener), exception);
} }

View File

@ -17,12 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
import org.elasticsearch.transport.nio.channel.WriteContext;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
@ -30,6 +25,7 @@ import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.BiConsumer;
/** /**
* Selector implementation that handles {@link NioSocketChannel}. It's main piece of functionality is * Selector implementation that handles {@link NioSocketChannel}. It's main piece of functionality is
@ -140,10 +136,10 @@ public class SocketSelector extends ESSelector {
* @param listener to be executed * @param listener to be executed
* @param value to provide to listener * @param value to provide to listener
*/ */
public <V> void executeListener(ActionListener<V> listener, V value) { public <V> void executeListener(BiConsumer<V, Throwable> listener, V value) {
assert isOnCurrentThread() : "Must be on selector thread"; assert isOnCurrentThread() : "Must be on selector thread";
try { try {
listener.onResponse(value); listener.accept(value, null);
} catch (Exception e) { } catch (Exception e) {
eventHandler.listenerException(listener, e); eventHandler.listenerException(listener, e);
} }
@ -156,10 +152,10 @@ public class SocketSelector extends ESSelector {
* @param listener to be executed * @param listener to be executed
* @param exception to provide to listener * @param exception to provide to listener
*/ */
public <V> void executeFailedListener(ActionListener<V> listener, Exception exception) { public <V> void executeFailedListener(BiConsumer<V, Throwable> listener, Exception exception) {
assert isOnCurrentThread() : "Must be on selector thread"; assert isOnCurrentThread() : "Must be on selector thread";
try { try {
listener.onFailure(exception); listener.accept(null, exception);
} catch (Exception e) { } catch (Exception e) {
eventHandler.listenerException(listener, e); eventHandler.listenerException(listener, e);
} }

View File

@ -17,17 +17,14 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.nio.WriteOperation;
import java.io.IOException; import java.io.IOException;
import java.util.function.BiConsumer;
public interface WriteContext { public interface WriteContext {
void sendMessage(BytesReference reference, ActionListener<Void> listener); void sendMessage(Object message, BiConsumer<Void, Throwable> listener);
void queueWriteOperations(WriteOperation writeOperation); void queueWriteOperations(WriteOperation writeOperation);

View File

@ -17,33 +17,26 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.function.BiConsumer;
public class WriteOperation { public class WriteOperation {
private final NioSocketChannel channel; private final NioSocketChannel channel;
private final ActionListener<Void> listener; private final BiConsumer<Void, Throwable> listener;
private final ByteBuffer[] buffers; private final ByteBuffer[] buffers;
private final int[] offsets; private final int[] offsets;
private final int length; private final int length;
private int internalIndex; private int internalIndex;
public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener<Void> listener) { public WriteOperation(NioSocketChannel channel, ByteBuffer[] buffers, BiConsumer<Void, Throwable> listener) {
this.channel = channel; this.channel = channel;
this.listener = listener; this.listener = listener;
this.buffers = toByteBuffers(bytesReference); this.buffers = buffers;
this.offsets = new int[buffers.length]; this.offsets = new int[buffers.length];
int offset = 0; int offset = 0;
for (int i = 0; i < buffers.length; i++) { for (int i = 0; i < buffers.length; i++) {
@ -58,7 +51,7 @@ public class WriteOperation {
return buffers; return buffers;
} }
public ActionListener<Void> getListener() { public BiConsumer<Void, Throwable> getListener() {
return listener; return listener;
} }
@ -96,21 +89,4 @@ public class WriteOperation {
final int i = Arrays.binarySearch(offsets, offset); final int i = Arrays.binarySearch(offsets, offset);
return i < 0 ? (-(i + 1)) - 1 : i; return i < 0 ? (-(i + 1)) - 1 : i;
} }
private static ByteBuffer[] toByteBuffers(BytesReference bytesReference) {
BytesRefIterator byteRefIterator = bytesReference.iterator();
BytesRef r;
try {
// Most network messages are composed of three buffers.
ArrayList<ByteBuffer> buffers = new ArrayList<>(3);
while ((r = byteRefIterator.next()) != null) {
buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length));
}
return buffers.toArray(new ByteBuffer[buffers.size()]);
} catch (IOException e) {
// this is really an error since we don't do IO in our bytesreferences
throw new AssertionError("won't happen", e);
}
}
} }

View File

@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.nio.utils;
import java.util.List;
// TODO: This should eventually be removed once ExceptionsHelper is moved to a core library jar
public class ExceptionsHelper {
/**
* Rethrows the first exception in the list and adds all remaining to the suppressed list.
* If the given list is empty no exception is thrown
*
*/
public static <T extends Throwable> void rethrowAndSuppress(List<T> exceptions) throws T {
T main = null;
for (T ex : exceptions) {
main = useOrSuppress(main, ex);
}
if (main != null) {
throw main;
}
}
private static <T extends Throwable> T useOrSuppress(T first, T second) {
if (first == null) {
return second;
} else {
first.addSuppressed(second);
}
return first;
}
}

View File

@ -17,11 +17,9 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.utils.TestSelectionKey;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;

View File

@ -17,15 +17,9 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import org.elasticsearch.transport.nio.channel.DoNotRegisterServerChannel;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.ReadContext;
import org.elasticsearch.transport.nio.channel.WriteContext;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
@ -45,9 +39,9 @@ public class AcceptorEventHandlerTests extends ESTestCase {
private AcceptorEventHandler handler; private AcceptorEventHandler handler;
private SocketSelector socketSelector; private SocketSelector socketSelector;
private ChannelFactory channelFactory; private ChannelFactory<NioServerSocketChannel, NioSocketChannel> channelFactory;
private NioServerSocketChannel channel; private NioServerSocketChannel channel;
private Consumer acceptedChannelCallback; private Consumer<NioSocketChannel> acceptedChannelCallback;
@Before @Before
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")

View File

@ -17,12 +17,10 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.SocketSelector;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -41,7 +39,7 @@ import static org.mockito.Mockito.when;
public class ChannelFactoryTests extends ESTestCase { public class ChannelFactoryTests extends ESTestCase {
private ChannelFactory channelFactory; private ChannelFactory<NioServerSocketChannel, NioSocketChannel> channelFactory;
private ChannelFactory.RawChannelFactory rawChannelFactory; private ChannelFactory.RawChannelFactory rawChannelFactory;
private SocketChannel rawChannel; private SocketChannel rawChannel;
private ServerSocketChannel rawServerChannel; private ServerSocketChannel rawServerChannel;
@ -51,7 +49,7 @@ public class ChannelFactoryTests extends ESTestCase {
@Before @Before
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void setupFactory() throws IOException { public void setupFactory() throws IOException {
rawChannelFactory = mock(TcpChannelFactory.RawChannelFactory.class); rawChannelFactory = mock(ChannelFactory.RawChannelFactory.class);
channelFactory = new TestChannelFactory(rawChannelFactory); channelFactory = new TestChannelFactory(rawChannelFactory);
socketSelector = mock(SocketSelector.class); socketSelector = mock(SocketSelector.class);
acceptingSelector = mock(AcceptingSelector.class); acceptingSelector = mock(AcceptingSelector.class);
@ -131,7 +129,7 @@ public class ChannelFactoryTests extends ESTestCase {
assertFalse(rawServerChannel.isOpen()); assertFalse(rawServerChannel.isOpen());
} }
private static class TestChannelFactory extends ChannelFactory { private static class TestChannelFactory extends ChannelFactory<NioServerSocketChannel, NioSocketChannel> {
TestChannelFactory(RawChannelFactory rawChannelFactory) { TestChannelFactory(RawChannelFactory rawChannelFactory) {
super(rawChannelFactory); super(rawChannelFactory);

View File

@ -17,10 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.transport.nio.SocketSelector;
import org.elasticsearch.transport.nio.utils.TestSelectionKey;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;

View File

@ -17,10 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.transport.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.utils.TestSelectionKey;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
@ -28,7 +25,7 @@ import java.nio.channels.ServerSocketChannel;
public class DoNotRegisterServerChannel extends NioServerSocketChannel { public class DoNotRegisterServerChannel extends NioServerSocketChannel {
public DoNotRegisterServerChannel(ServerSocketChannel channel, ChannelFactory channelFactory, AcceptingSelector selector) public DoNotRegisterServerChannel(ServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
throws IOException { throws IOException {
super(channel, channelFactory, selector); super(channel, channelFactory, selector);
} }

View File

@ -17,10 +17,9 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;

View File

@ -17,12 +17,11 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.common.CheckedRunnable; import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.ChannelFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;

View File

@ -17,13 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.AcceptorEventHandler;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -48,7 +47,7 @@ public class NioServerSocketChannelTests extends ESTestCase {
thread = new Thread(selector::runLoop); thread = new Thread(selector::runLoop);
closedRawChannel = new AtomicBoolean(false); closedRawChannel = new AtomicBoolean(false);
thread.start(); thread.start();
selector.isRunningFuture().actionGet(); FutureUtils.get(selector.isRunningFuture());
} }
@After @After
@ -61,9 +60,9 @@ public class NioServerSocketChannelTests extends ESTestCase {
AtomicBoolean isClosed = new AtomicBoolean(false); AtomicBoolean isClosed = new AtomicBoolean(false);
CountDownLatch latch = new CountDownLatch(1); CountDownLatch latch = new CountDownLatch(1);
NioChannel channel = new DoNotCloseServerChannel("nio", mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector); NioChannel channel = new DoNotCloseServerChannel(mock(ServerSocketChannel.class), mock(ChannelFactory.class), selector);
channel.addCloseListener(new ActionListener<Void>() { channel.addCloseListener(ActionListener.toBiConsumer(new ActionListener<Void>() {
@Override @Override
public void onResponse(Void o) { public void onResponse(Void o) {
isClosed.set(true); isClosed.set(true);
@ -75,14 +74,14 @@ public class NioServerSocketChannelTests extends ESTestCase {
isClosed.set(true); isClosed.set(true);
latch.countDown(); latch.countDown();
} }
}); }));
assertTrue(channel.isOpen()); assertTrue(channel.isOpen());
assertFalse(closedRawChannel.get()); assertFalse(closedRawChannel.get());
assertFalse(isClosed.get()); assertFalse(isClosed.get());
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture(); PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
channel.addCloseListener(closeFuture); channel.addCloseListener(ActionListener.toBiConsumer(closeFuture));
channel.close(); channel.close();
closeFuture.actionGet(); closeFuture.actionGet();
@ -95,8 +94,8 @@ public class NioServerSocketChannelTests extends ESTestCase {
private class DoNotCloseServerChannel extends DoNotRegisterServerChannel { private class DoNotCloseServerChannel extends DoNotRegisterServerChannel {
private DoNotCloseServerChannel(String profile, ServerSocketChannel channel, ChannelFactory channelFactory, private DoNotCloseServerChannel(ServerSocketChannel channel, ChannelFactory<?, ?> channelFactory, AcceptingSelector selector)
AcceptingSelector selector) throws IOException { throws IOException {
super(channel, channelFactory, selector); super(channel, channelFactory, selector);
} }

View File

@ -17,13 +17,12 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.SocketEventHandler;
import org.elasticsearch.transport.nio.SocketSelector;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -52,7 +51,7 @@ public class NioSocketChannelTests extends ESTestCase {
thread = new Thread(selector::runLoop); thread = new Thread(selector::runLoop);
closedRawChannel = new AtomicBoolean(false); closedRawChannel = new AtomicBoolean(false);
thread.start(); thread.start();
selector.isRunningFuture().actionGet(); FutureUtils.get(selector.isRunningFuture());
} }
@After @After
@ -68,7 +67,7 @@ public class NioSocketChannelTests extends ESTestCase {
NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector); NioSocketChannel socketChannel = new DoNotCloseChannel(mock(SocketChannel.class), selector);
socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class), mock(BiConsumer.class)); socketChannel.setContexts(mock(ReadContext.class), mock(WriteContext.class), mock(BiConsumer.class));
socketChannel.addCloseListener(new ActionListener<Void>() { socketChannel.addCloseListener(ActionListener.toBiConsumer(new ActionListener<Void>() {
@Override @Override
public void onResponse(Void o) { public void onResponse(Void o) {
isClosed.set(true); isClosed.set(true);
@ -79,14 +78,14 @@ public class NioSocketChannelTests extends ESTestCase {
isClosed.set(true); isClosed.set(true);
latch.countDown(); latch.countDown();
} }
}); }));
assertTrue(socketChannel.isOpen()); assertTrue(socketChannel.isOpen());
assertFalse(closedRawChannel.get()); assertFalse(closedRawChannel.get());
assertFalse(isClosed.get()); assertFalse(isClosed.get());
PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture(); PlainActionFuture<Void> closeFuture = PlainActionFuture.newFuture();
socketChannel.addCloseListener(closeFuture); socketChannel.addCloseListener(ActionListener.toBiConsumer(closeFuture));
socketChannel.close(); socketChannel.close();
closeFuture.actionGet(); closeFuture.actionGet();
@ -105,7 +104,7 @@ public class NioSocketChannelTests extends ESTestCase {
selector.scheduleForRegistration(socketChannel); selector.scheduleForRegistration(socketChannel);
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture(); PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
socketChannel.addConnectListener(connectFuture); socketChannel.addConnectListener(ActionListener.toBiConsumer(connectFuture));
connectFuture.get(100, TimeUnit.SECONDS); connectFuture.get(100, TimeUnit.SECONDS);
assertTrue(socketChannel.isConnectComplete()); assertTrue(socketChannel.isConnectComplete());
@ -122,7 +121,7 @@ public class NioSocketChannelTests extends ESTestCase {
selector.scheduleForRegistration(socketChannel); selector.scheduleForRegistration(socketChannel);
PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture(); PlainActionFuture<Void> connectFuture = PlainActionFuture.newFuture();
socketChannel.addConnectListener(connectFuture); socketChannel.addConnectListener(ActionListener.toBiConsumer(connectFuture));
ExecutionException e = expectThrows(ExecutionException.class, () -> connectFuture.get(100, TimeUnit.SECONDS)); ExecutionException e = expectThrows(ExecutionException.class, () -> connectFuture.get(100, TimeUnit.SECONDS));
assertTrue(e.getCause() instanceof IOException); assertTrue(e.getCause() instanceof IOException);

View File

@ -17,17 +17,10 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.DoNotRegisterChannel; import org.elasticsearch.transport.nio.TcpWriteContext;
import org.elasticsearch.transport.nio.channel.NioChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.ReadContext;
import org.elasticsearch.transport.nio.channel.SelectionKeyUtils;
import org.elasticsearch.transport.nio.channel.TcpWriteContext;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
@ -121,10 +114,10 @@ public class SocketEventHandlerTests extends ESTestCase {
setWriteAndRead(channel); setWriteAndRead(channel);
assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps());
BytesArray bytesArray = new BytesArray(new byte[1]); ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class))); channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, buffers, mock(BiConsumer.class)));
when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(1); when(rawChannel.write(buffers[0])).thenReturn(1);
handler.handleWrite(channel); handler.handleWrite(channel);
assertEquals(SelectionKey.OP_READ, selectionKey.interestOps()); assertEquals(SelectionKey.OP_READ, selectionKey.interestOps());
@ -136,10 +129,10 @@ public class SocketEventHandlerTests extends ESTestCase {
setWriteAndRead(channel); setWriteAndRead(channel);
assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps());
BytesArray bytesArray = new BytesArray(new byte[1]); ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class))); channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, buffers, mock(BiConsumer.class)));
when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(0); when(rawChannel.write(buffers[0])).thenReturn(0);
handler.handleWrite(channel); handler.handleWrite(channel);
assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps()); assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps());

View File

@ -17,18 +17,13 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.WriteContext;
import org.elasticsearch.transport.nio.utils.TestSelectionKey;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException; import java.nio.channels.ClosedSelectorException;
@ -36,9 +31,11 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.function.BiConsumer;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same; import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -53,8 +50,8 @@ public class SocketSelectorTests extends ESTestCase {
private NioSocketChannel channel; private NioSocketChannel channel;
private TestSelectionKey selectionKey; private TestSelectionKey selectionKey;
private WriteContext writeContext; private WriteContext writeContext;
private ActionListener<Void> listener; private BiConsumer<Void, Throwable> listener;
private BytesReference bufferReference = new BytesArray(new byte[1]); private ByteBuffer[] buffers = {ByteBuffer.allocate(1)};
private Selector rawSelector; private Selector rawSelector;
@Before @Before
@ -64,7 +61,7 @@ public class SocketSelectorTests extends ESTestCase {
eventHandler = mock(SocketEventHandler.class); eventHandler = mock(SocketEventHandler.class);
channel = mock(NioSocketChannel.class); channel = mock(NioSocketChannel.class);
writeContext = mock(WriteContext.class); writeContext = mock(WriteContext.class);
listener = mock(ActionListener.class); listener = mock(BiConsumer.class);
selectionKey = new TestSelectionKey(0); selectionKey = new TestSelectionKey(0);
selectionKey.attach(channel); selectionKey.attach(channel);
rawSelector = mock(Selector.class); rawSelector = mock(Selector.class);
@ -132,26 +129,26 @@ public class SocketSelectorTests extends ESTestCase {
public void testQueueWriteWhenNotRunning() throws Exception { public void testQueueWriteWhenNotRunning() throws Exception {
socketSelector.close(); socketSelector.close();
socketSelector.queueWrite(new WriteOperation(channel, bufferReference, listener)); socketSelector.queueWrite(new WriteOperation(channel, buffers, listener));
verify(listener).onFailure(any(ClosedSelectorException.class)); verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class));
} }
public void testQueueWriteChannelIsNoLongerWritable() throws Exception { public void testQueueWriteChannelIsNoLongerWritable() throws Exception {
WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); WriteOperation writeOperation = new WriteOperation(channel, buffers, listener);
socketSelector.queueWrite(writeOperation); socketSelector.queueWrite(writeOperation);
when(channel.isWritable()).thenReturn(false); when(channel.isWritable()).thenReturn(false);
socketSelector.preSelect(); socketSelector.preSelect();
verify(writeContext, times(0)).queueWriteOperations(writeOperation); verify(writeContext, times(0)).queueWriteOperations(writeOperation);
verify(listener).onFailure(any(ClosedChannelException.class)); verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class));
} }
public void testQueueWriteSelectionKeyThrowsException() throws Exception { public void testQueueWriteSelectionKeyThrowsException() throws Exception {
SelectionKey selectionKey = mock(SelectionKey.class); SelectionKey selectionKey = mock(SelectionKey.class);
WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); WriteOperation writeOperation = new WriteOperation(channel, buffers, listener);
CancelledKeyException cancelledKeyException = new CancelledKeyException(); CancelledKeyException cancelledKeyException = new CancelledKeyException();
socketSelector.queueWrite(writeOperation); socketSelector.queueWrite(writeOperation);
@ -161,11 +158,11 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.preSelect(); socketSelector.preSelect();
verify(writeContext, times(0)).queueWriteOperations(writeOperation); verify(writeContext, times(0)).queueWriteOperations(writeOperation);
verify(listener).onFailure(cancelledKeyException); verify(listener).accept(null, cancelledKeyException);
} }
public void testQueueWriteSuccessful() throws Exception { public void testQueueWriteSuccessful() throws Exception {
WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); WriteOperation writeOperation = new WriteOperation(channel, buffers, listener);
socketSelector.queueWrite(writeOperation); socketSelector.queueWrite(writeOperation);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
@ -178,7 +175,7 @@ public class SocketSelectorTests extends ESTestCase {
} }
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception { public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); WriteOperation writeOperation = new WriteOperation(channel, buffers, listener);
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0); assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
@ -192,7 +189,7 @@ public class SocketSelectorTests extends ESTestCase {
public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws Exception { public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws Exception {
SelectionKey selectionKey = mock(SelectionKey.class); SelectionKey selectionKey = mock(SelectionKey.class);
WriteOperation writeOperation = new WriteOperation(channel, bufferReference, listener); WriteOperation writeOperation = new WriteOperation(channel, buffers, listener);
CancelledKeyException cancelledKeyException = new CancelledKeyException(); CancelledKeyException cancelledKeyException = new CancelledKeyException();
when(channel.isWritable()).thenReturn(true); when(channel.isWritable()).thenReturn(true);
@ -201,7 +198,7 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.queueWriteInChannelBuffer(writeOperation); socketSelector.queueWriteInChannelBuffer(writeOperation);
verify(writeContext, times(0)).queueWriteOperations(writeOperation); verify(writeContext, times(0)).queueWriteOperations(writeOperation);
verify(listener).onFailure(cancelledKeyException); verify(listener).accept(null, cancelledKeyException);
} }
public void testConnectEvent() throws Exception { public void testConnectEvent() throws Exception {
@ -295,7 +292,7 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.preSelect(); socketSelector.preSelect();
socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), new BytesArray(new byte[1]), listener)); socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), buffers, listener));
socketSelector.scheduleForRegistration(unRegisteredChannel); socketSelector.scheduleForRegistration(unRegisteredChannel);
TestSelectionKey testSelectionKey = new TestSelectionKey(0); TestSelectionKey testSelectionKey = new TestSelectionKey(0);
@ -304,14 +301,14 @@ public class SocketSelectorTests extends ESTestCase {
socketSelector.cleanupAndCloseChannels(); socketSelector.cleanupAndCloseChannels();
verify(listener).onFailure(any(ClosedSelectorException.class)); verify(listener).accept(isNull(Void.class), any(ClosedSelectorException.class));
verify(eventHandler).handleClose(channel); verify(eventHandler).handleClose(channel);
verify(eventHandler).handleClose(unRegisteredChannel); verify(eventHandler).handleClose(unRegisteredChannel);
} }
public void testExecuteListenerWillHandleException() throws Exception { public void testExecuteListenerWillHandleException() throws Exception {
RuntimeException exception = new RuntimeException(); RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).onResponse(null); doThrow(exception).when(listener).accept(null, null);
socketSelector.executeListener(listener, null); socketSelector.executeListener(listener, null);
@ -321,7 +318,7 @@ public class SocketSelectorTests extends ESTestCase {
public void testExecuteFailedListenerWillHandleException() throws Exception { public void testExecuteFailedListenerWillHandleException() throws Exception {
IOException ioException = new IOException(); IOException ioException = new IOException();
RuntimeException exception = new RuntimeException(); RuntimeException exception = new RuntimeException();
doThrow(exception).when(listener).onFailure(ioException); doThrow(exception).when(listener).accept(null, ioException);
socketSelector.executeFailedListener(listener, ioException); socketSelector.executeFailedListener(listener, ioException);

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.utils; package org.elasticsearch.nio;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;

View File

@ -17,19 +17,16 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio; package org.elasticsearch.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.List; import java.util.List;
import java.util.function.BiConsumer;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -38,18 +35,19 @@ import static org.mockito.Mockito.when;
public class WriteOperationTests extends ESTestCase { public class WriteOperationTests extends ESTestCase {
private NioSocketChannel channel; private NioSocketChannel channel;
private ActionListener<Void> listener; private BiConsumer<Void, Throwable> listener;
@Before @Before
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void setFields() { public void setFields() {
channel = mock(NioSocketChannel.class); channel = mock(NioSocketChannel.class);
listener = mock(ActionListener.class); listener = mock(BiConsumer.class);
} }
public void testFlush() throws IOException { public void testFlush() throws IOException {
WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); ByteBuffer[] buffers = {ByteBuffer.allocate(10)};
WriteOperation writeOp = new WriteOperation(channel, buffers, listener);
when(channel.write(any(ByteBuffer[].class))).thenReturn(10); when(channel.write(any(ByteBuffer[].class))).thenReturn(10);
@ -60,7 +58,8 @@ public class WriteOperationTests extends ESTestCase {
} }
public void testPartialFlush() throws IOException { public void testPartialFlush() throws IOException {
WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener); ByteBuffer[] buffers = {ByteBuffer.allocate(10)};
WriteOperation writeOp = new WriteOperation(channel, buffers, listener);
when(channel.write(any(ByteBuffer[].class))).thenReturn(5); when(channel.write(any(ByteBuffer[].class))).thenReturn(5);
@ -70,11 +69,8 @@ public class WriteOperationTests extends ESTestCase {
} }
public void testMultipleFlushesWithCompositeBuffer() throws IOException { public void testMultipleFlushesWithCompositeBuffer() throws IOException {
BytesArray bytesReference1 = new BytesArray(new byte[10]); ByteBuffer[] buffers = {ByteBuffer.allocate(10), ByteBuffer.allocate(15), ByteBuffer.allocate(3)};
BytesArray bytesReference2 = new BytesArray(new byte[15]); WriteOperation writeOp = new WriteOperation(channel, buffers, listener);
BytesArray bytesReference3 = new BytesArray(new byte[3]);
CompositeBytesReference bytesReference = new CompositeBytesReference(bytesReference1, bytesReference2, bytesReference3);
WriteOperation writeOp = new WriteOperation(channel, bytesReference, listener);
ArgumentCaptor<ByteBuffer[]> buffersCaptor = ArgumentCaptor.forClass(ByteBuffer[].class); ArgumentCaptor<ByteBuffer[]> buffersCaptor = ArgumentCaptor.forClass(ByteBuffer[].class);

Binary file not shown.

View File

@ -28,6 +28,7 @@ List projects = [
'test:fixtures:krb5kdc-fixture', 'test:fixtures:krb5kdc-fixture',
'test:fixtures:old-elasticsearch', 'test:fixtures:old-elasticsearch',
'test:logger-usage', 'test:logger-usage',
'libs:elasticsearch-nio',
'modules:aggs-matrix-stats', 'modules:aggs-matrix-stats',
'modules:analysis-common', 'modules:analysis-common',
'modules:ingest-common', 'modules:ingest-common',
@ -171,3 +172,5 @@ if (extraProjects.exists()) {
addSubProjects('', extraProjectDir, projects, branches) addSubProjects('', extraProjectDir, projects, branches)
} }
} }
include 'libs'

View File

@ -21,6 +21,7 @@ import org.elasticsearch.gradle.precommit.PrecommitTasks;
dependencies { dependencies {
compile "org.elasticsearch.client:elasticsearch-rest-client:${version}" compile "org.elasticsearch.client:elasticsearch-rest-client:${version}"
compile "org.elasticsearch:elasticsearch-nio:${version}"
compile "org.elasticsearch:elasticsearch:${version}" compile "org.elasticsearch:elasticsearch:${version}"
compile "org.elasticsearch:elasticsearch-cli:${version}" compile "org.elasticsearch:elasticsearch-cli:${version}"
compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" compile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"

View File

@ -33,16 +33,15 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.nio.AcceptorEventHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.SocketEventHandler;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports; import org.elasticsearch.transport.Transports;
import org.elasticsearch.transport.nio.channel.NioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpChannelFactory;
import org.elasticsearch.transport.nio.channel.TcpNioServerSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpReadContext;
import org.elasticsearch.transport.nio.channel.TcpWriteContext;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
@ -90,7 +89,7 @@ public class NioTransport extends TcpTransport {
protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener) protected TcpNioSocketChannel initiateChannel(DiscoveryNode node, TimeValue connectTimeout, ActionListener<Void> connectListener)
throws IOException { throws IOException {
TcpNioSocketChannel channel = nioGroup.openChannel(node.getAddress().address(), clientChannelFactory); TcpNioSocketChannel channel = nioGroup.openChannel(node.getAddress().address(), clientChannelFactory);
channel.addConnectListener(connectListener); channel.addConnectListener(ActionListener.toBiConsumer(connectListener));
return channel; return channel;
} }
@ -154,7 +153,7 @@ public class NioTransport extends TcpTransport {
return (c) -> { return (c) -> {
Supplier<InboundChannelBuffer.Page> pageSupplier = () -> { Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false); Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes); return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
}; };
c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), new InboundChannelBuffer(pageSupplier)), c.setContexts(new TcpReadContext(c, new TcpReadHandler(profileName, this), new InboundChannelBuffer(pageSupplier)),
new TcpWriteContext(c), this::exceptionCaught); new TcpWriteContext(c), this::exceptionCaught);

View File

@ -17,11 +17,14 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.nio.AcceptingSelector; import org.elasticsearch.nio.AcceptingSelector;
import org.elasticsearch.transport.nio.SocketSelector; import org.elasticsearch.nio.SocketSelector;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
@ -39,8 +42,8 @@ public class TcpChannelFactory extends ChannelFactory<TcpNioServerSocketChannel,
private final Consumer<NioSocketChannel> contextSetter; private final Consumer<NioSocketChannel> contextSetter;
private final Consumer<NioServerSocketChannel> serverContextSetter; private final Consumer<NioServerSocketChannel> serverContextSetter;
public TcpChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer<NioSocketChannel> contextSetter, TcpChannelFactory(TcpTransport.ProfileSettings profileSettings, Consumer<NioSocketChannel> contextSetter,
Consumer<NioServerSocketChannel> serverContextSetter) { Consumer<NioServerSocketChannel> serverContextSetter) {
super(new RawChannelFactory(profileSettings.tcpNoDelay, super(new RawChannelFactory(profileSettings.tcpNoDelay,
profileSettings.tcpKeepAlive, profileSettings.tcpKeepAlive,
profileSettings.reuseAddress, profileSettings.reuseAddress,

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;

View File

@ -17,12 +17,13 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.nio.AcceptingSelector; import org.elasticsearch.nio.AcceptingSelector;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
@ -48,6 +49,11 @@ public class TcpNioServerSocketChannel extends NioServerSocketChannel implements
throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel."); throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel.");
} }
@Override
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}
@Override @Override
public String toString() { public String toString() {
return "TcpNioServerSocketChannel{" + return "TcpNioServerSocketChannel{" +

View File

@ -17,12 +17,13 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.nio.SocketSelector; import org.elasticsearch.nio.SocketSelector;
import java.io.IOException; import java.io.IOException;
import java.net.StandardSocketOptions; import java.net.StandardSocketOptions;
@ -35,7 +36,7 @@ public class TcpNioSocketChannel extends NioSocketChannel implements TcpChannel
} }
public void sendMessage(BytesReference reference, ActionListener<Void> listener) { public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
getWriteContext().sendMessage(reference, listener); getWriteContext().sendMessage(reference, ActionListener.toBiConsumer(listener));
} }
@Override @Override
@ -45,6 +46,11 @@ public class TcpNioSocketChannel extends NioSocketChannel implements TcpChannel
} }
} }
@Override
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}
@Override @Override
public String toString() { public String toString() {
return "TcpNioSocketChannel{" + return "TcpNioSocketChannel{" +

View File

@ -17,13 +17,14 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.common.bytes.ByteBufferReference; import org.elasticsearch.common.bytes.ByteBufferReference;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.transport.nio.InboundChannelBuffer; import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.transport.nio.TcpReadHandler; import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ReadContext;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;

View File

@ -20,8 +20,7 @@
package org.elasticsearch.transport.nio; package org.elasticsearch.transport.nio;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.transport.nio.channel.TcpNioSocketChannel;
import java.io.IOException; import java.io.IOException;

View File

@ -17,16 +17,22 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener; import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.nio.SocketSelector; import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.transport.nio.WriteOperation; import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.nio.WriteContext;
import org.elasticsearch.nio.WriteOperation;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.function.BiConsumer;
public class TcpWriteContext implements WriteContext { public class TcpWriteContext implements WriteContext {
@ -38,13 +44,14 @@ public class TcpWriteContext implements WriteContext {
} }
@Override @Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) { public void sendMessage(Object message, BiConsumer<Void, Throwable> listener) {
BytesReference reference = (BytesReference) message;
if (channel.isWritable() == false) { if (channel.isWritable() == false) {
listener.onFailure(new ClosedChannelException()); listener.accept(null, new ClosedChannelException());
return; return;
} }
WriteOperation writeOperation = new WriteOperation(channel, reference, listener); WriteOperation writeOperation = new WriteOperation(channel, toByteBuffers(reference), listener);
SocketSelector selector = channel.getSelector(); SocketSelector selector = channel.getSelector();
if (selector.isOnCurrentThread() == false) { if (selector.isOnCurrentThread() == false) {
selector.queueWrite(writeOperation); selector.queueWrite(writeOperation);
@ -110,4 +117,21 @@ public class TcpWriteContext implements WriteContext {
lastOpCompleted = op.isFullyFlushed(); lastOpCompleted = op.isFullyFlushed();
} }
} }
private static ByteBuffer[] toByteBuffers(BytesReference bytesReference) {
BytesRefIterator byteRefIterator = bytesReference.iterator();
BytesRef r;
try {
// Most network messages are composed of three buffers.
ArrayList<ByteBuffer> buffers = new ArrayList<>(3);
while ((r = byteRefIterator.next()) != null) {
buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length));
}
return buffers.toArray(new ByteBuffer[buffers.size()]);
} catch (IOException e) {
// this is really an error since we don't do IO in our bytesreferences
throw new AssertionError("won't happen", e);
}
}
} }

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.nio.SocketEventHandler;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;

View File

@ -17,7 +17,7 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.BytesStreamOutput;

View File

@ -17,14 +17,13 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.CompositeBytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.InboundChannelBuffer;
import org.elasticsearch.transport.nio.TcpReadHandler;
import org.junit.Before; import org.junit.Before;
import java.io.IOException; import java.io.IOException;

View File

@ -17,21 +17,23 @@
* under the License. * under the License.
*/ */
package org.elasticsearch.transport.nio.channel; package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.SocketSelector;
import org.elasticsearch.nio.WriteOperation;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.nio.SocketSelector;
import org.elasticsearch.transport.nio.WriteOperation;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.function.BiConsumer;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -40,7 +42,7 @@ import static org.mockito.Mockito.when;
public class TcpWriteContextTests extends ESTestCase { public class TcpWriteContextTests extends ESTestCase {
private SocketSelector selector; private SocketSelector selector;
private ActionListener<Void> listener; private BiConsumer<Void, Throwable> listener;
private TcpWriteContext writeContext; private TcpWriteContext writeContext;
private NioSocketChannel channel; private NioSocketChannel channel;
@ -49,7 +51,7 @@ public class TcpWriteContextTests extends ESTestCase {
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
selector = mock(SocketSelector.class); selector = mock(SocketSelector.class);
listener = mock(ActionListener.class); listener = mock(BiConsumer.class);
channel = mock(NioSocketChannel.class); channel = mock(NioSocketChannel.class);
writeContext = new TcpWriteContext(channel); writeContext = new TcpWriteContext(channel);
@ -62,7 +64,7 @@ public class TcpWriteContextTests extends ESTestCase {
writeContext.sendMessage(new BytesArray(generateBytes(10)), listener); writeContext.sendMessage(new BytesArray(generateBytes(10)), listener);
verify(listener).onFailure(any(ClosedChannelException.class)); verify(listener).accept(isNull(Void.class), any(ClosedChannelException.class));
} }
public void testSendMessageFromDifferentThreadIsQueuedWithSelector() throws Exception { public void testSendMessageFromDifferentThreadIsQueuedWithSelector() throws Exception {
@ -103,7 +105,8 @@ public class TcpWriteContextTests extends ESTestCase {
public void testWriteIsQueuedInChannel() throws Exception { public void testWriteIsQueuedInChannel() throws Exception {
assertFalse(writeContext.hasQueuedWriteOps()); assertFalse(writeContext.hasQueuedWriteOps());
writeContext.queueWriteOperations(new WriteOperation(channel, new BytesArray(generateBytes(10)), listener)); ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
writeContext.queueWriteOperations(new WriteOperation(channel, buffer, listener));
assertTrue(writeContext.hasQueuedWriteOps()); assertTrue(writeContext.hasQueuedWriteOps());
} }
@ -111,7 +114,8 @@ public class TcpWriteContextTests extends ESTestCase {
public void testWriteOpsCanBeCleared() throws Exception { public void testWriteOpsCanBeCleared() throws Exception {
assertFalse(writeContext.hasQueuedWriteOps()); assertFalse(writeContext.hasQueuedWriteOps());
writeContext.queueWriteOperations(new WriteOperation(channel, new BytesArray(generateBytes(10)), listener)); ByteBuffer[] buffer = {ByteBuffer.allocate(10)};
writeContext.queueWriteOperations(new WriteOperation(channel, buffer, listener));
assertTrue(writeContext.hasQueuedWriteOps()); assertTrue(writeContext.hasQueuedWriteOps());
@ -151,7 +155,7 @@ public class TcpWriteContextTests extends ESTestCase {
when(writeOperation.isFullyFlushed()).thenReturn(false); when(writeOperation.isFullyFlushed()).thenReturn(false);
writeContext.flushChannel(); writeContext.flushChannel();
verify(listener, times(0)).onResponse(null); verify(listener, times(0)).accept(null, null);
assertTrue(writeContext.hasQueuedWriteOps()); assertTrue(writeContext.hasQueuedWriteOps());
} }
@ -159,7 +163,7 @@ public class TcpWriteContextTests extends ESTestCase {
public void testMultipleWritesPartialFlushes() throws IOException { public void testMultipleWritesPartialFlushes() throws IOException {
assertFalse(writeContext.hasQueuedWriteOps()); assertFalse(writeContext.hasQueuedWriteOps());
ActionListener listener2 = mock(ActionListener.class); BiConsumer listener2 = mock(BiConsumer.class);
WriteOperation writeOperation1 = mock(WriteOperation.class); WriteOperation writeOperation1 = mock(WriteOperation.class);
WriteOperation writeOperation2 = mock(WriteOperation.class); WriteOperation writeOperation2 = mock(WriteOperation.class);
when(writeOperation1.getListener()).thenReturn(listener); when(writeOperation1.getListener()).thenReturn(listener);
@ -174,7 +178,7 @@ public class TcpWriteContextTests extends ESTestCase {
writeContext.flushChannel(); writeContext.flushChannel();
verify(selector).executeListener(listener, null); verify(selector).executeListener(listener, null);
verify(listener2, times(0)).onResponse(channel); verify(listener2, times(0)).accept(null, null);
assertTrue(writeContext.hasQueuedWriteOps()); assertTrue(writeContext.hasQueuedWriteOps());
when(writeOperation2.isFullyFlushed()).thenReturn(true); when(writeOperation2.isFullyFlushed()).thenReturn(true);

View File

@ -20,7 +20,8 @@
package org.elasticsearch.transport.nio; package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.elasticsearch.transport.nio.channel.NioSocketChannel; import org.elasticsearch.nio.SocketEventHandler;
import org.elasticsearch.nio.NioSocketChannel;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;