1616
1717package com .navercorp .pinpoint .web .applicationmap .appender .server ;
1818
19+ import com .navercorp .pinpoint .common .server .util .time .Range ;
1920import com .navercorp .pinpoint .common .trace .ServiceType ;
2021import com .navercorp .pinpoint .web .applicationmap .nodes .Node ;
2122import com .navercorp .pinpoint .web .applicationmap .nodes .NodeList ;
2223import com .navercorp .pinpoint .web .applicationmap .nodes .ServerGroupList ;
2324import com .navercorp .pinpoint .web .applicationmap .rawdata .LinkDataDuplexMap ;
24- import com .navercorp .pinpoint .common .server .util .time .Range ;
25- import org .apache .logging .log4j .Logger ;
25+ import com .navercorp .pinpoint .web .vo .Application ;
2626import org .apache .logging .log4j .LogManager ;
27+ import org .apache .logging .log4j .Logger ;
2728import org .springframework .util .CollectionUtils ;
2829
2930import java .time .Instant ;
3536import java .util .concurrent .Executor ;
3637import java .util .concurrent .TimeUnit ;
3738import java .util .concurrent .TimeoutException ;
38- import java .util .concurrent .atomic .AtomicBoolean ;
3939import java .util .function .Supplier ;
4040
4141/**
@@ -55,70 +55,101 @@ public DefaultServerInfoAppender(ServerGroupListFactory serverGroupListFactory,
5555 }
5656
5757 @ Override
58- public void appendServerInfo (final Range range , final NodeList source , final LinkDataDuplexMap linkDataDuplexMap , final long timeoutMillis ) {
58+ public void appendServerInfo (final Range range , final NodeList source , final LinkDataDuplexMap linkDataDuplexMap , long timeoutMillis ) {
5959 if (source == null ) {
6060 return ;
6161 }
62+
6263 Collection <Node > nodes = source .getNodeList ();
6364 if (CollectionUtils .isEmpty (nodes )) {
6465 return ;
6566 }
66- final AtomicBoolean stopSign = new AtomicBoolean ();
67- final CompletableFuture [] futures = getServerGroupListFutures (range , nodes , linkDataDuplexMap , stopSign );
68- if (-1 == timeoutMillis ) {
69- // Returns the result value when complete
70- CompletableFuture .allOf (futures ).join ();
71- } else {
72- try {
73- CompletableFuture .allOf (futures ).get (timeoutMillis , TimeUnit .MILLISECONDS );
74- } catch (Exception e ) { // InterruptedException, ExecutionException, TimeoutException
75- stopSign .set (Boolean .TRUE );
76- String cause = "an error occurred while adding server info" ;
77- if (e instanceof TimeoutException ) {
78- cause += " build timed out. timeout=" + timeoutMillis + "ms" ;
79- }
80- throw new RuntimeException (cause , e );
81- }
67+
68+ final List <ServerGroupRequest > serverGroupRequest = getServerGroupListFutures (range , nodes , linkDataDuplexMap );
69+
70+ timeoutMillis = defaultTimeoutMillis (timeoutMillis );
71+ join (serverGroupRequest , timeoutMillis );
72+
73+ bind (serverGroupRequest );
74+ }
75+
76+ private long defaultTimeoutMillis (long timeoutMillis ) {
77+ if (timeoutMillis == -1 ) {
78+ return Long .MAX_VALUE ;
8279 }
80+ return timeoutMillis ;
8381 }
8482
85- private CompletableFuture [] getServerGroupListFutures (Range range , Collection <Node > nodes , LinkDataDuplexMap linkDataDuplexMap , AtomicBoolean stopSign ) {
86- List <CompletableFuture < Void > > serverGroupListFutures = new ArrayList <>();
83+ private List < ServerGroupRequest > getServerGroupListFutures (Range range , Collection <Node > nodes , LinkDataDuplexMap linkDataDuplexMap ) {
84+ List <ServerGroupRequest > serverGroupListFutures = new ArrayList <>();
8785 for (Node node : nodes ) {
8886 if (node .getServiceType ().isUnknown ()) {
8987 // we do not know the server info for unknown nodes
9088 continue ;
9189 }
92- CompletableFuture <Void > serverGroupListFuture = getServerGroupListFuture (range , node , linkDataDuplexMap , stopSign );
93- serverGroupListFutures .add (serverGroupListFuture );
90+ CompletableFuture <ServerGroupList > serverGroupListFuture = getServerGroupListFuture (range , node , linkDataDuplexMap );
91+ serverGroupListFutures .add (new ServerGroupRequest ( node , serverGroupListFuture ) );
9492 }
95- return serverGroupListFutures . toArray ( new CompletableFuture [ 0 ]) ;
93+ return serverGroupListFutures ;
9694 }
9795
98- private CompletableFuture <Void > getServerGroupListFuture (Range range , Node node , LinkDataDuplexMap linkDataDuplexMap , AtomicBoolean stopSign ) {
99- CompletableFuture <ServerGroupList > serverGroupListFuture ;
100- ServiceType nodeServiceType = node .getServiceType ();
96+ private record ServerGroupRequest (Node node , CompletableFuture <ServerGroupList > future ) {
97+ }
98+
99+ private CompletableFuture <ServerGroupList > getServerGroupListFuture (Range range , Node node , LinkDataDuplexMap linkDataDuplexMap ) {
100+ final Application application = node .getApplication ();
101+ final ServiceType nodeServiceType = application .getServiceType ();
101102 if (nodeServiceType .isWas ()) {
102- final Instant to = range .getToInstant ();
103- serverGroupListFuture = CompletableFuture .supplyAsync (new Supplier <ServerGroupList >() {
103+ return CompletableFuture .supplyAsync (new Supplier <>() {
104104 @ Override
105105 public ServerGroupList get () {
106- if (Boolean .TRUE == stopSign .get ()) { // Stop
107- return serverGroupListFactory .createEmptyNodeInstanceList ();
108- }
106+ final Instant to = range .getToInstant ();
109107 return serverGroupListFactory .createWasNodeInstanceList (node , to );
110108 }
111109 }, executor );
112110 } else if (nodeServiceType .isTerminal () || nodeServiceType .isAlias ()) {
113111 // extract information about the terminal node
114- serverGroupListFuture = CompletableFuture .completedFuture (serverGroupListFactory .createTerminalNodeInstanceList (node , linkDataDuplexMap ));
112+ return CompletableFuture .completedFuture (serverGroupListFactory .createTerminalNodeInstanceList (node , linkDataDuplexMap ));
115113 } else if (nodeServiceType .isQueue ()) {
116- serverGroupListFuture = CompletableFuture .completedFuture (serverGroupListFactory .createQueueNodeInstanceList (node , linkDataDuplexMap ));
114+ return CompletableFuture .completedFuture (serverGroupListFactory .createQueueNodeInstanceList (node , linkDataDuplexMap ));
117115 } else if (nodeServiceType .isUser ()) {
118- serverGroupListFuture = CompletableFuture .completedFuture (serverGroupListFactory .createUserNodeInstanceList ());
119- } else {
120- serverGroupListFuture = CompletableFuture .completedFuture (serverGroupListFactory .createEmptyNodeInstanceList ());
116+ return CompletableFuture .completedFuture (serverGroupListFactory .createUserNodeInstanceList ());
117+ }
118+ return CompletableFuture .completedFuture (serverGroupListFactory .createEmptyNodeInstanceList ());
119+ }
120+
121+
122+ private void join (List <ServerGroupRequest > serverGroupRequests , long timeoutMillis ) {
123+ @ SuppressWarnings ("rawtypes" )
124+ final CompletableFuture [] futures = serverGroupRequests .stream ()
125+ .map (ServerGroupRequest ::future )
126+ .toArray (CompletableFuture []::new );
127+
128+ final CompletableFuture <Void > all = CompletableFuture .allOf (futures );
129+ try {
130+ all .get (timeoutMillis , TimeUnit .MILLISECONDS );
131+ } catch (Throwable e ) {
132+ all .cancel (false );
133+ String cause = "an error occurred while adding server info" ;
134+ if (e instanceof TimeoutException ) {
135+ cause += " build timed out. timeout=" + timeoutMillis + "ms" ;
136+ }
137+ throw new RuntimeException (cause , e );
121138 }
122- return serverGroupListFuture .thenAccept (node ::setServerGroupList );
123139 }
140+
141+ private void bind (List <ServerGroupRequest > serverGroupRequest ) {
142+ for (ServerGroupRequest pair : serverGroupRequest ) {
143+ Node node = pair .node ();
144+ CompletableFuture <ServerGroupList > future = pair .future ();
145+ try {
146+ ServerGroupList serverGroupList = future .getNow (null );
147+ node .setServerGroupList (serverGroupList );
148+ } catch (Throwable th ) {
149+ logger .warn ("Failed to get server info for node {}" , node );
150+ throw new RuntimeException ("Unexpected error" , th );
151+ }
152+ }
153+ }
154+
124155}
0 commit comments