Create EmpConnector connection
The first thing is to create a configuration that on startup will connect to the EmpConnector and start listing for incoming events on a specific topic.
1. Set all your Bayeux Parameters (bayeuxParameters)
2. Create EmpConnector connection (empConnector)
3. Start listening to topic (startAndPublishAsyncEventToExchange)
4. Adding listeners to the topic (startAndPublishAsyncEventToExchange)
package com.app.core.events; import com.app.core.service.IncomingRequestHandler; import com.app.utils.AppPlanUtils; import com.salesforce.emp.connector.BayeuxParameters; import com.salesforce.emp.connector.EmpConnector; import com.salesforce.emp.connector.TopicSubscription; import com.salesforce.emp.connector.example.LoggingListener; import org.cometd.bayeux.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Lazy; import java.net.URL; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import static com.salesforce.emp.connector.LoginHelper.login; @Configuration @ConfigurationProperties @Lazy public class SalesforceEventConfig { private static Logger itsLogger = LoggerFactory.getLogger(SalesforceEventConfig.class); @Value("${salesforce.username}") private String username; @Value("${salesforce.password}") private String password; @Value("${salesforce.token}") private String token; @Value("${salesforce.baseurl}") private String url; @Value("${salesforce.version:39.0}") private String version; @Autowired protected IncomingRequestHandler itsReqHandler; private static final Integer WAIT_TIME = 2; private static final Integer START_UP_WAIT_TIME = 5; private static final String EVENT_NAME = "App_Events__e"; @Bean public BayeuxParameters bayeuxParameters() throws Exception { String theUrlEnv = AppUtils.getConfigVar("SALESFORCE_URL"); String passwordAndToken = password + token; if (theUrlEnv != null) { url = getNextToken(0, theUrlEnv); username = getNextToken(url.length() + version.length() + 2, theUrlEnv); passwordAndToken = theUrlEnv.substring(url.length() + version.length() + username.length() + 3); itsLogger.info("Found SALESFORCE_URL to parse. url={}, version={}, username={}", url, version, username); url = url.split("/services/Soap/u/")[0]; } BayeuxParameters bayeuxParams = getBayeuxParamWithSpecifiedAPIVersion(version); BayeuxParameters bayeuxParameters = login(new URL(url),username, passwordAndToken, bayeuxParams); return bayeuxParameters; } @Bean public EmpConnector empConnector(BayeuxParameters bayeuxParameters){ itsLogger.debug("BayeuxParameters url {} version {}", bayeuxParameters.host(), bayeuxParameters.version()); EmpConnector empConnector = new EmpConnector(bayeuxParameters); itsLogger.debug("EmpConnector isConnected {} isHandshook {}", empConnector.isConnected(), empConnector.isHandshook()); return empConnector; } private static BayeuxParameters getBayeuxParamWithSpecifiedAPIVersion(String apiVersion) { BayeuxParameters params = new BayeuxParameters() { @Override public String version() { return apiVersion; } @Override public String bearerToken() { return null; } }; return params; } private String getNextToken(int startIdx, String origStr) { int theIdx = origStr.indexOf("|", startIdx); if (theIdx > 0) { return origStr.substring(startIdx, theIdx); } else { return origStr.substring(startIdx); } } @Bean public TopicSubscription startAndPublishAsyncEventToExchange(EmpConnector empConnector) { TopicSubscription subscription = null; try { long replayFrom = EmpConnector.REPLAY_FROM_TIP; itsLogger.debug("Setup event consumer with replyFrom {}", replayFrom); SalesforceEventPayload eventPayload = new SalesforceEventPayload(); Consumer<Map<String, Object>> consumer = event -> { eventPayload.setPayload(event); itsReqHandler.handleRequest(eventPayload.getPayload()); }; LoggingListener loggingListener = new LoggingListener(true, true); itsLogger.debug("Adding event listeners"); empConnector.addListener(Channel.META_CONNECT, loggingListener) .addListener(Channel.META_HANDSHAKE, loggingListener) .addListener(Channel.META_DISCONNECT, loggingListener) .addListener(Channel.META_SUBSCRIBE, loggingListener) .addListener(Channel.META_UNSUBSCRIBE, loggingListener) .addListener(Channel.META_DISCONNECT, loggingListener) .addListener(Channel.SERVICE, loggingListener); itsLogger.debug("Starting Event Bus"); empConnector.start().get(START_UP_WAIT_TIME, TimeUnit.SECONDS); itsLogger.debug("Subscribing to event {}", EVENT_NAME); subscription = empConnector.subscribe("/event/" + EVENT_NAME, replayFrom, consumer).get(WAIT_TIME, TimeUnit.SECONDS); itsLogger.debug(String.format("Subscribed: %s", subscription)); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return subscription; } }
Basic POJO to serialize events received for a specific topic
Note: as the ‘payload’ tag has some extra tags I remove them before serializing the payload to POJO.
package com.app.core.events; import com.app.db.model.UserServiceRequestEvent; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.StringEscapeUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; import java.util.Map; public class SalesforceEventPayload implements Serializable{ private static final String SALESFORCE_EVENT_KEY = "Event_Data__c="; private static Logger itsLogger = LoggerFactory.getLogger(SalesforceEventPayload.class); public SalesforceEventPayload() {} private UserServiceRequestEvent payload; public UserServiceRequestEvent getPayload() { return payload; } public void setPayload(Map<String, Object> fieldMappings) { ObjectMapper theMapper = new ObjectMapper(); theMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); theMapper.configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false); theMapper.configure(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, true); theMapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, true); theMapper.configure(DeserializationFeature.ACCEPT_EMPTY_ARRAY_AS_NULL_OBJECT, true); if (fieldMappings.containsKey("payload")){ String eventPayload = fieldMappings.get("payload").toString(); String splitEventString = eventPayload.toString().split(SALESFORCE_EVENT_KEY)[1]; String removeLastTwo = splitEventString.substring(1, splitEventString.length() - 2); String unescapeJson = StringEscapeUtils.unescapeJson(removeLastTwo); try{ payload = theMapper.readValue(unescapeJson, UserServiceRequestEvent.class); } catch (JsonParseException e) { itsLogger.error("Event json parse exception {}", e.getMessage()); } catch (JsonMappingException e) { itsLogger.error("Event json mapping exception {}", e.getMessage()); } catch (IOException e) { itsLogger.error("Event io exception {}", e.getMessage()); } } } }