[STORE] Remove special file handling from DistributorDirectory

This commit removes all special file handling from DistributorDirectory
that assigned certain files (segments.gen, checksum files, and the write lock)
to the primary directory. This special handling was added to ensure that files
written more than once are effectively overwritten in place. The name-to-directory
mapping is now consistent at all times, so files written through this directory
need no special handling. Writes to the underlying directories that bypass the
distributor directory are not, and have never been, supported.

Note: this commit also fixes a problem with adding directories to the distributor
during a restart, where the primary could suddenly change and existing file mappings
were bypassed.
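
To make that invariant concrete, here is a minimal sketch (hypothetical code, not the actual DistributorDirectory implementation) of how binding a file name to one backing directory on first write makes later writes of the same name overwrite in place:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Minimal sketch of the mapping invariant: the first write binds a name to
// one backing directory; every later write of that name reuses the binding,
// so the file is overwritten in place instead of landing somewhere else.
final class NameMappingSketch {
    private final ConcurrentMap<String, Integer> nameToDir = new ConcurrentHashMap<>();
    private final int directoryCount;

    NameMappingSketch(int directoryCount) {
        this.directoryCount = directoryCount;
    }

    int directoryFor(String name) {
        int candidate = (name.hashCode() & 0x7fffffff) % directoryCount;
        // putIfAbsent wins exactly once per name, even under concurrency
        Integer existing = nameToDir.putIfAbsent(name, candidate);
        return existing == null ? candidate : existing;
    }
}

Because a binding never changes while the file exists, listAll() can simply report the map's keys, which is exactly what the new implementation below does.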

Closes #8276
Simon Willnauer 2014-10-29 16:19:08 +01:00
parent f2d545c40e
commit 44e24d3916
8 changed files with 539 additions and 70 deletions

BlobStoreIndexShardRepository.java

@@ -829,7 +829,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
boolean success = false;
RecoveryState.File file = recoveryState.getIndex().file(fileInfo.name());
try (InputStream stream = new PartSliceStream(blobContainer, fileInfo)) {
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), IOContext.DEFAULT, fileInfo.metadata())) {
try (final IndexOutput indexOutput = store.createVerifyingOutput(fileInfo.physicalName(), fileInfo.metadata(), IOContext.DEFAULT)) {
final byte[] buffer = new byte[BUFFER_SIZE];
int length;
while((length=stream.read(buffer))>0){

DistributorDirectory.java

@@ -18,17 +18,15 @@
*/
package org.elasticsearch.index.store;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.math.MathUtils;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.store.distributor.Distributor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -75,22 +73,15 @@ public final class DistributorDirectory extends BaseDirectory {
this.distributor = distributor;
for (Directory dir : distributor.all()) {
for (String file : dir.listAll()) {
if (!usePrimary(file)) {
nameDirMapping.put(file, dir);
}
}
}
lockFactory = new DistributorLockFactoryWrapper(distributor.primary());
}
@Override
public final String[] listAll() throws IOException {
final ArrayList<String> files = new ArrayList<>();
for (Directory dir : distributor.all()) {
for (String file : dir.listAll()) {
files.add(file);
}
}
return files.toArray(new String[files.size()]);
return nameDirMapping.keySet().toArray(new String[0]);
}
@Override
@@ -106,7 +97,7 @@ public final class DistributorDirectory extends BaseDirectory {
public void deleteFile(String name) throws IOException {
getDirectory(name, true).deleteFile(name);
Directory remove = nameDirMapping.remove(name);
assert usePrimary(name) || remove != null : "Tried to delete file " + name + " but couldn't";
assert remove != null : "Tried to delete file " + name + " but couldn't";
}
@Override
@@ -145,27 +136,16 @@ public final class DistributorDirectory extends BaseDirectory {
return getDirectory(name, true);
}
/**
* Returns true if the primary directory should be used for the given file.
*/
private boolean usePrimary(String name) {
return IndexFileNames.SEGMENTS_GEN.equals(name) || Store.isChecksum(name) || IndexWriter.WRITE_LOCK_NAME.equals(name);
}
/**
* Returns the directory that has previously been associated with this file name or associates the name with a directory
* if failIfNotAssociated is set to false.
*/
private Directory getDirectory(String name, boolean failIfNotAssociated) throws IOException {
if (usePrimary(name)) {
return distributor.primary();
}
Directory directory = nameDirMapping.get(name);
if (directory == null) {
if (failIfNotAssociated) {
throw new FileNotFoundException("No such file [" + name + "]");
}
// Pick a directory and associate this new file with it:
final Directory dir = distributor.any();
directory = nameDirMapping.putIfAbsent(name, dir);
@@ -178,24 +158,10 @@ public final class DistributorDirectory extends BaseDirectory {
return directory;
}
@Override
public Lock makeLock(String name) {
return distributor.primary().makeLock(name);
}
@Override
public void clearLock(String name) throws IOException {
distributor.primary().clearLock(name);
}
@Override
public LockFactory getLockFactory() {
return distributor.primary().getLockFactory();
}
@Override
public void setLockFactory(LockFactory lockFactory) throws IOException {
distributor.primary().setLockFactory(lockFactory);
super.setLockFactory(new DistributorLockFactoryWrapper(distributor.primary()));
}
@Override
@@ -211,7 +177,7 @@ public final class DistributorDirectory extends BaseDirectory {
/**
* Renames the given source file to the given target file unless the target already exists.
*
* @param directoryService the DirecotrySerivce to use.
* @param directoryService the DirectoryService to use.
* @param from the source file name.
* @param to the target file name
* @throws IOException if the target file already exists.
@@ -233,4 +199,119 @@ public final class DistributorDirectory extends BaseDirectory {
}
}
}
Distributor getDistributor() {
return distributor;
}
/**
* Basic checks to ensure the internal mapping is consistent - should only be used in assertions
*/
static boolean assertConsistency(ESLogger logger, DistributorDirectory dir) throws IOException {
boolean consistent = true;
StringBuilder builder = new StringBuilder();
Directory[] all = dir.distributor.all();
for (Directory d : all) {
for (String file : d.listAll()) {
final Directory directory = dir.nameDirMapping.get(file);
if (directory == null) {
consistent = false;
builder.append("File ").append(file)
.append(" was not mapped to a directory but exists in one of the distributors directories")
.append(System.lineSeparator());
} else if (directory != d) {
consistent = false;
builder.append("File ").append(file).append(" was mapped to a directory ").append(directory)
.append(" but exists in another distributor directory").append(d)
.append(System.lineSeparator());
}
}
}
if (consistent == false) {
logger.info(builder.toString());
}
assert consistent: builder.toString();
return consistent; // return boolean so it can be easily be used in asserts
}
/**
* This inner class is a simple wrapper around the original
* lock factory to track files written / created through the
* lock factory. For instance {@link NativeFSLockFactory} creates real
* files that we should expose for consistency reasons.
*/
private class DistributorLockFactoryWrapper extends LockFactory {
private final Directory dir;
private final LockFactory delegate;
private final boolean writesFiles;
public DistributorLockFactoryWrapper(Directory dir) {
this.dir = dir;
final FSDirectory leaf = DirectoryUtils.getLeaf(dir, FSDirectory.class);
if (leaf != null) {
writesFiles = leaf.getLockFactory() instanceof FSLockFactory;
} else {
writesFiles = false;
}
this.delegate = dir.getLockFactory();
}
@Override
public void setLockPrefix(String lockPrefix) {
delegate.setLockPrefix(lockPrefix);
}
@Override
public String getLockPrefix() {
return delegate.getLockPrefix();
}
@Override
public Lock makeLock(String lockName) {
return new DistributorLock(delegate.makeLock(lockName), lockName);
}
@Override
public void clearLock(String lockName) throws IOException {
delegate.clearLock(lockName);
}
@Override
public String toString() {
return "DistributorLockFactoryWrapper(" + delegate.toString() + ")";
}
private class DistributorLock extends Lock {
private final Lock delegateLock;
private final String name;
DistributorLock(Lock delegate, String name) {
this.delegateLock = delegate;
this.name = name;
}
@Override
public boolean obtain() throws IOException {
if (delegateLock.obtain()) {
if (writesFiles) {
assert (nameDirMapping.containsKey(name) == false || nameDirMapping.get(name) == dir);
nameDirMapping.putIfAbsent(name, dir);
}
return true;
} else {
return false;
}
}
@Override
public void close() throws IOException { delegateLock.close(); }
@Override
public boolean isLocked() throws IOException {
return delegateLock.isLocked();
}
}
}
}
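
A quick way to exercise the invariant (a hypothetical snippet against the Lucene 4.x API, not part of this commit): write the same name twice through the distributor and confirm it exists in exactly one underlying directory.

// Hypothetical check; assumes RAMDirectory, whose createOutput replaces an
// existing file of the same name.
Directory[] dirs = new Directory[] { new RAMDirectory(), new RAMDirectory() };
DistributorDirectory dd = new DistributorDirectory(dirs);
for (int round = 0; round < 2; round++) { // the second round overwrites in place
    try (IndexOutput out = dd.createOutput("same.name", IOContext.DEFAULT)) {
        out.writeInt(round);
    }
}
int copies = 0;
for (Directory dir : dirs) {
    if (Arrays.asList(dir.listAll()).contains("same.name")) {
        copies++;
    }
}
assert copies == 1 : "a name must be bound to exactly one backing directory";
IOUtils.close(dd);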

Store.java

@@ -327,14 +327,25 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
* Note: Checksums are calculated nevertheless, since Lucene does it by default since version 4.8.0. This method only adds
* verification against the checksum in the given metadata and does not add any significant overhead.
*/
public IndexOutput createVerifyingOutput(final String filename, final IOContext context, final StoreFileMetaData metadata) throws IOException {
public IndexOutput createVerifyingOutput(String fileName, final StoreFileMetaData metadata, final IOContext context) throws IOException {
IndexOutput output = directory().createOutput(fileName, context);
boolean success = false;
try {
if (metadata.hasLegacyChecksum() || metadata.checksum() == null) {
logger.debug("create legacy output for {}", filename);
return directory().createOutput(filename, context);
}
logger.debug("create legacy output for {}", fileName);
} else {
assert metadata.writtenBy() != null;
assert metadata.writtenBy().onOrAfter(Version.LUCENE_48);
return new VerifyingIndexOutput(metadata, directory().createOutput(filename, context));
output = new VerifyingIndexOutput(metadata, output);
}
success = true;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(output);
}
}
return output;
}
public static void verify(IndexOutput output) throws IOException {
@@ -418,7 +429,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
/**
* This exists so {@link org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat} can load its boolean setting; can we find a more straightforward way?
*/
public class StoreDirectory extends FilterDirectory {
public final class StoreDirectory extends FilterDirectory {
StoreDirectory(Directory delegateDirectory) throws IOException {
super(delegateDirectory);
@@ -462,6 +473,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
private void innerClose() throws IOException {
assert DistributorDirectory.assertConsistency(logger, distributorDirectory);
super.close();
}
@@ -789,7 +801,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
}
private static final String CHECKSUMS_PREFIX = "_checksums-";
public static final String CHECKSUMS_PREFIX = "_checksums-";
public static final boolean isChecksum(String name) {
// TODO can we drop .cks
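
For reference, a hedged usage sketch of the reordered createVerifyingOutput signature (the names store, in, and metadata are assumed to be in scope, and "_0.cfs" is an arbitrary example file name; the pattern mirrors the recovery code above):

// Copy a stream through the verifying output, then check the checksum
// before trusting the file; Store.verify throws if the bytes written do
// not match the checksum recorded in the metadata.
try (IndexOutput out = store.createVerifyingOutput("_0.cfs", metadata, IOContext.DEFAULT)) {
    final byte[] buffer = new byte[4096];
    int length;
    while ((length = in.read(buffer)) > 0) {
        out.writeBytes(buffer, 0, length);
    }
    Store.verify(out);
}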

RecoveryStatus.java

@@ -252,7 +252,7 @@ public class RecoveryStatus extends AbstractRefCounted {
String tempFileName = getTempNameForFile(fileName);
// add first, before it's created
tempFileNames.add(tempFileName);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, IOContext.DEFAULT, metaData);
IndexOutput indexOutput = store.createVerifyingOutput(tempFileName, metaData, IOContext.DEFAULT);
openIndexOutputs.put(fileName, indexOutput);
return indexOutput;
}

DistributorDirectoryTest.java

@@ -18,29 +18,33 @@
*/
package org.elasticsearch.index.store;
import java.io.File;
import java.io.IOException;
import org.apache.lucene.store.BaseDirectoryTestCase;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.MockDirectoryWrapper;
import com.carrotsearch.randomizedtesting.annotations.*;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.TimeUnits;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.test.ElasticsearchThreadFilter;
import org.elasticsearch.test.junit.listeners.LoggingListener;
import com.carrotsearch.randomizedtesting.annotations.Listeners;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
@ThreadLeakScope(ThreadLeakScope.Scope.NONE)
@TimeoutSuite(millis = 20 * TimeUnits.MINUTE) // timeout the suite after 20min and fail the test.
@ThreadLeakScope(ThreadLeakScope.Scope.SUITE)
@ThreadLeakLingering(linger = 5000) // 5 sec lingering
@TimeoutSuite(millis = 5 * TimeUnits.MINUTE)
@Listeners(LoggingListener.class)
@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")
public class DistributorDirectoryTest extends BaseDirectoryTestCase {
protected final ESLogger logger = Loggers.getLogger(getClass());
@Override
protected Directory getDirectory(File path) throws IOException {
@@ -52,7 +56,13 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
((MockDirectoryWrapper) directories[i]).setEnableVirusScanner(false);
}
}
return new DistributorDirectory(directories);
return new FilterDirectory(new DistributorDirectory(directories)) {
@Override
public void close() throws IOException {
assertTrue(DistributorDirectory.assertConsistency(logger, ((DistributorDirectory) this.getDelegate())));
super.close();
}
};
}
// #7306: don't invoke the distributor when we are opening an already existing file
@@ -80,7 +90,7 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
}
};
Directory dd = new DistributorDirectory(distrib);
DistributorDirectory dd = new DistributorDirectory(distrib);
assertEquals(0, dd.fileLength("one.txt"));
dd.openInput("one.txt", IOContext.DEFAULT).close();
try {
@@ -89,6 +99,90 @@ public class DistributorDirectoryTest extends BaseDirectoryTestCase {
} catch (IllegalStateException ise) {
// expected
}
assertTrue(DistributorDirectory.assertConsistency(logger, dd));
dd.close();
}
public void testRenameFiles() throws IOException {
final int iters = 1 + random().nextInt(10);
for (int i = 0; i < iters; i++) {
Directory[] dirs = new Directory[1 + random().nextInt(5)];
for (int j=0; j < dirs.length; j++) {
MockDirectoryWrapper directory = newMockDirectory();
directory.setEnableVirusScanner(false);
directory.setCheckIndexOnClose(false);
dirs[j] = directory;
}
DistributorDirectory dd = new DistributorDirectory(dirs);
String file = RandomPicks.randomFrom(random(), Arrays.asList(Store.CHECKSUMS_PREFIX, IndexFileNames.SEGMENTS_GEN));
String tmpFileName = RandomPicks.randomFrom(random(), Arrays.asList("recovery.", "foobar.", "test.")) + Math.max(0, Math.abs(random().nextLong())) + "." + file;
try (IndexOutput out = dd.createOutput(tmpFileName, IOContext.DEFAULT)) {
out.writeInt(1);
}
Directory theDir = null;
for (Directory d : dirs) {
try {
if (d.fileLength(tmpFileName) > 0) {
theDir = d;
break;
}
} catch (IOException ex) {
// nevermind
}
}
assertNotNull("file must be in at least one dir", theDir);
DirectoryService service = new DirectoryService() {
@Override
public Directory[] build() throws IOException {
return new Directory[0];
}
@Override
public long throttleTimeInNanos() {
return 0;
}
@Override
public void renameFile(Directory dir, String from, String to) throws IOException {
dir.copy(dir, from, to, IOContext.DEFAULT);
dir.deleteFile(from);
}
@Override
public void fullDelete(Directory dir) throws IOException {
}
};
dd.renameFile(service, tmpFileName, file);
try {
dd.fileLength(tmpFileName);
fail("file ["+tmpFileName + "] was renamed but still exists");
} catch (FileNotFoundException | NoSuchFileException ex) {
// all is well
}
try {
theDir.fileLength(tmpFileName);
fail("file ["+tmpFileName + "] was renamed but still exists");
} catch (FileNotFoundException | NoSuchFileException ex) {
// all is well
}
assertEquals(theDir.fileLength(file), 4);
try (IndexOutput out = dd.createOutput("foo.bar", IOContext.DEFAULT)) {
out.writeInt(1);
}
try {
dd.renameFile(service, "foo.bar", file);
fail("target file already exists");
} catch (IOException ex) {
// target file already exists
}
theDir.deleteFile(file);
assertTrue(DistributorDirectory.assertConsistency(logger, dd));
IOUtils.close(dd);
}
}
}

DistributorInTheWildTest.java

@@ -0,0 +1,207 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.store;
import com.carrotsearch.randomizedtesting.annotations.Listeners;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering;
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.ThreadedIndexingAndSearchingTestCase;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.index.store.distributor.Distributor;
import org.elasticsearch.test.ElasticsearchThreadFilter;
import org.elasticsearch.test.junit.listeners.LoggingListener;
import org.junit.Before;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
/**
* This test is a copy of TestNRTThreads from lucene that puts some
* hard concurrent pressure on the directory etc. to ensure DistributorDirectory is behaving ok.
*/
@LuceneTestCase.SuppressCodecs({ "SimpleText", "Memory", "Direct" })
@ThreadLeakFilters(defaultFilters = true, filters = {ElasticsearchThreadFilter.class})
@ThreadLeakScope(ThreadLeakScope.Scope.SUITE)
@ThreadLeakLingering(linger = 5000) // 5 sec lingering
@Listeners(LoggingListener.class)
@LuceneTestCase.SuppressSysoutChecks(bugUrl = "we log a lot on purpose")
public class DistributorInTheWildTest extends ThreadedIndexingAndSearchingTestCase {
protected final ESLogger logger = Loggers.getLogger(getClass());
private boolean useNonNrtReaders = true;
@Before
public void setUp() throws Exception {
super.setUp();
useNonNrtReaders = random().nextBoolean();
}
@Override
protected void doSearching(ExecutorService es, long stopTime) throws Exception {
boolean anyOpenDelFiles = false;
DirectoryReader r = DirectoryReader.open(writer, true);
while (System.currentTimeMillis() < stopTime && !failed.get()) {
if (random().nextBoolean()) {
if (VERBOSE) {
logger.info("TEST: now reopen r=" + r);
}
final DirectoryReader r2 = DirectoryReader.openIfChanged(r);
if (r2 != null) {
r.close();
r = r2;
}
} else {
if (VERBOSE) {
logger.info("TEST: now close reader=" + r);
}
r.close();
writer.commit();
final Set<String> openDeletedFiles = getOpenDeletedFiles(dir);
if (openDeletedFiles.size() > 0) {
logger.info("OBD files: " + openDeletedFiles);
}
anyOpenDelFiles |= openDeletedFiles.size() > 0;
//assertEquals("open but deleted: " + openDeletedFiles, 0, openDeletedFiles.size());
if (VERBOSE) {
logger.info("TEST: now open");
}
r = DirectoryReader.open(writer, true);
}
if (VERBOSE) {
logger.info("TEST: got new reader=" + r);
}
//logger.info("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
if (r.numDocs() > 0) {
fixedSearcher = new IndexSearcher(r, es);
smokeTestSearcher(fixedSearcher);
runSearchThreads(System.currentTimeMillis() + 500);
}
}
r.close();
//logger.info("numDocs=" + r.numDocs() + " openDelFileCount=" + dir.openDeleteFileCount());
final Set<String> openDeletedFiles = getOpenDeletedFiles(dir);
if (openDeletedFiles.size() > 0) {
logger.info("OBD files: " + openDeletedFiles);
}
anyOpenDelFiles |= openDeletedFiles.size() > 0;
assertFalse("saw non-zero open-but-deleted count", anyOpenDelFiles);
}
private Set<String> getOpenDeletedFiles(Directory dir) throws IOException {
if (random().nextBoolean() && dir instanceof MockDirectoryWrapper) {
return ((MockDirectoryWrapper) dir).getOpenDeletedFiles();
}
DistributorDirectory d = dir instanceof MockDirectoryWrapper ? (DistributorDirectory)((MockDirectoryWrapper) dir).getDelegate() : (DistributorDirectory) dir;
assertTrue(DistributorDirectory.assertConsistency(logger, d));
Distributor distributor = d.getDistributor();
Set<String> set = new HashSet<>();
for (Directory subDir : distributor.all()) {
Set<String> openDeletedFiles = ((MockDirectoryWrapper) subDir).getOpenDeletedFiles();
set.addAll(openDeletedFiles);
}
return set;
}
@Override
protected Directory getDirectory(Directory in) {
assert in instanceof MockDirectoryWrapper;
if (!useNonNrtReaders) ((MockDirectoryWrapper) in).setAssertNoDeleteOpenFile(true);
Directory[] directories = new Directory[1 + random().nextInt(5)];
directories[0] = in;
for (int i = 1; i < directories.length; i++) {
final File tempDir = createTempDir(getTestName());
directories[i] = newMockFSDirectory(tempDir); // some subclasses rely on this being MDW
if (!useNonNrtReaders) ((MockDirectoryWrapper) directories[i]).setAssertNoDeleteOpenFile(true);
}
for (Directory dir : directories) {
((MockDirectoryWrapper) dir).setCheckIndexOnClose(false);
}
try {
if (random().nextBoolean()) {
return new MockDirectoryWrapper(random(), new DistributorDirectory(directories));
} else {
return new DistributorDirectory(directories);
}
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
@Override
protected void doAfterWriter(ExecutorService es) throws Exception {
// Force writer to do reader pooling, always, so that
// all merged segments, even for merges before
// doSearching is called, are warmed:
DirectoryReader.open(writer, true).close();
}
private IndexSearcher fixedSearcher;
@Override
protected IndexSearcher getCurrentSearcher() throws Exception {
return fixedSearcher;
}
@Override
protected void releaseSearcher(IndexSearcher s) throws Exception {
if (s != fixedSearcher) {
// Final searcher:
s.getIndexReader().close();
}
}
@Override
protected IndexSearcher getFinalSearcher() throws Exception {
final IndexReader r2;
if (useNonNrtReaders) {
if (random().nextBoolean()) {
r2 = DirectoryReader.open(writer, true);
} else {
writer.commit();
r2 = DirectoryReader.open(dir);
}
} else {
r2 = DirectoryReader.open(writer, true);
}
return newSearcher(r2);
}
public void testNRTThreads() throws Exception {
runTest("TestNRTThreads");
}
}

StoreTest.java

@@ -460,7 +460,6 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
// ok
}
IOUtils.close(verifyingIndexInput);
IOUtils.close(dir);
}
@@ -561,7 +560,7 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException {
for (String file : store.directory().listAll()) {
if (!"write.lock".equals(file) && !IndexFileNames.SEGMENTS_GEN.equals(file) && !Store.isChecksum(file)) {
if (!IndexWriter.WRITE_LOCK_NAME.equals(file) && !IndexFileNames.SEGMENTS_GEN.equals(file) && !Store.isChecksum(file)) {
assertTrue(file + " is not in the map: " + metadata.asMap().size() + " vs. " + store.directory().listAll().length, metadata.asMap().containsKey(file));
} else {
assertFalse(file + " is not in the map: " + metadata.asMap().size() + " vs. " + store.directory().listAll().length, metadata.asMap().containsKey(file));

RecoveryStatusTests.java

@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import com.google.common.collect.Sets;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.InternalIndexShard;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
import java.io.IOException;
import java.util.Set;
import java.util.regex.Pattern;
/**
*/
public class RecoveryStatusTests extends ElasticsearchSingleNodeTest {
public void testRenameTempFiles() throws IOException {
IndexService service = createIndex("foo");
RecoveryState state = new RecoveryState();
InternalIndexShard indexShard = (InternalIndexShard) service.shard(0);
DiscoveryNode node = new DiscoveryNode("foo", new LocalTransportAddress("bar"), Version.CURRENT);
RecoveryStatus status = new RecoveryStatus(indexShard, node, state, new RecoveryTarget.RecoveryListener() {
@Override
public void onRecoveryDone(RecoveryState state) {
}
@Override
public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, boolean sendShardFailure) {
}
});
try (IndexOutput indexOutput = status.openAndPutIndexOutput("foo.bar", new StoreFileMetaData("foo.bar", 8), status.store())) {
indexOutput.writeInt(1);
IndexOutput openIndexOutput = status.getOpenIndexOutput("foo.bar");
assertSame(openIndexOutput, indexOutput);
openIndexOutput.writeInt(1);
}
status.removeOpenIndexOutputs("foo.bar");
Set<String> strings = Sets.newHashSet(status.store().directory().listAll());
String expectedFile = null;
for (String file : strings) {
if (Pattern.compile("recovery[.]\\d+[.]foo[.]bar").matcher(file).matches()) {
expectedFile = file;
break;
}
}
assertNotNull(expectedFile);
status.renameAllTempFiles();
strings = Sets.newHashSet(status.store().directory().listAll());
assertTrue(strings.toString(), strings.contains("foo.bar"));
assertFalse(strings.toString(), strings.contains(expectedFile));
status.markAsDone();
}
}