[Fix](Cloud) decouple min pipeline executor size from ConnectContext #60884
CalvinKirs wants to merge 1 commit into apache:master
## Background
getMinPipelineExecutorSize in cloud paths implicitly depended on ConnectContext, which caused two issues:
1. Unstable behavior when no thread-local context is available (e.g. internal/async paths).
2. Unclear API semantics since callers could not explicitly specify the target cluster.
This PR makes the API explicit by requiring clusterName.
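The first issue can be illustrated with a minimal, self-contained sketch. `DemoContext` and `ExplicitClusterDemo` are hypothetical stand-ins, not Doris classes; the sketch only assumes that the context lives in a plain `ThreadLocal`, so a lookup made from another thread finds nothing unless the cluster name is passed explicitly.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical stand-in for a thread-local session context (not the Doris ConnectContext).
final class DemoContext {
    private static final ThreadLocal<DemoContext> CURRENT = new ThreadLocal<>();
    final String clusterName;

    DemoContext(String clusterName) {
        this.clusterName = clusterName;
    }

    static void set(DemoContext ctx) {
        CURRENT.set(ctx);
    }

    static DemoContext get() {
        return CURRENT.get();
    }
}

final class ExplicitClusterDemo {
    // Implicit style: the result depends on which thread happens to call it.
    static String implicitCluster() {
        DemoContext ctx = DemoContext.get();
        return ctx == null ? null : ctx.clusterName;
    }

    // Explicit style: the caller states the target cluster.
    static String explicitCluster(String clusterName) {
        return clusterName;
    }

    // Runs the task on a fresh thread, which has no thread-local context of its own.
    static String runOnNewThread(Supplier<String> task) {
        AtomicReference<String> result = new AtomicReference<>();
        Thread worker = new Thread(() -> result.set(task.get()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        DemoContext.set(new DemoContext("cluster_a"));
        String cluster = DemoContext.get().clusterName;
        // The implicit lookup sees no context on the worker thread.
        System.out.println(runOnNewThread(ExplicitClusterDemo::implicitCluster));
        // The explicit argument travels with the call.
        System.out.println(runOnNewThread(() -> explicitCluster(cluster)));
    }
}
```

Capturing the cluster name on the session thread and passing it along is what keeps the async call well-defined in this sketch.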
## What Changed
1. Removed the no-arg getMinPipelineExecutorSize() API and kept only getMinPipelineExecutorSize(String clusterName).
2. Unified SystemInfoService and CloudSystemInfoService implementations to the string-arg API.
3. Updated SessionVariable#getParallelExecInstanceNum() to call the string-arg API, with cluster resolved from session/auth information (instead of directly depending on ConnectContext).
4. Added synchronization in ConnectContext:
TPC-H: Total hot run time: 29046 ms
TPC-DS: Total hot run time: 183665 ms
FE UT Coverage Report: Increment line coverage
Pull request overview
This PR refactors the “min pipeline executor size” lookup to no longer implicitly depend on thread-local ConnectContext in cloud mode, making the target cluster explicit and reducing NPE risk in async/internal call paths (as seen in #60648).
Changes:
- Removes the no-arg `getMinPipelineExecutorSize()` and standardizes on `getMinPipelineExecutorSize(String clusterName)`.
- Updates `SessionVariable#getParallelExecInstanceNum()` to use per-session user/cluster resolution instead of directly relying on `ConnectContext`.
- Adjusts cloud/non-cloud implementations and updates/extends related unit tests.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java | Changes API to string-arg method (non-cloud implementation still scans all backends). |
| fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java | Removes ConnectContext dependency; returns default when cluster name is empty. |
| fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java | Introduces transient qualifiedUser and explicit cloud-cluster resolution for auto parallelism. |
| fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java | Propagates qualified user and selected cloud cluster into SessionVariable. |
| fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java | Updates calls to use the new string-arg API. |
| fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java | Adapts tests to the new API and adds explicit-cluster test coverage. |
Comments suppressed due to low confidence (1)
fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java:1086
- The new clusterName parameter is currently unused in the non-cloud SystemInfoService implementation (it always scans all backends via getAllBackendsByAllCluster()). To avoid misleading API semantics, add Javadoc clarifying that the parameter is ignored in non-cloud mode (kept only for CloudSystemInfoService override) or consider an overload/renaming that makes this explicit.
```java
public int getMinPipelineExecutorSize(String clusterName) {
    List<Backend> currentBackends = null;
    try {
        currentBackends = getAllBackendsByAllCluster().values().asList();
    } catch (UserException e) {
```
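One way to address the suppressed comment is to document the ignored parameter on the base method. The sketch below is illustrative only: `SketchInfoService` and `SketchCloudInfoService` are hypothetical classes that model backends as a map from cluster name to executor sizes, and the default of 1 is an assumption rather than the value Doris uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the non-cloud service; not the Doris SystemInfoService.
class SketchInfoService {
    static final int DEFAULT_SIZE = 1; // assumed default for illustration
    protected final Map<String, List<Integer>> sizesByCluster;

    SketchInfoService(Map<String, List<Integer>> sizesByCluster) {
        this.sizesByCluster = sizesByCluster;
    }

    /**
     * Returns the smallest executor size across all backends.
     *
     * @param clusterName ignored in non-cloud mode; kept only so the cloud
     *                    subclass can override with per-cluster semantics
     */
    public int getMinPipelineExecutorSize(String clusterName) {
        List<Integer> all = new ArrayList<>();
        sizesByCluster.values().forEach(all::addAll);
        return minOrDefault(all);
    }

    protected static int minOrDefault(List<Integer> sizes) {
        return sizes.stream().mapToInt(Integer::intValue).min().orElse(DEFAULT_SIZE);
    }
}

// Hypothetical model of the cloud service: only the named cluster is scanned.
class SketchCloudInfoService extends SketchInfoService {
    SketchCloudInfoService(Map<String, List<Integer>> sizesByCluster) {
        super(sizesByCluster);
    }

    @Override
    public int getMinPipelineExecutorSize(String clusterName) {
        if (clusterName == null || clusterName.isEmpty()) {
            return DEFAULT_SIZE; // empty cluster name falls back to the default
        }
        return minOrDefault(sizesByCluster.getOrDefault(clusterName, List.of()));
    }
}
```

With the Javadoc in place, a caller reading the base signature can tell that the argument only matters for the cloud override.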
```java
try {
    clusterName = context.getCloudCluster(false);
} catch (Exception e) {
    return 1;
}
return infoService.getMinPipelineExecutorSize(clusterName);
```
Helper getMinPipelineExecutorSizeByContext() catches a broad Exception and returns 1, which can mask unexpected failures (e.g., NPEs) in tests that assert the result is 1. Prefer catching the expected ComputeGroupException (or the specific checked exception thrown by getCloudCluster(false)) and let other exceptions fail the test.
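A narrower catch could look like the following sketch. `ClusterLookupException` is a hypothetical stand-in for whatever checked exception `getCloudCluster(false)` actually throws, and `ClusterSource` and `TestHelperSketch` are illustrative names, not part of the PR.

```java
import java.util.function.ToIntFunction;

// Hypothetical stand-in for the expected checked exception (e.g. ComputeGroupException).
final class ClusterLookupException extends Exception {
    ClusterLookupException(String message) {
        super(message);
    }
}

// Hypothetical abstraction over context.getCloudCluster(false).
interface ClusterSource {
    String cloudCluster() throws ClusterLookupException;
}

final class TestHelperSketch {
    static int minSizeByContext(ClusterSource source, ToIntFunction<String> sizeLookup) {
        String clusterName;
        try {
            clusterName = source.cloudCluster();
        } catch (ClusterLookupException e) {
            // Only the expected lookup failure maps to the fallback value.
            return 1;
        }
        // Any other exception (such as an NPE) propagates and fails the test.
        return sizeLookup.applyAsInt(clusterName);
    }
}
```

With this shape, a test asserting a result of 1 passes only when the cluster lookup failed in the expected way.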
```diff
 public int getParallelExecInstanceNum() {
-    ConnectContext connectContext = ConnectContext.get();
-    if (connectContext != null && connectContext.getEnv() != null && connectContext.getEnv().getAuth() != null) {
-        int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
-                .getParallelFragmentExecInstanceNum(connectContext.getQualifiedUser());
+    Env currentEnv = Env.getCurrentEnv();
+    if (!Strings.isNullOrEmpty(qualifiedUser)
+            && currentEnv != null
+            && currentEnv.getAuth() != null) {
+        int userParallelExecInstanceNum = currentEnv.getAuth()
+                .getParallelFragmentExecInstanceNum(qualifiedUser);
         if (userParallelExecInstanceNum > 0) {
             return userParallelExecInstanceNum;
         }
     }
     if (parallelPipelineTaskNum == 0) {
-        int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+        int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize(resolveCloudClusterForAutoParallel());
         int autoInstance = (size + 1) / 2;
         return Math.min(autoInstance, maxInstanceNum);
     } else {
         return parallelPipelineTaskNum;
     }
 }
+
+public void setQualifiedUser(String qualifiedUser) {
+    this.qualifiedUser = qualifiedUser;
+}
+
+private String resolveCloudClusterForAutoParallel() {
+    if (!Config.isCloudMode()) {
+        return "";
+    }
+    if (!Strings.isNullOrEmpty(cloudCluster)) {
+        return cloudCluster;
+    }
+    Env currentEnv = Env.getCurrentEnv();
+    if (currentEnv == null || currentEnv.getAuth() == null || Strings.isNullOrEmpty(qualifiedUser)) {
+        return "";
+    }
+    return Strings.nullToEmpty(currentEnv.getAuth().getDefaultCloudCluster(qualifiedUser));
+}
```
getParallelExecInstanceNum() now depends on the new per-session transient qualifiedUser plus resolveCloudClusterForAutoParallel() logic, including behavior when ConnectContext thread-local is absent. There are no existing unit tests covering this method, so regressions here (auth override precedence, default-cluster resolution, empty-cluster fallback) are currently untested; please add focused tests for these new branches.
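The branches named in this comment can be exercised in isolation if the decision logic is viewed as pure functions. The sketch below restates that logic under this assumption; `ParallelismSketch` and its parameters are hypothetical and replace the `Env`, auth, and session state used by the real method.

```java
// Hypothetical, dependency-free restatement of the branches under review.
final class ParallelismSketch {
    // userOverride: per-user auth setting (values <= 0 mean "not set").
    // minExecutorSize: result of the cluster-aware executor size lookup.
    static int parallelExecInstanceNum(int userOverride, int parallelPipelineTaskNum,
                                       int minExecutorSize, int maxInstanceNum) {
        if (userOverride > 0) {
            return userOverride; // auth override takes precedence
        }
        if (parallelPipelineTaskNum == 0) {
            int autoInstance = (minExecutorSize + 1) / 2; // half the executors, rounded up
            return Math.min(autoInstance, maxInstanceNum);
        }
        return parallelPipelineTaskNum;
    }

    // Mirrors the resolution order: session cluster, then user default, then empty.
    static String resolveCluster(boolean cloudMode, String sessionCluster, String userDefaultCluster) {
        if (!cloudMode) {
            return "";
        }
        if (sessionCluster != null && !sessionCluster.isEmpty()) {
            return sessionCluster;
        }
        return userDefaultCluster == null ? "" : userDefaultCluster;
    }

    public static void main(String[] args) {
        System.out.println(parallelExecInstanceNum(3, 0, 8, 64));
        System.out.println(parallelExecInstanceNum(0, 0, 8, 64));
        System.out.println(resolveCluster(true, "", "default_cluster"));
    }
}
```

Each branch (override precedence, auto sizing with the cap, explicit task count, and the three cluster-resolution outcomes) then maps to one focused assertion.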
```diff
@@ -658,6 +661,9 @@ public SessionVariable getSessionVariable() {

 public void setSessionVariable(SessionVariable sessionVariable) {
     this.sessionVariable = sessionVariable;
+    if (this.sessionVariable != null) {
+        this.sessionVariable.setQualifiedUser(getQualifiedUser());
+    }
```
PR description mentions "Added synchronization in ConnectContext", but the changes here only propagate qualifiedUser/cloudCluster into SessionVariable and do not introduce any synchronization. If synchronization is still required for correctness, it appears missing; otherwise please update the PR description to avoid confusion.