Add CloudAMQP to your project:
heroku addons:add cloudamqp
Create the AMQP configuration:
package com.example.amqp;

import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Spring configuration that exposes a RabbitMQ {@link ConnectionFactory} and a
 * {@link Channel} as beans.
 *
 * <p>The broker address is read from the {@code CLOUDAMQP_URL} environment
 * variable; when that variable is missing or blank, a local broker with the
 * default guest account is used instead.
 *
 * <p>Both bean methods fail fast with an {@link IllegalStateException} rather
 * than returning {@code null}, so a misconfigured or unreachable broker is
 * reported at the point of failure instead of surfacing later as a
 * {@link NullPointerException} in whatever depends on these beans.
 */
@Configuration
public class AmqpConfiguration {

    /** Name of the environment variable that holds the broker URI. */
    private static final String BROKER_URL_VARIABLE = "CLOUDAMQP_URL";

    /** Broker URI used when the environment variable is not set. */
    private static final String LOCAL_BROKER_URL = "amqp://guest:guest@localhost";

    /**
     * Builds the connection factory for the configured broker.
     *
     * @return a factory whose URI is taken from {@code CLOUDAMQP_URL}, or the
     *         local default when the variable is absent or blank
     * @throws IllegalStateException if the broker URI cannot be applied to the factory
     */
    @Bean
    public ConnectionFactory setupCloudAmpFactory() {
        String uri = System.getenv(BROKER_URL_VARIABLE);
        if (uri == null || uri.trim().length() == 0) {
            uri = LOCAL_BROKER_URL;
        }

        ConnectionFactory factory = new ConnectionFactory();
        try {
            // Use the resolved value, not the raw environment lookup, so the
            // local fallback above actually takes effect.
            factory.setUri(uri);
        } catch (KeyManagementException e) {
            throw invalidBrokerUri(e);
        } catch (NoSuchAlgorithmException e) {
            throw invalidBrokerUri(e);
        } catch (URISyntaxException e) {
            throw invalidBrokerUri(e);
        }
        return factory;
    }

    /**
     * Opens a connection with the given factory and creates a channel on it.
     *
     * <p>NOTE(review): the connection opened here is not exposed as a bean, so
     * nothing in this class closes it on shutdown — confirm whether that is
     * acceptable for the application's lifecycle.
     *
     * @param connectionFactory the factory used to open the broker connection
     * @return a channel on a newly opened connection
     * @throws IllegalStateException if the connection or the channel cannot be created
     */
    @Bean
    public Channel setupCloudAmpChannel(ConnectionFactory connectionFactory) {
        Connection connection;
        try {
            connection = connectionFactory.newConnection();
        } catch (IOException e) {
            throw new IllegalStateException("Could not connect to the AMQP broker", e);
        }

        Channel channel;
        try {
            channel = connection.createChannel();
        } catch (IOException e) {
            // Do not leave a half-initialised connection open behind us.
            closeQuietly(connection);
            throw new IllegalStateException("Could not open a channel on the AMQP connection", e);
        }
        if (channel == null) {
            // Defensive: never hand a null bean to the container.
            closeQuietly(connection);
            throw new IllegalStateException("The AMQP connection did not provide a channel");
        }
        return channel;
    }

    /**
     * Wraps a URI configuration failure. The URI itself is deliberately left
     * out of the message because it normally embeds the broker credentials.
     */
    private static IllegalStateException invalidBrokerUri(Exception cause) {
        return new IllegalStateException(
                "Invalid AMQP broker URI (from " + BROKER_URL_VARIABLE + " or the local default)", cause);
    }

    /** Closes the connection, ignoring any failure so the original error is the one reported. */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (Exception ignored) {
            // Best-effort cleanup on an already failing path; the caller
            // throws the exception that actually matters.
        }
    }
}
Create the AMQP service:
package com.example.amqp;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Import;
import org.springframework.stereotype.Service;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

/**
 * Thin service around a single injected RabbitMQ {@link Channel}: declares
 * queues, publishes text messages and drains queues.
 *
 * <p>All methods are best-effort: broker failures are logged and reported to
 * the caller through the return value (nothing, {@code 0}, {@code null} or a
 * partial list) instead of an exception.
 *
 * <p>NOTE(review): every method uses the same injected channel; confirm the
 * threading expectations of the callers against the RabbitMQ client's
 * guidance on sharing channels.
 */
@Service
@Import({AmqpConfiguration.class})
public class AmpqService {

    private static final Logger LOG = Logger.getLogger(AmpqService.class.getName());

    /** Fixed encoding for message bodies so the bytes do not depend on the platform default. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    @Autowired
    Channel ampqChannel;

    /**
     * Declares a queue with the given name (despite the method name, no new
     * channel is created). The queue is declared non-durable, non-exclusive,
     * not auto-deleted and without extra arguments.
     *
     * @param queueName name of the queue to declare
     */
    public void createChannel(String queueName) {
        try {
            ampqChannel.queueDeclare(queueName, false, false, false, null);
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not declare queue " + queueName, e);
        }
    }

    /**
     * Publishes a text message to the given queue through the default
     * exchange, encoding the text as UTF-8.
     *
     * @param queueName routing key, i.e. the name of the target queue
     * @param message   text to send
     */
    public void publishMessage(String queueName, String message) {
        try {
            ampqChannel.basicPublish("", queueName, null, message.getBytes(UTF_8));
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not publish to queue " + queueName, e);
        }
    }

    /**
     * Drains the messages currently waiting in the given queue and returns
     * them. Messages are fetched with automatic acknowledgement, so they are
     * removed from the queue as they are read.
     *
     * <p>The method returns as soon as the broker reports the queue empty; it
     * does not wait for further messages and leaves no consumer registered on
     * the channel.
     *
     * @param queueName name of the queue to read from
     * @return the messages read, in the order received; if the broker fails
     *         part-way through, the messages read up to that point
     */
    public List<QueueingConsumer.Delivery> consumeMessage(String queueName) {
        List<QueueingConsumer.Delivery> messages = new ArrayList<QueueingConsumer.Delivery>();
        try {
            // basicGet returns null once the queue has no more messages, which
            // gives the loop a natural end instead of blocking forever.
            GetResponse response = ampqChannel.basicGet(queueName, true);
            while (response != null) {
                messages.add(new QueueingConsumer.Delivery(
                        response.getEnvelope(), response.getProps(), response.getBody()));
                response = ampqChannel.basicGet(queueName, true);
            }
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not read from queue " + queueName, e);
        } catch (ShutdownSignalException e) {
            LOG.log(Level.SEVERE, "Channel shut down while reading from queue " + queueName, e);
        }
        return messages;
    }

    /**
     * Returns the message count reported by a no-argument
     * {@code queueDeclare()} on the channel.
     *
     * <p>NOTE(review): the no-argument declare does not name a queue, so this
     * does not report on a queue declared through {@link #createChannel}; use
     * {@link #getTotalMessages(String)} for a specific queue.
     *
     * @return the reported message count, or {@code 0} if the broker call fails
     */
    public int getTotalMessages() {
        try {
            return ampqChannel.queueDeclare().getMessageCount();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not query the message count", e);
        }
        return 0;
    }

    /**
     * Returns the number of messages waiting in the named queue, using a
     * passive declare so the queue itself is not created or modified.
     *
     * <p>NOTE(review): a passive declare of a queue that does not exist is a
     * channel-level error on the broker — verify callers only pass queues that
     * have been declared.
     *
     * @param queueName name of an existing queue
     * @return the reported message count, or {@code 0} if the broker call fails
     */
    public int getTotalMessages(String queueName) {
        try {
            return ampqChannel.queueDeclarePassive(queueName).getMessageCount();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not query the message count of queue " + queueName, e);
        }
        return 0;
    }

    /**
     * Returns the consumer count reported by a no-argument
     * {@code queueDeclare()} on the channel.
     *
     * <p>NOTE(review): as with {@link #getTotalMessages()}, this does not
     * refer to a caller-chosen queue; use {@link #getConsumerCount(String)}
     * for a specific queue.
     *
     * @return the reported consumer count, or {@code 0} if the broker call fails
     */
    public int getConsumerCount() {
        try {
            return ampqChannel.queueDeclare().getConsumerCount();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not query the consumer count", e);
        }
        return 0;
    }

    /**
     * Returns the number of consumers attached to the named queue, using a
     * passive declare so the queue itself is not created or modified.
     *
     * @param queueName name of an existing queue
     * @return the reported consumer count, or {@code 0} if the broker call fails
     */
    public int getConsumerCount(String queueName) {
        try {
            return ampqChannel.queueDeclarePassive(queueName).getConsumerCount();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not query the consumer count of queue " + queueName, e);
        }
        return 0;
    }

    /**
     * Returns the queue name reported by a no-argument {@code queueDeclare()}
     * on the channel.
     *
     * @return the reported queue name, or {@code null} if the broker call fails
     */
    public String getQueue() {
        try {
            return ampqChannel.queueDeclare().getQueue();
        } catch (IOException e) {
            LOG.log(Level.SEVERE, "Could not declare a queue", e);
        }
        return null;
    }
}