In Sedona 1.9.0 aggregate functions fail on the second call within a Spark-Connect session #3044

@jeromewh

Description

The same code shown in the example below worked in Sedona 1.8.1.

Expected behavior

A Sedona aggregate function such as ST_Envelope_Aggr can still be used repeatedly in Sedona 1.9.0.
Every call executes correctly, whether used in groupBy().agg(...) or inside a window expression.

Actual behavior

Within a Spark Connect session, the first call to ST_Envelope_Aggr succeeds and returns the correct result. The second, identical call fails:

...
pyspark.errors.exceptions.connect.UnsupportedOperationException:
Aggregate function ST_Envelope_Aggr cannot be used as a regular function
...

JVM stack trace:

java.lang.UnsupportedOperationException
        at org.apache.sedona.sql.UDF.AbstractCatalog.$anonfun$registerAggregateFunction$1(AbstractCatalog.scala:124)
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction(FunctionRegistry.scala:241)
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistryBase.lookupFunction$(FunctionRegistry.scala:235)
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:307)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.$anonfun$resolveBuiltinOrTempFunctionInternal$1(SessionCatalog.scala:2097)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupTempFuncWithViewContext(SessionCatalog.scala:2107)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolveBuiltinOrTempFunctionInternal(SessionCatalog.scala:2097)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolveBuiltinOrTempFunction(SessionCatalog.scala:2074)
        at org.apache.spark.sql.catalyst.analysis.FunctionResolution.resolveBuiltinOrTempFunction(FunctionResolution.scala:99)
        at org.apache.spark.sql.catalyst.analysis.FunctionResolution.$anonfun$resolveFunction$1(FunctionResolution.scala:52)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:104)
        at org.apache.spark.sql.catalyst.analysis.FunctionResolution.resolveFunction(FunctionResolution.scala:52)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$19$$anonfun$applyOrElse$163.applyOrElse(Analyzer.scala:2364)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$19$$anonfun$applyOrElse$163.applyOrElse(Analyzer.scala:2334)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:545)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:107)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:545)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:542)
        at scala.collection.immutable.Vector1.map(Vector.scala:2141)
        at scala.collection.immutable.Vector1.map(Vector.scala:386)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:729)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:542)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:542)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1264)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1263)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:587)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:542)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsUpWithPruning$1(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:257)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:107)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:257)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:269)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:275)
        at scala.collection.immutable.List.map(List.scala:240)
        at scala.collection.immutable.List.map(List.scala:79)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:275)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$5(QueryPlan.scala:280)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:333)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:280)
        at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUpWithPruning(QueryPlan.scala:245)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$19.applyOrElse(Analyzer.scala:2334)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$19.applyOrElse(Analyzer.scala:2204)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:139)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:107)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:139)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$2(AnalysisHelper.scala:136)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1264)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1263)
        at org.apache.spark.sql.catalyst.plans.logical.Aggregate.mapChildren(basicLogicalOperators.scala:1156)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:136)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:416)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:135)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:131)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:37)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:2204)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:2201)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:248)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:183)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:179)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:245)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:237)
        at scala.collection.immutable.List.foreach(List.scala:323)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:237)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:343)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:339)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:339)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:289)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:207)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:207)
        at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.resolveInFixedPoint(HybridAnalyzer.scala:236)
        at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.$anonfun$apply$1(HybridAnalyzer.scala:91)
        at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.withTrackedAnalyzerBridgeState(HybridAnalyzer.scala:122)
        at org.apache.spark.sql.catalyst.analysis.resolver.HybridAnalyzer.apply(HybridAnalyzer.scala:84)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:322)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:423)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:322)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$2(QueryExecution.scala:139)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:148)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:330)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:717)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:330)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:329)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$lazyAnalyzed$1(QueryExecution.scala:139)
        at scala.util.Try$.apply(Try.scala:217)
        at org.apache.spark.util.Utils$.doTryWithCallerStacktrace(Utils.scala:1392)
        at org.apache.spark.util.Utils$.getTryWithCallerStacktrace(Utils.scala:1453)
        at org.apache.spark.util.LazyTry.get(LazyTry.scala:58)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:150)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:90)
        at org.apache.spark.sql.classic.Dataset$.$anonfun$ofRows$5(Dataset.scala:138)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at org.apache.spark.sql.classic.Dataset$.ofRows(Dataset.scala:135)
        at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:76)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:197)
        at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:396)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:396)
        at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:185)
        at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
        at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:395)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:197)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
        at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:334)

Steps to reproduce

Run the attached docker-compose.yml to start a local Spark-Connect Server that has the Sedona JARs and SedonaSqlExtensions enabled.

Run test.py against the local server:

python test.py sc://localhost:15002

Output:

[call 1] OK - 2 groups
[call 2] ... UnsupportedOperationException: Aggregate function ST_Envelope_Aggr cannot be used as a regular function

The minimal trigger is a plain groupBy("group_id").agg(count("*").alias("count"), ST_Perimeter(ST_Envelope_Aggr("geometry"), True).alias("union_geom")) executed twice.
The same failure occurs when the aggregate is used inside a window expression (the original production code path).
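
For readers without the attachments, here is a minimal sketch of what such a reproduction might look like. It is not the attached test.py. The sample rows, the helper names run_repeated and make_query, and the sedona.spark.sql.* import paths are assumptions made for illustration; only the groupBy/agg expression is taken from the description above.

```python
def run_repeated(query, calls=2):
    """Run `query` (a zero-argument callable returning a group count)
    `calls` times and return one status line per call."""
    lines = []
    for i in range(1, calls + 1):
        try:
            lines.append(f"[call {i}] OK - {query()} groups")
        except Exception as exc:
            # Record the failure instead of aborting, so every call is reported.
            lines.append(f"[call {i}] {type(exc).__name__}: {exc}")
    return lines


def make_query(remote_url):
    """Return a callable that builds and executes the groupBy/agg query."""
    # Imported lazily so run_repeated can be used without PySpark installed.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import count

    # Assumed import paths for the Sedona Python DataFrame API.
    from sedona.spark.sql.st_aggregates import ST_Envelope_Aggr
    from sedona.spark.sql.st_constructors import ST_GeomFromWKT
    from sedona.spark.sql.st_functions import ST_Perimeter

    spark = SparkSession.builder.remote(remote_url).getOrCreate()

    # Hypothetical sample data: two groups with two points each.
    rows = [
        (1, "POINT (0 0)"),
        (1, "POINT (1 1)"),
        (2, "POINT (5 5)"),
        (2, "POINT (6 7)"),
    ]
    df = spark.createDataFrame(rows, ["group_id", "wkt"]).withColumn(
        "geometry", ST_GeomFromWKT("wkt")
    )

    def query():
        # The expression is rebuilt on every call, matching "executed twice".
        result = df.groupBy("group_id").agg(
            count("*").alias("count"),
            ST_Perimeter(ST_Envelope_Aggr("geometry"), True).alias("union_geom"),
        )
        return len(result.collect())

    return query
```

Printing each line returned by run_repeated(make_query("sc://localhost:15002")) should, if the sketch matches the reported behavior, show one OK line followed by the UnsupportedOperationException on Sedona 1.9.0.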

Version matrix (bisect)

The code is unchanged across all combinations; only the Sedona/Spark versions differ (see Dockerfiles):

Sedona   Spark   Result
1.8.1    4.0.2   works
1.9.0    4.0.2   fails
1.9.0    4.1.2   fails

The problem appears to be tied to Sedona 1.9.0 and independent of the Spark version: it reproduces on both 4.0.2 and 4.1.2. Sedona 1.8.1 is the last version known to work.

Attached Files

1_8_Dockerfile.txt

1_9_Dockerfile.txt

docker-compose.yml

start.sh

test.py
