add more tests and apply feedback from @mikemccand
This commit is contained in:
parent
59211927b6
commit
5204440471
|
@ -21,8 +21,6 @@ package org.elasticsearch.index;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.StringWriter;
|
||||
import java.nio.file.Path;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
|
@ -108,7 +106,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
private final IndexSettings indexSettings;
|
||||
private final IndexingSlowLog slowLog;
|
||||
private final IndexingOperationListener[] listeners;
|
||||
private volatile RefreshTasks refreshTask;
|
||||
private volatile AsyncRefreshTask refreshTask;
|
||||
private final AsyncTranslogFSync fsyncTask;
|
||||
|
||||
public IndexService(IndexSettings indexSettings, NodeEnvironment nodeEnv,
|
||||
SimilarityService similarityService,
|
||||
|
@ -145,7 +144,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
this.listeners = new IndexingOperationListener[1+listenersIn.length];
|
||||
this.listeners[0] = slowLog;
|
||||
System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
|
||||
this.refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool());
|
||||
// kick off async ops for the first shard in this index
|
||||
if (this.indexSettings.getTranslogSyncInterval().millis() != 0) {
|
||||
this.fsyncTask = new AsyncTranslogFSync(this);
|
||||
} else {
|
||||
this.fsyncTask = null;
|
||||
}
|
||||
this.refreshTask = new AsyncRefreshTask(this);
|
||||
}
|
||||
|
||||
public int numberOfShards() {
|
||||
|
@ -221,7 +226,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService);
|
||||
IOUtils.close(bitsetFilterCache, indexCache, mapperService, indexFieldData, analysisService, refreshTask, fsyncTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -312,17 +317,9 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
} else {
|
||||
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, listeners);
|
||||
}
|
||||
|
||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||
eventListener.afterIndexShardCreated(indexShard);
|
||||
indexShard.updateRoutingEntry(routing, true);
|
||||
if (shards.isEmpty()) {
|
||||
ThreadPool threadPool = nodeServicesProvider.getThreadPool();
|
||||
if (this.indexSettings.getTranslogSyncInterval().millis() != 0) {
|
||||
new AsyncTranslogFSync(this, threadPool); // kick this off if we are the first shard in this service.
|
||||
}
|
||||
rescheduleRefreshTasks();
|
||||
}
|
||||
shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap();
|
||||
success = true;
|
||||
return indexShard;
|
||||
|
@ -590,7 +587,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
try {
|
||||
refreshTask.close();
|
||||
} finally {
|
||||
refreshTask = new RefreshTasks(this, nodeServicesProvider.getThreadPool());
|
||||
refreshTask = new AsyncRefreshTask(this);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -662,16 +659,16 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private volatile Exception lastThrownException;
|
||||
|
||||
BaseAsyncTask(IndexService indexService, ThreadPool threadPool, TimeValue interval) {
|
||||
BaseAsyncTask(IndexService indexService, TimeValue interval) {
|
||||
this.indexService = indexService;
|
||||
this.threadPool = threadPool;
|
||||
this.threadPool = indexService.getThreadPool();
|
||||
this.interval = interval;
|
||||
onTaskCompletion();
|
||||
}
|
||||
|
||||
boolean mustReschedule() {
|
||||
// don't re-schedule if its closed or if we dont' have a single shard here..., we are done
|
||||
return (indexService.closed.get() || indexService.shards.isEmpty()) == false
|
||||
return indexService.closed.get() == false
|
||||
&& closed.get() == false && interval.millis() > 0;
|
||||
}
|
||||
|
||||
|
@ -685,13 +682,17 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
}
|
||||
}
|
||||
|
||||
boolean isScheduled() {
|
||||
return scheduledFuture != null;
|
||||
}
|
||||
|
||||
public final void run() {
|
||||
try {
|
||||
runInternal();
|
||||
} catch (Exception ex) {
|
||||
if (lastThrownException == null || sameException(lastThrownException, ex) == false) {
|
||||
// prevent the annoying fact of logging the same stuff all the time with an interval of 1 sec will spam all your logs
|
||||
indexService.logger.warn("failed to run task {} - supressing re-occuring exceptions unless the exception changes", ex, toString());
|
||||
indexService.logger.warn("failed to run task {} - suppressing re-occurring exceptions unless the exception changes", ex, toString());
|
||||
lastThrownException = ex;
|
||||
}
|
||||
} finally {
|
||||
|
@ -746,8 +747,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
*/
|
||||
final static class AsyncTranslogFSync extends BaseAsyncTask {
|
||||
|
||||
AsyncTranslogFSync(IndexService indexService, ThreadPool threadPool) {
|
||||
super(indexService, threadPool, indexService.getIndexSettings().getTranslogSyncInterval());
|
||||
AsyncTranslogFSync(IndexService indexService) {
|
||||
super(indexService, indexService.getIndexSettings().getTranslogSyncInterval());
|
||||
}
|
||||
|
||||
protected String getThreadPool() {
|
||||
|
@ -764,10 +765,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
}
|
||||
}
|
||||
|
||||
final class RefreshTasks extends BaseAsyncTask {
|
||||
final class AsyncRefreshTask extends BaseAsyncTask {
|
||||
|
||||
RefreshTasks(IndexService indexService, ThreadPool threadPool) {
|
||||
super(indexService, threadPool, indexService.getIndexSettings().getRefreshInterval());
|
||||
AsyncRefreshTask(IndexService indexService) {
|
||||
super(indexService, indexService.getIndexSettings().getRefreshInterval());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -785,10 +786,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
}
|
||||
}
|
||||
|
||||
RefreshTasks getRefreshTask() { // for tests
|
||||
AsyncRefreshTask getRefreshTask() { // for tests
|
||||
return refreshTask;
|
||||
}
|
||||
|
||||
|
||||
|
||||
AsyncTranslogFSync getFsyncTask() { // for tests
|
||||
return fsyncTask;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -556,7 +556,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
/** Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}. */
|
||||
public void refresh(String source) {
|
||||
verifyNotClosed();
|
||||
if (getEngine().refreshNeeded()) {
|
||||
// if (getEngine().refreshNeeded()) {
|
||||
if (canIndex()) {
|
||||
long bytes = getEngine().getIndexBufferRAMBytesUsed();
|
||||
writingBytes.addAndGet(bytes);
|
||||
|
@ -575,7 +575,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
getEngine().refresh(source);
|
||||
refreshMetric.inc(System.nanoTime() - time);
|
||||
}
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
/** Returns how many bytes we are currently moving from heap to disk */
|
||||
|
@ -1197,7 +1197,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
}
|
||||
}
|
||||
|
||||
void handleRefreshException(Exception e) {
|
||||
private void handleRefreshException(Exception e) {
|
||||
if (e instanceof EngineClosedException) {
|
||||
// ignore
|
||||
} else if (e instanceof RefreshFailedEngineException) {
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.index;
|
||||
|
||||
import org.apache.lucene.search.MatchAllDocsQuery;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.cluster.metadata.AliasMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
|
@ -28,14 +30,15 @@ import org.elasticsearch.common.unit.TimeValue;
|
|||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.InvalidAliasNameException;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
@ -160,11 +163,10 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
|
|||
|
||||
public void testBaseAsyncTask() throws InterruptedException, IOException {
|
||||
IndexService indexService = newIndexService();
|
||||
ThreadPool pool = indexService.getThreadPool();
|
||||
AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
|
||||
AtomicReference<CountDownLatch> latch2 = new AtomicReference<>(new CountDownLatch(1));
|
||||
final AtomicInteger count = new AtomicInteger();
|
||||
IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1)) {
|
||||
IndexService.BaseAsyncTask task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1)) {
|
||||
@Override
|
||||
protected void runInternal() {
|
||||
count.incrementAndGet();
|
||||
|
@ -202,27 +204,23 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
|
|||
assertEquals(2, count.get());
|
||||
|
||||
|
||||
task = new IndexService.BaseAsyncTask(indexService, pool, TimeValue.timeValueMillis(1000000)) {
|
||||
task = new IndexService.BaseAsyncTask(indexService, TimeValue.timeValueMillis(1000000)) {
|
||||
@Override
|
||||
protected void runInternal() {
|
||||
|
||||
}
|
||||
};
|
||||
assertTrue(task.mustReschedule());
|
||||
if (randomBoolean()) {
|
||||
for (Integer id : indexService.shardIds()) {
|
||||
indexService.removeShard(id, "simon says");
|
||||
}
|
||||
} else {
|
||||
indexService.close("simon says", false);
|
||||
}
|
||||
|
||||
assertFalse("no shards left", task.mustReschedule());
|
||||
assertTrue(task.isScheduled());
|
||||
task.close();
|
||||
assertFalse(task.isScheduled());
|
||||
}
|
||||
|
||||
public void testRefreshTaskIsUpdated() {
|
||||
public void testRefreshTaskIsUpdated() throws IOException {
|
||||
IndexService indexService = newIndexService();
|
||||
IndexService.RefreshTasks refreshTask = indexService.getRefreshTask();
|
||||
IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask();
|
||||
assertEquals(1000, refreshTask.getInterval().millis());
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
|
||||
|
@ -231,6 +229,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
|
|||
indexService.updateMetaData(metaData);
|
||||
assertNotSame(refreshTask, indexService.getRefreshTask());
|
||||
assertTrue(refreshTask.isClosed());
|
||||
assertFalse(refreshTask.isScheduled());
|
||||
assertFalse(indexService.getRefreshTask().mustReschedule());
|
||||
|
||||
// set it to 100ms
|
||||
|
@ -241,6 +240,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
|
|||
|
||||
refreshTask = indexService.getRefreshTask();
|
||||
assertTrue(refreshTask.mustReschedule());
|
||||
assertTrue(refreshTask.isScheduled());
|
||||
assertEquals(100, refreshTask.getInterval().millis());
|
||||
|
||||
// set it to 200ms
|
||||
|
@ -251,6 +251,7 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
|
|||
|
||||
refreshTask = indexService.getRefreshTask();
|
||||
assertTrue(refreshTask.mustReschedule());
|
||||
assertTrue(refreshTask.isScheduled());
|
||||
assertEquals(200, refreshTask.getInterval().millis());
|
||||
|
||||
// set it to 200ms again
|
||||
|
@ -258,7 +259,69 @@ public class IndexServiceTests extends ESSingleNodeTestCase {
|
|||
indexService.updateMetaData(metaData);
|
||||
assertSame(refreshTask, indexService.getRefreshTask());
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
assertTrue(refreshTask.isScheduled());
|
||||
assertFalse(refreshTask.isClosed());
|
||||
assertEquals(200, refreshTask.getInterval().millis());
|
||||
indexService.close("simon says", false);
|
||||
assertFalse(refreshTask.isScheduled());
|
||||
assertTrue(refreshTask.isClosed());
|
||||
}
|
||||
|
||||
public void testFsyncTaskIsRunning() throws IOException {
|
||||
IndexService indexService = newIndexService();
|
||||
IndexService.AsyncTranslogFSync fsyncTask = indexService.getFsyncTask();
|
||||
assertNotNull(fsyncTask);
|
||||
assertEquals(5000, fsyncTask.getInterval().millis());
|
||||
assertTrue(fsyncTask.mustReschedule());
|
||||
assertTrue(fsyncTask.isScheduled());
|
||||
|
||||
indexService.close("simon says", false);
|
||||
assertFalse(fsyncTask.isScheduled());
|
||||
assertTrue(fsyncTask.isClosed());
|
||||
}
|
||||
|
||||
public void testRefreshActuallyWorks() throws Exception {
|
||||
IndexService indexService = newIndexService();
|
||||
ensureGreen("test");
|
||||
IndexService.AsyncRefreshTask refreshTask = indexService.getRefreshTask();
|
||||
assertEquals(1000, refreshTask.getInterval().millis());
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
|
||||
// now disable
|
||||
IndexMetaData metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, -1)).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get();
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||
assertEquals(0, search.totalHits);
|
||||
}
|
||||
// refresh every millisecond
|
||||
metaData = IndexMetaData.builder(indexService.getMetaData()).settings(Settings.builder().put(indexService.getMetaData().getSettings()).put(IndexSettings.INDEX_REFRESH_INTERVAL, "1ms")).build();
|
||||
indexService.updateMetaData(metaData);
|
||||
assertBusy(() -> {
|
||||
try (Engine.Searcher searcher = shard.acquireSearcher("test")) {
|
||||
TopDocs search = searcher.searcher().search(new MatchAllDocsQuery(), 10);
|
||||
assertEquals(1, search.totalHits);
|
||||
} catch (IOException e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void testAsyncFsyncActuallyWorks() throws Exception {
|
||||
Settings settings = Settings.builder()
|
||||
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL, "10ms") // very often :)
|
||||
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY, Translog.Durability.ASYNC)
|
||||
.build();
|
||||
IndexService indexService = createIndex("test", settings);
|
||||
ensureGreen("test");
|
||||
assertTrue(indexService.getRefreshTask().mustReschedule());
|
||||
client().prepareIndex("test", "test", "1").setSource("{\"foo\": \"bar\"}").get();
|
||||
IndexShard shard = indexService.getShard(0);
|
||||
assertBusy(() -> {
|
||||
assertFalse(shard.getTranslog().syncNeeded());
|
||||
});
|
||||
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue