diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 05cf4063205..f756c629b98 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -25,6 +25,7 @@ import org.apache.lucene.util.SparseFixedBitSet; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ResourceAlreadyExistsException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.RoutingMissingException; @@ -289,13 +290,11 @@ public class TransportBulkAction extends HandledTransportAction implement } - public void process(@Nullable MappingMetaData mappingMd, String concreteIndex) { + public void process(Version indexCreatedVersion, @Nullable MappingMetaData mappingMd, String concreteIndex) { if (mappingMd != null) { // might as well check for routing here if (mappingMd.routing().required() && routing == null) { @@ -508,7 +509,13 @@ public class IndexRequest extends ReplicatedWriteRequest implement if (id == null) { assert autoGeneratedTimestamp == -1 : "timestamp has already been generated!"; autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia - id(UUIDs.base64UUID()); + String uid; + if (indexCreatedVersion.onOrAfter(Version.V_6_0_0_beta1)) { + uid = UUIDs.base64UUID(); + } else { + uid = UUIDs.legacyBase64UUID(); + } + id(uid); } } diff --git a/core/src/main/java/org/elasticsearch/common/LegacyTimeBasedUUIDGenerator.java b/core/src/main/java/org/elasticsearch/common/LegacyTimeBasedUUIDGenerator.java new file mode 100644 index 00000000000..2bf19f1dcbb --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/LegacyTimeBasedUUIDGenerator.java @@ -0,0 +1,86 @@ +/* + * 
Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common; + +import java.util.Base64; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * These are essentially flake ids (http://boundary.com/blog/2012/01/12/flake-a-decentralized-k-ordered-unique-id-generator-in-erlang) but + * we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number. + */ + +class LegacyTimeBasedUUIDGenerator implements UUIDGenerator { + + // We only use bottom 3 bytes for the sequence number. Paranoia: init with random int so that if JVM/OS/machine goes down, clock slips + // backwards, and JVM comes back up, we are less likely to be on the same sequenceNumber at the same time: + private final AtomicInteger sequenceNumber = new AtomicInteger(SecureRandomHolder.INSTANCE.nextInt()); + + // Used to ensure clock moves forward: + private long lastTimestamp; + + private static final byte[] SECURE_MUNGED_ADDRESS = MacAddressProvider.getSecureMungedAddress(); + + static { + assert SECURE_MUNGED_ADDRESS.length == 6; + } + + /** Puts the lower numberOfLongBytes from l into the array, starting index pos. 
*/ + private static void putLong(byte[] array, long l, int pos, int numberOfLongBytes) { + for (int i=0; i<numberOfLongBytes; ++i) { + array[pos+numberOfLongBytes-i-1] = (byte) (l >>> (i*8)); + } + } + + @Override + public String getBase64UUID() { + final int sequenceId = sequenceNumber.incrementAndGet() & 0xffffff; + long timestamp = System.currentTimeMillis(); + + synchronized (this) { + // Don't let timestamp go backwards, at least "on our watch" (while this JVM is running). We are still vulnerable if we are + // shut down, clock goes backwards, and we restart... for this we randomize the sequenceNumber on init to decrease chance of + // collision: + timestamp = Math.max(lastTimestamp, timestamp); + + if (sequenceId == 0) { + // Always force the clock to increment whenever sequence number is 0, in case we have a long time-slip backwards: + timestamp++; + } + + lastTimestamp = timestamp; + } + + final byte[] uuidBytes = new byte[15]; + + // Only use lower 6 bytes of the timestamp (this will suffice beyond the year 10000): + putLong(uuidBytes, timestamp, 0, 6); + + // MAC address adds 6 bytes: + System.arraycopy(SECURE_MUNGED_ADDRESS, 0, uuidBytes, 6, SECURE_MUNGED_ADDRESS.length); + + // Sequence number adds 3 bytes: + putLong(uuidBytes, sequenceId, 12, 3); + + assert 9 + SECURE_MUNGED_ADDRESS.length == uuidBytes.length; + + return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes); + } +} diff --git a/core/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java b/core/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java index 8d507ae7f22..550559eac9f 100644 --- a/core/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java +++ b/core/src/main/java/org/elasticsearch/common/TimeBasedUUIDGenerator.java @@ -23,7 +23,8 @@ import java.util.Base64; import java.util.concurrent.atomic.AtomicInteger; /** These are essentially flake ids (http://boundary.com/blog/2012/01/12/flake-a-decentralized-k-ordered-unique-id-generator-in-erlang) but - * we use 6 (not 8) bytes for timestamp, and use 3 (not 
2) bytes for sequence number. */ + * we use 6 (not 8) bytes for timestamp, and use 3 (not 2) bytes for sequence number. We also reorder bytes in a way that does not make ids + * sort in order anymore, but is more friendly to the way that the Lucene terms dictionary is structured. */ class TimeBasedUUIDGenerator implements UUIDGenerator { @@ -40,17 +41,20 @@ class TimeBasedUUIDGenerator implements UUIDGenerator { assert SECURE_MUNGED_ADDRESS.length == 6; } - /** Puts the lower numberOfLongBytes from l into the array, starting index pos. */ - private static void putLong(byte[] array, long l, int pos, int numberOfLongBytes) { - for (int i=0; i<numberOfLongBytes; ++i) { - array[pos+numberOfLongBytes-i-1] = (byte) (l >>> (i*8)); - } + // protected for testing + protected long currentTimeMillis() { + return System.currentTimeMillis(); + } + + // protected for testing + protected byte[] macAddress() { + return SECURE_MUNGED_ADDRESS; } @Override public String getBase64UUID() { final int sequenceId = sequenceNumber.incrementAndGet() & 0xffffff; - long timestamp = System.currentTimeMillis(); + long timestamp = currentTimeMillis(); synchronized (this) { // Don't let timestamp go backwards, at least "on our watch" (while this JVM is running). We are still vulnerable if we are @@ -67,17 +71,45 @@ class TimeBasedUUIDGenerator implements UUIDGenerator { } final byte[] uuidBytes = new byte[15]; + int i = 0; - // Only use lower 6 bytes of the timestamp (this will suffice beyond the year 10000): - putLong(uuidBytes, timestamp, 0, 6); + // We have auto-generated ids, which are usually used for append-only workloads. 
+ // So we try to optimize the order of bytes for indexing speed (by having quite + // unique bytes close to the beginning of the ids so that sorting is fast) and + // compression (by making sure we share common prefixes between enough ids), + // but not necessarily for lookup speed (by having the leading bytes identify + // segments whenever possible) - // MAC address adds 6 bytes: - System.arraycopy(SECURE_MUNGED_ADDRESS, 0, uuidBytes, 6, SECURE_MUNGED_ADDRESS.length); + // Blocks in the block tree have between 25 and 48 terms. So all prefixes that + // are shared by ~30 terms should be well compressed. I first tried putting the + // two lower bytes of the sequence id in the beginning of the id, but compression + // is only triggered when you have at least 30*2^16 ~= 2M documents in a segment, + // which is already quite large. So instead, we are putting the 1st and 3rd byte + // of the sequence number so that compression starts to be triggered with smaller + // segment sizes and still gives pretty good indexing speed. We use the sequenceId + // rather than the timestamp because the distribution of the timestamp depends too + // much on the indexing rate, so it is less reliable. - // Sequence number adds 3 bytes: - putLong(uuidBytes, sequenceId, 12, 3); + uuidBytes[i++] = (byte) sequenceId; + // changes every 65k docs, so potentially every second if you have a steady indexing rate + uuidBytes[i++] = (byte) (sequenceId >>> 16); - assert 9 + SECURE_MUNGED_ADDRESS.length == uuidBytes.length; + // Now we start focusing on compression and put bytes that should not change too often. 
+ uuidBytes[i++] = (byte) (timestamp >>> 16); // changes every ~65 secs + uuidBytes[i++] = (byte) (timestamp >>> 24); // changes every ~4.5h + uuidBytes[i++] = (byte) (timestamp >>> 32); // changes every ~50 days + uuidBytes[i++] = (byte) (timestamp >>> 40); // changes every 35 years + byte[] macAddress = macAddress(); + assert macAddress.length == 6; + System.arraycopy(macAddress, 0, uuidBytes, i, macAddress.length); + i += macAddress.length; + + // Finally we put the remaining bytes, which will likely not be compressed at all. + uuidBytes[i++] = (byte) (timestamp >>> 8); + uuidBytes[i++] = (byte) (sequenceId >>> 8); + uuidBytes[i++] = (byte) timestamp; + + assert i == uuidBytes.length; return Base64.getUrlEncoder().withoutPadding().encodeToString(uuidBytes); } diff --git a/core/src/main/java/org/elasticsearch/common/UUIDs.java b/core/src/main/java/org/elasticsearch/common/UUIDs.java index ca3c7cb222e..63fcaedde0f 100644 --- a/core/src/main/java/org/elasticsearch/common/UUIDs.java +++ b/core/src/main/java/org/elasticsearch/common/UUIDs.java @@ -24,6 +24,7 @@ import java.util.Random; public class UUIDs { private static final RandomBasedUUIDGenerator RANDOM_UUID_GENERATOR = new RandomBasedUUIDGenerator(); + private static final UUIDGenerator LEGACY_TIME_UUID_GENERATOR = new LegacyTimeBasedUUIDGenerator(); private static final UUIDGenerator TIME_UUID_GENERATOR = new TimeBasedUUIDGenerator(); /** Generates a time-based UUID (similar to Flake IDs), which is preferred when generating an ID to be indexed into a Lucene index as @@ -32,6 +33,11 @@ public class UUIDs { return TIME_UUID_GENERATOR.getBase64UUID(); } + /** Legacy implementation of {@link #base64UUID()}, for pre 6.0 indices. 
*/ + public static String legacyBase64UUID() { + return LEGACY_TIME_UUID_GENERATOR.getBase64UUID(); + } + /** Returns a Base64 encoded version of a Version 4.0 compatible UUID as defined here: http://www.ietf.org/rfc/rfc4122.txt, using the * provided {@code Random} instance */ public static String randomBase64UUID(Random random) { diff --git a/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java b/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java index 3dce0ecdfd4..ffc93f024d1 100644 --- a/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java +++ b/core/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java @@ -215,7 +215,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL actionListener.onResponse(new ShardsSyncedFlushResult(shardId, totalShards, "[" + inflight + "] ongoing operations on primary")); } else { // 3. now send the sync request to all the shards - String syncId = UUIDs.base64UUID(); + String syncId = UUIDs.randomBase64UUID(); sendSyncRequests(syncId, activeShards, state, commitIds, shardId, totalShards, actionListener); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index fab7806b968..973c09635a3 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -120,7 +120,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget this.indexShard = indexShard; this.sourceNode = sourceNode; this.shardId = indexShard.shardId(); - this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.base64UUID() + "."; + this.tempFilePrefix = RECOVERY_PREFIX + UUIDs.randomBase64UUID() + "."; this.store = indexShard.store(); this.ensureClusterStateVersionCallback = ensureClusterStateVersionCallback; // make sure the 
store is not released until we are done. diff --git a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java index 73a44ff145c..392a989de48 100644 --- a/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java +++ b/core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java @@ -122,10 +122,10 @@ public class IndexRequestTests extends ESTestCase { public void testAutoGenIdTimestampIsSet() { IndexRequest request = new IndexRequest("index", "type"); - request.process(null, "index"); + request.process(Version.CURRENT, null, "index"); assertTrue("expected > 0 but got: " + request.getAutoGeneratedTimestamp(), request.getAutoGeneratedTimestamp() > 0); request = new IndexRequest("index", "type", "1"); - request.process(null, "index"); + request.process(Version.CURRENT, null, "index"); assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, request.getAutoGeneratedTimestamp()); } diff --git a/core/src/test/java/org/elasticsearch/common/UUIDTests.java b/core/src/test/java/org/elasticsearch/common/UUIDTests.java index 62ad47868a6..ed253ce3c72 100644 --- a/core/src/test/java/org/elasticsearch/common/UUIDTests.java +++ b/core/src/test/java/org/elasticsearch/common/UUIDTests.java @@ -18,9 +18,24 @@ */ package org.elasticsearch.common; +import com.carrotsearch.randomizedtesting.generators.RandomPicks; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field.Store; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.IndexWriterConfig.OpenMode; +import org.apache.lucene.store.Directory; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import 
org.elasticsearch.test.ESTestCase; +import org.hamcrest.Matchers; import java.util.HashSet; +import java.util.Random; import java.util.Set; public class UUIDTests extends ESTestCase { @@ -100,4 +115,61 @@ public class UUIDTests extends ESTestCase { } assertEquals(count*uuids, globalSet.size()); } + + public void testCompression() throws Exception { + Logger logger = Loggers.getLogger(UUIDTests.class); + // Low number so that the test runs quickly, but the results are more interesting with larger numbers + // of indexed documents + assertThat(testCompression(500000, 10000, 3, logger), Matchers.lessThan(12d)); // ~10.8 in practice + assertThat(testCompression(500000, 1000, 3, logger), Matchers.lessThan(13d)); // ~11.5 in practice + assertThat(testCompression(500000, 100, 3, logger), Matchers.lessThan(21d)); // ~19.5 in practice + } + + private static double testCompression(int numDocs, int numDocsPerSecond, int numNodes, Logger logger) throws Exception { + final double intervalBetweenDocs = 1000. 
/ numDocsPerSecond; // milliseconds + final byte[][] macAddresses = new byte[numNodes][]; + Random r = random(); + for (int i = 0; i < macAddresses.length; ++i) { + macAddresses[i] = new byte[6]; + random().nextBytes(macAddresses[i]); + } + UUIDGenerator generator = new TimeBasedUUIDGenerator() { + double currentTimeMillis = System.currentTimeMillis(); + + @Override + protected long currentTimeMillis() { + currentTimeMillis += intervalBetweenDocs * 2 * r.nextDouble(); + return (long) currentTimeMillis; + } + + @Override + protected byte[] macAddress() { + return RandomPicks.randomFrom(r, macAddresses); + } + }; + // Avoid randomization which will slow down things without improving + // the quality of this test + Directory dir = newFSDirectory(createTempDir()); + IndexWriter w = new IndexWriter(dir, new IndexWriterConfig().setOpenMode(OpenMode.CREATE)); + Document doc = new Document(); + StringField id = new StringField("_id", "", Store.NO); + doc.add(id); + long start = System.nanoTime(); + for (int i = 0; i < numDocs; ++i) { + id.setStringValue(generator.getBase64UUID()); + w.addDocument(doc); + } + w.forceMerge(1); + long time = (System.nanoTime() - start) / 1000 / 1000; + w.close(); + long size = 0; + for (String file : dir.listAll()) { + size += dir.fileLength(file); + } + dir.close(); + double bytesPerDoc = (double) size / numDocs; + logger.info(numDocs + " docs indexed at " + numDocsPerSecond + " docs/s required " + new ByteSizeValue(size) + + " bytes of disk space, or " + bytesPerDoc + " bytes per document. 
Took: " + new TimeValue(time) + "."); + return bytesPerDoc; + } } diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 251e744a876..d7e93888642 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -602,7 +602,7 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase private TransportWriteAction.WritePrimaryResult executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request) throws Exception { for (BulkItemRequest itemRequest : request.items()) { if (itemRequest.request() instanceof IndexRequest) { - ((IndexRequest) itemRequest.request()).process(null, index.getName()); + ((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName()); } } final TransportWriteAction.WritePrimaryResult result = diff --git a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index 20c9f3613e5..d16b117fd8d 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -56,7 +56,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get(); - String syncId = UUIDs.base64UUID(); + String syncId = UUIDs.randomBase64UUID(); SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.sendSyncRequests(syncId, 
activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener); listener.latch.await(); @@ -178,7 +178,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { client().prepareIndex("test", "test", "2").setSource("{}", XContentType.JSON).get(); } client().admin().indices().prepareFlush("test").setForce(true).get(); - String syncId = UUIDs.base64UUID(); + String syncId = UUIDs.randomBase64UUID(); final SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener); listener.latch.await(); @@ -208,7 +208,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { Map commitIds = SyncedFlushUtil.sendPreSyncRequests(flushService, activeShards, state, shardId); assertEquals("exactly one commit id", 1, commitIds.size()); commitIds.clear(); // wipe it... - String syncId = UUIDs.base64UUID(); + String syncId = UUIDs.randomBase64UUID(); SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener(); flushService.sendSyncRequests(syncId, activeShards, state, commitIds, shardId, shardRoutingTable.size(), listener); listener.latch.await(); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java index 58c2fa6277c..85a9ee10208 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.io.stream.InputStreamStreamInput; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; -import org.elasticsearch.index.seqno.SequenceNumbers; import 
org.elasticsearch.index.seqno.SequenceNumbersService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; @@ -36,7 +35,6 @@ import java.io.ByteArrayOutputStream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; import static org.elasticsearch.test.VersionUtils.randomVersion; -import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.Matchers.equalTo; public class StartRecoveryRequestTests extends ESTestCase { @@ -45,7 +43,7 @@ public class StartRecoveryRequestTests extends ESTestCase { final Version targetNodeVersion = randomVersion(random()); final StartRecoveryRequest outRequest = new StartRecoveryRequest( new ShardId("test", "_na_", 0), - UUIDs.base64UUID(), + UUIDs.randomBase64UUID(), new DiscoveryNode("a", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion), new DiscoveryNode("b", buildNewFakeTransportAddress(), emptyMap(), emptySet(), targetNodeVersion), Store.MetadataSnapshot.EMPTY, diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml index dc097765c57..53b9b741bdc 100755 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/cat.shards/10_basic.yml @@ -106,9 +106,10 @@ cat.shards: index: sync_id_test h: index,state,sync_id +# 20 chars for sync ids with 5.x which uses time-based uuids and 22 with 6.x which uses random uuids - match: $body: | - /^(sync_id_test\s+STARTED\s+[A-Za-z0-9_\-]{20}\n){5}$/ + /^(sync_id_test\s+STARTED\s+[A-Za-z0-9_\-]{20,22}\n){5}$/ - do: indices.delete: