Recent Entries 10
- snippet minor 112d agoConvert MQTT messages and publish them to Rabbit MQI'm looking for advice on how to find a balance between keeping this code flexible/maintainable, while also keeping it readable. It reads MQTT messages in JSON format from four different topics, converts the JSON into a different JSON format, and publishes it to a common Rabbit message queue. One of the topics, the sensor status topic, has messages in four different formats (four different sensor types publish their status in that topic). In its original form the code is very straightforward and contained in a single class, however the class grows in size for each new sensor or topic added to it. My first decision to improve it was to create two message handler factories, one for the four topics and one for the special sensor topic, but then I thought using only one factory for both was better. The bad thing about the factory is that it must be injected with a lot of arguments in order to perform its duty, which is to pass those arguments to the instantiated message handlers. I don't know how can I improve that. It seems to be a limitation of the Simple Factory "pattern" (not a real pattern though). Two real patterns have also been applied to the code: Template Method (in `ConvertingPublisherHandler`) and Decorator (implemented in `LoggingRabbitPublisher` and used in `ConvertingPublisherHandler`). Regarding Decorator, it is my first time applying it and I'm not sure if it is the best solution to the problem. I wanted to log both the original MQTT message and the converted JSON message in pairs, along with a line separator, so this is what the decorator does. This, however, caused the factory to receive a lot of arguments thus it is not very readable. Regarding Template Method, I'm not sure if it is the most flexible solution to adopt. Firstly because the pattern is not very flexible by itself (it relies on inheritance and also freezes the sequence of methods it calls). Secondly because I took advantage of the common behavior for all messages that is to convert
- pattern moderate 112d agoRabbitMQ wrapper to work in a concurrent environmentI'm developing a common library for my organisation that will be used across different services by different teams. The library is a simple wrapper for RabbitMQ. Some services will only be producers, some will only be consumers and some will be both. For that reason I have created a `Consumer` and a `Producer`. Each of these will be implementations of an abstract base class. The `Producer` should be able to work in a multi threaded environment, e.g. ASP.NET MVC, either as a singleton or separate instance for each thread. It should be possible to start several instances of the `Consumer`, either on separate threads or on the same thread. In RabbitMQ each application should only create one connection and then reuse this connection for all communication. As the `Consumer` and `Procucer` cannot know if the connection can be closed it has to be done in the `Dispose` method. I want to make the implementations as safe as possible to make sure that the developers using the library cannot accidentally create resource leaks or concurrence issues. The implementation works but I'm not sure if there are any concurrency pitfalls that I have not thought about that could make this blow up. Can this implementation handle the connection being disposed and recreated in a concurrent environment? What happens if the applications crashes and nothing handles this, will the Garbage Collector clean up the connection/channels? I'm not all that familiar with this type of programming so I would like to know if this implementations is OK or if I have missed something? The two interfaces for the `Consumer` and `Producer` ``` public interface IMessageQueueConsumer { void StartConsuming(IList messageListeners); } public interface IMessageQueueProducer { void Publish(T message) where T : OutgoingMessage; } ``` The base class ``` public abstract class RmqBase : IDisposable { private static volatile IConnection _connection; private static readonly object ConnectionLock =
- pattern minor 112d agoRabbitMQ ConsumerI am playing with RabbitMQ and .NET and found myself duplicating the connection initialization logic in all of the consumers. Created the `Consumer` in order to avoid that and add some tracing. It's far away from anything usable, but wanted to hear ideas/opinions/alternatives. ``` public class Consumer { private static readonly Logger _logger = LogManager.GetCurrentClassLogger(); public event EventHandler OnReceive; private IModel _exchange; public void Init(string incoming = "#") { var factory = new ConnectionFactory() { HostName = "localhost" }; using (var connection = factory.CreateConnection()) using (_exchange = connection.CreateModel()) { _exchange.ExchangeDeclare("exchange_name", "topic"); var queueName = _exchange.QueueDeclare().QueueName; _exchange.QueueBind(queueName, "queue_name", incoming); var consumer = new QueueingBasicConsumer(_exchange); _exchange.BasicConsume(queueName, true, consumer); while (true) { var ea = consumer.Queue.Dequeue(); var model = new CanonicalModel(ea.Body); _logger.Info(new { Time = DateTime.Now, Type = "IN", Value = "..." }); var startTime = DateTime.Now; if (OnReceive != null) { OnReceive(this, model); } _logger.Info(new { Time = DateTime.Now, Type = "CONTROL", Value = DateTime.Now.Subtract(startTime).Milliseconds }); } } } public void Publish(string routingKey, CanonicalModel model) { var json = JsonConvert.SerializeObject(model); var message = Encoding.UTF8.GetBytes(json); _logger.Info(new { Time = DateTime.Now, Type = "OUT", Value = "..." }); _exchange.BasicPublish("exchange_name", routingKey, null, message); } } ``` Console application which uses a consumer. ``` class ConsoleConsumer { private const string SERVICENAME = "CONSUMER_FOO"; private const string INCOMING = "foo"; private const stri
- pattern minor 112d agoConnecting to RabbitMQI would like a review for this connection to RabbitMQ. I just developed it and seems to be working well, but I would like another set of eyes on it before putting this on the sever. ``` package models import com.rabbitmq.client.Connection import com.rabbitmq.client.ConnectionFactory import com.rabbitmq.client.ConnectionFactory import com.rabbitmq.client.MessageProperties import anorm.SQL import anorm.sqlToSimple import anorm.toParameterValue import play.api.Play.current object RabbitMQConnection { private var connection: Connection = null def getConnection(ss:Connection): Connection = { println(ss+" connection { val factory = new ConnectionFactory() println("waiting for new connection") factory.setHost("172.22.22.222") println("host setted") connection = factory.newConnection() println("connection created") connection } case _ =>{ println("connection is not null") connection } } } } object RMQ { var connection = RabbitMQConnection.getConnection(null) def setQ(qName: String, message: String) = { println("ping received") try { println(connection) if (connection != null) { if (connection.isOpen()) { println("connection is open") } else { connection = RabbitMQConnection.getConnection(null) println("connection is new "+connection) } println("connetion is ready to use") val channel = connection.createChannel() channel.queueDeclare(qName, true, false, false, null) //suggestion channel.basicPublish("", qName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()) println("status" + channel.close()) println("setQ complete executed for " + qName) Map("result" -> "success") } else { println("connection can't established to rabbit mq for =>" + qName) LogFile.QLogs(qName, message) Map("result" -> "er
- pattern minor 112d agoNodeJS broker between MongoDB and RabbitMQI wrote a small program that aims to take some data from the MongoDB database and put them in the RabbitMQ queue. I tried to use only promise style but I am a beginner in JavaScript. Could you please help me to improve my first draft? ``` var mongo = require('mongod'); var amqp = require('amqplib'); var _ = require('underscore'); var TaskBroker = function () { this.queueName = 'task_queue'; this.rabbit = {}; this.mongo = {}; }; TaskBroker.prototype.connectRabbit = function() { return amqp.connect('amqp://localhost') .then(function (connection) { this.rabbit.connection = connection; return connection.createChannel() }.bind(this)) .then(function(channel) { this.rabbit.channel = channel; return channel.assertQueue(this.queueName, {durable: true}); }.bind(this)) }; TaskBroker.prototype.connectMongo = function(){ return function() { this.mongo.db = mongo('mongodb://127.0.0.1:27017/test', ['test']); return this.mongo.db; }.bind(this); }; TaskBroker.prototype.connect = function () { return this.connectRabbit() .then(this.connectMongo()); }; TaskBroker.prototype.disconnect = function() { this.mongo.db.close(); this.rabbit.channel.close(); this.rabbit.connection.close(); }; TaskBroker.prototype.get_url_array = function(_data) { return _.chain(_data).pluck('probes').flatten().pluck('url').uniq().value(); }; TaskBroker.prototype.getTask = function() { return function () { return this.mongo.db.test.find({ 'status': 'ONGOING' }, { 'probes.url':1, '_id':0}) .then(function(results) { var url_array = []; if (results != null && results.length > 0) { url_array = this.get_url_array(results); } return this.mongo.db.test.find({ 'probes.url' : { $nin: url_array } }); }.bind(this)) .then(function(results) { if (results.length > 0) return results[0]; return null; }); }.bind(this); }; TaskBroker.prototype.produceTask
- pattern major 112d agodocker-compose healthcheck for rabbitMQI'm trying to run `rabbitMQ` using `docker-compose`, but the service is always starting or unhealthy. rabbit is running fine, so I suspect there is something wrong with my health check. Running the healthcheck command locally does return a value. ``` > curl -f http://localhost:5672 AMQP % ``` But `docker-compose ps` always says the service is unhealthy (or starting, before it runs out of time). ``` > docker-compose ps docker-entrypoint.sh rabbi ... Up (unhealthy) 15671/tcp ``` Here is what my docker-compose.yml file looks like. `# docker-compose.yml version: '2.3' # note: I can't change this version, must be 2.3 volumes: rabbit-data: services: rabbit: hostname: 'rabbit' image: rabbitmq:3.8.5-management healthcheck: test: ["CMD", "curl", "-f", "http://localhost:5672"] interval: 30s timeout: 30s retries: 3 ports: - '5672:5672' - '15672:15672' volumes: - 'rabbit-data:/var/lib/rabbitmq/mnesia/' networks: - rabbitmq networks: rabbitmq: driver: bridge ` I have also tried using `nc` instead of `curl` in the healthcheck, but got the same result. `healthcheck: test: [ "CMD", "nc", "-z", "localhost", "5672" ] ` From https://github.com/docker-library/rabbitmq/issues/326
- snippet minor 112d agoHow to automatically remove a dead node from RabbitMQ clusterI am planning to create RabbitMQ cluster using Ansible on AWS VPC with Amazon internal load balancer as the frontend to point connections to it. Any suggestion how to remove a dead node from RabbitMQ cluster based on autoscaling rule where nodes can go up and down, or if you are using spot instances? When a node goes down, RabbitMQ does not remove it from the replication list automatically, I can see `Node not running` in the management UI. I managed to join to the cluster a scaled instance automatically via Ansible and userdata.
- pattern major pending 121d agoMessage queue consumer not processing — dead letter queue setupMessages in the queue are not being processed. Failed messages keep being retried, blocking the queue. Poison messages (malformed data) cause infinite retry loops.
- pattern major 124d agoDead Letter Queue (DLQ) ProcessingPoison messages (messages that always fail processing) get retried indefinitely, consuming worker capacity and blocking the queue.
- principle moderate 124d agoRabbitMQ Exchanges and Queue BindingsMessages sent to RabbitMQ are not reaching the intended consumers, or all consumers receive every message when only specific ones should.