Pass Directory instead of DirectoryService to Store (#33466)

Instead of passing DirectoryService which causes yet another dependency
on Store we can just pass in a Directory since we will just call
`DirectoryService#newDirectory()` on it anyway.
This commit is contained in:
Simon Willnauer 2018-09-07 14:00:24 +02:00 committed by GitHub
parent 79cd6385fe
commit c12d232215
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 39 additions and 130 deletions

View File

@ -64,6 +64,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
@ -377,7 +378,9 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
warmer.warm(searcher, shard, IndexService.this.indexSettings);
}
};
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock,
// TODO we can remove either IndexStore or DirectoryService. All we need is a simple Supplier<Directory>
DirectoryService directoryService = indexStore.newDirectoryService(path);
store = new Store(shardId, this.indexSettings, directoryService.newDirectory(), lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId)));
indexShard = new IndexShard(routing, this.indexSettings, path, store, indexSortSupplier,
indexCache, mapperService, similarityService, engineFactory,

View File

@ -64,7 +64,6 @@ import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
@ -153,18 +152,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
}
};
public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock) throws IOException {
this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY);
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock) {
this(shardId, indexSettings, directory, shardLock, OnClose.EMPTY);
}
public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock,
OnClose onClose) throws IOException {
public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, ShardLock shardLock,
OnClose onClose) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
Directory dir = directoryService.newDirectory();
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(dir, refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
this.shardLock = shardLock;
this.onClose = onClose;

View File

@ -50,7 +50,6 @@ import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@ -106,13 +105,7 @@ public class RefreshListenersTests extends ESTestCase {
ShardId shardId = new ShardId(new Index("index", "_na_"), 1);
String allocationId = UUIDs.randomBase64UUID(random());
Directory directory = newDirectory();
DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return directory;
}
};
store = new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
store = new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, createTempDir("translog"), indexSettings,
BigArrays.NON_RECYCLING_INSTANCE);

View File

@ -104,12 +104,10 @@ public class StoreTests extends ESTestCase {
private static final Version MIN_SUPPORTED_LUCENE_VERSION = org.elasticsearch.Version.CURRENT
.minimumIndexCompatibilityVersion().luceneVersion;
public void testRefCount() throws IOException {
public void testRefCount() {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
IndexSettings indexSettings = INDEX_SETTINGS;
Store store = new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, indexSettings, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
int incs = randomIntBetween(1, 100);
for (int i = 0; i < incs; i++) {
if (randomBoolean()) {
@ -296,8 +294,7 @@ public class StoreTests extends ESTestCase {
public void testNewChecksums() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
// set default codec - all segments need checksums
IndexWriter writer = new IndexWriter(store.directory(), newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec()));
int docs = 1 + random().nextInt(100);
@ -347,7 +344,7 @@ public class StoreTests extends ESTestCase {
assertConsistent(store, metadata);
TestUtil.checkIndex(store.directory());
assertDeleteContent(store, directoryService);
assertDeleteContent(store, store.directory());
IOUtils.close(store);
}
@ -455,32 +452,11 @@ public class StoreTests extends ESTestCase {
}
public void assertDeleteContent(Store store, DirectoryService service) throws IOException {
public void assertDeleteContent(Store store, Directory dir) throws IOException {
deleteContent(store.directory());
assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0));
assertThat(store.stats().sizeInBytes(), equalTo(0L));
assertThat(service.newDirectory().listAll().length, equalTo(0));
}
private static final class LuceneManagedDirectoryService extends DirectoryService {
private final Directory dir;
private final Random random;
LuceneManagedDirectoryService(Random random) {
this(random, true);
}
LuceneManagedDirectoryService(Random random, boolean preventDoubleWrite) {
super(new ShardId(INDEX_SETTINGS.getIndex(), 1), INDEX_SETTINGS);
dir = StoreTests.newDirectory(random);
this.random = random;
}
@Override
public Directory newDirectory() throws IOException {
return dir;
}
assertThat(dir.listAll().length, equalTo(0));
}
public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException {
@ -511,8 +487,7 @@ public class StoreTests extends ESTestCase {
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(random.nextBoolean());
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random);
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random);
for (Document d : docs) {
@ -526,7 +501,7 @@ public class StoreTests extends ESTestCase {
writer.commit();
writer.close();
first = store.getMetadata(null);
assertDeleteContent(store, directoryService);
assertDeleteContent(store, store.directory());
store.close();
}
long time = new Date().getTime();
@ -541,8 +516,7 @@ public class StoreTests extends ESTestCase {
iwc.setMergePolicy(NoMergePolicy.INSTANCE);
iwc.setUseCompoundFile(random.nextBoolean());
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random);
store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
final boolean lotsOfSegments = rarely(random);
for (Document d : docs) {
@ -639,8 +613,7 @@ public class StoreTests extends ESTestCase {
public void testCleanupFromSnapshot() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
// this time random codec....
IndexWriterConfig indexWriterConfig = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec());
// we keep all commits and that allows us clean based on multiple snapshots
@ -727,11 +700,10 @@ public class StoreTests extends ESTestCase {
public void testOnCloseCallback() throws IOException {
final ShardId shardId = new ShardId(new Index(randomRealisticUnicodeOfCodepointLengthBetween(1, 10), "_na_"), randomIntBetween(0, 100));
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
final AtomicInteger count = new AtomicInteger(0);
final ShardLock lock = new DummyShardLock(shardId);
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, lock, theLock -> {
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), lock, theLock -> {
assertEquals(shardId, theLock.getShardId());
assertEquals(lock, theLock);
count.incrementAndGet();
@ -748,11 +720,10 @@ public class StoreTests extends ESTestCase {
public void testStoreStats() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Settings settings = Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.put(Store.INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING.getKey(), TimeValue.timeValueMinutes(0)).build();
Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), directoryService,
Store store = new Store(shardId, IndexSettingsModule.newIndexSettings("index", settings), StoreTests.newDirectory(random()),
new DummyShardLock(shardId));
long initialStoreSize = 0;
for (String extraFiles : store.directory().listAll()) {
@ -843,8 +814,7 @@ public class StoreTests extends ESTestCase {
public void testUserDataRead() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriterConfig config = newIndexWriterConfig(random(), new MockAnalyzer(random())).setCodec(TestUtil.getDefaultCodec());
SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
config.setIndexDeletionPolicy(deletionPolicy);
@ -867,7 +837,7 @@ public class StoreTests extends ESTestCase {
assertThat(metadata.getCommitUserData().get(Engine.SYNC_COMMIT_ID), equalTo(syncId));
assertThat(metadata.getCommitUserData().get(Translog.TRANSLOG_GENERATION_KEY), equalTo(translogId));
TestUtil.checkIndex(store.directory());
assertDeleteContent(store, directoryService);
assertDeleteContent(store, store.directory());
IOUtils.close(store);
}
@ -893,8 +863,7 @@ public class StoreTests extends ESTestCase {
public void testMarkCorruptedOnTruncatedSegmentsFile() throws IOException {
IndexWriterConfig iwc = newIndexWriterConfig();
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId));
IndexWriter writer = new IndexWriter(store.directory(), iwc);
int numDocs = 1 + random().nextInt(10);
@ -945,15 +914,7 @@ public class StoreTests extends ESTestCase {
writer.commit();
writer.close();
assertTrue(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
store.markStoreCorrupted(new CorruptIndexException("foo", "bar"));
assertFalse(Store.canOpenIndex(logger, tempDir, shardId, (id, l) -> new DummyShardLock(id)));
store.close();
@ -962,14 +923,7 @@ public class StoreTests extends ESTestCase {
public void testDeserializeCorruptionException() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
final Directory dir = new RAMDirectory(); // I use ram dir to prevent that virusscanner being a PITA
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
CorruptIndexException ex = new CorruptIndexException("foo", "bar");
store.markStoreCorrupted(ex);
try {
@ -998,14 +952,7 @@ public class StoreTests extends ESTestCase {
public void testCanReadOldCorruptionMarker() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
final Directory dir = new RAMDirectory(); // I use ram dir to prevent that virusscanner being a PITA
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
@Override
public Directory newDirectory() throws IOException {
return dir;
}
};
Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
Store store = new Store(shardId, INDEX_SETTINGS, dir, new DummyShardLock(shardId));
CorruptIndexException exception = new CorruptIndexException("foo", "bar");
String uuid = Store.CORRUPTED + UUIDs.randomBase64UUID();
@ -1065,8 +1012,7 @@ public class StoreTests extends ESTestCase {
public void testEnsureIndexHasHistoryUUID() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) {
try (Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId))) {
store.createEmpty();
@ -1098,8 +1044,7 @@ public class StoreTests extends ESTestCase {
public void testHistoryUUIDCanBeForced() throws IOException {
final ShardId shardId = new ShardId("index", "_na_", 1);
DirectoryService directoryService = new LuceneManagedDirectoryService(random());
try (Store store = new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId))) {
try (Store store = new Store(shardId, INDEX_SETTINGS, StoreTests.newDirectory(random()), new DummyShardLock(shardId))) {
store.createEmpty();

View File

@ -63,7 +63,6 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRelocatedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;
@ -461,18 +460,11 @@ public class RecoverySourceHandlerTests extends ESTestCase {
return newStore(path, true);
}
private Store newStore(Path path, boolean checkIndex) throws IOException {
DirectoryService directoryService = new DirectoryService(shardId, INDEX_SETTINGS) {
@Override
public Directory newDirectory() throws IOException {
BaseDirectoryWrapper baseDirectoryWrapper = RecoverySourceHandlerTests.newFSDirectory(path);
if (checkIndex == false) {
baseDirectoryWrapper.setCheckIndexOnClose(false); // don't run checkindex we might corrupt the index in these tests
}
return baseDirectoryWrapper;
}
};
return new Store(shardId, INDEX_SETTINGS, directoryService, new DummyShardLock(shardId));
return new Store(shardId, INDEX_SETTINGS, baseDirectoryWrapper, new DummyShardLock(shardId));
}

View File

@ -77,7 +77,6 @@ import org.elasticsearch.index.seqno.LocalCheckpointTracker;
import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@ -358,13 +357,7 @@ public abstract class EngineTestCase extends ESTestCase {
}
protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return directory;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
}
protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOException {

View File

@ -59,7 +59,6 @@ import org.elasticsearch.index.seqno.ReplicationTracker;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -161,13 +160,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
}
protected Store createStore(ShardId shardId, IndexSettings indexSettings, Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return directory;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
}
/**

View File

@ -37,7 +37,6 @@ import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@ -261,14 +260,8 @@ public class FollowingEngineTests extends ESTestCase {
}
private static Store createStore(
final ShardId shardId, final IndexSettings indexSettings, final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
@Override
public Directory newDirectory() throws IOException {
return directory;
}
};
return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
final ShardId shardId, final IndexSettings indexSettings, final Directory directory) {
return new Store(shardId, indexSettings, directory, new DummyShardLock(shardId));
}
private FollowingEngine createEngine(Store store, EngineConfig config) throws IOException {