Source code for merton.obs

"""OpenTelemetry observability for merton.

Wraps the heavy calibration / panel / portfolio entry points with
OpenTelemetry spans so production users get end-to-end traces in their
existing OTel pipeline (Datadog, Honeycomb, Grafana Tempo, …).

The module is **lazy** — calling :func:`enable` is what wires up the
TracerProvider. Before that, functions decorated with :func:`traced` call
straight through after a single flag check, so ``import merton`` carries
essentially no observability overhead for users who haven't opted in.

Install with::

    pip install "merton[obs]"

then::

    import merton

    merton.obs.enable(service_name="risk-engine", otlp_endpoint="http://otel-collector:4317")
    # All MertonModel.fit / batch_fit / calibrator calls now emit spans.

Configuration via environment variables:

- ``MERTON_OBS=1`` — auto-enable on import (uses defaults).
- ``MERTON_OTLP_ENDPOINT`` — override the default OTLP gRPC endpoint.
- ``MERTON_OBS_CONSOLE=1`` — also export to stdout (debugging).
"""

from __future__ import annotations

import contextlib
import functools
import inspect
import os
import warnings
from collections.abc import Callable
from contextlib import contextmanager
from typing import Any, TypeVar

from . import __version__

# Type variable used by ``traced`` so a decorated callable keeps its type.
F = TypeVar("F", bound=Callable[..., Any])

# Fallbacks used by ``enable`` when neither the caller nor the environment
# supplies a value.
_DEFAULT_SERVICE_NAME = "merton"
_DEFAULT_OTLP_ENDPOINT = "http://localhost:4317"

# Process-wide tracing state; written by ``enable`` / ``disable`` and read by
# ``is_enabled``, ``get_tracer``, ``span`` and ``traced``.
_state: dict[str, Any] = dict(
    enabled=False,
    tracer_provider=None,
    tracer=None,
)


def is_enabled() -> bool:
    """Report whether :func:`enable` has wired up OTel tracing in this process.

    Returns:
        ``True`` once tracing is enabled and until :func:`disable` is called,
        ``False`` otherwise.
    """
    flag = _state["enabled"]
    return True if flag else False
def enable(
    *,
    service_name: str = _DEFAULT_SERVICE_NAME,
    otlp_endpoint: str | None = None,
    console: bool = False,
    insecure: bool = True,
) -> None:
    """Initialise the OpenTelemetry TracerProvider.

    Idempotent — calling multiple times keeps the first configuration, until
    :func:`disable` is called.

    Args:
        service_name: Value of the ``service.name`` resource attribute.
        otlp_endpoint: OTLP gRPC endpoint. When omitted, falls back to
            ``MERTON_OTLP_ENDPOINT``, then to ``http://localhost:4317``.
        console: Also export spans to stdout. Setting
            ``MERTON_OBS_CONSOLE=1`` has the same effect.
        insecure: Passed through to the OTLP gRPC span exporter.

    Raises :class:`ImportError` if ``merton[obs]`` isn't installed.
    Emits a :class:`RuntimeWarning` if the OTLP gRPC exporter package is
    missing; in that case spans are only exported to the console, if
    requested.
    """
    if _state["enabled"]:
        return
    try:
        from opentelemetry import trace
        from opentelemetry.sdk.resources import Resource
        from opentelemetry.sdk.trace import TracerProvider
        from opentelemetry.sdk.trace.export import BatchSpanProcessor
    except ImportError as err:  # pragma: no cover - extras gate
        raise ImportError('merton.obs requires merton[obs]: pip install "merton[obs]"') from err

    resource = Resource.create(
        {
            "service.name": service_name,
            "service.version": __version__,
            "library.name": "merton",
        }
    )
    provider = TracerProvider(resource=resource)

    endpoint = otlp_endpoint or os.environ.get("MERTON_OTLP_ENDPOINT", _DEFAULT_OTLP_ENDPOINT)
    try:
        from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    except ImportError:  # pragma: no cover - sub-extra missing
        # The OTLP exporter is an optional dependency on top of the SDK.
        # Without it no exporting processor is attached, so make that visible
        # rather than silently producing no traces.
        warnings.warn(
            "merton.obs: OTLP gRPC exporter is not installed; spans will not be "
            f"exported to {endpoint}.",
            RuntimeWarning,
            stacklevel=2,
        )
    else:
        provider.add_span_processor(
            BatchSpanProcessor(
                OTLPSpanExporter(endpoint=endpoint, insecure=insecure),
                max_queue_size=4096,
                max_export_batch_size=512,
            )
        )

    if console or os.environ.get("MERTON_OBS_CONSOLE") == "1":
        from opentelemetry.sdk.trace.export import (
            ConsoleSpanExporter,
            SimpleSpanProcessor,
        )

        provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))

    trace.set_tracer_provider(provider)
    _state["tracer_provider"] = provider
    # OpenTelemetry honours only the first set_tracer_provider() call per
    # process. Take the tracer from *this* provider rather than from the
    # global one. Otherwise a disable()/enable() cycle would hand out a tracer
    # bound to the already shut-down first provider, and every span would be
    # dropped.
    _state["tracer"] = provider.get_tracer("merton", __version__)
    _state["enabled"] = True
def disable() -> None:
    """Shut the current TracerProvider down. Tracing becomes a no-op again.

    Flushes pending spans, then shuts the provider down. Errors from either
    step are suppressed. Afterwards :func:`is_enabled` returns ``False`` and
    :func:`get_tracer` returns ``None``. Does nothing if tracing is not
    enabled.
    """
    if not _state["enabled"]:
        return
    provider = _state.get("tracer_provider")
    if provider is not None:
        # Telemetry teardown must never crash user code; if the exporter is
        # mid-flight or the network is down we still want disable() to leave
        # the package in a clean state. Flush and shutdown are guarded
        # separately so that a failed flush cannot skip shutdown(). Skipping
        # it would leave the span processors attached by enable() running.
        with contextlib.suppress(Exception):
            provider.force_flush()
        with contextlib.suppress(Exception):
            provider.shutdown()
    _state["enabled"] = False
    _state["tracer"] = None
    _state["tracer_provider"] = None
def get_tracer():  # type: ignore[no-untyped-def]
    """Hand back the tracer installed by :func:`enable`.

    Returns:
        The active tracer while tracing is on; ``None`` before :func:`enable`
        or after :func:`disable`.
    """
    active = _state["tracer"]
    return active
@contextmanager
def span(name: str, **attributes: Any):  # type: ignore[no-untyped-def]
    """Context manager that opens a span when tracing is on.

    No-op (yields ``None``) otherwise. Use this around any block of work you
    want measured::

        from merton import obs

        with obs.span("merton.fit", method="duan_mle"):
            result = model.fit(firm)

    Args:
        name: Span name.
        **attributes: Attributes attached to the span at start.

    Yields:
        The current span while tracing is enabled, else ``None``.

    If the block raises, OpenTelemetry records the exception on the span and
    sets its status to ERROR; the exception then propagates unchanged.
    """
    if not _state["enabled"]:
        yield None
        return
    tracer = _state["tracer"]
    # start_as_current_span already records the exception and sets an ERROR
    # status when the body raises. Doing it manually as well produced a
    # duplicate exception event on every failing span.
    with tracer.start_as_current_span(
        name,
        attributes=attributes,
        record_exception=True,
        set_status_on_exception=True,
    ) as current:
        yield current
def traced(name: str | None = None, **default_attributes: Any) -> Callable[[F], F]:
    """Decorator that wraps a function in a span when tracing is enabled.

    The decorated function pays a single ``is_enabled()`` check per call when
    tracing is off — well under a microsecond.

    Args:
        name: Span name. Defaults to ``"<module>.<qualname>"`` of the
            decorated function.
        **default_attributes: Attributes attached to every span this
            decorator opens.

    Coroutine functions (``async def``) get an async wrapper. The span then
    covers the awaited execution, not just the creation of the coroutine
    object.
    """

    def decorator(fn: F) -> F:
        span_name = name or f"{fn.__module__}.{fn.__qualname__}"

        if inspect.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                if not _state["enabled"]:
                    return await fn(*args, **kwargs)
                with span(span_name, **default_attributes):
                    return await fn(*args, **kwargs)

            return async_wrapper  # type: ignore[return-value]

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            if not _state["enabled"]:
                return fn(*args, **kwargs)
            with span(span_name, **default_attributes):
                return fn(*args, **kwargs)

        return wrapper  # type: ignore[return-value]

    return decorator
# Auto-enable from the environment if the user opted in.
if os.environ.get("MERTON_OBS") == "1":  # pragma: no cover - env-driven
    try:
        enable()
    except ImportError as _obs_err:
        # The user explicitly asked for tracing via MERTON_OBS=1. Importing
        # merton must not fail because of it, but the fact that tracing is off
        # should not be hidden either.
        warnings.warn(
            f"MERTON_OBS=1 is set but tracing could not be enabled: {_obs_err}",
            RuntimeWarning,
        )

__all__ = [
    "disable",
    "enable",
    "get_tracer",
    "is_enabled",
    "span",
    "traced",
]