Salesforce Platform Events Streaming using Spring Boot

Create EmpConnector connection
The first step is to create a configuration that, on startup, connects through the EmpConnector and starts listening for incoming events on a specific topic.

1. Set all your Bayeux Parameters (bayeuxParameters)
2. Create EmpConnector connection (empConnector)
3. Start listening to topic (startAndPublishAsyncEventToExchange)
4. Adding listeners to the topic (startAndPublishAsyncEventToExchange)

package com.app.core.events;

import com.app.core.service.IncomingRequestHandler;
import com.app.utils.AppPlanUtils;
import com.salesforce.emp.connector.BayeuxParameters;
import com.salesforce.emp.connector.EmpConnector;
import com.salesforce.emp.connector.TopicSubscription;
import com.salesforce.emp.connector.example.LoggingListener;
import org.cometd.bayeux.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import java.net.URL;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

import static com.salesforce.emp.connector.LoginHelper.login;

/**
 * Spring configuration that logs in to Salesforce, creates an {@link EmpConnector} and subscribes
 * to the {@code /event/App_Events__e} channel, handing every received event to the injected
 * {@link IncomingRequestHandler}.
 *
 * <p>Credentials come from the {@code salesforce.*} properties, unless the {@code SALESFORCE_URL}
 * configuration variable is present, in which case url, username and password+token are parsed
 * from that single {@code |}-separated value.
 */
@Configuration
@ConfigurationProperties
@Lazy
public class SalesforceEventConfig {

    private static final Logger itsLogger = LoggerFactory.getLogger(SalesforceEventConfig.class);

    @Value("${salesforce.username}")
    private String username;

    @Value("${salesforce.password}")
    private String password;

    @Value("${salesforce.token}")
    private String token;

    @Value("${salesforce.baseurl}")
    private String url;

    @Value("${salesforce.version:39.0}")
    private String version;

    @Autowired
    protected IncomingRequestHandler itsReqHandler;

    /** Seconds to wait for the topic subscription to complete. */
    private static final long WAIT_TIME = 2;
    /** Seconds to wait for the connector to start. */
    private static final long START_UP_WAIT_TIME = 5;
    private static final String EVENT_NAME = "App_Events__e";
    /** Separator between the segments of the SALESFORCE_URL configuration variable. */
    private static final String TOKEN_SEPARATOR = "|";

    /**
     * Logs in to Salesforce and returns the resulting connection parameters.
     *
     * <p>When the {@code SALESFORCE_URL} configuration variable is set, it overrides the
     * property-based url, username and password+token. The offsets used below read it as
     * {@code url|<version>|username|passwordAndToken}.
     *
     * @return the parameters returned by {@code LoginHelper.login}
     * @throws Exception if the URL is malformed or the login call fails
     */
    @Bean
    public BayeuxParameters bayeuxParameters() throws Exception {
        // NOTE(review): AppUtils is not among this file's imports (AppPlanUtils is) - confirm
        // that it resolves, e.g. from the same package.
        String theUrlEnv = AppUtils.getConfigVar("SALESFORCE_URL");
        String passwordAndToken = password + token;
        if (theUrlEnv != null) {
            url = getNextToken(0, theUrlEnv);
            // NOTE(review): the version segment is skipped using the length of the configured
            // version rather than being read from the variable; if the two differ in length the
            // username/password offsets will be wrong - confirm this is intended.
            int usernameStart = url.length() + version.length() + 2;
            username = getNextToken(usernameStart, theUrlEnv);
            passwordAndToken = theUrlEnv.substring(usernameStart + username.length() + 1);
            itsLogger.info("Found SALESFORCE_URL to parse. url={}, version={}, username={}", url, version, username);
            // Keep only the part in front of the SOAP endpoint path.
            url = url.split("/services/Soap/u/")[0];
        }

        BayeuxParameters bayeuxParams = getBayeuxParamWithSpecifiedAPIVersion(version);
        return login(new URL(url), username, passwordAndToken, bayeuxParams);
    }

    /**
     * Creates the connector for the given parameters. The connector is not started here; that
     * happens in {@link #startAndPublishAsyncEventToExchange(EmpConnector)}.
     *
     * @param bayeuxParameters parameters obtained from the login
     * @return a new, not yet started connector
     */
    @Bean
    public EmpConnector empConnector(BayeuxParameters bayeuxParameters) {
        itsLogger.debug("BayeuxParameters url {} version {}", bayeuxParameters.host(), bayeuxParameters.version());
        EmpConnector empConnector = new EmpConnector(bayeuxParameters);
        itsLogger.debug("EmpConnector isConnected {} isHandshook {}", empConnector.isConnected(), empConnector.isHandshook());
        return empConnector;
    }

    /**
     * Builds parameters that report the given API version and a {@code null} bearer token.
     *
     * @param apiVersion value returned from {@code version()}
     * @return the parameters object
     */
    private static BayeuxParameters getBayeuxParamWithSpecifiedAPIVersion(String apiVersion) {
        return new BayeuxParameters() {

            @Override
            public String version() {
                return apiVersion;
            }

            @Override
            public String bearerToken() {
                return null;
            }

        };
    }

    /**
     * Returns the text between {@code startIdx} and the next {@code |}, or the rest of the string
     * when no further separator exists.
     *
     * @param startIdx index at which the token starts
     * @param origStr  the {@code |}-separated string
     * @return the token, possibly empty
     */
    private static String getNextToken(int startIdx, String origStr) {
        int theIdx = origStr.indexOf(TOKEN_SEPARATOR, startIdx);
        // indexOf signals "not found" with -1; index 0 is a valid separator position.
        if (theIdx >= 0) {
            return origStr.substring(startIdx, theIdx);
        }
        return origStr.substring(startIdx);
    }

    /**
     * Registers the logging listeners, starts the connector and subscribes to the event channel.
     * Each received event is converted with a fresh {@link SalesforceEventPayload} and passed to
     * the request handler.
     *
     * @param empConnector the connector to start and subscribe with
     * @return the subscription, or {@code null} if starting or subscribing failed, timed out or
     *         was interrupted (the failure is logged)
     */
    @Bean
    public TopicSubscription startAndPublishAsyncEventToExchange(EmpConnector empConnector) {
        TopicSubscription subscription = null;
        try {
            long replayFrom = EmpConnector.REPLAY_FROM_TIP;
            itsLogger.debug("Setup event consumer with replyFrom {}", replayFrom);

            Consumer<Map<String, Object>> consumer = event -> {
                // One payload holder per event: a shared instance would keep the previous
                // event's payload whenever the current one fails to parse.
                SalesforceEventPayload eventPayload = new SalesforceEventPayload();
                eventPayload.setPayload(event);
                if (eventPayload.getPayload() == null) {
                    itsLogger.warn("Skipping event on {}: payload could not be parsed", EVENT_NAME);
                    return;
                }
                itsReqHandler.handleRequest(eventPayload.getPayload());
            };

            LoggingListener loggingListener = new LoggingListener(true, true);
            itsLogger.debug("Adding event listeners");
            // Each meta channel is registered exactly once.
            empConnector.addListener(Channel.META_CONNECT, loggingListener)
                    .addListener(Channel.META_HANDSHAKE, loggingListener)
                    .addListener(Channel.META_DISCONNECT, loggingListener)
                    .addListener(Channel.META_SUBSCRIBE, loggingListener)
                    .addListener(Channel.META_UNSUBSCRIBE, loggingListener)
                    .addListener(Channel.SERVICE, loggingListener);

            itsLogger.debug("Starting Event Bus");
            empConnector.start().get(START_UP_WAIT_TIME, TimeUnit.SECONDS);

            itsLogger.debug("Subscribing to event {}", EVENT_NAME);
            subscription = empConnector.subscribe("/event/" + EVENT_NAME, replayFrom, consumer).get(WAIT_TIME, TimeUnit.SECONDS);

            itsLogger.debug("Subscribed: {}", subscription);

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can react to it.
            Thread.currentThread().interrupt();
            itsLogger.error("Interrupted while starting the connector or subscribing to event {}", EVENT_NAME, e);
        } catch (ExecutionException | TimeoutException e) {
            itsLogger.error("Failed to start the connector or subscribe to event {}", EVENT_NAME, e);
        }

        return subscription;
    }
}

Basic POJO to deserialize events received for a specific topic

Note: the ‘payload’ value carries some extra wrapping around the JSON, so I strip it before deserializing the payload into the POJO.

package com.app.core.events;

import com.app.db.model.UserServiceRequestEvent;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;

/**
 * Holder that extracts the {@code Event_Data__c} JSON from a received event map and
 * deserializes it into a {@link UserServiceRequestEvent}.
 */
public class SalesforceEventPayload implements Serializable{

    private static final String SALESFORCE_EVENT_KEY = "Event_Data__c=";

    private static final Logger itsLogger = LoggerFactory.getLogger(SalesforceEventPayload.class);

    /** Configured once and reused; building a mapper for every event is unnecessary work. */
    private static final ObjectMapper MAPPER = createMapper();

    public SalesforceEventPayload() {}

    private UserServiceRequestEvent payload;

    /**
     * @return the event parsed by the last {@link #setPayload(Map)} call, or {@code null} if
     *         nothing was parsed successfully
     */
    public UserServiceRequestEvent getPayload() {
        return payload;
    }

    /**
     * Parses the {@code "payload"} entry of the given event map.
     *
     * <p>The entry's string form is split on {@code Event_Data__c=}; the first character and the
     * last two characters of the part that follows are dropped, the remainder is JSON-unescaped
     * and deserialized. If the entry is missing, malformed or cannot be deserialized, the
     * payload is left {@code null} and the problem is logged.
     *
     * @param fieldMappings the event as received from the subscription; may be {@code null}
     */
    public void setPayload(Map<String, Object> fieldMappings) {
        // Never keep a payload from an earlier call around when this one fails.
        payload = null;
        if (fieldMappings == null) {
            return;
        }
        Object rawPayload = fieldMappings.get("payload");
        if (rawPayload == null) {
            return;
        }

        String[] parts = rawPayload.toString().split(SALESFORCE_EVENT_KEY);
        if (parts.length < 2) {
            itsLogger.error("Event payload does not contain {}", SALESFORCE_EVENT_KEY);
            return;
        }
        String splitEventString = parts[1];
        // One leading and two trailing characters are stripped below, so at least three are needed.
        if (splitEventString.length() < 3) {
            itsLogger.error("Event payload after {} is too short to parse", SALESFORCE_EVENT_KEY);
            return;
        }
        String removeLastTwo = splitEventString.substring(1, splitEventString.length() - 2);
        String unescapeJson = StringEscapeUtils.unescapeJson(removeLastTwo);
        try {
            payload = MAPPER.readValue(unescapeJson, UserServiceRequestEvent.class);
        } catch (JsonParseException e) {
            itsLogger.error("Event json parse exception {}", e.getMessage(), e);
        } catch (JsonMappingException e) {
            itsLogger.error("Event json mapping exception {}", e.getMessage(), e);
        } catch (IOException e) {
            itsLogger.error("Event io exception {}", e.getMessage(), e);
        }
    }

    /** Builds the mapper with the deserialization features this class relies on. */
    private static ObjectMapper createMapper() {
        ObjectMapper theMapper = new ObjectMapper();
        theMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        theMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);
        theMapper.configure(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, true);
        theMapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true);
        theMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true);
        return theMapper;
    }
}

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: