|
40 | 40 |
|
41 | 41 | from synapse.api.errors import StoreError
|
42 | 42 | from synapse.config.database import DatabaseConnectionConfig
|
| 43 | +from synapse.logging import opentracing |
43 | 44 | from synapse.logging.context import (
|
44 | 45 | LoggingContext,
|
45 | 46 | current_context,
|
@@ -313,7 +314,14 @@ def _do_execute(self, func: Callable[..., R], sql: str, *args: Any) -> R:
|
313 | 314 | start = time.time()
|
314 | 315 |
|
315 | 316 | try:
|
316 |
| - return func(sql, *args) |
| 317 | + with opentracing.start_active_span( |
| 318 | + "db.query", |
| 319 | + tags={ |
| 320 | + opentracing.tags.DATABASE_TYPE: "sql", |
| 321 | + opentracing.tags.DATABASE_STATEMENT: sql, |
| 322 | + }, |
| 323 | + ): |
| 324 | + return func(sql, *args) |
317 | 325 | except Exception as e:
|
318 | 326 | sql_logger.debug("[SQL FAIL] {%s} %s", self.name, e)
|
319 | 327 | raise
|
@@ -525,9 +533,16 @@ def new_transaction(
|
525 | 533 | exception_callbacks=exception_callbacks,
|
526 | 534 | )
|
527 | 535 | try:
|
528 |
| - r = func(cursor, *args, **kwargs) |
529 |
| - conn.commit() |
530 |
| - return r |
| 536 | + with opentracing.start_active_span( |
| 537 | + "db.txn", |
| 538 | + tags={ |
| 539 | + opentracing.SynapseTags.DB_TXN_DESC: desc, |
| 540 | + opentracing.SynapseTags.DB_TXN_ID: name, |
| 541 | + }, |
| 542 | + ): |
| 543 | + r = func(cursor, *args, **kwargs) |
| 544 | + conn.commit() |
| 545 | + return r |
531 | 546 | except self.engine.module.OperationalError as e:
|
532 | 547 | # This can happen if the database disappears mid
|
533 | 548 | # transaction.
|
@@ -653,16 +668,17 @@ async def runInteraction(
|
653 | 668 | logger.warning("Starting db txn '%s' from sentinel context", desc)
|
654 | 669 |
|
655 | 670 | try:
|
656 |
| - result = await self.runWithConnection( |
657 |
| - self.new_transaction, |
658 |
| - desc, |
659 |
| - after_callbacks, |
660 |
| - exception_callbacks, |
661 |
| - func, |
662 |
| - *args, |
663 |
| - db_autocommit=db_autocommit, |
664 |
| - **kwargs, |
665 |
| - ) |
| 671 | + with opentracing.start_active_span(f"db.{desc}"): |
| 672 | + result = await self.runWithConnection( |
| 673 | + self.new_transaction, |
| 674 | + desc, |
| 675 | + after_callbacks, |
| 676 | + exception_callbacks, |
| 677 | + func, |
| 678 | + *args, |
| 679 | + db_autocommit=db_autocommit, |
| 680 | + **kwargs, |
| 681 | + ) |
666 | 682 |
|
667 | 683 | for after_callback, after_args, after_kwargs in after_callbacks:
|
668 | 684 | after_callback(*after_args, **after_kwargs)
|
@@ -718,25 +734,29 @@ def inner_func(conn, *args, **kwargs):
|
718 | 734 | with LoggingContext(
|
719 | 735 | str(curr_context), parent_context=parent_context
|
720 | 736 | ) as context:
|
721 |
| - sched_duration_sec = monotonic_time() - start_time |
722 |
| - sql_scheduling_timer.observe(sched_duration_sec) |
723 |
| - context.add_database_scheduled(sched_duration_sec) |
724 |
| - |
725 |
| - if self.engine.is_connection_closed(conn): |
726 |
| - logger.debug("Reconnecting closed database connection") |
727 |
| - conn.reconnect() |
728 |
| - |
729 |
| - try: |
730 |
| - if db_autocommit: |
731 |
| - self.engine.attempt_to_set_autocommit(conn, True) |
732 |
| - |
733 |
| - db_conn = LoggingDatabaseConnection( |
734 |
| - conn, self.engine, "runWithConnection" |
735 |
| - ) |
736 |
| - return func(db_conn, *args, **kwargs) |
737 |
| - finally: |
738 |
| - if db_autocommit: |
739 |
| - self.engine.attempt_to_set_autocommit(conn, False) |
| 737 | + with opentracing.start_active_span( |
| 738 | + operation_name="db.connection", |
| 739 | + ): |
| 740 | + sched_duration_sec = monotonic_time() - start_time |
| 741 | + sql_scheduling_timer.observe(sched_duration_sec) |
| 742 | + context.add_database_scheduled(sched_duration_sec) |
| 743 | + |
| 744 | + if self.engine.is_connection_closed(conn): |
| 745 | + logger.debug("Reconnecting closed database connection") |
| 746 | + conn.reconnect() |
| 747 | + opentracing.log_kv({"message": "reconnected"}) |
| 748 | + |
| 749 | + try: |
| 750 | + if db_autocommit: |
| 751 | + self.engine.attempt_to_set_autocommit(conn, True) |
| 752 | + |
| 753 | + db_conn = LoggingDatabaseConnection( |
| 754 | + conn, self.engine, "runWithConnection" |
| 755 | + ) |
| 756 | + return func(db_conn, *args, **kwargs) |
| 757 | + finally: |
| 758 | + if db_autocommit: |
| 759 | + self.engine.attempt_to_set_autocommit(conn, False) |
740 | 760 |
|
741 | 761 | return await make_deferred_yieldable(
|
742 | 762 | self._db_pool.runWithConnection(inner_func, *args, **kwargs)
|
|
0 commit comments