Rascal is a rich pub/sub wrapper for the excellent amqplib. One of the best things about amqplib is that it doesn't make assumptions about how you use it. Another is that it doesn't attempt to abstract away AMQP concepts. As a result, the library offers a great deal of control and flexibility, but the onus is on you to adopt appropriate patterns and configuration. You need to be aware that:
A publication is a named configuration for publishing a message, including the destination queue or exchange, routing configuration, encryption profile and reliability guarantees, message options, etc. A subscription is a named configuration for consuming messages, including the source queue, encryption profile, content encoding, delivery options (e.g. acknowledgement handling and prefetch), etc. These must be configured and supplied when creating the Rascal broker. After the broker has been created the subscriptions and publications can be retrieved from the broker and used to publish and consume messages.
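A minimal sketch of such a configuration and its use follows; the vhost, exchange, queue, publication and subscription names are invented for the example.

```js
const rascal = require('rascal');

// Example definitions; all names (demo_ex, demo_q, demo_pub, demo_sub) are illustrative
const definitions = {
  vhosts: {
    '/': {
      exchanges: ['demo_ex'],
      queues: ['demo_q'],
      bindings: ['demo_ex[a.b.c] -> demo_q'],
      publications: {
        demo_pub: { exchange: 'demo_ex', routingKey: 'a.b.c' },
      },
      subscriptions: {
        demo_sub: { queue: 'demo_q' },
      },
    },
  },
};

(async () => {
  const broker = await rascal.BrokerAsPromised.create(rascal.withDefaultConfig(definitions));
  broker.on('error', console.error);

  // Consume using the named subscription
  const subscription = await broker.subscribe('demo_sub');
  subscription.on('message', (message, content, ackOrNack) => {
    console.log(content);
    ackOrNack();
  });

  // Publish using the named publication
  const publication = await broker.publish('demo_pub', 'Hello World!');
  publication.on('error', console.error);
})().catch(console.error);
```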
RabbitMQ 3.8.0 introduced quorum queues. Although quorum queues may not be suitable in all situations, they provide poison message handling without the need for an external redelivery counter and offer better data safety in the event of a network partition. You can read more about them here and here.
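As a sketch, a quorum queue can be requested through the queue's assertion arguments using the standard RabbitMQ arguments (the queue name is illustrative).

```js
// Sketch: declare a quorum queue via assertion arguments
const definitions = {
  vhosts: {
    '/': {
      queues: {
        orders_q: {
          options: {
            arguments: {
              'x-queue-type': 'quorum',  // ask RabbitMQ for a quorum queue
              'x-delivery-limit': 5,     // drop or dead letter messages redelivered more than 5 times
            },
          },
        },
      },
    },
  },
};
```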
Rascal nacks the message because the alternatives are to leave it unacknowledged indefinitely, or to roll back and retry it in an infinite tight loop, which can DDoS your application and cause problems for your infrastructure. Providing you have correctly configured dead letter queues and/or listen to the "invalid_content" and "redeliveries_exceeded" subscriber events, your messages should be safe.
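A sketch of wiring up these subscriber events is shown below; the subscription name is illustrative, and the handlers assume Rascal's usual (err, message, ackOrNack) event signature.

```js
// Inside an async function, given a broker created with BrokerAsPromised
const subscription = await broker.subscribe('demo_sub');

subscription
  .on('message', (message, content, ackOrNack) => {
    // normal processing
    ackOrNack();
  })
  .on('invalid_content', (err, message, ackOrNack) => {
    // the content could not be parsed; nack so it is dead lettered rather than redelivered forever
    console.error('Invalid content', err);
    ackOrNack(err, { strategy: 'nack' });
  })
  .on('redeliveries_exceeded', (err, message, ackOrNack) => {
    // the message has been redelivered too many times
    console.error('Redeliveries exceeded', err);
    ackOrNack(err, { strategy: 'nack' });
  })
  .on('error', console.error);
```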
amqplib emits error events when a connection or channel encounters a problem. Rascal will listen for these and, provided you use the default configuration, will attempt automatic recovery (reconnection, etc.). However, these events can also indicate errors in your code, so it's important to bring them to your attention. Rascal does this by re-emitting the error event, which means that if you don't handle it, it will bubble up to the uncaught error handler and crash your application. It is not enough to register a global uncaughtException handler - doing so without registering individual handlers will prevent your application from crashing, but it will also prevent Rascal from recovering.
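A minimal sketch of the handler you should always register:

```js
// Always register a broker level error handler, otherwise re-emitted
// connection/channel errors will crash your application
broker.on('error', (err) => {
  console.error('Broker error', err);
});
```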
The broker emits the vhost_initialised event after recovering from a connection error. An object containing the vhost name and connection url (with an obfuscated password) is passed to the event handler.
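For example, a sketch of such a handler (the destructured property names follow the payload described above):

```js
broker.on('vhost_initialised', ({ vhost, connectionUrl }) => {
  console.log(`Vhost: ${vhost} was initialised using connection: ${connectionUrl}`);
});
```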
Any attributes you add to the "options" sub document will be converted to query parameters. Any attributes you add to the "socketOptions" sub document will be passed directly to amqplib's connect method (which hands them off to net or tls). Providing you merge your configuration with the default configuration via rascal.withDefaultConfig(config), you need only specify the attributes you want to override.
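A sketch of a connection using both sub documents (the values are illustrative):

```js
const rascal = require('rascal');

const definitions = {
  vhosts: {
    '/': {
      connection: {
        hostname: 'localhost',
        user: 'guest',
        password: 'guest',
        port: 5672,
        options: {
          heartbeat: 5,       // becomes ?heartbeat=5 on the connection url
        },
        socketOptions: {
          timeout: 10000,     // passed straight through to amqplib / net / tls
        },
      },
    },
  },
};

const config = rascal.withDefaultConfig(definitions);
```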
The exponential configuration will cause Rascal to retry the connection at exponentially increasing intervals, up to a maximum of one minute. The intervals are adjusted by a random amount so that if you have multiple services they will not all reconnect at the same time.
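A sketch of such a retry block, assuming the attribute names used by the default configuration (intervals in milliseconds):

```js
const definitions = {
  vhosts: {
    '/': {
      connection: {
        retry: {
          min: 1000,               // first retry after roughly one second
          max: 60000,              // back off to at most one minute
          factor: 2,               // double the interval on each attempt
          strategy: 'exponential',
        },
      },
    },
  },
};
```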
If you specify an array of connections instead of a single connection object, Rascal will order them according to the connection strategy at startup, and cycle through them until it obtains a connection or exhausts all hosts.
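A sketch, assuming the vhost accepts a connections array alongside a connectionStrategy attribute (the strategy values shown, "random" and "fixed", are assumptions based on common Rascal usage):

```js
// Sketch: multiple candidate brokers; attribute names are assumptions noted above
const definitions = {
  vhosts: {
    '/': {
      connectionStrategy: 'random',   // or 'fixed' to always prefer the first host
      connections: [
        { url: 'amqp://user:password@broker-1:5672' },
        { url: 'amqp://user:password@broker-2:5672' },
      ],
    },
  },
};
```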
The AMQP protocol doesn't support assertion or checking of vhosts, so Rascal uses the RabbitMQ management API to achieve a similar result. The management connection configuration is derived from defaults and the vhost connection, but can be explicitly specified as follows...
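For example, a sketch that overrides the management connection explicitly (the attribute layout under connection.management is an assumption; adjust it to match your configuration):

```js
const definitions = {
  vhosts: {
    my_vhost: {
      assert: true,
      connection: {
        url: 'amqp://user:password@localhost:5672/my_vhost',
        // Sketch: explicit management API settings (layout assumed)
        management: {
          protocol: 'http',
          hostname: 'localhost',
          port: 15672,
          user: 'admin',
          password: 'secret',
        },
      },
    },
  },
};
```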
If you have a high number of exchanges, queues and bindings you may wish to initialise Rascal using multiple channels to improve startup time. Do this per vhost by setting the concurrency attribute to the number of channels you want to create and use.
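A sketch (the attribute is set per vhost as described above):

```js
const definitions = {
  vhosts: {
    '/': {
      concurrency: 5,   // initialise exchanges, queues and bindings over 5 channels
      // ... exchanges, queues, bindings ...
    },
  },
};
```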
When set to true, Rascal will create the vhost if one doesn't exist using the RabbitMQ management API. This requires the management plugin to be installed on the broker and for the management user to have necessary permissions.
When set to true, Rascal will check that the vhost exists using the RabbitMQ management API. This requires the management plugin to be installed on the broker and for the management user to have necessary permissions.
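A sketch of both behaviours, assuming the vhost attributes are named assert and check:

```js
const definitions = {
  vhosts: {
    v1: {
      assert: true,   // create the vhost via the management API if it does not exist
    },
    v2: {
      check: true,    // fail fast if the vhost does not exist
    },
  },
};
```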
Rascal pools the channels it uses for publishing messages. It creates two pools per vhost - one for confirm channels and one for regular channels. The default maximum pool size is 5 and the minimum is 1, but neither pool is created until first use (override this by setting autostart: true). Idle channels are automatically evicted from the pool. The pool configuration can be adjusted through config, which is passed through to the underlying generic-pool library.
Unfortunately there is a bug in generic-pool's implementation, which means that if the pool fails to create a channel, it can enter a tight loop, thrashing your CPU and potentially crashing your node process due to a memory leak. While we assess the long term use of pooling, we have put in a workaround. Errors will only be rejected after a configurable delay. This defaults to one second but can be overridden through the rejectionDelayMillis pool attribute. Special thanks to @willthrom for helping diagnose and fix this issue.
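A sketch of overriding the pool configuration, assuming a publicationChannelPools attribute with regularPool and confirmPool sub documents (the remaining options are passed through to generic-pool):

```js
const definitions = {
  vhosts: {
    '/': {
      // Sketch: attribute names assumed as noted above
      publicationChannelPools: {
        regularPool: {
          max: 10,
          min: 1,
          autostart: true,             // create the pool eagerly rather than on first use
          rejectionDelayMillis: 1000,  // the workaround delay described above
        },
        confirmPool: {
          max: 10,
          min: 1,
        },
      },
    },
  },
};
```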
amqplib flow control dictates that channels act like stream.Writable when Rascal calls channel.publish or channel.sendToQueue, returning false when the channel is saturated and true when it is not. While it is possible to ignore this and keep publishing messages, it is preferable to apply back pressure to the message source. You can do this by listening to the broker's busy and ready events. Busy events are emitted when the number of outstanding channel requests reaches the pool's maximum size, and ready events are emitted when the number of outstanding channel requests falls back to zero. The pool details are passed to both event handlers so you can take selective action.
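For example, a sketch that pauses and resumes a hypothetical upstream source:

```js
broker.on('busy', (details) => {
  // the channel pool is saturated; stop pulling work from the (hypothetical) upstream source
  console.warn('Applying back pressure', details);
  upstream.pause();
});

broker.on('ready', (details) => {
  // outstanding channel requests have drained; resume publishing
  console.info('Releasing back pressure', details);
  upstream.resume();
});
```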
Running automated tests against shared queues and exchanges is problematic. Messages left over from a previous test run can cause assertions to fail. Rascal has several strategies to help you cope with this problem, one of which is to namespace your queues and exchanges.
If you specify "namespace" :true Rascal will prefix the queues and exchanges it creates with a uuid. Alternatively you can specify your own namespace, "namespace": "foo". Namespaces are also if you want to use a single vhost locally but multiple vhosts in other environments.
Setting assert to true will cause Rascal to create the exchange on initialisation. If the exchange already exists and has the same configuration (type, durability, etc) everything will be fine, however if the existing exchange has a different configuration an error will be returned. Assert is enabled in the default configuration.
Setting assert to true will cause Rascal to create the queue on initialisation. If the queue already exists and has the same configuration (durability, etc) everything will be fine, however if the existing queue has a different configuration an error will be returned. Assert is enabled in the default configuration.
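A sketch showing assert enabled explicitly on an exchange and a queue (the names are illustrative):

```js
const definitions = {
  vhosts: {
    '/': {
      exchanges: {
        demo_ex: {
          assert: true,               // create on initialisation (the default)
          type: 'topic',
          options: { durable: true },
        },
      },
      queues: {
        demo_q: {
          assert: true,
          options: { durable: true },
        },
      },
    },
  },
};
```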
Sometimes you want to publish a message and have the consumer of the message send a reply to the same application instance that published the original message. This can be difficult if your application is deployed as multiple instances which share a common configuration. Quite often the solution is to make your application stateless so it doesn't matter which instance receives the reply. An alternative is to mark the queue as a reply queue using the replyTo attribute.
When true, Rascal will append a uuid to the queue name so that it is unique for each instance of the application. Use this in conjunction with the publication replyTo property to automatically set the replyTo property on outbound messages to the unique queue name. You may also want to make the queue non-durable and exclusive (see below).
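A sketch, assuming a request publication whose replyTo references the reply queue, and a reply queue marked with replyTo: true (the names are illustrative):

```js
const definitions = {
  vhosts: {
    '/': {
      queues: {
        reply_q: {
          replyTo: true,   // Rascal appends a uuid so each instance gets its own queue
          options: { durable: false, exclusive: true },
        },
      },
      publications: {
        request_pub: {
          exchange: 'requests_ex',   // illustrative exchange
          routingKey: 'request',
          replyTo: 'reply_q',        // outbound messages get the unique queue name as replyTo
        },
      },
      subscriptions: {
        replies_sub: { queue: 'reply_q' },
      },
    },
  },
};
```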
Streams are not a replacement for regular messaging - instead they are best suited to situations where you can tolerate occasional message loss and need higher throughput, such as sampling web-based analytics.
When working with streams you need to think carefully about data retention. Unless you specify retention configuration, messages will never be deleted and eventually you will run out of space. Conversely, if you automatically delete messages based on queue size or age, they may be lost without ever being read.
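As a sketch, retention is typically controlled via the standard RabbitMQ stream arguments (the queue name is illustrative):

```js
const definitions = {
  vhosts: {
    '/': {
      queues: {
        analytics_stream: {
          options: {
            arguments: {
              'x-queue-type': 'stream',
              'x-max-age': '7D',                     // discard stream segments older than 7 days
              'x-max-length-bytes': 20_000_000_000,  // cap the stream at roughly 20GB
            },
          },
        },
      },
    },
  },
};
```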
You also need to think about how you will track the consumer offset. Typically you will need to store this in a database after successfully processing the message and use it to tell the broker where to resume from after your application restarts. For example...
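For example, a sketch assuming broker.subscribe accepts runtime overrides for the consume arguments, and that loadOffset, saveOffset and process are hypothetical functions backed by your database and business logic:

```js
// Inside an async function, given a broker created with BrokerAsPromised.
// loadOffset/saveOffset/process are hypothetical; the x-stream-offset consume
// argument and message header are standard RabbitMQ stream features.
const lastOffset = await loadOffset();

const subscription = await broker.subscribe('analytics_sub', {
  prefetch: 100,   // streams require a prefetch when consumed over AMQP
  options: {
    arguments: {
      // resume one message after the last processed offset, or from the start
      'x-stream-offset': lastOffset !== undefined ? lastOffset + 1 : 'first',
    },
  },
});

subscription.on('message', async (message, content, ackOrNack) => {
  await process(content);
  await saveOffset(message.properties.headers['x-stream-offset']);
  ackOrNack();
});
```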
However, if your application is offline for too long while messages are still being published to the stream, it may not be able to resume from where it left off, since those messages may have been deleted. Furthermore, if your application consumes messages concurrently, you need to think about how you will recover should one fail. If you naively overwrite the previously saved offset, you may be replacing a higher/later offset with a lower/older one, causing your application to restart from the wrong point. Finally, you also need to decide what to do if a message cannot be processed. You cannot simply replay the message, since you are working with a stream rather than a queue. You could cancel the subscription and resume from the current offset, but this will lead to duplicates if you have been consuming messages concurrently. Alternatively you could republish the failures to a dead letter queue and process them separately.