Salesforce Platform Events Streaming using Spring Boot

Create EmpConnector connection
The first thing is to create a configuration that on startup will connect to the EmpConnector and start listing for incoming events on a specific topic.

1. Set all your Bayeux Parameters (bayeuxParameters)
2. Create EmpConnector connection (empConnector)
3. Start listening to topic (startAndPublishAsyncEventToExchange)
4. Adding listeners to the topic (startAndPublishAsyncEventToExchange)

package com.app.core.events;

import com.app.core.service.IncomingRequestHandler;
import com.app.utils.AppPlanUtils;
import com.salesforce.emp.connector.BayeuxParameters;
import com.salesforce.emp.connector.EmpConnector;
import com.salesforce.emp.connector.TopicSubscription;
import com.salesforce.emp.connector.example.LoggingListener;
import org.cometd.bayeux.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import java.net.URL;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static com.salesforce.emp.connector.LoginHelper.login;

@Configuration
@ConfigurationProperties
@Lazy
public class SalesforceEventConfig {

    private static Logger itsLogger = LoggerFactory.getLogger(SalesforceEventConfig.class);

    @Value("${salesforce.username}")
    private String username;

    @Value("${salesforce.password}")
    private String password;

    @Value("${salesforce.token}")
    private String token;

    @Value("${salesforce.baseurl}")
    private String url;

    @Value("${salesforce.version:39.0}")
    private String version;

    @Autowired
    protected IncomingRequestHandler itsReqHandler;

    private static final Integer WAIT_TIME = 2;
    private static final Integer START_UP_WAIT_TIME = 5;
    private static final String EVENT_NAME = "App_Events__e";

    @Bean
    public BayeuxParameters bayeuxParameters() throws Exception {
        String theUrlEnv = AppUtils.getConfigVar("SALESFORCE_URL");
        String passwordAndToken = password + token;
        if (theUrlEnv != null) {
            url = getNextToken(0, theUrlEnv);
            username = getNextToken(url.length() + version.length() + 2, theUrlEnv);
            passwordAndToken = theUrlEnv.substring(url.length() + version.length() + username.length() + 3);
            itsLogger.info("Found SALESFORCE_URL to parse. url={}, version={}, username={}", url, version, username);
            url = url.split("/services/Soap/u/")[0];
        }

        BayeuxParameters bayeuxParams = getBayeuxParamWithSpecifiedAPIVersion(version);
        BayeuxParameters bayeuxParameters = login(new URL(url),username, passwordAndToken, bayeuxParams);
        return bayeuxParameters;
    }

    @Bean
    public EmpConnector empConnector(BayeuxParameters bayeuxParameters){
        itsLogger.debug("BayeuxParameters url {} version {}", bayeuxParameters.host(), bayeuxParameters.version());
        EmpConnector empConnector = new EmpConnector(bayeuxParameters);
        itsLogger.debug("EmpConnector isConnected {} isHandshook {}", empConnector.isConnected(), empConnector.isHandshook());
        return empConnector;
    }

    private static BayeuxParameters getBayeuxParamWithSpecifiedAPIVersion(String apiVersion) {
        BayeuxParameters params = new BayeuxParameters() {

            @Override
            public String version() {
                return apiVersion;
            }

            @Override
            public String bearerToken() {
                return null;
            }

        };
        return  params;
    }

    private String getNextToken(int startIdx, String origStr) {
        int theIdx = origStr.indexOf("|", startIdx);
        if (theIdx > 0) {
            return origStr.substring(startIdx, theIdx);
        } else {
            return origStr.substring(startIdx);
        }
    }

    @Bean
    public TopicSubscription startAndPublishAsyncEventToExchange(EmpConnector empConnector)  {
        TopicSubscription subscription = null;
        try {
            long replayFrom = EmpConnector.REPLAY_FROM_TIP;
            itsLogger.debug("Setup event consumer with replyFrom {}", replayFrom);

            SalesforceEventPayload eventPayload = new SalesforceEventPayload();
            Consumer<Map<String, Object>> consumer = event -> {
                eventPayload.setPayload(event);
                itsReqHandler.handleRequest(eventPayload.getPayload());
            };

            LoggingListener loggingListener = new LoggingListener(true, true);
            itsLogger.debug("Adding event listeners");
            empConnector.addListener(Channel.META_CONNECT, loggingListener)
                    .addListener(Channel.META_HANDSHAKE, loggingListener)
                    .addListener(Channel.META_DISCONNECT, loggingListener)
                    .addListener(Channel.META_SUBSCRIBE, loggingListener)
                    .addListener(Channel.META_UNSUBSCRIBE, loggingListener)
                    .addListener(Channel.META_DISCONNECT, loggingListener)
                    .addListener(Channel.SERVICE, loggingListener);

            itsLogger.debug("Starting Event Bus");
            empConnector.start().get(START_UP_WAIT_TIME, TimeUnit.SECONDS);

            itsLogger.debug("Subscribing to event {}", EVENT_NAME);
            subscription = empConnector.subscribe("/event/" + EVENT_NAME, replayFrom, consumer).get(WAIT_TIME, TimeUnit.SECONDS);

            itsLogger.debug(String.format("Subscribed: %s", subscription));

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

        return subscription;
    }
}

Basic POJO to serialize events received for a specific topic

Note: as the ‘payload’ tag has some extra tags I remove them before serializing the payload to POJO.

package com.app.core.events;

import com.app.db.model.UserServiceRequestEvent;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

public class SalesforceEventPayload implements Serializable{

    private static final String SALESFORCE_EVENT_KEY = "Event_Data__c=";

    private static Logger itsLogger = LoggerFactory.getLogger(SalesforceEventPayload.class);

    public SalesforceEventPayload() {}

    private UserServiceRequestEvent payload;

    public UserServiceRequestEvent getPayload() {
        return payload;
    }

    public void setPayload(Map<String, Object> fieldMappings) {
        ObjectMapper theMapper = new ObjectMapper();
        theMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        theMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
        theMapper.configure(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, true);
        theMapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
        theMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true);
        if (fieldMappings.containsKey("payload")){
            String eventPayload = fieldMappings.get("payload").toString();
            String splitEventString = eventPayload.toString().split(SALESFORCE_EVENT_KEY)[1];
            String removeLastTwo = splitEventString.substring(1, splitEventString.length() - 2);
            String unescapeJson = StringEscapeUtils.unescapeJson(removeLastTwo);
            try{
                payload = theMapper.readValue(unescapeJson, UserServiceRequestEvent.class);
            } catch (JsonParseException e) {
                itsLogger.error("Event json parse exception {}", e.getMessage());
            } catch (JsonMappingException e) {
                itsLogger.error("Event json mapping exception {}", e.getMessage());
            } catch (IOException e) {
                itsLogger.error("Event io exception {}", e.getMessage());
            }
        }
    }
}

Salesforce Streaming Api using Spring MVC

Check out my Salesforce Streaming Spring MVC project
http://cloudsole-streaming.herokuapp.com/

Find the code here:
https://github.com/thysmichels/cloudsole-force.com-streaming-web

Using Salesforce Streaming in Java project:
Push Topic Factory

package com.example.service;

import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.ForceStreamingClientModule;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Guice;
import com.google.inject.Injector;

public class PushTopicFactory {

	public Injector createInjector(){
		Injector injector = Guice.createInjector(new ForceStreamingClientModule());
		return injector;
	}

	public ForceBayeuxClient createClient(Injector injector){
		  ForceBayeuxClient client = injector.getInstance(ForceBayeuxClient.class);
		  return client;
	}
	
	public PushTopicManager createPushTopManager(Injector injector){
		PushTopicManager pushTopicManager = injector.getInstance(PushTopicManager.class);
		return pushTopicManager;
	}
	
	public PushTopic getTopicByName(PushTopicManager pushTopicManager, String topicName) throws ForceStreamingException{
		PushTopic topic = pushTopicManager.getTopicByName(topicName);
		return topic;
	}
	
	public PushTopic createPushTopic(PushTopicManager pushTopicManager, String name, Double apiVersion, String query, String description){
		PushTopic createTopic = pushTopicManager.createPushTopic(new PushTopic(name, apiVersion, query, description));
		return createTopic;
	}
	
	public PushTopic pushTopicFactory(String topicName) throws ForceStreamingException{
		PushTopicFactory pushTopicFactory = new PushTopicFactory();
		Injector injector = pushTopicFactory.createInjector();
		pushTopicFactory.createClient(injector);
		PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector);
		
		return pushTopicFactory.getTopicByName(publicTopicManager, topicName); 
	}
	
	public ForceBayeuxClient createPushTopicClientFactory(){
		PushTopicFactory pushTopicClientFactory = new PushTopicFactory();
		ForceBayeuxClient fbClient = pushTopicClientFactory.createClient(pushTopicClientFactory.createInjector());
		return fbClient;
	}	
}

Streaming Service

package com.example.service;

import java.io.IOException;

import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.eclipse.jetty.util.log.Log;

import com.example.service.PushTopicFactory;
import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Injector;

public class StreamingService {
	
	public void pushTopicSubScriber(){

		PushTopicFactory pushTopicFactory = new PushTopicFactory();
		Injector injector = pushTopicFactory.createInjector();
		ForceBayeuxClient client = pushTopicFactory.createClient(injector);
		PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector);
		PushTopic createTopic = pushTopicFactory.createPushTopic(publicTopicManager, "NewAccountPushTopic", 27.0, "select Id, Name from Account", "New Push Topic");

		PushTopic topic = null;
		try {
			topic = pushTopicFactory.getTopicByName(publicTopicManager, createTopic.getName());
		} catch (ForceStreamingException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		} 
		
		try {
			client.subscribeTo(topic, new ClientSessionChannel.MessageListener() 
			{   
				public void onMessage(ClientSessionChannel channel, Message message) 
				{
					Log.info(message.getJSON());
				}
			});
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

Running local mrjob streaming hadoop jobs

Follow the steps below to run an local mrjob. In this example I run an mrjob to calculate word frequency.

Prereq: Needs python 2.6 or 2.7 installed this to work.

Step 1. Download mrjob:

https://github.com/Yelp/mrjob

Step 2. Navigate to Yelp/mrjob/examples in your terminal

Step 3: Create a Dataset download a dataset from http://www.infochimps.com.

Step 4: Test your environment and make sure mrjob works, run:

import mrjob

This will show no errors or dependency issues.

Step 4: Running your mrjob

python mr_word_freq_count.py log1 > counts

log1 input was: (note each line was tabbed delimited)

test	
one	
two	
three	
four	
five	
one	
two	
test	

Result:

"five"	1
"four"	1
"one"	2
"test"	2
"three"	1
"two"	2