Apex callout to RabbitMQ Queue

Create an HttpRequest that publishes to RabbitMQ by calling POST /api/exchanges/{username}/{exchangeName}/publish

WebCustomSettings is a custom setting in Salesforce; its values are the following:
AMQP_Url__c: https://jellyfish.rmq.cloudamqp.com
AMQP_Credentials__c: xxxxxx:xxxxxxxxxxxxxxxxxxxxxxxx (username:password)
	/**
	 * Sends an HTTP request to the publish endpoint of the given exchange.
	 *
	 * The endpoint is built from WebCustomSettings.AMQP_Url__c, the part of
	 * WebCustomSettings.AMQP_Credentials__c that precedes the first ':' and the
	 * exchange name.
	 *
	 * @param exchangeName exchange to publish to; a null value is reported as a failure
	 * @param method       HTTP method to use for the callout
	 * @param aPayload     request body; no body is set when this is null
	 * @throws Rest_Exception built from the AMQP_REQUEST_FAILED response code when
	 *         exchangeName is null, the callout throws, or the response status is not 2xx
	 */
	public void callAmqpEndpoint(String exchangeName, String method, String aPayload) {
		HttpRequest req = new HttpRequest();
		try {
			if (exchangeName == null) {
				// Thrown inside the try on purpose: the catch below wraps it like any other failure.
				throw new Rest_Exception(ResponseCodes_Mgr.getCode('AMQP_REQUEST_FAILED'));
			}

			req.setTimeout(120000); // milliseconds
			req.setMethod(method);
			setAmqpAuthHeader(req);
			System.debug(String.valueOf(aPayload));
			req.setEndpoint(WebCustomSettings.AMQP_Url__c + '/api/exchanges/' + WebCustomSettings.AMQP_Credentials__c.split(':')[0] + '/' + exchangeName + '/publish');
			if (aPayload != null) {
				req.setBody(aPayload);
			}
			System.debug('Sending api request to endpoint' + req.getEndpoint());

			HttpResponse res = new Http().send(req);

			// A callout that completes with an error status does not throw by itself,
			// so the status has to be inspected or the failure goes unnoticed.
			Integer statusCode = res.getStatusCode();
			if (statusCode < 200 || statusCode >= 300) {
				System.debug('AMQP request returned status ' + statusCode + ' ' + res.getStatus());
				throw new Rest_Exception(ResponseCodes_Mgr.getCode('AMQP_REQUEST_FAILED'));
			}
		} catch (Exception ex) {
			System.debug('Error sending amqp request ' + ex);
			List<String> theArgs = new List<String>();
			theArgs.add('AMQP');
			theArgs.add(req.getEndpoint());
			throw new Rest_Exception(ResponseCodes_Mgr.getCode('AMQP_REQUEST_FAILED', ex, theArgs));
		}
	}

Setup AMQP headers

/**
 * Adds the headers used for the AMQP callout to the given request:
 * a Basic Authorization header built from WebCustomSettings.AMQP_Credentials__c,
 * a JSON Content-Type, and an X-AMQP-Tracer header carrying the trace id of
 * requestJson (empty string when requestJson or its trace id is null).
 *
 * @param aReq request to decorate
 */
private void setAmqpAuthHeader(HttpRequest aReq) {
	String encodedCredentials = EncodingUtil.base64Encode(Blob.valueOf(WebCustomSettings.AMQP_Credentials__c));
	aReq.setHeader('Authorization', 'Basic ' + encodedCredentials);
	aReq.setHeader('Content-Type', 'application/json');

	// Fall back to an empty tracer value when no trace id is available.
	String traceId = '';
	if (requestJson != null && requestJson.getTraceId() != null) {
		traceId = requestJson.getTraceId();
	}
	aReq.setHeader('X-AMQP-Tracer', traceId);
}

Serialize AMQP Request JSON

	/**
	 * Wraps the given payload in the JSON envelope sent to the publish endpoint.
	 *
	 * The envelope contains, in this order: routing_key ('amqp-events'), an empty
	 * properties object, the payload itself and payload_encoding ('string').
	 *
	 * @param payload message content to embed as the 'payload' string field
	 * @return the envelope as a non-pretty-printed JSON string
	 */
	private String serializeAmqpRequests(String payload) {
		JSONGenerator envelope = JSON.createGenerator(false);

		envelope.writeStartObject();
		envelope.writeStringField('routing_key','amqp-events');

		// "properties" is emitted as an empty object.
		envelope.writeFieldName('properties');
		envelope.writeStartObject();
		envelope.writeEndObject();

		envelope.writeStringField('payload', payload);
		envelope.writeStringField('payload_encoding', 'string');
		envelope.writeEndObject();

		return envelope.getAsString();
	}

Callout RabbitMQ

/**
 * Serializes the payload into the AMQP request envelope and POSTs it to the
 * 'event-exchange' exchange.
 *
 * @param payload message content to publish
 */
public void sendAmqpRequest(String payload) {
	callAmqpEndpoint('event-exchange', 'POST', serializeAmqpRequests(payload));
}

cloudsole-streaming: Force.com streaming with RabbitMQ

Rabbit MQ Class

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Convenience wrapper around the RabbitMQ Java client for opening a channel on a
 * queue, publishing text messages to it and reading them back.
 *
 * <p>Message bodies are encoded and decoded as UTF-8 so that the result does not
 * depend on the platform default charset.
 */
public class RabbitStream {

	/**
	 * Idle time, in milliseconds, after which {@link #consumeQueue(QueueingConsumer)}
	 * stops waiting for further deliveries and returns what it has collected.
	 */
	private static final long DEFAULT_IDLE_TIMEOUT_MILLIS = 1000L;

	/**
	 * Creates a connection factory pointing at the given host.
	 *
	 * @param hostname broker host name
	 * @return a factory with only the host configured
	 */
	public ConnectionFactory createRabbitConnection(String hostname)
	{
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost(hostname);
		return factory;
	}

	/**
	 * Opens a new connection from the given factory.
	 *
	 * @param rabbitFactory configured connection factory
	 * @return the newly opened connection; the caller is responsible for closing it
	 * @throws IOException if the connection cannot be established
	 */
	public Connection createConnection(ConnectionFactory rabbitFactory) throws IOException
	{
		return rabbitFactory.newConnection();
	}

	/**
	 * Opens a channel on the connection and declares the queue on it.
	 *
	 * @param rabbitConnection open connection
	 * @param queueName        queue to declare
	 * @return the channel on which the queue was declared
	 * @throws IOException if the channel cannot be created or the queue cannot be declared
	 */
	public Channel createChannel(Connection rabbitConnection, String queueName) throws IOException
	{
		final Channel rabbitChannel = rabbitConnection.createChannel();
		// durable=false, exclusive=false, autoDelete=false, no extra arguments
		rabbitChannel.queueDeclare(queueName, false, false, false, null);
		return rabbitChannel;
	}

	/**
	 * Publishes a text message with the queue name as routing key on the exchange
	 * named "" (empty string).
	 *
	 * @param rabbitChannel channel to publish on
	 * @param queueName     routing key, i.e. the target queue name
	 * @param message       message text; sent as UTF-8 bytes
	 * @throws IOException if publishing fails
	 */
	public void basicPublish(Channel rabbitChannel, String queueName, String message) throws IOException
	{
		rabbitChannel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
	}

	/**
	 * Registers a {@link QueueingConsumer} on the queue with automatic acknowledgement.
	 *
	 * @param rabbitChannel channel to consume on
	 * @param queueName     queue to consume from
	 * @return the registered consumer
	 * @throws IOException if the consumer cannot be registered
	 */
	public QueueingConsumer consumeChannel(Channel rabbitChannel, String queueName) throws IOException
	{
		final QueueingConsumer rabbitconsumer = new QueueingConsumer(rabbitChannel);
		// second argument: autoAck=true
		rabbitChannel.basicConsume(queueName, true, rabbitconsumer);
		return rabbitconsumer;
	}

	/**
	 * Collects the messages currently available to the consumer, using the default
	 * idle timeout.
	 *
	 * <p>Previously this method looped forever and could never return the list it
	 * was building; it now returns once no delivery arrives within the idle timeout.
	 *
	 * @param rabbitConsumer consumer to read from
	 * @return the message bodies decoded as UTF-8, in delivery order; empty if none arrived
	 * @throws ShutdownSignalException    if the connection is shut down while waiting
	 * @throws ConsumerCancelledException if the consumer is cancelled while waiting
	 * @throws InterruptedException       if the waiting thread is interrupted
	 */
	public List<String> consumeQueue(QueueingConsumer rabbitConsumer) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException
	{
		return consumeQueue(rabbitConsumer, DEFAULT_IDLE_TIMEOUT_MILLIS);
	}

	/**
	 * Collects messages from the consumer until no delivery arrives within the
	 * given idle timeout.
	 *
	 * @param rabbitConsumer    consumer to read from
	 * @param idleTimeoutMillis how long to wait for each next delivery, in milliseconds
	 * @return the message bodies decoded as UTF-8, in delivery order; empty if none arrived
	 * @throws ShutdownSignalException    if the connection is shut down while waiting
	 * @throws ConsumerCancelledException if the consumer is cancelled while waiting
	 * @throws InterruptedException       if the waiting thread is interrupted
	 */
	public List<String> consumeQueue(QueueingConsumer rabbitConsumer, long idleTimeoutMillis) throws ShutdownSignalException, ConsumerCancelledException, InterruptedException
	{
		List<String> messageList = new ArrayList<String>();
		QueueingConsumer.Delivery delivery;
		// nextDelivery(timeout) yields null once the timeout elapses without a message.
		while ((delivery = rabbitConsumer.nextDelivery(idleTimeoutMillis)) != null) {
			messageList.add(new String(delivery.getBody(), StandardCharsets.UTF_8));
		}
		return messageList;
	}

	/**
	 * Connects to the host and returns a channel with the queue declared on it.
	 *
	 * <p>If the channel cannot be set up, the freshly opened connection is closed
	 * before the failure is propagated so that it does not leak.
	 *
	 * @param hostName  broker host name
	 * @param queueName queue to declare
	 * @return a channel on a new connection
	 * @throws IOException if connecting, creating the channel or declaring the queue fails
	 */
	public Channel createRabbitFactory(String hostName, String queueName) throws IOException
	{
		Connection connection = createConnection(createRabbitConnection(hostName));
		try {
			return createChannel(connection, queueName);
		} catch (IOException e) {
			closeAfterFailure(connection, e);
			throw e;
		} catch (RuntimeException e) {
			closeAfterFailure(connection, e);
			throw e;
		}
	}

	/** Closes the connection, recording any close failure on the original exception. */
	private void closeAfterFailure(Connection connection, Exception failure)
	{
		try {
			connection.close();
		} catch (Exception closeFailure) {
			failure.addSuppressed(closeFailure);
		}
	}

}

Cometd Class

import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.ForceStreamingClientModule;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Guice;
import com.google.inject.Injector;

/**
 * Helper for obtaining Force.com streaming objects (Bayeux client, push topic
 * manager and push topics) through a Guice injector.
 */
public class PushTopicFactory {

	/**
	 * Builds a Guice injector configured with a {@link ForceStreamingClientModule}.
	 *
	 * @return a new injector
	 */
	public Injector createInjector()
	{
		return Guice.createInjector(new ForceStreamingClientModule());
	}

	/**
	 * Obtains a {@link ForceBayeuxClient} from the injector.
	 *
	 * @param injector injector to ask for the client
	 * @return the client instance supplied by the injector
	 */
	public ForceBayeuxClient createClient(Injector injector)
	{
		return injector.getInstance(ForceBayeuxClient.class);
	}

	/**
	 * Obtains a {@link PushTopicManager} from the injector.
	 *
	 * @param injector injector to ask for the manager
	 * @return the manager instance supplied by the injector
	 */
	public PushTopicManager createPushTopManager(Injector injector)
	{
		return injector.getInstance(PushTopicManager.class);
	}

	/**
	 * Looks up a push topic by name.
	 *
	 * @param pushTopicManager manager used for the lookup
	 * @param topicName        name of the topic
	 * @return the result of {@code pushTopicManager.getTopicByName(topicName)}
	 * @throws ForceStreamingException if the manager reports a failure
	 */
	public PushTopic getTopicByName(PushTopicManager pushTopicManager, String topicName) throws ForceStreamingException
	{
		return pushTopicManager.getTopicByName(topicName);
	}

	/**
	 * Creates a push topic from the given attributes.
	 *
	 * @param pushTopicManager manager used to create the topic
	 * @param name             topic name
	 * @param apiVersion       API version of the topic
	 * @param query            query of the topic
	 * @param description      topic description
	 * @return the topic returned by the manager
	 */
	public PushTopic createPushTopic(PushTopicManager pushTopicManager, String name, Double apiVersion, String query, String description)
	{
		PushTopic definition = new PushTopic(name, apiVersion, query, description);
		return pushTopicManager.createPushTopic(definition);
	}

	/**
	 * Builds a fresh injector, client and manager, then looks up the named topic.
	 *
	 * @param topicName name of the topic
	 * @return the topic found by the manager
	 * @throws ForceStreamingException if the lookup fails
	 */
	public PushTopic pushTopicFactory(String topicName) throws ForceStreamingException
	{
		PushTopicFactory factory = new PushTopicFactory();
		Injector injector = factory.createInjector();
		// The client is requested from the injector but not used further here.
		factory.createClient(injector);
		PushTopicManager manager = factory.createPushTopManager(injector);
		return factory.getTopicByName(manager, topicName);
	}

	/**
	 * Builds a fresh injector and returns a Bayeux client obtained from it.
	 *
	 * @return a client backed by a new injector
	 */
	public ForceBayeuxClient createPushTopicClientFactory()
	{
		PushTopicFactory factory = new PushTopicFactory();
		return factory.createClient(factory.createInjector());
	}
}

Main Class

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import com.rabbitmq.client.Channel;
import com.example.pushtopic.PushTopicFactory;
import com.example.rabbit.RabbitStream;
import com.force.sdk.streaming.client.ForceBayeuxClient;
import com.force.sdk.streaming.client.PushTopicManager;
import com.force.sdk.streaming.exception.ForceStreamingException;
import com.force.sdk.streaming.model.PushTopic;
import com.google.inject.Injector;

public class StreamMain {
	
	private static final String TOPICNAME = "AccountPush";
	private static final String QUEUENAME = "streaming";
	private static final String HOSTNAME= "localhost";
	
	public static void main(String[] args) throws ForceStreamingException, InterruptedException, Exception {
			
		   final Log log = LogFactory.getLog(StreamMain.class);
		
			final RabbitStream rabbitFactory = new RabbitStream();
			final Channel factoryChannel = rabbitFactory.createRabbitFactory(HOSTNAME, QUEUENAME);
		
			PushTopicFactory pushTopicFactory = new PushTopicFactory();
			Injector injector = pushTopicFactory.createInjector();
			ForceBayeuxClient client = pushTopicFactory.createClient(injector);
			PushTopicManager publicTopicManager = pushTopicFactory.createPushTopManager(injector);
			PushTopic createTopic = pushTopicFactory.createPushTopic(publicTopicManager, "NewAccountPushTopic", 27.0, "select Id, Name from Account", "New Push Topic");
	
			PushTopic topic = pushTopicFactory.getTopicByName(publicTopicManager, createTopic.getName()); 
			
			client.subscribeTo(topic, new ClientSessionChannel.MessageListener() 
			{   
				public void onMessage(ClientSessionChannel channel, Message message) 
				{
					try {
						rabbitFactory.basicPublish(factoryChannel, QUEUENAME, message.getJSON());
						log.info(message.getJSON());
					} catch (IOException e) {
						log.error(e);
						e.printStackTrace();
					} 
				}
			});
          }
}