diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 201e9f23d8d..38c340bcbb6 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -829,7 +829,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements boolean success = false; RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name()); try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) { - try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), IOContext.DEFAULT, fileInfo.metadata())) { + try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) { final byte[] buffer = new byte[BUFFER_SIZE]; int length; while((length=stream.read(buffer))>0){ diff --git a/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java b/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java index c087b92e407..d0ee6482a91 100644 --- a/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java +++ b/src/main/java/org/elasticsearch/index/store/DistributorDirectory.java @@ -18,17 +18,15 @@ */ package org.elasticsearch.index.store; -import org.apache.lucene.index.IndexFileNames; -import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.*; import org.apache.lucene.util.IOUtils; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.math.MathUtils; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.store.distributor.Distributor; import java.io.FileNotFoundException; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -75,22 +73,15 @@ public final class DistributorDirectory extends BaseDirectory { this.distributor = distributor; for (Directory dir : distributor.all()) { for (String file : dir.listAll()) { - if (!usePrimary(file)) { - nameDirMapping.put(file, dir); - } + nameDirMapping.put(file, dir); } } + lockFactory = new DistributorLockFactoryWrapper(distributor.primary()); } @Override public final String[] listAll() throws IOException { - final ArrayList files = new ArrayList<>(); - for (Directory dir : distributor.all()) { - for (String file : dir.listAll()) { - files.add(file); - } - } - return files.toArray(new String[files.size()]); + return nameDirMapping.keySet().toArray(new String[0]); } @Override @@ -106,7 +97,7 @@ public final class DistributorDirectory extends BaseDirectory { public void deleteFile(String name) throws IOException { getDirectory(name, true).deleteFile(name); Directory remove = nameDirMapping.remove(name); - assert usePrimary(name) || remove != null : "Tried to delete file " + name + " but couldn't"; + assert remove != null : "Tried to delete file " + name + " but couldn't"; } @Override @@ -145,27 +136,16 @@ public final class DistributorDirectory extends BaseDirectory { return getDirectory(name, true); } - /** - * Returns true if the primary directory should be used for the given file. - */ - private boolean usePrimary(String name) { - return IndexFileNames.SEGMENTS_GEN.equals(name) || Store.isChecksum(name) || IndexWriter.WRITE_LOCK_NAME.equals(name); - } - /** * Returns the directory that has previously been associated with this file name or associates the name with a directory * if failIfNotAssociated is set to false. */ private Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException { - if (usePrimary(name)) { - return distributor.primary(); - } Directory directory = nameDirMapping.get(name); if (directory == null) { if (failIfNotAssociated) { throw new FileNotFoundException("No such file [" + name + "]"); } - // Pick a directory and associate this new file with it: final Directory dir = distributor.any(); directory = nameDirMapping.putIfAbsent(name, dir); @@ -178,24 +158,10 @@ public final class DistributorDirectory extends BaseDirectory { return directory; } - @Override - public Lock makeLock(String name) { - return distributor.primary().makeLock(name); - } - - @Override - public void clearLock(String name) throws IOException { - distributor.primary().clearLock(name); - } - - @Override - public LockFactory getLockFactory() { - return distributor.primary().getLockFactory(); - } - @Override public void setLockFactory(LockFactory lockFactory) throws IOException { distributor.primary().setLockFactory(lockFactory); + super.setLockFactory(new DistributorLockFactoryWrapper(distributor.primary())); } @Override @@ -211,7 +177,7 @@ public final class DistributorDirectory extends BaseDirectory { /** * Renames the given source file to the given target file unless the target already exists. * - * @param directoryService the DirecotrySerivce to use. + * @param directoryService the DirectoryService to use. * @param from the source file name. * @param to the target file name * @throws IOException if the target file already exists. @@ -233,4 +199,119 @@ public final class DistributorDirectory extends BaseDirectory { } } } -} + + Distributor getDistributor() { + return distributor; + } + + /** + * Basic checks to ensure the internal mapping is consistent - should only be used in assertions + */ + static boolean assertConsistency(ESLogger logger, DistributorDirectory dir) throws IOException { + boolean consistent = true; + StringBuilder builder = new StringBuilder(); + Directory[] all = dir.distributor.all(); + for (Directory d : all) { + for (String file : d.listAll()) { + final Directory directory = dir.nameDirMapping.get(file); + if (directory == null) { + consistent = false; + builder.append("File ").append(file) + .append(" was not mapped to a directory but exists in one of the distributors directories") + .append(System.lineSeparator()); + } + if (directory != d) { + consistent = false; + builder.append("File ").append(file).append(" was mapped to a directory ").append(directory) + .append(" but exists in another distributor directory").append(d) + .append(System.lineSeparator()); + } + + } + } + if (consistent == false) { + logger.info(builder.toString()); + } + assert consistent: builder.toString(); + return consistent; // return boolean so it can be easily be used in asserts + } + + /** + * This inner class is a simple wrapper around the original + * lock factory to track files written / created through the + * lock factory. For instance {@link NativeFSLockFactory} creates real + * files that we should expose for consistency reasons. + */ + private class DistributorLockFactoryWrapper extends LockFactory { + private final Directory dir; + private final LockFactory delegate; + private final boolean writesFiles; + + public DistributorLockFactoryWrapper(Directory dir) { + this.dir = dir; + final FSDirectory leaf = DirectoryUtils.getLeaf(dir, FSDirectory.class); + if (leaf != null) { + writesFiles = leaf.getLockFactory() instanceof FSLockFactory; + } else { + writesFiles = false; + } + this.delegate = dir.getLockFactory(); + } + + @Override + public void setLockPrefix(String lockPrefix) { + delegate.setLockPrefix(lockPrefix); + } + + @Override + public String getLockPrefix() { + return delegate.getLockPrefix(); + } + + @Override + public Lock makeLock(String lockName) { + return new DistributorLock(delegate.makeLock(lockName), lockName); + } + + @Override + public void clearLock(String lockName) throws IOException { + delegate.clearLock(lockName); + } + + @Override + public String toString() { + return "DistributorLockFactoryWrapper(" + delegate.toString() + ")"; + } + + private class DistributorLock extends Lock { + private final Lock delegateLock; + private final String name; + + DistributorLock(Lock delegate, String name) { + this.delegateLock = delegate; + this.name = name; + } + + @Override + public boolean obtain() throws IOException { + if (delegateLock.obtain()) { + if (writesFiles) { + assert (nameDirMapping.containsKey(name) == false || nameDirMapping.get(name) == dir); + nameDirMapping.putIfAbsent(name, dir); + } + return true; + } else { + return false; + } + } + + @Override + public void close() throws IOException { delegateLock.close(); } + + @Override + public boolean isLocked() throws IOException { + return delegateLock.isLocked(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index ebaa88ed4e7..3429c0a1416 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -327,14 +327,25 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex * Note: Checksums are calculated nevertheless since lucene does it by default sicne version 4.8.0. This method only adds the * verification against the checksum in the given metadata and does not add any significant overhead. */ - public IndexOutput createVerifyingOutput(final String filename, final IOContext context, final StoreFileMetaData metadata) throws IOException { - if (metadata.hasLegacyChecksum() || metadata.checksum() == null) { - logger.debug("create legacy output for {}", filename); - return directory().createOutput(filename, context); + public IndexOutput createVerifyingOutput(String fileName, final StoreFileMetaData metadata, final IOContext context) throws IOException { + IndexOutput output = directory().createOutput(fileName, context); + boolean success = false; + try { + if (metadata.hasLegacyChecksum() || metadata.checksum() == null) { + logger.debug("create legacy output for {}", fileName); + } else { + assert metadata.writtenBy() != null; + assert metadata.writtenBy().onOrAfter(Version.LUCENE_48); + output = new VerifyingIndexOutput(metadata, output); + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(output); + } } - assert metadata.writtenBy() != null; - assert metadata.writtenBy().onOrAfter(Version.LUCENE_48); - return new VerifyingIndexOutput(metadata, directory().createOutput(filename, context)); + return output; + } public static void verify(IndexOutput output) throws IOException { @@ -418,7 +429,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex /** * This exists so {@link org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat} can load its boolean setting; can we find a more straightforward way? */ - public class StoreDirectory extends FilterDirectory { + public final class StoreDirectory extends FilterDirectory { StoreDirectory(Directory delegateDirectory) throws IOException { super(delegateDirectory); @@ -462,6 +473,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } private void innerClose() throws IOException { + assert DistributorDirectory.assertConsistency(logger, distributorDirectory); super.close(); } @@ -789,7 +801,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } } - private static final String CHECKSUMS_PREFIX = "_checksums-"; + public static final String CHECKSUMS_PREFIX = "_checksums-"; public static final boolean isChecksum(String name) { // TODO can we drowp .cks diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java index 0195beff427..d9187f84ec3 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryStatus.java @@ -252,7 +252,7 @@ public class RecoveryStatus extends AbstractRefCounted { String tempFileName = getTempNameForFile(fileName); // add first, before it's created tempFileNames.add(tempFileName); - IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, IOContext.DEFAULT, metaData); + IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT); openIndexOutputs.put(fileName, indexOutput); return indexOutput; } diff --git a/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java b/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java index 3cee05f6f6d..ca339aa8338 100644 --- a/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java +++ b/src/test/java/org/elasticsearch/index/store/DistributorDirectoryTest.java @@ -18,29 +18,33 @@ */ package org.elasticsearch.index.store; -import java.io.File; -import java.io.IOException; - -import org.apache.lucene.store.BaseDirectoryTestCase; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.IOContext; -import org.apache.lucene.store.MockDirectoryWrapper; +import com.carrotsearch.randomizedtesting.annotations.*; +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.store.*; +import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.TimeUnits; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.index.store.distributor.Distributor; import org.elasticsearch.test.ElasticsearchThreadFilter; import org.elasticsearch.test.junit.listeners.LoggingListener; -import com.carrotsearch.randomizedtesting.annotations.Listeners; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; -import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.file.NoSuchFileException; +import java.util.Arrays; @ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class}) -@ThreadLeakScope(ThreadLeakScope.Scope.NONE) -@TimeoutSuite(millis = 20 * TimeUnits.MINUTE) // timeout the suite after 20min and fail the test. +@ThreadLeakScope(ThreadLeakScope.Scope.SUITE) +@ThreadLeakLingering(linger = 5000) // 5 sec lingering +@TimeoutSuite(millis = 5 * TimeUnits.MINUTE) @Listeners(LoggingListener.class) @LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose") public class DistributorDirectoryTest extends BaseDirectoryTestCase { + protected final ESLogger logger = Loggers.getLogger(getClass()); @Override protected Directory getDirectory(File path) throws IOException { @@ -52,7 +56,13 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase { ((MockDirectoryWrapper) directories[i]).setEnableVirusScanner(false); } } - return new DistributorDirectory(directories); + return new FilterDirectory(new DistributorDirectory(directories)) { + @Override + public void close() throws IOException { + assertTrue(DistributorDirectory.assertConsistency(logger, ((DistributorDirectory) this.getDelegate()))); + super.close(); + } + }; } // #7306: don't invoke the distributor when we are opening an already existing file @@ -80,7 +90,7 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase { } }; - Directory dd = new DistributorDirectory(distrib); + DistributorDirectory dd = new DistributorDirectory(distrib); assertEquals(0, dd.fileLength("one.txt")); dd.openInput("one.txt", IOContext.DEFAULT).close(); try { @@ -89,6 +99,90 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase { } catch (IllegalStateException ise) { // expected } + assertTrue(DistributorDirectory.assertConsistency(logger, dd)); dd.close(); } + + public void testRenameFiles() throws IOException { + final int iters = 1 + random().nextInt(10); + for (int i = 0; i < iters; i++) { + Directory[] dirs = new Directory[1 + random().nextInt(5)]; + for (int j=0; j < dirs.length; j++) { + MockDirectoryWrapper directory = newMockDirectory(); + directory.setEnableVirusScanner(false); + directory.setCheckIndexOnClose(false); + dirs[j] = directory; + } + + DistributorDirectory dd = new DistributorDirectory(dirs); + String file = RandomPicks.randomFrom(random(), Arrays.asList(Store.CHECKSUMS_PREFIX, IndexFileNames.SEGMENTS_GEN)); + String tmpFileName = RandomPicks.randomFrom(random(), Arrays.asList("recovery.", "foobar.", "test.")) + Math.max(0, Math.abs(random().nextLong())) + "." + file; + try (IndexOutput out = dd.createOutput(tmpFileName, IOContext.DEFAULT)) { + out.writeInt(1); + } + Directory theDir = null; + for (Directory d : dirs) { + try { + if (d.fileLength(tmpFileName) > 0) { + theDir = d; + break; + } + } catch (IOException ex) { + // nevermind + } + } + assertNotNull("file must be in at least one dir", theDir); + DirectoryService service = new DirectoryService() { + @Override + public Directory[] build() throws IOException { + return new Directory[0]; + } + + @Override + public long throttleTimeInNanos() { + return 0; + } + + @Override + public void renameFile(Directory dir, String from, String to) throws IOException { + dir.copy(dir, from, to, IOContext.DEFAULT); + dir.deleteFile(from); + } + + @Override + public void fullDelete(Directory dir) throws IOException { + } + }; + dd.renameFile(service, tmpFileName, file); + try { + dd.fileLength(tmpFileName); + fail("file ["+tmpFileName + "] was renamed but still exists"); + } catch (FileNotFoundException | NoSuchFileException ex) { + // all is well + } + try { + theDir.fileLength(tmpFileName); + fail("file ["+tmpFileName + "] was renamed but still exists"); + } catch (FileNotFoundException | NoSuchFileException ex) { + // all is well + } + + + assertEquals(theDir.fileLength(file), 4); + + try (IndexOutput out = dd.createOutput("foo.bar", IOContext.DEFAULT)) { + out.writeInt(1); + } + try { + dd.renameFile(service, "foo.bar", file); + fail("target file already exists"); + } catch (IOException ex) { + // target file already exists + } + + theDir.deleteFile(file); + assertTrue(DistributorDirectory.assertConsistency(logger, dd)); + IOUtils.close(dd); + } + } } diff --git a/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java b/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java new file mode 100644 index 00000000000..cd7fe995e20 --- /dev/null +++ b/src/test/java/org/elasticsearch/index/store/DistributorInTheWildTest.java @@ -0,0 +1,207 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.store; + +import com.carrotsearch.randomizedtesting.annotations.Listeners; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase; +import org.apache.lucene.search.IndexSearcher; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.index.store.distributor.Distributor; +import org.elasticsearch.test.ElasticsearchThreadFilter; +import org.elasticsearch.test.junit.listeners.LoggingListener; +import org.junit.Before; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +/** + * This test is a copy of TestNRTThreads from lucene that puts some + * hard concurrent pressure on the directory etc. to ensure DistributorDirectory is behaving ok. + */ +@LuceneTestCase.SuppressCodecs({ "SimpleText", "Memory", "Direct" }) +@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class}) +@ThreadLeakScope(ThreadLeakScope.Scope.SUITE) +@ThreadLeakLingering(linger = 5000) // 5 sec lingering +@Listeners(LoggingListener.class) +@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose") +public class DistributorInTheWildTest extends ThreadedIndexingAndSearchingTestCase { + protected final ESLogger logger = Loggers.getLogger(getClass()); + + private boolean useNonNrtReaders = true; + + @Before + public void setUp() throws Exception { + super.setUp(); + useNonNrtReaders = random().nextBoolean(); + } + + @Override + protected void doSearching(ExecutorService es, long stopTime) throws Exception { + + boolean anyOpenDelFiles = false; + + DirectoryReader r = DirectoryReader.open(writer, true); + + while (System.currentTimeMillis() < stopTime && !failed.get()) { + if (random().nextBoolean()) { + if (VERBOSE) { + logger.info("TEST: now reopen r=" + r); + } + final DirectoryReader r2 = DirectoryReader.openIfChanged(r); + if (r2 != null) { + r.close(); + r = r2; + } + } else { + if (VERBOSE) { + logger.info("TEST: now close reader=" + r); + } + r.close(); + writer.commit(); + final Set openDeletedFiles = getOpenDeletedFiles(dir); + if (openDeletedFiles.size() > 0) { + logger.info("OBD files: " + openDeletedFiles); + } + anyOpenDelFiles |= openDeletedFiles.size() > 0; + //assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size()); + if (VERBOSE) { + logger.info("TEST: now open"); + } + r = DirectoryReader.open(writer, true); + } + if (VERBOSE) { + logger.info("TEST: got new reader=" + r); + } + //logger.info("numDocs=" + r.numDocs() + " + //openDelFileCount=" + dir.openDeleteFileCount()); + + if (r.numDocs() > 0) { + fixedSearcher = new IndexSearcher(r, es); + smokeTestSearcher(fixedSearcher); + runSearchThreads(System.currentTimeMillis() + 500); + } + } + r.close(); + + //logger.info("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount()); + final Set openDeletedFiles = getOpenDeletedFiles(dir); + if (openDeletedFiles.size() > 0) { + logger.info("OBD files: " + openDeletedFiles); + } + anyOpenDelFiles |= openDeletedFiles.size() > 0; + + assertFalse("saw non-zero open-but-deleted count", anyOpenDelFiles); + } + + private Set getOpenDeletedFiles(Directory dir) throws IOException { + if (random().nextBoolean() && dir instanceof MockDirectoryWrapper) { + return ((MockDirectoryWrapper) dir).getOpenDeletedFiles(); + } + DistributorDirectory d = dir instanceof MockDirectoryWrapper ? (DistributorDirectory)((MockDirectoryWrapper) dir).getDelegate() : (DistributorDirectory) dir; + assertTrue(DistributorDirectory.assertConsistency(logger, d)); + Distributor distributor = d.getDistributor(); + Set set = new HashSet<>(); + for (Directory subDir : distributor.all()) { + Set openDeletedFiles = ((MockDirectoryWrapper) subDir).getOpenDeletedFiles(); + set.addAll(openDeletedFiles); + } + return set; + } + + @Override + protected Directory getDirectory(Directory in) { + assert in instanceof MockDirectoryWrapper; + if (!useNonNrtReaders) ((MockDirectoryWrapper) in).setAssertNoDeleteOpenFile(true); + + Directory[] directories = new Directory[1 + random().nextInt(5)]; + directories[0] = in; + for (int i = 1; i < directories.length; i++) { + final File tempDir = createTempDir(getTestName()); + directories[i] = newMockFSDirectory(tempDir); // some subclasses rely on this being MDW + if (!useNonNrtReaders) ((MockDirectoryWrapper) directories[i]).setAssertNoDeleteOpenFile(true); + } + for (Directory dir : directories) { + ((MockDirectoryWrapper) dir).setCheckIndexOnClose(false); + } + try { + if (random().nextBoolean()) { + return new MockDirectoryWrapper(random(), new DistributorDirectory(directories)); + } else { + return new DistributorDirectory(directories); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + protected void doAfterWriter(ExecutorService es) throws Exception { + // Force writer to do reader pooling, always, so that + // all merged segments, even for merges before + // doSearching is called, are warmed: + DirectoryReader.open(writer, true).close(); + } + + private IndexSearcher fixedSearcher; + + @Override + protected IndexSearcher getCurrentSearcher() throws Exception { + return fixedSearcher; + } + + @Override + protected void releaseSearcher(IndexSearcher s) throws Exception { + if (s != fixedSearcher) { + // Final searcher: + s.getIndexReader().close(); + } + } + + @Override + protected IndexSearcher getFinalSearcher() throws Exception { + final IndexReader r2; + if (useNonNrtReaders) { + if (random().nextBoolean()) { + r2 = DirectoryReader.open(writer, true); + } else { + writer.commit(); + r2 = DirectoryReader.open(dir); + } + } else { + r2 = DirectoryReader.open(writer, true); + } + return newSearcher(r2); + } + + public void testNRTThreads() throws Exception { + runTest("TestNRTThreads"); + } +} diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java index 15be74a2289..742233b1d55 100644 --- a/src/test/java/org/elasticsearch/index/store/StoreTest.java +++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java @@ -460,7 +460,6 @@ public class StoreTest extends ElasticsearchLuceneTestCase { // ok } IOUtils.close(verifyingIndexInput); - IOUtils.close(dir); } @@ -561,7 +560,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException { for (String file : store.directory().listAll()) { - if (!"write.lock".equals(file) && !IndexFileNames.SEGMENTS_GEN.equals(file) && !Store.isChecksum(file)) { + if (!IndexWriter.WRITE_LOCK_NAME.equals(file) && !IndexFileNames.SEGMENTS_GEN.equals(file) && !Store.isChecksum(file)) { assertTrue(file + " is not in the map: " + metadata.asMap().size() + " vs. " + store.directory().listAll().length, metadata.asMap().containsKey(file)); } else { assertFalse(file + " is not in the map: " + metadata.asMap().size() + " vs. " + store.directory().listAll().length, metadata.asMap().containsKey(file)); diff --git a/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java b/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java new file mode 100644 index 00000000000..b5e651c9567 --- /dev/null +++ b/src/test/java/org/elasticsearch/indices/recovery/RecoveryStatusTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.indices.recovery; + +import com.google.common.collect.Sets; +import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.index.service.IndexService; +import org.elasticsearch.index.shard.service.InternalIndexShard; +import org.elasticsearch.index.store.StoreFileMetaData; +import org.elasticsearch.test.ElasticsearchSingleNodeTest; + +import java.io.IOException; +import java.util.Set; +import java.util.regex.Pattern; + +/** + */ +public class RecoveryStatusTests extends ElasticsearchSingleNodeTest { + + public void testRenameTempFiles() throws IOException { + IndexService service = createIndex("foo"); + RecoveryState state = new RecoveryState(); + + InternalIndexShard indexShard = (InternalIndexShard) service.shard(0); + DiscoveryNode node = new DiscoveryNode("foo", new LocalTransportAddress("bar"), Version.CURRENT); + RecoveryStatus status = new RecoveryStatus(indexShard, node, state, new RecoveryTarget.RecoveryListener() { + @Override + public void onRecoveryDone(RecoveryState state) { + } + + @Override + public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) { + } + }); + try (IndexOutput indexOutput = status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8), status.store())) { + indexOutput.writeInt(1); + IndexOutput openIndexOutput = status.getOpenIndexOutput("foo.bar"); + assertSame(openIndexOutput, indexOutput); + openIndexOutput.writeInt(1); + } + status.removeOpenIndexOutputs("foo.bar"); + Set strings = Sets.newHashSet(status.store().directory().listAll()); + String expectedFile = null; + for (String file : strings) { + if (Pattern.compile("recovery[.]\\d+[.]foo[.]bar").matcher(file).matches()) { + expectedFile = file; + break; + } + } + assertNotNull(expectedFile); + status.renameAllTempFiles(); + strings = Sets.newHashSet(status.store().directory().listAll()); + assertTrue(strings.toString(), strings.contains("foo.bar")); + assertFalse(strings.toString(), strings.contains(expectedFile)); + status.markAsDone(); + } +}