/** * Copyright © Cisco Systems, Inc. */package com.cisco.cmse.csra.cluster.rabbitmq;import org.springframework.amqp.core.AmqpAdmin;import org.springframework.amqp.core.TopicExchange;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitAdmin;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.support.converter.JsonMessageConverter;import org.springframework.amqp.support.converter.MessageConverter;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @author sanlli * */@Configurationpublic abstract class AbstractRabbitConfiguration { private static final String Reload_Policy_Exchange = "Reload_Policy_Exchange"; @Value("${rabbitmq_addresses}") private String addresses; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(addresses); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); return connectionFactory; } @Bean public AmqpAdmin amqpAdmin(){ return new RabbitAdmin(connectionFactory()); } @Bean public RabbitTemplate amqpTemplate() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); template.setMessageConverter(jsonMessageConverter()); template.setExchange(Reload_Policy_Exchange); return template; } @Bean public MessageConverter jsonMessageConverter() { return new JsonMessageConverter(); } @Bean public TopicExchange reloadPolicyExchange() { TopicExchange exchange = new TopicExchange(Reload_Policy_Exchange, true, false); return exchange; } }
/**
 * Copyright © Cisco Systems, Inc.
 */
package com.cisco.cmse.csra.cluster.rabbitmq;

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import com.cisco.cmse.csra.cluster.message.ReloadNotificationMessageListener;

/**
 * Client-side RabbitMQ configuration: declares a queue named after the local
 * host address, binds it to the reload-policy exchange and wires a listener
 * container that delivers its messages to a
 * {@link ReloadNotificationMessageListener}.
 *
 * @author sanlli
 */
@Configuration
public class RabbitClientConfiguration extends AbstractRabbitConfiguration {

    /** Queue name used when the local host address cannot be determined. */
    private static final String FALLBACK_QUEUE_NAME = "0.0.0.0";

    /**
     * Declares the queue this client consumes from. The queue is durable,
     * non-exclusive and not auto-deleted, and is named after the local host
     * address.
     *
     * @return the reload-policy queue for this host
     */
    @Bean
    public Queue reloadPolicyQueue() {
        return new Queue(resolveLocalQueueName(), true, false, false);
    }

    /**
     * Resolves the queue name from the local host address.
     *
     * @return the local host address, or {@code "0.0.0.0"} when it cannot be resolved
     */
    private static String resolveLocalQueueName() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException ignored) {
            // Deliberately not retried: a second getLocalHost() call performs the
            // same lookup that has just failed, so fall back immediately.
            // NOTE(review): every host that ends up here uses the same queue
            // name - confirm that sharing one queue is acceptable.
            return FALLBACK_QUEUE_NAME;
        }
    }

    /**
     * Binds {@link #reloadPolicyQueue()} to the reload-policy exchange with the
     * routing pattern {@code reload.#}.
     *
     * @return the queue-to-exchange binding
     */
    @Bean
    public Binding reloadPolicyBinding() {
        return BindingBuilder.bind(reloadPolicyQueue()).to(reloadPolicyExchange()).with("reload.#");
    }

    /**
     * Creates the executor handed to the listener container: core and maximum
     * pool size of 10 with a queue capacity of 1500.
     *
     * @return the configured task executor
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setQueueCapacity(1500);
        return taskExecutor;
    }

    /**
     * Creates the listener that receives messages from the reload-policy queue.
     *
     * @return a new {@link ReloadNotificationMessageListener}
     */
    @Bean
    public ReloadNotificationMessageListener reloadNotificationMessageListener() {
        return new ReloadNotificationMessageListener();
    }

    /**
     * Creates the listener container that consumes {@link #reloadPolicyQueue()}
     * over the shared connection factory, using {@link #taskExecutor()} and
     * {@link #reloadNotificationMessageListener()}.
     *
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setTaskExecutor(taskExecutor());
        container.setQueues(reloadPolicyQueue());
        container.setMessageListener(reloadNotificationMessageListener());
        return container;
    }
}