Rabbit MQ Class
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class RabbitStream {
public ConnectionFactory createRabbitConnection(String hostname)
{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(hostname);
return factory;
}
public Connection createConnection(ConnectionFactory rabbitFactory) throws IOException
{
Connection rabbitconnection = rabbitFactory.newConnection();
return rabbitconnection;
}
public Channel createChannel(Connection rabbitConnection, String queueName) throws IOException
{
final Channel rabbitChannel = rabbitConnection.createChannel();
rabbitChannel.queueDeclare(queueName, false, false, false, null);
return rabbitChannel;
}
public void basicPublish(Channel rabbitChannel, String queueName, String message) throws IOException
{
rabbitChannel.basicPublish("", queueName, null, message.getBytes());
}
public QueueingConsumer consumeChannel(Channel rabbitChannel, String queueName) throws IOException
{
final QueueingConsumer rabbitconsumer = new QueueingConsumer(rabbitChannel);
rabbitChannel.basicConsume(queueName, true, rabbitconsumer);
return rabbitconsumer;
}
public List<String> consumeQueue(QueueingConsumer rabbitConsumer) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException
{
List<String> messageList = new ArrayList<String>();
while (true) {
QueueingConsumer.Delivery delivery = rabbitConsumer.nextDelivery();
String message = new String(delivery.getBody());
messageList.add(message);
}
}
public Channel createRabbitFactory(String hostName, String queueName) throws IOException
{
RabbitStream mainStream = new RabbitStream();
return mainStream.createChannel(mainStream.createConnection(mainStream.createRabbitConnection(hostName)), queueName);
}
}
Cometd Class
import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.ForceStreamingClientModule;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Guice;
import com.google.inject.Injector;
public class PushTopicFactory {
public Injector createInjector()
{
Injector injector = Guice.createInjector(new ForceStreamingClientModule());
return injector;
}
public ForceBayeuxClient createClient(Injector injector)
{
ForceBayeuxClient client = injector.getInstance(ForceBayeuxClient.class);
return client;
}
public PushTopicManager createPushTopManager(Injector injector)
{
PushTopicManager pushTopicManager = injector.getInstance(PushTopicManager.class);
return pushTopicManager;
}
public PushTopic getTopicByName(PushTopicManager pushTopicManager, String topicName) throws ForceStreamingException
{
PushTopic topic = pushTopicManager.getTopicByName(topicName);
return topic;
}
public PushTopic createPushTopic(PushTopicManager pushTopicManager, String name, Double apiVersion, String query, String description)
{
PushTopic createTopic = pushTopicManager.createPushTopic(new PushTopic(name, apiVersion, query, description));
return createTopic;
}
public PushTopic pushTopicFactory(String topicName) throws ForceStreamingException
{
PushTopicFactory pushTopicFactory = new PushTopicFactory();
Injector injector = pushTopicFactory.createInjector();
pushTopicFactory.createClient(injector);
PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector);
return pushTopicFactory.getTopicByName(publicTopicManager, topicName);
}
public ForceBayeuxClient createPushTopicClientFactory()
{
PushTopicFactory pushTopicClientFactory = new PushTopicFactory();
ForceBayeuxClient fbClient = pushTopicClientFactory.createClient(pushTopicClientFactory.createInjector());
return fbClient;
}
}
Main Class
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import com.rabbitmq.client.Channel;
import com.example.pushtopic.PushTopicFactory;
import com.example.rabbit.RabbitStream;
import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Injector;
public class StreamMain {
private static final String TOPICNAME = "AccountPush";
private static final String QUEUENAME = "streaming";
private static final String HOSTNAME= "localhost";
public static void main(String[] args) throws ForceStreamingException, InterruptedException, Exception {
final Log log = LogFactory.getLog(StreamMain.class);
final RabbitStream rabbitFactory = new RabbitStream();
final Channel factoryChannel = rabbitFactory.createRabbitFactory(HOSTNAME, QUEUENAME);
PushTopicFactory pushTopicFactory = new PushTopicFactory();
Injector injector = pushTopicFactory.createInjector();
ForceBayeuxClient client = pushTopicFactory.createClient(injector);
PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector);
PushTopic createTopic = pushTopicFactory.createPushTopic(publicTopicManager, "NewAccountPushTopic", 27.0, "select Id, Name from Account", "New Push Topic");
PushTopic topic = pushTopicFactory.getTopicByName(publicTopicManager, createTopic.getName());
client.subscribeTo(topic, new ClientSessionChannel.MessageListener()
{
public void onMessage(ClientSessionChannel channel, Message message)
{
try {
rabbitFactory.basicPublish(factoryChannel, QUEUENAME, message.getJSON());
log.info(message.getJSON());
} catch (IOException e) {
log.error(e);
e.printStackTrace();
}
}
});
}
}
0.000000
0.000000
Like this:
Like Loading...