1+ /*
2+ * Copyright 2025 NAVER Corp.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+
17+ package com .navercorp .pinpoint .collector .applicationmap .service ;
18+
19+ import com .navercorp .pinpoint .collector .applicationmap .dao .HostApplicationMapDao ;
20+ import com .navercorp .pinpoint .common .profiler .logging .ThrottledLogger ;
21+ import com .navercorp .pinpoint .common .server .applicationmap .Vertex ;
22+ import com .navercorp .pinpoint .common .server .bo .BasicSpan ;
23+ import com .navercorp .pinpoint .common .server .bo .SpanBo ;
24+ import com .navercorp .pinpoint .common .server .bo .SpanChunkBo ;
25+ import com .navercorp .pinpoint .common .server .bo .SpanEventBo ;
26+ import com .navercorp .pinpoint .common .trace .ServiceType ;
27+ import com .navercorp .pinpoint .common .trace .ServiceTypeCategory ;
28+ import com .navercorp .pinpoint .loader .service .ServiceTypeRegistryService ;
29+ import org .apache .commons .collections4 .CollectionUtils ;
30+ import org .apache .logging .log4j .LogManager ;
31+ import org .apache .logging .log4j .Logger ;
32+ import org .springframework .stereotype .Service ;
33+
34+ import java .util .List ;
35+ import java .util .Objects ;
36+
37+ /**
38+ * Trace service implementation for HBase storage.
39+ */
40+ @ Service
41+ public class HbaseApplicationMapService implements ApplicationMapService {
42+
43+ private final Logger logger = LogManager .getLogger (getClass ());
44+
45+ private static final String MERGE_AGENT = "_" ;
46+ private static final String MERGE_QUEUE = "_" ;
47+
48+ private final ThrottledLogger throttledLogger = ThrottledLogger .getLogger (logger , 10000 );
49+
50+ private final HostApplicationMapDao hostApplicationMapDao ;
51+
52+ private final LinkService linkService ;
53+
54+ private final ServiceTypeRegistryService registry ;
55+
56+ public HbaseApplicationMapService (HostApplicationMapDao hostApplicationMapDao ,
57+ LinkService linkService ,
58+ ServiceTypeRegistryService registry ) {
59+ this .hostApplicationMapDao = Objects .requireNonNull (hostApplicationMapDao , "hostApplicationMapDao" );
60+ this .linkService = Objects .requireNonNull (linkService , "statisticsService" );
61+ this .registry = Objects .requireNonNull (registry , "registry" );
62+ }
63+
64+ @ Override
65+ public void insertSpanChunk (final SpanChunkBo spanChunkBo ) {
66+ Vertex selfVertex = getSelfVertex (spanChunkBo );
67+ final List <SpanEventBo > spanEventList = spanChunkBo .getSpanEventBoList ();
68+ if (spanEventList != null ) {
69+ // TODO need to batch update later.
70+ insertSpanEventList (spanEventList , selfVertex , spanChunkBo .getAgentId (), spanChunkBo .getEndPoint (), spanChunkBo .getCollectorAcceptTime ());
71+ }
72+
73+ }
74+
75+ private Vertex getSelfVertex (BasicSpan basicSpan ) {
76+ final ServiceType applicationServiceType = getApplicationServiceType (basicSpan );
77+ return Vertex .of (basicSpan .getApplicationName (), applicationServiceType );
78+ }
79+
80+ private ServiceType getApplicationServiceType (BasicSpan basicSpan ) {
81+ final int applicationServiceTypeCode = basicSpan .getApplicationServiceType ();
82+ return registry .findServiceType (applicationServiceTypeCode );
83+ }
84+
85+ @ Override
86+ public void insertSpan (final SpanBo spanBo ) {
87+
88+ final Vertex selfVertex = getSelfVertex (spanBo );
89+
90+ insertAcceptorHost (spanBo , selfVertex );
91+ insertSpanStat (spanBo , selfVertex );
92+ insertSpanEventStat (spanBo , selfVertex );
93+ }
94+
95+
96+ private void insertAcceptorHost (long requestTime , SpanEventBo spanEvent , Vertex selfVertex ) {
97+ final String endPoint = spanEvent .getEndPoint ();
98+ if (endPoint == null ) {
99+ logger .debug ("endPoint is null. appName:{} spanEvent:{}" , selfVertex , spanEvent );
100+ return ;
101+ }
102+ final String destinationId = spanEvent .getDestinationId ();
103+ if (destinationId == null ) {
104+ logger .debug ("destinationId is null. appName:{} spanEvent:{}" , selfVertex , spanEvent );
105+ return ;
106+ }
107+ ServiceType serviceType = registry .findServiceType (spanEvent .getServiceType ());
108+ Vertex rpcVertex = Vertex .of (destinationId , serviceType );
109+ hostApplicationMapDao .insert (requestTime , selfVertex .applicationName (), selfVertex .serviceType ().getCode (), rpcVertex , endPoint );
110+ }
111+
112+ private void insertAcceptorHost (SpanBo span , Vertex selfVertex ) {
113+ // save host application map
114+ // acceptor host is set at profiler module only when the span is not the kind of root span
115+ final String acceptorHost = span .getAcceptorHost ();
116+ if (acceptorHost == null ) {
117+ logger .debug ("acceptorHost is null agent: {}/{}" , span .getApplicationName (), span .getAgentName ());
118+ return ;
119+ }
120+
121+ final String parentApplicationName = span .getParentApplicationName ();
122+ if (parentApplicationName == null ) {
123+ logger .debug ("parentApplicationName is null agent: {}/{}" , span .getApplicationName (), span .getAgentName ());
124+ return ;
125+ }
126+ final int parentServiceType = span .getParentApplicationServiceType ();
127+ final ServiceType spanServiceType = registry .findServiceType (span .getServiceType ());
128+ if (spanServiceType .isQueue ()) {
129+ final String host = span .getEndPoint ();
130+ if (host == null ) {
131+ logger .debug ("endPoint is null agent: {}/{}" , span .getApplicationName (), span .getAgentName ());
132+ return ;
133+ }
134+ hostApplicationMapDao .insert (span .getCollectorAcceptTime (), parentApplicationName , parentServiceType , selfVertex , host );
135+ } else {
136+ hostApplicationMapDao .insert (span .getCollectorAcceptTime (), parentApplicationName , parentServiceType , selfVertex , acceptorHost );
137+ }
138+ }
139+
140+ private Vertex getParentVertex (SpanBo span ) {
141+ String parentApplicationName = span .getParentApplicationName ();
142+ ServiceType parentApplicationType = registry .findServiceType (span .getParentApplicationServiceType ());
143+ return Vertex .of (parentApplicationName , parentApplicationType );
144+ }
145+
146+ private void insertSpanStat (SpanBo span , Vertex selfVertex ) {
147+
148+ final ServiceType spanServiceType = registry .findServiceType (span .getServiceType ());
149+
150+ int bugCheck = 0 ;
151+ if (span .getParentSpanId () == -1 ) {
152+ if (spanServiceType .isQueue ()) {
153+ // create virtual queue node
154+ String applicationName = span .getAcceptorHost ();
155+ if (applicationName == null ) {
156+ applicationName = span .getRemoteAddr ();
157+ }
158+ Vertex acceptVertex = Vertex .of (applicationName , spanServiceType );
159+ linkService .updateOutLink (span .getCollectorAcceptTime (), acceptVertex , span .getRemoteAddr (),
160+ selfVertex , MERGE_QUEUE , span .getElapsed (), span .hasError ());
161+
162+ if (logger .isDebugEnabled ()) {
163+ logger .debug ("[InLink] root-queue {} <- {}/{}" , selfVertex , acceptVertex , span .getAgentId ());
164+ }
165+ linkService .updateInLink (span .getCollectorAcceptTime (), selfVertex ,
166+ acceptVertex , MERGE_QUEUE , span .getElapsed (), span .hasError ());
167+ } else {
168+ // create virtual user
169+ // linkService.updateOutLink(span.getCollectorAcceptTime(), Link.of(span.getApplicationName(), ServiceType.USER), MERGE_AGENT,
170+ // spanLink, MERGE_AGENT, span.getElapsed(), span.hasError());
171+
172+ // update the span information of the current node (self)
173+ Vertex userVertex = Vertex .of (span .getApplicationName (), ServiceType .USER );
174+ linkService .updateInLink (span .getCollectorAcceptTime (), selfVertex , userVertex , MERGE_AGENT , span .getElapsed (), span .hasError ());
175+ }
176+ bugCheck ++;
177+ }
178+
179+ // save statistics info only when parentApplicationContext exists
180+ // when drawing server map based on statistics info, you must know the application name of the previous node.
181+ if (span .getParentApplicationName () != null ) {
182+
183+ Vertex parentVertex = getParentVertex (span );
184+ logger .debug ("Received parent application name. parentName:{} appName:{}" , parentVertex , span .getApplicationName ());
185+
186+ // create virtual queue node if current' span's service type is a queue AND :
187+ // 1. parent node's application service type is not a queue (it may have come from a queue that is traced)
188+ // 2. current node's application service type is not a queue (current node may be a queue that is traced)
189+ if (spanServiceType .isQueue ()) {
190+ if (!selfVertex .serviceType ().isQueue () && !parentVertex .serviceType ().isQueue ()) {
191+ // emulate virtual queue node's accept Span and record it's acceptor host
192+ String applicationName = span .getAcceptorHost ();
193+ if (applicationName == null ) {
194+ applicationName = span .getRemoteAddr ();
195+ }
196+ final Vertex queueAcceptVertex = Vertex .of (applicationName , spanServiceType );
197+
198+ if (logger .isDebugEnabled ()) {
199+ logger .debug ("[Bind] child-queue {}/{} <- {}" , queueAcceptVertex , span .getRemoteAddr (), parentVertex );
200+ }
201+ hostApplicationMapDao .insert (span .getCollectorAcceptTime (), parentVertex .applicationName (), parentVertex .serviceType ().getCode (), queueAcceptVertex , span .getRemoteAddr ());
202+ // emulate virtual queue node's send SpanEvent
203+
204+ if (logger .isDebugEnabled ()) {
205+ logger .debug ("[OutLink] child-queue {}/{} -> {}/{}" , queueAcceptVertex , span .getRemoteAddr (),
206+ selfVertex , span .getEndPoint ());
207+ }
208+ linkService .updateOutLink (span .getCollectorAcceptTime (), queueAcceptVertex , span .getRemoteAddr (),
209+ selfVertex , MERGE_QUEUE , span .getElapsed (), span .hasError ());
210+
211+ parentVertex = queueAcceptVertex ;
212+ }
213+ }
214+ if (logger .isDebugEnabled ()) {
215+ logger .debug ("child-span updateInLink child:{}/{} <- parentAppName:{}" ,
216+ selfVertex , span .getAgentId (), parentVertex );
217+ }
218+ linkService .updateInLink (span .getCollectorAcceptTime (), selfVertex ,
219+ parentVertex , MERGE_AGENT , span .getElapsed (), span .hasError ());
220+ bugCheck ++;
221+ }
222+
223+ // record the response time of the current node (self).
224+ // blow code may be conflict of idea above callee key.
225+ // it is odd to record reversely, because of already recording the caller data at previous node.
226+ // the data may be different due to timeout or network error.
227+
228+ linkService .updateResponseTime (span .getCollectorAcceptTime (), selfVertex , span .getAgentId (), span .getElapsed (), span .hasError ());
229+
230+ if (bugCheck != 1 ) {
231+ logger .info ("ambiguous span found(bug). span {}/{}" , span .getApplicationName (), span .getAgentName ());
232+ if (logger .isDebugEnabled ()) {
233+ logger .debug ("ambiguous span found(bug). detailed span {}" , span );
234+ }
235+ }
236+ }
237+
238+ private void insertSpanEventStat (SpanBo span , Vertex selfVertex ) {
239+
240+ final List <SpanEventBo > spanEventList = span .getSpanEventBoList ();
241+ if (CollectionUtils .isEmpty (spanEventList )) {
242+ return ;
243+ }
244+ if (logger .isDebugEnabled ()) {
245+ logger .debug ("handle spanEvent {}/{} size:{}" , span .getApplicationName (), span .getAgentId (), spanEventList .size ());
246+ }
247+
248+ // TODO need to batch update later.
249+ insertSpanEventList (spanEventList , selfVertex , span .getAgentId (), span .getEndPoint (), span .getCollectorAcceptTime ());
250+ }
251+
252+ private void insertSpanEventList (List <SpanEventBo > spanEventList , Vertex selfVertex ,
253+ String agentId , String endPoint , long requestTime ) {
254+
255+ for (SpanEventBo spanEvent : spanEventList ) {
256+ final ServiceType spanEventType = registry .findServiceType (spanEvent .getServiceType ());
257+
258+ if (isAlias (spanEventType , spanEvent )) {
259+ insertAcceptorHost (requestTime , spanEvent , selfVertex );
260+ continue ;
261+ }
262+
263+ if (!spanEventType .isRecordStatistics ()) {
264+ continue ;
265+ }
266+
267+ final String spanEventApplicationName = normalize (spanEvent .getDestinationId (), spanEventType );
268+ final String spanEventEndPoint = spanEvent .getEndPoint ();
269+
270+ // if terminal update statistics
271+ final int elapsed = spanEvent .getEndElapsed ();
272+ final boolean hasException = spanEvent .hasException ();
273+
274+ if (spanEventApplicationName == null ) {
275+ throttledLogger .info ("Failed to insert statistics. Cause:SpanEvent has invalid format " +
276+ "selfApplication:{}/{}, spanEventApplication:{}/{}" ,
277+ selfVertex , agentId , spanEventApplicationName , spanEventType );
278+ continue ;
279+ }
280+
281+ Vertex spanEventVertex = Vertex .of (spanEventApplicationName , spanEventType );
282+ /*
283+ * save information to draw a server map based on statistics
284+ */
285+ // save the information of outLink (the spanevent that called span)
286+ linkService .updateOutLink (requestTime , selfVertex , MERGE_AGENT ,
287+ spanEventVertex , spanEventEndPoint , elapsed , hasException );
288+
289+ // save the information of inLink (the span that spanevent called)
290+ linkService .updateInLink (requestTime , spanEventVertex ,
291+ selfVertex , endPoint , elapsed , hasException );
292+ }
293+ }
294+
295+ private String normalize (String spanEventApplicationName , ServiceType spanEventType ) {
296+ if (spanEventType .getCategory () == ServiceTypeCategory .DATABASE ) {
297+ // empty database id
298+ if (spanEventApplicationName == null ) {
299+ return "UNKNOWN_DATABASE" ;
300+ }
301+ }
302+ return spanEventApplicationName ;
303+ }
304+
305+ private boolean isAlias (ServiceType spanEventType , SpanEventBo forDebugEvent ) {
306+ if (!spanEventType .isAlias ()) {
307+ return false ;
308+ }
309+ if (spanEventType .isRecordStatistics ()) {
310+ logger .error ("ServiceType with ALIAS should NOT have RECORD_STATISTICS {}" , forDebugEvent );
311+ return false ;
312+ }
313+ return true ;
314+ }
315+ }
0 commit comments