From 31652a8b3d463b30164c2c2f4b130a50ec6373cf Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Tue, 30 Dec 2014 14:39:05 +0100 Subject: [PATCH] Fix TransportNodesListShardStoreMetaData for custom data paths. Cleans up the testReusePeerRecovery test as well. The actual fix is in TransportNodesListShardStoreMetaData.java, which needs to use `nodeEnv.shardDataPaths` instead of `nodeEnv.shardPaths`. Due to the difficulty in tracking this down, I've added a lot of additional logging. This also fixes a logging issue in GatewayAllocator. --- .../gateway/GatewayAllocator.java | 6 ++- .../gateway/MetaDataStateFormat.java | 9 ++++- .../TransportNodesListShardStoreMetaData.java | 2 +- .../gateway/RecoveryFromGatewayTests.java | 37 +++++++++++-------- 4 files changed, 35 insertions(+), 19 deletions(-) diff --git a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index d5b58663189..3b3b2ee9de5 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -330,6 +330,8 @@ public class GatewayAllocator extends AbstractComponent { sizeMatched += storeFileMetaData.length(); } } + logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data", + shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched); if (sizeMatched > lastSizeMatched) { lastSizeMatched = sizeMatched; lastDiscoNodeMatched = discoNode; @@ -345,7 +347,7 @@ public class GatewayAllocator extends AbstractComponent { // we only check on THROTTLE since we checked before before on NO Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation); if (decision.type() == Decision.Type.THROTTLE) { - if (logger.isTraceEnabled()) { + if (logger.isDebugEnabled()) { logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, 
lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched)); } // we are throttling this, but we have enough to allocate to this node, ignore it for now @@ -411,6 +413,8 @@ public class GatewayAllocator extends AbstractComponent { for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) { // -1 version means it does not exists, which is what the API returns, and what we expect to + logger.trace("[{}] on node [{}] has version [{}] of shard", + shard, nodeShardState.getNode(), nodeShardState.version()); shardStates.put(nodeShardState.getNode(), nodeShardState.version()); } return shardStates; diff --git a/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java b/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java index 83a0440a6c4..03cae56c9d2 100644 --- a/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java +++ b/src/main/java/org/elasticsearch/gateway/MetaDataStateFormat.java @@ -244,7 +244,9 @@ public abstract class MetaDataStateFormat { maxVersion = Math.max(maxVersion, version); final boolean legacy = MetaDataStateFormat.STATE_FILE_EXTENSION.equals(matcher.group(2)) == false; maxVersionIsLegacy &= legacy; // on purpose, see NOTE below - files.add(new PathAndVersion(stateFile, version, legacy)); + PathAndVersion pav = new PathAndVersion(stateFile, version, legacy); + logger.trace("found state file: {}", pav); + files.add(pav); } } } @@ -275,6 +277,7 @@ public abstract class MetaDataStateFormat { } } else { state = format.read(stateFile, version); + logger.trace("state version [{}] read from [{}]", version, stateFile.getFileName()); } return state; } catch (Throwable e) { @@ -324,6 +327,10 @@ public abstract class MetaDataStateFormat { this.version = version; this.legacy = legacy; } + + public String toString() { + return "[version:" + version + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]"; + } } /** diff --git 
a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 82c17fa7c1e..85ea347d480 100644 --- a/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -165,7 +165,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesOperatio if (!storeType.contains("fs")) { return new StoreFilesMetaData(false, shardId, ImmutableMap.of()); } - Path[] shardLocations = nodeEnv.shardPaths(shardId); + Path[] shardLocations = nodeEnv.shardDataPaths(shardId, metaData.settings()); Path[] shardIndexLocations = new Path[shardLocations.length]; for (int i = 0; i < shardLocations.length; i++) { shardIndexLocations[i] = shardLocations[i].resolve("index"); diff --git a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java index 3263dd5f8d6..d05f686b261 100644 --- a/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java +++ b/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayTests.java @@ -354,7 +354,8 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest { // prevent any rebalance actions during the peer recovery // if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if // we reuse the files on disk after full restarts for replicas. 
- assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE))); + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()))); + ensureGreen(); logger.info("--> indexing docs"); for (int i = 0; i < 1000; i++) { client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet(); @@ -368,44 +369,48 @@ public class RecoveryFromGatewayTests extends ElasticsearchIntegrationTest { logger.info("Running Cluster Health"); ensureGreen(); client().admin().indices().prepareOptimize("test").setWaitForMerge(true).setMaxNumSegments(100).get(); // just wait for merges - client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).execute().actionGet(); + client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get(); - logger.info("--> shutting down the nodes"); + logger.info("--> disabling allocation while the cluster is shut down"); // Disable allocations while we are closing nodes client().admin().cluster().prepareUpdateSettings() .setTransientSettings(settingsBuilder() .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE)) .get(); + logger.info("--> full cluster restart"); internalCluster().fullRestart(); - logger.info("Running Cluster Health"); + logger.info("--> waiting for cluster to return to green after first shutdown"); ensureGreen(); - logger.info("--> shutting down the nodes"); + + logger.info("--> disabling allocation while the cluster is shut down second time"); // Disable allocations while we are closing nodes client().admin().cluster().prepareUpdateSettings() .setTransientSettings(settingsBuilder() .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE)) .get(); + logger.info("--> full cluster restart"); 
internalCluster().fullRestart(); - - logger.info("Running Cluster Health"); + logger.info("--> waiting for cluster to return to green after second shutdown"); ensureGreen(); RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); - for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) { RecoveryState recoveryState = response.recoveryState(); if (!recoveryState.getPrimary()) { - logger.info("--> shard {}, recovered {}, reuse {}", response.getShardId(), recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount()); - assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(0l)); - assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(0l)); - assertThat(recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount())); - assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0)); - assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount())); - assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0)); - assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes())); + logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}", + response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(), + recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount()); + assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredByteCount(), equalTo(0l)); + assertThat("data should have been reused", recoveryState.getIndex().reusedByteCount(), greaterThan(0l)); + assertThat("all bytes should be reused", recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount())); + assertThat("no files should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(0)); + 
assertThat("all files should be reused", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount())); + assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0)); + assertThat("all bytes should be reused bytes", + recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes())); } else { assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(recoveryState.getIndex().reusedByteCount())); assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(recoveryState.getIndex().reusedFileCount()));