[TRANSPORT] never send requests after transport service is stopped
With local transport or any transport that doesn't necessarily send notification if connections are closed we might miss a node disconnection and the request handler hangs forever / until the timeout kicks in. This window only exists during shutdown and is likely unproblematic in practice but tests might run into this problem when local transport is used.
This commit is contained in:
parent
dc1ef7e670
commit
a90d7b1670
|
@ -196,13 +196,11 @@ public class RecoverySource extends AbstractComponent {
|
|||
pool.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
IndexInput indexInput = null;
|
||||
store.incRef();
|
||||
final StoreFileMetaData md = recoverySourceMetadata.get(name);
|
||||
try {
|
||||
try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
|
||||
final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
|
||||
byte[] buf = new byte[BUFFER_SIZE];
|
||||
indexInput = store.directory().openInput(name, IOContext.READONCE);
|
||||
final byte[] buf = new byte[BUFFER_SIZE];
|
||||
boolean shouldCompressRequest = recoverySettings.compress();
|
||||
if (CompressorFactory.isCompressed(indexInput)) {
|
||||
shouldCompressRequest = false;
|
||||
|
@ -249,7 +247,6 @@ public class RecoverySource extends AbstractComponent {
|
|||
exceptions.add(0, e); // last exceptions first
|
||||
}
|
||||
} finally {
|
||||
IOUtils.closeWhileHandlingException(indexInput);
|
||||
try {
|
||||
store.decRef();
|
||||
} finally {
|
||||
|
|
|
@ -41,6 +41,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.Map;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
|
||||
|
@ -50,6 +51,7 @@ import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_
|
|||
*/
|
||||
public class TransportService extends AbstractLifecycleComponent<TransportService> {
|
||||
|
||||
private final AtomicBoolean started = new AtomicBoolean(false);
|
||||
protected final Transport transport;
|
||||
protected final ThreadPool threadPool;
|
||||
|
||||
|
@ -92,10 +94,14 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
if (transport.boundAddress() != null && logger.isInfoEnabled()) {
|
||||
logger.info("{}", transport.boundAddress());
|
||||
}
|
||||
boolean setStarted = started.compareAndSet(false, true);
|
||||
assert setStarted : "service was already started";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws ElasticsearchException {
|
||||
final boolean setStopped = started.compareAndSet(true, false);
|
||||
assert setStopped : "service has already been stopped";
|
||||
try {
|
||||
transport.stop();
|
||||
} finally {
|
||||
|
@ -191,11 +197,16 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
final long requestId = newRequestId();
|
||||
TimeoutHandler timeoutHandler = null;
|
||||
try {
|
||||
clientHandlers.put(requestId, new RequestHolder<>(handler, node, action, timeoutHandler));
|
||||
if (started.get() == false) {
|
||||
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller.
|
||||
// it will only notify if the toStop code hasn't done the work yet.
|
||||
throw new TransportException("TransportService is closed stopped can't send request");
|
||||
}
|
||||
if (options.timeout() != null) {
|
||||
timeoutHandler = new TimeoutHandler(requestId);
|
||||
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
|
||||
}
|
||||
clientHandlers.put(requestId, new RequestHolder<>(handler, node, action, timeoutHandler));
|
||||
transport.sendRequest(node, requestId, action, request, options);
|
||||
} catch (final Throwable e) {
|
||||
// usually happen either because we failed to connect to the node
|
||||
|
|
Loading…
Reference in New Issue