1. Configure Camel Kafka Component
package com.integration.camel.component;

import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that publishes the Camel Kafka endpoint and component as beans.
 *
 * Created by tmichels on 9/3/15.
 */
@Configuration
public class KafkaCamelComponent {

    /** Zookeeper host the endpoint is pointed at. */
    private static final String ZOOKEEPER_HOST = "localhost";

    /** Zookeeper port the endpoint is pointed at. */
    private static final int ZOOKEEPER_PORT = 2181;

    /** Kafka topic configured on the endpoint. */
    private static final String TOPIC = "test";

    /**
     * Builds a Kafka endpoint configured with the Zookeeper host, Zookeeper port and topic
     * defined by the constants of this class.
     *
     * @return the configured endpoint
     */
    @Bean
    public KafkaEndpoint kafkaEndpoint() {
        final KafkaEndpoint endpoint = new KafkaEndpoint();
        endpoint.setZookeeperHost(ZOOKEEPER_HOST);
        endpoint.setZookeeperPort(ZOOKEEPER_PORT);
        endpoint.setTopic(TOPIC);
        return endpoint;
    }

    /**
     * Builds the Kafka component and registers the runtime class of the supplied endpoint
     * as its endpoint class.
     *
     * @param kafkaEndpoint the endpoint bean whose class is handed to the component
     * @return the configured component
     */
    @Bean
    public KafkaComponent kafkaComponent(KafkaEndpoint kafkaEndpoint) {
        final KafkaComponent component = new KafkaComponent();
        component.setEndpointClass(kafkaEndpoint.getClass());
        return component;
    }
}
2. Configure Kafka Consumer and Producer Routes
package com.integration.camel.route; import org.apache.camel.builder.RouteBuilder; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by tmichels on 9/3/15. */ @Configuration public class KafkaCamelRoute { @Bean(name = "KafkaRouteProducer") public RouteBuilder kafkaRouteProducer() { return new RouteBuilder() { public void configure() { from("direct:kafkaRoute").to("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1&serializerClass=kafka.serializer.StringEncoder").bean(kafkaOutputBean.class); } }; } @Bean(name="KafkaRouteConsumer") public RouteBuilder kafkaRouteConsumer() { return new RouteBuilder() { public void configure() { from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1&serializerClass=kafka.serializer.StringEncoder").bean(kafkaOutputBean.class); } }; } public static class kafkaOutputBean { public void printKafkaBody(String body) { System.out.println("KafkaBody result >>>>> " + body); } } }
3. Start Kafka test by adding routes to CamelContext and starting
package com.integration.camel.route; import com.integration.camel.component.KafkaCamelComponent; import com.integration.camel.context.CamelContextConfig; import org.apache.camel.CamelContext; import org.apache.camel.EndpointInject; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.kafka.KafkaComponent; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; /** * Created by tmichels on 9/3/15. */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(classes = {KafkaCamelComponent.class, KafkaCamelRoute.class, CamelContextConfig.class}) public class KafkaCamelIntegrationTest { @Autowired CamelContext camelContext; @Autowired KafkaComponent kafkaComponent; @Autowired @Qualifier("KafkaRouteProducer") RouteBuilder kafkaRouteProducer; @Autowired @Qualifier("KafkaRouteConsumer") RouteBuilder kafkaRouteConsumer; @EndpointInject(uri = "direct:kafkaRoute") ProducerTemplate kafkaProducer; @Before public void before() throws Exception { camelContext.addRoutes(kafkaRouteProducer); camelContext.addRoutes(kafkaRouteConsumer); } @Test public void testKafkaRoute(){ kafkaProducer.sendBody("direct:KafkaRouteProducer", "testKafkaMessage"); try { camelContext.start(); camelContext.stop(); } catch (Exception e) { e.printStackTrace(); } } }
Great blog to start with camel + kafka. Just one question, where is this CamelContextConfig file???
Which dependencies should be added to the pom.xml?
Just one question: where is this CamelContextConfig file?
where is this CamelContextConfig file???
I think the implementation will be as follows:
@Configuration
public class CamelContextConfig {
@Autowired
CamelContext camelContext;
@Bean
public CamelContext camelContext() {
return camelContext;
}
}
How to unit test using Embedded Kafka and Camel
Hi There,
Have you used camel-avro-consumer & producer?? If so could you please provide an example?
Can I have the pom file and the complete working code on GitHub?