Skip to content

Commit fac0f37

Browse files
addressing PR comments
1 parent 35df353 commit fac0f37

File tree

5 files changed

+73
-32
lines changed

5 files changed

+73
-32
lines changed

src/main/java/com/ibm/watson/modelmesh/Metrics.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.logging.log4j.LogManager;
3737
import org.apache.logging.log4j.Logger;
3838

39-
import java.lang.reflect.Array;
4039
import java.net.SocketAddress;
4140
import java.nio.channels.DatagramChannel;
4241
import java.util.Collections;
@@ -239,12 +238,12 @@ public PrometheusMetrics(Map<String, String> params, Map<String, String> infoMet
239238
if (m == API_REQUEST_TIME || m == API_REQUEST_COUNT || m == INVOKE_MODEL_TIME
240239
|| m == INVOKE_MODEL_COUNT || m == REQUEST_PAYLOAD_SIZE || m == RESPONSE_PAYLOAD_SIZE) {
241240
if (this.enablePerModelMetrics && m.type != COUNTER_WITH_HISTO) {
242-
builder.labelNames("method", "code", "modelId");
241+
builder.labelNames("method", "code", "modelId", "vModelId");
243242
} else {
244243
builder.labelNames("method", "code");
245244
}
246245
} else if (this.enablePerModelMetrics && m.type != GAUGE && m.type != COUNTER && m.type != COUNTER_WITH_HISTO) {
247-
builder.labelNames("modelId");
246+
builder.labelNames("modelId", "vModelId");
248247
}
249248
Collector collector = builder.name(m.promName).help(m.description).create();
250249
metricsMap.put(m, collector);
@@ -369,8 +368,8 @@ public void logTimingMetricSince(Metric metric, long prevTime, boolean isNano) {
369368

370369
@Override
371370
public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano, String modelId) {
372-
if (enablePerModelMetrics) {
373-
((Histogram) metricsMap.get(metric)).labels(modelId).observe(isNano ? elapsed / M : elapsed);
371+
if (enablePerModelMetrics && modelId != null) {
372+
((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(isNano ? elapsed / M : elapsed);
374373
} else {
375374
((Histogram) metricsMap.get(metric)).observe(isNano ? elapsed / M : elapsed);
376375
}
@@ -379,7 +378,7 @@ public void logTimingMetricDuration(Metric metric, long elapsed, boolean isNano,
379378
@Override
380379
public void logSizeEventMetric(Metric metric, long value, String modelId) {
381380
if (enablePerModelMetrics) {
382-
((Histogram) metricsMap.get(metric)).labels(modelId).observe(value * metric.newMultiplier);
381+
((Histogram) metricsMap.get(metric)).labels(modelId, "").observe(value * metric.newMultiplier);
383382
} else {
384383
((Histogram) metricsMap.get(metric)).observe(value * metric.newMultiplier);
385384
}
@@ -403,18 +402,25 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos,
403402
final long elapsedMillis = elapsedNanos / M;
404403
final Histogram timingHisto = (Histogram) metricsMap
405404
.get(external ? API_REQUEST_TIME : INVOKE_MODEL_TIME);
406-
String mId = vModelId == null ? modelId : vModelId;
407405
int idx = shortNames ? name.indexOf('/') : -1;
408406
String methodName = idx == -1 ? name : name.substring(idx + 1);
407+
if (modelId == null) {
408+
logger.error("invalid ModelId. Label value for ModelId will be left blank");
409+
modelId = "";
410+
}
411+
if (vModelId == null) {
412+
logger.debug("vModelId is empty, creating empty label");
413+
vModelId = "";
414+
}
409415
if (enablePerModelMetrics) {
410-
timingHisto.labels(methodName, code.name(), mId).observe(elapsedMillis);
416+
timingHisto.labels(methodName, code.name(), modelId, vModelId).observe(elapsedMillis);
411417
} else {
412418
timingHisto.labels(methodName, code.name()).observe(elapsedMillis);
413419
}
414420
if (reqPayloadSize != -1) {
415421
if (enablePerModelMetrics) {
416422
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
417-
.labels(methodName, code.name(), mId).observe(reqPayloadSize);
423+
.labels(methodName, code.name(), modelId, vModelId).observe(reqPayloadSize);
418424
} else {
419425
((Histogram) metricsMap.get(REQUEST_PAYLOAD_SIZE))
420426
.labels(methodName, code.name()).observe(reqPayloadSize);
@@ -423,7 +429,7 @@ public void logRequestMetrics(boolean external, String name, long elapsedNanos,
423429
if (respPayloadSize != -1) {
424430
if (enablePerModelMetrics) {
425431
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
426-
.labels(methodName, code.name(), mId).observe(respPayloadSize);
432+
.labels(methodName, code.name(), modelId, vModelId).observe(respPayloadSize);
427433
} else {
428434
((Histogram) metricsMap.get(RESPONSE_PAYLOAD_SIZE))
429435
.labels(methodName, code.name()).observe(respPayloadSize);

src/main/java/com/ibm/watson/modelmesh/ModelMesh.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3315,6 +3315,7 @@ protected Map<String, ServiceInstanceInfo> getMap(Object[] arr) {
33153315
static final String KNOWN_SIZE_CXT_KEY = "tas.known_size";
33163316
static final String UNBALANCED_KEY = "mmesh.unbalanced";
33173317
static final String DEST_INST_ID_KEY = "tas.dest_iid";
3318+
static final String VMODELID = "vmodelid";
33183319

33193320
// these are the possible values for the tas.internal context parameter
33203321
// it won't be set on requests from outside of the cluster, and will
@@ -3347,8 +3348,8 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea
33473348
List<String> excludeInstances)
33483349
throws ModelNotFoundException, ModelLoadException, ModelNotHereException, InternalException {
33493350
try {
3350-
return (StatusInfo) invokeModel(modelId, null, internalOpRemoteMeth,
3351-
returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args"
3351+
return (StatusInfo) invokeModel(modelId, false, null,
3352+
internalOpRemoteMeth, returnStatus, load, sync, lastUsed, excludeInstances); // <-- "args"
33523353
} catch (ModelNotFoundException | ModelLoadException | ModelNotHereException | InternalException e) {
33533354
throw e;
33543355
} catch (TException e) {
@@ -3416,8 +3417,8 @@ public StatusInfo internalOperation(String modelId, boolean returnStatus, boolea
34163417
* @throws TException
34173418
*/
34183419
@SuppressWarnings("unchecked")
3419-
protected Object invokeModel(final String modelId, final Method method, final Method remoteMeth,
3420-
final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException {
3420+
protected Object invokeModel(final String modelId, Boolean isVModel, final Method method,
3421+
final Method remoteMeth, final Object... args) throws ModelNotFoundException, ModelNotHereException, ModelLoadException, TException {
34213422

34223423
//verify parameter values
34233424
if (modelId == null || modelId.isEmpty()) {
@@ -3430,6 +3431,10 @@ protected Object invokeModel(final String modelId, final Method method, final Me
34303431
}
34313432

34323433
final String tasInternal = contextMap.get(TAS_INTERNAL_CXT_KEY);
3434+
String vModelId = "";
3435+
if (isVModel) {
3436+
vModelId = contextMap.get(VMODELID);
3437+
}
34333438
// Set the external request flag if it's not a tasInternal call or if
34343439
// tasInternal == INTERNAL_REQ. The latter is a new ensureLoaded
34353440
// invocation originating from within the cluster.
@@ -3502,7 +3507,7 @@ protected Object invokeModel(final String modelId, final Method method, final Me
35023507
throw new ModelNotHereException(instanceId, modelId);
35033508
}
35043509
try {
3505-
return invokeLocalModel(ce, method, args, modelId);
3510+
return invokeLocalModel(ce, method, args, modelId, isVModel);
35063511
} catch (ModelLoadException mle) {
35073512
mr = registry.get(modelId);
35083513
if (mr == null || !mr.loadFailedInInstance(instanceId)) {
@@ -3716,7 +3721,7 @@ protected Object invokeModel(final String modelId, final Method method, final Me
37163721
localInvokesInFlight.incrementAndGet();
37173722
}
37183723
try {
3719-
Object result = invokeLocalModel(cacheEntry, method, args, modelId);
3724+
Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel);
37203725
return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result;
37213726
} finally {
37223727
if (!favourSelfForHits) {
@@ -3936,7 +3941,7 @@ else if (mr.getInstanceIds().containsKey(instanceId)) {
39363941

39373942
// invoke model
39383943
try {
3939-
Object result = invokeLocalModel(cacheEntry, method, args, modelId);
3944+
Object result = invokeLocalModel(cacheEntry, method, args, modelId, isVModel);
39403945
return method == null && externalReq ? updateWithModelCopyInfo(result, mr) : result;
39413946
} catch (ModelNotHereException e) {
39423947
if (loadTargetFilter != null) loadTargetFilter.remove(instanceId);
@@ -3989,10 +3994,9 @@ else if (mr.getInstanceIds().containsKey(instanceId)) {
39893994
throw t;
39903995
} finally {
39913996
if (methodStartNanos > 0L && metrics.isEnabled()) {
3992-
String[] extraLabels = new String[]{modelId};
39933997
// only logged here in non-grpc (legacy) mode
39943998
metrics.logRequestMetrics(true, getRequestMethodName(method, args),
3995-
nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, "");
3999+
nanoTime() - methodStartNanos, metricStatusCode, -1, -1, modelId, vModelId);
39964000
}
39974001
curThread.setName(threadNameBefore);
39984002
}
@@ -4122,13 +4126,15 @@ private Map<String, Long> filterIfReadOnly(Map<String, Long> instId) {
41224126
* instances inside and some out, and a request has been sent from outside the
41234127
* cluster to an instance inside (since it may land on an unintended instance in
41244128
* that case).
4125-
*
4129+
* @param isVModel TODO
4130+
* @throws TException TODO
41264131
* @throws ModelNotHereException if the specified destination instance isn't found
41274132
*/
41284133
protected Object forwardInvokeModel(String destId, String modelId, Method remoteMeth, Object... args)
41294134
throws TException {
41304135
destinationInstance.set(destId);
41314136
try {
4137+
//TODO: not sure what is happening here.. do I need to pass vmodelid to the remoteMeth.invoke?
41324138
return remoteMeth.invoke(directClient, ObjectArrays.concat(modelId, args));
41334139
} catch (Exception e) {
41344140
if (e instanceof InvocationTargetException) {
@@ -4404,17 +4410,17 @@ protected Object invokeRemoteModel(BaseModelMeshService.Iface client, Method met
44044410
return remoteMeth.invoke(client, ObjectArrays.concat(modelId, args));
44054411
}
44064412

4407-
protected Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args, String modelId)
4413+
protected Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args, String modelId, Boolean isVModel)
44084414
throws InterruptedException, TException {
4409-
Object result = invokeLocalModel(ce, method, args);
4415+
Object result = invokeLocalModel(ce, method, false, args);
44104416
// if this is an ensure-loaded request, check-for and trigger a "chained" load if necessary
44114417
if (method == null) {
44124418
triggerChainedLoadIfNecessary(modelId, result, args, ce.getWeight(), null);
44134419
}
44144420
return result;
44154421
}
44164422

4417-
private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
4423+
private Object invokeLocalModel(CacheEntry<?> ce, Method method, Boolean isVModel, Object[] args)
44184424
throws InterruptedException, TException {
44194425

44204426
if (method == null) {
@@ -4429,7 +4435,11 @@ private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
44294435
long now = currentTimeMillis();
44304436
ce.upgradePriority(now + 3600_000L, now + 7200_000L); // (2 hours in future)
44314437
}
4432-
4438+
Map<String, String> contextMap = ThreadContext.getCurrentContext();
4439+
String vModelId = null;
4440+
if (isVModel) {
4441+
vModelId = contextMap.get(VMODELID);
4442+
}
44334443
// The future-waiting timeouts should not be needed, request threads are interrupted when their
44344444
// timeouts/deadlines expire, and the model loading thread that it waits for has its own timeout.
44354445
// But we still set a large one as a safeguard (there can be pathalogical cases where model-loading
@@ -4529,7 +4539,7 @@ private Object invokeLocalModel(CacheEntry<?> ce, Method method, Object[] args)
45294539
ce.afterInvoke(weight, tookNanos);
45304540
if (code != null && metrics.isEnabled()) {
45314541
metrics.logRequestMetrics(false, getRequestMethodName(method, args),
4532-
tookNanos, code, -1, -1, ce.modelId, "");
4542+
tookNanos, code, -1, -1, ce.modelId, vModelId);
45334543
}
45344544
}
45354545
}

src/main/java/com/ibm/watson/modelmesh/ModelMeshApi.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@
8787
import io.netty.util.ReferenceCountUtil;
8888
import io.netty.util.concurrent.FastThreadLocalThread;
8989
import org.apache.thrift.TException;
90-
import org.checkerframework.checker.units.qual.A;
9190
import org.slf4j.Logger;
9291
import org.slf4j.LoggerFactory;
9392
import org.slf4j.MDC;
@@ -345,6 +344,10 @@ protected static void setUnbalancedLitelinksContextParam() {
345344
ThreadContext.addContextEntry(ModelMesh.UNBALANCED_KEY, "true"); // unbalanced
346345
}
347346

347+
protected static void setvModelIdLiteLinksContextParam(String vModelId) {
348+
ThreadContext.addContextEntry(ModelMesh.VMODELID, vModelId);
349+
}
350+
348351
// ----------------- concrete model management methods
349352

350353
@Override
@@ -438,18 +441,19 @@ ModelResponse callModel(String modelId, boolean isVModel, String methodName, Str
438441
if (unbalanced) {
439442
setUnbalancedLitelinksContextParam();
440443
}
441-
return delegate.callModel(modelId, methodName, headers, data);
444+
return delegate.callModel(modelId, isVModel, methodName, headers, data);
442445
}
443446
String vModelId = modelId;
444447
modelId = null;
445448
boolean first = true;
446449
while (true) {
447450
modelId = vmm().resolveVModelId(vModelId, modelId);
451+
setvModelIdLiteLinksContextParam(vModelId);
448452
if (unbalanced) {
449453
setUnbalancedLitelinksContextParam();
450454
}
451455
try {
452-
return delegate.callModel(modelId, methodName, headers, data);
456+
return delegate.callModel(modelId, true, methodName, headers, data);
453457
} catch (ModelNotFoundException mnfe) {
454458
if (!first) throw mnfe;
455459
} catch (Exception e) {
@@ -784,6 +788,27 @@ public void onHalfClose() {
784788
call.close(status, emptyMeta());
785789
Metrics metrics = delegate.metrics;
786790
if (metrics.isEnabled()) {
791+
Iterator<String> midIt = modelIds.iterator();
792+
while (midIt.hasNext()) {
793+
if (isVModel) {
794+
String mId = null;
795+
String vmId = midIt.next();
796+
try {
797+
mId = vmm().resolveVModelId(midIt.next(), mId);
798+
metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos,
799+
status.getCode(), reqSize, respSize, mId, vmId);
800+
}
801+
catch (Exception e) {
802+
logger.error("Could not resolve model id for vModelId" + vmId, e);
803+
metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos,
804+
status.getCode(), reqSize, respSize, "", vmId);
805+
}
806+
} else {
807+
metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos,
808+
status.getCode(), reqSize, respSize, midIt.next(), "");
809+
}
810+
}
811+
787812
if (isVModel) {
788813
metrics.logRequestMetrics(true, methodName, nanoTime() - startNanos,
789814
status.getCode(), reqSize, respSize, "", Iterables.toString(modelIds));

src/main/java/com/ibm/watson/modelmesh/SidecarModelMesh.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1098,12 +1098,12 @@ public List<ByteBuffer> applyModelMulti(String modelId, List<ByteBuffer> input,
10981098
@SuppressWarnings("unchecked")
10991099
List<ByteBuffer> applyModel(String modelId, List<ByteBuffer> input, Map<String, String> metadata)
11001100
throws TException {
1101-
return (List<ByteBuffer>) invokeModel(modelId, localMeth, remoteMeth, input, metadata);
1101+
return (List<ByteBuffer>) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata);
11021102
}
11031103

11041104
// refcount of provided ByteBuf should not be modified
1105-
ModelResponse callModel(String modelId, String methodName, Metadata headers, ByteBuf data) throws TException {
1106-
return (ModelResponse) invokeModel(modelId, directLocalMeth, remoteMeth, methodName, headers, data);
1105+
ModelResponse callModel(String modelId, Boolean isVModel, String methodName, Metadata headers, ByteBuf data) throws TException {
1106+
return (ModelResponse) invokeModel(modelId, isVModel, directLocalMeth, remoteMeth, methodName, headers, data);
11071107
}
11081108

11091109
@Idempotent

src/test/java/com/ibm/watson/modelmesh/DummyModelMesh.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ protected ModelLoader<?> getLoader() {
7575
@Override
7676
public ByteBuffer applyModel(String modelId, ByteBuffer input, Map<String, String> metadata)
7777
throws TException {
78-
return (ByteBuffer) invokeModel(modelId, localMeth, remoteMeth, input, metadata);
78+
return (ByteBuffer) invokeModel(modelId, false, localMeth, remoteMeth, input, metadata);
7979
}
8080

8181
@Override

0 commit comments

Comments
 (0)