diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java index 5c3858cc017..d6010c95469 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/GatewaySnapshotStatus.java @@ -72,11 +72,14 @@ public class GatewaySnapshotStatus { final long indexSize; - public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize) { + final int expectedNumberOfOperations; + + public GatewaySnapshotStatus(Stage stage, long startTime, long time, long indexSize, int expectedNumberOfOperations) { this.stage = stage; this.startTime = startTime; this.time = time; this.indexSize = indexSize; + this.expectedNumberOfOperations = expectedNumberOfOperations; } public Stage stage() { @@ -110,4 +113,12 @@ public class GatewaySnapshotStatus { public ByteSizeValue getIndexSize() { return indexSize(); } + + public int expectedNumberOfOperations() { + return expectedNumberOfOperations; + } + + public int getExpectedNumberOfOperations() { + return expectedNumberOfOperations(); + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java index 67096da83f5..21be6f876a5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java @@ -254,6 +254,7 @@ public class ShardStatus extends BroadcastShardOperationResponse { out.writeVLong(gatewaySnapshotStatus.startTime); out.writeVLong(gatewaySnapshotStatus.time); out.writeVLong(gatewaySnapshotStatus.indexSize); + out.writeVInt(gatewaySnapshotStatus.expectedNumberOfOperations()); } } @@ -284,7 +285,7 @@ public class ShardStatus extends BroadcastShardOperationResponse { if (in.readBoolean()) { gatewaySnapshotStatus = new GatewaySnapshotStatus(GatewaySnapshotStatus.Stage.fromValue(in.readByte()), - in.readVLong(), in.readVLong(), in.readVLong()); + in.readVLong(), in.readVLong(), in.readVLong(), in.readVInt()); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index 1bafcd4677d..b1c30061572 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -239,7 +239,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct break; } shardStatus.gatewaySnapshotStatus = new GatewaySnapshotStatus(stage, snapshotStatus.startTime(), snapshotStatus.time(), - snapshotStatus.index().totalSize()); + snapshotStatus.index().totalSize(), snapshotStatus.translog().expectedNumberOfOperations()); } return shardStatus; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java index 5ac3f662a22..9be7eae5285 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGateway.java @@ -72,14 +72,16 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo private final long lastTranslogId; private final long lastTranslogPosition; private final long lastTranslogLength; + private final int lastTotalTranslogOperations; - public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogPosition, long lastTranslogLength) { + public Snapshot(SnapshotIndexCommit indexCommit, Translog.Snapshot translogSnapshot, long lastIndexVersion, long lastTranslogId, long lastTranslogPosition, long lastTranslogLength, int lastTotalTranslogOperations) { this.indexCommit = indexCommit; this.translogSnapshot = translogSnapshot; this.lastIndexVersion = lastIndexVersion; this.lastTranslogId = lastTranslogId; this.lastTranslogPosition = lastTranslogPosition; this.lastTranslogLength = lastTranslogLength; + this.lastTotalTranslogOperations = lastTotalTranslogOperations; } /** @@ -132,5 +134,9 @@ public interface IndexShardGateway extends IndexShardComponent, CloseableIndexCo public long lastTranslogLength() { return lastTranslogLength; } + + public int lastTotalTranslogOperations() { + return this.lastTotalTranslogOperations; + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java index 8f9d456a2f4..b54730eacca 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/IndexShardGatewayService.java @@ -62,6 +62,8 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem private volatile long lastTranslogPosition; + private volatile int lastTotalTranslogOperations; + private volatile long lastTranslogLength; private final TimeValue snapshotInterval; @@ -150,6 +152,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem lastTranslogId = -1; lastTranslogPosition = 0; lastTranslogLength = 0; + lastTotalTranslogOperations = recoveryStatus.translog().currentTranslogOperations(); // start the shard if the gateway has not started it already if (indexShard.state() != IndexShardState.STARTED) { @@ -221,12 +224,13 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem logger.debug("snapshot ({}) to {} ...", reason, shardGateway); SnapshotStatus snapshotStatus = - shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength)); + shardGateway.snapshot(new IndexShardGateway.Snapshot(snapshotIndexCommit, translogSnapshot, lastIndexVersion, lastTranslogId, lastTranslogPosition, lastTranslogLength, lastTotalTranslogOperations)); lastIndexVersion = snapshotIndexCommit.getVersion(); lastTranslogId = translogSnapshot.translogId(); lastTranslogPosition = translogSnapshot.position(); lastTranslogLength = translogSnapshot.length(); + lastTotalTranslogOperations = translogSnapshot.totalOperations(); return snapshotStatus; } return null; @@ -237,7 +241,7 @@ public class IndexShardGatewayService extends AbstractIndexShardComponent implem StringBuilder sb = new StringBuilder(); sb.append("snapshot (").append(reason).append(") completed to ").append(shardGateway).append(", took [").append(TimeValue.timeValueMillis(snapshotStatus.time())).append("]\n"); sb.append(" index : version [").append(lastIndexVersion).append("], number_of_files [").append(snapshotStatus.index().numberOfFiles()).append("] with total_size [").append(new ByteSizeValue(snapshotStatus.index().totalSize())).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.index().time())).append("]\n"); - sb.append(" translog : id [").append(lastTranslogId).append("], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]"); + sb.append(" translog : id [").append(lastTranslogId).append("], number_of_operations [" + snapshotStatus.translog().expectedNumberOfOperations() + "], took [").append(TimeValue.timeValueMillis(snapshotStatus.translog().time())).append("]"); logger.debug(sb.toString()); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java index 7712e0c56e7..646e79aa73d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/RecoveryStatus.java @@ -79,7 +79,7 @@ public class RecoveryStatus { public static class Translog { private long startTime = 0; private long time; - private volatile long currentTranslogOperations = 0; + private volatile int currentTranslogOperations = 0; public long startTime() { return this.startTime; @@ -97,11 +97,11 @@ public class RecoveryStatus { this.time = time; } - public void addTranslogOperations(long count) { + public void addTranslogOperations(int count) { this.currentTranslogOperations += count; } - public long currentTranslogOperations() { + public int currentTranslogOperations() { return this.currentTranslogOperations; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java index 59663b86819..93bc8796c24 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/SnapshotStatus.java @@ -122,6 +122,7 @@ public class SnapshotStatus { public static class Translog { private long startTime; private long time; + private int expectedNumberOfOperations; public long startTime() { return this.startTime; @@ -138,5 +139,13 @@ public class SnapshotStatus { public void time(long time) { this.time = time; } + + public int expectedNumberOfOperations() { + return expectedNumberOfOperations; + } + + public void expectedNumberOfOperations(int expectedNumberOfOperations) { + this.expectedNumberOfOperations = expectedNumberOfOperations; + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index d177a33ccec..436913837cb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -235,8 +235,12 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo // Note, we assume the snapshot is always started from "base 0". We need to seek forward if we want to lastTranslogPosition if we want the delta List translogCommitPointFiles = Lists.newArrayList(); + int expectedNumberOfOperations = 0; boolean snapshotRequired = snapshot.newTranslogCreated(); - if (!snapshot.newTranslogCreated()) { + if (snapshot.newTranslogCreated()) { + snapshotRequired = true; + expectedNumberOfOperations = translogSnapshot.totalOperations(); + } else { // if we have a commit point, check that we have all the files listed in it if (!commitPoints.commits().isEmpty()) { CommitPoint commitPoint = commitPoints.commits().get(0); @@ -253,12 +257,16 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo if (snapshot.sameTranslogNewOperations()) { translogSnapshot.seekForward(snapshot.lastTranslogPosition()); snapshotRequired = true; + expectedNumberOfOperations = translogSnapshot.totalOperations() - snapshot.lastTotalTranslogOperations(); } } else { + // a full translog snapshot is required + expectedNumberOfOperations = translogSnapshot.totalOperations(); snapshotRequired = true; } } } + currentSnapshotStatus.translog().expectedNumberOfOperations(expectedNumberOfOperations); if (snapshotRequired) { CommitPoint.FileInfo addedTranslogFileInfo = new CommitPoint.FileInfo(fileNameFromGeneration(++generation), "translog-" + translogSnapshot.translogId(), translogSnapshot.lengthInBytes()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 627e56f92b5..49b7e763cbb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -104,6 +104,16 @@ public interface Translog extends IndexShardComponent { */ long length(); + /** + * The total number of operations in the translog. + */ + int totalOperations(); + + /** + * The number of operations in this snapshot. + */ + int snapshotOperations(); + boolean hasNext(); Operation next(); @@ -116,7 +126,7 @@ public interface Translog extends IndexShardComponent { InputStream stream() throws IOException; /** - * The length in bytes of this channel. + * The length in bytes of this stream. */ long lengthInBytes(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java index f42886c46f6..783f341c56f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsChannelSnapshot.java @@ -41,6 +41,10 @@ public class FsChannelSnapshot implements Translog.Snapshot { private final long id; + private final int totalOperations; + + private final int snapshotOperations; + private final RafReference raf; private final FileChannel channel; @@ -53,12 +57,14 @@ public class FsChannelSnapshot implements Translog.Snapshot { private ByteBuffer cacheBuffer; - public FsChannelSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException { + public FsChannelSnapshot(ShardId shardId, long id, RafReference raf, long length, int totalOperations, int snapshotOperations) throws FileNotFoundException { this.shardId = shardId; this.id = id; this.raf = raf; this.channel = raf.raf().getChannel(); this.length = length; + this.totalOperations = totalOperations; + this.snapshotOperations = snapshotOperations; } @Override public long translogId() { @@ -73,6 +79,14 @@ public class FsChannelSnapshot implements Translog.Snapshot { return this.length; } + @Override public int totalOperations() { + return this.totalOperations; + } + + @Override public int snapshotOperations() { + return this.snapshotOperations; + } + @Override public InputStream stream() throws IOException { return new FileChannelInputStream(channel, position, lengthInBytes()); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java index 642e93d79f1..414bd6d4c3a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsStreamSnapshot.java @@ -37,6 +37,10 @@ public class FsStreamSnapshot implements Translog.Snapshot { private final long id; + private final int totalOperations; + + private final int snapshotOperations; + private final RafReference raf; private final long length; @@ -49,11 +53,13 @@ public class FsStreamSnapshot implements Translog.Snapshot { private byte[] cachedData; - public FsStreamSnapshot(ShardId shardId, long id, RafReference raf, long length) throws FileNotFoundException { + public FsStreamSnapshot(ShardId shardId, long id, RafReference raf, long length, int totalOperations, int snapshotOperations) throws FileNotFoundException { this.shardId = shardId; this.id = id; this.raf = raf; this.length = length; + this.totalOperations = totalOperations; + this.snapshotOperations = snapshotOperations; this.dis = new DataInputStream(new FileInputStream(raf.file())); } @@ -69,6 +75,14 @@ public class FsStreamSnapshot implements Translog.Snapshot { return this.length; } + @Override public int totalOperations() { + return this.totalOperations; + } + + @Override public int snapshotOperations() { + return this.snapshotOperations; + } + @Override public InputStream stream() throws IOException { return dis; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java index 43c2a317079..9c4e7213df3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/fs/FsTranslog.java @@ -130,9 +130,9 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog try { raf.increaseRefCount(); if (useStream) { - return new FsStreamSnapshot(shardId, this.id, raf, lastPosition); + return new FsStreamSnapshot(shardId, this.id, raf, lastPosition, operationCounter.get(), operationCounter.get()); } else { - return new FsChannelSnapshot(shardId, this.id, raf, lastPosition); + return new FsChannelSnapshot(shardId, this.id, raf, lastPosition, operationCounter.get(), operationCounter.get()); } } catch (IOException e) { throw new TranslogException(shardId, "Failed to snapshot", e); @@ -148,11 +148,11 @@ public class FsTranslog extends AbstractIndexShardComponent implements Translog try { raf.increaseRefCount(); if (useStream) { - FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastPosition); + FsStreamSnapshot newSnapshot = new FsStreamSnapshot(shardId, id, raf, lastPosition, operationCounter.get(), operationCounter.get() - snapshot.totalOperations()); newSnapshot.seekForward(snapshot.position()); return newSnapshot; } else { - FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastPosition); + FsChannelSnapshot newSnapshot = new FsChannelSnapshot(shardId, id, raf, lastPosition, operationCounter.get(), operationCounter.get() - snapshot.totalOperations()); newSnapshot.seekForward(snapshot.position()); return newSnapshot; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java index ca4aaac5a9b..fe3130644e0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/indices/status/RestIndicesStatusAction.java @@ -205,6 +205,10 @@ public class RestIndicesStatusAction extends BaseRestHandler { builder.field("size_in_bytes", gatewaySnapshotStatus.indexSize().bytes()); builder.endObject(); + builder.startObject("index"); + builder.field("expected_operations", gatewaySnapshotStatus.expectedNumberOfOperations()); + builder.endObject(); + builder.endObject(); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java index 54deea25542..823648e3e60 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/translog/AbstractSimpleTranslogTests.java @@ -59,21 +59,29 @@ public abstract class AbstractSimpleTranslogTests { translog.add(new Translog.Create("test", "1", new byte[]{1})); snapshot = translog.snapshot(); assertThat(snapshot, translogSize(1)); + assertThat(snapshot.totalOperations(), equalTo(1)); + assertThat(snapshot.snapshotOperations(), equalTo(1)); snapshot.release(); translog.add(new Translog.Index("test", "2", new byte[]{2})); snapshot = translog.snapshot(); assertThat(snapshot, translogSize(2)); + assertThat(snapshot.totalOperations(), equalTo(2)); + assertThat(snapshot.snapshotOperations(), equalTo(2)); snapshot.release(); translog.add(new Translog.Delete(newUid("3"))); snapshot = translog.snapshot(); assertThat(snapshot, translogSize(3)); + assertThat(snapshot.totalOperations(), equalTo(3)); + assertThat(snapshot.snapshotOperations(), equalTo(3)); snapshot.release(); translog.add(new Translog.DeleteByQuery(new byte[]{4}, null)); snapshot = translog.snapshot(); assertThat(snapshot, translogSize(4)); + assertThat(snapshot.totalOperations(), equalTo(4)); + assertThat(snapshot.snapshotOperations(), equalTo(4)); snapshot.release(); snapshot = translog.snapshot(); @@ -104,6 +112,8 @@ public abstract class AbstractSimpleTranslogTests { snapshot = translog.snapshot(); assertThat(snapshot, translogSize(0)); + assertThat(snapshot.totalOperations(), equalTo(0)); + assertThat(snapshot.snapshotOperations(), equalTo(0)); snapshot.release(); } @@ -115,6 +125,8 @@ public abstract class AbstractSimpleTranslogTests { translog.add(new Translog.Create("test", "1", new byte[]{1})); snapshot = translog.snapshot(); assertThat(snapshot, translogSize(1)); + assertThat(snapshot.totalOperations(), equalTo(1)); + assertThat(snapshot.snapshotOperations(), equalTo(1)); snapshot.release(); snapshot = translog.snapshot(); @@ -127,10 +139,14 @@ public abstract class AbstractSimpleTranslogTests { // we use the translogSize to also navigate to the last position on this snapshot // so snapshot(Snapshot) will work properly assertThat(snapshot1, translogSize(1)); + assertThat(snapshot1.totalOperations(), equalTo(1)); + assertThat(snapshot1.snapshotOperations(), equalTo(1)); translog.add(new Translog.Index("test", "2", new byte[]{2})); snapshot = translog.snapshot(snapshot1); assertThat(snapshot, translogSize(1)); + assertThat(snapshot.totalOperations(), equalTo(2)); + assertThat(snapshot.snapshotOperations(), equalTo(1)); snapshot.release(); snapshot = translog.snapshot(snapshot1); @@ -138,6 +154,8 @@ public abstract class AbstractSimpleTranslogTests { Translog.Index index = (Translog.Index) snapshot.next(); assertThat(index.source(), equalTo(new byte[]{2})); assertThat(snapshot.hasNext(), equalTo(false)); + assertThat(snapshot.totalOperations(), equalTo(2)); + assertThat(snapshot.snapshotOperations(), equalTo(1)); snapshot.release(); snapshot1.release(); } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java index 031d17c8d61..f2056c4ab3c 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/AbstractSimpleIndexGatewayTests.java @@ -66,11 +66,10 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests // Translog tests logger.info("Creating index [{}]", "test"); - client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); + client("server1").admin().indices().prepareCreate("test").execute().actionGet(); // create a mapping - PutMappingResponse putMappingResponse = client("server1").admin().indices().putMapping(putMappingRequest("test").type("type1") - .source(mappingSource())).actionGet(); + PutMappingResponse putMappingResponse = client("server1").admin().indices().preparePutMapping("test").setType("type1").setSource(mappingSource()).execute().actionGet(); assertThat(putMappingResponse.acknowledged(), equalTo(true)); // verify that mapping is there