<dependencies>
...
<dependency>
<groupId>com.mulesoft.connectors</groupId>
<artifactId>mule-kafka-connector</artifactId>
<version>4.1.0</version>
<classifier>mule-plugin</classifier>
</dependency>
</dependencies>
<kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="95194c1c-bbdd-4446-9196-6fbc6d42c440" topic="test">
<kafka:producer-plaintext-connection >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="<server url>:9092" />
</kafka:bootstrap-servers>
</kafka:producer-plaintext-connection>
</kafka:producer-config>
<kafka:consumer-config name="Apache_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="30061a79-c926-40e2-8b58-3ee360e788bb" >
<kafka:consumer-plaintext-connection groupId="my-group" >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="<server url>t:9092" />
</kafka:bootstrap-servers>
<kafka:topic-patterns >
<kafka:topic-pattern value="test" />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
<kafka:producer-config name="APAC_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="7b8abb6c-fba1-43e6-97f3-7018aa624835" topic="topic-secure">
<kafka:producer-plaintext-connection endpointIdentificationAlgorithm="https" tlsContext="TLS_Context" >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="solares.gonfi.net:9093" />
</kafka:bootstrap-servers>
</kafka:producer-plaintext-connection>
</kafka:producer-config>
<kafka:consumer-config name="APAC_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="3e1e4fe2-3aee-41f6-b99f-340ce8f49e4d" pollTimeout="250">
<kafka:consumer-plaintext-connection groupId="my-group" tlsContext="TLS_Context" endpointIdentificationAlgorithm='#[""]' retryBackoffTimeout="1" requestTimeout="60" sessionTimeout="60" fetchMaximumWaitTimeout="1" fetchMaximumWaitTimeoutUnit="SECONDS" maximumPollingInterval="300" retryBackoffTimeoutTimeUnit="SECONDS" heartbeatInterval="10" autoOffsetReset="EARLIEST">
<reconnection failsDeployment="true" >
<reconnect-forever frequency="30000" />
</reconnection>
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="solares.gonfi.net:9093" />
</kafka:bootstrap-servers>
<kafka:topic-patterns >
<kafka:topic-pattern value="topic-secure" />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
<tls:context name="TLS_Context" doc:name="TLS Context" doc:id="b7f5796e-79e4-49d1-a8ad-b3afba9853f6" >
<tls:trust-store path="/Users/lsanchez/ssl2/kafka.client.truststore.jks" password="mule123" insecure="true"/>
<tls:key-store path="/Users/lsanchez/ssl2/kafka.client.keystore.jks" keyPassword="mule123" password="mule123" type="jks"/>
</tls:context>
The path is /opt/mule/mule-CURRENT/apps/<app-name>/<truststore-name> , where app-name and truststore-name need to be replaced with the actual names of your application and your truststore file respectively. So the configuration will look something like the following:
<tls:trust-store path="/opt/mule/mule-CURRENT/apps/<app-name>/kafka.client.truststore.jks" password="mule123" insecure="true"/>
<kafka:publish doc:name="Producer" doc:id="760e8cd6-c4d5-41bf-bcd8-0690c86b1776" config-ref="Apache_Kafka_Producer_configuration" topic="#[payload.topic]" key="#[now()]">
<kafka:message ><![CDATA[#[payload.message]]]></kafka:message>
</kafka:publish>
<kafka:message-listener doc:name="Message listener Secure" doc:id="b4bcab6c-e83d-4bf6-a258-46fab0a8950a" config-ref="APAC_Kafka_Consumer_configuration" ackMode="AUTO"/>
<kafka:message-listener doc:name="Message listener" doc:id="298c1a10-adf3-4ee9-8574-f3dcb381c628" config-ref="Apache_Kafka_Consumer_configuration" ackMode="MANUAL"/>
<kafka:commit doc:name="Commit" doc:id="265eb045-0e02-4cec-af73-873d6287e5a2" config-ref="Apache_Kafka_Consumer_configuration" commitKey="#[attributes.consumerCommitKey]"/>
<AsyncLogger name="com.mulesoft.connectors.kafka" level="TRACE"/> <AsyncLogger name="org.mule.modules.kafka" level="TRACE"/> <ASyncLogger name="org.apache.kafka" level="TRACE"/>
001116612

We use three kinds of cookies on our websites: required, functional, and advertising. You can choose whether functional and advertising cookies apply. Click on the different cookie categories to find out more about each category and to change the default settings.
Privacy Statement
Required cookies are necessary for basic website functionality. Some examples include: session cookies needed to transmit the website, authentication cookies, and security cookies.
Functional cookies enhance functions, performance, and services on the website. Some examples include: cookies used to analyze site traffic, cookies used for market research, and cookies used to display advertising that is not directed to a particular individual.
Advertising cookies track activity across websites in order to understand a viewer’s interests, and direct them specific marketing. Some examples include: cookies used for remarketing, or interest-based advertising.