本文整理汇总了Golang中github.com/streadway/amqp.DialConfig函数的典型用法代码示例。如果您正苦于以下问题：Golang DialConfig函数的具体用法？Golang DialConfig怎么用？Golang DialConfig使用的例子？那么恭喜您，这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了DialConfig函数的16个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: DialURL
// DialURL returns a ConnectionFactory bound to url. Every call of the
// returned factory builds a fresh AMQP config from rootCACert and
// dnsResolvers (via buildAMQPConfig) and dials url with it; a config error is
// returned without attempting to dial.
func DialURL(url, rootCACert, dnsResolvers string) ConnectionFactory {
	factory := func() (*amqp.Connection, error) {
		cfg, cfgErr := buildAMQPConfig(rootCACert, dnsResolvers)
		if cfgErr == nil {
			return amqp.DialConfig(url, cfg)
		}
		return nil, cfgErr
	}
	return factory
}
开发者ID:codequest-eu,项目名称:octopussy,代码行数:9,代码来源:amqp_utils.go
示例2: connect
// connect blocks until the server is fully wired to the broker: it dials
// rpc.dsn, opens a channel, declares rpc.queueName, sets a prefetch count of 1
// and starts consuming into rpc.messages. Any failing step is logged and the
// whole sequence is retried after one second; the function only returns once
// every step has succeeded.
//
// Because each retry dials a new connection, the connection opened by a
// failed attempt is closed before retrying so that repeated failures do not
// accumulate open connections on the broker.
func (rpc *RPCServer) connect() {
	var err error
	for {
		rpc.connection, err = amqp.DialConfig(rpc.dsn, amqp.Config{Properties: amqp.Table{"product": "RPC/Server." + rpc.queueName}})
		if err != nil {
			rpc.log.Printf("Error connecting: %s", err)
			time.Sleep(time.Second)
			continue
		}
		rpc.channel, err = rpc.connection.Channel()
		if err != nil {
			rpc.log.Printf("Error getting channel: %s", err)
			// Best-effort close: the next iteration dials again, so this
			// connection would otherwise be leaked.
			rpc.connection.Close()
			time.Sleep(time.Second)
			continue
		}
		q, err := rpc.channel.QueueDeclare(
			rpc.queueName, // name
			false,         // durable
			false,         // delete when unused
			false,         // exclusive
			false,         // noWait
			nil,           // arguments
		)
		if err != nil {
			rpc.log.Printf("Error declaring queue: %s", err)
			rpc.connection.Close() // release this attempt's connection before redialing
			time.Sleep(time.Second)
			continue
		}
		err = rpc.channel.Qos(
			1,     // prefetch count
			0,     // prefetch size
			false, // global
		)
		if err != nil {
			rpc.log.Printf("Error setting QOS: %s", err)
			rpc.connection.Close() // release this attempt's connection before redialing
			time.Sleep(time.Second)
			continue
		}
		rpc.messages, err = rpc.channel.Consume(
			q.Name, // queue
			"",     // consumer
			false,  // auto-ack
			false,  // exclusive
			false,  // no-local
			false,  // no-wait
			nil,    // args
		)
		if err != nil {
			rpc.log.Printf("Error consuming: %s", err)
			rpc.connection.Close() // release this attempt's connection before redialing
			time.Sleep(time.Second)
			continue
		}
		rpc.log.Printf("Connected")
		break
	}
}
开发者ID:BrightLocal,项目名称:amqp-rpc,代码行数:56,代码来源:server.go
示例3: ExampleConfig_timeout
// ExampleConfig_timeout shows how to get a custom connect timeout: supply a
// Dial function in amqp.Config that delegates to net.DialTimeout (two seconds
// here). The resulting connection and error are only logged.
func ExampleConfig_timeout() {
	dialWithTimeout := func(network, addr string) (net.Conn, error) {
		return net.DialTimeout(network, addr, 2*time.Second)
	}
	conn, err := amqp.DialConfig("amqp:///", amqp.Config{Dial: dialWithTimeout})
	log.Printf("conn: %v, err: %v", conn, err)
}
开发者ID:trist4n,项目名称:amqp,代码行数:12,代码来源:examples_test.go
示例4: NewClient
// NewClient creates a new client connected to the AMQP URL specified. cfg is
// passed to amqp.DialConfig unchanged; pass the zero amqp.Config for default
// connection parameters.
//
// The client opens two channels on the connection (one for transmitting, one
// for receiving), declares a server-named, auto-delete, exclusive queue on
// the receive channel, starts an exclusive auto-ack consumer on it and
// launches responseHandler in its own goroutine.
//
// If any step after the dial fails, the connection is closed before the error
// is returned so a failed construction does not leak it.
func NewClient(url string, cfg amqp.Config) (*Client, error) {
	var err error
	c := &Client{
		responseChans: map[string]chan amqp.Delivery{},
	}
	c.conn, err = amqp.DialConfig(url, cfg)
	if err != nil {
		return nil, err
	}

	// From here on the connection is open: close it on every early return.
	ok := false
	defer func() {
		if !ok {
			c.conn.Close()
		}
	}()

	c.txChannel, err = c.conn.Channel()
	if err != nil {
		return nil, err
	}
	c.rxChannel, err = c.conn.Channel()
	if err != nil {
		return nil, err
	}
	rxQueue, err := c.rxChannel.QueueDeclare(
		"",    // name
		false, // durable
		true,  // autodelete
		true,  // exclusive
		false, // nowait
		nil,
	)
	if err != nil {
		return nil, err
	}
	c.rxQueueName = rxQueue.Name
	c.rxCh, err = c.rxChannel.Consume(c.rxQueueName,
		"",    // consumer
		true,  // autoAck
		true,  // exclusive
		false, // nolocal
		false, // nowait
		nil,
	)
	if err != nil {
		return nil, err
	}
	go c.responseHandler()
	ok = true
	return c, nil
}
开发者ID:hlandau,项目名称:degoutils,代码行数:54,代码来源:client.go
示例5: Dial
// Dial connects to the AMQP URL in config, where a RabbitMQ instance is
// expected to be running, and returns a handler wrapping the config and the
// connection. An empty config.AmqpUrl is replaced by LocalhostAmqpUrl.
//
// The dial error is handed to rabbitbeans.FailOnError rather than returned
// (presumably that helper aborts on a non-nil error; confirm against its
// definition).
func Dial(config Config) RabbitHandler {
	if len(config.AmqpUrl) == 0 {
		config.AmqpUrl = LocalhostAmqpUrl
	}

	conn, err := amqp.DialConfig(config.AmqpUrl, config.AmqpConfig)
	rabbitbeans.FailOnError(err, "Failed to connect to RabbitMQ")

	handler := &Connection{config, conn}
	return handler
}
开发者ID:urjitbhatia,项目名称:rabbitbeans,代码行数:17,代码来源:rabbit.go
示例6: DialURLWithBackoff
// DialURLWithBackoff returns a ConnectionFactory that dials url with up to
// five attempts. The AMQP config is built once per factory call; a config
// error is returned immediately. After a failed attempt number i (1 to 4) the
// failure is logged and the factory sleeps i*3 seconds before trying again.
// The result of the fifth attempt is returned as is, whether it succeeded or
// not.
func DialURLWithBackoff(url, rootCACert, dnsResolvers string) ConnectionFactory {
	return func() (*amqp.Connection, error) {
		cfg, cfgErr := buildAMQPConfig(rootCACert, dnsResolvers)
		if cfgErr != nil {
			return nil, cfgErr
		}

		attempt := 1
		for {
			conn, err := amqp.DialConfig(url, cfg)
			if err == nil || attempt == 5 {
				return conn, err
			}
			log.Printf("AMQP connection failed, attempt %d", attempt)
			time.Sleep(time.Duration(attempt*3) * time.Second)
			attempt++
		}
	}
}
开发者ID:codequest-eu,项目名称:octopussy,代码行数:16,代码来源:amqp_utils.go
示例7: connect
// connect blocks until the client has a connection, a channel on it and a
// server-named, auto-delete, exclusive queue stored in r.queue. An existing
// r.connection is reused; a new one is dialed only when r.connection is nil.
// Every failure is logged and retried after one second.
func (r *RPCClient) connect() {
	var err error
	for {
		if r.connection == nil {
			r.connection, err = amqp.DialConfig(
				r.dsn,
				amqp.Config{
					Properties: amqp.Table{"product": "RPC/Client." + r.name},
				},
			)
			if err != nil {
				r.log.Printf("Error connecting: %s", err)
				// DialConfig may hand back a non-nil connection together
				// with an error (e.g. a failed handshake); forget it so the
				// next pass dials again instead of reusing a broken one.
				r.connection = nil
				time.Sleep(time.Second)
				continue
			}
		}
		r.channel, err = r.connection.Channel()
		if err != nil {
			r.log.Printf("Error getting channel: %s", err)
			// Close before dropping the reference so the connection is not
			// leaked, and pause like the other branches to avoid a hot
			// retry loop while the broker is unavailable.
			r.connection.Close()
			r.connection = nil
			time.Sleep(time.Second)
			continue
		}
		r.queue, err = r.channel.QueueDeclare(
			"",    // name
			false, // durable
			true,  // delete when unused
			true,  // exclusive
			false, // noWait
			nil,   // arguments
		)
		if err != nil {
			r.log.Printf("Error declaring a queue: %s", err)
			// The next pass opens a new channel on the same connection;
			// best-effort close of this one so it does not linger.
			r.channel.Close()
			time.Sleep(time.Second)
			continue
		}
		break
	}
}
开发者ID:BrightLocal,项目名称:amqp-rpc,代码行数:38,代码来源:client.go
示例8: connect
// connect wires a client to the server under test through an in-memory
// net.Pipe: one end is handed to tc.s.OpenConnection in a goroutine, the
// other is returned by the client's custom Dial function, so the address in
// the URL is never actually dialed. It panics with the error text if the
// client cannot be established.
func (tc *testClient) connect() *amqpclient.Connection {
	serverSide, clientSide := net.Pipe()
	go tc.s.OpenConnection(serverSide)

	// SASL, Heartbeat and TLSClientConfig are left at their zero values.
	cfg := amqpclient.Config{
		Vhost:      "/",
		ChannelMax: 100000,
		FrameSize:  100000,
		Properties: make(amqpclient.Table),
		Dial: func(network, addr string) (net.Conn, error) {
			return clientSide, nil
		},
	}

	conn, err := amqpclient.DialConfig("amqp://localhost:1234", cfg)
	if err == nil {
		return conn
	}
	panic(err.Error())
}
开发者ID:jeffjenkins,项目名称:dispatchd,代码行数:23,代码来源:server_test.go
示例9: initMQ
//.........这里部分代码省略.........
}
// write a CONNECT request in the tcp connection
fmt.Fprintf(conn, "CONNECT "+addr+" HTTP/1.1\r\nHost: "+addr+"\r\n\r\n")
// verify status is 200, and flush the buffer
status, err := bufio.NewReader(conn).ReadString('\n')
if err != nil {
return
}
if status == "" || len(status) < 12 {
err = fmt.Errorf("Invalid status received from proxy: '%s'", status[0:len(status)-2])
return
}
// 9th character in response should be "2"
// HTTP/1.0 200 Connection established
// ^
if status[9] != '2' {
err = fmt.Errorf("Invalid status received from proxy: '%s'", status[0:len(status)-2])
return
}
ctx.Agent.Env.IsProxied = true
ctx.Agent.Env.Proxy = proxy
return
}
} else {
dialConfig.Dial = func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 5*time.Second)
}
}
if ctx.MQ.UseTLS {
ctx.Channels.Log <- mig.Log{Desc: "Loading AMQPS TLS parameters"}.Debug()
// import the client certificates
cert, err := tls.X509KeyPair(AGENTCERT, AGENTKEY)
if err != nil {
panic(err)
}
// import the ca cert
ca := x509.NewCertPool()
if ok := ca.AppendCertsFromPEM(CACERT); !ok {
panic("failed to import CA Certificate")
}
TLSconfig := tls.Config{Certificates: []tls.Certificate{cert},
RootCAs: ca,
InsecureSkipVerify: false,
Rand: rand.Reader}
dialConfig.TLSClientConfig = &TLSconfig
}
// Open AMQP connection
ctx.Channels.Log <- mig.Log{Desc: "Establishing connection to relay"}.Debug()
ctx.MQ.conn, err = amqp.DialConfig(AMQPBROKER, dialConfig)
if err != nil {
ctx.Channels.Log <- mig.Log{Desc: "Connection failed"}.Debug()
panic(err)
}
ctx.MQ.Chan, err = ctx.MQ.conn.Channel()
if err != nil {
panic(err)
}
// Limit the number of message the channel will receive at once
err = ctx.MQ.Chan.Qos(1, // prefetch count (in # of msg)
0, // prefetch size (in bytes)
false) // is global
_, err = ctx.MQ.Chan.QueueDeclare(ctx.MQ.Bind.Queue, // Queue name
true, // is durable
false, // is autoDelete
false, // is exclusive
false, // is noWait
nil) // AMQP args
if err != nil {
panic(err)
}
err = ctx.MQ.Chan.QueueBind(ctx.MQ.Bind.Queue, // Queue name
ctx.MQ.Bind.Key, // Routing key name
mig.Mq_Ex_ToAgents, // Exchange name
false, // is noWait
nil) // AMQP args
if err != nil {
panic(err)
}
// Consume AMQP message into channel
ctx.MQ.Bind.Chan, err = ctx.MQ.Chan.Consume(ctx.MQ.Bind.Queue, // queue name
"", // some tag
false, // is autoAck
false, // is exclusive
false, // is noLocal
false, // is noWait
nil) // AMQP args
if err != nil {
panic(err)
}
return
}
开发者ID:igofman,项目名称:mig,代码行数:101,代码来源:context.go
示例10: InitMQ
// InitMQ opens an AMQP connection described by conf and returns a channel
// opened on it.
//
// conf.User, conf.Pass, conf.Host and a positive conf.Port are required.
// conf.Timeout, when non-empty, must be a valid time.ParseDuration string; it
// is used both as the heartbeat interval and as the TCP dial timeout. An
// empty conf.Timeout leaves both at zero. With conf.UseTLS the "amqps" scheme
// is used and the client key pair and CA certificate are loaded from the
// paths in conf.
//
// Failures are raised as panics internally and converted by the deferred
// recover into the returned err, prefixed with "worker.initMQ() -> ".
func InitMQ(conf MqConf) (amqpChan *amqp.Channel, err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("worker.initMQ() -> %v", e)
		}
	}()

	// dialing address uses the format "<scheme>://<user>:<pass>@<host>:<port>/<vhost>"
	scheme := "amqp"
	if conf.UseTLS {
		scheme = "amqps"
	}
	if conf.User == "" {
		panic("MQ User is missing")
	}
	if conf.Pass == "" {
		panic("MQ Pass is missing")
	}
	if conf.Host == "" {
		panic("MQ Host is missing")
	}
	if conf.Port < 1 {
		panic("MQ Port is missing")
	}
	port := fmt.Sprintf("%d", conf.Port)
	// NOTE(review): user and password are concatenated unescaped; credentials
	// containing URL-reserved characters may produce an unparsable address.
	dialaddr := scheme + "://" + conf.User + ":" + conf.Pass + "@" + conf.Host + ":" + port + "/" + conf.Vhost

	// An unset timeout keeps the zero duration; a set but malformed one is
	// reported instead of being silently treated as zero.
	var timeout time.Duration
	if conf.Timeout != "" {
		timeout, err = time.ParseDuration(conf.Timeout)
		if err != nil {
			panic(err)
		}
	}

	var dialConfig amqp.Config
	dialConfig.Heartbeat = timeout
	dialConfig.Dial = func(network, addr string) (net.Conn, error) {
		return net.DialTimeout(network, addr, timeout)
	}

	if conf.UseTLS {
		// import the client certificates
		cert, err := tls.LoadX509KeyPair(conf.TLScert, conf.TLSkey)
		if err != nil {
			panic(err)
		}
		// import the ca cert; surface the read error itself rather than the
		// less specific "failed to import" that an empty buffer would cause
		data, err := ioutil.ReadFile(conf.CAcert)
		if err != nil {
			panic(err)
		}
		ca := x509.NewCertPool()
		if ok := ca.AppendCertsFromPEM(data); !ok {
			panic("failed to import CA Certificate")
		}
		TLSconfig := tls.Config{Certificates: []tls.Certificate{cert},
			RootCAs:            ca,
			InsecureSkipVerify: false,
			Rand:               rand.Reader}
		dialConfig.TLSClientConfig = &TLSconfig
	}

	// Setup the AMQP broker connection
	amqpConn, err := amqp.DialConfig(dialaddr, dialConfig)
	if err != nil {
		panic(err)
	}
	amqpChan, err = amqpConn.Channel()
	if err != nil {
		// The connection is not returned to the caller, so it must be
		// released here or it would be leaked.
		amqpConn.Close()
		panic(err)
	}
	return
}
开发者ID:Novemburr,项目名称:mig,代码行数:68,代码来源:workers.go
示例11: initBroker
// initBroker sets up the connection to the RabbitMQ broker.
//
// It works on a copy of orig_ctx and returns it with ctx.MQ.conn and
// ctx.MQ.Chan populated and the durable topic exchange "mig" declared.
// ctx.MQ.User, Pass, Host and a positive Port are required; an empty
// ctx.MQ.Timeout defaults to "600s" and is used as both heartbeat interval and
// TCP dial timeout. With ctx.MQ.UseTLS the "amqps" scheme is used and the
// client key pair and CA certificate are loaded from the configured paths.
//
// Failures are raised as panics internally and converted by the deferred
// recover into the returned err, prefixed with "initBroker() -> ". A debug
// entry is sent to ctx.Channels.Log on every exit.
func initBroker(orig_ctx Context) (ctx Context, err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("initBroker() -> %v", e)
		}
		ctx.Channels.Log <- mig.Log{Desc: "leaving initBroker()"}.Debug()
	}()

	ctx = orig_ctx

	// dialing address uses the format "<scheme>://<user>:<pass>@<host>:<port>/<vhost>"
	var scheme, user, pass, host, port, vhost string
	if ctx.MQ.UseTLS {
		scheme = "amqps"
	} else {
		scheme = "amqp"
	}
	if ctx.MQ.User == "" {
		panic("MQ User is missing")
	}
	user = ctx.MQ.User
	if ctx.MQ.Pass == "" {
		panic("MQ Pass is missing")
	}
	pass = ctx.MQ.Pass
	if ctx.MQ.Host == "" {
		panic("MQ Host is missing")
	}
	host = ctx.MQ.Host
	if ctx.MQ.Port < 1 {
		panic("MQ Port is missing")
	}
	port = fmt.Sprintf("%d", ctx.MQ.Port)
	vhost = ctx.MQ.Vhost
	dialaddr := scheme + "://" + user + ":" + pass + "@" + host + ":" + port + "/" + vhost

	if ctx.MQ.Timeout == "" {
		ctx.MQ.Timeout = "600s"
	}
	timeout, err := time.ParseDuration(ctx.MQ.Timeout)
	if err != nil {
		panic("Failed to parse timeout duration")
	}

	// create an AMQP configuration with specific timers
	var dialConfig amqp.Config
	dialConfig.Heartbeat = timeout
	dialConfig.Dial = func(network, addr string) (net.Conn, error) {
		return net.DialTimeout(network, addr, timeout)
	}

	// create the TLS configuration
	if ctx.MQ.UseTLS {
		// import the client certificates
		cert, err := tls.LoadX509KeyPair(ctx.MQ.TLScert, ctx.MQ.TLSkey)
		if err != nil {
			panic(err)
		}
		// import the ca cert; report an unreadable file as such instead of
		// letting it surface as a generic "failed to import" below
		data, err := ioutil.ReadFile(ctx.MQ.CAcert)
		if err != nil {
			panic(err)
		}
		ca := x509.NewCertPool()
		if ok := ca.AppendCertsFromPEM(data); !ok {
			panic("failed to import CA Certificate")
		}
		TLSconfig := tls.Config{Certificates: []tls.Certificate{cert},
			RootCAs:            ca,
			InsecureSkipVerify: false,
			Rand:               rand.Reader}
		dialConfig.TLSClientConfig = &TLSconfig
	}

	// Setup the AMQP broker connection
	ctx.MQ.conn, err = amqp.DialConfig(dialaddr, dialConfig)
	if err != nil {
		panic(err)
	}
	ctx.MQ.Chan, err = ctx.MQ.conn.Channel()
	if err != nil {
		panic(err)
	}

	// declare the "mig" exchange used for all publications
	err = ctx.MQ.Chan.ExchangeDeclare("mig", "topic", true, false, false, false, nil)
	if err != nil {
		panic(err)
	}

	ctx.Channels.Log <- mig.Log{Sev: "info", Desc: "AMQP connection opened"}
	return
}
开发者ID:netantho,项目名称:mig,代码行数:92,代码来源:context.go
示例12: InitAmqp
// InitAmqp establishes a connection to the rabbitmq endpoint defined in the
// configuration and returns a Publisher holding a channel on it, with
// use_amqp set and conf stored in mqconf.
//
// conf.User, conf.Pass, conf.Host and a positive conf.Port are required.
// conf.Timeout, when non-empty, must be a valid time.ParseDuration string; it
// is used as both heartbeat interval and TCP dial timeout, and an empty value
// leaves both at zero. With conf.UseTLS the CA certificate is mandatory while
// the client key pair is loaded only when both of its paths are set.
//
// Failures are raised as panics internally and converted by the deferred
// recover into the returned err, prefixed with "InitAmqp failed with error: ".
func InitAmqp(conf MqConf) (p Publisher, err error) {
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("InitAmqp failed with error: %v", e)
		}
	}()

	// dialing address uses the format "<scheme>://<user>:<pass>@<host>:<port>/<vhost>"
	scheme := "amqp"
	if conf.UseTLS {
		scheme = "amqps"
	}
	if conf.User == "" {
		panic("MQ User is missing")
	}
	if conf.Pass == "" {
		panic("MQ Pass is missing")
	}
	if conf.Host == "" {
		panic("MQ Host is missing")
	}
	if conf.Port < 1 {
		panic("MQ Port is missing")
	}
	port := fmt.Sprintf("%d", conf.Port)
	dialaddr := scheme + "://" + conf.User + ":" + conf.Pass + "@" + conf.Host + ":" + port + "/" + conf.Vhost

	// An unset timeout keeps the zero duration; a set but malformed one is
	// reported instead of being silently treated as zero.
	var timeout time.Duration
	if conf.Timeout != "" {
		timeout, err = time.ParseDuration(conf.Timeout)
		if err != nil {
			panic(err)
		}
	}

	var dialConfig amqp.Config
	dialConfig.Heartbeat = timeout
	dialConfig.Dial = func(network, addr string) (net.Conn, error) {
		return net.DialTimeout(network, addr, timeout)
	}

	if conf.UseTLS {
		// import the ca cert
		data, err := ioutil.ReadFile(conf.CACertPath)
		if err != nil {
			panic(err)
		}
		ca := x509.NewCertPool()
		if ok := ca.AppendCertsFromPEM(data); !ok {
			panic("failed to import CA Certificate")
		}
		TLSconfig := tls.Config{
			RootCAs:            ca,
			InsecureSkipVerify: false,
			Rand:               rand.Reader,
		}
		if conf.ClientCertPath != "" && conf.ClientKeyPath != "" {
			// import the client certificates
			cert, err := tls.LoadX509KeyPair(conf.ClientCertPath, conf.ClientKeyPath)
			if err != nil {
				panic(err)
			}
			TLSconfig.Certificates = []tls.Certificate{cert}
		}
		dialConfig.TLSClientConfig = &TLSconfig
	}

	// Setup the AMQP broker connection
	amqpConn, err := amqp.DialConfig(dialaddr, dialConfig)
	if err != nil {
		panic(err)
	}
	p.amqpChan, err = amqpConn.Channel()
	if err != nil {
		// The connection is only reachable through the channel; without one
		// it must be released here or it would be leaked.
		amqpConn.Close()
		panic(err)
	}
	p.use_amqp = true
	p.mqconf = conf
	return
}
开发者ID:ZhuHangpeng,项目名称:mig,代码行数:76,代码来源:gozdef.go
示例13: connect
// connect dials r.ampq, prepares the channel and starts one listener goroutine
// per configured queue, then blocks until a value is received from r.done (or
// r.done is closed).
//
// Steps, in order: dial with a 2s heartbeat and a 2s TCP dial timeout; open a
// channel and put it into confirm mode; declare every exchange in
// r.exchanges; for every entry of r.queues declare the queue, bind it when an
// exchange name is set, start consuming into inChan and launch listenQueue;
// finally set a prefetch count of 1 with the global flag.
//
// Every error is logged and returned immediately.
// NOTE(review): the connection opened here is not closed on those error
// returns - confirm that the caller cleans up r.conn.
func (r *RabbitConnection) connect() (err error) {
r.conn, err = amqp.DialConfig(r.ampq, amqp.Config{
Heartbeat: 2 * time.Second,
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, 2*time.Second)
},
})
//r.conn, err = amqp.Dial(r.ampq)
if err != nil {
log.Error(err)
return err
}
r.ch, err = r.conn.Channel()
if err != nil {
log.Error(err)
return err
}
// Confirm(false): enable publisher confirms, noWait=false.
if err = r.ch.Confirm(false); err != nil {
log.Error(err)
return err
}
//r.ackChn, r.nackChn = r.Channel.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
// Declare the exchanges
for _, exchange := range r.exchanges {
if err := r.ch.ExchangeDeclare(exchange.Name, exchange.Type, exchange.Durable, exchange.Autodelete, false, false, nil); err != nil {
log.Error(err)
return err
}
}
for key := range r.queues {
_, err := r.ch.QueueDeclare(r.queues[key].Name, r.queues[key].Durable, r.queues[key].Autodelete, false, false, nil)
if err != nil {
log.Error(err)
return err
}
// Bind the queue to the exchange
if len(r.queues[key].Exchange) != 0 {
err = r.ch.QueueBind(r.queues[key].Name, r.queues[key].Key, r.queues[key].Exchange, false, nil)
if err != nil {
log.Error(err)
return err
}
}
r.queues[key].inChan, err = r.ch.Consume(r.queues[key].Name, "", false, false, false, false, nil)
if err != nil {
log.Error(err)
return err
}
/*r.queues[key].queueMQ = &q*/
// Every queue shares the connection-wide done channel.
r.queues[key].done = r.done
go r.queues[key].listenQueue()
}
// NOTE(review): QoS is applied after the consumers were started above -
// confirm this ordering is intended.
err = r.ch.Qos(1, 0, true)
if err != nil {
log.Error(err)
return err
}
// Block here until r.done yields; err is nil at this point.
<-r.done
return
}
开发者ID:IntelliQru,项目名称:webgo-modules,代码行数:72,代码来源:rabbit.go
示例14: initMQ
// initMQ connects the agent to the AMQP broker at AMQPBROKER and starts
// consuming from the agent's queue.
//
// It works on a copy of orig_ctx and returns it with ctx.MQ.conn, ctx.MQ.Chan
// and ctx.MQ.Bind populated. Queue name and routing key are both
// "mig.agt.<ctx.Agent.QueueLoc>". TLS is enabled when AMQPBROKER contains
// "amqps://", using the AGENTCERT/AGENTKEY pair and CACERT. The connection
// timeout is 10*ctx.Sleeper and the heartbeat 2*ctx.Sleeper. The channel gets
// a prefetch count of 7, the durable queue is declared and bound to the "mig"
// exchange, and deliveries are consumed without auto-ack into
// ctx.MQ.Bind.Chan.
//
// Failures are raised as panics internally and converted by the deferred
// recover into the returned err, prefixed with "initMQ() -> ". A debug entry
// is sent to ctx.Channels.Log on every exit.
func initMQ(orig_ctx Context) (ctx Context, err error) {
	ctx = orig_ctx
	defer func() {
		if e := recover(); e != nil {
			err = fmt.Errorf("initMQ() -> %v", e)
		}
		ctx.Channels.Log <- mig.Log{Desc: "leaving initMQ()"}.Debug()
	}()

	// Define the AMQP binding
	ctx.MQ.Bind.Queue = fmt.Sprintf("mig.agt.%s", ctx.Agent.QueueLoc)
	ctx.MQ.Bind.Key = fmt.Sprintf("mig.agt.%s", ctx.Agent.QueueLoc)

	// parse the dial string and use TLS if using amqps
	if strings.Contains(AMQPBROKER, "amqps://") {
		ctx.MQ.UseTLS = true
	}

	// create an AMQP configuration with specific timers
	var dialConfig amqp.Config
	dialConfig.ConnectionTimeout = 10 * ctx.Sleeper
	dialConfig.Heartbeat = 2 * ctx.Sleeper

	if ctx.MQ.UseTLS {
		// import the client certificates
		cert, err := tls.X509KeyPair([]byte(AGENTCERT), []byte(AGENTKEY))
		if err != nil {
			panic(err)
		}
		// import the ca cert
		ca := x509.NewCertPool()
		if ok := ca.AppendCertsFromPEM([]byte(CACERT)); !ok {
			panic("failed to import CA Certificate")
		}
		TLSconfig := tls.Config{Certificates: []tls.Certificate{cert},
			RootCAs:            ca,
			InsecureSkipVerify: false,
			Rand:               rand.Reader}
		dialConfig.TLSClientConfig = &TLSconfig
	}

	// Open the AMQP connection (encrypted when TLS was configured above)
	ctx.MQ.conn, err = amqp.DialConfig(AMQPBROKER, dialConfig)
	if err != nil {
		panic(err)
	}
	ctx.MQ.Chan, err = ctx.MQ.conn.Channel()
	if err != nil {
		panic(err)
	}

	// Limit the number of message the channel will receive at once
	err = ctx.MQ.Chan.Qos(7, // prefetch count (in # of msg)
		0,     // prefetch size (in bytes)
		false) // is global
	if err != nil {
		// Without this check the error would be overwritten by the
		// QueueDeclare result below and the prefetch limit silently lost.
		panic(err)
	}

	_, err = ctx.MQ.Chan.QueueDeclare(ctx.MQ.Bind.Queue, // Queue name
		true,  // is durable
		false, // is autoDelete
		false, // is exclusive
		false, // is noWait
		nil)   // AMQP args
	if err != nil {
		panic(err)
	}

	err = ctx.MQ.Chan.QueueBind(ctx.MQ.Bind.Queue, // Queue name
		ctx.MQ.Bind.Key, // Routing key name
		"mig",           // Exchange name
		false,           // is noWait
		nil)             // AMQP args
	if err != nil {
		panic(err)
	}

	// Consume AMQP message into channel
	ctx.MQ.Bind.Chan, err = ctx.MQ.Chan.Consume(ctx.MQ.Bind.Queue, // queue name
		"",    // some tag
		false, // is autoAck
		false, // is exclusive
		false, // is noLocal
		false, // is noWait
		nil)   // AMQP args
	if err != nil {
		panic(err)
	}
	return
}
开发者ID:jeffbryner,项目名称:mig,代码行数:91,代码来源:context.go
示例15: Connect
// Connect dials q.URL, opens a channel, declares the durable topic exchange
// q.Exchange and stores the channel in q.channel. It holds q's lock for the
// duration of the call.
//
// q.headers is rebuilt from Precision, Database and RetentionPolicy. TLS
// settings come from internal.GetTLSConfig; when q.AuthMethod is "EXTERNAL"
// (case-insensitive) the EXTERNAL SASL mechanism is used, otherwise the SASL
// list is left nil.
//
// On success a goroutine is started that waits for the connection's close
// notification and then calls Connect again every 10 seconds until it
// succeeds. If opening the channel or declaring the exchange fails, the
// freshly dialed connection is closed before the error is returned, so that
// repeated failed reconnect attempts do not accumulate open connections.
func (q *AMQP) Connect() error {
	q.Lock()
	defer q.Unlock()

	q.headers = amqp.Table{
		"precision":        q.Precision,
		"database":         q.Database,
		"retention_policy": q.RetentionPolicy,
	}

	var connection *amqp.Connection
	// make new tls config
	tlsConfig, err := internal.GetTLSConfig(
		q.SSLCert, q.SSLKey, q.SSLCA, q.InsecureSkipVerify)
	if err != nil {
		return err
	}

	// parse auth method
	var sasl []amqp.Authentication // nil by default
	if strings.ToUpper(q.AuthMethod) == "EXTERNAL" {
		sasl = []amqp.Authentication{&externalAuth{}}
	}

	amqpConf := amqp.Config{
		TLSClientConfig: tlsConfig,
		SASL:            sasl, // if nil, it will be PLAIN
	}

	connection, err = amqp.DialConfig(q.URL, amqpConf)
	if err != nil {
		return err
	}

	channel, err := connection.Channel()
	if err != nil {
		connection.Close() // nothing else references it; avoid a leak
		return fmt.Errorf("Failed to open a channel: %s", err)
	}

	err = channel.ExchangeDeclare(
		q.Exchange, // name
		"topic",    // type
		true,       // durable
		false,      // delete when unused
		false,      // internal
		false,      // no-wait
		nil,        // arguments
	)
	if err != nil {
		connection.Close() // closes the channel too; avoid a leak
		return fmt.Errorf("Failed to declare an exchange: %s", err)
	}
	q.channel = channel

	// Reconnect watchdog: blocks until the connection reports closure, then
	// retries Connect until it succeeds. A successful Connect starts its own
	// watchdog, so this goroutine ends after one reconnect.
	go func() {
		log.Printf("I! Closing: %s", <-connection.NotifyClose(make(chan *amqp.Error)))
		log.Printf("I! Trying to reconnect")
		for err := q.Connect(); err != nil; err = q.Connect() {
			log.Println("E! ", err.Error())
			time.Sleep(10 * time.Second)
		}
	}()
	return nil
}
开发者ID:Wikia,项目名称:telegraf,代码行数:63,代码来源:amqp.go
示例16: Loop
// Loop should be run as condition for `for` with receiving from (*Client).Errors()
//
// It will manage AMQP connection, run queue and exchange declarations, consumers.
// Will start to return false once (*Client).Close() called.
//
// Each call does at most one connection attempt: it returns true right away
// while a connection is stored, otherwise it optionally sleeps for the
// configured backoff, dials, stores the connection, starts a guard goroutine
// that watches for close/blocked notifications, runs the declarations and
// launches the serve goroutines of all consumers and publishers. Errors are
// passed to c.reportErr and the call still returns true so the caller keeps
// looping.
func (c *Client) Loop() bool {
	var (
		err error
	)

	if atomic.LoadInt32(&c.run) == noRun {
		return false
	}

	conn, _ := c.conn.Load().(*amqp.Connection)
	if conn != nil {
		return true
	}

	if c.bo != nil {
		// c.attempt is written with atomic operations everywhere else, so it
		// is read atomically here as well.
		time.Sleep(c.bo.Backoff(int(atomic.LoadInt32(&c.attempt))))
		atomic.AddInt32(&c.attempt, 1)
	}

	// set default Heartbeat to 10 seconds like in original amqp.Dial
	if c.config.Heartbeat == 0 {
		c.config.Heartbeat = 10 * time.Second
	}

	conn, err = amqp.DialConfig(c.addr, c.config)
	if c.reportErr(err) {
		return true
	}
	c.conn.Store(conn)
	atomic.StoreInt32(&c.attempt, 0)

	// guard conn
	go func() {
		chanErr := make(chan *amqp.Error)
		chanBlocking := make(chan amqp.Blocking)
		conn.NotifyClose(chanErr)
		conn.NotifyBlocked(chanBlocking)

		// loop for blocking/deblocking
		for {
			select {
			case err1 := <-chanErr:
				c.reportErr(err1)

				if conn1 := c.conn.Load().(*amqp.Connection); conn1 != nil {
					// Store a typed nil so later Load().(*amqp.Connection)
					// assertions keep working and Loop sees "no connection".
					c.conn.Store((*amqp.Connection)(nil))
					conn1.Close()
				}
				// return from routine to launch reconnect process
				return
			case blocking := <-chanBlocking:
				// Non-blocking forward: the notification is dropped when
				// nobody is ready to receive on c.blocking.
				select {
				case c.blocking <- blocking:
				default:
				}
			}
		}
	}()

	// NOTE(review): if this fails the connection stays stored, so following
	// Loop calls return early without re-running the declarations - confirm
	// this is intended.
	ch, err := conn.Channel()
	if c.reportErr(err) {
		return true
	}

	for _, declare := range c.declarations {
		c.reportErr(declare(ch))
	}

	for cons := range c.consumers {
		ch1, err := c.channel()
		if err == nil {
			go cons.serve(c, ch1)
		}
	}

	for pub := range c.publishers {
		ch1, err := c.channel()
		if err == nil {
			go pub.serve(c, ch1)
		}
	}

	return true
}
开发者ID:assembla,项目名称:cony,代码行数:92,代码来源:client.go
注：本文中的github.com/streadway/amqp.DialConfig函数示例由纯净天空整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论