From 6a76fa3382f49973eebf4336f2ddc2a720ff0673 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 3 Mar 2016 21:43:02 +0100 Subject: [PATCH] Use SleepingWrapper on shared filesytems On shared FS / shadow replicas we rely on a lock retry if the lock has not yet been relesed on a relocated primary. This commit adds this `hack` for shared filesystems only. Closes #16936 --- .../index/store/FsDirectoryService.java | 6 +- .../org/elasticsearch/index/store/Store.java | 4 - .../index/IndexWithShadowReplicasIT.java | 2 - .../index/store/FsDirectoryServiceTests.java | 76 +++++++++++++++++++ 4 files changed, 81 insertions(+), 7 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/index/store/FsDirectoryServiceTests.java diff --git a/core/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java b/core/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java index d5d6d5234be..06bc6a84a88 100644 --- a/core/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java +++ b/core/src/main/java/org/elasticsearch/index/store/FsDirectoryService.java @@ -29,8 +29,10 @@ import org.apache.lucene.store.NativeFSLockFactory; import org.apache.lucene.store.RateLimitedFSDirectory; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.store.SimpleFSLockFactory; +import org.apache.lucene.store.SleepingLockWrapper; import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.Constants; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Setting; @@ -86,10 +88,12 @@ public class FsDirectoryService extends DirectoryService implements StoreRateLim final Path location = path.resolveIndex(); Files.createDirectories(location); Directory wrapped = newFSDirectory(location, indexSettings.getValue(INDEX_LOCK_FACTOR_SETTING)); + if (IndexMetaData.isOnSharedFilesystem(indexSettings.getSettings())) { + wrapped = new SleepingLockWrapper(wrapped, 5000); + } return new RateLimitedFSDirectory(wrapped, this, this) ; } - @Override public void onPause(long nanos) { rateLimitingTimeInNanos.inc(nanos); diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java index c7377a4ab6b..deeb3daad51 100644 --- a/core/src/main/java/org/elasticsearch/index/store/Store.java +++ b/core/src/main/java/org/elasticsearch/index/store/Store.java @@ -50,7 +50,6 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -70,7 +69,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRefCounted; import org.elasticsearch.common.util.concurrent.RefCounted; import org.elasticsearch.common.util.iterable.Iterables; import org.elasticsearch.env.ShardLock; -import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.AbstractIndexShardComponent; @@ -83,7 +81,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.sql.Time; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -150,7 +147,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref this(shardId, indexSettings, directoryService, shardLock, OnClose.EMPTY); } - @Inject public Store(ShardId shardId, IndexSettings indexSettings, DirectoryService directoryService, ShardLock shardLock, OnClose onClose) throws IOException { super(shardId, indexSettings); final Settings settings = indexSettings.getSettings(); diff --git a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java index 1b5d845ff74..a7d127a60c8 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java +++ b/core/src/test/java/org/elasticsearch/index/IndexWithShadowReplicasIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.index; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; @@ -79,7 +78,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; /** * Tests for indices that use shadow replicas and a shared filesystem */ -@AwaitsFix(bugUrl = "breaks after https://github.com/elastic/elasticsearch/pull/16930 , Simon to fix") @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndexWithShadowReplicasIT extends ESIntegTestCase { diff --git a/core/src/test/java/org/elasticsearch/index/store/FsDirectoryServiceTests.java b/core/src/test/java/org/elasticsearch/index/store/FsDirectoryServiceTests.java new file mode 100644 index 00000000000..9da39b8da71 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/index/store/FsDirectoryServiceTests.java @@ -0,0 +1,76 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.index.store; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.RateLimitedFSDirectory; +import org.apache.lucene.store.SimpleFSDirectory; +import org.apache.lucene.store.SleepingLockWrapper; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.IndexModule; +import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.IndexSettingsModule; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class FsDirectoryServiceTests extends ESTestCase { + + public void testHasSleepWrapperOnSharedFS() throws IOException { + Settings build = randomBoolean() ? + Settings.builder().put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true).build() : + Settings.builder().put(IndexMetaData.SETTING_SHADOW_REPLICAS, true).build();; + IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build); + IndexStoreConfig config = new IndexStoreConfig(build); + IndexStore store = new IndexStore(settings, config); + Path tempDir = createTempDir().resolve("foo").resolve("0"); + Files.createDirectories(tempDir); + ShardPath path = new ShardPath(false, tempDir, tempDir, settings.getUUID(), new ShardId(settings.getIndex(), 0)); + FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, store, path); + Directory directory = fsDirectoryService.newDirectory(); + assertTrue(directory instanceof RateLimitedFSDirectory); + RateLimitedFSDirectory rateLimitingDirectory = (RateLimitedFSDirectory) directory; + Directory delegate = rateLimitingDirectory.getDelegate(); + assertTrue(delegate.getClass().toString(), delegate instanceof SleepingLockWrapper); + } + + public void testHasNoSleepWrapperOnNormalFS() throws IOException { + Settings build = Settings.builder().put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), "simplefs").build(); + IndexSettings settings = IndexSettingsModule.newIndexSettings("foo", build); + IndexStoreConfig config = new IndexStoreConfig(build); + IndexStore store = new IndexStore(settings, config); + Path tempDir = createTempDir().resolve("foo").resolve("0"); + Files.createDirectories(tempDir); + ShardPath path = new ShardPath(false, tempDir, tempDir, settings.getUUID(), new ShardId(settings.getIndex(), 0)); + FsDirectoryService fsDirectoryService = new FsDirectoryService(settings, store, path); + Directory directory = fsDirectoryService.newDirectory(); + assertTrue(directory instanceof RateLimitedFSDirectory); + RateLimitedFSDirectory rateLimitingDirectory = (RateLimitedFSDirectory) directory; + Directory delegate = rateLimitingDirectory.getDelegate(); + assertFalse(delegate instanceof SleepingLockWrapper); + assertTrue(delegate instanceof SimpleFSDirectory); + } +}