// open dials the broker at config.Broker, opens a channel on that connection,
// declares the configured durable exchange and the default durable queue,
// binds the queue to the exchange using config.BindingKey, and finally puts
// the channel into publisher-confirm mode.
//
// It returns the connection, the channel, the declared queue and a
// receive-only channel on which publish confirmations are delivered. On
// failure the confirmation channel is nil and the returned error names the
// step that failed while wrapping the underlying cause, so callers can still
// inspect it with errors.Is / errors.As.
//
// NOTE(review): on failure the connection and channel opened so far are
// returned as-is and are not closed here; callers presumably have to close
// them to avoid leaking the connection — confirm against the call sites.
func (amqpBroker *AMQPBroker) open() (*amqp.Connection, *amqp.Channel, amqp.Queue, <-chan amqp.Confirmation, error) {
	var (
		conn    *amqp.Connection
		channel *amqp.Channel
		queue   amqp.Queue
		err     error
	)

	conn, err = amqp.Dial(amqpBroker.config.Broker)
	if err != nil {
		return conn, channel, queue, nil, fmt.Errorf("Dial: %w", err)
	}

	channel, err = conn.Channel()
	if err != nil {
		return conn, channel, queue, nil, fmt.Errorf("Channel: %w", err)
	}

	if err := channel.ExchangeDeclare(
		amqpBroker.config.Exchange,     // name of the exchange
		amqpBroker.config.ExchangeType, // type
		true,                           // durable
		false,                          // delete when complete
		false,                          // internal
		false,                          // noWait
		nil,                            // arguments
	); err != nil {
		return conn, channel, queue, nil, fmt.Errorf("Exchange: %w", err)
	}

	queue, err = channel.QueueDeclare(
		amqpBroker.config.DefaultQueue, // name
		true,                           // durable
		false,                          // delete when unused
		false,                          // exclusive
		false,                          // no-wait
		nil,                            // arguments
	)
	if err != nil {
		return conn, channel, queue, nil, fmt.Errorf("Queue Declare: %w", err)
	}

	// Bind using the name reported back by the declare call rather than the
	// configured one.
	if err := channel.QueueBind(
		queue.Name,                   // name of the queue
		amqpBroker.config.BindingKey, // binding key
		amqpBroker.config.Exchange,   // source exchange
		false,                        // noWait
		nil,                          // arguments
	); err != nil {
		return conn, channel, queue, nil, fmt.Errorf("Queue Bind: %w", err)
	}

	// Enable publish confirmations.
	if err := channel.Confirm(false); err != nil {
		return conn, channel, queue, nil, fmt.Errorf("Channel could not be put into confirm mode: %w", err)
	}

	// The confirmation channel is only created once confirm mode is active, so
	// the failure path above has nothing to clean up. It is buffered with a
	// capacity of one, as before.
	confirmsChan := make(chan amqp.Confirmation, 1)

	return conn, channel, queue, channel.NotifyPublish(confirmsChan), nil
}
请发表评论