# amqp-client
An AMQP 0-9-1 client for Crystal.
Installation
- Add the dependency to your `shard.yml`:
dependencies:
  amqp-client:
    github: cloudamqp/amqp-client.cr
- Run `shards install`
Usage
require "amqp-client"

# Open a connection and a channel; every example below runs on that channel.
AMQP::Client.start("amqp://guest:guest@localhost") do |c|
  c.channel do |ch|
    q = ch.queue("my-queue")

    # no_ack: false means each delivery has to be acknowledged explicitly
    q.subscribe(no_ack: false) do |msg|
      puts "Received: #{msg.body_io.to_s}"
      ch.basic_ack(msg.delivery_tag)
    end

    # publish directly to a queue without confirm
    q.publish "msg"

    # publish directly to a queue, blocking while waiting for confirm
    q.publish_confirm "msg"

    # publish to any exchange/routing-key
    ch.basic_publish "msg", exchange: "amq.topic", routing_key: "a"

    # publish to any exchange/routing-key and wait for confirm
    ch.basic_publish_confirm "msg", exchange: "amq.topic", routing_key: "a"

    # This statement will block until a message has arrived.
    # The only way to "escape" the block is to unsubscribe,
    # which this consumer does as soon as it receives its first message.
    q.subscribe(tag: "myconsumer", block: true) do |msg|
      q.unsubscribe("myconsumer")
    end

    # Consume and ack, nack or reject msgs
    ch.basic_consume("queue", tag: "consumer-tag", no_ack: false, exclusive: false, block: false) do |msg|
      case msg.body_io.to_s
      when "ack"
        ch.basic_ack(msg.delivery_tag)
      when "reject"
        ch.basic_reject(msg.delivery_tag, requeue: true)
      when "nack"
        ch.basic_nack(msg.delivery_tag, requeue: true, multiple: true)
      end
    end

    ch.prefetch(count: 1000) # alias for basic_qos

    # Declare a queue; `args` carries optional queue arguments (empty here)
    name, message_count, consumer_count =
      ch.queue_declare(name: "myqueue", passive: false, durable: true,
                       exclusive: false, auto_delete: false,
                       args: AMQP::Client::Arguments.new)

    q = ch.queue # temporary queue that is deleted when the channel is closed

    ch.queue_purge("myqueue")
    ch.queue_bind("myqueue", "amq.topic", "routing-key")
    ch.queue_unbind("myqueue", "amq.topic", "routing-key")

    # Fetch a single message. It is fetched with no_ack: false because a
    # message fetched with no_ack: true must not be acked afterwards.
    # basic_get returns nil when the queue is empty, hence the guard.
    if msg = ch.basic_get("myqueue", no_ack: false)
      ch.basic_ack(msg.delivery_tag)
    end

    ch.queue_delete("myqueue")

    ch.exchange_declare("my-exchange", type: "topic")
    ch.exchange_delete("my-exchange")
  end
end
Contributing
- Fork it
- Create your feature branch (`git checkout -b my-new-feature`)
- Commit your changes (`git commit -am 'Add some feature'`)
- Push to the branch (`git push origin my-new-feature`)
- Create a new Pull Request
 
Contributors
- Carl Hörberg - creator and maintainer
 - Anders Bälter