diff --git a/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java b/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java index 4b894805cf9..149eefb0bdf 100644 --- a/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java +++ b/api/src/main/java/io/druid/timeline/partition/NoneShardSpec.java @@ -20,6 +20,7 @@ package io.druid.timeline.partition; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; import io.druid.data.input.InputRow; @@ -55,6 +56,7 @@ public class NoneShardSpec implements ShardSpec } @Override + @JsonIgnore public int getPartitionNum() { return 0; diff --git a/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java b/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java index e138924c4f8..61cff1653b8 100644 --- a/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java +++ b/api/src/test/java/io/druid/timeline/partition/NoneShardSpecTest.java @@ -5,6 +5,8 @@ import io.druid.TestObjectMapper; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; + public class NoneShardSpecTest { @Test @@ -28,4 +30,13 @@ public class NoneShardSpecTest Assert.assertTrue(serde1 == serde2); Assert.assertTrue(one == serde1); } + + @Test + public void testPartitionFieldIgnored() throws IOException + { + final String jsonStr = "{\"type\": \"none\",\"partitionNum\": 2}"; + ObjectMapper mapper = new TestObjectMapper(); + final ShardSpec noneShardSpec = mapper.readValue(jsonStr, ShardSpec.class); + noneShardSpec.equals(NoneShardSpec.instance()); + } } diff --git a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java index 53b5c1b3beb..45269d08478 100644 --- a/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java +++ b/server/src/main/java/io/druid/segment/loading/LocalDataSegmentPusher.java @@ -29,9 +29,12 @@ import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.SegmentUtils; import io.druid.timeline.DataSegment; +import org.apache.commons.io.FileUtils; import java.io.File; import java.io.IOException; +import java.nio.file.FileAlreadyExistsException; +import java.util.UUID; /** */ @@ -70,7 +73,9 @@ public class LocalDataSegmentPusher implements DataSegmentPusher @Override public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException { - File outDir = new File(config.getStorageDirectory(), DataSegmentPusherUtil.getStorageDir(segment)); + final String storageDir = DataSegmentPusherUtil.getStorageDir(segment); + final File baseStorageDir = config.getStorageDirectory(); + final File outDir = new File(baseStorageDir, storageDir); log.info("Copying segment[%s] to local filesystem at location[%s]", segment.getIdentifier(), outDir.toString()); @@ -88,19 +93,49 @@ public class LocalDataSegmentPusher implements DataSegmentPusher ); } - if (!outDir.mkdirs() && !outDir.isDirectory()) { - throw new IOException(String.format("Cannot create directory[%s]", outDir)); + final File tmpOutDir = new File(baseStorageDir, intermediateDirFor(storageDir)); + log.info("Creating intermediate directory[%s] for segment[%s]", tmpOutDir.toString(), segment.getIdentifier()); + final long size = compressSegment(dataSegmentFile, tmpOutDir); + + final DataSegment dataSegment = createDescriptorFile( + segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip"))) + .withSize(size) + .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)), + tmpOutDir + ); + + // moving the temporary directory to the final destination, once success the potentially concurrent push operations + // will be failed and will read the descriptor.json created by current push operation directly + createDirectoryIfNotExists(outDir.getParentFile()); + try { + java.nio.file.Files.move(tmpOutDir.toPath(), outDir.toPath()); } + catch (FileAlreadyExistsException e) { + log.warn("Push destination directory[%s] exists, ignore this message if replication is configured.", outDir); + FileUtils.deleteDirectory(tmpOutDir); + return jsonMapper.readValue(new File(outDir, "descriptor.json"), DataSegment.class); + } + return dataSegment; + } + + private void createDirectoryIfNotExists(File directory) throws IOException + { + if (!directory.mkdirs() && !directory.isDirectory()) { + throw new IOException(String.format("Cannot create directory[%s]", directory.toString())); + } + } + + private String intermediateDirFor(String storageDir) + { + return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString(); + } + + private long compressSegment(File dataSegmentFile, File outDir) throws IOException + { + createDirectoryIfNotExists(outDir); File outFile = new File(outDir, "index.zip"); log.info("Compressing files from[%s] to [%s]", dataSegmentFile, outFile); - long size = CompressionUtils.zip(dataSegmentFile, outFile); - - return createDescriptorFile( - segment.withLoadSpec(makeLoadSpec(outFile)) - .withSize(size) - .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)), - outDir - ); + return CompressionUtils.zip(dataSegmentFile, outFile); } private DataSegment createDescriptorFile(DataSegment segment, File outDir) throws IOException diff --git a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java index 9fa62e9baa7..b6fe8f4573c 100644 --- a/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java +++ b/server/src/test/java/io/druid/segment/loading/LocalDataSegmentPusherTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.io.Files; import com.google.common.primitives.Ints; +import io.druid.jackson.DefaultObjectMapper; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.joda.time.Interval; @@ -48,15 +49,15 @@ public class LocalDataSegmentPusherTest LocalDataSegmentPusherConfig config; File dataSegmentFiles; DataSegment dataSegment = new DataSegment( - "ds", - new Interval(0, 1), - "v1", - null, - null, - null, - NoneShardSpec.instance(), - null, - -1 + "ds", + new Interval(0, 1), + "v1", + null, + null, + null, + NoneShardSpec.instance(), + null, + -1 ); @Before @@ -64,7 +65,7 @@ public class LocalDataSegmentPusherTest { config = new LocalDataSegmentPusherConfig(); config.storageDirectory = temporaryFolder.newFolder(); - localDataSegmentPusher = new LocalDataSegmentPusher(config, new ObjectMapper()); + localDataSegmentPusher = new LocalDataSegmentPusher(config, new DefaultObjectMapper()); dataSegmentFiles = temporaryFolder.newFolder(); Files.asByteSink(new File(dataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x9)); } @@ -103,6 +104,17 @@ public class LocalDataSegmentPusherTest } } + @Test + public void testFirstPushWinsForConcurrentPushes() throws IOException + { + File replicatedDataSegmentFiles = temporaryFolder.newFolder(); + Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8)); + DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment); + DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment); + + Assert.assertEquals(returnSegment1, returnSegment2); + } + @Test public void testPushCannotCreateDirectory() throws IOException {