@@ -25,6 +25,11 @@ public class SparkLauncherListener implements SparkAppHandle.Listener {
2525
2626 private static volatile boolean shutdownHookRegistered = false ;
2727
28+ private static long spanStartTimeMs = 0L ;
29+ private static long connectedTimeMs = 0L ;
30+ private static long submittedTimeMs = 0L ;
31+ private static long runningTimeMs = 0L ;
32+
2833 public static synchronized void createLauncherSpan (Object launcher ) {
2934 if (launcherSpan != null ) {
3035 return ;
@@ -40,6 +45,10 @@ public static synchronized void createLauncherSpan(Object launcher) {
4045 span .setSamplingPriority (PrioritySampling .USER_KEEP , SamplingMechanism .DATA_JOBS );
4146 setLauncherConfigTags (span , launcher );
4247 captureEmrStepId (span );
48+ spanStartTimeMs = System .currentTimeMillis ();
49+ connectedTimeMs = 0L ;
50+ submittedTimeMs = 0L ;
51+ runningTimeMs = 0L ;
4352 launcherSpan = span ;
4453
4554 if (!shutdownHookRegistered ) {
@@ -52,6 +61,7 @@ public static synchronized void createLauncherSpan(Object launcher) {
5261 AgentSpan s = launcherSpan ;
5362 if (s != null ) {
5463 log .info ("Finishing spark.launcher span from shutdown hook" );
64+ setTimingMetrics (s );
5565 s .finish ();
5666 launcherSpan = null ;
5767 }
@@ -70,6 +80,7 @@ public static synchronized void finishSpan(boolean isError, String errorMessage)
7080 span .setTag (DDTags .ERROR_TYPE , "Spark Launcher Failed" );
7181 span .setTag (DDTags .ERROR_MSG , errorMessage );
7282 }
83+ setTimingMetrics (span );
7384 span .finish ();
7485 launcherSpan = null ;
7586 }
@@ -82,17 +93,34 @@ public static synchronized void finishSpanWithThrowable(Throwable throwable) {
8293 if (throwable != null ) {
8394 span .addThrowable (throwable );
8495 }
96+ setTimingMetrics (span );
8597 span .finish ();
8698 launcherSpan = null ;
8799 }
88100
101+ private static void setTimingMetrics (AgentSpan span ) {
102+ if (spanStartTimeMs <= 0L ) {
103+ return ;
104+ }
105+ if (connectedTimeMs > 0L ) {
106+ span .setMetric ("spark.launcher.time_to_connected_ms" , connectedTimeMs - spanStartTimeMs );
107+ }
108+ if (submittedTimeMs > 0L ) {
109+ span .setMetric ("spark.launcher.time_to_submitted_ms" , submittedTimeMs - spanStartTimeMs );
110+ }
111+ if (runningTimeMs > 0L ) {
112+ span .setMetric ("spark.launcher.time_to_running_ms" , runningTimeMs - spanStartTimeMs );
113+ }
114+ }
115+
89116 @ Override
90117 public void stateChanged (SparkAppHandle handle ) {
91118 synchronized (SparkLauncherListener .class ) {
92119 SparkAppHandle .State state = handle .getState ();
93120 AgentSpan span = launcherSpan ;
94121 if (span != null ) {
95122 span .setTag ("spark.launcher.app_state" , state .toString ());
123+ recordStateTimestamp (state );
96124
97125 String appId = handle .getAppId ();
98126 if (appId != null ) {
@@ -132,6 +160,23 @@ public void infoChanged(SparkAppHandle handle) {
132160 }
133161 }
134162
163+ private static void recordStateTimestamp (SparkAppHandle .State state ) {
164+ long now = System .currentTimeMillis ();
165+ if (state == SparkAppHandle .State .CONNECTED ) {
166+ if (connectedTimeMs == 0L ) {
167+ connectedTimeMs = now ;
168+ }
169+ } else if (state == SparkAppHandle .State .SUBMITTED ) {
170+ if (submittedTimeMs == 0L ) {
171+ submittedTimeMs = now ;
172+ }
173+ } else if (state == SparkAppHandle .State .RUNNING ) {
174+ if (runningTimeMs == 0L ) {
175+ runningTimeMs = now ;
176+ }
177+ }
178+ }
179+
135180 private static void captureEmrStepId (AgentSpan span ) {
136181 String stepId = EmrUtils .getEmrStepId ();
137182 if (stepId != null ) {
0 commit comments