Rabbit MQ Class
import java.io.IOException; import java.util.ArrayList; import java.util.List; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class RabbitStream { public ConnectionFactory createRabbitConnection(String hostname) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(hostname); return factory; } public Connection createConnection(ConnectionFactory rabbitFactory) throws IOException { Connection rabbitconnection = rabbitFactory.newConnection(); return rabbitconnection; } public Channel createChannel(Connection rabbitConnection, String queueName) throws IOException { final Channel rabbitChannel = rabbitConnection.createChannel(); rabbitChannel.queueDeclare(queueName, false, false, false, null); return rabbitChannel; } public void basicPublish(Channel rabbitChannel, String queueName, String message) throws IOException { rabbitChannel.basicPublish("", queueName, null, message.getBytes()); } public QueueingConsumer consumeChannel(Channel rabbitChannel, String queueName) throws IOException { final QueueingConsumer rabbitconsumer = new QueueingConsumer(rabbitChannel); rabbitChannel.basicConsume(queueName, true, rabbitconsumer); return rabbitconsumer; } public List<String> consumeQueue(QueueingConsumer rabbitConsumer) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException { List<String> messageList = new ArrayList<String>(); while (true) { QueueingConsumer.Delivery delivery = rabbitConsumer.nextDelivery(); String message = new String(delivery.getBody()); messageList.add(message); } } public Channel createRabbitFactory(String hostName, String queueName) throws IOException { RabbitStream mainStream = new RabbitStream(); return mainStream.createChannel(mainStream.createConnection(mainStream.createRabbitConnection(hostName)), queueName); } }
Cometd Class
import com.force.sdk.streaming.client.ForceBayeuxClient; import com.force.sdk.streaming.client.ForceStreamingClientModule; import com.force.sdk.streaming.client.PushTopicManager; import com.force.sdk.streaming.exception.ForceStreamingException; import com.force.sdk.streaming.model.PushTopic; import com.google.inject.Guice; import com.google.inject.Injector; public class PushTopicFactory { public Injector createInjector() { Injector injector = Guice.createInjector(new ForceStreamingClientModule()); return injector; } public ForceBayeuxClient createClient(Injector injector) { ForceBayeuxClient client = injector.getInstance(ForceBayeuxClient.class); return client; } public PushTopicManager createPushTopManager(Injector injector) { PushTopicManager pushTopicManager = injector.getInstance(PushTopicManager.class); return pushTopicManager; } public PushTopic getTopicByName(PushTopicManager pushTopicManager, String topicName) throws ForceStreamingException { PushTopic topic = pushTopicManager.getTopicByName(topicName); return topic; } public PushTopic createPushTopic(PushTopicManager pushTopicManager, String name, Double apiVersion, String query, String description) { PushTopic createTopic = pushTopicManager.createPushTopic(new PushTopic(name, apiVersion, query, description)); return createTopic; } public PushTopic pushTopicFactory(String topicName) throws ForceStreamingException { PushTopicFactory pushTopicFactory = new PushTopicFactory(); Injector injector = pushTopicFactory.createInjector(); pushTopicFactory.createClient(injector); PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector); return pushTopicFactory.getTopicByName(publicTopicManager, topicName); } public ForceBayeuxClient createPushTopicClientFactory() { PushTopicFactory pushTopicClientFactory = new PushTopicFactory(); ForceBayeuxClient fbClient = pushTopicClientFactory.createClient(pushTopicClientFactory.createInjector()); return fbClient; } }
Main Class
import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.cometd.bayeux.Message; import org.cometd.bayeux.client.ClientSessionChannel; import com.rabbitmq.client.Channel; import com.example.pushtopic.PushTopicFactory; import com.example.rabbit.RabbitStream; import com.force.sdk.streaming.client.ForceBayeuxClient; import com.force.sdk.streaming.client.PushTopicManager; import com.force.sdk.streaming.exception.ForceStreamingException; import com.force.sdk.streaming.model.PushTopic; import com.google.inject.Injector; public class StreamMain { private static final String TOPICNAME = "AccountPush"; private static final String QUEUENAME = "streaming"; private static final String HOSTNAME= "localhost"; public static void main(String[] args) throws ForceStreamingException, InterruptedException, Exception { final Log log = LogFactory.getLog(StreamMain.class); final RabbitStream rabbitFactory = new RabbitStream(); final Channel factoryChannel = rabbitFactory.createRabbitFactory(HOSTNAME, QUEUENAME); PushTopicFactory pushTopicFactory = new PushTopicFactory(); Injector injector = pushTopicFactory.createInjector(); ForceBayeuxClient client = pushTopicFactory.createClient(injector); PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector); PushTopic createTopic = pushTopicFactory.createPushTopic(publicTopicManager, "NewAccountPushTopic", 27.0, "select Id, Name from Account", "New Push Topic"); PushTopic topic = pushTopicFactory.getTopicByName(publicTopicManager, createTopic.getName()); client.subscribeTo(topic, new ClientSessionChannel.MessageListener() { public void onMessage(ClientSessionChannel channel, Message message) { try { rabbitFactory.basicPublish(factoryChannel, QUEUENAME, message.getJSON()); log.info(message.getJSON()); } catch (IOException e) { log.error(e); e.printStackTrace(); } } }); } }