You are browsing unreleased documentation.
Looking for the plugin's configuration parameters? You can find them in the Kafka Upstream configuration reference doc.
This plugin transforms requests into Kafka messages in an Apache Kafka topic. For more information, see Kafka topics.
Kong also provides a Kafka Log plugin for publishing logs to a Kafka topic. See Kafka Log.
Enable on a service-less route
curl -X POST http://localhost:8001/routes/my-route/plugins \
--data "name=kafka-upstream" \
--data "config.bootstrap_servers[1].host=localhost" \
--data "config.bootstrap_servers[1].port=9092" \
--data "config.topic=kong-upstream" \
--data "config.timeout=10000" \
--data "config.keepalive=60000" \
--data "config.forward_method=false" \
--data "config.forward_uri=false" \
--data "config.forward_headers=false" \
--data "config.forward_body=true" \
--data "config.producer_request_acks=1" \
--data "config.producer_request_timeout=2000" \
--data "config.producer_request_limits_messages_per_request=200" \
--data "config.producer_request_limits_bytes_per_request=1048576" \
--data "config.producer_request_retries_max_attempts=10" \
--data "config.producer_request_retries_backoff_timeout=100" \
--data "config.producer_async=true" \
--data "config.producer_async_flush_timeout=1000" \
--data "config.producer_async_buffering_limits_messages_in_memory=50000"
Implementation details
This plugin uses the lua-resty-kafka client.
When encoding request bodies, several things happen:
- For requests with a content-type header of
application/x-www-form-urlencoded
,multipart/form-data
, orapplication/json
, this plugin passes the raw request body in thebody
attribute, and tries to return a parsed version of those arguments inbody_args
. If this parsing fails, an error message is returned and the message is not sent. - If the
content-type
is nottext/plain
,text/html
,application/xml
,text/xml
, orapplication/soap+xml
, then the body will be base64-encoded to ensure that the message can be sent as JSON. In such a case, the message has an extra attribute calledbody_base64
set totrue
.
TLS
Enable TLS by setting config.security.ssl
to true
.
mTLS
Enable mTLS by setting a valid UUID of a certificate in config.security.certificate_id
.
Note that this option needs config.security.ssl
set to true.
See Certificate Object
in the Admin API documentation for information on how to set up Certificates.
SASL Authentication
To use SASL authentication, set the configuration option config.authentication.strategy
to sasl
.
Make sure that these mechanism are enabled on the Kafka side as well.
This plugin supports the following authentication mechanisms:
-
PLAIN: Enable this mechanism by setting
config.authentication.mechanism
toPLAIN
. You also need to provide a username and password with the config optionsconfig.authentication.user
andconfig.authentication.password
respectively. -
SCRAM: In cryptography, the Salted Challenge Response Authentication Mechanism (SCRAM) is a family of modern, password-based challenge–response authentication mechanisms providing authentication of a user to a server. The Kafka Upstream plugin supports the following:
-
SCRAM-SHA-256: Enable this mechanism by setting
config.authentication.mechanism
toSCRAM-SHA-256
. You also need to provide a username and password with the config optionsconfig.authentication.user
andconfig.authentication.password
respectively. -
SCRAM-SHA-512: Enable this mechanism by setting
config.authentication.mechanism
toSCRAM-SHA-512
. You also need to provide a username and password with the config optionsconfig.authentication.user
andconfig.authentication.password
respectively.
-
-
Delegation Tokens: Delegation Tokens can be generated in Kafka and then used to authenticate this plugin.
Delegation Tokens
leverage theSCRAM-SHA-256
authentication mechanism. ThetokenID
is provided with theconfig.authentication.user
field and thetoken-hmac
is provided with theconfig.authentication.password
field. To indicate that a token is used you have to set theconfig.authentication.tokenauth
setting totrue
.Read more on how to create, renew, and revoke delegation tokens.
Known issues and limitations
Known limitations:
- Message compression is not supported.
- The message format is not customizable.
Quickstart
The following steps assume that Kong Gateway is installed and the Kafka Upstream plugin is enabled.
Note: We use
zookeeper
in the following example, which is not required or has been removed on some Kafka versions. Refer to the Kafka ZooKeeper documentation for more information.
-
Create a
kong-upstream
topic in your Kafka cluster:${KAFKA_HOME}/bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 10 \ --topic kong-upstream
-
Create a Service-less Route, and add the
kafka-upstream
plugin to it:curl -X POST http://localhost:8001/routes \ --data "name=kafka-upstream" \ --data "hosts[]=kafka-upstream.dev"
curl -X POST http://localhost:8001/routes/kafka-upstream/plugins \ --data "name=kafka-upstream" \ --data "config.bootstrap_servers[1].host=localhost" \ --data "config.bootstrap_servers[1].port=9092" \ --data "config.topic=kong-upstream"
-
In a different console, start a Kafka consumer:
${KAFKA_HOME}/bin/kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic kong-upstream \ --partition 0 \ --from-beginning \ --timeout-ms 1000
-
Make sample requests:
curl -X POST http://localhost:8000 --header 'Host: kafka-upstream.dev' foo=bar
You should receive a
200 { message: "message sent" }
response, and should see the request bodies appear on the Kafka consumer console you started in the previous step.