cloudsole-streaming: Force.com streaming with RabbitMQ

RabbitMQ Class

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Small helper around the RabbitMQ Java client: builds a connection factory,
 * opens a connection and channel, declares a queue, publishes text messages
 * and reads them back through a {@link QueueingConsumer}.
 *
 * <p>Message bodies are always encoded and decoded as UTF-8 so that publisher
 * and consumer agree regardless of each JVM's default charset.
 */
public class RabbitStream {

	/** Charset used for every String/byte[] conversion of a message body. */
	private static final Charset UTF_8 = Charset.forName("UTF-8");

	/**
	 * Default time, in milliseconds, that {@link #consumeQueue(QueueingConsumer)}
	 * waits for one more delivery before treating the queue as drained.
	 */
	private static final long DEFAULT_DRAIN_TIMEOUT_MILLIS = 1000L;

	/**
	 * Creates a connection factory that targets the given broker host.
	 *
	 * @param hostname host name of the RabbitMQ broker
	 * @return a new factory with only the host configured
	 */
	public ConnectionFactory createRabbitConnection(String hostname)
	{
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(hostname);
		return factory;
	}

	/**
	 * Opens a new broker connection from the given factory.
	 *
	 * @param rabbitFactory factory describing the broker to connect to
	 * @return the opened connection; the caller is responsible for closing it
	 * @throws IOException if the connection cannot be established
	 */
	public Connection createConnection(ConnectionFactory rabbitFactory) throws IOException
	{
		return rabbitFactory.newConnection();
	}

	/**
	 * Opens a channel on the connection and declares {@code queueName} on it as a
	 * non-durable, non-exclusive, non-auto-delete queue without extra arguments.
	 *
	 * @param rabbitConnection open connection to create the channel on
	 * @param queueName name of the queue to declare
	 * @return the channel on which the queue was declared
	 * @throws IOException if the channel cannot be opened or the queue cannot be declared
	 */
	public Channel createChannel(Connection rabbitConnection, String queueName) throws IOException
	{
		final Channel rabbitChannel = rabbitConnection.createChannel();
		rabbitChannel.queueDeclare(queueName, false, false, false, null);
		return rabbitChannel;
	}

	/**
	 * Publishes {@code message} to the default exchange using the queue name as
	 * routing key. The text is encoded as UTF-8.
	 *
	 * @param rabbitChannel channel to publish on
	 * @param queueName routing key, i.e. the name of the target queue
	 * @param message text payload to send
	 * @throws IOException if the publish fails
	 */
	public void basicPublish(Channel rabbitChannel, String queueName, String message) throws IOException
	{
		rabbitChannel.basicPublish("", queueName, null, message.getBytes(UTF_8));
	}

	/**
	 * Registers a {@link QueueingConsumer} on the queue with automatic
	 * acknowledgement enabled.
	 *
	 * @param rabbitChannel channel to consume on
	 * @param queueName queue to consume from
	 * @return the consumer that buffers incoming deliveries
	 * @throws IOException if the consumer cannot be registered
	 */
	public QueueingConsumer consumeChannel(Channel rabbitChannel, String queueName) throws IOException
	{
		final QueueingConsumer rabbitconsumer = new QueueingConsumer(rabbitChannel);
		rabbitChannel.basicConsume(queueName, true, rabbitconsumer);
		return rabbitconsumer;
	}

	/**
	 * Drains the consumer using the default idle timeout and returns the
	 * collected message bodies.
	 *
	 * @param rabbitConsumer consumer to read deliveries from
	 * @return message bodies decoded as UTF-8, in delivery order; empty if nothing arrived in time
	 * @throws ShutdownSignalException if the connection or channel shuts down while waiting
	 * @throws ConsumerCancelledException if the consumer is cancelled while waiting
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public List<String> consumeQueue(QueueingConsumer rabbitConsumer) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException
	{
		return consumeQueue(rabbitConsumer, DEFAULT_DRAIN_TIMEOUT_MILLIS);
	}

	/**
	 * Collects deliveries until none arrives within {@code timeoutMillis}, then
	 * returns what was gathered. A loop without a timeout would block forever and
	 * the accumulated list could never reach the caller.
	 *
	 * @param rabbitConsumer consumer to read deliveries from
	 * @param timeoutMillis maximum wait for each next delivery, in milliseconds
	 * @return message bodies decoded as UTF-8, in delivery order; empty if nothing arrived in time
	 * @throws ShutdownSignalException if the connection or channel shuts down while waiting
	 * @throws ConsumerCancelledException if the consumer is cancelled while waiting
	 * @throws InterruptedException if the calling thread is interrupted while waiting
	 */
	public List<String> consumeQueue(QueueingConsumer rabbitConsumer, long timeoutMillis) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException
	{
		List<String> messageList = new ArrayList<String>();
		// nextDelivery(timeout) returns null once the wait elapses with no message.
		QueueingConsumer.Delivery delivery = rabbitConsumer.nextDelivery(timeoutMillis);
		while (delivery != null) {
			messageList.add(new String(delivery.getBody(), UTF_8));
			delivery = rabbitConsumer.nextDelivery(timeoutMillis);
		}
		return messageList;
	}

	/**
	 * Connects to the broker on {@code hostName}, opens a channel and declares
	 * {@code queueName} on it.
	 *
	 * @param hostName host name of the RabbitMQ broker
	 * @param queueName name of the queue to declare
	 * @return a channel ready for publishing to or consuming from the queue
	 * @throws IOException if connecting, opening the channel or declaring the queue fails
	 */
	public Channel createRabbitFactory(String hostName, String queueName) throws IOException
	{
		Connection connection = createConnection(createRabbitConnection(hostName));
		boolean channelReady = false;
		try {
			Channel channel = createChannel(connection, queueName);
			channelReady = true;
			return channel;
		} finally {
			if (!channelReady) {
				// Do not leak the socket when setup fails; abort() discards any
				// close-time errors so the original failure is what propagates.
				connection.abort();
			}
		}
	}

}

CometD Class

import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.ForceStreamingClientModule;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Guice;
import com.google.inject.Injector;

/**
 * Convenience wrappers around the Force.com streaming SDK: obtaining the
 * Guice injector, the Bayeux client and the push-topic manager, and looking
 * up or creating push topics through that manager.
 */
public class PushTopicFactory {

	/**
	 * Builds a Guice injector configured with a {@link ForceStreamingClientModule}.
	 *
	 * @return the newly created injector
	 */
	public Injector createInjector()
	{
		return Guice.createInjector(new ForceStreamingClientModule());
	}

	/**
	 * Asks the injector for a {@link ForceBayeuxClient}.
	 *
	 * @param injector injector to resolve the client from
	 * @return the client instance supplied by the injector
	 */
	public ForceBayeuxClient createClient(Injector injector)
	{
		return injector.getInstance(ForceBayeuxClient.class);
	}

	/**
	 * Asks the injector for a {@link PushTopicManager}.
	 *
	 * @param injector injector to resolve the manager from
	 * @return the manager instance supplied by the injector
	 */
	public PushTopicManager createPushTopManager(Injector injector)
	{
		return injector.getInstance(PushTopicManager.class);
	}

	/**
	 * Looks up a push topic by name through the given manager.
	 *
	 * @param pushTopicManager manager used for the lookup
	 * @param topicName name of the topic to fetch
	 * @return whatever the manager returns for that name
	 * @throws ForceStreamingException if the manager's lookup fails
	 */
	public PushTopic getTopicByName(PushTopicManager pushTopicManager, String topicName) throws ForceStreamingException
	{
		return pushTopicManager.getTopicByName(topicName);
	}

	/**
	 * Creates a push topic from the given attributes through the manager.
	 *
	 * @param pushTopicManager manager used to create the topic
	 * @param name topic name
	 * @param apiVersion API version passed to the {@link PushTopic} constructor
	 * @param query query passed to the {@link PushTopic} constructor
	 * @param description topic description
	 * @return the topic returned by the manager
	 */
	public PushTopic createPushTopic(PushTopicManager pushTopicManager, String name, Double apiVersion, String query, String description)
	{
		PushTopic definition = new PushTopic(name, apiVersion, query, description);
		return pushTopicManager.createPushTopic(definition);
	}

	/**
	 * Builds a fresh injector, client and manager, then fetches the named topic.
	 *
	 * @param topicName name of the topic to fetch
	 * @return the topic returned by the manager for that name
	 * @throws ForceStreamingException if the manager's lookup fails
	 */
	public PushTopic pushTopicFactory(String topicName) throws ForceStreamingException
	{
		PushTopicFactory delegate = new PushTopicFactory();
		Injector guice = delegate.createInjector();
		// NOTE(review): the client is requested but its result is not used here;
		// kept in case obtaining it has side effects - confirm before removing.
		delegate.createClient(guice);
		return delegate.getTopicByName(delegate.createPushTopManager(guice), topicName);
	}

	/**
	 * Builds a fresh injector and returns a Bayeux client resolved from it.
	 *
	 * @return the client instance supplied by the new injector
	 */
	public ForceBayeuxClient createPushTopicClientFactory()
	{
		PushTopicFactory delegate = new PushTopicFactory();
		Injector guice = delegate.createInjector();
		return delegate.createClient(guice);
	}
}

Main Class

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import com.rabbitmq.client.Channel;
import com.example.pushtopic.PushTopicFactory;
import com.example.rabbit.RabbitStream;
import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Injector;

public class StreamMain {
	
	private static final String TOPICNAME = "AccountPush";
	private static final String QUEUENAME = "streaming";
	private static final String HOSTNAME= "localhost";
	
	public static void main(String[] args) throws ForceStreamingException, InterruptedException, Exception {
			
		   final Log log = LogFactory.getLog(StreamMain.class);
		
			final RabbitStream rabbitFactory = new RabbitStream();
			final Channel factoryChannel = rabbitFactory.createRabbitFactory(HOSTNAME, QUEUENAME);
		
			PushTopicFactory pushTopicFactory = new PushTopicFactory();
			Injector injector = pushTopicFactory.createInjector();
			ForceBayeuxClient client = pushTopicFactory.createClient(injector);
			PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector);
			PushTopic createTopic = pushTopicFactory.createPushTopic(publicTopicManager, "NewAccountPushTopic", 27.0, "select Id, Name from Account", "New Push Topic");
	
			PushTopic topic = pushTopicFactory.getTopicByName(publicTopicManager, createTopic.getName()); 
			
			client.subscribeTo(topic, new ClientSessionChannel.MessageListener() 
			{   
				public void onMessage(ClientSessionChannel channel, Message message) 
				{
					try {
						rabbitFactory.basicPublish(factoryChannel, QUEUENAME, message.getJSON());
						log.info(message.getJSON());
					} catch (IOException e) {
						log.error(e);
						e.printStackTrace();
					} 
				}
			});
          }
}

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s