From 1701fbcad3430a3e6e19134e870a6059864c8a80 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 13 Feb 2019 11:22:48 -0800 Subject: [PATCH] Improve error message for revoked locks (#7035) * Improve error message for revoked locks * fix test * fix test * fix test * fix toString --- .../SegmentTransactionalInsertAction.java | 7 +- .../SegmentTransactionalInsertActionTest.java | 8 +- .../indexing/common/task/IndexTaskTest.java | 5 +- ...TestIndexerMetadataStorageCoordinator.java | 2 +- .../overlord/SegmentPublishResult.java | 31 ++++++-- .../IndexerSQLMetadataStorageCoordinator.java | 4 +- .../appenderator/BaseAppenderatorDriver.java | 27 +++++-- .../overlord/SegmentPublishResultTest.java | 79 +++++++++++++++++++ ...exerSQLMetadataStorageCoordinatorTest.java | 18 ++--- .../BatchAppenderatorDriverTest.java | 2 +- .../StreamAppenderatorDriverFailTest.java | 2 +- .../StreamAppenderatorDriverTest.java | 7 +- 12 files changed, 154 insertions(+), 38 deletions(-) create mode 100644 server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 8a3c713fdb7..0af850e0fcd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -118,7 +118,12 @@ public class SegmentTransactionalInsertAction implements TaskAction SegmentPublishResult.fail( + "Invalid task locks. Maybe they are revoked by a higher priority task." + + " Please check the overlord log for details." + ) + ) .build() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java index e152995cc91..463916f9567 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertActionTest.java @@ -101,7 +101,7 @@ public class SegmentTransactionalInsertActionTest task, actionTestKit.getTaskActionToolbox() ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT1), true), result1); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT1)), result1); SegmentPublishResult result2 = new SegmentTransactionalInsertAction( ImmutableSet.of(SEGMENT2), @@ -111,7 +111,7 @@ public class SegmentTransactionalInsertActionTest task, actionTestKit.getTaskActionToolbox() ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT2), true), result2); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT2)), result2); Assert.assertEquals( ImmutableSet.of(SEGMENT1, SEGMENT2), @@ -143,7 +143,7 @@ public class SegmentTransactionalInsertActionTest actionTestKit.getTaskActionToolbox() ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result); } @Test @@ -157,6 +157,6 @@ public class SegmentTransactionalInsertActionTest thrown.expect(IllegalStateException.class); thrown.expectMessage(CoreMatchers.containsString("are not covered by locks")); SegmentPublishResult result = action.perform(task, actionTestKit.getTaskActionToolbox()); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(SEGMENT3), true), result); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(SEGMENT3)), result); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 746b0c992ae..ef7dbce7b8d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1574,10 +1574,7 @@ public class IndexTaskTest } if (taskAction instanceof SegmentTransactionalInsertAction) { - return (RetType) new SegmentPublishResult( - ((SegmentTransactionalInsertAction) taskAction).getSegments(), - true - ); + return (RetType) SegmentPublishResult.ok(((SegmentTransactionalInsertAction) taskAction).getSegments()); } if (taskAction instanceof SegmentAllocateAction) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 0424cbf3cde..0eeecd5375b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -116,7 +116,7 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto ) { // Don't actually compare metadata, just do it! - return new SegmentPublishResult(announceHistoricalSegments(segments), true); + return SegmentPublishResult.ok(announceHistoricalSegments(segments)); } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java index dc86b7268e5..a088c932e5b 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/SegmentPublishResult.java @@ -25,6 +25,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; import java.util.Objects; import java.util.Set; @@ -42,20 +43,29 @@ public class SegmentPublishResult { private final Set segments; private final boolean success; + @Nullable + private final String errorMsg; - public static SegmentPublishResult fail() + public static SegmentPublishResult ok(Set segments) { - return new SegmentPublishResult(ImmutableSet.of(), false); + return new SegmentPublishResult(segments, true, null); + } + + public static SegmentPublishResult fail(String errorMsg) + { + return new SegmentPublishResult(ImmutableSet.of(), false, errorMsg); } @JsonCreator - public SegmentPublishResult( + private SegmentPublishResult( @JsonProperty("segments") Set segments, - @JsonProperty("success") boolean success + @JsonProperty("success") boolean success, + @JsonProperty("errorMsg") @Nullable String errorMsg ) { this.segments = Preconditions.checkNotNull(segments, "segments"); this.success = success; + this.errorMsg = errorMsg; if (!success) { Preconditions.checkArgument(segments.isEmpty(), "segments must be empty for unsuccessful publishes"); @@ -74,6 +84,13 @@ public class SegmentPublishResult return success; } + @JsonProperty + @Nullable + public String getErrorMsg() + { + return errorMsg; + } + @Override public boolean equals(Object o) { @@ -85,13 +102,14 @@ public class SegmentPublishResult } SegmentPublishResult that = (SegmentPublishResult) o; return success == that.success && - Objects.equals(segments, that.segments); + Objects.equals(segments, that.segments) && + Objects.equals(errorMsg, that.errorMsg); } @Override public int hashCode() { - return Objects.hash(segments, success); + return Objects.hash(segments, success, errorMsg); } @Override @@ -100,6 +118,7 @@ public class SegmentPublishResult return "SegmentPublishResult{" + "segments=" + segments + ", success=" + success + + ", errorMsg='" + errorMsg + '\'' + '}'; } } diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 9df56738912..b0aaa977e6f 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -324,7 +324,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } } - return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true); + return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted)); } }, 3, @@ -333,7 +333,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor } catch (CallbackFailedException e) { if (definitelyNotUpdated.get()) { - return SegmentPublishResult.fail(); + return SegmentPublishResult.fail(e.getMessage()); } else { // Must throw exception if we are not sure if we updated or not. throw e; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index cb6ba9085a7..76100e100e5 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; +import org.apache.druid.indexing.overlord.SegmentPublishResult; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.logger.Logger; @@ -553,16 +554,26 @@ public abstract class BaseAppenderatorDriver implements Closeable try { final Object metadata = segmentsAndMetadata.getCommitMetadata(); - final boolean published = publisher.publishSegments( + final SegmentPublishResult publishResult = publisher.publishSegments( ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() - ).isSuccess(); + ); - if (published) { + if (publishResult.isSuccess()) { log.info("Published segments."); } else { - log.info("Transaction failure while publishing segments, removing them from deep storage " - + "and checking if someone else beat us to publishing."); + if (publishResult.getErrorMsg() == null) { + log.warn( + "Transaction failure while publishing segments. Please check the overlord log." + + " Removing them from deep storage and checking if someone else beat us to publishing." + ); + } else { + log.warn( + "Transaction failure while publishing segments because of [%s]. Please check the overlord log." + + " Removing them from deep storage and checking if someone else beat us to publishing.", + publishResult.getErrorMsg() + ); + } segmentsAndMetadata.getSegments().forEach(dataSegmentKiller::killQuietly); @@ -576,7 +587,11 @@ public abstract class BaseAppenderatorDriver implements Closeable .equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) { log.info("Our segments really do exist, awaiting handoff."); } else { - throw new ISE("Failed to publish segments."); + if (publishResult.getErrorMsg() != null) { + throw new ISE("Failed to publish segments because of [%s].", publishResult.getErrorMsg()); + } else { + throw new ISE("Failed to publish segments."); + } } } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java new file mode 100644 index 00000000000..1772a9d9c9f --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/SegmentPublishResultTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord; + +import com.fasterxml.jackson.databind.InjectableValues.Std; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.DataSegment.PruneLoadSpecHolder; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class SegmentPublishResultTest +{ + private final ObjectMapper objectMapper = new DefaultObjectMapper() + .setInjectableValues(new Std().addValue(PruneLoadSpecHolder.class, PruneLoadSpecHolder.DEFAULT)); + + @Test + public void testSerdeOkResult() throws IOException + { + final SegmentPublishResult original = SegmentPublishResult.ok( + ImmutableSet.of( + segment(Intervals.of("2018/2019")), + segment(Intervals.of("2019/2020")) + ) + ); + + final String json = objectMapper.writeValueAsString(original); + final SegmentPublishResult fromJson = objectMapper.readValue(json, SegmentPublishResult.class); + Assert.assertEquals(original, fromJson); + } + + @Test + public void testSerdeFailResult() throws IOException + { + final SegmentPublishResult original = SegmentPublishResult.fail("test"); + + final String json = objectMapper.writeValueAsString(original); + final SegmentPublishResult fromJson = objectMapper.readValue(json, SegmentPublishResult.class); + Assert.assertEquals(original, fromJson); + } + + private static DataSegment segment(Interval interval) + { + return new DataSegment( + "ds", + interval, + "version", + null, + null, + null, + null, + 9, + 10L + ); + } +} diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java index 09b6f56f477..879ddfb13b4 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java @@ -304,7 +304,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8), @@ -322,7 +322,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8), @@ -378,7 +378,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment).getBytes(StandardCharsets.UTF_8), @@ -399,7 +399,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment2), true), result2); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment2)), result2); Assert.assertArrayEquals( mapper.writeValueAsString(defaultSegment2).getBytes(StandardCharsets.UTF_8), @@ -429,7 +429,7 @@ public class IndexerSQLMetadataStorageCoordinatorTest new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result1); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result1); // Should only be tried once. Assert.assertEquals(1, metadataUpdateCounter.get()); @@ -443,14 +443,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment2), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2); // Should only be tried once per call. Assert.assertEquals(2, metadataUpdateCounter.get()); @@ -464,14 +464,14 @@ public class IndexerSQLMetadataStorageCoordinatorTest new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(defaultSegment), true), result1); + Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( ImmutableSet.of(defaultSegment2), new ObjectMetadata(ImmutableMap.of("foo", "qux")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); - Assert.assertEquals(new SegmentPublishResult(ImmutableSet.of(), false), result2); + Assert.assertEquals(SegmentPublishResult.fail("java.lang.RuntimeException: Aborting transaction!"), result2); // Should only be tried once per call. Assert.assertEquals(2, metadataUpdateCounter.get()); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java index 66e136aaf7d..6536cb6d4a3 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriverTest.java @@ -195,6 +195,6 @@ public class BatchAppenderatorDriverTest extends EasyMockSupport static TransactionalSegmentPublisher makeOkPublisher() { - return (segments, commitMetadata) -> new SegmentPublishResult(ImmutableSet.of(), true); + return (segments, commitMetadata) -> SegmentPublishResult.ok(ImmutableSet.of()); } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 79b575434bc..9d922537159 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -239,7 +239,7 @@ public class StreamAppenderatorDriverFailTest extends EasyMockSupport { expectedException.expect(ExecutionException.class); expectedException.expectCause(CoreMatchers.instanceOf(ISE.class)); - expectedException.expectMessage("Failed to publish segments."); + expectedException.expectMessage("Failed to publish segments because of [test]."); testFailDuringPublishInternal(false); } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java index e1692177fa9..3a491e95ad8 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java @@ -361,16 +361,17 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport static TransactionalSegmentPublisher makeOkPublisher() { - return (segments, commitMetadata) -> new SegmentPublishResult(Collections.emptySet(), true); + return (segments, commitMetadata) -> SegmentPublishResult.ok(Collections.emptySet()); } static TransactionalSegmentPublisher makeFailingPublisher(boolean failWithException) { return (segments, commitMetadata) -> { + final RuntimeException exception = new RuntimeException("test"); if (failWithException) { - throw new RuntimeException("test"); + throw exception; } - return SegmentPublishResult.fail(); + return SegmentPublishResult.fail(exception.getMessage()); }; }