Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

514 - bring back mailbox size metric #514

Merged
merged 3 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.scalatest.{ BeforeAndAfterEach, Inspectors, OptionValues }

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Random
import scala.util.control.NoStackTrace

final class AkkaActorTest
Expand Down Expand Up @@ -194,6 +195,31 @@ final class AkkaActorTest
}("", "work", "work")(messages -> check)
}

it should "record mailbox size properly" in {
val processingTime = 200
val actor = system.classicSystem.actorOf(SuspendActor.props(processingTime), createUniqueId)

def expectMailboxSize(run: Int, size: Int): Unit =
assertMetric("mesmer_akka_actor_mailbox_size") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
.contains(actor.path.toStringWithoutAddress)
)

points.map(_.getValue) should contain(size)
}
val runId = Random.nextInt()
actor ! Message
actor ! Message
expectMailboxSize(runId, 1)
actor ! Message
expectMailboxSize(runId, 2)
Thread.sleep(processingTime)
expectMailboxSize(runId, 1)

}

it should "record stash operation from actors beginning" in {
val stashActor = system.classicSystem.actorOf(ClassicStashActor.props(), createUniqueId)

Expand Down Expand Up @@ -449,6 +475,16 @@ object AkkaActorAgentTest {
// replies
final case class StashSize(stash: Option[Long])

object SuspendActor {
def props(processingTime: Long): Props = Props(new SuspendActor(processingTime))
}

class SuspendActor(processingTime: Long) extends classic.Actor {
def receive: Receive = { case Message =>
Thread.sleep(processingTime)
}
}

object ClassicStashActor {
def props(): Props = Props(new ClassicStashActor)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ public static void exit(@Advice.Return Envelope envelope, @Advice.This Mailbox s
Instruments instruments = InstrumentsProvider.instance();

if (Objects.nonNull(context) && Objects.nonNull(attrs)) {

long interval = new Interval(System.nanoTime() - context.sentTime()).toMillis();

instruments.mailboxTime().record(interval, attrs);
instruments.mailboxSize().add(-1, attrs);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package akka.actor.impl;

import akka.actor.ActorContext;
import akka.dispatch.Mailbox;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.scalac.mesmer.otelextension.instrumentations.akka.actor.Instruments;
import io.scalac.mesmer.otelextension.instrumentations.akka.actor.InstrumentsProvider;
import net.bytebuddy.asm.Advice;

import java.util.Objects;

public class MailboxEnqueueAdvice {
Copy link
Contributor

@lgajowy lgajowy Nov 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a method in ActorCell:

 /**
   * If the actor isLocal, returns the number of "user messages" currently queued,
   * which may be a costly operation, 0 otherwise.
   */
  def numberOfMessages: Int

We could (maybe?) pass a reference to that field every time an actor is created.

object LocalActorRefProviderAdvice {

  @OnMethodExit
  def actorOf(@Return ref: ActorRef, @Argument(0) system: ActorSystem): Unit =
    ??? // get the reference, the path and create the instrument for the actorRef (that contains the actorcell, which has the numberOfMessages method).

}

that advice could be attached to:

private val localActorRefProviderInstrumentation: AgentInstrumentation =
    instrument("akka.actor.LocalActorRefProvider").visit(LocalActorRefProviderAdvice, "actorOf")

(taken from v0.7.0) Not sure if it's enough to instrument the localActorRefProvider

everytime we create a new actor maybe we can setup an async instrument that would periodically poll for the value from numberOfMessages, thus taking the mailbox size. Gauge seems a good fit. Not sure about the performance of this (probably awful, but maybe not?).

@skipper1982 lmk what do you think. I realize this can be unclear so let's have a call if needed. :)


@Advice.OnMethodExit
public static void exit(@Advice.This Mailbox self) {
if (Objects.nonNull(self.actor())
&& Objects.nonNull(self.actor().getSystem())) {
Attributes attrs = VirtualField.find(ActorContext.class, Attributes.class).get(self.actor());
Instruments instruments = InstrumentsProvider.instance();

if (Objects.nonNull(attrs)) {
instruments.mailboxSize().add(1, attrs);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
AkkaActorAgent.actorCellInit(),
AkkaActorAgent.dispatchSendMessage(),
AkkaActorAgent.mailboxDequeue(),
AkkaActorAgent.mailboxEnqueue(),
AkkaActorAgent.classicStashSupportStashAdvice(),
AkkaActorAgent.classicStashSupportPrependAdvice(),
AkkaActorAgent.typedStashBufferAdvice(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ object AkkaActorAgent {
)
)

val mailboxEnqueue: TypeInstrumentation = Instrumentation(named("akka.dispatch.Mailbox"))
.`with`(
Advice(
named("enqueue"),
"akka.actor.impl.MailboxEnqueueAdvice"
)
)

val classicStashSupportStashAdvice: TypeInstrumentation =
Instrumentation(named("akka.actor.StashSupport"))
.`with`(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package io.scalac.mesmer.otelextension.instrumentations.akka.actor
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.metrics.LongCounter
import io.opentelemetry.api.metrics.LongHistogram
import io.opentelemetry.api.metrics.LongUpDownCounter
import io.opentelemetry.api.metrics.MeterProvider

trait Instruments {
Expand All @@ -16,6 +17,8 @@ trait Instruments {

def mailboxTime: LongHistogram

def mailboxSize: LongUpDownCounter

def stashedMessages: LongCounter

def sentMessages: LongCounter
Expand Down Expand Up @@ -55,6 +58,11 @@ object Instruments {
.ofLongs()
.build()

lazy val mailboxSize: LongUpDownCounter = provider
.get("mesmer")
.upDownCounterBuilder("mesmer_akka_actor_mailbox_size")
.build()

lazy val stashedMessages: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_actor_stashed_messages_total")
Expand Down