Creating and exposing custom Kafka Consumer Streaming metrics in Apache Spark using PrometheusServlet
In this blog post, I'll describe how to create and enhance the existing Spark Structured Streaming metrics with Kafka consumer metrics and expose them using the Spark 3 PrometheusServlet, which can be targeted directly by Prometheus. In earlier Spark versions, one had to set up either a JmxSink/JmxExporter, a GraphiteSink/GraphiteExporter, or a custom sink that pushes metrics to a PushGateway server. In other words, we couldn't really avoid an increase in the complexity of our solutions, since we had to set up external components that interact with our applications so that they can be scraped by Prometheus.
Motivation
More than ever, observability is a must when it comes to software. It allows us to get insights into what is happening inside the software without having to interact directly with the system. One way of building upon this observability pillar is by exposing application metrics. When built into an observability stack, they allow us to detect problems, either through alerts or simply a dashboard, and to find their root cause by analyzing the metrics.
Apache Spark applications are no different. It's true that one can access the Spark Web UI and gather insights into how our application is working, but when the number of applications increases ten or a hundredfold, it becomes hard to troubleshoot them. That's when an observability tool like Grafana comes in handy. Grafana can connect to Prometheus databases, and Prometheus integrates seamlessly with our applications by targeting the PrometheusServlet.
When configured, Apache Spark natively exposes several metrics, which are detailed here. In Structured Streaming, no metrics are exposed by default unless we set “spark.sql.streaming.metricsEnabled” -> “true”. Below is an example of the metrics that are exposed for a Kafka streaming job:
As we can see, these metrics are very generic and don't provide any detailed information about our source.
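For reference, the flag can also be set at runtime; a minimal sketch, assuming an existing SparkSession named spark:

```scala
// Minimal sketch, assuming an existing SparkSession named `spark`:
// enables the built-in Structured Streaming metrics discussed above.
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
```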
The goal is to be able to expose Kafka consumer metrics that help us monitor how our event consumption is going.
Metrics should be quantifiable values that provide real-time insights about the status or performance of the application. Within the scope of this article, we'll be covering the following metrics:
Start offsets: The offsets where the streaming query first started.
End offsets: The last offsets processed by the streaming query. Tracks the consumer progress of the query.
Lead offsets: The latest offsets of the topic the streaming query is consuming. Tracks the evolution of the consumed topic.
Lag offsets: The difference between the last processed offsets and the lead offsets of the topic. Tracks how far behind real time the streaming query is.
Consumed rate: The consumption rate of the streaming query's topics. It is the sum over all the topics subscribed to by the streaming query.
Last record timestamp: The timestamp of the last message consumed from each TopicPartition. It tracks the latency between the producer and the consumer.
The next step, after defining the metrics, is to create the metric source that will be responsible for exposing the metrics to Spark's MetricsSystem.
In order to expose the metrics, we need to create a class that extends Source. What this does is create a connection from the executors to the driver to pass the metrics as part of the heartbeat process.
Make sure to define the gauges as SettableGauge so that they can be updated during execution.
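Here is a minimal sketch of what such a source can look like, assuming Dropwizard Metrics 4.2+ (which provides DefaultSettableGauge); the class and method names are illustrative, not the exact ones from the full project:

```scala
package org.apache.spark.metrics.source

import com.codahale.metrics.{DefaultSettableGauge, MetricRegistry}
import scala.collection.concurrent.TrieMap

// Sketch of a custom source. Note that the Source trait is private[spark], which is
// why this class lives under the org.apache.spark.metrics.source package.
class KafkaMetricsSource extends Source {
  override val sourceName: String = "KafkaMetricsSource"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  // One settable gauge per metric and TopicPartition, created on first use.
  private val gauges = TrieMap.empty[String, DefaultSettableGauge[Long]]

  private def gauge(name: String): DefaultSettableGauge[Long] =
    gauges.getOrElseUpdate(name, metricRegistry.register(name, new DefaultSettableGauge[Long](0L)))

  def updateEndOffset(topic: String, partition: Int, offset: Long): Unit =
    gauge(s"$topic.$partition.endOffset").setValue(offset)

  def updateLag(topic: String, partition: Int, lag: Long): Unit =
    gauge(s"$topic.$partition.offsetLag").setValue(lag)
}
```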
After the source is defined, all we need to do is instantiate it and register it in Spark's MetricsSystem.
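A minimal sketch of the registration, assuming the source above; since MetricsSystem is package-private, the helper is also placed under the same org.apache.spark package:

```scala
package org.apache.spark.metrics.source

import org.apache.spark.SparkEnv

object KafkaMetricsSource {
  // Instantiates the source and registers it with the running MetricsSystem so that
  // its gauges are exposed through the configured sinks (PrometheusServlet included).
  def register(): KafkaMetricsSource = {
    val source = new KafkaMetricsSource()
    SparkEnv.get.metricsSystem.registerSource(source)
    source
  }
}
```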
For the full code of the source implementation, you can check:
Now that we have our source in place, all we need to do is make use of it. For that, we'll need the metrics to populate our recently created gauges.
If you have run a Structured Streaming job before, you might have noticed an output similar to the following when a streaming query progresses:
By analyzing the output, we can see that we already have most of the metrics available just by parsing the JSON: the start offsets metric using startOffset, end offsets using endOffset, lead offsets using latestOffset, and source.inputRowsPerSecond as the consumed rate for the source. For the lag offsets we won't use the metric values directly, since we can instead calculate the lag for each TopicPartition by using the endOffset and latestOffset values, ending up with a more granular metric.
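As an illustration of that calculation, here is a minimal sketch (helper names are my own) of parsing those offset JSON strings and deriving the per-partition lag, assuming Spark 3.1+ where SourceProgress exposes latestOffset, and the json4s library that Spark already bundles:

```scala
import org.apache.spark.sql.streaming.SourceProgress
import org.json4s.{DefaultFormats, Formats}
import org.json4s.jackson.JsonMethods.parse

// Kafka offsets in a SourceProgress are JSON strings such as {"topic-a":{"0":100,"1":250}}.
object OffsetUtils {
  implicit private val formats: Formats = DefaultFormats

  // Flattens the offset JSON into a (topic, partition) -> offset map.
  def parseOffsets(json: String): Map[(String, Int), Long] =
    parse(json)
      .extract[Map[String, Map[String, Long]]]
      .flatMap { case (topic, partitions) =>
        partitions.map { case (partition, offset) => (topic, partition.toInt) -> offset }
      }

  // Lag per TopicPartition: how far the processed offsets are behind the topic's latest offsets.
  def lagPerPartition(source: SourceProgress): Map[(String, Int), Long] = {
    val end  = parseOffsets(source.endOffset)
    val lead = parseOffsets(source.latestOffset)
    lead.map { case (tp, leadOffset) => tp -> (leadOffset - end.getOrElse(tp, leadOffset)) }
  }
}
```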
In order to make use of this information, we can leverage the StreamingQueryListener's progress event (onQueryProgress).
The first step is creating a class that extends StreamingQueryListener so that we are able to receive and act upon the progress events. It receives the KafkaMetricsSource we previously created, which will be responsible for emitting the metrics.
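A minimal sketch of such a listener, reusing the illustrative KafkaMetricsSource and OffsetUtils helpers sketched above:

```scala
import org.apache.spark.metrics.source.KafkaMetricsSource
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Sketch of a listener that forwards Kafka offset information to the metrics source.
class KafkaOffsetsQueryListener(metricsSource: KafkaMetricsSource) extends StreamingQueryListener {

  override def onQueryStarted(event: QueryStartedEvent): Unit = ()

  override def onQueryProgress(event: QueryProgressEvent): Unit =
    event.progress.sources.foreach { source =>
      // End offsets: the last offsets processed by the query, per TopicPartition.
      OffsetUtils.parseOffsets(source.endOffset).foreach { case ((topic, partition), offset) =>
        metricsSource.updateEndOffset(topic, partition, offset)
      }
      // Lag offsets: latestOffset minus endOffset, per TopicPartition.
      OffsetUtils.lagPerPartition(source).foreach { case ((topic, partition), lag) =>
        metricsSource.updateLag(topic, partition, lag)
      }
      // Start offsets, lead offsets and the consumed rate would be emitted the same
      // way from source.startOffset, source.latestOffset and source.inputRowsPerSecond.
    }

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
}
```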
The last step is simply registering the listener so that we receive the progress updates for the desired streams.
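A minimal sketch, assuming a SparkSession named spark and the instances sketched above; the listener is added to the session's StreamingQueryManager, so it receives progress events from the queries started on that session:

```scala
import org.apache.spark.metrics.source.KafkaMetricsSource

// Assumes a SparkSession named `spark`, plus the source and listener sketched above.
val kafkaMetricsSource = KafkaMetricsSource.register()
spark.streams.addListener(new KafkaOffsetsQueryListener(kafkaMetricsSource))
```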
If you wish to check the full listener code, you can do so here:
The last metric is the last processed record timestamp for each TopicPartition. This metric is trickier than the previous ones because the information resides in the Spark executors at computation time.
The only way we can access that information is by creating a dummy Spark expression that acts as a side effect. We'll use the value column from Spark's Kafka schema as a way to trick Spark into running this transformation.
This expression receives the whole Kafka row, extracts the necessary values, and emits the metric. It then returns the value column which, as mentioned above, is used to trick Spark into running this expression.
If we, for instance, used the timestamp column and ended up selecting only the value column further along in our query, Spark would prune this expression out of the final plan and our metric would be ignored. If you don't plan to use the value column (which is pretty unusual), make sure to use an appropriate column instead.
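Here is a minimal sketch of one way to write such a side-effecting expression as a UDF (the full project may implement it differently); the executor-side helpers are hypothetical:

```scala
import java.sql.Timestamp

import org.apache.spark.metrics.source.KafkaMetricsSource
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

// The UDF reads the Kafka columns, emits the last-record-timestamp metric as a side
// effect on the executor, and returns `value` unchanged so that the expression
// survives column pruning as long as `value` is selected downstream.
// getOrRegisterOnExecutor/updateLastRecordTimestamp are hypothetical helpers that
// lazily register the source on each executor's MetricsSystem.
val emitTimestampMetric =
  udf { (value: Array[Byte], topic: String, partition: Int, timestamp: Timestamp) =>
    KafkaMetricsSource.getOrRegisterOnExecutor()
      .updateLastRecordTimestamp(topic, partition, timestamp.getTime)
    value
  }

def withLastRecordTimestampMetric(kafkaDf: DataFrame): DataFrame =
  kafkaDf.withColumn(
    "value",
    emitTimestampMetric(col("value"), col("topic"), col("partition"), col("timestamp")))
```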
After we have everything set up, all we need to do is enable Prometheus on the UI by setting spark.ui.prometheus.enabled=true (it creates a single endpoint containing all driver and executor metrics) and configure spark.metrics.conf with the required settings.
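A minimal sketch of the relevant configuration, set here through SparkConf properties (the same settings could live in a metrics.properties file or be passed as --conf flags to spark-submit):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-metrics-demo") // illustrative name
  // Enables the Prometheus endpoint on the driver UI.
  .config("spark.ui.prometheus.enabled", "true")
  // Registers the PrometheusServlet sink for the application metrics.
  .config("spark.metrics.conf.*.sink.prometheusServlet.class",
    "org.apache.spark.metrics.sink.PrometheusServlet")
  .config("spark.metrics.conf.*.sink.prometheusServlet.path",
    "/metrics/prometheus")
  .getOrCreate()
```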
Running the application and accessing the metrics endpoints will show you all the metrics we've previously created alongside the native ones.
There are some limitations to this new feature. One of them is that this endpoint only exposes metrics that start with metrics_ or spark_info. In addition, Prometheus naming conventions are not followed by Spark, and labels aren't currently supported (not that I know of, if there is a way hit me up!). This means we'll have A LOT of different metrics in Prometheus, which might be a bit overwhelming. This issue can be solved by relabeling the metrics, but it can be tricky.
That's it! You are now able to create and expose custom Kafka metrics using Spark's native Prometheus integration. Now, all that's left is to have Prometheus scrape all the endpoints and use the metrics to create both pretty dashboards and alarms that will give you more visibility into what's happening inside your applications.
Despite the early limitations of the Prometheus metrics sink, which are understandable given that it's an experimental feature, I believe it will soon be enhanced with more and more customizations.
If you wish to see the full project used in this article, please check: