Loading

Kafka-Connector: How to Configure access Using TLS in Mule 4

Fecha de publicación: Jul 25, 2025
Tarea

GOAL

Showing how to configure Kafka connector to access the broker using TLS.
Pasos
1. The following dependencies need to be included in the pom.xml file. They are added by Studio when you add the first connector.
 
    <dependencies>
...
        <dependency>
            <groupId>com.mulesoft.connectors</groupId>
            <artifactId>mule-kafka-connector</artifactId>
            <version>4.1.0</version>
            <classifier>mule-plugin</classifier>
        </dependency>
    </dependencies>

2. We need to define the Kafka configuration access, to establish connection with the Kafka broker. 

Example plain text configuration, for producer and consumer.
 
<kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="95194c1c-bbdd-4446-9196-6fbc6d42c440" topic="test">
        <kafka:producer-plaintext-connection >
            <kafka:bootstrap-servers >
                <kafka:bootstrap-server value="<server url>:9092" />
            </kafka:bootstrap-servers>
        </kafka:producer-plaintext-connection>
    </kafka:producer-config>
    
    
    <kafka:consumer-config name="Apache_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="30061a79-c926-40e2-8b58-3ee360e788bb" >
        <kafka:consumer-plaintext-connection groupId="my-group" >
            <kafka:bootstrap-servers >
                <kafka:bootstrap-server value="<server url>t:9092" />
            </kafka:bootstrap-servers>
            <kafka:topic-patterns >
                <kafka:topic-pattern value="test" />
            </kafka:topic-patterns>
        </kafka:consumer-plaintext-connection>
    </kafka:consumer-config>

Example TSL Configuration. In this case additionally, the TLS Context is required for adding the Key Store and Trust Store references. The article here can be used as a reference for creating Key and Trust stores.
 
<kafka:producer-config name="APAC_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="7b8abb6c-fba1-43e6-97f3-7018aa624835" topic="topic-secure">
        <kafka:producer-plaintext-connection endpointIdentificationAlgorithm="https" tlsContext="TLS_Context" >
            <kafka:bootstrap-servers >
                <kafka:bootstrap-server value="solares.gonfi.net:9093" />
            </kafka:bootstrap-servers>
        </kafka:producer-plaintext-connection>
    </kafka:producer-config>


    <kafka:consumer-config name="APAC_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="3e1e4fe2-3aee-41f6-b99f-340ce8f49e4d" pollTimeout="250">
        <kafka:consumer-plaintext-connection groupId="my-group" tlsContext="TLS_Context" endpointIdentificationAlgorithm='#[""]' retryBackoffTimeout="1" requestTimeout="60" sessionTimeout="60" fetchMaximumWaitTimeout="1" fetchMaximumWaitTimeoutUnit="SECONDS" maximumPollingInterval="300" retryBackoffTimeoutTimeUnit="SECONDS" heartbeatInterval="10" autoOffsetReset="EARLIEST">
            <reconnection failsDeployment="true" >
                <reconnect-forever frequency="30000" />
            </reconnection>
            <kafka:bootstrap-servers >
                <kafka:bootstrap-server value="solares.gonfi.net:9093" />
            </kafka:bootstrap-servers>
            <kafka:topic-patterns >
                <kafka:topic-pattern value="topic-secure" />
            </kafka:topic-patterns>
        </kafka:consumer-plaintext-connection>
    </kafka:consumer-config>


    <tls:context name="TLS_Context" doc:name="TLS Context" doc:id="b7f5796e-79e4-49d1-a8ad-b3afba9853f6" >
        <tls:trust-store path="/Users/lsanchez/ssl2/kafka.client.truststore.jks" password="mule123" insecure="true"/>
        <tls:key-store path="/Users/lsanchez/ssl2/kafka.client.keystore.jks" keyPassword="mule123" password="mule123" type="jks"/>
    </tls:context>

In Cloudhub:  The SSL truststore file needs to be included in the application distribution. This is done by adding the file to the application's src/main/resources directory prior to building the deployable file.
The path is /opt/mule/mule-CURRENT/apps/<app-name>/<truststore-name> , where app-name and truststore-name need to be replaced with the actual names of your application and your truststore file respectively. So the configuration will look something like the following:
<tls:trust-store  path="/opt/mule/mule-CURRENT/apps/<app-name>/kafka.client.truststore.jks" password="mule123" insecure="true"/>


3. Then for instance you can use any of the Kafka connectors for publishing or listening.
 
<kafka:publish doc:name="Producer" doc:id="760e8cd6-c4d5-41bf-bcd8-0690c86b1776" config-ref="Apache_Kafka_Producer_configuration" topic="#[payload.topic]" key="#[now()]">
            <kafka:message ><![CDATA[#[payload.message]]]></kafka:message>
        </kafka:publish>

        <kafka:message-listener doc:name="Message listener Secure" doc:id="b4bcab6c-e83d-4bf6-a258-46fab0a8950a" config-ref="APAC_Kafka_Consumer_configuration" ackMode="AUTO"/>

        <kafka:message-listener doc:name="Message listener" doc:id="298c1a10-adf3-4ee9-8574-f3dcb381c628" config-ref="Apache_Kafka_Consumer_configuration" ackMode="MANUAL"/>

        <kafka:commit doc:name="Commit" doc:id="265eb045-0e02-4cec-af73-873d6287e5a2" config-ref="Apache_Kafka_Consumer_configuration" commitKey="#[attributes.consumerCommitKey]"/>



 

ADDITIONAL INFORMATION

In order to see details on the logs related to this you can enable TRACE level to the following packages:
<AsyncLogger name="com.mulesoft.connectors.kafka" level="TRACE"/> 
 		<AsyncLogger name="org.mule.modules.kafka" level="TRACE"/> 
		<ASyncLogger name="org.apache.kafka" level="TRACE"/>



 
Número del artículo de conocimiento

001116612

 
Cargando
Salesforce Help | Article