Do not retry INSERT task into metadata if max_allowed_packet limit is violated (#14271)

Changes
- Add a `DruidException` which contains a user-facing error message, HTTP response code
- Make `EntryExistsException` extend `DruidException`
- If metadata store max_allowed_packet limit is violated while inserting a new task, throw
`DruidException` with response code 400 (bad request) to prevent retries
- Add `SQLMetadataConnector.isRootCausePacketTooBigException` with impl for MySQL
This commit is contained in:
Kashif Faraz 2023-06-10 12:15:44 +05:30 committed by GitHub
parent 31c386ee1b
commit 6e158704cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 458 additions and 242 deletions

View File

@ -3465,7 +3465,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
public void testSuspendedNoRunningTasks() throws Exception
public void testSuspendedNoRunningTasks()
{
supervisor = getTestableSupervisor(1, 1, true, "PT1H", null, null, true);

View File

@ -46,7 +46,12 @@ public class MySQLConnector extends SQLMetadataConnector
private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
private static final String QUOTE_STRING = "`";
private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin";
private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME = "com.mysql.jdbc.exceptions.MySQLTransientException";
private static final String MYSQL_TRANSIENT_EXCEPTION_CLASS_NAME
= "com.mysql.jdbc.exceptions.MySQLTransientException";
private static final String MARIA_DB_PACKET_EXCEPTION_CLASS_NAME
= "org.mariadb.jdbc.internal.util.exceptions.MaxAllowedPacketException";
private static final String MYSQL_PACKET_EXCEPTION_CLASS_NAME
= "com.mysql.jdbc.PacketTooBigException";
@Nullable
private final Class<?> myTransientExceptionClass;
@ -218,6 +223,19 @@ public class MySQLConnector extends SQLMetadataConnector
return false;
}
@Override
protected boolean isRootCausePacketTooBigException(Throwable t)
{
if (t == null) {
return false;
}
final String className = t.getClass().getName();
return MARIA_DB_PACKET_EXCEPTION_CLASS_NAME.equals(className)
|| MYSQL_PACKET_EXCEPTION_CLASS_NAME.equals(className)
|| isRootCausePacketTooBigException(t.getCause());
}
@Override
public Void insertOrUpdate(
final String tableName,

View File

@ -29,6 +29,7 @@ import org.junit.Test;
import java.sql.SQLException;
import java.sql.SQLTransientConnectionException;
import java.sql.SQLTransientException;
public class MySQLConnectorTest
{
@ -59,10 +60,10 @@ public class MySQLConnectorTest
Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransientException()));
Assert.assertTrue(connector.connectorIsTransientException(new MySQLTransactionRollbackException()));
Assert.assertTrue(
connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317))
connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317))
);
Assert.assertFalse(
connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337))
connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337))
);
// this method does not specially handle normal transient exceptions either, since it is not vendor specific
Assert.assertFalse(
@ -82,16 +83,43 @@ public class MySQLConnectorTest
// no vendor specific for MariaDb, so should always be false
Assert.assertFalse(connector.connectorIsTransientException(new MySQLTransientException()));
Assert.assertFalse(
connector.connectorIsTransientException(new SQLException("some transient failure", "wtf", 1317))
connector.connectorIsTransientException(new SQLException("some transient failure", "s0", 1317))
);
Assert.assertFalse(
connector.connectorIsTransientException(new SQLException("totally realistic test data", "wtf", 1337))
connector.connectorIsTransientException(new SQLException("totally realistic test data", "s0", 1337))
);
Assert.assertFalse(
connector.connectorIsTransientException(new SQLTransientConnectionException("transient"))
);
}
@Test
public void testIsRootCausePacketTooBigException()
{
MySQLConnector connector = new MySQLConnector(
CONNECTOR_CONFIG_SUPPLIER,
TABLES_CONFIG_SUPPLIER,
new MySQLConnectorSslConfig(),
MYSQL_DRIVER_CONFIG
);
// The test method should return true only for
// mariadb.MaxAllowedPacketException or mysql.PacketTooBigException.
// Verifying this requires creating a mock Class object, but Class is final
// and has only a private constructor. It would be overkill to try to mock it.
// Verify some of the false cases
Assert.assertFalse(
connector.isRootCausePacketTooBigException(new SQLException())
);
Assert.assertFalse(
connector.isRootCausePacketTooBigException(new SQLTransientException())
);
Assert.assertFalse(
connector.isRootCausePacketTooBigException(new MySQLTransientException())
);
}
@Test
public void testLimitClause()
{

View File

@ -242,7 +242,7 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
SubTaskSpec<SubTaskType> spec
)
{
LOG.info("Submit a new task for spec[%s]", spec.getId());
LOG.info("Submitting a new task for spec[%s]", spec.getId());
final ListenableFuture<SubTaskCompleteEvent<SubTaskType>> future = taskMonitor.submit(spec);
Futures.addCallback(
future,

View File

@ -90,7 +90,7 @@ public class HeapMemoryTaskStorage implements TaskStorage
TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource());
TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff);
if (alreadyExisted != null) {
throw new EntryExistsException(task.getId());
throw new EntryExistsException("Task", task.getId());
}
log.info("Inserted task %s with status: %s", task.getId(), status);

View File

@ -20,13 +20,13 @@
package org.apache.druid.indexing.overlord;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
@ -139,7 +139,7 @@ public class MetadataTaskStorage implements TaskStorage
status.getId()
);
log.info("Inserting task %s with status: %s", task.getId(), status);
log.info("Inserting task [%s] with status [%s].", task.getId(), status);
try {
handler.insert(
@ -153,12 +153,11 @@ public class MetadataTaskStorage implements TaskStorage
task.getGroupId()
);
}
catch (DruidException e) {
throw e;
}
catch (Exception e) {
if (e instanceof EntryExistsException) {
throw (EntryExistsException) e;
} else {
throw new RuntimeException(e);
}
throw new RuntimeException(e);
}
}
@ -167,15 +166,12 @@ public class MetadataTaskStorage implements TaskStorage
{
Preconditions.checkNotNull(status, "status");
log.info("Updating task %s to status: %s", status.getId(), status);
final String taskId = status.getId();
log.info("Updating status of task [%s] to [%s].", taskId, status);
final boolean set = handler.setStatus(
status.getId(),
status.isRunnable(),
status
);
final boolean set = handler.setStatus(taskId, status.isRunnable(), status);
if (!set) {
throw new ISE("Active task not found: %s", status.getId());
throw new ISE("No active task for id [%s]", taskId);
}
}
@ -279,10 +275,8 @@ public class MetadataTaskStorage implements TaskStorage
Preconditions.checkNotNull(taskLock, "taskLock");
log.info(
"Adding lock on interval[%s] version[%s] for task: %s",
taskLock.getInterval(),
taskLock.getVersion(),
taskid
"Adding lock on interval[%s] version[%s] for task [%s].",
taskLock.getInterval(), taskLock.getVersion(), taskid
);
handler.addLock(taskid, taskLock);
@ -296,15 +290,13 @@ public class MetadataTaskStorage implements TaskStorage
Preconditions.checkNotNull(newLock, "newLock");
log.info(
"Replacing an existing lock[%s] with a new lock[%s] for task: %s",
oldLock,
newLock,
taskid
"Replacing an existing lock[%s] with a new lock[%s] for task [%s].",
oldLock, newLock, taskid
);
final Long oldLockId = handler.getLockId(taskid, oldLock);
if (oldLockId == null) {
throw new ISE("Cannot find an existing lock[%s]", oldLock);
throw new ISE("Cannot find lock[%s] for task [%s]", oldLock, taskid);
}
handler.replaceLock(taskid, oldLockId, newLock);
@ -336,14 +328,8 @@ public class MetadataTaskStorage implements TaskStorage
{
return ImmutableList.copyOf(
Iterables.transform(
getLocksWithIds(taskid).entrySet(), new Function<Map.Entry<Long, TaskLock>, TaskLock>()
{
@Override
public TaskLock apply(Map.Entry<Long, TaskLock> e)
{
return e.getValue();
}
}
getLocksWithIds(taskid).entrySet(),
Entry::getValue
)
);
}

View File

@ -482,7 +482,7 @@ public class TaskQueue
IdUtils.validateId("Task ID", task.getId());
if (taskStorage.getTask(task.getId()).isPresent()) {
throw new EntryExistsException(StringUtils.format("Task %s already exists", task.getId()));
throw new EntryExistsException("Task", task.getId());
}
// Set forceTimeChunkLock before adding task spec to taskStorage, so that we can see always consistent task spec.

View File

@ -36,6 +36,7 @@ import org.apache.druid.client.indexing.IndexingWorker;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
@ -64,7 +65,6 @@ import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
@ -222,25 +222,15 @@ public class OverlordResource
return asLeaderWith(
taskMaster.getTaskQueue(),
new Function<TaskQueue, Response>()
{
@Override
public Response apply(TaskQueue taskQueue)
{
try {
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (EntryExistsException e) {
return Response.status(Response.Status.BAD_REQUEST)
.entity(
ImmutableMap.of(
"error",
StringUtils.format("Task[%s] already exists!", task.getId())
)
)
.build();
}
taskQueue -> {
try {
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
catch (DruidException e) {
return Response.status(e.getResponseCode())
.entity(ImmutableMap.of("error", e.getMessage()))
.build();
}
}
);

View File

@ -149,7 +149,7 @@ public class TaskLockboxTest
}
@Test
public void testLock() throws Exception
public void testLock()
{
validator.expectLockCreated(
TaskLockType.EXCLUSIVE,
@ -176,7 +176,7 @@ public class TaskLockboxTest
}
@Test
public void testTrySharedLock() throws Exception
public void testTrySharedLock()
{
final Interval interval = Intervals.of("2017-01/2017-02");
@ -427,7 +427,7 @@ public class TaskLockboxTest
}
@Test
public void testSyncWithUnknownTaskTypesFromModuleNotLoaded() throws Exception
public void testSyncWithUnknownTaskTypesFromModuleNotLoaded()
{
// ensure that if we don't know how to deserialize a task it won't explode the lockbox
// (or anything else that uses taskStorage.getActiveTasks() and doesn't expect null which is most things)
@ -1201,7 +1201,7 @@ public class TaskLockboxTest
}
@Test
public void testGetLockedIntervalsForLowPriorityTask() throws Exception
public void testGetLockedIntervalsForLowPriorityTask()
{
// Acquire lock for a low priority task
final Task lowPriorityTask = NoopTask.create(5);
@ -1221,7 +1221,7 @@ public class TaskLockboxTest
}
@Test
public void testGetLockedIntervalsForEqualPriorityTask() throws Exception
public void testGetLockedIntervalsForEqualPriorityTask()
{
// Acquire lock for a low priority task
final Task task = NoopTask.create(5);
@ -1245,7 +1245,7 @@ public class TaskLockboxTest
}
@Test
public void testExclusiveLockCompatibility() throws Exception
public void testExclusiveLockCompatibility()
{
final TaskLock theLock = validator.expectLockCreated(
TaskLockType.EXCLUSIVE,
@ -1282,7 +1282,7 @@ public class TaskLockboxTest
}
@Test
public void testExclusiveLockCanRevokeAllIncompatible() throws Exception
public void testExclusiveLockCanRevokeAllIncompatible()
{
final TaskLockboxValidator validator = new TaskLockboxValidator(lockbox, taskStorage);
@ -1325,7 +1325,7 @@ public class TaskLockboxTest
}
@Test
public void testSharedLockCompatibility() throws Exception
public void testSharedLockCompatibility()
{
final TaskLock theLock = validator.expectLockCreated(
TaskLockType.SHARED,
@ -1374,7 +1374,7 @@ public class TaskLockboxTest
}
@Test
public void testSharedLockCanRevokeAllIncompatible() throws Exception
public void testSharedLockCanRevokeAllIncompatible()
{
final TaskLock exclusiveLock = validator.expectLockCreated(
TaskLockType.EXCLUSIVE,
@ -1415,7 +1415,7 @@ public class TaskLockboxTest
}
@Test
public void testAppendLockCompatibility() throws Exception
public void testAppendLockCompatibility()
{
final TaskLock theLock = validator.expectLockCreated(
TaskLockType.APPEND,
@ -1473,7 +1473,7 @@ public class TaskLockboxTest
}
@Test
public void testAppendLockCanRevokeAllIncompatible() throws Exception
public void testAppendLockCanRevokeAllIncompatible()
{
final TaskLock sharedLock = validator.expectLockCreated(
TaskLockType.SHARED,
@ -1521,7 +1521,7 @@ public class TaskLockboxTest
@Test
public void testReplaceLockCompatibility() throws Exception
public void testReplaceLockCompatibility()
{
final TaskLock theLock = validator.expectLockCreated(
TaskLockType.REPLACE,
@ -1566,7 +1566,7 @@ public class TaskLockboxTest
}
@Test
public void testReplaceLockCanRevokeAllIncompatible() throws Exception
public void testReplaceLockCanRevokeAllIncompatible()
{
final TaskLock appendLock0 = validator.expectLockCreated(
TaskLockType.APPEND,
@ -1619,7 +1619,7 @@ public class TaskLockboxTest
}
@Test
public void testGetLockedIntervalsForRevokedLocks() throws Exception
public void testGetLockedIntervalsForRevokedLocks()
{
// Acquire lock for a low priority task
final Task lowPriorityTask = NoopTask.create(5);
@ -1663,7 +1663,7 @@ public class TaskLockboxTest
}
@Test
public void testFailedToReacquireTaskLock() throws Exception
public void testFailedToReacquireTaskLock()
{
// Tasks to be failed have a group id with the substring "FailingLockAcquisition"
// Please refer to NullLockPosseTaskLockbox
@ -1704,7 +1704,7 @@ public class TaskLockboxTest
}
@Test
public void testConflictsWithOverlappingSharedLocks() throws Exception
public void testConflictsWithOverlappingSharedLocks()
{
TaskLock conflictingLock = validator.expectLockCreated(
TaskLockType.SHARED,
@ -1744,7 +1744,7 @@ public class TaskLockboxTest
this.taskStorage = taskStorage;
}
public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority) throws Exception
public TaskLock expectLockCreated(TaskLockType type, Interval interval, int priority)
{
final TaskLock lock = tryTaskLock(type, interval, priority);
Assert.assertNotNull(lock);
@ -1757,7 +1757,7 @@ public class TaskLockboxTest
lockbox.revokeLock(lockToTaskIdMap.get(lock), lock);
}
public void expectLockNotGranted(TaskLockType type, Interval interval, int priority) throws Exception
public void expectLockNotGranted(TaskLockType type, Interval interval, int priority)
{
final TaskLock lock = tryTaskLock(type, interval, priority);
Assert.assertNull(lock);
@ -1785,7 +1785,7 @@ public class TaskLockboxTest
}
}
private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority) throws Exception
private TaskLock tryTaskLock(TaskLockType type, Interval interval, int priority)
{
final Task task = NoopTask.create(priority);
tasks.add(task);

View File

@ -139,7 +139,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
taskClientFactory = createMock(SeekableStreamIndexTaskClientFactory.class);
spec = createMock(SeekableStreamSupervisorSpec.class);
indexTaskClient = createMock(SeekableStreamIndexTaskClient.class);
recordSupplier = (RecordSupplier<String, String, ByteEntity>) createMock(RecordSupplier.class);
recordSupplier = createMock(RecordSupplier.class);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@ -175,7 +175,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testRunning() throws Exception
public void testRunning()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
@ -214,7 +214,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testRunningStreamGetSequenceNumberReturnsNull() throws Exception
public void testRunningStreamGetSequenceNumberReturnsNull()
{
EasyMock.reset(recordSupplier);
EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes();
@ -266,7 +266,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testConnectingToStreamFail() throws Exception
public void testConnectingToStreamFail()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
@ -321,7 +321,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testConnectingToStreamFailRecoveryFailRecovery() throws Exception
public void testConnectingToStreamFailRecoveryFailRecovery()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM))
@ -395,7 +395,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testDiscoveringInitialTasksFailRecoveryFail() throws Exception
public void testDiscoveringInitialTasksFailRecoveryFail()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
@ -560,7 +560,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testCreatingTasksFailRecoveryFail() throws Exception
public void testCreatingTasksFailRecoveryFail()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
@ -638,7 +638,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testSuspended() throws Exception
public void testSuspended()
{
EasyMock.expect(spec.isSuspended()).andReturn(true).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
@ -677,7 +677,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testStopping() throws Exception
public void testStopping()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
@ -718,7 +718,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
@Test
public void testStoppingGracefully() throws Exception
public void testStoppingGracefully()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.exception;
/**
* A generic exception thrown by Druid.
*/
public class DruidException extends RuntimeException
{
public static final int HTTP_CODE_SERVER_ERROR = 500;
public static final int HTTP_CODE_BAD_REQUEST = 400;
private final int responseCode;
private final boolean isTransient;
public DruidException(String message, int responseCode, Throwable cause, boolean isTransient)
{
super(message, cause);
this.responseCode = responseCode;
this.isTransient = isTransient;
}
public int getResponseCode()
{
return responseCode;
}
/**
* Returns true if this is a transient exception and might go away if the
* operation is retried.
*/
public boolean isTransient()
{
return isTransient;
}
}

View File

@ -19,19 +19,26 @@
package org.apache.druid.metadata;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.java.util.common.StringUtils;
public class EntryExistsException extends Exception
/**
* A non-transient Druid metadata exception thrown when trying to insert a
* duplicate entry in the metadata.
*/
public class EntryExistsException extends DruidException
{
public EntryExistsException(String entryId, Throwable t)
private static final int HTTP_BAD_REQUEST = 400;
public EntryExistsException(String entryType, String entryId)
{
super(StringUtils.format("Entry already exists: %s", entryId), t);
this(entryType, entryId, null);
}
public EntryExistsException(String entryId)
public EntryExistsException(String entryType, String entryId, Throwable t)
{
this(entryId, null);
super(StringUtils.format("%s [%s] already exists.", entryType, entryId), HTTP_BAD_REQUEST, t, false);
}
}

View File

@ -56,7 +56,7 @@ public class MetadataStorageConnectorConfig
String connectUri,
String user,
String password,
Map<String, Object> properties
Map<String, String> properties
)
{
MetadataStorageConnectorConfig config = new MetadataStorageConnectorConfig();

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.exception;
import org.junit.Assert;
import org.junit.Test;
public class DruidExceptionTest
{
@Test
public void testExceptionMessageAndResponseCode()
{
DruidException exception = Assert.assertThrows(
DruidException.class,
() -> {
throw new DruidException("an error has occurred", 401, null, true);
}
);
Assert.assertEquals("an error has occurred", exception.getMessage());
Assert.assertEquals(401, exception.getResponseCode());
Assert.assertTrue(exception.isTransient());
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import org.apache.druid.common.exception.DruidException;
import org.junit.Assert;
import org.junit.Test;
public class EntryExistsExceptionTest
{
@Test
public void testExceptionMessageAndResponseCode()
{
EntryExistsException exception = Assert.assertThrows(
EntryExistsException.class,
() -> {
throw new EntryExistsException("task", "100");
}
);
Assert.assertEquals("task [100] already exists.", exception.getMessage());
Assert.assertEquals(DruidException.HTTP_CODE_BAD_REQUEST, exception.getResponseCode());
}
}

View File

@ -77,8 +77,8 @@ public class MetadataStorageConnectorConfigTest
"user",
"\"nothing\""
);
Assert.assertTrue(metadataStorageConnectorConfig.equals(metadataStorageConnectorConfig2));
Assert.assertTrue(metadataStorageConnectorConfig.hashCode() == metadataStorageConnectorConfig2.hashCode());
Assert.assertEquals(metadataStorageConnectorConfig, metadataStorageConnectorConfig2);
Assert.assertEquals(metadataStorageConnectorConfig.hashCode(), metadataStorageConnectorConfig2.hashCode());
}
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
@ -193,12 +193,9 @@ public class MetadataStorageConnectorConfigTest
@Test
public void testCreate()
{
Map<String, Object> props = ImmutableMap.of("key", "value");
MetadataStorageConnectorConfig config = MetadataStorageConnectorConfig.create(
"connectURI",
"user",
"pwd",
props);
Map<String, String> props = ImmutableMap.of("key", "value");
MetadataStorageConnectorConfig config =
MetadataStorageConnectorConfig.create("connectURI", "user", "pwd", props);
Assert.assertEquals("connectURI", config.getConnectURI());
Assert.assertEquals("user", config.getUser());
Assert.assertEquals("pwd", config.getPassword());

View File

@ -169,7 +169,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
|| e instanceof SQLTransientException
|| e instanceof SQLRecoverableException
|| e instanceof UnableToObtainConnectionException
|| e instanceof UnableToExecuteStatementException
|| (e instanceof UnableToExecuteStatementException && isTransientException(e.getCause()))
|| connectorIsTransientException(e)
|| (e instanceof SQLException && isTransientException(e.getCause()))
|| (e instanceof DBIException && isTransientException(e.getCause())));
@ -183,6 +183,17 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
return false;
}
/**
* Checks if the root cause of the given exception is a PacketTooBigException.
*
* @return false by default. Specific implementations should override this method
* to correctly classify their packet exceptions.
*/
protected boolean isRootCausePacketTooBigException(Throwable t)
{
return false;
}
public void createTable(final String tableName, final Iterable<String> sql)
{
try {
@ -336,12 +347,7 @@ public abstract class SQLMetadataConnector implements MetadataStorageConnector
{
try {
DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData();
ResultSet columns = databaseMetaData.getColumns(
null,
null,
table,
column
);
ResultSet columns = databaseMetaData.getColumns(null, null, table, column);
return columns.next();
}
catch (SQLException e) {

View File

@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.java.util.common.DateTimes;
@ -165,34 +166,61 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
{
try {
getConnector().retryWithHandle(
(HandleCallback<Void>) handle -> {
final String sql = StringUtils.format(
"INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) "
+ "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)",
getEntryTable()
);
handle.createStatement(sql)
.bind("id", id)
.bind("created_date", timestamp.toString())
.bind("datasource", dataSource)
.bind("payload", jsonMapper.writeValueAsBytes(entry))
.bind("type", type)
.bind("group_id", groupId)
.bind("active", active)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
},
e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent())
handle -> insertEntryWithHandle(handle, id, timestamp, dataSource, entry, active, status, type, groupId),
this::isTransientDruidException
);
}
catch (CallbackFailedException e) {
propagateAsRuntimeException(e.getCause());
}
catch (Exception e) {
if (isStatementException(e) && getEntry(id).isPresent()) {
throw new EntryExistsException(id, e);
} else {
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
propagateAsRuntimeException(e);
}
}
private void propagateAsRuntimeException(Throwable t)
{
Throwables.propagateIfPossible(t);
throw new RuntimeException(t);
}
/**
* Inserts the given entry into the metadata store. This method wraps any
* exception thrown in a {@link DruidException}. When used in a HandleCallback,
* that exception is further wrapped in a {@link CallbackFailedException}.
*/
private Void insertEntryWithHandle(
Handle handle,
String entryId,
DateTime timestamp,
String dataSource,
EntryType entry,
boolean active,
StatusType status,
String type,
String groupId
)
{
try {
final String sql = StringUtils.format(
"INSERT INTO %s (id, created_date, datasource, payload, type, group_id, active, status_payload) "
+ "VALUES (:id, :created_date, :datasource, :payload, :type, :group_id, :active, :status_payload)",
getEntryTable()
);
handle.createStatement(sql)
.bind("id", entryId)
.bind("created_date", timestamp.toString())
.bind("datasource", dataSource)
.bind("payload", jsonMapper.writeValueAsBytes(entry))
.bind("type", type)
.bind("group_id", groupId)
.bind("active", active)
.bind("status_payload", jsonMapper.writeValueAsBytes(status))
.execute();
return null;
}
catch (Throwable t) {
throw wrapInDruidException(entryId, t);
}
}
@ -202,6 +230,17 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
(e instanceof CallbackFailedException && e.getCause() instanceof StatementException);
}
private boolean isTransientDruidException(Throwable t)
{
if (t instanceof CallbackFailedException) {
return isTransientDruidException(t.getCause());
} else if (t instanceof DruidException) {
return ((DruidException) t).isTransient();
} else {
return getConnector().isTransientException(t);
}
}
@Override
public boolean setStatus(final String entryId, final boolean active, final StatusType status)
{
@ -304,10 +343,7 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
final Query<Map<String, Object>> query;
switch (entry.getKey()) {
case ACTIVE:
query = createActiveTaskStreamingQuery(
handle,
dataSource
);
query = createActiveTaskStreamingQuery(handle, dataSource);
tasks.addAll(query.map(taskInfoMapper).list());
break;
case COMPLETE:
@ -390,6 +426,35 @@ public abstract class SQLMetadataStorageActionHandler<EntryType, StatusType, Log
);
}
/**
* Wraps the given error in a user friendly DruidException.
*/
private DruidException wrapInDruidException(String taskId, Throwable t)
{
if (isStatementException(t) && getEntry(taskId).isPresent()) {
return new EntryExistsException("Task", taskId);
} else if (connector.isRootCausePacketTooBigException(t)) {
return new DruidException(
StringUtils.format(
"Payload for task [%s] exceeds the packet limit."
+ " Update the max_allowed_packet on your metadata store"
+ " server or in the connection properties.",
taskId
),
DruidException.HTTP_CODE_BAD_REQUEST,
t,
false
);
} else {
return new DruidException(
StringUtils.format("Encountered metadata exception for task [%s]", taskId),
DruidException.HTTP_CODE_SERVER_ERROR,
t,
connector.isTransientException(t)
);
}
}
/**
* Fetches the columns needed to build TaskStatusPlus for completed tasks
* Please note that this requires completion of data migration to avoid empty values for task type and groupId

View File

@ -19,9 +19,9 @@
package org.apache.druid.metadata;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
@ -30,14 +30,19 @@ import org.junit.Rule;
import org.junit.Test;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientConnectionException;
import java.sql.SQLTransientException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class SQLMetadataConnectorTest
@ -47,7 +52,6 @@ public class SQLMetadataConnectorTest
private TestDerbyConnector connector;
private MetadataStorageTablesConfig tablesConfig;
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
@Before
public void setUp()
@ -78,28 +82,23 @@ public class SQLMetadataConnectorTest
connector.createSupervisorsTable();
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
for (String table : tables) {
Assert.assertTrue(
StringUtils.format("table %s was not created!", table),
connector.tableExists(handle, table)
);
}
String taskTable = tablesConfig.getTasksTable();
for (String column : Arrays.asList("type", "group_id")) {
Assert.assertTrue(
StringUtils.format("Tasks table column %s was not created!", column),
connector.tableContainsColumn(handle, taskTable, column)
);
}
return null;
handle -> {
for (String table : tables) {
Assert.assertTrue(
StringUtils.format("table %s was not created!", table),
connector.tableExists(handle, table)
);
}
String taskTable = tablesConfig.getTasksTable();
for (String column : Arrays.asList("type", "group_id")) {
Assert.assertTrue(
StringUtils.format("Tasks table column %s was not created!", column),
connector.tableContainsColumn(handle, taskTable, column)
);
}
return null;
}
);
@ -147,16 +146,8 @@ public class SQLMetadataConnectorTest
private void dropTable(final String tableName)
{
connector.getDBI().withHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute();
return null;
}
}
handle -> handle.createStatement(StringUtils.format("DROP TABLE %s", tableName))
.execute()
);
}
@ -213,79 +204,74 @@ public class SQLMetadataConnectorTest
}
}
private MetadataStorageConnectorConfig getDbcpPropertiesFile(
boolean createTables,
String host,
int port,
String connectURI,
String user,
String pwdString,
String pwd
) throws Exception
{
return JSON_MAPPER.readValue(
"{" +
"\"createTables\": \"" + createTables + "\"," +
"\"host\": \"" + host + "\"," +
"\"port\": \"" + port + "\"," +
"\"connectURI\": \"" + connectURI + "\"," +
"\"user\": \"" + user + "\"," +
"\"password\": " + pwdString + "," +
"\"dbcp\": {\n" +
" \"maxConnLifetimeMillis\" : 1200000,\n" +
" \"defaultQueryTimeout\" : \"30000\"\n" +
"}" +
"}",
MetadataStorageConnectorConfig.class
);
}
@Test
public void testBasicDataSourceCreation() throws Exception
public void testBasicDataSourceCreation()
{
MetadataStorageConnectorConfig config = getDbcpPropertiesFile(
true,
"host",
1234,
"connectURI",
"user",
"{\"type\":\"default\",\"password\":\"nothing\"}",
"nothing"
Map<String, String> props = ImmutableMap.of(
"maxConnLifetimeMillis", "1200000",
"defaultQueryTimeout", "30000"
);
MetadataStorageConnectorConfig config =
MetadataStorageConnectorConfig.create("connectURI", "user", "password", props);
TestSQLMetadataConnector testSQLMetadataConnector = new TestSQLMetadataConnector(
Suppliers.ofInstance(config),
Suppliers.ofInstance(tablesConfig)
);
BasicDataSource dataSource = testSQLMetadataConnector.getDatasource();
Assert.assertEquals(dataSource.getMaxConnLifetimeMillis(), 1200000);
Assert.assertEquals((long) dataSource.getDefaultQueryTimeout(), 30000);
Assert.assertEquals(dataSource.getDefaultQueryTimeout().intValue(), 30000);
}
private boolean verifyTaskTypeAndGroupId(String table, String id, String type, String groupId)
@Test
public void testIsTransientException()
{
try {
return connector.retryWithHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle) throws SQLException
{
Statement statement = handle.getConnection().createStatement();
ResultSet resultSet = statement.executeQuery(
StringUtils.format("SELECT * FROM %1$s WHERE id = '%2$s'", table, id)
);
resultSet.next();
boolean flag = type.equals(resultSet.getString("type"))
&& groupId.equals(resultSet.getString("group_id"));
statement.close();
return flag;
}
}
);
}
catch (Exception e) {
e.printStackTrace();
return false;
}
MetadataStorageConnectorConfig config =
MetadataStorageConnectorConfig.create("connectURI", "user", "password", Collections.emptyMap());
TestSQLMetadataConnector metadataConnector = new TestSQLMetadataConnector(
Suppliers.ofInstance(config),
Suppliers.ofInstance(tablesConfig)
);
// Transient exceptions
Assert.assertTrue(metadataConnector.isTransientException(new RetryTransactionException("")));
Assert.assertTrue(metadataConnector.isTransientException(new SQLRecoverableException()));
Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientException()));
Assert.assertTrue(metadataConnector.isTransientException(new SQLTransientConnectionException()));
// Non transient exceptions
Assert.assertFalse(metadataConnector.isTransientException(null));
Assert.assertFalse(metadataConnector.isTransientException(new SQLException()));
Assert.assertFalse(metadataConnector.isTransientException(new UnableToExecuteStatementException("")));
// Nested transient exceptions
Assert.assertTrue(
metadataConnector.isTransientException(
new CallbackFailedException(new SQLTransientException())
)
);
Assert.assertTrue(
metadataConnector.isTransientException(
new UnableToObtainConnectionException(new SQLException())
)
);
Assert.assertTrue(
metadataConnector.isTransientException(
new UnableToExecuteStatementException(new SQLTransientException())
)
);
// Nested non-transient exceptions
Assert.assertFalse(
metadataConnector.isTransientException(
new CallbackFailedException(new SQLException())
)
);
Assert.assertFalse(
metadataConnector.isTransientException(
new UnableToExecuteStatementException(new SQLException())
)
);
}
}

View File

@ -127,7 +127,7 @@ public class SQLMetadataStorageActionHandlerTest
}
@Test
public void testEntryAndStatus() throws Exception
public void testEntryAndStatus()
{
Map<String, Object> entry = ImmutableMap.of("numericId", 1234);
Map<String, Object> status1 = ImmutableMap.of("count", 42);
@ -256,7 +256,7 @@ public class SQLMetadataStorageActionHandlerTest
}
@Test(timeout = 60_000L)
public void testRepeatInsert() throws Exception
public void testRepeatInsert()
{
final String entryId = "abcd";
Map<String, Object> entry = ImmutableMap.of("a", 1);
@ -269,7 +269,7 @@ public class SQLMetadataStorageActionHandlerTest
}
@Test
public void testLogs() throws Exception
public void testLogs()
{
final String entryId = "abcd";
Map<String, Object> entry = ImmutableMap.of("a", 1);
@ -301,7 +301,7 @@ public class SQLMetadataStorageActionHandlerTest
@Test
public void testLocks() throws Exception
public void testLocks()
{
final String entryId = "ABC123";
Map<String, Object> entry = ImmutableMap.of("a", 1);
@ -404,7 +404,7 @@ public class SQLMetadataStorageActionHandlerTest
}
@Test
public void testRemoveTasksOlderThan() throws Exception
public void testRemoveTasksOlderThan()
{
final String entryId1 = "1234";
Map<String, Object> entry1 = ImmutableMap.of("numericId", 1234);