Orphan telemetry for ecto spans originating from engine.ex in Oban #62

Open
olivermt opened this issue Jan 23, 2022 · 14 comments
Labels
bug Something isn't working

Comments

@olivermt

Describe the bug
Tons of UPDATE "public"."oban_producers" AS o0 SET "updated_at" = $1 WHERE (o0."uuid" = $2) queries show up as parentless spans.
Expected behavior
Span from engine.ex should be respected and included as the parent span

Additional context
Offending code: https://github.com/sorentwo/oban/blob/main/lib/oban/queue/engine.ex#L117

I suspect this is probably happening for all the functions in engine.ex as there seems to be nothing special about this function compared to all the other ones.

@olivermt added the bug label Jan 23, 2022
@bryannaegele
Collaborator

This may be resolved in #49

@dvic
Contributor

dvic commented Mar 19, 2022

@olivermt These queries will become "less" orphaned with #49 merged but it's still quite a lot of spans. Using #49 we actually ignored most of them in our setup using a sampler somewhere along these lines:

defmodule OpentelemetrySampler do
  @behaviour :otel_sampler

  defstruct [:opts]

  @impl true
  def setup(opts), do: %__MODULE__{opts: opts}

  @impl true
  def description(%__MODULE__{}), do: "OpentelemetrySampler"

  @impl true
  def should_sample(ctx, _trace_id, _links, span_name, _span_kind, attributes, _opts) do
    drop =
      String.ends_with?(span_name, ".repo.query:oban_producers") ||
        String.ends_with?(span_name, ".repo.query:oban_jobs") ||
        String.starts_with?(span_name, "Elixir.Oban.Plugins") ||
        String.starts_with?(span_name, "Elixir.Oban.Pro.Plugins") ||
        Map.get(attributes, :"db.statement") in ["begin", "commit"]

    decision =
      if drop do
        :drop
      else
        :record_and_sample
      end

    additional_attributes = []

    tracestate =
      OpenTelemetry.Tracer.current_span_ctx(ctx)
      |> OpenTelemetry.Span.tracestate()

    {decision, additional_attributes, tracestate}
  end
end
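
For anyone wiring this up: a custom sampler is selected through the `:sampler` key of the `:opentelemetry` application config, as a `{module, opts}` tuple whose second element is passed to `setup/1`. A minimal sketch, assuming the module above is compiled into the application under the name `OpentelemetrySampler` and that the empty map is an acceptable option value:

```elixir
# config/runtime.exs (or config/config.exs)
import Config

# Assumption: OpentelemetrySampler is the sampler module shown above.
# The second tuple element is handed to OpentelemetrySampler.setup/1.
config :opentelemetry,
  sampler: {OpentelemetrySampler, %{}}
```

Because the sampler is consulted for every span the SDK starts, it is worth keeping the checks in `should_sample/7` cheap.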

@olivermt
Author

My sampler already looks more or less like that actually :)
We cut some 8 million events per day in production with the above.

@tsloughter
Member

Is it possible to make certain types of Oban spans configurable, so the user can choose whether they are included, instead of requiring them to add a sampler?

I'm not a fan of samplers having to be used simply to say 'don't include all of these', especially since they currently apply to all spans created, so even spans completely unrelated to Oban have to go through the same sampler.

@indrekj
Contributor

indrekj commented Mar 21, 2022

The problem is that these spans are created by Ecto automatically deep inside the oban code.

In ruby there's:

OpenTelemetry::Common::Utilities.untraced do
  # code
end

which basically creates a root span with sampling set to 0, so all the spans that are automatically created inside the block (activerecord, postgres, etc.) get sampled=false as well. The last time I checked, doing that was not possible in Erlang OpenTelemetry. I don't think it is even in the standard, but many languages provide similar behavior for the same reasons as here.

@tsloughter
Member

Oh, interesting, we should probably add that and see if it should be in the spec.

@bryannaegele
Collaborator

Nice. I can see how that could be helpful. We could maybe add a flag to the context in pdict that gets checked on span creation. 🤔
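
To make the pdict idea concrete, here is a rough sketch in plain Elixir. Everything in it is hypothetical: `UntracedSketch`, the flag name, and `maybe_start_span/1` are stand-ins for illustration and are not part of any OpenTelemetry package. It only shows the shape of "set a flag around a block, check it on span creation":

```elixir
defmodule UntracedSketch do
  # Hypothetical helper, not part of any OpenTelemetry package.
  @flag :untraced_sketch_flag

  # Runs fun with the flag set in the process dictionary and restores the
  # previous value afterwards, so nested calls behave sensibly.
  def untraced(fun) when is_function(fun, 0) do
    previous = Process.put(@flag, true)

    try do
      fun.()
    after
      if previous == nil, do: Process.delete(@flag), else: Process.put(@flag, previous)
    end
  end

  def untraced?, do: Process.get(@flag, false) == true

  # Stand-in for span creation: real instrumentation would check the flag
  # before starting a span.
  def maybe_start_span(name) do
    if untraced?(), do: :skipped, else: {:started, name}
  end
end
```

The process dictionary is per process, so a flag like this would not cover work handed off to another process from inside the block.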

@aj-foster

Hi folks 👋🏼

Is the custom sampler in the comment above still the best way to handle these traces?

@olivermt
Author

@aj-foster here is the current one we use (dominated by Oban still, but more or less the same approach yes):

Added bonus is a free howto on how to not log usernames and passwords in cleartext to honeycomb (🙈 ).
Luckily it was caught in staging and HC has good api for deleting columns :P

defmodule Platform.OpentelemetrySampler do
  require OpenTelemetry.Tracer, as: Tracer
  require OpenTelemetry.Span, as: Span

  @moduledoc """
  `setup/1` takes a map of config you can utilize in your should_sample function

  `description/1` is used to produce a human readable description of the sampler in use.

  `should_sample/7` is where you do the actual sampling.
  The params are the following:
  - ctx, the `OpenTelemetry.Ctx` the span is being started in
  - trace_id, the id of the current trace
  - links, the list of links to other spans given at span creation
  - span_name, the name of the span being started
  - span_kind, the kind of span (`:internal`, `:server`, `:client`, `:producer` or `:consumer`)
  - attrs, the attributes given at span creation
  - config, the return value of `setup/1`

  `should_sample/7` needs to return a three-tuple:

  {
    :drop | :record_only | :record_and_sample, # the sampling decision
    [{"new_attrs", "here"}, ... {...}], # any attributes to add to the span
    trace_state # the current tracestate, see `tracestate/1`
  }
  """
  def setup(_) do
    %{}
  end

  def description(_), do: "PlatformSampler"

  @filter_list_db_statement [
    "begin",
    "commit",
    "SELECT pg_notify($1, payload) FROM json_array_elements_text($2::json) AS payload",
    "SELECT pg_try_advisory_xact_lock($1)",
    "select now()",
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1 FROM (SELECT so0."id" AS "id", so0."state" AS "state", so0."queue" AS "queue", so0."worker" AS "worker", so0."args" AS "args", so0."meta" AS "meta", so0."tags" AS "tags", so0."errors" AS "errors", so0."attempt" AS "attempt", so0."attempted_by" AS "attempted_by", so0."max_attempts" AS "max_attempts", so0."priority" AS "priority", so0."attempted_at" AS "attempted_at", so0."cancelled_at" AS "cancelled_at", so0."completed_at" AS "completed_at", so0."discarded_at" AS "discarded_at", so0."inserted_at" AS "inserted_at", so0."scheduled_at" AS "scheduled_at" FROM "public"."oban_jobs" AS so0 WHERE (so0."state" IN ('scheduled','retryable')) AND (NOT (so0."queue" IS NULL)) AND (so0."scheduled_at" <= $2) ORDER BY so0."id" LIMIT $3 FOR UPDATE SKIP LOCKED) AS s1 WHERE (o0."id" = s1."id")/,
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1, "attempted_at" = $2, "attempted_by" = $3, "attempt" = o0."attempt" + $4 WHERE (o0."id" IN (SELECT so0."id" FROM "public"."oban_jobs" AS so0 WHERE ((so0."state" = 'available') AND (so0."queue" = $5)) ORDER BY so0."priority", so0."scheduled_at", so0."id" LIMIT $6 FOR UPDATE SKIP LOCKED)) RETURNING o0."id", o0."state", o0."queue", o0."worker", o0."args", o0."meta", o0."tags", o0."errors", o0."attempt", o0."attempted_by", o0."max_attempts", o0."priority", o0."attempted_at", o0."cancelled_at", o0."completed_at", o0."discarded_at", o0."inserted_at", o0."scheduled_at"/,
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1, "discarded_at" = $2 FROM (SELECT so0."id" AS "id" FROM "public"."oban_jobs" AS so0 LEFT OUTER JOIN "public"."oban_producers" AS so1 ON array_length(so0."attempted_by", 1) = 2 AND (so1."uuid" = uuid (so0."attempted_by"[2])) WHERE (NOT (so0."queue" IS NULL) AND (so0."state" = 'executing')) AND (so1."uuid" IS NULL)) AS s1 WHERE (o0."id" = s1."id") AND (o0."attempt" >= o0."max_attempts")/,
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1 FROM (SELECT so0."id" AS "id" FROM "public"."oban_jobs" AS so0 LEFT OUTER JOIN "public"."oban_producers" AS so1 ON array_length(so0."attempted_by", 1) = 2 AND (so1."uuid" = uuid (so0."attempted_by"[2])) WHERE (NOT (so0."queue" IS NULL) AND (so0."state" = 'executing')) AND (so1."uuid" IS NULL)) AS s1 WHERE (o0."id" = s1."id") AND (o0."attempt" < o0."max_attempts")/,
    ~s/DELETE FROM "public"."oban_jobs" AS o0 USING (SELECT so0."id" AS "id" FROM "public"."oban_jobs" AS so0 WHERE ((so0."state" = 'completed') AND (so0."attempted_at" < $1)) OR ((so0."state" = 'cancelled') AND (so0."cancelled_at" < $2)) OR ((so0."state" = 'discarded') AND (so0."discarded_at" < $3)) LIMIT $4 FOR UPDATE SKIP LOCKED) AS s1 WHERE (o0."id" = s1."id")/,
    ~s/UPDATE "public"."oban_producers" AS o0 SET "updated_at" = $1 WHERE (o0."uuid" = $2)/,
    ~s/DELETE FROM "public"."oban_producers" AS o0 WHERE (((o0."uuid" != $1) AND (o0."updated_at" <= $2))) OR ((((o0."uuid" != $3) AND (o0."name" = $4)) AND (o0."queue" = $5)) AND (o0."updated_at" <= $6))/,
    ~s/SELECT DISTINCT o0."queue" FROM "public"."oban_jobs" AS o0 WHERE (o0."state" = 'available') AND (NOT (o0."queue" IS NULL))/,
    ~s/INSERT INTO "public"."oban_producers" ("meta","name","node","queue","running_ids","started_at","updated_at","uuid") VALUES ($1,$2,$3,$4,$5,$6,$7,$8)/,
    ~s/SELECT (o0."meta"#>'{"global_limit","tracked"}') FROM "public"."oban_producers" AS o0 WHERE (o0."queue" = $1) AND (NOT ((o0."meta"#>'{"global_limit","allowed"}') IS NULL))/,
    ~s/DELETE FROM "public"."oban_peers" AS o0 WHERE (o0."name" = $1) AND (o0."expires_at" < $2)/,
    ~s/INSERT INTO "public"."oban_peers" AS o0 ("expires_at","name","node","started_at") VALUES ($1,$2,$3,$4) ON CONFLICT ("name") DO UPDATE SET "expires_at" = $5/,
    ~s/INSERT INTO "public"."oban_peers" ("expires_at","name","node","started_at") VALUES ($1,$2,$3,$4) ON CONFLICT DO NOTHING/,
    ~s/SELECT key FROM UNNEST($1::int[]) key WHERE NOT pg_try_advisory_xact_lock($2, key)\n/,
    ~s/SELECT o0."uuid", o0."name", o0."node", o0."queue", o0."running_ids", o0."started_at", o0."updated_at", o0."meta" FROM "public"."oban_producers" AS o0 WHERE (o0."uuid" = $1) FOR UPDATE/,
    ~s/select 1/
  ]

  @filter_list_span_name [
    "Elixir.Oban.Plugins.Gossip process",
    "Elixir.Oban.Plugins.Stager process",
    "Elixir.Oban.Plugins.Pruner process"
  ]

  @filter_graphql_variables [
    "password",
    "token"
  ]
  def should_sample(
        ctx,
        _trace_id,
        _links,
        span_name,
        _span_kind,
        attrs,
        _config
      ) do
    {:no_drop, ctx, span_name, attrs}
    |> db_attrs_statement_should_drop()
    |> span_name_should_drop()
    |> gql_variable_atters_modify()
    |> case do
      {:drop, ctx, _span_name, _attrs} ->
        {:drop, [], tracestate(ctx)}

      {:no_drop, ctx, _span_name, new_attrs} ->
        {:record_and_sample, new_attrs, tracestate(ctx)}
    end
  end

  defp tracestate(ctx) do
    Tracer.current_span_ctx(ctx)
    |> Span.tracestate()
  end

  defp db_attrs_statement_should_drop(
         {_decision, ctx, span_name, %{"db.statement": statement} = attrs}
       )
       when statement in @filter_list_db_statement,
       do: {:drop, ctx, span_name, attrs}

  defp db_attrs_statement_should_drop(sample), do: sample

  defp gql_variable_atters_modify(
         {:no_drop, ctx, span_name, %{"graphql.request.variables" => variables} = attrs}
       ) do
    if String.contains?(variables, @filter_graphql_variables) do
      variables =
        variables
        |> Jason.decode()
        |> case do
          {:ok, variables} ->
            variables
            |> filter_values(@filter_graphql_variables)
            |> Jason.encode!()

          {:error, _reason} ->
            "[FILTERED]"
        end

      attrs = Map.put(attrs, "graphql.request.variables", variables)
      {:no_drop, ctx, span_name, attrs}
    else
      {:no_drop, ctx, span_name, attrs}
    end
  end

  defp gql_variable_atters_modify(sample), do: sample

  defp span_name_should_drop({_decision, ctx, span_name, attrs})
       when span_name in @filter_list_span_name do
    {:drop, ctx, span_name, attrs}
  end

  defp span_name_should_drop(sample), do: sample

  def filter_values(%{} = map, filter_params) do
    Enum.into(map, %{}, fn {k, v} ->
      if is_binary(k) and String.contains?(k, filter_params) do
        {k, "[FILTERED]"}
      else
        {k, filter_values(v, filter_params)}
      end
    end)
  end

  def filter_values([_ | _] = list, filter_params) do
    Enum.map(list, &filter_values(&1, filter_params))
  end

  def filter_values(other, _filter_params), do: other
end
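
For readers who only want the redaction part, here is a standalone sketch of the same key-based filtering with a worked input. The module name `RedactSketch` and the sample data are made up for illustration. The logic mirrors `filter_values/2` above:

```elixir
defmodule RedactSketch do
  # Standalone copy of the key-based redaction idea; the module name is hypothetical.
  def filter_values(%{} = map, patterns) do
    Map.new(map, fn {k, v} ->
      # A key is redacted when it contains any of the patterns as a substring.
      if is_binary(k) and String.contains?(k, patterns) do
        {k, "[FILTERED]"}
      else
        {k, filter_values(v, patterns)}
      end
    end)
  end

  def filter_values(list, patterns) when is_list(list) do
    Enum.map(list, &filter_values(&1, patterns))
  end

  def filter_values(other, _patterns), do: other
end

# Made-up GraphQL variables, already decoded from JSON.
variables = %{
  "input" => %{"email" => "a@example.com", "password" => "hunter2"},
  "items" => [%{"resetToken" => "abc"}, %{"token" => "xyz"}]
}

redacted = RedactSketch.filter_values(variables, ["password", "token"])
```

Note that `String.contains?/2` is case-sensitive, so in this input `"password"` and `"token"` are replaced while `"resetToken"` is left as-is. Downcasing the key before matching would be one way to catch such variants.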

@tsloughter
Member

Is the best option still a custom sampler of a bunch of spans to filter out these queries?

I realized that untraced probably wouldn't work here: there is no place to wrap the code Oban runs in an untraced block, since we are only working off of telemetry events and are not inserting code into Oban or wrapping Oban functions.

@bryannaegele
Collaborator

We filter this query out in our collectors. It's an internal query of Oban afaik so nothing we have control over. I think the long-term solution is enabling excludes to Ecto.

@tsloughter
Member

@bryannaegele query or queries. The sampler above has a lot of queries.

I take it you want some Oban Ecto queries in spans? Each Ecto Repo has to be set up individually, right? Meaning users only get spans for Oban database queries if they explicitly say they want them?

@bryannaegele
Collaborator

bryannaegele commented Sep 17, 2024

I'm speaking to spammy queries that are outside the scope of the instrumentation. I'd say 99% of what folks want tracing-wise from the Oban instrumentation is the job execution.

This is our oban sampler fwiw. Nothing proprietary. You can see all the stuff we consider "junk queries" that we just filter out.

defmodule SimpleUplink.Tracing.Samplers.Oban do
  require OpenTelemetry.SemanticConventions.Trace, as: SCT
  alias SimpleUplink.Tracing.Samplers
  import SimpleUplink.Tracing.OtelSampler, only: [tracestate: 1]

  @filtered_statements [
    "begin",
    "commit",
    "SELECT pg_notify($1, payload) FROM json_array_elements_text($2::json) AS payload",
    "SELECT pg_try_advisory_xact_lock($1)",
    "select now()",
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1 FROM (SELECT so0."id" AS "id", so0."state" AS "state", so0."queue" AS "queue", so0."worker" AS "worker", so0."args" AS "args", so0."meta" AS "meta", so0."tags" AS "tags", so0."errors" AS "errors", so0."attempt" AS "attempt", so0."attempted_by" AS "attempted_by", so0."max_attempts" AS "max_attempts", so0."priority" AS "priority", so0."attempted_at" AS "attempted_at", so0."cancelled_at" AS "cancelled_at", so0."completed_at" AS "completed_at", so0."discarded_at" AS "discarded_at", so0."inserted_at" AS "inserted_at", so0."scheduled_at" AS "scheduled_at" FROM "public"."oban_jobs" AS so0 WHERE (so0."state" IN ('scheduled','retryable')) AND (NOT (so0."queue" IS NULL)) AND (so0."scheduled_at" <= $2) ORDER BY so0."id" LIMIT $3 FOR UPDATE SKIP LOCKED) AS s1 WHERE (o0."id" = s1."id")/,
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1, "attempted_at" = $2, "attempted_by" = $3, "attempt" = o0."attempt" + $4 WHERE (o0."id" IN (SELECT so0."id" FROM "public"."oban_jobs" AS so0 WHERE ((so0."state" = 'available') AND (so0."queue" = $5)) ORDER BY so0."priority", so0."scheduled_at", so0."id" LIMIT $6 FOR UPDATE SKIP LOCKED)) RETURNING o0."id", o0."state", o0."queue", o0."worker", o0."args", o0."meta", o0."tags", o0."errors", o0."attempt", o0."attempted_by", o0."max_attempts", o0."priority", o0."attempted_at", o0."cancelled_at", o0."completed_at", o0."discarded_at", o0."inserted_at", o0."scheduled_at"/,
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1, "discarded_at" = $2 FROM (SELECT so0."id" AS "id" FROM "public"."oban_jobs" AS so0 LEFT OUTER JOIN "public"."oban_producers" AS so1 ON array_length(so0."attempted_by", 1) = 2 AND (so1."uuid" = uuid (so0."attempted_by"[2])) WHERE (NOT (so0."queue" IS NULL) AND (so0."state" = 'executing')) AND (so1."uuid" IS NULL)) AS s1 WHERE (o0."id" = s1."id") AND (o0."attempt" >= o0."max_attempts")/,
    ~s/UPDATE "public"."oban_jobs" AS o0 SET "state" = $1 FROM (SELECT so0."id" AS "id" FROM "public"."oban_jobs" AS so0 LEFT OUTER JOIN "public"."oban_producers" AS so1 ON array_length(so0."attempted_by", 1) = 2 AND (so1."uuid" = uuid (so0."attempted_by"[2])) WHERE (NOT (so0."queue" IS NULL) AND (so0."state" = 'executing')) AND (so1."uuid" IS NULL)) AS s1 WHERE (o0."id" = s1."id") AND (o0."attempt" < o0."max_attempts")/,
    ~s/DELETE FROM "public"."oban_jobs" AS o0 USING (SELECT so0."id" AS "id" FROM "public"."oban_jobs" AS so0 WHERE ((so0."state" = 'completed') AND (so0."attempted_at" < $1)) OR ((so0."state" = 'cancelled') AND (so0."cancelled_at" < $2)) OR ((so0."state" = 'discarded') AND (so0."discarded_at" < $3)) LIMIT $4 FOR UPDATE SKIP LOCKED) AS s1 WHERE (o0."id" = s1."id")/,
    ~s/UPDATE "public"."oban_producers" AS o0 SET "updated_at" = $1 WHERE (o0."uuid" = $2)/,
    ~s/DELETE FROM "public"."oban_jobs" AS o0 USING (SELECT so0."id" AS "id", so0."queue" AS "queue", so0."state" AS "state", so0."worker" AS "worker", row_number() over (order by id desc) AS "rn" FROM "public"."oban_jobs" AS so0 WHERE (so0."state" = ANY($1)) AND (so0."state" IN ('cancelled','completed','discarded')) ORDER BY so0."id" LIMIT $2) AS s1 WHERE ((o0."id" = s1."id") AND (s1."rn" > $3)) RETURNING s1."id", s1."queue", s1."state"/,
    ~s/DELETE FROM "public"."oban_producers" AS o0 WHERE (((o0."uuid" != $1) AND (o0."updated_at" <= $2))) OR ((((o0."uuid" != $3) AND (o0."name" = $4)) AND (o0."queue" = $5)) AND (o0."updated_at" <= $6))/,
    ~s/SELECT DISTINCT o0."queue" FROM "public"."oban_jobs" AS o0 WHERE (o0."state" = 'available') AND (NOT (o0."queue" IS NULL))/,
    ~s/INSERT INTO "public"."oban_producers" ("meta","name","node","queue","running_ids","started_at","updated_at","uuid") VALUES ($1,$2,$3,$4,$5,$6,$7,$8)/,
    ~s/SELECT (o0."meta"#>'{"global_limit","tracked"}') FROM "public"."oban_producers" AS o0 WHERE (o0."queue" = $1) AND (NOT ((o0."meta"#>'{"global_limit","allowed"}') IS NULL))/,
    ~s/DELETE FROM "public"."oban_peers" AS o0 WHERE (o0."name" = $1) AND (o0."expires_at" < $2)/,
    ~s/INSERT INTO "public"."oban_peers" AS o0 ("expires_at","name","node","started_at") VALUES ($1,$2,$3,$4) ON CONFLICT ("name") DO UPDATE SET "expires_at" = $5/,
    ~s/INSERT INTO "public"."oban_peers" ("expires_at","name","node","started_at") VALUES ($1,$2,$3,$4) ON CONFLICT DO NOTHING/,
    ~s/SELECT key FROM UNNEST($1::int[]) key WHERE NOT pg_try_advisory_xact_lock($2, key)\n/,
    ~s/SELECT o0."uuid", o0."name", o0."node", o0."queue", o0."running_ids", o0."started_at", o0."updated_at", o0."meta" FROM "public"."oban_producers" AS o0 WHERE (o0."uuid" = $1) FOR UPDATE/,
    ~s/select 1/
  ]

  @filtered_sources [
    "oban_crons",
    "oban_jobs",
    "oban_peers",
    "oban_producers"
  ]

  @moduledoc """
  Oban Otel sampler.

  Worker jobs leveraging `use Oban.Worker` will be sampled and all ecto queries
  are filtered.

  ### Filtered Statements

  ...statements enumerated

  ### Filtered Sources (db tables)
  ...sources enumerated
  """
  def should_sample(ctx, <<"Elixir.Oban.Plugins.", _rest::binary>>, _attrs) do
    {:drop, [], tracestate(ctx)}
  end

  def should_sample(ctx, <<"Elixir.Oban.Pro.Plugins.", _rest::binary>>, _attrs) do
    {:drop, [], tracestate(ctx)}
  end

  for source <- @filtered_sources do
    def should_sample(ctx, _span_name, %{source: unquote(source)}) do
      {:drop, [], tracestate(ctx)}
    end
  end

  for statement <- @filtered_statements do
    def should_sample(ctx, _span_name, %{SCT.db_statement() => unquote(statement)}) do
      {:drop, [], tracestate(ctx)}
    end
  end

  def should_sample(ctx, span_name, attrs) do
    Samplers.Ecto.should_sample(ctx, span_name, attrs)
  end
end
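
The `for ... unquote(...)` blocks in the module above generate one function clause per list entry at compile time, so matching becomes plain pattern matching on function heads rather than a list lookup at runtime. A minimal standalone sketch of that technique, with a hypothetical module name and a simplified `decision/1` function in place of `should_sample/3`:

```elixir
defmodule SourceFilterSketch do
  # Hypothetical, trimmed-down list for illustration.
  @filtered_sources ["oban_jobs", "oban_peers"]

  # Expands at compile time into one clause per source, e.g.
  #   def decision(%{source: "oban_jobs"}), do: :drop
  for source <- @filtered_sources do
    def decision(%{source: unquote(source)}), do: :drop
  end

  # Fallback clause for everything that did not match above.
  def decision(_attrs), do: :record_and_sample
end
```

Clause order matters here: the catch-all has to come after the generated clauses, the same way the final `should_sample/3` clause does above.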

@danschultzer

danschultzer commented Dec 15, 2024

FWIW I think all the Ecto otel spans that are triggered through an Oban process should have an Oban parent span by default. Here's my work to make that happen: #437

That should make it easier for the sampler to handle as well instead of guessing it from the sql statements.
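
As an illustration of why a parent span helps: if the Ecto spans do end up with an Oban parent, the SDK's built-in `:parent_based` sampler can make children follow the parent's decision, so dropping the parent drops its queries too. A sketch under that assumption, where `OpentelemetrySampler` stands for a custom sampler like the ones earlier in this thread:

```elixir
# config/runtime.exs (or config/config.exs)
import Config

# Assumption: OpentelemetrySampler is a custom sampler module as shown in
# earlier comments. It is only consulted for root spans in this setup.
config :opentelemetry,
  sampler:
    {:parent_based,
     %{
       root: {OpentelemetrySampler, %{}},
       # Children of a sampled local parent are kept...
       local_parent_sampled: :always_on,
       # ...and children of a dropped local parent are dropped with it.
       local_parent_not_sampled: :always_off
     }}
```

The trade-off is that name or statement filters in the custom sampler then no longer run for non-root spans, so this fits best when the decision can be made at the root.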


7 participants