DB_SQL_TABLE = SemanticAttributes.DB_SQL_TABLE;
++
++}
+diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceReader.java b/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceReader.java
+deleted file mode 100644
+index 1b9300058..000000000
+--- a/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceReader.java
++++ /dev/null
+@@ -1,381 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements. See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership. The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.phoenix.trace;
+-
+-import java.sql.Connection;
+-import java.sql.PreparedStatement;
+-import java.sql.ResultSet;
+-import java.sql.SQLException;
+-import java.util.ArrayList;
+-import java.util.Collection;
+-import java.util.Collections;
+-import java.util.HashSet;
+-import java.util.List;
+-import java.util.Set;
+-import java.util.TreeSet;
+-
+-import org.apache.htrace.Span;
+-import org.apache.phoenix.jdbc.PhoenixConnection;
+-import org.apache.phoenix.metrics.MetricInfo;
+-import org.apache.phoenix.query.QueryServices;
+-import org.apache.phoenix.query.QueryServicesOptions;
+-import org.apache.phoenix.util.LogUtil;
+-import org.slf4j.Logger;
+-import org.slf4j.LoggerFactory;
+-
+-import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
+-import org.apache.phoenix.thirdparty.com.google.common.primitives.Longs;
+-
+-/**
+- * Read the traces written to phoenix tables by the {@link TraceWriter}.
+- */
+-public class TraceReader {
+-
+- private static final Logger LOGGER = LoggerFactory.getLogger(TraceReader.class);
+- private final Joiner comma = Joiner.on(',');
+- private String knownColumns;
+- {
+- // the order here dictates the order we pull out the values below. For now, just keep them
+- // in sync - so we can be efficient pulling them off the results.
+- knownColumns =
+- comma.join(MetricInfo.TRACE.columnName, MetricInfo.PARENT.columnName,
+- MetricInfo.SPAN.columnName, MetricInfo.DESCRIPTION.columnName,
+- MetricInfo.START.columnName, MetricInfo.END.columnName,
+- MetricInfo.HOSTNAME.columnName, TraceWriter.TAG_COUNT,
+- TraceWriter.ANNOTATION_COUNT);
+- }
+-
+- private Connection conn;
+- private String table;
+- private int pageSize;
+-
+- public TraceReader(Connection conn, String tracingTableName) throws SQLException {
+- this.conn = conn;
+- this.table = tracingTableName;
+- String ps = conn.getClientInfo(QueryServices.TRACING_PAGE_SIZE_ATTRIB);
+- this.pageSize = ps == null ? QueryServicesOptions.DEFAULT_TRACING_PAGE_SIZE : Integer.parseInt(ps);
+- }
+-
+- /**
+- * Read all the currently stored traces.
+- *
+- * Be Careful! This could cause an OOME if there are a lot of traces.
+- * @param limit max number of traces to return. If -1, returns all known traces.
+- * @return the found traces
+- * @throws SQLException
+- */
+- public Collection readAll(int limit) throws SQLException {
+- Set traces = new HashSet();
+- // read all the known columns from the table, sorting first by trace column (so the same
+- // trace
+- // goes together), and then by start time (so parent spans always appear before child spans)
+- String query =
+- "SELECT " + knownColumns + " FROM " + table
+- + " ORDER BY " + MetricInfo.TRACE.columnName + " DESC, "
+- + MetricInfo.START.columnName + " ASC" + " LIMIT " + pageSize;
+- int resultCount = 0;
+- try (PreparedStatement stmt = conn.prepareStatement(query);
+- ResultSet results = stmt.executeQuery()) {
+- TraceHolder trace = null;
+- // the spans that are not the root span, but haven't seen their parent yet
+- List orphans = null;
+- while (results.next()) {
+- int index = 1;
+- long traceid = results.getLong(index++);
+- long parent = results.getLong(index++);
+- long span = results.getLong(index++);
+- String desc = results.getString(index++);
+- long start = results.getLong(index++);
+- long end = results.getLong(index++);
+- String host = results.getString(index++);
+- int tagCount = results.getInt(index++);
+- int annotationCount = results.getInt(index++);
+- // we have a new trace
+- if (trace == null || traceid != trace.traceid) {
+- // only increment if we are on a new trace, to ensure we get at least one
+- if (trace != null) {
+- resultCount++;
+- }
+- // we beyond the limit, so we stop
+- if (resultCount >= limit) {
+- break;
+- }
+- trace = new TraceHolder();
+- // add the orphans, so we can track them later
+- orphans = new ArrayList();
+- trace.orphans = orphans;
+- trace.traceid = traceid;
+- traces.add(trace);
+- }
+-
+- // search the spans to determine the if we have a known parent
+- SpanInfo parentSpan = null;
+- if (parent != Span.ROOT_SPAN_ID) {
+- // find the parent
+- for (SpanInfo p : trace.spans) {
+- if (p.id == parent) {
+- parentSpan = p;
+- break;
+- }
+- }
+- }
+- SpanInfo spanInfo =
+- new SpanInfo(parentSpan, parent, span, desc, start, end, host, tagCount,
+- annotationCount);
+- // search the orphans to see if this is the parent id
+-
+- for (int i = 0; i < orphans.size(); i++) {
+- SpanInfo orphan = orphans.get(i);
+- // we found the parent for the orphan
+- if (orphan.parentId == span) {
+- // update the bi-directional relationship
+- orphan.parent = spanInfo;
+- spanInfo.children.add(orphan);
+- // / its no longer an orphan
+- LOGGER.trace(addCustomAnnotations("Found parent for span: " + span));
+- orphans.remove(i--);
+- }
+- }
+-
+- if (parentSpan != null) {
+- // add this as a child to the parent span
+- parentSpan.children.add(spanInfo);
+- } else if (parent != Span.ROOT_SPAN_ID) {
+- // add the span to the orphan pile to check for the remaining spans we see
+- LOGGER.info(addCustomAnnotations("No parent span found for span: "
+- + span + " (root span id: " + Span.ROOT_SPAN_ID + ")"));
+- orphans.add(spanInfo);
+- }
+-
+- // add the span to the full known list
+- trace.spans.add(spanInfo);
+-
+- // go back and find the tags for the row
+- spanInfo.tags.addAll(getTags(traceid, parent, span, tagCount));
+-
+- spanInfo.annotations.addAll(getAnnotations(traceid, parent, span, annotationCount));
+- }
+- }
+-
+- return traces;
+- }
+-
+- private Collection extends String> getTags(long traceid, long parent, long span, int count)
+- throws SQLException {
+- return getDynamicCountColumns(traceid, parent, span, count,
+- TraceWriter.TAG_FAMILY, MetricInfo.TAG.columnName);
+- }
+-
+- private Collection extends String> getAnnotations(long traceid, long parent, long span,
+- int count) throws SQLException {
+- return getDynamicCountColumns(traceid, parent, span, count,
+- TraceWriter.ANNOTATION_FAMILY, MetricInfo.ANNOTATION.columnName);
+- }
+-
+- private Collection extends String> getDynamicCountColumns(long traceid, long parent,
+- long span, int count, String family, String columnName) throws SQLException {
+- if (count == 0) {
+- return Collections.emptyList();
+- }
+-
+- // build the column strings, family.column
+- String[] parts = new String[count];
+- for (int i = 0; i < count; i++) {
+- parts[i] = TraceWriter.getDynamicColumnName(family, columnName, i);
+- }
+- // join the columns together
+- String columns = comma.join(parts);
+-
+- // redo them and add "VARCHAR to the end, so we can specify the columns
+- for (int i = 0; i < count; i++) {
+- parts[i] = parts[i] + " VARCHAR";
+- }
+-
+- String dynamicColumns = comma.join(parts);
+- String request =
+- "SELECT " + columns + " from " + table + "(" + dynamicColumns + ") WHERE "
+- + MetricInfo.TRACE.columnName + "=" + traceid + " AND "
+- + MetricInfo.PARENT.columnName + "=" + parent + " AND "
+- + MetricInfo.SPAN.columnName + "=" + span;
+- LOGGER.trace(addCustomAnnotations("Requesting columns with: " + request));
+- ResultSet results = conn.createStatement().executeQuery(request);
+- List cols = new ArrayList();
+- while (results.next()) {
+- for (int index = 1; index <= count; index++) {
+- cols.add(results.getString(index));
+- }
+- }
+- if (cols.size() < count) {
+- LOGGER.error(addCustomAnnotations("Missing tags! Expected " + count +
+- ", but only got " + cols.size() + " tags from rquest " + request));
+- }
+- return cols;
+- }
+-
+- private String addCustomAnnotations(String logLine) throws SQLException {
+- if (conn.isWrapperFor(PhoenixConnection.class)) {
+- PhoenixConnection phxConn = conn.unwrap(PhoenixConnection.class);
+- logLine = LogUtil.addCustomAnnotations(logLine, phxConn);
+- }
+- return logLine;
+- }
+-
+- /**
+- * Holds information about a trace
+- */
+- public static class TraceHolder {
+- public List orphans;
+- public long traceid;
+- public TreeSet spans = new TreeSet();
+-
+- @Override
+- public int hashCode() {
+- return new Long(traceid).hashCode();
+- }
+-
+- @Override
+- public boolean equals(Object o) {
+- if (o instanceof TraceHolder) {
+- return traceid == ((TraceHolder) o).traceid;
+- }
+- return false;
+- }
+-
+- @Override
+- public String toString() {
+- StringBuilder sb = new StringBuilder("Trace: " + traceid + "\n");
+- // get the first span, which is always going to be the root span
+- SpanInfo root = spans.iterator().next();
+- if (root.parent != null) {
+- sb.append("Root span not present! Just printing found spans\n");
+- for (SpanInfo span : spans) {
+- sb.append(span.toString() + "\n");
+- }
+- } else {
+- // print the tree of spans
+- List toPrint = new ArrayList();
+- toPrint.add(root);
+- while (!toPrint.isEmpty()) {
+- SpanInfo span = toPrint.remove(0);
+- sb.append(span.toString() + "\n");
+- toPrint.addAll(span.children);
+- }
+- }
+- if (orphans.size() > 0) {
+- sb.append("Found orphan spans:\n" + orphans);
+- }
+- return sb.toString();
+- }
+- }
+-
+- public static class SpanInfo implements Comparable {
+- public SpanInfo parent;
+- public List children = new ArrayList();
+- public String description;
+- public long id;
+- public long start;
+- public long end;
+- public String hostname;
+- public int tagCount;
+- public List tags = new ArrayList();
+- public int annotationCount;
+- public List annotations = new ArrayList();
+- private long parentId;
+-
+- public SpanInfo(SpanInfo parent, long parentid, long span, String desc, long start,
+- long end, String host, int tagCount, int annotationCount) {
+- this.parent = parent;
+- this.parentId = parentid;
+- this.id = span;
+- this.description = desc;
+- this.start = start;
+- this.end = end;
+- this.hostname = host;
+- this.tagCount = tagCount;
+- this.annotationCount = annotationCount;
+- }
+-
+- @Override
+- public int hashCode() {
+- return new Long(id).hashCode();
+- }
+-
+- @Override
+- public boolean equals(Object o) {
+- if (o instanceof SpanInfo) {
+- return id == ((SpanInfo) o).id;
+- }
+- return false;
+- }
+-
+- /**
+- * Do the same sorting that we would get from reading the table with a {@link TraceReader},
+- * specifically, by trace and then by start/end. However, these are only every stored in a
+- * single trace, so we can just sort on start/end times.
+- */
+- @Override
+- public int compareTo(SpanInfo o) {
+- // root span always comes first
+- if (this.parentId == Span.ROOT_SPAN_ID) {
+- return -1;
+- } else if (o.parentId == Span.ROOT_SPAN_ID) {
+- return 1;
+- }
+-
+- int compare = Longs.compare(start, o.start);
+- if (compare == 0) {
+- compare = Longs.compare(end, o.end);
+- if (compare == 0) {
+- return Longs.compare(id, o.id);
+- }
+- }
+- return compare;
+- }
+-
+- @Override
+- public String toString() {
+- StringBuilder sb = new StringBuilder("Span: " + id + "\n");
+- sb.append("\tdescription=" + description);
+- sb.append("\n");
+- sb.append("\tparent="
+- + (parent == null ? (parentId == Span.ROOT_SPAN_ID ? "ROOT" : "[orphan - id: "
+- + parentId + "]") : parent.id));
+- sb.append("\n");
+- sb.append("\tstart,end=" + start + "," + end);
+- sb.append("\n");
+- sb.append("\telapsed=" + (end - start));
+- sb.append("\n");
+- sb.append("\thostname=" + hostname);
+- sb.append("\n");
+- sb.append("\ttags=(" + tagCount + ") " + tags);
+- sb.append("\n");
+- sb.append("\tannotations=(" + annotationCount + ") " + annotations);
+- sb.append("\n");
+- sb.append("\tchildren=");
+- for (SpanInfo child : children) {
+- sb.append(child.id + ", ");
+- }
+- sb.append("\n");
+- return sb.toString();
+- }
+-
+- public long getParentIdForTesting() {
+- return parentId;
+- }
+- }
+-}
+\ No newline at end of file
+diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java b/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
+deleted file mode 100644
+index 9440da030..000000000
+--- a/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceSpanReceiver.java
++++ /dev/null
+@@ -1,102 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements. See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership. The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.phoenix.trace;
+-
+-import java.io.IOException;
+-import java.util.concurrent.ArrayBlockingQueue;
+-import java.util.concurrent.BlockingQueue;
+-
+-import org.apache.htrace.Span;
+-import org.apache.htrace.SpanReceiver;
+-import org.apache.phoenix.metrics.MetricInfo;
+-import org.apache.phoenix.query.QueryServicesOptions;
+-import org.slf4j.Logger;
+-import org.slf4j.LoggerFactory;
+-
+-/**
+- * Sink for request traces ({@link SpanReceiver}) that pushes writes to {@link TraceWriter} in a
+- * format that we can more easily consume.
+- *
+- *
+- * Rather than write directly to a phoenix table, we drop it into the metrics queue so we can more
+- * cleanly handle it asynchronously.Currently, {@link MilliSpan} submits the span in a synchronized
+- * block to all the receivers, which could have a lot of overhead if we are submitting to multiple
+- * receivers.
+- *
+- * The format of the generated metrics is this:
+- *
+- * - All Metrics from the same span have the same trace id (allowing correlation in the sink)
+- * - The description of the metric describes what it contains. For instance,
+- *
+- * - {@link MetricInfo#PARENT} is the id of the parent of this span. (Root span is
+- * {@link Span#ROOT_SPAN_ID}).
+- * - {@link MetricInfo#START} is the start time of the span
+- * - {@link MetricInfo#END} is the end time of the span
+- *
+- *
+- *
+- *
+- * So why even submit to {@link TraceWriter} if we only have a single source?
+- *
+- * This allows us to make the updates in batches. We might have spans that finish before other spans
+- * (for instance in the same parent). By batching the updates we can lessen the overhead on the
+- * client, which is also busy doing 'real' work.
+- * This class is custom implementation of metrics queue and handles batch writes to the Phoenix Table
+- * via another thread. Batch size and number of threads are configurable.
+- *
+- */
+-public class TraceSpanReceiver implements SpanReceiver {
+-
+- private static final Logger LOGGER = LoggerFactory.getLogger(TraceSpanReceiver.class);
+-
+- private static final int CAPACITY = QueryServicesOptions.withDefaults().getTracingTraceBufferSize();
+-
+- private BlockingQueue spanQueue = null;
+-
+- public TraceSpanReceiver() {
+- this.spanQueue = new ArrayBlockingQueue(CAPACITY);
+- }
+-
+- @Override
+- public void receiveSpan(Span span) {
+- if (span.getTraceId() != 0 && spanQueue.offer(span)) {
+- if (LOGGER.isTraceEnabled()) {
+- LOGGER.trace("Span buffered to queue " + span.toJson());
+- }
+- } else if (span.getTraceId() != 0 && LOGGER.isDebugEnabled()) {
+- LOGGER.debug("Span NOT buffered due to overflow in queue " + span.toJson());
+- }
+- }
+-
+- @Override
+- public void close() throws IOException {
+- // noop
+- }
+-
+- boolean isSpanAvailable() {
+- return spanQueue.isEmpty();
+- }
+-
+- Span getSpan() {
+- return spanQueue.poll();
+- }
+-
+- int getNumSpans() {
+- return spanQueue.size();
+- }
+-}
+diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceUtil.java
+new file mode 100644
+index 000000000..fbef21630
+--- /dev/null
++++ b/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceUtil.java
+@@ -0,0 +1,391 @@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++package org.apache.phoenix.trace;
++
++import static org.apache.phoenix.trace.PhoenixSemanticAttributes.DB_CONNECTION_STRING;
++import static org.apache.phoenix.trace.PhoenixSemanticAttributes.DB_NAME;
++import static org.apache.phoenix.trace.PhoenixSemanticAttributes.DB_SYSTEM;
++import static org.apache.phoenix.trace.PhoenixSemanticAttributes.DB_SYSTEM_VALUE;
++import static org.apache.phoenix.trace.PhoenixSemanticAttributes.DB_USER;
++
++import java.util.List;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.CompletionException;
++import java.util.function.BiConsumer;
++import java.util.function.Supplier;
++
++import org.apache.hadoop.security.UserGroupInformation;
++import org.apache.phoenix.jdbc.PhoenixConnection;
++import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import io.opentelemetry.api.GlobalOpenTelemetry;
++import io.opentelemetry.api.common.AttributeKey;
++import io.opentelemetry.api.common.AttributeType;
++import io.opentelemetry.api.trace.Span;
++import io.opentelemetry.api.trace.SpanBuilder;
++import io.opentelemetry.api.trace.SpanKind;
++import io.opentelemetry.api.trace.StatusCode;
++import io.opentelemetry.api.trace.Tracer;
++import io.opentelemetry.context.Context;
++import io.opentelemetry.context.Scope;
++
++// This is copied from org.apache.hadoop.hbase.trace.TraceUtil
++// We shouldn't use the HBase class directly, as it is @InterfaceAudience.Private
++public final class TraceUtil {
++
++ private static final Logger LOGGER = LoggerFactory.getLogger(TraceUtil.class);
++
++ // We could use Boolean, but we're trying to mimic to the Opentracing hint behaviour
++ private static final AttributeKey SAMPLING_PRIORITY_ATTRIBUTE_KEY =
++ AttributeKey.longKey("sampling.priority");
++
++ private TraceUtil() {
++ }
++
++ public static Tracer getGlobalTracer() {
++ // TODO use a real version
++ return GlobalOpenTelemetry.getTracer("org.apache.phoenix", "0.0.1");
++ }
++
++ private static SpanBuilder createPhoenixSpanBuilder(String name) {
++ return getGlobalTracer()
++ .spanBuilder(name)
++ .setAttribute(DB_SYSTEM, DB_SYSTEM_VALUE);
++ }
++
++ private static SpanBuilder setFromConnection(SpanBuilder builder, PhoenixConnection conn) {
++ builder.setAttribute(DB_CONNECTION_STRING, conn.getURL());
++ try {
++ builder.setAttribute(DB_USER, UserGroupInformation.getCurrentUser().getShortUserName());
++ } catch (Exception e) {
++ // Ignore
++ }
++ try {
++ builder.setAttribute(DB_NAME, conn.getSchema());
++ } catch (Exception e) {
++ // Ignore
++ }
++ return builder;
++ }
++
++ /**
++ * Create a span with the given {@code kind}. Notice that, OpenTelemetry only expects one
++ * {@link SpanKind#CLIENT} span and one {@link SpanKind#SERVER} span for a traced request, so
++ * use this with caution when you want to create spans with kind other than
++ * {@link SpanKind#INTERNAL}.
++ */
++ public static Span createSpan(PhoenixConnection conn, String name, SpanKind kind, boolean hinted) {
++ SpanBuilder builder = createPhoenixSpanBuilder(name);
++ builder = setFromConnection(builder, conn);
++ builder.setSpanKind(kind);
++ if (hinted) {
++ // Only has an effect if PhoenixHintableSampler is used
++ builder.setAttribute(SAMPLING_PRIORITY_ATTRIBUTE_KEY, 1L);
++ }
++ //FIXME only for debugging. Maybe add a property for runtime ?
++ StringBuilder sb = new StringBuilder();
++ for (StackTraceElement st : Thread.currentThread().getStackTrace()) {
++ sb.append(st.toString() + System.lineSeparator());
++ }
++ builder.setAttribute("create stack trace", sb.toString());
++ return builder.startSpan();
++ }
++
++ public static Span createSpan(PhoenixConnection conn, String name, boolean hinted) {
++ return createSpan(conn, name, SpanKind.INTERNAL, hinted);
++ }
++
++ public static Span createSpan(PhoenixConnection conn, String name) {
++ return createSpan(conn, name, SpanKind.INTERNAL, false);
++ }
++
++
++ /**
++ * Create a span without the information taken from Connection.
++ * This is to be used in the server side code.
++ *
++ * @param name
++ * @param kind
++ * @return
++ */
++ private static Span createServerSideSpan(String name, SpanKind kind) {
++ SpanBuilder builder = createPhoenixSpanBuilder(name);
++ builder.setSpanKind(kind);
++ //FIXME only for debugging. Maybe add a property for runtime ?
++ StringBuilder sb = new StringBuilder();
++ for (StackTraceElement st : Thread.currentThread().getStackTrace()) {
++ sb.append(st.toString() + System.lineSeparator());
++ }
++ builder.setAttribute("create stack trace", sb.toString());
++ return builder.startSpan();
++ }
++
++ public static Span createServerSideSpan(String name) {
++ return createServerSideSpan(name, SpanKind.INTERNAL);
++ }
++
++ /**
++ * Create a span which parent is from remote, i.e, passed through rpc.
++ *
++ * We will set the kind of the returned span to {@link SpanKind#SERVER}, as this should be the
++ * top most span at server side.
++ */
++// public static Span createRemoteSpan(String name, Context ctx) {
++// return getGlobalTracer().spanBuilder(name).setParent(ctx).setSpanKind(SpanKind.SERVER)
++// .startSpan();
++// }
++
++ /**
++ * Create a span with {@link SpanKind#CLIENT}.
++ */
++// public static Span createClientSpan(String name) {
++// return createSpan(name, SpanKind.CLIENT);
++// }
++
++ /**
++ * Trace an asynchronous operation for a table.
++ */
++// public static CompletableFuture tracedFuture(Supplier> action,
++// Supplier spanSupplier) {
++// Span span = spanSupplier.get();
++// try (Scope ignored = span.makeCurrent()) {
++// CompletableFuture future = action.get();
++// endSpan(future, span);
++// return future;
++// }
++// }
++
++ /**
++ * Trace an asynchronous operation.
++ */
++// public static CompletableFuture tracedFuture(Supplier> action,
++// String spanName) {
++// Span span = createSpan(spanName);
++// try (Scope ignored = span.makeCurrent()) {
++// CompletableFuture future = action.get();
++// endSpan(future, span);
++// return future;
++// }
++// }
++
++ /**
++ * Trace an asynchronous operation, and finish the create {@link Span} when all the given
++ * {@code futures} are completed.
++ */
++// public static List> tracedFutures(
++// Supplier>> action, Supplier spanSupplier) {
++// Span span = spanSupplier.get();
++// try (Scope ignored = span.makeCurrent()) {
++// List> futures = action.get();
++// endSpan(CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])), span);
++// return futures;
++// }
++// }
++
++ public static void setError(Span span, Throwable error) {
++ span.recordException(error);
++ span.setStatus(StatusCode.ERROR);
++ }
++
++ /**
++ * Finish the {@code span} when the given {@code future} is completed.
++ */
++// private static void endSpan(CompletableFuture> future, Span span) {
++// addListener(future, (resp, error) -> {
++// if (error != null) {
++// setError(span, error);
++// } else {
++// span.setStatus(StatusCode.OK);
++// }
++// span.end();
++// });
++// }
++
++ /**
++ * Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
++ */
++// public static Runnable tracedRunnable(final Runnable runnable, final String spanName) {
++// return tracedRunnable(runnable, () -> createSpan(spanName));
++// }
++
++ /**
++ * Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
++ */
++// public static Runnable tracedRunnable(final Runnable runnable,
++// final Supplier spanSupplier) {
++// // N.B. This method name follows the convention of this class, i.e., tracedFuture, rather
++// // than
++// // the convention of the OpenTelemetry classes, i.e., Context#wrap.
++// return () -> {
++// final Span span = spanSupplier.get();
++// try (final Scope ignored = span.makeCurrent()) {
++// runnable.run();
++// span.setStatus(StatusCode.OK);
++// } finally {
++// span.end();
++// }
++// };
++// }
++
++ /**
++ * A {@link Runnable} that may also throw.
++ * @param the type of {@link Throwable} that can be produced.
++ */
++// @FunctionalInterface
++// public interface ThrowingRunnable {
++// void run() throws T;
++// }
++
++ /**
++ * Trace the execution of {@code runnable}.
++ */
++// public static void trace(final ThrowingRunnable runnable,
++// final String spanName) throws T {
++// trace(runnable, () -> createSpan(spanName));
++// }
++
++ /**
++ * Trace the execution of {@code runnable}.
++ */
++// public static void trace(final ThrowingRunnable runnable,
++// final Supplier spanSupplier) throws T {
++// Span span = spanSupplier.get();
++// try (Scope ignored = span.makeCurrent()) {
++// runnable.run();
++// span.setStatus(StatusCode.OK);
++// } catch (Throwable e) {
++// setError(span, e);
++// throw e;
++// } finally {
++// span.end();
++// }
++// }
++
++ /**
++ * A {@link Callable} that may also throw.
++ * @param the result type of method call.
++ * @param the type of {@link Throwable} that can be produced.
++ */
++// @FunctionalInterface
++// public interface ThrowingCallable {
++// R call() throws T;
++// }
++
++// public static R trace(final ThrowingCallable callable,
++// final String spanName) throws T {
++// return trace(callable, () -> createSpan(spanName));
++// }
++
++// public static R trace(final ThrowingCallable callable,
++// final Supplier spanSupplier) throws T {
++// Span span = spanSupplier.get();
++// try (Scope ignored = span.makeCurrent()) {
++// final R ret = callable.call();
++// span.setStatus(StatusCode.OK);
++// return ret;
++// } catch (Throwable e) {
++// setError(span, e);
++// throw e;
++// } finally {
++// span.end();
++// }
++// }
++
++ public static Callable wrap(final Callable callable, String spanName) {
++ //FIXME not necessarily server side, we just don't have the PhoenixConnection object here
++ return wrap(callable, () -> createServerSideSpan(spanName));
++ }
++
++ public static Callable wrap(final Callable callable, final Supplier spanSupplier) {
++ return new Callable() {
++ @Override
++ public R call() throws Exception {
++ Span span = spanSupplier.get();
++ try (Scope ignored = span.makeCurrent()) {
++ final R ret = callable.call();
++ span.setStatus(StatusCode.OK);
++ return ret;
++ } catch (Throwable e) {
++ setError(span, e);
++ throw e;
++ } finally {
++ span.end();
++ }
++ }
++ };
++ }
++
++ public static boolean isTraceOn(String traceOption) {
++ Preconditions.checkArgument(traceOption != null);
++ if(traceOption.equalsIgnoreCase("ON")) return true;
++ if(traceOption.equalsIgnoreCase("OFF")) return false;
++ else {
++ throw new IllegalArgumentException("Unknown tracing option: " + traceOption);
++ }
++ }
++
++ // These are copied from org.apache.hadoop.hbase.util.FutureUtils to avoid dependence on
++ // HBase IA.Private classes
++
++ /**
++ * This is method is used when you just want to add a listener to the given future. We will call
++ * {@link CompletableFuture#whenComplete(BiConsumer)} to register the {@code action} to the
++ * {@code future}. Ignoring the return value of a Future is considered as a bad practice as it
++ * may suppress exceptions thrown from the code that completes the future, and this method will
++ * catch all the exception thrown from the {@code action} to catch possible code bugs.
++ *
++ * And the error prone check will always report FutureReturnValueIgnored because every method in
++ * the {@link CompletableFuture} class will return a new {@link CompletableFuture}, so you
++ * always have one future that has not been checked. So we introduce this method and add a
++ * suppress warnings annotation here.
++ */
++ @SuppressWarnings("FutureReturnValueIgnored")
++ private static void addListener(CompletableFuture future,
++ BiConsumer super T, ? super Throwable> action) {
++ future.whenComplete((resp, error) -> {
++ try {
++ // See this post on stack overflow(shorten since the url is too long),
++ // https://s.apache.org/completionexception
++ // For a chain of CompleableFuture, only the first child CompletableFuture can get
++ // the
++ // original exception, others will get a CompletionException, which wraps the
++ // original
++ // exception. So here we unwrap it before passing it to the callback action.
++ action.accept(resp, unwrapCompletionException(error));
++ } catch (Throwable t) {
++ LOGGER.error("Unexpected error caught when processing CompletableFuture", t);
++ }
++ });
++ }
++
++ /**
++ * Get the cause of the {@link Throwable} if it is a {@link CompletionException}.
++ */
++ public static Throwable unwrapCompletionException(Throwable error) {
++ if (error instanceof CompletionException) {
++ Throwable cause = error.getCause();
++ if (cause != null) {
++ return cause;
++ }
++ }
++ return error;
++ }
++}
+diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceWriter.java b/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceWriter.java
+deleted file mode 100644
+index 1d2b75f18..000000000
+--- a/phoenix-core-client/src/main/java/org/apache/phoenix/trace/TraceWriter.java
++++ /dev/null
+@@ -1,331 +0,0 @@
+-/**
+- * Licensed to the Apache Software Foundation (ASF) under one
+- * or more contributor license agreements. See the NOTICE file
+- * distributed with this work for additional information
+- * regarding copyright ownership. The ASF licenses this file
+- * to you under the Apache License, Version 2.0 (the
+- * "License"); you may not use this file except in compliance
+- * with the License. You may obtain a copy of the License at
+- *
+- * http://www.apache.org/licenses/LICENSE-2.0
+- *
+- * Unless required by applicable law or agreed to in writing, software
+- * distributed under the License is distributed on an "AS IS" BASIS,
+- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+- * See the License for the specific language governing permissions and
+- * limitations under the License.
+- */
+-package org.apache.phoenix.trace;
+-
+-import static org.apache.phoenix.metrics.MetricInfo.ANNOTATION;
+-import static org.apache.phoenix.metrics.MetricInfo.DESCRIPTION;
+-import static org.apache.phoenix.metrics.MetricInfo.END;
+-import static org.apache.phoenix.metrics.MetricInfo.HOSTNAME;
+-import static org.apache.phoenix.metrics.MetricInfo.PARENT;
+-import static org.apache.phoenix.metrics.MetricInfo.SPAN;
+-import static org.apache.phoenix.metrics.MetricInfo.START;
+-import static org.apache.phoenix.metrics.MetricInfo.TAG;
+-import static org.apache.phoenix.metrics.MetricInfo.TRACE;
+-
+-import java.sql.Connection;
+-import java.sql.PreparedStatement;
+-import java.sql.SQLException;
+-import java.util.ArrayList;
+-import java.util.List;
+-import java.util.Map;
+-import java.util.Properties;
+-import java.util.concurrent.Executors;
+-import java.util.concurrent.ScheduledExecutorService;
+-import java.util.concurrent.TimeUnit;
+-
+-import org.apache.hadoop.conf.Configuration;
+-import org.apache.hadoop.hbase.HBaseConfiguration;
+-import org.apache.hadoop.hbase.util.Pair;
+-import org.apache.htrace.Span;
+-import org.apache.htrace.TimelineAnnotation;
+-import org.apache.phoenix.compile.MutationPlan;
+-import org.apache.phoenix.execute.MutationState;
+-import org.apache.phoenix.jdbc.PhoenixConnection;
+-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+-import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+-import org.apache.phoenix.metrics.MetricInfo;
+-import org.apache.phoenix.query.QueryServices;
+-import org.apache.phoenix.schema.TableNotFoundException;
+-import org.apache.phoenix.trace.util.Tracing;
+-import org.apache.phoenix.util.QueryUtil;
+-import org.slf4j.Logger;
+-import org.slf4j.LoggerFactory;
+-
+-import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
+-import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
+-import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+-
+-/**
+- * Sink for the trace spans pushed into the queue by {@link TraceSpanReceiver}. The class
+- * instantiates a thread pool of configurable size, which will pull the data from queue and write to
+- * the Phoenix Trace Table in batches. Various configuration options include thread pool size and
+- * batch commit size.
+- */
+-public class TraceWriter {
+- private static final Logger LOGGER = LoggerFactory.getLogger(TraceWriter.class);
+-
+- private static final String VARIABLE_VALUE = "?";
+-
+- private static final Joiner COLUMN_JOIN = Joiner.on(".");
+- static final String TAG_FAMILY = "tags";
+- /**
+- * Count of the number of tags we are storing for this row
+- */
+- static final String TAG_COUNT = COLUMN_JOIN.join(TAG_FAMILY, "count");
+-
+- static final String ANNOTATION_FAMILY = "annotations";
+- static final String ANNOTATION_COUNT = COLUMN_JOIN.join(ANNOTATION_FAMILY, "count");
+-
+- /**
+- * Join strings on a comma
+- */
+- private static final Joiner COMMAS = Joiner.on(',');
+-
+- private String tableName;
+- private int batchSize;
+- private int numThreads;
+- private TraceSpanReceiver traceSpanReceiver;
+-
+- protected ScheduledExecutorService executor;
+-
+- public TraceWriter(String tableName, int numThreads, int batchSize) {
+-
+- this.batchSize = batchSize;
+- this.numThreads = numThreads;
+- this.tableName = tableName;
+- }
+-
+- public void start() {
+-
+- traceSpanReceiver = getTraceSpanReceiver();
+- if (traceSpanReceiver == null) {
+- LOGGER.warn(
+- "No receiver has been initialized for TraceWriter. Traces will not be written.");
+- LOGGER.warn("Restart Phoenix to try again.");
+- return;
+- }
+-
+- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+- builder.setDaemon(true).setNameFormat("PHOENIX-METRICS-WRITER");
+- executor = Executors.newScheduledThreadPool(this.numThreads, builder.build());
+-
+- for (int i = 0; i < this.numThreads; i++) {
+- executor.scheduleAtFixedRate(new FlushMetrics(), 0, 10, TimeUnit.SECONDS);
+- }
+-
+- LOGGER.info("Writing tracing metrics to phoenix table");
+- }
+-
+- @VisibleForTesting
+- protected TraceSpanReceiver getTraceSpanReceiver() {
+- return Tracing.getTraceSpanReceiver();
+- }
+-
+- public class FlushMetrics implements Runnable {
+-
+- private Connection conn;
+- private int counter = 0;
+-
+- public FlushMetrics() {
+- conn = getConnection(tableName);
+- }
+-
+- @Override
+- public void run() {
+- if (conn == null) return;
+- while (!traceSpanReceiver.isSpanAvailable()) {
+- Span span = traceSpanReceiver.getSpan();
+- if (null == span) break;
+- if (LOGGER.isTraceEnabled()) {
+- LOGGER.trace("Span received: " + span.toJson());
+- }
+- addToBatch(span);
+- counter++;
+- if (counter >= batchSize) {
+- commitBatch(conn);
+- counter = 0;
+- }
+- }
+- }
+-
+- private void addToBatch(Span span) {
+-
+- String stmt = "UPSERT INTO " + tableName + " (";
+- // drop it into the queue of things that should be written
+- List keys = new ArrayList();
+- List