Add shadow replicas for shared filesystems

Squashed commit of the following:

commit 20835037c98e7d2fac4206c372717a05a27c4790
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 15:27:17 2015 -0700

    Use Enum for "_primary" preference

commit 325acbe4585179190a959ba3101ee63b99f1931a
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 14:32:41 2015 -0700

    Use ?preference=_primary automatically for realtime GET operations

commit edd49434af5de7e55928f27a1c9ed0fddb1fb133
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 14:32:06 2015 -0700

    Move engine creation into protected createNewEngine method

commit 67a797a9235d4aa376ff4af16f3944d907df4577
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 13:14:01 2015 -0700

    Factor out AssertingSearcher so it can be used by mock Engines

commit 62b0c28df8c23cc0b8205b33f7595c68ff940e2b
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 11:43:17 2015 -0700

    Use IndexMetaData.isIndexUsingShadowReplicas helper

commit 1a0d45629457578a60ae5bccbeba05acf5d79ddd
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 09:59:31 2015 -0700

    Rename usesSharedFilesystem -> isOnSharedFilesystem

commit 73c62df4fc7da8a5ed557620a83910d89b313aa1
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 09:58:02 2015 -0700

    Add MockShadowEngine and hook it up to be used

commit c8e8db473830fce1bdca3c4df80a685e782383bc
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 09:45:50 2015 -0700

    Clarify comment about pre-defined mappings

commit 60a4d5374af5262bd415f4ef40f635278ed12a03
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 09:18:22 2015 -0700

    Add a test for shadow replicas that uses field data

commit 7346f9f382f83a21cd2445b3386fe67472bc3184
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 08:37:14 2015 -0700

    Revert changes to RecoveryTarget.java

commit d90d6980c9b737bd8c0f4339613a5373b1645e95
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 08:35:44 2015 -0700

    Rename `ownsShard` to `canDeleteShardContent`

commit 23001af834d66278ac84d9a72c37b5d1f3a10a7b
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 08:35:25 2015 -0700

    Remove ShadowEngineFactory, add .newReadOnlyEngine method in EngineFactory

commit b64fef1d2c5e167713e869b22d388ff479252173
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 18 08:25:19 2015 -0700

    Add warning that predefined mappings should be used

commit a1b8b8cf0db49d1bd1aeb84e51491f7f0de43b59
Author: Lee Hinman <lee@writequit.org>
Date:   Tue Feb 17 14:31:50 2015 -0700

    Remove unused import and fix index creation example in docs

commit 0b1b852365ceafc0df86866ac3a4ffb6988b08e4
Merge: b9d1fed a22bd49
Author: Lee Hinman <lee@writequit.org>
Date:   Tue Feb 17 10:56:02 2015 -0700

    Merge remote-tracking branch 'refs/remotes/origin/master' into shadow-replicas

commit b9d1fed25ae472a9dce1904eb806702fba4d9786
Merge: 4473e63 41fd4d8
Author: Lee Hinman <lee@writequit.org>
Date:   Tue Feb 17 09:02:27 2015 -0700

    Merge remote-tracking branch 'refs/remotes/origin/master' into shadow-replicas

commit 4473e630460e2f0ca2a2e2478f3712f39a64c919
Author: Lee Hinman <lee@writequit.org>
Date:   Tue Feb 17 09:00:39 2015 -0700

    Add asciidoc documentation for shadow replicas

commit eb699c19f04965952ae45e2caf107124837c4654
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 16:15:39 2015 +0100

    remove last nocommit

commit c5ece6d16d423fbdd36f5d789bd8daa5724d77b0
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 16:13:12 2015 +0100

    simplify shadow engine

commit 45cd34a12a442080477da3ef14ab2fe7947ea97e
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 11:32:57 2015 +0100

    fix tests

commit 744f228c192602a6737051571e040731d413ba8b
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 11:28:12 2015 +0100

    revert changes to IndexShardGateway - these are leftovers from previous iterations

commit 11886b7653dabc23655ec76d112f291301f98f4a
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 11:26:48 2015 +0100

    Back out non-shared FS code. This will go in in a second iteration

commit 77fba571f150a0ca7fb340603669522c3ed65363
Merge: e8ad614 2e3c6a9
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 11:16:46 2015 +0100

    Merge branch 'master' into shadow-replicas

    Conflicts:
    	src/main/java/org/elasticsearch/index/engine/Engine.java

commit e8ad61467304e6d175257e389b8406d2a6cf8dba
Merge: 48a700d 1b8d8da
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 10:54:20 2015 +0100

    Merge branch 'master' into shadow-replicas

commit 48a700d23cff117b8e4851d4008364f92b8272a0
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 10:50:59 2015 +0100

    add test for failing shadow engine / remove nocommit

commit d77414c5e7b2cde830a8e3f70fe463ccc904d4d0
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 17 10:27:56 2015 +0100

    remove nocommits in IndexMetaData

commit abb696563a9e418d3f842a790fcb832f91150be2
Author: Simon Willnauer <simonw@apache.org>
Date:   Mon Feb 16 17:05:02 2015 +0100

    remove nocommit and simplify delete logic

commit 82b9f0449108cd4741568d9b4495bf6c10a5b019
Author: Simon Willnauer <simonw@apache.org>
Date:   Mon Feb 16 16:45:27 2015 +0100

    reduce the changes compared to master

commit 28f069b6d99a65e285ac8c821e6a332a1d8eb315
Author: Simon Willnauer <simonw@apache.org>
Date:   Mon Feb 16 16:43:46 2015 +0100

    fix primary relocation

commit c4c999dd61a44a7a0db9798275a622f2b85b1039
Merge: 2ae80f9 455a85d
Author: Simon Willnauer <simonw@apache.org>
Date:   Mon Feb 16 15:04:26 2015 +0100

    Merge branch 'master' into shadow-replicas

commit 2ae80f9689346f8fd346a0d3775a6341874d8bef
Author: Lee Hinman <lee@writequit.org>
Date:   Fri Feb 13 16:25:34 2015 -0700

    throw UnsupportedOperationException on write operations in ShadowEngine

commit 740c28dd9ef987bf56b670fa1a8bcc6de2845819
Merge: e5bc047 305ba33
Author: Lee Hinman <lee@writequit.org>
Date:   Fri Feb 13 15:38:39 2015 -0700

    Merge branch 'master' into shadow-replicas

commit e5bc047d7c872ae960d397b1ae7b4b78d6a1ea10
Author: Lee Hinman <lee@writequit.org>
Date:   Fri Feb 13 11:38:09 2015 -0700

    Don't replicate document request when using shadow replicas

commit 213292e0679d8ae1492ea11861178236f4abd8ea
Author: Simon Willnauer <simonw@apache.org>
Date:   Fri Feb 13 13:58:05 2015 +0100

    add one more nocommit

commit 83d171cf632f9b77cca9de58505f7db8fcda5599
Merge: aea9692 09eb8d1
Author: Simon Willnauer <simonw@apache.org>
Date:   Fri Feb 13 13:52:29 2015 +0100

    Merge branch 'master' into shadow-replicas

commit aea96920d995dacef294e48e719ba18f1ecf5860
Author: Simon Willnauer <simonw@apache.org>
Date:   Fri Feb 13 09:56:41 2015 +0100

    revert unneeded changes on Store

commit ea4e3e58dc6959a92c06d5990276268d586735f3
Author: Lee Hinman <lee@writequit.org>
Date:   Thu Feb 12 14:26:30 2015 -0700

    Add documentation to ShadowIndexShard, remove nocommit

commit 4f71c8d9f706a0c1c39aa3a370efb1604559d928
Author: Lee Hinman <lee@writequit.org>
Date:   Thu Feb 12 14:17:22 2015 -0700

    Add documentation to ShadowEngine

commit 28a9d1842722acba7ea69e0fa65200444532a30c
Author: Lee Hinman <lee@writequit.org>
Date:   Thu Feb 12 14:08:25 2015 -0700

    Remove nocommit, document canDeleteIndexContents

commit d8d59dbf6d0525cd823d97268d035820e5727ac9
Author: Lee Hinman <lee@writequit.org>
Date:   Thu Feb 12 10:34:32 2015 -0700

    Refactor more shared methods into the abstract Engine

commit a7eb53c1e8b8fbfd9281b43ae39eacbe3cd1a0a6
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Feb 12 17:38:59 2015 +0100

    Simplify shared filesystem recovery by using a dedicated recovery handler that skips
    most phases and enforces shard closing on the source before the target opens its engine

commit a62b9a70adad87d7492c526f4daf868cb05018d9
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Feb 12 15:59:54 2015 +0100

    fix compile error after upstream changes

commit abda7807bc3328a89fd783ca7ad8c6deac35f16f
Merge: f229719 35f6496
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Feb 12 15:57:28 2015 +0100

    Merge branch 'master' into shadow-replicas

    Conflicts:
    	src/main/java/org/elasticsearch/index/engine/Engine.java

commit f2297199b7dd5d3f9f1f109d0ddf3dd83390b0d1
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Feb 12 12:41:32 2015 +0100

    first cut at catchup from primary
    make flush into a refresh
    factor out ShadowIndexShard to keep IndexShard identical to master and least intrusive

    cleanup abstractions

commit 4a367c07505b84b452807a58890f1cbe21711f27
Author: Simon Willnauer <simonw@apache.org>
Date:   Thu Feb 12 09:50:36 2015 +0100

    fix primary promotion

commit cf2fb807e7e243f1ad603a79bc9d5f31a499b769
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 16:45:41 2015 -0700

    Make assertPathHasBeenCleared recursive

commit 5689b7d2f84ca1c41e4459030af56cb9c0151eff
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 15:58:19 2015 -0700

    Add testShadowReplicaNaturalRelocation

commit fdbe4133537eaeb768747c2200cfc91878afeb97
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 15:28:57 2015 -0700

    Use check for shared filesystem in primary -> primary relocation

    Also adds a nocommit

commit 06e2eb4496762130af87ce68a47d360962091697
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 15:21:32 2015 -0700

    Add a test checking that indices with shadow replicas clean up after themselves

commit e4dbfb09a689b449f0edf6ee24222d7eaba2a215
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 15:08:18 2015 -0700

    Fix segment info for ShadowEngine, remove test nocommit

commit 80cf0e884c66eda7d59ac5d59235e1ce215af8f5
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 14:30:13 2015 -0700

    Remove nocommit in ShadowEngineTests#testFailStart()

commit 5e33eeaca971807b342f9be51a6a566eee005251
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 14:22:59 2015 -0700

    Remove overly-complex test

commit 2378fbb917b467e79c0262d7a41c23321bbeb147
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 13:45:44 2015 -0700

    Fix missing import

commit 52e9cd1b8334a5dd228d5d68bd03fd0040e9c8e9
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 13:45:05 2015 -0700

    Add a test for replica -> primary promotion

commit a95adbeded426d7f69f6ddc4cbd6712b6f6380b4
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 12:54:14 2015 -0700

    Remove tests that don't apply to ShadowEngine

commit 1896feda9de69e4f9cf774ef6748a5c50e953946
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 10:29:12 2015 -0700

    Add testShadowEngineIgnoresWriteOperations and testSearchResultRelease

commit 67d7df41eac5e10a1dd63ddb31de74e326e9d38b
Author: Lee Hinman <lee@writequit.org>
Date:   Wed Feb 11 10:06:05 2015 -0700

    Add start of ShadowEngine unit tests

commit ca9beb2d93d9b5af9aa6c75dbc0ead4ef57e220d
Merge: 2d42736 57a4646
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Feb 11 18:03:53 2015 +0100

    Merge branch 'master' into shadow-replicas

commit 2d42736fed3ed8afda7e4aff10b65d292e1c6f92
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Feb 11 17:51:22 2015 +0100

    shortcut recovery if we are on a shared FS - no need to compare files etc.

commit 24d36c92dd82adce650e7ac8e9f0b43c83b2dc53
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Feb 11 17:08:08 2015 +0100

    utilize the new delete code

commit 2a2eed10f58825aae29ffe4cf01aefa5743a97c7
Merge: 343dc0b 173cfc1
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Feb 11 16:07:41 2015 +0100

    Merge branch 'master' into shadow-replicas

    Conflicts:
    	src/main/java/org/elasticsearch/gateway/GatewayMetaState.java

commit 343dc0b527a7052acdc783ac5abcaad1ef78dbda
Author: Simon Willnauer <simonw@apache.org>
Date:   Wed Feb 11 16:05:28 2015 +0100

    long adder is not available in java7

commit be02cabfeebaea74b51b212957a2a466cfbfb716
Author: Lee Hinman <lee@writequit.org>
Date:   Tue Feb 10 22:04:24 2015 -0700

    Add test that restarts nodes to ensure shadow replicas recover

commit 7fcb373f0617050ca1a5a577b8cf32e32dc612b0
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 10 23:19:21 2015 +0100

    make test more evil

commit 38135af0c1991b88f168ece0efb72ffe9498ff59
Author: Simon Willnauer <simonw@apache.org>
Date:   Tue Feb 10 22:25:11 2015 +0100

    make tests pass

commit 05975af69e6db63cb95f3e40d25bfa7174e006ea
Author: Lee Hinman <lee@writequit.org>
Date:   Mon Jan 12 18:44:29 2015 +0100

    Add ShadowEngine
This commit is contained in:
Lee Hinman 2015-02-18 15:34:06 -07:00
parent cef1c97ad4
commit eb666f7f50
33 changed files with 2322 additions and 160 deletions


@ -41,6 +41,11 @@ and warmers.
* <<indices-templates>>
* <<indices-warmers>>
[float]
[[shadow-replicas]]
== Replica configurations
* <<indices-shadow-replicas>>
[float]
[[monitoring]]
== Monitoring:
@ -107,3 +112,4 @@ include::indices/optimize.asciidoc[]
include::indices/upgrade.asciidoc[]
include::indices/shadow-replicas.asciidoc[]


@ -0,0 +1,105 @@
[[indices-shadow-replicas]]
== Shadow replica indices
experimental[]
If you would like to use a shared filesystem, you can use the shadow replicas
settings to choose where on disk the data for an index should be kept, as well
as how Elasticsearch should replay operations on all the replica shards of an
index.
In order to fully utilize the `index.data_path` and `index.shadow_replicas`
settings, you need to enable this in `elasticsearch.yml`:
[source,yaml]
--------------------------------------------------
node.enable_custom_paths: true
--------------------------------------------------
You can then create an index with a custom data path, where each node will use
this path for the data:
[WARNING]
========================
Because shadow replicas do not index the document on replica shards, it's
possible for the replica's known mapping to be behind the index's known mapping
if the latest cluster state has not yet been processed on the node containing
the replica. Because of this, it is highly recommended to use pre-defined
mappings when using shadow replicas.
========================
[source,js]
--------------------------------------------------
curl -XPUT 'localhost:9200/my_index' -d '
{
    "index" : {
        "number_of_shards" : 1,
        "number_of_replicas" : 4,
        "data_path" : "/var/data/my_index",
        "shadow_replicas" : true
    }
}'
--------------------------------------------------
[WARNING]
========================
In the above example, the "/var/data/my_index" path is a shared filesystem that
must be available on every node in the Elasticsearch cluster. You must also
ensure that the Elasticsearch process has the correct permissions to read from
and write to the directory used in the `index.data_path` setting.
========================
An index that has been created with the `index.shadow_replicas` setting set to
`true` will not replicate document operations to any of the replica shards;
instead, it will only continually refresh. Once segments are available on the
filesystem where the shadow replica resides (after an Elasticsearch "flush"), a
regular refresh (governed by `index.refresh_interval`) can be used to make the
new data searchable.
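For example, a flush can be triggered manually with the flush API (a sketch
using the `my_index` index created above):
[source,js]
--------------------------------------------------
# force a flush so that new segments are written to the shared filesystem;
# a subsequent refresh on the shadow replicas makes them searchable
curl -XPOST 'localhost:9200/my_index/_flush'
--------------------------------------------------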
NOTE: Since documents are only indexed on the primary shard, realtime GET
requests could fail to return a document if executed on a replica shard;
therefore, GET API requests automatically have the `?preference=_primary` flag
set if no preference flag is already set.
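For example, the realtime GET below (a sketch; the `doc` type and the document
id `1` are hypothetical) is handled as if `?preference=_primary` had been
supplied explicitly:
[source,js]
--------------------------------------------------
# on an index using shadow replicas this realtime GET is routed to the
# primary shard, exactly as if ?preference=_primary had been passed
curl -XGET 'localhost:9200/my_index/doc/1'
--------------------------------------------------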
To ensure that data is synchronized quickly enough, you may need to tune the
index's flush threshold. A flush is needed to fsync segment files to disk so
that they become visible to all the other replica nodes. Users should test what
flush threshold levels they are comfortable with, as more frequent flushing can
impact indexing performance.
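For example, the size-based flush threshold could be lowered with the update
settings API (a sketch; the `index.translog.flush_threshold_size` setting name
and the `256mb` value are illustrative assumptions, not recommendations):
[source,js]
--------------------------------------------------
# flush more frequently so that segments reach the shared filesystem sooner
curl -XPUT 'localhost:9200/my_index/_settings' -d '
{
    "index" : {
        "translog.flush_threshold_size" : "256mb"
    }
}'
--------------------------------------------------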
The Elasticsearch cluster will still detect the loss of a primary shard, and
transform the replica into a primary in this situation. This transformation will
take slightly longer, since no `IndexWriter` is maintained for each shadow
replica.
Below is the list of settings that can be changed using the update
settings API:
`index.data_path` (string)::
Path to use for the index's data. Note that by default Elasticsearch will
append the node ordinal to the path to ensure that multiple instances of
Elasticsearch on the same machine do not share a data directory.
`index.shadow_replicas`::
Boolean value indicating this index should use shadow replicas. Defaults to
`false`.
`index.shared_filesystem`::
Boolean value indicating this index uses a shared filesystem. Defaults to
`true` if `index.shadow_replicas` is set to `true`, `false` otherwise.
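A sketch of changing one of these settings with the update settings API
(assuming the `my_index` index from above; whether a particular setting can be
changed on a live index depends on the Elasticsearch version in use):
[source,js]
--------------------------------------------------
# mark the index as residing on a shared filesystem
curl -XPUT 'localhost:9200/my_index/_settings' -d '
{
    "index" : {
        "shared_filesystem" : true
    }
}'
--------------------------------------------------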
=== Node level settings related to shadow replicas
These are non-dynamic settings that need to be configured in `elasticsearch.yml`:
`node.add_id_to_custom_path`::
Boolean setting indicating whether Elasticsearch should append the node's
ordinal to the custom data path. For example, if this is enabled and a path
of "/tmp/foo" is used, the first locally-running node will use "/tmp/foo/0",
the second will use "/tmp/foo/1", the third "/tmp/foo/2", etc. Defaults to
`true`.
`node.enable_custom_paths`::
Boolean value that must be set to `true` in order to use the
`index.data_path` setting. Defaults to `false`.
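For example, a node that should host shadow-replica indices on a custom data
path could combine the two settings above in its `elasticsearch.yml` (a sketch;
the second line only makes the default explicit):
[source,yaml]
--------------------------------------------------
# required in order to use the index.data_path setting
node.enable_custom_paths: true
# append the node ordinal to the custom data path (the default)
node.add_id_to_custom_path: true
--------------------------------------------------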


@ -25,13 +25,15 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.single.shard.TransportShardSingleOperationAction;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.Preference;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -74,6 +76,14 @@ public class TransportGetAction extends TransportShardSingleOperationAction<GetR
if (request.request().realtime == null) {
request.request().realtime = this.realtime;
}
IndexMetaData indexMeta = state.getMetaData().index(request.concreteIndex());
if (request.request().realtime && // if the realtime flag is set
request.request().preference() == null && // the preference flag is not already set
indexMeta != null && // and we have the index
IndexMetaData.isIndexUsingShadowReplicas(indexMeta.settings())) { // and the index uses shadow replicas
// set the preference for the request to use "_primary" automatically
request.request().preference(Preference.PRIMARY.type());
}
// update the routing (request#index here is possibly an alias)
request.request().routing(state.metaData().resolveIndexRouting(request.request().routing(), request.request().index()));
// Fail fast on the node that received the request.


@ -45,11 +45,11 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocumentAlreadyExistsException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
@ -631,9 +631,23 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId(), state.replicaRequest());
// If the replicas use shadow replicas, there is no reason to
// perform the action on the replica, so skip it and
// immediately return
if (IndexMetaData.isIndexUsingShadowReplicas(indexMetaData.settings())) {
// this delays mapping updates on replicas because they have
// to wait until they get the new mapping through the cluster
// state, which is why we recommend pre-defined mappings for
// indices using shadow replicas
state.onReplicaSuccess();
return;
}
if (!nodeId.equals(observer.observedState().nodes().localNodeId())) {
final DiscoveryNode node = observer.observedState().nodes().get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, transportReplicaAction, shardRequest,
transportOptions, new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty vResponse) {
state.onReplicaSuccess();


@ -157,6 +157,8 @@ public class IndexMetaData {
public static final String SETTING_NUMBER_OF_SHARDS = "index.number_of_shards";
public static final String SETTING_NUMBER_OF_REPLICAS = "index.number_of_replicas";
public static final String SETTING_SHADOW_REPLICAS = "index.shadow_replicas";
public static final String SETTING_SHARED_FILESYSTEM = "index.shared_filesystem";
public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
public static final String SETTING_READ_ONLY = "index.blocks.read_only";
public static final String SETTING_BLOCKS_READ = "index.blocks.read";
@ -784,4 +786,25 @@ public class IndexMetaData {
}
}
}
/**
* Returns <code>true</code> iff the given settings indicate that the index
* associated with these settings allocates its shards on a shared
* filesystem. Otherwise <code>false</code>. The default setting for this
* is the returned value from
* {@link #isIndexUsingShadowReplicas(org.elasticsearch.common.settings.Settings)}.
*/
public static boolean isOnSharedFilesystem(Settings settings) {
return settings.getAsBoolean(SETTING_SHARED_FILESYSTEM, isIndexUsingShadowReplicas(settings));
}
/**
* Returns <code>true</code> iff the given settings indicate that the index associated
* with these settings uses shadow replicas. Otherwise <code>false</code>. The default
* setting for this is <code>false</code>.
*/
public static boolean isIndexUsingShadowReplicas(Settings settings) {
return settings.getAsBoolean(SETTING_SHADOW_REPLICAS, false);
}
}


@ -224,6 +224,7 @@ public class NodeEnvironment extends AbstractComponent implements Closeable{
*
* @param index the index to delete
* @param lockTimeoutMS how long to wait for acquiring the indices shard locks
* @param indexSettings settings for the index being deleted
* @throws Exception if any of the shards data directories can't be locked or deleted
*/
public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSettings Settings indexSettings) throws IOException {


@ -32,8 +32,8 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.DjbHashFunction;
import org.elasticsearch.cluster.routing.HashFunction;
import org.elasticsearch.cluster.routing.SimpleHashFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
@ -54,7 +54,9 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Set;


@ -282,7 +282,7 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE);
}
public synchronized IndexShard createShard(int sShardId) throws ElasticsearchException {
public synchronized IndexShard createShard(int sShardId, boolean primary) throws ElasticsearchException {
/*
* TODO: we execute this in parallel but it's a synced method. Yet, we might
* be able to serialize the execution via the cluster state in the future. for now we just
@ -304,15 +304,17 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
indicesLifecycle.beforeIndexShardCreated(shardId, indexSettings);
logger.debug("creating shard_id {}", shardId);
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
ModulesBuilder modules = new ModulesBuilder();
modules.add(new ShardsPluginsModule(indexSettings, pluginsService));
modules.add(new IndexShardModule(shardId, indexSettings));
modules.add(new IndexShardModule(shardId, primary, indexSettings));
modules.add(new ShardIndexingModule());
modules.add(new ShardSearchModule());
modules.add(new ShardGetModule());
modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class), lock,
new StoreCloseListener(shardId)));
new StoreCloseListener(shardId, canDeleteShardContent)));
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
@ -431,10 +433,12 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
}
}
private void onShardClose(ShardLock lock) {
private void onShardClose(ShardLock lock, boolean ownsShard) {
if (deleted.get()) { // we remove that shards content if this index has been deleted
try {
indicesServices.deleteShardStore("delete index", lock, indexSettings);
if (ownsShard) {
indicesServices.deleteShardStore("delete index", lock, indexSettings);
}
} catch (IOException e) {
logger.warn("{} failed to delete shard content", e, lock.getShardId());
}
@ -443,15 +447,17 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
private class StoreCloseListener implements Store.OnClose {
private final ShardId shardId;
private final boolean ownsShard;
public StoreCloseListener(ShardId shardId) {
public StoreCloseListener(ShardId shardId, boolean ownsShard) {
this.shardId = shardId;
this.ownsShard = ownsShard;
}
@Override
public void handle(ShardLock lock) {
assert lock.getShardId().equals(shardId) : "shard Id mismatch, expected: " + shardId + " but got: " + lock.getShardId();
onShardClose(lock);
onShardClose(lock, ownsShard);
}
}


@ -54,8 +54,7 @@ public class SnapshotIndexCommit extends IndexCommitDelegate implements Releasab
}
/**
* Releases the current snapshot, returning <code>true</code> if it was
* actually released.
* Releases the current snapshot.
*/
public void close() {
deletionPolicy.close(getGeneration());


@ -23,5 +23,7 @@ package org.elasticsearch.index.engine;
*/
public interface EngineFactory {
public Engine newEngine(EngineConfig config);
public Engine newReadWriteEngine(EngineConfig config);
public Engine newReadOnlyEngine(EngineConfig config);
}


@ -20,7 +20,12 @@ package org.elasticsearch.index.engine;
public class InternalEngineFactory implements EngineFactory {
@Override
public Engine newEngine(EngineConfig config) {
public Engine newReadWriteEngine(EngineConfig config) {
return new InternalEngine(config);
}
@Override
public Engine newReadOnlyEngine(EngineConfig config) {
return new ShadowEngine(config);
}
}


@ -0,0 +1,208 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* ShadowEngine is a specialized engine that only allows read-only operations
* on the underlying Lucene index. An {@code IndexReader} is opened instead of
* an {@code IndexWriter}. All methods that would usually perform write
* operations are no-ops, this means:
*
* - No operations are written to or read from the translog
* - Create, Index, and Delete do nothing
* - Flush does not fsync any files, or make any on-disk changes
*
* In order for new segments to become visible, the ShadowEngine may perform
* stage1 of the traditional recovery process (copying segment files) from a
* regular primary (which uses {@link org.elasticsearch.index.engine.InternalEngine})
*
* Notice that since this Engine does not deal with the translog, any
* {@link #get(Get get)} request goes directly to the searcher, meaning it is
* non-realtime.
*/
public class ShadowEngine extends Engine {
private volatile SearcherManager searcherManager;
private SegmentInfos lastCommittedSegmentInfos;
public ShadowEngine(EngineConfig engineConfig) {
super(engineConfig);
SearcherFactory searcherFactory = new EngineSearcherFactory(engineConfig);
try {
DirectoryReader reader = null;
store.incRef();
boolean success = false;
try {
reader = ElasticsearchDirectoryReader.wrap(DirectoryReader.open(store.directory()), shardId);
this.searcherManager = new SearcherManager(reader, searcherFactory);
this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
success = true;
} catch (Throwable e) {
logger.warn("failed to create new reader", e);
throw e;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(reader);
store.decRef();
}
}
} catch (IOException ex) {
throw new EngineCreationFailureException(shardId, "failed to open index reader", ex);
}
}
@Override
public void create(Create create) throws EngineException {
throw new UnsupportedOperationException(shardId + " create operation not allowed on shadow engine");
}
@Override
public void index(Index index) throws EngineException {
throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine");
}
@Override
public void delete(Delete delete) throws EngineException {
throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine");
}
@Override
public void delete(DeleteByQuery delete) throws EngineException {
throw new UnsupportedOperationException(shardId + " delete-by-query operation not allowed on shadow engine");
}
@Override
public void flush() throws EngineException {
flush(false, false);
}
@Override
public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
logger.trace("skipping FLUSH on shadow engine");
// reread the last committed segment infos
refresh("flush");
try (ReleasableLock _ = readLock.acquire()) {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
} catch (Throwable e) {
if (isClosed.get() == false) {
logger.warn("failed to read latest segment infos on flush", e);
if (Lucene.isCorruptionException(e)) {
throw new FlushFailedEngineException(shardId, e);
}
}
}
}
@Override
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade) throws EngineException {
// no-op
logger.trace("skipping FORCE-MERGE on shadow engine");
}
@Override
public GetResult get(Get get) throws EngineException {
// There is no translog, so we can get it directly from the searcher
return getFromSearcher(get);
}
@Override
public List<Segment> segments(boolean verbose) {
try (ReleasableLock _ = readLock.acquire()) {
Segment[] segmentsArr = getSegmentInfo(lastCommittedSegmentInfos, verbose);
for (int i = 0; i < segmentsArr.length; i++) {
// hard code all segments as committed, because they must be
// committed in order for the shadow replica to see them
segmentsArr[i].committed = true;
}
return Arrays.asList(segmentsArr);
}
}
@Override
public void refresh(String source) throws EngineException {
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
try (ReleasableLock _ = readLock.acquire()) {
ensureOpen();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
ensureOpen();
} catch (EngineClosedException e) {
throw e;
} catch (Throwable t) {
failEngine("refresh failed", t);
throw new RefreshFailedEngineException(shardId, t);
}
}
@Override
public SnapshotIndexCommit snapshotIndex() throws EngineException {
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
}
@Override
public void recover(RecoveryHandler recoveryHandler) throws EngineException {
throw new UnsupportedOperationException("Can not recover from a shadow engine");
}
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
}
@Override
protected void closeNoLock(String reason) throws ElasticsearchException {
if (isClosed.compareAndSet(false, true)) {
try {
logger.debug("shadow replica close searcher manager refCount: {}", store.refCount());
IOUtils.close(searcherManager);
} catch (Throwable t) {
logger.warn("shadow replica failed to close searcher manager", t);
} finally {
store.decRef();
}
}
}
}


@ -34,6 +34,7 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.optimize.OptimizeRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.common.Booleans;
@ -44,7 +45,6 @@ import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
@ -54,7 +54,6 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
@ -89,9 +88,9 @@ import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.index.store.Store.MetadataSnapshot;
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
import org.elasticsearch.index.suggest.stats.SuggestStats;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
@ -131,7 +130,6 @@ public class IndexShard extends AbstractIndexShardComponent {
private final InternalIndicesLifecycle indicesLifecycle;
private final Store store;
private final MergeSchedulerProvider mergeScheduler;
private final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
private final Translog translog;
private final IndexAliasesService indexAliasesService;
private final ShardIndexingService indexingService;
@ -152,16 +150,17 @@ public class IndexShard extends AbstractIndexShardComponent {
private final Object mutex = new Object();
private final String checkIndexOnStartup;
private final EngineConfig config;
private final EngineFactory engineFactory;
private long checkIndexTook = 0;
private volatile IndexShardState state;
private TimeValue refreshInterval;
private volatile ScheduledFuture refreshScheduledFuture;
private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting;
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
protected final EngineConfig config;
protected final EngineFactory engineFactory;
@Nullable
private RecoveryState recoveryState;
@ -187,7 +186,7 @@ public class IndexShard extends AbstractIndexShardComponent {
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService, IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache, ShardBitsetFilterCache shardBitsetFilterCache,
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, AnalysisService analysisService, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
@Nullable IndicesWarmer warmer, SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService, MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
super(shardId, indexSettings);
Preconditions.checkNotNull(store, "Store must be provided to the index shard");
Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the index shard");
@ -880,7 +879,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
private void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering");
@ -894,7 +893,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
private void verifyStarted() throws IllegalIndexShardStateException {
protected final void verifyStarted() throws IllegalIndexShardStateException {
IndexShardState state = this.state; // one time volatile read
if (state != IndexShardState.STARTED) {
throw new IndexShardNotStartedException(shardId, state);
@ -1166,13 +1165,13 @@ public class IndexShard extends AbstractIndexShardComponent {
}
}
private void createNewEngine() {
protected void createNewEngine() {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new EngineClosedException(shardId);
}
assert this.currentEngineReference.get() == null;
this.currentEngineReference.set(engineFactory.newEngine(config));
this.currentEngineReference.set(engineFactory.newReadWriteEngine(config));
}
}
}


@ -19,6 +19,7 @@
package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.EngineFactory;
@ -26,27 +27,43 @@ import org.elasticsearch.index.engine.InternalEngineFactory;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
/**
*
* The {@code IndexShardModule} module is responsible for binding the correct
* shard id, index shard, engine factory, and warming service for a newly
* created shard.
*/
public class IndexShardModule extends AbstractModule {
public static final String ENGINE_FACTORY = "index.engine.factory";
private static final Class<? extends EngineFactory> DEFAULT_ENGINE_FACTORY_CLASS = InternalEngineFactory.class;
private static final String ENGINE_PREFIX = "org.elasticsearch.index.engine.";
private static final String ENGINE_SUFFIX = "EngineFactory";
private final ShardId shardId;
private final Settings settings;
private final boolean primary;
public IndexShardModule(ShardId shardId, Settings settings) {
public IndexShardModule(ShardId shardId, boolean primary, Settings settings) {
this.settings = settings;
this.shardId = shardId;
this.primary = primary;
}
/** Return true if a shadow engine should be used */
protected boolean useShadowEngine() {
return primary == false && IndexMetaData.isIndexUsingShadowReplicas(settings);
}
@Override
protected void configure() {
bind(ShardId.class).toInstance(shardId);
bind(IndexShard.class).asEagerSingleton();
bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS,
"org.elasticsearch.index.engine.", "EngineFactory"));
if (useShadowEngine()) {
bind(IndexShard.class).to(ShadowIndexShard.class).asEagerSingleton();
} else {
bind(IndexShard.class).asEagerSingleton();
}
bind(EngineFactory.class).to(settings.getAsClass(ENGINE_FACTORY, DEFAULT_ENGINE_FACTORY_CLASS, ENGINE_PREFIX, ENGINE_SUFFIX));
bind(ShardIndexWarmerService.class).asEagerSingleton();
}


@ -0,0 +1,125 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.ShardBitsetFilterCache;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.query.ShardQueryCache;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
import org.elasticsearch.index.get.ShardGetService;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.search.stats.ShardSearchService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.suggest.stats.ShardSuggestService;
import org.elasticsearch.index.termvectors.ShardTermVectorsService;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.indices.IndicesLifecycle;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;
/**
* ShadowIndexShard extends {@link IndexShard} to add file synchronization
* from the primary when a flush happens. It also ensures that a replica being
* promoted to a primary causes the shard to fail, kicking off a re-allocation
* of the primary shard.
*/
public class ShadowIndexShard extends IndexShard {
private final Object mutex = new Object();
@Inject
public ShadowIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService,
IndicesLifecycle indicesLifecycle, Store store, MergeSchedulerProvider mergeScheduler,
Translog translog, ThreadPool threadPool, MapperService mapperService,
IndexQueryParserService queryParserService, IndexCache indexCache,
IndexAliasesService indexAliasesService, ShardIndexingService indexingService,
ShardGetService getService, ShardSearchService searchService,
ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache,
ShardFieldData shardFieldData, PercolatorQueriesRegistry percolatorQueriesRegistry,
ShardPercolateService shardPercolateService, CodecService codecService,
ShardTermVectorsService termVectorsService, IndexFieldDataService indexFieldDataService,
IndexService indexService, ShardSuggestService shardSuggestService, ShardQueryCache shardQueryCache,
ShardBitsetFilterCache shardBitsetFilterCache, @Nullable IndicesWarmer warmer,
SnapshotDeletionPolicy deletionPolicy, SimilarityService similarityService,
MergePolicyProvider mergePolicyProvider, EngineFactory factory) {
super(shardId, indexSettings, indexSettingsService, indicesLifecycle, store, mergeScheduler,
translog, threadPool, mapperService, queryParserService, indexCache, indexAliasesService,
indexingService, getService, searchService, shardWarmerService, shardFilterCache,
shardFieldData, percolatorQueriesRegistry, shardPercolateService, codecService,
termVectorsService, indexFieldDataService, indexService, shardSuggestService,
shardQueryCache, shardBitsetFilterCache, warmer, deletionPolicy, similarityService,
mergePolicyProvider, factory);
}
/**
* In addition to the regular accounting done in
* {@link IndexShard#routingEntry(org.elasticsearch.cluster.routing.ShardRouting)},
* if this shadow replica needs to be promoted to a primary, the shard is
* failed in order to allow a new primary to be re-allocated.
*/
@Override
public IndexShard routingEntry(ShardRouting newRouting) {
ShardRouting shardRouting = this.routingEntry();
super.routingEntry(newRouting);
// check for a shadow replica that now needs to be transformed into
// a normal primary - today we simply fail it to force reallocation
if (shardRouting != null && shardRouting.primary() == false && // currently a replica
newRouting.primary() == true) {// becoming a primary
failShard("can't promote shadow replica to primary",
new ElasticsearchIllegalStateException("can't promote shadow replica to primary"));
}
return this;
}
@Override
protected void createNewEngine() {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new EngineClosedException(shardId);
}
assert this.currentEngineReference.get() == null;
assert this.shardRouting.primary() == false;
// Use the read-only engine for shadow replicas
this.currentEngineReference.set(engineFactory.newReadOnlyEngine(config));
}
}
}


@ -31,10 +31,13 @@ public final class DirectoryUtils {
private DirectoryUtils() {} // no instance
static final Directory getLeafDirectory(FilterDirectory dir) {
static final <T extends Directory> Directory getLeafDirectory(FilterDirectory dir, Class<T> targetClass) {
Directory current = dir.getDelegate();
while (true) {
if ((current instanceof FilterDirectory)) {
if (targetClass != null && targetClass.isAssignableFrom(current.getClass())) {
break;
}
current = ((FilterDirectory) current).getDelegate();
} else {
break;
@ -59,7 +62,7 @@ public final class DirectoryUtils {
public static <T extends Directory> T getLeaf(Directory dir, Class<T> targetClass, T defaultValue) {
Directory d = dir;
if (dir instanceof FilterDirectory) {
d = getLeafDirectory((FilterDirectory) dir);
d = getLeafDirectory((FilterDirectory) dir, targetClass);
}
if (d instanceof FileSwitchDirectory) {
T leaf = getLeaf(((FileSwitchDirectory) d).getPrimaryDir(), targetClass);


@ -481,7 +481,9 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
// the store metadata gets wiped anyway even without the lock this is just best effort since
// every shards deletes its content under the shard lock it owns.
logger.debug("{} deleting index store reason [{}]", index, reason);
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
if (canDeleteIndexContents(index, indexSettings)) {
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
}
} catch (LockObtainFailedException ex) {
logger.debug("{} failed to delete index store - at least one shards is still locked", ex, index);
} catch (Exception ex) {
@ -502,9 +504,6 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
*/
public void deleteShardStore(String reason, ShardLock lock, Settings indexSettings) throws IOException {
ShardId shardId = lock.getShardId();
if (canDeleteShardContent(shardId, indexSettings) == false) {
throw new ElasticsearchIllegalStateException("Can't delete shard " + shardId);
}
logger.trace("{} deleting shard reason [{}]", shardId, reason);
nodeEnv.deleteShardDirectoryUnderLock(lock, indexSettings);
}
@ -527,6 +526,26 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
logger.trace("{} deleting shard reason [{}]", shardId, reason);
}
/**
* This method returns true if the current node is allowed to delete the
* given index. If the index uses a shared filesystem this method always
* returns false.
* @param index {@code Index} to check whether deletion is allowed
* @param indexSettings {@code Settings} for the given index
* @return true if the index can be deleted on this node
*/
public boolean canDeleteIndexContents(Index index, Settings indexSettings) {
final Tuple<IndexService, Injector> indexServiceInjectorTuple = this.indices.get(index);
if (IndexMetaData.isOnSharedFilesystem(indexSettings) == false) {
if (indexServiceInjectorTuple == null && nodeEnv.hasNodeFile()) {
return true;
}
} else {
logger.trace("{} skipping index directory deletion due to shadow replicas", index);
}
return false;
}
/**
* Returns <code>true</code> iff the shards content for the given shard can be deleted.
* This method will return <code>false</code> if:
@ -550,13 +569,16 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
private boolean canDeleteShardContent(ShardId shardId, @IndexSettings Settings indexSettings) {
final Tuple<IndexService, Injector> indexServiceInjectorTuple = this.indices.get(shardId.getIndex());
// TODO add some protection here to prevent shard deletion if we are on a shard FS or have ShadowReplicas enabled.
if (indexServiceInjectorTuple != null && nodeEnv.hasNodeFile()) {
final IndexService indexService = indexServiceInjectorTuple.v1();
return indexService.hasShard(shardId.id()) == false;
} else if (nodeEnv.hasNodeFile()) {
final Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
return FileSystemUtils.exists(shardLocations);
if (IndexMetaData.isOnSharedFilesystem(indexSettings) == false) {
if (indexServiceInjectorTuple != null && nodeEnv.hasNodeFile()) {
final IndexService indexService = indexServiceInjectorTuple.v1();
return indexService.hasShard(shardId.id()) == false;
} else if (nodeEnv.hasNodeFile()) {
final Path[] shardLocations = nodeEnv.shardDataPaths(shardId, indexSettings);
return FileSystemUtils.exists(shardLocations);
}
} else {
logger.trace("{} skipping shard directory deletion due to shadow replicas", shardId);
}
return false;
}


@ -686,7 +686,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] creating shard", shardRouting.index(), shardId);
}
IndexShard indexShard = indexService.createShard(shardId);
IndexShard indexShard = indexService.createShard(shardId, shardRouting.primary());
indexShard.routingEntry(shardRouting);
indexShard.addFailedEngineListener(failedEngineHandler);
} catch (IndexShardAlreadyExistsException e) {


@ -22,6 +22,7 @@ package org.elasticsearch.indices.recovery;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
@ -113,8 +114,12 @@ public class RecoverySource extends AbstractComponent {
}
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
final ShardRecoveryHandler handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
final ShardRecoveryHandler handler;
if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) {
handler = new SharedFSRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
} else {
handler = new ShardRecoveryHandler(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
}
ongoingRecoveries.add(shard, handler);
try {
shard.recover(handler);


@ -76,9 +76,9 @@ import java.util.concurrent.atomic.AtomicReference;
* everything relating to copying the segment files as well as sending translog
* operations across the wire once the segments have been copied.
*/
public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
public class ShardRecoveryHandler implements Engine.RecoveryHandler {
private final ESLogger logger;
protected final ESLogger logger;
// Shard that is going to be recovered (the "source")
private final IndexShard shard;
private final String indexName;
@ -471,11 +471,12 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
throw new IndexShardClosedException(request.shardId());
}
cancellableThreads.checkForCancel();
-logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
StopWatch stopWatch = new StopWatch().start();
+final int totalOperations;
+logger.trace("[{}][{}] recovery [phase3] to {}: sending transaction log operations", indexName, shardId, request.targetNode());
// Send the translog operations to the target node
-int totalOperations = sendSnapshot(snapshot);
+totalOperations = sendSnapshot(snapshot);
cancellableThreads.execute(new Interruptable() {
@Override
@@ -579,7 +580,7 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
*
* @return the total number of translog operations that were sent
*/
-private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
+protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
int ops = 0;
long size = 0;
int totalOperations = 0;

View File

@@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
/**
* A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
* as relocated and closed to ensure that the engine is closed and the target can acquire the IW write lock.
*/
public class SharedFSRecoveryHandler extends ShardRecoveryHandler {
private final IndexShard shard;
private final StartRecoveryRequest request;
public SharedFSRecoveryHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
this.shard = shard;
this.request = request;
}
@Override
public void phase1(SnapshotIndexCommit snapshot) throws ElasticsearchException {
if (request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary()) {
// here we simply fail the primary shard since we can't relocate it (that would mean two
// IndexWriters open against the same shared directory at the same time)
// by failing the shard we play it safe and go through the entire reallocation procedure for the primary
// it would be ideal to make sure we flushed the translog here, but that is not possible in the current design.
ElasticsearchIllegalStateException exception = new ElasticsearchIllegalStateException("Can't relocate primary - failing");
shard.failShard("primary_relocation", exception);
throw exception;
}
logger.trace("{} recovery [phase2] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
}
@Override
protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchException {
logger.trace("{} recovery [phase3] to {}: skipping transaction log operations for file sync", shard.shardId(), request.targetNode());
return 0;
}
}
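The relocation-failure path in phase1 above is exercised by testPrimaryRelocation in the integration test added further down: excluding the node that holds the primary triggers a relocation attempt, the handler fails the primary, and the cluster re-allocates it on the shared filesystem without copying any segment files. A condensed sketch of that trigger, using the test's own helpers and local variables (IDX, node1):

    // Sketch, taken from testPrimaryRelocation below: force the primary off node1.
    Settings build = ImmutableSettings.builder()
            .put("index.routing.allocation.exclude._name", node1)
            .build();
    client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).get();
    // phase1 above fails the primary; normal re-allocation then brings the index back to green.
    ensureGreen(IDX);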

View File

@@ -0,0 +1,373 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.junit.Test;
import java.nio.file.Path;
import java.util.List;
import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
* Tests for indices that use shadow replicas and a shared filesystem
*/
@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0)
public class IndexWithShadowReplicasTests extends ElasticsearchIntegrationTest {
@Test
public void testIndexWithFewDocuments() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNodesAsync(3, nodeSettings).get();
final String IDX = "test";
final Path dataPath = newTempDirPath();
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureGreen(IDX);
// So basically, the primary should fail and the replica will need to
// replay the translog; that is what this test exercises
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
// Check that we can get doc 1 and 2, because we are doing realtime
// gets and getting from the primary
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setRealtime(true).setFields("foo").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setRealtime(true).setFields("foo").get();
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
flushAndRefresh(IDX);
client().prepareIndex(IDX, "doc", "3").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "bar").get();
refresh();
// Check that we can get doc 1 and 2 without realtime
gResp1 = client().prepareGet(IDX, "doc", "1").setRealtime(false).setFields("foo").get();
gResp2 = client().prepareGet(IDX, "doc", "2").setRealtime(false).setFields("foo").get();
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
logger.info("--> restarting both nodes");
if (randomBoolean()) {
logger.info("--> rolling restart");
internalCluster().rollingRestart();
} else {
logger.info("--> full restart");
internalCluster().fullRestart();
}
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
ensureGreen(IDX);
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 4);
logger.info("--> deleting index");
assertAcked(client().admin().indices().prepareDelete(IDX));
}
@Test
public void testReplicaToPrimaryPromotion() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = newTempDirPath();
String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureYellow(IDX);
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.isExists());
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
// Node1 has the primary, now node2 has the replica
String node2 = internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
flushAndRefresh(IDX);
logger.info("--> stopping node1 [{}]", node1);
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node1));
ensureYellow(IDX);
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 2);
gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get();
gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.toString(), gResp2.isExists());
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
}
@Test
public void testPrimaryRelocation() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
String node1 = internalCluster().startNode(nodeSettings);
Path dataPath = newTempDirPath();
String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureYellow(IDX);
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.isExists());
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
// Node1 has the primary, now node2 has the replica
String node2 = internalCluster().startNode(nodeSettings);
ensureGreen(IDX);
client().admin().cluster().prepareHealth().setWaitForNodes("2").get();
flushAndRefresh(IDX);
// now prevent the primary from being allocated on node1 so that it moves to node3
String node3 = internalCluster().startNode(nodeSettings);
Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build();
client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
ensureGreen(IDX);
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 2);
gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get();
gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.toString(), gResp2.isExists());
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
client().prepareIndex(IDX, "doc", "3").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "bar").get();
gResp1 = client().prepareGet(IDX, "doc", "3").setPreference("_primary").setFields("foo").get();
gResp2 = client().prepareGet(IDX, "doc", "4").setPreference("_primary").setFields("foo").get();
assertTrue(gResp1.isExists());
assertTrue(gResp2.isExists());
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
}
@Test
public void testIndexWithShadowReplicasCleansUp() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
int nodeCount = randomIntBetween(2, 5);
internalCluster().startNodesAsync(nodeCount, nodeSettings).get();
Path dataPath = newTempDirPath();
String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1, nodeCount - 1))
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureGreen(IDX);
client().prepareIndex(IDX, "doc", "1").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
flushAndRefresh(IDX);
GetResponse gResp1 = client().prepareGet(IDX, "doc", "1").setFields("foo").get();
GetResponse gResp2 = client().prepareGet(IDX, "doc", "2").setFields("foo").get();
assertThat(gResp1.getField("foo").getValue().toString(), equalTo("bar"));
assertThat(gResp2.getField("foo").getValue().toString(), equalTo("bar"));
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, 2);
assertAcked(client().admin().indices().prepareDelete(IDX));
assertPathHasBeenCleared(dataPath);
}
/**
* Tests that shadow replicas can be "naturally" rebalanced and relocated
* around the cluster. By "naturally" I mean without using the reroute API
* @throws Exception
*/
@Test
public void testShadowReplicaNaturalRelocation() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNodesAsync(2, nodeSettings).get();
Path dataPath = newTempDirPath();
String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 10)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
ensureGreen(IDX);
int docCount = randomIntBetween(10, 100);
List<IndexRequestBuilder> builders = newArrayList();
for (int i = 0; i < docCount; i++) {
builders.add(client().prepareIndex(IDX, "doc", i + "").setSource("foo", "bar"));
}
indexRandom(true, true, true, builders);
flushAndRefresh(IDX);
// start a third node; with 10 shards on each of the other two nodes, some
// shards should relocate to the third node
final String node3 = internalCluster().startNode(nodeSettings);
assertBusy(new Runnable() {
@Override
public void run() {
client().admin().cluster().prepareHealth().setWaitForNodes("3").get();
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
RoutingNodes nodes = resp.getState().getRoutingNodes();
for (RoutingNode node : nodes) {
logger.info("--> node has {} shards", node.numberOfOwningShards());
assertThat("at least 5 shards on node", node.numberOfOwningShards(), greaterThanOrEqualTo(5));
}
}
});
ensureGreen(IDX);
logger.info("--> performing query");
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
assertHitCount(resp, docCount);
assertAcked(client().admin().indices().prepareDelete(IDX));
assertPathHasBeenCleared(dataPath);
}
@Test
public void testShadowReplicasUsingFieldData() throws Exception {
Settings nodeSettings = ImmutableSettings.builder()
.put("node.add_id_to_custom_path", false)
.put("node.enable_custom_paths", true)
.build();
internalCluster().startNodesAsync(3, nodeSettings).get();
Path dataPath = newTempDirPath();
String IDX = "test";
Settings idxSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 2)
.put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
.build();
prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string,index=not_analyzed").get();
ensureGreen(IDX);
client().prepareIndex(IDX, "doc", "1").setSource("foo", "foo").get();
client().prepareIndex(IDX, "doc", "2").setSource("foo", "bar").get();
client().prepareIndex(IDX, "doc", "3").setSource("foo", "baz").get();
client().prepareIndex(IDX, "doc", "4").setSource("foo", "eggplant").get();
flushAndRefresh(IDX);
SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).addFieldDataField("foo").addSort("foo", SortOrder.ASC).get();
assertHitCount(resp, 4);
assertOrderedSearchHits(resp, "2", "3", "4", "1");
SearchHit[] hits = resp.getHits().hits();
assertThat(hits[0].field("foo").getValue().toString(), equalTo("bar"));
assertThat(hits[1].field("foo").getValue().toString(), equalTo("baz"));
assertThat(hits[2].field("foo").getValue().toString(), equalTo("eggplant"));
assertThat(hits[3].field("foo").getValue().toString(), equalTo("foo"));
}
}
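The tests above all share the same setup. Distilled into one place, and assuming the same integration-test helpers used above (prepareCreate, newTempDirPath) with the data path pointing at the shared directory, the settings that enable shadow replicas over a shared filesystem are:

    // Node settings (sketch): allow a custom data path and do not append the node id to it,
    // so every node resolves the same shared directory.
    Settings nodeSettings = ImmutableSettings.builder()
            .put("node.add_id_to_custom_path", false)
            .put("node.enable_custom_paths", true)
            .build();

    // Index settings (sketch): store the index on the shared path and make the replicas shadow replicas.
    Path dataPath = newTempDirPath();
    Settings idxSettings = ImmutableSettings.builder()
            .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
            .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
            .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
            .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
            .build();

    prepareCreate("test").setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();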

View File

@@ -0,0 +1,919 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import org.apache.lucene.codecs.Codec;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.*;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.indexing.slowlog.ShardSlowLogIndexingService;
import org.elasticsearch.index.mapper.ParseContext;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.internal.SourceFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.merge.policy.LogByteSizeMergePolicyProvider;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.distributor.LeastUsedDistributor;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.fs.FsTranslog;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ElasticsearchLuceneTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import static com.carrotsearch.randomizedtesting.RandomizedTest.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
import static org.elasticsearch.test.ElasticsearchTestCase.newTempDirPath;
import static org.elasticsearch.test.ElasticsearchTestCase.terminate;
import static org.hamcrest.Matchers.*;
/**
* Tests for the {@link ShadowEngine}: the read-only engine used by shadow replicas. It rejects
* write operations and only sees new segments once the primary engine has flushed and the shadow engine has refreshed.
*/
public class ShadowEngineTests extends ElasticsearchLuceneTestCase {
protected final ShardId shardId = new ShardId(new Index("index"), 1);
protected ThreadPool threadPool;
private Store store;
private Store storeReplica;
protected Translog translog;
protected Translog replicaTranslog;
protected Engine primaryEngine;
protected Engine replicaEngine;
private Settings defaultSettings;
private int indexConcurrency;
private String codecName;
private Path dirPath;
@Before
public void setUp() throws Exception {
super.setUp();
CodecService codecService = new CodecService(shardId.index());
indexConcurrency = randomIntBetween(1, 20);
String name = Codec.getDefault().getName();
if (Arrays.asList(codecService.availableCodecs()).contains(name)) {
// some codecs are read-only, so we only use the name randomly selected by the Lucene
// test case if it is among the codecs available in the codec service
codecName = name;
} else {
codecName = "default";
}
defaultSettings = ImmutableSettings.builder()
.put(EngineConfig.INDEX_COMPOUND_ON_FLUSH, randomBoolean())
.put(EngineConfig.INDEX_GC_DELETES_SETTING, "1h") // make sure this doesn't kick in on us
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, randomBoolean())
.put(EngineConfig.INDEX_CODEC_SETTING, codecName)
.put(EngineConfig.INDEX_CONCURRENCY_SETTING, indexConcurrency)
.build(); // TODO randomize more settings
threadPool = new ThreadPool(getClass().getName());
dirPath = newTempDirPath(LifecycleScope.TEST);
store = createStore(dirPath);
store.deleteContent();
storeReplica = createStore(dirPath);
storeReplica.deleteContent();
translog = createTranslog();
primaryEngine = createInternalEngine(store, translog);
LiveIndexWriterConfig currentIndexWriterConfig = ((InternalEngine)primaryEngine).getCurrentIndexWriterConfig();
assertEquals(primaryEngine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
if (randomBoolean()) {
primaryEngine.config().setEnableGcDeletes(false);
}
replicaTranslog = createTranslogReplica();
replicaEngine = createShadowEngine(storeReplica, replicaTranslog);
assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName());
if (randomBoolean()) {
replicaEngine.config().setEnableGcDeletes(false);
}
}
@After
public void tearDown() throws Exception {
super.tearDown();
replicaEngine.close();
storeReplica.close();
primaryEngine.close();
store.close();
terminate(threadPool);
}
private ParseContext.Document testDocumentWithTextField() {
ParseContext.Document document = testDocument();
document.add(new TextField("value", "test", Field.Store.YES));
return document;
}
private ParseContext.Document testDocument() {
return new ParseContext.Document();
}
private ParsedDocument testParsedDocument(String uid, String id, String type, String routing, long timestamp, long ttl, ParseContext.Document document, BytesReference source, boolean mappingsModified) {
Field uidField = new Field("_uid", uid, UidFieldMapper.Defaults.FIELD_TYPE);
Field versionField = new NumericDocValuesField("_version", 0);
document.add(uidField);
document.add(versionField);
return new ParsedDocument(uidField, versionField, id, type, routing, timestamp, ttl, Arrays.asList(document), source, mappingsModified);
}
protected Store createStore(Path p) throws IOException {
return createStore(newMockFSDirectory(p));
}
protected Store createStore(final Directory directory) throws IOException {
final DirectoryService directoryService = new DirectoryService(shardId, EMPTY_SETTINGS) {
@Override
public Directory[] build() throws IOException {
return new Directory[]{ directory };
}
@Override
public long throttleTimeInNanos() {
return 0;
}
};
return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
}
protected Translog createTranslog() throws IOException {
return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/"));
}
protected Translog createTranslogReplica() throws IOException {
return new FsTranslog(shardId, EMPTY_SETTINGS, Paths.get("work/fs-translog/"));
}
protected IndexDeletionPolicy createIndexDeletionPolicy() {
return new KeepOnlyLastDeletionPolicy(shardId, EMPTY_SETTINGS);
}
protected SnapshotDeletionPolicy createSnapshotDeletionPolicy() {
return new SnapshotDeletionPolicy(createIndexDeletionPolicy());
}
protected MergePolicyProvider<?> createMergePolicy() {
return new LogByteSizeMergePolicyProvider(store, new IndexSettingsService(new Index("test"), EMPTY_SETTINGS));
}
protected MergeSchedulerProvider createMergeScheduler(IndexSettingsService indexSettingsService) {
return new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, indexSettingsService);
}
protected ShadowEngine createShadowEngine(Store store, Translog translog) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
return createShadowEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService));
}
protected InternalEngine createInternalEngine(Store store, Translog translog) {
IndexSettingsService indexSettingsService = new IndexSettingsService(shardId.index(), ImmutableSettings.builder().put(defaultSettings).put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build());
return createInternalEngine(indexSettingsService, store, translog, createMergeScheduler(indexSettingsService));
}
protected ShadowEngine createShadowEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
return new ShadowEngine(config(indexSettingsService, store, translog, mergeSchedulerProvider));
}
protected InternalEngine createInternalEngine(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
return new InternalEngine(config(indexSettingsService, store, translog, mergeSchedulerProvider));
}
public EngineConfig config(IndexSettingsService indexSettingsService, Store store, Translog translog, MergeSchedulerProvider mergeSchedulerProvider) {
IndexWriterConfig iwc = newIndexWriterConfig();
EngineConfig config = new EngineConfig(shardId, false /* per default, optimization for auto generated ids is disabled */, threadPool,
        new ShardIndexingService(shardId, EMPTY_SETTINGS, new ShardSlowLogIndexingService(shardId, EMPTY_SETTINGS, indexSettingsService)),
        indexSettingsService, null, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), mergeSchedulerProvider,
        iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}
});
return config;
}
protected Term newUid(String id) {
return new Term("_uid", id);
}
protected static final BytesReference B_1 = new BytesArray(new byte[]{1});
protected static final BytesReference B_2 = new BytesArray(new byte[]{2});
protected static final BytesReference B_3 = new BytesArray(new byte[]{3});
@Test
public void testSegments() throws Exception {
List<Segment> segments = primaryEngine.segments(false);
assertThat(segments.isEmpty(), equalTo(true));
assertThat(primaryEngine.segmentsStats().getCount(), equalTo(0l));
assertThat(primaryEngine.segmentsStats().getMemoryInBytes(), equalTo(0l));
final boolean defaultCompound = defaultSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, true);
// create a doc and refresh
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false);
primaryEngine.create(new Engine.Create(null, newUid("2"), doc2));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
assertThat(segments.size(), equalTo(1));
SegmentsStats stats = primaryEngine.segmentsStats();
assertThat(stats.getCount(), equalTo(1l));
assertThat(stats.getTermsMemoryInBytes(), greaterThan(0l));
assertThat(stats.getStoredFieldsMemoryInBytes(), greaterThan(0l));
assertThat(stats.getTermVectorsMemoryInBytes(), equalTo(0l));
assertThat(stats.getNormsMemoryInBytes(), greaterThan(0l));
assertThat(stats.getDocValuesMemoryInBytes(), greaterThan(0l));
assertThat(segments.get(0).isCommitted(), equalTo(false));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(0).ramTree, nullValue());
// Check that the replica sees nothing
segments = replicaEngine.segments(false);
assertThat(segments.size(), equalTo(0));
stats = replicaEngine.segmentsStats();
assertThat(stats.getCount(), equalTo(0l));
assertThat(stats.getTermsMemoryInBytes(), equalTo(0l));
assertThat(stats.getStoredFieldsMemoryInBytes(), equalTo(0l));
assertThat(stats.getTermVectorsMemoryInBytes(), equalTo(0l));
assertThat(stats.getNormsMemoryInBytes(), equalTo(0l));
assertThat(stats.getDocValuesMemoryInBytes(), equalTo(0l));
assertThat(segments.size(), equalTo(0));
// flush the primary engine
primaryEngine.flush();
// refresh the replica
replicaEngine.refresh("tests");
// Check that the primary AND replica sees segments now
segments = primaryEngine.segments(false);
assertThat(segments.size(), equalTo(1));
assertThat(primaryEngine.segmentsStats().getCount(), equalTo(1l));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
segments = replicaEngine.segments(false);
assertThat(segments.size(), equalTo(1));
assertThat(replicaEngine.segmentsStats().getCount(), equalTo(1l));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
primaryEngine.config().setCompoundOnFlush(false);
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false);
primaryEngine.create(new Engine.Create(null, newUid("3"), doc3));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
assertThat(segments.size(), equalTo(2));
assertThat(primaryEngine.segmentsStats().getCount(), equalTo(2l));
assertThat(primaryEngine.segmentsStats().getTermsMemoryInBytes(), greaterThan(stats.getTermsMemoryInBytes()));
assertThat(primaryEngine.segmentsStats().getStoredFieldsMemoryInBytes(), greaterThan(stats.getStoredFieldsMemoryInBytes()));
assertThat(primaryEngine.segmentsStats().getTermVectorsMemoryInBytes(), equalTo(0l));
assertThat(primaryEngine.segmentsStats().getNormsMemoryInBytes(), greaterThan(stats.getNormsMemoryInBytes()));
assertThat(primaryEngine.segmentsStats().getDocValuesMemoryInBytes(), greaterThan(stats.getDocValuesMemoryInBytes()));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(1).isCommitted(), equalTo(false));
assertThat(segments.get(1).isSearch(), equalTo(true));
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
// Make visible to shadow replica
primaryEngine.flush();
replicaEngine.refresh("test");
segments = replicaEngine.segments(false);
assertThat(segments.size(), equalTo(2));
assertThat(replicaEngine.segmentsStats().getCount(), equalTo(2l));
assertThat(replicaEngine.segmentsStats().getTermsMemoryInBytes(), greaterThan(stats.getTermsMemoryInBytes()));
assertThat(replicaEngine.segmentsStats().getStoredFieldsMemoryInBytes(), greaterThan(stats.getStoredFieldsMemoryInBytes()));
assertThat(replicaEngine.segmentsStats().getTermVectorsMemoryInBytes(), equalTo(0l));
assertThat(replicaEngine.segmentsStats().getNormsMemoryInBytes(), greaterThan(stats.getNormsMemoryInBytes()));
assertThat(replicaEngine.segmentsStats().getDocValuesMemoryInBytes(), greaterThan(stats.getDocValuesMemoryInBytes()));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
assertThat(segments.get(0).getDeletedDocs(), equalTo(0));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(1).isCommitted(), equalTo(true));
assertThat(segments.get(1).isSearch(), equalTo(true));
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
primaryEngine.delete(new Engine.Delete("test", "1", newUid("1")));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
assertThat(segments.size(), equalTo(2));
assertThat(primaryEngine.segmentsStats().getCount(), equalTo(2l));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(1));
assertThat(segments.get(0).getDeletedDocs(), equalTo(1));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(1).isCommitted(), equalTo(true));
assertThat(segments.get(1).isSearch(), equalTo(true));
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
// Make visible to shadow replica
primaryEngine.flush();
replicaEngine.refresh("test");
primaryEngine.config().setCompoundOnFlush(true);
ParsedDocument doc4 = testParsedDocument("4", "4", "test", null, -1, -1, testDocumentWithTextField(), B_3, false);
primaryEngine.create(new Engine.Create(null, newUid("4"), doc4));
primaryEngine.refresh("test");
segments = primaryEngine.segments(false);
assertThat(segments.size(), equalTo(3));
assertThat(primaryEngine.segmentsStats().getCount(), equalTo(3l));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(1));
assertThat(segments.get(0).getDeletedDocs(), equalTo(1));
assertThat(segments.get(0).isCompound(), equalTo(defaultCompound));
assertThat(segments.get(1).isCommitted(), equalTo(true));
assertThat(segments.get(1).isSearch(), equalTo(true));
assertThat(segments.get(1).getNumDocs(), equalTo(1));
assertThat(segments.get(1).getDeletedDocs(), equalTo(0));
assertThat(segments.get(1).isCompound(), equalTo(false));
assertThat(segments.get(2).isCommitted(), equalTo(false));
assertThat(segments.get(2).isSearch(), equalTo(true));
assertThat(segments.get(2).getNumDocs(), equalTo(1));
assertThat(segments.get(2).getDeletedDocs(), equalTo(0));
assertThat(segments.get(2).isCompound(), equalTo(true));
}
@Test
public void testVerboseSegments() throws Exception {
List<Segment> segments = primaryEngine.segments(true);
assertThat(segments.isEmpty(), equalTo(true));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
primaryEngine.refresh("test");
segments = primaryEngine.segments(true);
assertThat(segments.size(), equalTo(1));
assertThat(segments.get(0).ramTree, notNullValue());
ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, false);
primaryEngine.create(new Engine.Create(null, newUid("2"), doc2));
primaryEngine.refresh("test");
ParsedDocument doc3 = testParsedDocument("3", "3", "test", null, -1, -1, testDocumentWithTextField(), B_3, false);
primaryEngine.create(new Engine.Create(null, newUid("3"), doc3));
primaryEngine.refresh("test");
segments = primaryEngine.segments(true);
assertThat(segments.size(), equalTo(3));
assertThat(segments.get(0).ramTree, notNullValue());
assertThat(segments.get(1).ramTree, notNullValue());
assertThat(segments.get(2).ramTree, notNullValue());
// Now make the changes visible to the replica
primaryEngine.flush();
replicaEngine.refresh("test");
segments = replicaEngine.segments(true);
assertThat(segments.size(), equalTo(3));
assertThat(segments.get(0).ramTree, notNullValue());
assertThat(segments.get(1).ramTree, notNullValue());
assertThat(segments.get(2).ramTree, notNullValue());
}
@Test
public void testShadowEngineIgnoresWriteOperations() throws Exception {
// create a document
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false);
try {
replicaEngine.create(new Engine.Create(null, newUid("1"), doc));
fail("should have thrown an exception");
} catch (UnsupportedOperationException e) {}
replicaEngine.refresh("test");
// its not there...
Engine.Searcher searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
Engine.GetResult getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// index a document
document = testDocument();
document.add(new TextField("value", "test1", Field.Store.YES));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false);
try {
replicaEngine.index(new Engine.Index(null, newUid("1"), doc));
fail("should have thrown an exception");
} catch (UnsupportedOperationException e) {}
replicaEngine.refresh("test");
// its still not there...
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// Now, add a document to the primary so we can test shadow engine deletes
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
primaryEngine.flush();
replicaEngine.refresh("test");
// Now the replica can see it
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.close();
// And the replica can retrieve it
getResult = replicaEngine.get(new Engine.Get(false, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
// try to delete it on the replica
try {
replicaEngine.delete(new Engine.Delete("test", "1", newUid("1")));
fail("should have thrown an exception");
} catch (UnsupportedOperationException e) {}
replicaEngine.flush();
replicaEngine.refresh("test");
primaryEngine.refresh("test");
// it's still there!
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.close();
getResult = replicaEngine.get(new Engine.Get(false, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
// it's still there on the primary also!
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.close();
getResult = primaryEngine.get(new Engine.Get(false, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
}
@Test
public void testSimpleOperations() throws Exception {
Engine.Searcher searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
// create a document
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
// not on the replica either...
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
// but, we can still get it (in realtime)
Engine.GetResult getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source().source.toBytesArray(), equalTo(B_1.toBytesArray()));
assertThat(getResult.docIdAndVersion(), nullValue());
getResult.release();
// can't get it from the replica, because it's not in the translog for a shadow replica
getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// but, not there non realtime
getResult = primaryEngine.get(new Engine.Get(false, newUid("1")));
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// refresh and it should be there
primaryEngine.refresh("test");
// now its there...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.close();
// also in non realtime
getResult = primaryEngine.get(new Engine.Get(false, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
// still not in the replica because no flush
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
// now do an update
document = testDocument();
document.add(new TextField("value", "test1", Field.Store.YES));
document.add(new Field(SourceFieldMapper.NAME, B_2.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_2, false);
primaryEngine.index(new Engine.Index(null, newUid("1"), doc));
// its not updated yet...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
searchResult.close();
// but, we can still get it (in realtime)
getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source().source.toBytesArray(), equalTo(B_2.toBytesArray()));
assertThat(getResult.docIdAndVersion(), nullValue());
getResult.release();
// refresh and it should be updated
primaryEngine.refresh("test");
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
searchResult.close();
// flush, now shadow replica should have the files
primaryEngine.flush();
// still not in the replica because the replica hasn't refreshed
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
replicaEngine.refresh("test");
// the replica finally sees it because primary has flushed and replica refreshed
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
searchResult.close();
// now delete
primaryEngine.delete(new Engine.Delete("test", "1", newUid("1")));
// its not deleted yet
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
searchResult.close();
// but, get should not see it (in realtime)
getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(false));
getResult.release();
// refresh and it should be deleted
primaryEngine.refresh("test");
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
searchResult.close();
// add it back
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, B_1.toBytes(), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
searchResult.close();
// refresh and it should be there
primaryEngine.refresh("test");
// now its there...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
searchResult.close();
// now flush
primaryEngine.flush();
// and, verify get (in real time)
getResult = primaryEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), nullValue());
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
// the replica should see it if we refresh too!
replicaEngine.refresh("test");
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
searchResult.close();
getResult = replicaEngine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), nullValue());
assertThat(getResult.docIdAndVersion(), notNullValue());
getResult.release();
// make sure we can still work with the engine
// now do an update
document = testDocument();
document.add(new TextField("value", "test1", Field.Store.YES));
doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_1, false);
primaryEngine.index(new Engine.Index(null, newUid("1"), doc));
// its not updated yet...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 0));
searchResult.close();
// refresh and it should be updated
primaryEngine.refresh("test");
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
searchResult.close();
// Make visible to shadow replica
primaryEngine.flush();
replicaEngine.refresh("test");
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test1")), 1));
searchResult.close();
}
@Test
public void testSearchResultRelease() throws Exception {
Engine.Searcher searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
// create a document
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
// its not there...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0));
searchResult.close();
// flush & refresh and it should be visible everywhere
primaryEngine.flush();
primaryEngine.refresh("test");
replicaEngine.refresh("test");
// now it's there...
searchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.close();
searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
// don't release the replica search result yet...
// delete, refresh and do a new search, it should not be there
primaryEngine.delete(new Engine.Delete("test", "1", newUid("1")));
primaryEngine.flush();
primaryEngine.refresh("test");
replicaEngine.refresh("test");
Engine.Searcher updateSearchResult = primaryEngine.acquireSearcher("test");
MatcherAssert.assertThat(updateSearchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
updateSearchResult.close();
// the non-released replica search result should not see the delete yet...
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.close();
}
@Test
public void testFailEngineOnCorruption() {
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
primaryEngine.flush();
MockDirectoryWrapper leaf = DirectoryUtils.getLeaf(replicaEngine.config().getStore().directory(), MockDirectoryWrapper.class);
leaf.setRandomIOExceptionRate(1.0);
leaf.setRandomIOExceptionRateOnOpen(1.0);
try {
replicaEngine.refresh("foo");
fail("exception expected");
} catch (Exception ex) {
// expected: the forced IOExceptions should make the refresh fail
}
try {
Engine.Searcher searchResult = replicaEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(1));
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 1));
searchResult.close();
fail("exception expected");
} catch (EngineClosedException ex) {
// all is well
}
}
@Test
public void testExtractShardId() {
try (Engine.Searcher test = replicaEngine.acquireSearcher("test")) {
ShardId shardId = ShardUtils.extractShardId(test.reader());
assertNotNull(shardId);
assertEquals(shardId, replicaEngine.config().getShardId());
}
}
/**
* Random test that throws random exceptions and ensures all references are
* counted down / released and resources are closed.
*/
@Test
public void testFailStart() throws IOException {
// Need a commit point for this
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, false);
primaryEngine.create(new Engine.Create(null, newUid("1"), doc));
primaryEngine.flush();
// this test fails if any reader, searcher or directory is not closed - MDW FTW
final int iters = scaledRandomIntBetween(10, 100);
for (int i = 0; i < iters; i++) {
MockDirectoryWrapper wrapper = newMockFSDirectory(dirPath);
wrapper.setFailOnOpenInput(randomBoolean());
wrapper.setAllowRandomFileNotFoundException(randomBoolean());
wrapper.setRandomIOExceptionRate(randomDouble());
wrapper.setRandomIOExceptionRateOnOpen(randomDouble());
try (Store store = createStore(wrapper)) {
int refCount = store.refCount();
assertTrue("refCount: "+ store.refCount(), store.refCount() > 0);
Translog translog = createTranslog();
ShadowEngine holder;
try {
holder = createShadowEngine(store, translog);
} catch (EngineCreationFailureException ex) {
assertEquals(store.refCount(), refCount);
continue;
}
holder.config().setFailEngineOnCorruption(true);
assertEquals(store.refCount(), refCount+1);
final int numStarts = scaledRandomIntBetween(1, 5);
for (int j = 0; j < numStarts; j++) {
try {
assertEquals(store.refCount(), refCount + 1);
holder.close();
holder = createShadowEngine(store, translog);
holder.config().setFailEngineOnCorruption(true);
assertEquals(store.refCount(), refCount + 1);
} catch (EngineCreationFailureException ex) {
// all is fine
assertEquals(store.refCount(), refCount);
break;
}
}
translog.close();
holder.close();
assertEquals(store.refCount(), refCount);
}
}
}
@Test
public void testSettings() {
CodecService codecService = new CodecService(shardId.index());
assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName());
assertEquals(replicaEngine.config().getIndexConcurrency(), indexConcurrency);
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
/** Unit test(s) for IndexShardModule */
public class IndexShardModuleTests extends ElasticsearchTestCase {
@Test
public void testDetermineShadowEngineShouldBeUsed() {
ShardId shardId = new ShardId("myindex", 0);
Settings regularSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
Settings shadowSettings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
.build();
IndexShardModule ism1 = new IndexShardModule(shardId, true, regularSettings);
IndexShardModule ism2 = new IndexShardModule(shardId, false, regularSettings);
IndexShardModule ism3 = new IndexShardModule(shardId, true, shadowSettings);
IndexShardModule ism4 = new IndexShardModule(shardId, false, shadowSettings);
assertFalse("no shadow replicas for normal settings", ism1.useShadowEngine());
assertFalse("no shadow replicas for normal settings", ism2.useShadowEngine());
assertFalse("no shadow replicas for primary shard with shadow settings", ism3.useShadowEngine());
assertTrue("shadow replicas for replica shards with shadow settings", ism4.useShadowEngine());
}
}
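The assertions above boil down to a simple predicate: the shadow engine is used only for non-primary shards of an index that enables shadow replicas. A minimal, hypothetical sketch of that predicate follows (the real IndexShardModule may compute it differently; the method name here is made up for illustration):

// Hypothetical sketch only, not the actual IndexShardModule code; it merely
// restates what the four assertions above check.
static boolean shouldUseShadowEngine(boolean primaryShard, Settings indexSettings) {
    // the read-only shadow engine is only used on replica shards of shadow-replica indices
    return primaryShard == false
            && indexSettings.getAsBoolean(IndexMetaData.SETTING_SHADOW_REPLICAS, false);
}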

View File

@ -43,7 +43,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
BaseDirectoryWrapper dir = newFSDirectory(file);
FSDirectory directory = DirectoryUtils.getLeaf(new FilterDirectory(dir) {}, FSDirectory.class, null);
assertThat(directory, notNullValue());
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
dir.close();
}
@ -51,7 +51,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
BaseDirectoryWrapper dir = newFSDirectory(file);
FSDirectory directory = DirectoryUtils.getLeaf(dir, FSDirectory.class, null);
assertThat(directory, notNullValue());
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
dir.close();
}
@ -60,7 +60,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
BaseDirectoryWrapper dir = newFSDirectory(file);
FSDirectory directory = DirectoryUtils.getLeaf(new FileSwitchDirectory(stringSet, dir, dir, random().nextBoolean()), FSDirectory.class, null);
assertThat(directory, notNullValue());
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
dir.close();
}
@ -69,7 +69,7 @@ public class DirectoryUtilsTest extends ElasticsearchLuceneTestCase {
BaseDirectoryWrapper dir = newFSDirectory(file);
FSDirectory directory = DirectoryUtils.getLeaf(new FilterDirectory(new FileSwitchDirectory(stringSet, dir, dir, random().nextBoolean())) {}, FSDirectory.class, null);
assertThat(directory, notNullValue());
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir)));
assertThat(directory, sameInstance(DirectoryUtils.getLeafDirectory(dir, null)));
dir.close();
}

View File

@ -137,23 +137,4 @@ public class IndicesCustomDataPathTests extends ElasticsearchIntegrationTest {
assertAcked(client().admin().indices().prepareDelete(INDEX));
assertPathHasBeenCleared(path);
}
private void assertPathHasBeenCleared(String path) throws Exception {
int count = 0;
StringBuilder sb = new StringBuilder();
sb.append("[");
if (Files.exists(Paths.get(path))) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(path))) {
for (Path file : stream) {
if (Files.isRegularFile(file)) {
count++;
sb.append(file.toAbsolutePath().toString());
sb.append("\n");
}
}
}
}
sb.append("]");
assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0));
}
}

View File

@ -125,8 +125,10 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
@ -1790,6 +1792,39 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
return nodes;
}
/**
* Asserts that there are no files in the specified path
*/
public void assertPathHasBeenCleared(String path) throws Exception {
assertPathHasBeenCleared(Paths.get(path));
}
/**
* Asserts that there are no files in the specified path
*/
public void assertPathHasBeenCleared(Path path) throws Exception {
logger.info("--> checking that [{}] has been cleared", path);
int count = 0;
StringBuilder sb = new StringBuilder();
sb.append("[");
if (Files.exists(path)) {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
for (Path file : stream) {
logger.info("--> found file: [{}]", file.toAbsolutePath().toString());
if (Files.isDirectory(file)) {
assertPathHasBeenCleared(file);
} else if (Files.isRegularFile(file)) {
count++;
sb.append(file.toAbsolutePath().toString());
sb.append("\n");
}
}
}
}
sb.append("]");
assertThat(count + " files exist that should have been cleaned:\n" + sb.toString(), count, equalTo(0));
}
protected static class NumShards {
public final int numPrimaries;
public final int numReplicas;

View File

@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.engine;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.ShardId;
import java.util.Map;
/**
* A searcher that asserts the IndexReader's refcount on close
*/
public class AssertingSearcher extends Engine.Searcher {
private final Engine.Searcher wrappedSearcher;
private final ShardId shardId;
private final IndexSearcher indexSearcher;
private RuntimeException firstReleaseStack;
private final Object lock = new Object();
private final int initialRefCount;
private final ESLogger logger;
private final Map<AssertingSearcher, RuntimeException> inFlightSearchers;
public AssertingSearcher(IndexSearcher indexSearcher, Engine.Searcher wrappedSearcher,
ShardId shardId, Map<AssertingSearcher, RuntimeException> inFlightSearchers,
ESLogger logger) {
super(wrappedSearcher.source(), indexSearcher);
// we only use the given index searcher here instead of the IndexSearcher of the wrapped searcher; the latter might be a
// wrapped searcher with a wrapped reader.
this.wrappedSearcher = wrappedSearcher;
this.logger = logger;
this.shardId = shardId;
initialRefCount = wrappedSearcher.reader().getRefCount();
this.indexSearcher = indexSearcher;
assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
this.inFlightSearchers = inFlightSearchers;
this.inFlightSearchers.put(this, new RuntimeException("Unreleased Searcher, source [" + wrappedSearcher.source() + "]"));
}
@Override
public String source() {
return wrappedSearcher.source();
}
@Override
public void close() throws ElasticsearchException {
RuntimeException remove = inFlightSearchers.remove(this);
synchronized (lock) {
// make sure we only get this once and store the stack of the first caller!
if (remove == null) {
assert firstReleaseStack != null;
AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]");
error.initCause(firstReleaseStack);
throw error;
} else {
assert firstReleaseStack == null;
firstReleaseStack = new RuntimeException("Searcher Released first here, source [" + wrappedSearcher.source() + "]");
}
}
final int refCount = wrappedSearcher.reader().getRefCount();
// this assert may seem paranoid, but given LUCENE-5362 we had better add some assertions here to make sure we catch any
// potential problems.
assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already closed. Initial refCount was: [" + initialRefCount + "]";
try {
wrappedSearcher.close();
} catch (RuntimeException ex) {
logger.debug("Failed to release searcher", ex);
throw ex;
}
}
@Override
public IndexReader reader() {
return indexSearcher.getIndexReader();
}
@Override
public IndexSearcher searcher() {
return indexSearcher;
}
public ShardId shardId() {
return shardId;
}
}
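A minimal usage sketch for the class above, only to show the release bookkeeping; someEngine, shardId, inFlightSearchers and logger stand in for whatever the calling mock engine already has in scope (the mock engines further down wire this up for real):

// Hypothetical usage sketch with placeholder names.
Engine.Searcher wrapped = someEngine.acquireSearcher("test");        // someEngine is a placeholder
IndexSearcher indexSearcher = new IndexSearcher(wrapped.reader());   // plain searcher over the same reader
AssertingSearcher searcher = new AssertingSearcher(indexSearcher, wrapped, shardId, inFlightSearchers, logger);
searcher.close();   // releases the wrapped searcher and removes it from inFlightSearchers
// a second searcher.close() here would throw "Released Searcher more than once"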

View File

@ -27,7 +27,12 @@ import org.elasticsearch.index.engine.EngineFactory;
*/
public final class MockEngineFactory implements EngineFactory {
@Override
public Engine newEngine(EngineConfig config) {
public Engine newReadWriteEngine(EngineConfig config) {
return new MockInternalEngine(config);
}
@Override
public Engine newReadOnlyEngine(EngineConfig config) {
return new MockShadowEngine(config);
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.IOException;
@ -96,8 +95,9 @@ public class MockInternalEngine extends InternalEngine {
} finally {
if (logger.isTraceEnabled()) {
// log debug if we have pending searchers
for (Map.Entry<MockInternalEngine.AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
logger.trace("Unreleased Searchers instance for shard [{}]", entry.getValue(), entry.getKey().shardId);
for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
logger.trace("Unreleased Searchers instance for shard [{}]",
entry.getValue(), entry.getKey().shardId());
}
}
}
@ -131,7 +131,8 @@ public class MockInternalEngine extends InternalEngine {
// pass the original searcher to the super.newSearcher() method to make sure this is the searcher that will
// be released later on. If we wrap an index reader here must not pass the wrapped version to the manager
// on release otherwise the reader will be closed too early. - good news, stuff will fail all over the place if we don't get this right here
return new AssertingSearcher(assertingIndexSearcher, super.newSearcher(source, searcher, manager), shardId);
return new AssertingSearcher(assertingIndexSearcher,
super.newSearcher(source, searcher, manager), shardId, INFLIGHT_ENGINE_SEARCHERS, logger);
}
private DirectoryReader wrapReader(DirectoryReader reader) {
@ -158,73 +159,6 @@ public class MockInternalEngine extends InternalEngine {
return reader;
}
public final class AssertingSearcher extends Searcher {
private final Searcher wrappedSearcher;
private final ShardId shardId;
private final IndexSearcher indexSearcher;
private RuntimeException firstReleaseStack;
private final Object lock = new Object();
private final int initialRefCount;
public AssertingSearcher(IndexSearcher indexSearcher, Searcher wrappedSearcher, ShardId shardId) {
super(wrappedSearcher.source(), indexSearcher);
// we only use the given index searcher here instead of the IS of the wrapped searcher. the IS might be a wrapped searcher
// with a wrapped reader.
this.wrappedSearcher = wrappedSearcher;
this.shardId = shardId;
initialRefCount = wrappedSearcher.reader().getRefCount();
this.indexSearcher = indexSearcher;
assert initialRefCount > 0 : "IndexReader#getRefCount() was [" + initialRefCount + "] expected a value > [0] - reader is already closed";
INFLIGHT_ENGINE_SEARCHERS.put(this, new RuntimeException("Unreleased Searcher, source [" + wrappedSearcher.source() + "]"));
}
@Override
public String source() {
return wrappedSearcher.source();
}
@Override
public void close() throws ElasticsearchException {
RuntimeException remove = INFLIGHT_ENGINE_SEARCHERS.remove(this);
synchronized (lock) {
// make sure we only get this once and store the stack of the first caller!
if (remove == null) {
assert firstReleaseStack != null;
AssertionError error = new AssertionError("Released Searcher more than once, source [" + wrappedSearcher.source() + "]");
error.initCause(firstReleaseStack);
throw error;
} else {
assert firstReleaseStack == null;
firstReleaseStack = new RuntimeException("Searcher Released first here, source [" + wrappedSearcher.source() + "]");
}
}
final int refCount = wrappedSearcher.reader().getRefCount();
// this assert seems to be paranoid but given LUCENE-5362 we better add some assertions here to make sure we catch any potential
// problems.
assert refCount > 0 : "IndexReader#getRefCount() was [" + refCount + "] expected a value > [0] - reader is already closed. Initial refCount was: [" + initialRefCount + "]";
try {
wrappedSearcher.close();
} catch (RuntimeException ex) {
logger.debug("Failed to release searcher", ex);
throw ex;
}
}
@Override
public IndexReader reader() {
return indexSearcher.getIndexReader();
}
@Override
public IndexSearcher searcher() {
return indexSearcher;
}
public ShardId shardId() {
return shardId;
}
}
public static abstract class DirectoryReaderWrapper extends FilterDirectoryReader {
protected final SubReaderWrapper subReaderWrapper;

View File

@ -0,0 +1,120 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.engine;
import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.AssertingIndexSearcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.ShadowEngine;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class MockShadowEngine extends ShadowEngine {
private final MockInternalEngine.MockContext mockContext;
public static final ConcurrentMap<AssertingSearcher, RuntimeException> INFLIGHT_ENGINE_SEARCHERS = new ConcurrentHashMap<>();
public MockShadowEngine(EngineConfig config) {
super(config);
Settings indexSettings = config.getIndexSettings();
final long seed = indexSettings.getAsLong(ElasticsearchIntegrationTest.SETTING_INDEX_SEED, 0L);
Random random = new Random(seed);
final double ratio = indexSettings.getAsDouble(MockInternalEngine.WRAP_READER_RATIO, 0.0d); // DISABLED by default - AssertingDR is crazy slow
Class<? extends AssertingDirectoryReader> wrapper = indexSettings.getAsClass(MockInternalEngine.READER_WRAPPER_TYPE, AssertingDirectoryReader.class);
boolean wrapReader = random.nextDouble() < ratio;
logger.trace("Using [{}] for shard [{}] seed: [{}] wrapReader: [{}]", this.getClass().getName(), shardId, seed, wrapReader);
mockContext = new MockInternalEngine.MockContext(random, wrapReader, wrapper, indexSettings);
}
@Override
public void close() throws IOException {
try {
super.close();
} finally {
if (logger.isTraceEnabled()) {
// log any still-unreleased searchers at trace level
for (Map.Entry<AssertingSearcher, RuntimeException> entry : INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
logger.trace("Unreleased Searchers instance for shard [{}]", entry.getValue(), entry.getKey().shardId());
}
}
}
}
@Override
protected Searcher newSearcher(String source, IndexSearcher searcher, SearcherManager manager) throws EngineException {
IndexReader reader = searcher.getIndexReader();
IndexReader wrappedReader = reader;
if (reader instanceof DirectoryReader && mockContext.wrapReader) {
wrappedReader = wrapReader((DirectoryReader) reader);
}
// this executes basic query checks and asserts that weights are normalized only once etc.
final AssertingIndexSearcher assertingIndexSearcher = new AssertingIndexSearcher(mockContext.random, wrappedReader);
assertingIndexSearcher.setSimilarity(searcher.getSimilarity());
// pass the original searcher to the super.newSearcher() method to make
// sure this is the searcher that will be released later on. If we wrap
// an index reader here we must not pass the wrapped version to the manager
// on release, otherwise the reader will be closed too early. Good
// news: stuff will fail all over the place if we don't get this
// right here
return new AssertingSearcher(assertingIndexSearcher,
super.newSearcher(source, searcher, manager), shardId,
INFLIGHT_ENGINE_SEARCHERS, logger);
}
private DirectoryReader wrapReader(DirectoryReader reader) {
try {
Constructor<?>[] constructors = mockContext.wrapper.getConstructors();
Constructor<?> nonRandom = null;
for (Constructor<?> constructor : constructors) {
Class<?>[] parameterTypes = constructor.getParameterTypes();
if (parameterTypes.length > 0 && parameterTypes[0] == DirectoryReader.class) {
if (parameterTypes.length == 1) {
nonRandom = constructor;
} else if (parameterTypes.length == 2 && parameterTypes[1] == Settings.class) {
return (DirectoryReader) constructor.newInstance(reader, mockContext.indexSettings);
}
}
}
if (nonRandom != null) {
return (DirectoryReader) nonRandom.newInstance(reader);
}
} catch (Exception e) {
throw new ElasticsearchException("Can not wrap reader", e);
}
return reader;
}
}

View File

@ -23,7 +23,6 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.Iterables;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticsearchException;
@ -65,7 +64,9 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.test.engine.AssertingSearcher;
import org.elasticsearch.test.engine.MockInternalEngine;
import org.elasticsearch.test.engine.MockShadowEngine;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
@ -643,20 +644,26 @@ public class ElasticsearchAssertions {
try {
if (awaitBusy(new Predicate<Object>() {
public boolean apply(Object o) {
return MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty();
return MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty() &&
MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty();
}
}, 5, TimeUnit.SECONDS)) {
return;
}
} catch (InterruptedException ex) {
if (MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty()) {
if (MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty() &&
MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.isEmpty()) {
return;
}
}
try {
RuntimeException ex = null;
StringBuilder builder = new StringBuilder("Unclosed Searchers instance for shards: [");
for (Map.Entry<MockInternalEngine.AssertingSearcher, RuntimeException> entry : MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
ex = entry.getValue();
builder.append(entry.getKey().shardId()).append(",");
}
for (Map.Entry<AssertingSearcher, RuntimeException> entry : MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.entrySet()) {
ex = entry.getValue();
builder.append(entry.getKey().shardId()).append(",");
}
@ -664,6 +671,7 @@ public class ElasticsearchAssertions {
throw new RuntimeException(builder.toString(), ex);
} finally {
MockInternalEngine.INFLIGHT_ENGINE_SEARCHERS.clear();
MockShadowEngine.INFLIGHT_ENGINE_SEARCHERS.clear();
}
}

View File

@ -26,12 +26,16 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.InternalEngine;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.IndexShardException;
import org.elasticsearch.index.shard.IndexShardState;
@ -82,13 +86,13 @@ public class MockFSDirectoryService extends FsDirectoryService {
@IndexSettings Settings indexSettings) {
if (indexShard != null && shardId.equals(sid)) {
logger.info("Shard state before potentially flushing is {}", indexShard.state());
if (validCheckIndexStates.contains(indexShard.state())) {
canRun = true;
if (validCheckIndexStates.contains(indexShard.state()) && indexShard.engine() instanceof InternalEngine) {
// When the internal engine closes we do a rollback, which removes uncommitted segments
// By doing a commit flush we perform a Lucene commit, but don't clear the translog,
// so that even in tests where we don't flush we can check the integrity of the Lucene index
indexShard.engine().snapshotIndex(); // Keep translog for tests that rely on replaying it
Releasables.close(indexShard.engine().snapshotIndex()); // Keep translog for tests that rely on replaying it
logger.info("flush finished in beforeIndexShardClosed");
canRun = true;
}
}
}