Allow double-closing of FSTranslog
the translog might be reused across engines which is currently a problem in the design such that we have to allow calls to `close` more than once. This moves the closed check for snapshot on the actual file to exit the loop. Relates to #10807
This commit is contained in:
parent
f87fb95830
commit
2c510f0689
|
@ -239,6 +239,11 @@ public class BufferingFsTranslogFile implements FsTranslogFile {
|
||||||
return channelReference.file();
|
return channelReference.file();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean closed() {
|
||||||
|
return this.closed.get();
|
||||||
|
}
|
||||||
|
|
||||||
class WrapperOutputStream extends OutputStream {
|
class WrapperOutputStream extends OutputStream {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -94,8 +94,6 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
|
|
||||||
private final ApplySettings applySettings = new ApplySettings();
|
private final ApplySettings applySettings = new ApplySettings();
|
||||||
|
|
||||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
|
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
|
||||||
BigArrays bigArrays, ShardPath shardPath) throws IOException {
|
BigArrays bigArrays, ShardPath shardPath) throws IOException {
|
||||||
|
@ -141,7 +139,6 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (closed.compareAndSet(false, true)) {
|
|
||||||
if (indexSettingsService != null) {
|
if (indexSettingsService != null) {
|
||||||
indexSettingsService.removeListener(applySettings);
|
indexSettingsService.removeListener(applySettings);
|
||||||
}
|
}
|
||||||
|
@ -152,7 +149,6 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
rwl.writeLock().unlock();
|
rwl.writeLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Path location() {
|
public Path location() {
|
||||||
|
@ -358,13 +354,15 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
@Override
|
@Override
|
||||||
public FsChannelSnapshot snapshot() throws TranslogException {
|
public FsChannelSnapshot snapshot() throws TranslogException {
|
||||||
while (true) {
|
while (true) {
|
||||||
if (closed.get()) {
|
FsTranslogFile current = this.current;
|
||||||
throw new TranslogException(shardId, "translog is already closed");
|
|
||||||
}
|
|
||||||
FsChannelSnapshot snapshot = current.snapshot();
|
FsChannelSnapshot snapshot = current.snapshot();
|
||||||
if (snapshot != null) {
|
if (snapshot != null) {
|
||||||
return snapshot;
|
return snapshot;
|
||||||
}
|
}
|
||||||
|
if (current.closed() && this.current == current) {
|
||||||
|
// check if we are closed and if we are still current - then this translog is closed and we can exit
|
||||||
|
throw new TranslogException(shardId, "current translog is already closed");
|
||||||
|
}
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -82,4 +82,6 @@ public interface FsTranslogFile extends Closeable {
|
||||||
TranslogStream getStream();
|
TranslogStream getStream();
|
||||||
|
|
||||||
public Path getPath();
|
public Path getPath();
|
||||||
|
|
||||||
|
public boolean closed();
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,4 +182,10 @@ public class SimpleFsTranslogFile implements FsTranslogFile {
|
||||||
public void updateBufferSize(int bufferSize) throws TranslogException {
|
public void updateBufferSize(int bufferSize) throws TranslogException {
|
||||||
// nothing to do here...
|
// nothing to do here...
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean closed() {
|
||||||
|
return this.closed.get();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -340,7 +340,7 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
|
||||||
Translog.Snapshot snapshot = translog.snapshot();
|
Translog.Snapshot snapshot = translog.snapshot();
|
||||||
fail("translog is closed");
|
fail("translog is closed");
|
||||||
} catch (TranslogException ex) {
|
} catch (TranslogException ex) {
|
||||||
assertEquals(ex.getMessage(), "translog is already closed");
|
assertEquals(ex.getMessage(), "current translog is already closed");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue