Apache Camel Kafka Spring Integration

1. Configure Camel Kafka Component

package com.integration.camel.component;

import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaEndpoint;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by tmichels on 9/3/15.
 */

@Configuration
public class KafkaCamelComponent {
    
    @Bean
    public KafkaEndpoint kafkaEndpoint(){
        KafkaEndpoint kafkaEndpoint = new KafkaEndpoint();
        kafkaEndpoint.setZookeeperHost("localhost");
        kafkaEndpoint.setZookeeperPort(2181);
        kafkaEndpoint.setTopic("test");
        return kafkaEndpoint;
    }

    @Bean
    public KafkaComponent kafkaComponent(KafkaEndpoint kafkaEndpoint){
        KafkaComponent kafkaComponent = new KafkaComponent();
        kafkaComponent.setEndpointClass(kafkaEndpoint.getClass());
        return kafkaComponent;
    }
}

2. Configure Kafka Consume and Producer Route

package com.integration.camel.route;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Created by tmichels on 9/3/15.
 */

@Configuration
public class KafkaCamelRoute {

    @Bean(name = "KafkaRouteProducer")
    public RouteBuilder kafkaRouteProducer() {
        return new RouteBuilder() {
            public void configure() {
                from("direct:kafkaRoute").to("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1&serializerClass=kafka.serializer.StringEncoder").bean(kafkaOutputBean.class);
            }
        };
    }

    @Bean(name="KafkaRouteConsumer")
    public RouteBuilder kafkaRouteConsumer() {
        return new RouteBuilder() {
            public void configure() {
                from("kafka:localhost:9092?topic=test&zookeeperHost=localhost&zookeeperPort=2181&groupId=group1&serializerClass=kafka.serializer.StringEncoder").bean(kafkaOutputBean.class);
            }
        };
    }

    public static class kafkaOutputBean {
        public void printKafkaBody(String body) {
            System.out.println("KafkaBody result >>>>> " + body);
        }
    }
}

3. Start Kafka test by adding routes to CamelContext and starting

package com.integration.camel.route;

import com.integration.camel.component.KafkaCamelComponent;
import com.integration.camel.context.CamelContextConfig;
import org.apache.camel.CamelContext;
import org.apache.camel.EndpointInject;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.kafka.KafkaComponent;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Created by tmichels on 9/3/15.
 */

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {KafkaCamelComponent.class,
                                KafkaCamelRoute.class,
                                CamelContextConfig.class})
public class KafkaCamelIntegrationTest {

    @Autowired
    CamelContext camelContext;

    @Autowired
    KafkaComponent kafkaComponent;

    @Autowired
    @Qualifier("KafkaRouteProducer")
    RouteBuilder kafkaRouteProducer;

    @Autowired
    @Qualifier("KafkaRouteConsumer")
    RouteBuilder kafkaRouteConsumer;

    @EndpointInject(uri = "direct:kafkaRoute")
    ProducerTemplate kafkaProducer;
    
    @Before
    public void before() throws Exception {
        camelContext.addRoutes(kafkaRouteProducer);
        camelContext.addRoutes(kafkaRouteConsumer);
    }

    @Test
    public void testKafkaRoute(){
        kafkaProducer.sendBody("direct:KafkaRouteProducer", "testKafkaMessage");
        try {
            camelContext.start();
            camelContext.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

8 Comments

  1. Mohit says:

    Great blog to start with camel + kafka. Just one question, where is this CamelContextConfig file???

  2. Praveen says:

    What are the dependencies should be added, I mean pom.xml ?

  3. srinivas says:

    ust one question, where is this CamelContextConfig file???

  4. Helen Clampett says:

    where is this CamelContextConfig file???

  5. Alexander Kravin says:

    I think the implementation will be as following:

    @Configuration
    public class CamelContextConfig {

    @Autowired
    CamelContext camelContext;

    @Bean
    public CamelContext camelContext() {
    return camelContext;
    }

    }

  6. Partha says:

    How to unit test using Embedded Kafka and Camel

  7. Kavitha says:

    Hi There,

    Have you used camel-avro-consumer & producer?? If so could you please provide an example?

  8. mohan.viswan1@gmail.com says:

    Can I have pom file and completing working code in github

Leave a Comment

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s