-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Add failure event for queries that were posted but not submitted #24205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,13 +25,19 @@ | |
import io.opentelemetry.api.trace.Span; | ||
import io.opentelemetry.api.trace.StatusCode; | ||
import io.opentelemetry.api.trace.Tracer; | ||
import io.trino.SessionRepresentation; | ||
import io.trino.client.QueryError; | ||
import io.trino.client.QueryResults; | ||
import io.trino.client.StatementStats; | ||
import io.trino.client.TypedQueryData; | ||
import io.trino.event.QueryMonitor; | ||
import io.trino.execution.ExecutionFailureInfo; | ||
import io.trino.execution.LocationFactory; | ||
import io.trino.execution.QueryManagerConfig; | ||
import io.trino.execution.QueryState; | ||
import io.trino.operator.RetryPolicy; | ||
import io.trino.server.BasicQueryInfo; | ||
import io.trino.server.BasicQueryStats; | ||
import io.trino.server.ExternalUriInfo; | ||
import io.trino.server.GoneException; | ||
import io.trino.server.HttpRequestSessionContextFactory; | ||
|
@@ -43,7 +49,11 @@ | |
import io.trino.server.security.ResourceSecurity; | ||
import io.trino.spi.ErrorCode; | ||
import io.trino.spi.QueryId; | ||
import io.trino.spi.TrinoException; | ||
import io.trino.spi.security.Identity; | ||
import io.trino.spi.type.TimeZoneKey; | ||
import io.trino.sql.SqlEnvironmentConfig; | ||
import io.trino.sql.SqlPath; | ||
import io.trino.tracing.TrinoAttributes; | ||
import jakarta.annotation.Nullable; | ||
import jakarta.annotation.PostConstruct; | ||
|
@@ -68,6 +78,9 @@ | |
import jakarta.ws.rs.core.Response; | ||
|
||
import java.net.URI; | ||
import java.security.Principal; | ||
import java.time.Instant; | ||
import java.util.Locale; | ||
import java.util.Optional; | ||
import java.util.OptionalDouble; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
@@ -95,7 +108,10 @@ | |
import static io.trino.server.protocol.Slug.Context.QUEUED_QUERY; | ||
import static io.trino.server.security.ResourceSecurity.AccessType.AUTHENTICATED_USER; | ||
import static io.trino.server.security.ResourceSecurity.AccessType.PUBLIC; | ||
import static io.trino.spi.StandardErrorCode.ABANDONED_QUERY; | ||
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; | ||
import static io.trino.spi.StandardErrorCode.USER_CANCELED; | ||
import static io.trino.util.Failures.toFailure; | ||
import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; | ||
import static java.util.Objects.requireNonNull; | ||
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; | ||
|
@@ -121,6 +137,9 @@ public class QueuedStatementResource | |
|
||
private final boolean compressionEnabled; | ||
private final QueryManager queryManager; | ||
private final QueryMonitor queryMonitor; | ||
private final String defaultSqlPath; | ||
private final LocationFactory locationFactory; | ||
|
||
@Inject | ||
public QueuedStatementResource( | ||
|
@@ -130,7 +149,10 @@ public QueuedStatementResource( | |
DispatchExecutor executor, | ||
QueryInfoUrlFactory queryInfoUrlTemplate, | ||
ServerConfig serverConfig, | ||
QueryManagerConfig queryManagerConfig) | ||
QueryManagerConfig queryManagerConfig, | ||
QueryMonitor queryMonitor, | ||
SqlEnvironmentConfig sqlEnvironmentConfig, | ||
LocationFactory locationFactory) | ||
{ | ||
this.sessionContextFactory = requireNonNull(sessionContextFactory, "sessionContextFactory is null"); | ||
this.dispatchManager = requireNonNull(dispatchManager, "dispatchManager is null"); | ||
|
@@ -140,6 +162,9 @@ public QueuedStatementResource( | |
this.queryInfoUrlFactory = requireNonNull(queryInfoUrlTemplate, "queryInfoUrlTemplate is null"); | ||
this.compressionEnabled = serverConfig.isQueryResultsCompressionEnabled(); | ||
queryManager = new QueryManager(queryManagerConfig.getClientTimeout()); | ||
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null"); | ||
this.defaultSqlPath = requireNonNull(sqlEnvironmentConfig.getPath(), "path is null"); | ||
this.locationFactory = requireNonNull(locationFactory, "locationFactory is null"); | ||
} | ||
|
||
@PostConstruct | ||
|
@@ -182,7 +207,7 @@ private Query registerQuery(String statement, HttpServletRequest servletRequest, | |
MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders(); | ||
|
||
SessionContext sessionContext = sessionContextFactory.createSessionContext(headers, remoteAddress, identity); | ||
Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory, tracer); | ||
Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory, tracer, queryMonitor, defaultSqlPath, locationFactory); | ||
queryManager.registerQuery(query); | ||
|
||
// let authentication filter know that identity lifecycle has been handed off | ||
|
@@ -306,12 +331,24 @@ private static final class Query | |
private final Span querySpan; | ||
private final Slug slug = Slug.createNew(); | ||
private final AtomicLong lastToken = new AtomicLong(); | ||
private final QueryMonitor queryMonitor; | ||
private final String defaultPath; | ||
private final LocationFactory locationFactory; | ||
|
||
private final long initTime = System.nanoTime(); | ||
private final AtomicReference<Boolean> submissionGate = new AtomicReference<>(); | ||
private final SettableFuture<Void> creationFuture = SettableFuture.create(); | ||
|
||
public Query(String query, SessionContext sessionContext, DispatchManager dispatchManager, QueryInfoUrlFactory queryInfoUrlFactory, Tracer tracer) | ||
private boolean cancelled; | ||
|
||
public Query( | ||
String query, | ||
SessionContext sessionContext, | ||
DispatchManager dispatchManager, | ||
QueryInfoUrlFactory queryInfoUrlFactory, | ||
Tracer tracer, | ||
QueryMonitor queryMonitor, | ||
String defaultPath, | ||
LocationFactory locationFactory) | ||
{ | ||
this.query = requireNonNull(query, "query is null"); | ||
this.sessionContext = requireNonNull(sessionContext, "sessionContext is null"); | ||
|
@@ -323,6 +360,9 @@ public Query(String query, SessionContext sessionContext, DispatchManager dispat | |
this.querySpan = tracer.spanBuilder("query") | ||
.setAttribute(TrinoAttributes.QUERY_ID, queryId.toString()) | ||
.startSpan(); | ||
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null"); | ||
this.defaultPath = requireNonNull(defaultPath, "defaultPath is null"); | ||
this.locationFactory = requireNonNull(locationFactory, "locationFactory is null"); | ||
} | ||
|
||
public QueryId getQueryId() | ||
|
@@ -400,15 +440,86 @@ public QueryResults getQueryResults(long token, ExternalUriInfo externalUriInfo) | |
|
||
public void cancel() | ||
{ | ||
creationFuture.addListener(() -> dispatchManager.cancelQuery(queryId), directExecutor()); | ||
cancelled = true; | ||
creationFuture.addListener(() -> cancelInternal(), directExecutor()); | ||
} | ||
|
||
private void cancelInternal() | ||
{ | ||
if (!dispatchManager.cancelQuery(queryId)) { | ||
queryMonitor.queryImmediateFailureEvent( | ||
getBasicQueryInfoForFailure(USER_CANCELED.toErrorCode()), | ||
toFailure(new TrinoException(USER_CANCELED, "Query was canceled"))); | ||
} | ||
} | ||
|
||
public void destroy() | ||
{ | ||
querySpan.setStatus(StatusCode.ERROR).end(); | ||
if (!submissionGate.get()) { | ||
if (cancelled) { | ||
queryMonitor.queryImmediateFailureEvent( | ||
getBasicQueryInfoForFailure(USER_CANCELED.toErrorCode()), | ||
toFailure(new TrinoException(USER_CANCELED, "Query was canceled"))); | ||
Comment on lines
+461
to
+463
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I do not get the flow for cancelation. |
||
} | ||
else { | ||
queryMonitor.queryImmediateFailureEvent( | ||
getBasicQueryInfoForFailure(ABANDONED_QUERY.toErrorCode()), | ||
toFailure(new TrinoException(ABANDONED_QUERY, "Query was abandoned %s was abandoned by the client, as it never checked for results after submission".formatted(queryId)))); | ||
} | ||
} | ||
sessionContext.getIdentity().destroy(); | ||
} | ||
|
||
private BasicQueryInfo getBasicQueryInfoForFailure(ErrorCode errorCode) | ||
{ | ||
return new BasicQueryInfo( | ||
queryId, | ||
new SessionRepresentation( | ||
queryId.toString(), | ||
querySpan, | ||
sessionContext.getTransactionId(), | ||
sessionContext.supportClientTransaction(), | ||
sessionContext.getIdentity().getUser(), | ||
sessionContext.getOriginalIdentity().getUser(), | ||
sessionContext.getIdentity().getGroups(), | ||
sessionContext.getOriginalIdentity().getGroups(), | ||
sessionContext.getIdentity().getPrincipal().map(Principal::toString), | ||
sessionContext.getIdentity().getEnabledRoles(), | ||
sessionContext.getSource(), | ||
sessionContext.getCatalog(), | ||
sessionContext.getSchema(), | ||
SqlPath.buildPath(sessionContext.getPath().orElse(defaultPath), sessionContext.getCatalog()), | ||
sessionContext.getTraceToken(), | ||
sessionContext.getTimeZoneId().map(TimeZoneKey::getTimeZoneKey).orElse(null), | ||
sessionContext.getLanguage().map(s -> Locale.forLanguageTag(s)).orElse(Locale.getDefault()), | ||
sessionContext.getRemoteUserAddress(), | ||
sessionContext.getUserAgent(), | ||
sessionContext.getClientInfo(), | ||
sessionContext.getClientTags(), | ||
sessionContext.getClientCapabilities(), | ||
sessionContext.getResourceEstimates(), | ||
Instant.now(), | ||
sessionContext.getSystemProperties(), | ||
sessionContext.getCatalogSessionProperties(), | ||
sessionContext.getIdentity().getCatalogRoles(), | ||
sessionContext.getPreparedStatements(), | ||
sessionContext.getProtocolHeaders().getProtocolName(), | ||
sessionContext.getQueryDataEncoding()), | ||
Optional.empty(), | ||
FAILED, | ||
false, | ||
locationFactory.createQueryLocation(queryId), | ||
query, | ||
Optional.empty(), | ||
Optional.empty(), | ||
BasicQueryStats.immediateFailureQueryStats(), | ||
errorCode.getType(), | ||
errorCode, | ||
Optional.empty(), | ||
RetryPolicy.NONE); | ||
} | ||
|
||
private QueryResults createQueryResults(long token, ExternalUriInfo externalUriInfo, DispatchInfo dispatchInfo) | ||
{ | ||
URI nextUri = getNextUri(token, externalUriInfo, dispatchInfo); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -177,6 +177,55 @@ public void queryCreatedEvent(BasicQueryInfo queryInfo) | |
Optional::empty))); | ||
} | ||
|
||
private static QueryStatistics createEmptyStatistics(Duration queuedTime) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unrelated change. Fine on its own but move to separate commit |
||
{ | ||
return new QueryStatistics( | ||
ofMillis(0), | ||
ofMillis(0), | ||
ofMillis(0), | ||
queuedTime, | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
ImmutableList.of(), | ||
0, | ||
true, | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
Optional.empty()); | ||
} | ||
|
||
public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailureInfo failure) | ||
{ | ||
eventListenerManager.queryCompleted(requiresAnonymizedPlan -> new QueryCompletedEvent( | ||
|
@@ -194,51 +243,7 @@ public void queryImmediateFailureEvent(BasicQueryInfo queryInfo, ExecutionFailur | |
Optional.empty(), | ||
Optional.empty(), | ||
Optional::empty), | ||
new QueryStatistics( | ||
ofMillis(0), | ||
ofMillis(0), | ||
ofMillis(0), | ||
ofMillis(queryInfo.getQueryStats().getQueuedTime().toMillis()), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
Optional.empty(), | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
0, | ||
ImmutableList.of(), | ||
0, | ||
true, | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
ImmutableList.of(), | ||
Optional.empty()), | ||
createEmptyStatistics(ofMillis(queryInfo.getQueryStats().getQueuedTime().toMillis())), | ||
createQueryContext( | ||
queryInfo.getSession(), | ||
queryInfo.getResourceGroupId(), | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why can't we have event logic in
dispatchManager.cancelQuery
?Is that lack of session or the fact that is used from QueryResource too? Or ... ?