Merge pull request #10809 from s1monw/issues/10807
[TRANSLOG] Fail #snapshot if translog is closed
This commit is contained in:
commit
54ccfda484
|
@ -50,6 +50,7 @@ import java.nio.file.*;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
|
@ -93,7 +94,7 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
|
|
||||||
private final ApplySettings applySettings = new ApplySettings();
|
private final ApplySettings applySettings = new ApplySettings();
|
||||||
|
|
||||||
|
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
|
public FsTranslog(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
|
||||||
|
@ -140,14 +141,16 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
if (indexSettingsService != null) {
|
if (closed.compareAndSet(false, true)) {
|
||||||
indexSettingsService.removeListener(applySettings);
|
if (indexSettingsService != null) {
|
||||||
}
|
indexSettingsService.removeListener(applySettings);
|
||||||
rwl.writeLock().lock();
|
}
|
||||||
try {
|
rwl.writeLock().lock();
|
||||||
IOUtils.close(this.trans, this.current);
|
try {
|
||||||
} finally {
|
IOUtils.close(this.trans, this.current);
|
||||||
rwl.writeLock().unlock();
|
} finally {
|
||||||
|
rwl.writeLock().unlock();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -355,6 +358,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog
|
||||||
@Override
|
@Override
|
||||||
public FsChannelSnapshot snapshot() throws TranslogException {
|
public FsChannelSnapshot snapshot() throws TranslogException {
|
||||||
while (true) {
|
while (true) {
|
||||||
|
if (closed.get()) {
|
||||||
|
throw new TranslogException(shardId, "translog is already closed");
|
||||||
|
}
|
||||||
FsChannelSnapshot snapshot = current.snapshot();
|
FsChannelSnapshot snapshot = current.snapshot();
|
||||||
if (snapshot != null) {
|
if (snapshot != null) {
|
||||||
return snapshot;
|
return snapshot;
|
||||||
|
|
|
@ -332,6 +332,17 @@ public abstract class AbstractSimpleTranslogTests extends ElasticsearchTestCase
|
||||||
snapshot.close();
|
snapshot.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSnapshotOnClosedTranslog() throws IOException {
|
||||||
|
assertTrue(Files.exists(translogDir.resolve("translog-1")));
|
||||||
|
translog.add(new Translog.Create("test", "1", new byte[]{1}));
|
||||||
|
translog.close();
|
||||||
|
try {
|
||||||
|
Translog.Snapshot snapshot = translog.snapshot();
|
||||||
|
} catch (TranslogException ex) {
|
||||||
|
assertEquals(ex.getMessage(), "translog is already closed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void deleteOnRollover() throws IOException {
|
public void deleteOnRollover() throws IOException {
|
||||||
translog.add(new Translog.Create("test", "1", new byte[]{1}));
|
translog.add(new Translog.Create("test", "1", new byte[]{1}));
|
||||||
|
|
Loading…
Reference in New Issue