Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.
If you would prefer to see the changes in the code, here’s the upgrade PR from the example app.
Note: Changes above don’t include Zeitwerk setup for your non-Rails projects. See this commit for details on how to replace Karafka::Loader with Zeitwerk.
Note: If you use the Sidekiq backend, keep in mind that before upgrading, you need to consume all of the messages that are already in Redis.
Note: This release is the last release with ruby-kafka under the hood. We’ve already started the process of moving to rdkafka-ruby.
Up until now, in order to see your code changes within the Karafka process, you would have to restart it. That was really cumbersome, as for bigger and more complex Kafka clusters, a restart with reconnections and rebalancing could take a significant amount of time. Fortunately, those times are gone!
All you need to do is enable this part of the code before the App.boot in your karafka.rb file:
```ruby
# For non-Rails app with Zeitwerk loader
if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      APP_LOADER
    )
  )
end

# Or for Ruby on Rails
if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      *Rails.application.reloaders
    )
  )
end
```
and your code changes will be applied after each message or message batch fetch.
Keep in mind, though, that there are a couple of limitations to it.
It is also worth pointing out that if you have code that needs to be re-initialized in any way during the reload phase, you can pass it to the Karafka::CodeReloader initializer:
```ruby
if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      *Rails.application.reloaders
    ) { Dry::Events::Publisher.registry.clear }
  )
end
```
Parsers as a concept, responsible for both serialization and deserialization of data, violated the Single Responsibility Principle (see details here). From now on, they are separate entities that you can use independently. For the upgrade, just rename parser to deserializer for each topic you’re using in the routes:
```ruby
App.consumer_groups.draw do
  consumer_group :batched_group do
    batch_fetching true

    topic :xml_data do
      consumer XmlMessagesConsumer
      batch_consuming false
      # parser XmlDeserializer.new
      deserializer XmlDeserializer.new
    end
  end
end
```
and make sure you extract the payload of the message yourself:
```ruby
class XmlDeserializer
  # @param params [Karafka::Params::Params] params to de-serialize
  # @return [Hash] deserialized xml
  def call(params)
    ::Hash.from_xml(params.payload)
  end
end
```
For a justification of this change, please refer to this pull request.
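Since deserializers are now plain callables, you can also run them outside of the consumption flow. Below is a minimal sketch of such standalone usage; it assumes ActiveSupport is loaded (for Hash.from_xml) and that the deserializer only needs an object responding to #payload:

```ruby
require 'ostruct'

# Hypothetical stand-in for Karafka params - anything with a #payload works here
message = OpenStruct.new(payload: '<user><login>maciek</login></user>')

XmlDeserializer.new.call(message)
#=> { 'user' => { 'login' => 'maciek' } }
```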
Note: You can skip this section if you use Karafka with Ruby on Rails.
We aren’t the best at loading things. Zeitwerk is. That’s why we’ve dropped our custom loader in favor of it.
Just load your app code in your karafka.rb file before configuring the app and you should be ready to go:
APP_LOADER = Zeitwerk::Loader.new %w[ lib app/consumers app/responders app/workers ].each(&APP_LOADER.method(:push_dir)) APP_LOADER.setup APP_LOADER.eager_load class App < Karafka::App # config here... end
Don’t forget to eager_load the code or some of the Karafka components might not work as expected.
This is probably the biggest change in this release.
Up until now, the data you received was available in the root scope of each params instance in the #params_batch. It means that when you sent a message as follows:
```ruby
WaterDrop::SyncProducer.call(
  { login: 'maciek', id: '1' },
  topic: 'users'
)
```
you would access it like so:
```ruby
def consume
  params_batch.each do |params|
    puts "Hello #{params['login']}!\n"
  end
end
```
Karafka used to merge your data directly into the Karafka::Params::Params object’s root scope. That was convenient, but not flexible enough. There are some metadata details in the root params scope that could get overwritten, and in case you sent something other than a JSON hash, let’s say an array, you would get an exception and would have to use a custom parser to bypass that (see this FAQ question).
Due to that, and in order to better separate your incoming data from the rest of the message (headers, metadata information, etc.), from now on all of your data is available under the payload params key:
```ruby
def consume
  params_batch.each do |params|
    puts "Hello #{params['payload']['login']}!\n"
    # or
    puts "Hello #{params.payload['login']}!\n"
  end
end
```
The same applies to the case when you want to access unparsed data:
```ruby
def consume
  params_batch.to_a.each do |params|
    puts "Unparsed details: #{params['payload']}"
  end
end
```
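This also means that non-hash JSON payloads (like the array case mentioned above) should no longer require a custom parser workaround; a minimal sketch under that assumption:

```ruby
# Producing a JSON array instead of a JSON hash
WaterDrop::SyncProducer.call([1, 2, 3].to_json, topic: 'users')

# Consuming: the array lands under the payload key instead of being
# merged into the params root scope (which used to raise)
def consume
  params_batch.each do |params|
    puts params.payload.inspect #=> [1, 2, 3]
  end
end
```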
When in the batch_fetching mode, additional information is received while fetching data from the Kafka cluster. These details are available via the #metadata consumer method:
```ruby
class UsersConsumer < ApplicationConsumer
  def consume
    puts metadata #=> { batch_size: 200, topic: 'events', partition: 2 }
  end
end
```
In most message systems (JMS, QPID, etc.), streaming systems, and most transport systems (HTTP, TCP), it is typical to have a concept of headers and payload. The payload is traditionally reserved for the business object, while headers are traditionally used for transport routing, filtering, etc. Headers are most typically key-value pairs.
Both WaterDrop and Karafka now support message headers.
```ruby
WaterDrop::SyncProducer.call(
  { login: 'maciek', id: '1' },
  topic: 'users',
  headers: { event: 'created' }
)
```
```ruby
# Karafka consumer
def consume
  puts params_batch.last.headers #=> { 'event' => 'created' }
end
```
Up until now, in order to test consumers, you would have to know the internal format in which Karafka stores Kafka messages. That is no longer true!
We’ve created a new library called Karafka-Testing that provides you with all the methods to spec out your consumers much more easily.
Add this gem to your Gemfile in the test group:
```ruby
group :test do
  gem 'karafka-testing'
  gem 'rspec'
end
```
and then in your spec_helper.rb file:
```ruby
require 'karafka/testing/rspec/helpers'

RSpec.configure do |config|
  config.include Karafka::Testing::RSpec::Helpers
end
```
Once included in your RSpec setup, this library provides two methods that you can use in your specs:
– #karafka_consumer_for – this method creates a consumer instance for the desired topic. It needs to be set as the spec subject.
– #publish_for_karafka – this method “sends” a message to the consumer instance.
Note: Messages sent using the `#publish_for_karafka` method won’t be sent to Kafka. They will be “virtually” delegated to the created consumer instance so your specs can run without Kafka setup.
```ruby
RSpec.describe InlineBatchConsumer do
  # This will create a consumer instance with all the
  # settings defined for the given topic
  subject(:consumer) { karafka_consumer_for(:inline_batch_data) }

  let(:nr1_value) { rand }
  let(:nr2_value) { rand }
  let(:sum) { nr1_value + nr2_value }

  before do
    # Sends first message to Karafka consumer
    publish_for_karafka({ 'number' => nr1_value }.to_json)
    # Sends second message to Karafka consumer
    publish_for_karafka({ 'number' => nr2_value }.to_json)

    allow(Karafka.logger).to receive(:info)
  end

  it 'expects to log a proper message' do
    expect(Karafka.logger)
      .to receive(:info)
      .with("Sum of 2 elements equals to: #{sum}")

    consumer.consume
  end
end
```
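For context, a consumer that would satisfy the spec above might look more or less like this; a hypothetical sketch, not code from the release:

```ruby
class InlineBatchConsumer < ApplicationConsumer
  def consume
    # Sum the deserialized 'number' values from all the messages in the batch
    sum = params_batch.payloads.sum { |payload| payload['number'] }

    Karafka.logger.info("Sum of #{params_batch.count} elements equals to: #{sum}")
  end
end
```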
We’ve made some small changes to the default listener and to the names of the events published during the Karafka runtime flow execution. Please see this commit for more details.
Karafka::Instrumentation::Listener is now Karafka::Instrumentation::StdoutListener. Apart from the rename, you now subscribe with an instance of the listener instead of the class itself:
```ruby
Karafka.monitor.subscribe(
  # Old
  # Karafka::Instrumentation::Listener
  # New
  Karafka::Instrumentation::StdoutListener.new
)
```
A new Karafka::Instrumentation::ProctitleListener has been added. Its purpose is to provide you with a nicer proc title with a descriptive value. In order to use it, put the following line in your karafka.rb boot file:
```ruby
Karafka.monitor.subscribe(
  Karafka::Instrumentation::ProctitleListener.new
)
```
A blocking #mark_as_consumed method has been split into two: a non-blocking #mark_as_consumed that schedules the offset commit in the background, and a blocking #mark_as_consumed! that waits until the offset commit is acknowledged.
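A minimal sketch of the difference between the two (the process helper is hypothetical):

```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    params_batch.each { |params| process(params) }

    # Non-blocking: marks the offset for a background commit and returns
    mark_as_consumed params_batch.last

    # Blocking: does not return until the offset commit is acknowledged
    # mark_as_consumed! params_batch.last
  end
end
```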
If you are not interested in the additional `#params` metadata, you can use the `#payloads` method to access only the deserialized payloads of the Kafka messages:
```ruby
class EventsConsumer < ApplicationConsumer
  def consume
    EventStore.store params_batch.payloads
  end
end
```
From now on, you are able to use the same consumer class for multiple topics:
```ruby
App.consumer_groups.draw do
  consumer_group :default do
    topic :users do
      consumer UsersConsumer
    end

    topic :admins do
      consumer UsersConsumer
    end
  end
end
```
Note: you will still have a separate consumer instance per topic partition.
If a critical failure occurs (a network disconnection or anything similar), Karafka will back off and wait for reconnect_timeout (defaults to 10s) before attempting to reconnect. This should prevent you from being flooded with errors and logs when serious problems occur.
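If the default back-off does not fit your deployment, the timeout should be adjustable in the setup block; a sketch assuming the setting is exposed under the kafka config namespace:

```ruby
class App < Karafka::App
  setup do |config|
    # Assumption: reconnect_timeout lives under config.kafka (value in seconds)
    config.kafka.reconnect_timeout = 30
  end
end
```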
Support for Kafka 0.10 has been dropped. Weird things may happen if you decide to use Kafka 0.10 with Karafka 1.3, so just upgrade.
multiple_usage has been removed. Responders won’t raise an exception anymore if you send multiple messages to the same topic without declaring that. This feature was a bad idea and was causing a lot of trouble when using responders in long-running, batch-like flows.
The following code would raise a Karafka::Errors::InvalidResponderUsageError error in Karafka 1.2 but will run without issues in Karafka 1.3:
```ruby
class ExampleResponder < ApplicationResponder
  topic :regular_topic

  def respond(user, profile)
    respond_to :regular_topic, user
    respond_to :regular_topic, user
  end
end
```
All the internal Karafka framework exception names now end with an Error postfix. Please see this file for the whole list of exceptions.
While Karafka is processing, ruby-kafka prebuffers more data under the hood in a separate thread. If you have a big consumer lag, this can cause your Karafka process to prebuffer hundreds of megabytes of data or more upfront. Lowering the queue size makes Karafka more predictable by default.
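If the new default does not match your throughput needs, the queue size should be tunable in the same way; a sketch assuming a fetcher_max_queue_size setting under the kafka config namespace:

```ruby
class App < Karafka::App
  setup do |config|
    # Assumption: fetcher_max_queue_size lives under config.kafka
    # (1.3 lowers the default prebuffer queue size)
    config.kafka.fetcher_max_queue_size = 10
  end
end
```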
Our Wiki has been updated to reflect the 1.3 status. Please notify us if you find any incompatibilities.
If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:
```bash
git clone https://github.com/karafka/example-app ./example_app
```
then, just bundle install all the dependencies:
```bash
cd ./example_app
bundle install
```
and follow the instructions from the example app Wiki.