本文整理汇总了Golang中github.com/gmallard/stompngo.Connection类的典型用法代码示例。如果您正苦于以下问题:Golang Connection类的具体用法?Golang Connection怎么用?Golang Connection使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
在下文中一共展示了Connection类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: recMessages
// recMessages subscribes to destination q on connection c and reads from the
// subscription channel until a message whose body starts with "***EOF***"
// arrives. Anything delivered on c.MessageData is treated as fatal. Each
// received message bumps numRecv while holding incrCtl, and wg.Done is called
// once the loop finishes.
func recMessages(c *stompngo.Connection, q string) {
	fmt.Printf("Start for q: %s\n", q)

	// Receive phase
	subHdrs := stompngo.Headers{"destination", q}
	fmt.Printf("qhdrs: %v\n", subHdrs)
	sub, err := c.Subscribe(subHdrs)
	if err != nil {
		// Handle error properly
		log.Fatalf("sub error: %v\n", err)
	}

	var body string
	for {
		select {
		case unexpected := <-c.MessageData:
			log.Fatalf("unexpected message: %v\n", unexpected)
		case frame := <-sub:
			body = frame.Message.BodyString()
		}

		if printMsgs {
			fmt.Println("queue:", q, "Next Receive: ", body)
		}

		// Count the message under the shared lock.
		incrCtl.Lock()
		numRecv++
		incrCtl.Unlock()

		if !strings.HasPrefix(body, "***EOF***") {
			continue
		}
		fmt.Printf("goteof: %v %v\n", q, body)
		break
	}
	wg.Done()
}
开发者ID:gmallard,项目名称:go-samp,代码行数:35,代码来源:receiver1.go
示例2: recMessages
// recMessages subscribes to destination q on connection c, then drains
// c.MessageData until a body starting with "***EOF***" is seen. The channel
// returned by Subscribe is discarded. Each message increments numRecv while
// holding incrCtl; wg.Done is called when the loop ends.
func recMessages(c *stompngo.Connection, q string) {
	fmt.Printf("Start for q: %s\n", q)

	// Receive phase
	subHdrs := stompngo.Headers{"destination", q}
	fmt.Printf("qhdrs: %v\n", subHdrs)
	if _, err := c.Subscribe(subHdrs); err != nil {
		// Handle error properly
		log.Fatalf("sub error: %v\n", err)
	}

	for frame := range c.MessageData {
		body := frame.Message.BodyString()
		if printMsgs {
			fmt.Println("queue:", q, "Next Receive: ", body)
		}

		// Count the message under the shared lock.
		incrCtl.Lock()
		numRecv++
		incrCtl.Unlock()

		if !strings.HasPrefix(body, "***EOF***") {
			continue
		}
		fmt.Printf("goteof: %v %v\n", q, body)
		break
	}
	wg.Done()
}
开发者ID:ra,项目名称:go-samp,代码行数:28,代码来源:receiver1.go
示例3: sendMessages
// sendMessages sends n messages to destination q over c, each body prefixed
// with the decimal form of k, sleeping for a getStagger-chosen duration after
// every send. A final "***EOF***" message is sent afterwards and wgsend.Done
// is called. Any send error is fatal.
func sendMessages(c *stompngo.Connection, q string, n int, k int) {
	prefix := fmt.Sprintf("%d", k)
	dest := stompngo.Headers{"destination", q} // Extra headers

	// Send
	for seq := 1; seq <= n; seq++ {
		body := prefix + " gostomp message #" + strconv.Itoa(seq)
		if printMsgs {
			log.Println("Send:", q, " / ", body)
		}
		if err := c.Send(dest, body); err != nil {
			log.Fatal(err)
		}
		// Pause between sends.
		pause := time.Duration(getStagger(1e9/20, 1e9/10))
		time.Sleep(pause)
	}

	if err := c.Send(dest, "***EOF***"); err != nil {
		log.Fatal(err)
	}
	wgsend.Done()
}
开发者ID:ra,项目名称:go-samp,代码行数:25,代码来源:sendrcv.go
示例4: sendMessages
// sendMessages sends n numbered messages to destination q over c, each body
// prefixed with q, pausing 10ms after every send. It then sends a
// "***EOF*** "+q marker and calls wg.Done. Any send error is fatal. The k
// parameter is not used.
func sendMessages(c *stompngo.Connection, q string, n int, k int) {
	dest := stompngo.Headers{"destination", q} // Extra headers

	// Send
	for seq := 1; seq <= n; seq++ {
		body := q + " gostomp message #" + strconv.Itoa(seq)
		if printMsgs {
			fmt.Println("msg:", body)
		}
		if err := c.Send(dest, body); err != nil {
			log.Fatalf("send error: %v\n", err)
		}
		time.Sleep(10 * time.Millisecond) // Simulate message build
	}

	if err := c.Send(dest, "***EOF*** "+q); err != nil {
		log.Fatal(err)
	}
	wg.Done()
}
开发者ID:ra,项目名称:go-samp,代码行数:25,代码来源:sendernid.go
示例5: closeSconn
// closeSconn runs the common example disconnect sequence for the stompngo
// connection conn and its network connection n. A failure is logged through
// ll.Fatalf.
func closeSconn(n net.Conn, conn *stompngo.Connection) {
	ltag := tag + "-closesconn"

	// Standard example disconnect sequence
	if err := sngecomm.CommonDisconnect(n, conn, exampid, ltag, ll); err != nil {
		ll.Fatalf("%stag:%s connsess:%s disconnect_error error:%s\n",
			exampid, ltag, conn.Session(),
			err.Error()) // Handle this ......
	}
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:15,代码来源:srmgor_1smrconn.go
示例6: CommonDisconnect
// CommonDisconnect performs the shared example shutdown: it disconnects conn
// with empty headers, logs the local and remote addresses of n, closes n, and
// logs completion through l. The first error from Disconnect or Close is
// returned as-is; nil is returned when both succeed.
func CommonDisconnect(n net.Conn, conn *stompngo.Connection,
	exampid, tag string,
	l *log.Logger) error {
	// Disconnect from the Stomp server
	if err := conn.Disconnect(stompngo.Headers{}); err != nil {
		return err
	}
	l.Printf("%stag:%s consess:%v common_disconnect_complete local_addr:%s remote_addr:%s\n",
		exampid, tag, conn.Session(),
		n.LocalAddr().String(), n.RemoteAddr().String())

	// Close the network connection
	if err := n.Close(); err != nil {
		return err
	}

	// Parting messages
	l.Printf("%stag:%s consess:%v common_disconnect_network_close_complete\n",
		exampid, tag, conn.Session())
	l.Printf("%stag:%s consess:%v common_disconnect_ends\n",
		exampid, tag, conn.Session())
	return nil
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:29,代码来源:utilities.go
示例7: sendMessages
// sendMessages sends nmsgs messages over conn to the destination formed from
// sngecomm.Dest() plus "." and the queue number qnum. Every message carries
// "destination", "qnum" and "msgnum" headers, plus "persistent" when
// senv.Persistent() reports true. When sw is set, the function waits a
// sngecomm.ValueBetween-chosen duration between sends (not after the last
// one). A send error is fatal. The nc parameter is not used.
func sendMessages(conn *stompngo.Connection, qnum int, nc net.Conn) {
	ltag := tag + "-sendmessages"

	qns := fmt.Sprintf("%d", qnum) // queue number
	d := sngecomm.Dest() + "." + qns
	ll.Printf("%stag:%s connsess:%s start d:%s qnum:%d\n",
		exampid, ltag, conn.Session(),
		d, qnum)
	wh := stompngo.Headers{"destination", d,
		"qnum", qns} // send Headers
	if senv.Persistent() {
		wh = wh.Add("persistent", "true")
	}

	// A single timer is reused for every stagger wait. Stop it on return so
	// it does not stay armed for its initial 100 hours.
	tmr := time.NewTimer(100 * time.Hour)
	defer tmr.Stop()

	// Send messages
	for mc := 1; mc <= nmsgs; mc++ {
		mcs := fmt.Sprintf("%d", mc)
		sh := append(wh, "msgnum", mcs)
		// Generate a message to send ...............
		ll.Printf("%stag:%s connsess:%s message mc:%d qnum:%d\n",
			exampid, ltag, conn.Session(),
			mc, qnum)
		e := conn.Send(sh, string(sngecomm.Partial()))
		if e != nil {
			ll.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
				exampid, ltag, conn.Session(),
				qnum, e.Error()) // Handle this ......
		}
		if mc == nmsgs {
			break // no stagger after the final message
		}
		if sw {
			runtime.Gosched() // yield for this example
			dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
			// qnum is an int, so it must be formatted with %d (the
			// previous %s verb printed "%!s(int=N)").
			ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qnum:%d mc:%d\n",
				exampid, ltag, conn.Session(),
				dt, qnum, mc)
			tmr.Reset(dt)
			<-tmr.C
		}
	}
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:44,代码来源:srmgor_manyconn.go
示例8: HandleUnsubscribe
// HandleUnsubscribe issues an UNSUBSCRIBE on c using the header appropriate
// to the connection's protocol level: the subscription id i for
// stompngo.SPL_11 and stompngo.SPL_12, or the destination d for
// stompngo.SPL_10. An unknown protocol level or a failed Unsubscribe is
// logged through llu.Fatalf.
func HandleUnsubscribe(c *stompngo.Connection, d, i string) {
	sbh := stompngo.Headers{}
	//
	switch c.Protocol() {
	case stompngo.SPL_12, stompngo.SPL_11:
		sbh = sbh.Add("id", i)
	case stompngo.SPL_10:
		sbh = sbh.Add("destination", d)
	default:
		// The format has two verbs; supply the offending protocol level as
		// the second operand (it was previously missing, which printed
		// "%!v(MISSING)").
		llu.Fatalf("v1:%v v2:%v\n", "unsubscribe invalid protocol level, should not happen", c.Protocol())
	}
	e := c.Unsubscribe(sbh)
	if e != nil {
		llu.Fatalf("v1:%v v2:%v d:%v\n", "unsubscribe failed", e, d)
	}
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:20,代码来源:utilities.go
示例9: receiverConnection
// receiverConnection starts one runReceive goroutine per queue assigned to
// connection number cn (1-based), waits for all of them to finish, and then
// calls wgr.Done. With qpc queues per connection, the range covered is
// qpc*(cn-1)+1 .. qpc*cn, capped at sngecomm.Nqs(). When that range is empty
// nothing is started and the final log line reports skipped:true.
func receiverConnection(conn *stompngo.Connection, cn, qpc int) {
	ltag := tag + "-receiverconnection"

	ll.Printf("%stag:%s connsess:%s starts cn:%d qpc:%d\n",
		exampid, ltag, conn.Session(),
		cn, qpc)

	// cn  -> a connection number: 1..n
	// qpc -> destinations per connection
	// This code runs *once* for each connection. The range arithmetic below
	// can produce an empty range; that acts as a safety valve against
	// starting one connection too many.
	zeroBased := cn - 1               // this connection number, zero based
	firstQ := qpc*zeroBased + 1       // 1st queue number
	lastQ := firstQ + qpc - 1         // last queue number
	if lastQ > sngecomm.Nqs() {
		lastQ = sngecomm.Nqs() // truncate last if over max destinations
	}

	var receivers sync.WaitGroup
	skipped := firstQ > lastQ
	if skipped {
		// Skips are possible, at least with the current calling code
		ll.Printf("%stag:%s connsess:%s startskip cn:%d q1:%d ql: %d\n",
			exampid, ltag, conn.Session(),
			cn, firstQ, lastQ)
	} else {
		ll.Printf("%stag:%s connsess:%s startq cn:%d q1:%d ql: %d\n",
			exampid, ltag, conn.Session(),
			cn, firstQ, lastQ)
	}

	for q := firstQ; q <= lastQ; q++ {
		receivers.Add(1)
		go runReceive(conn, q, &receivers)
	}
	receivers.Wait()

	ll.Printf("%stag:%s connsess:%s ends cn:%d qpc:%d skipped:%t\n",
		exampid, ltag, conn.Session(),
		cn, qpc, skipped)
	wgr.Done()
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:55,代码来源:srmgor_1smrconn.go
示例10: HandleSubscribe
// HandleSubscribe subscribes c to destination d with ack mode a and returns
// the channel produced by c.Subscribe. For stompngo.SPL_11 and
// stompngo.SPL_12 the subscription id i is added as the "id" header; for
// stompngo.SPL_10 no extra header is added. An unknown protocol level or a
// failed Subscribe is logged through llu.Fatalf.
func HandleSubscribe(c *stompngo.Connection, d, i, a string) <-chan stompngo.MessageData {
	h := stompngo.Headers{"destination", d, "ack", a}
	//
	switch c.Protocol() {
	case stompngo.SPL_12, stompngo.SPL_11:
		// Add required id header
		h = h.Add("id", i)
	case stompngo.SPL_10:
		// Nothing else to do here
	default:
		// The format has two verbs; supply the offending protocol level as
		// the second operand (it was previously missing, which printed
		// "%!v(MISSING)").
		llu.Fatalf("v1:%v v2:%v\n", "subscribe invalid protocol level, should not happen", c.Protocol())
	}
	//
	r, e := c.Subscribe(h)
	if e != nil {
		llu.Fatalf("v1:%v v2:%v\n", "subscribe failed", e)
	}
	return r
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:23,代码来源:utilities.go
示例11: doSubscribe
// doSubscribe extends the caller-supplied headers h with "destination" d and
// "ack" a, adds the "id" header for stompngo.SPL_11 and stompngo.SPL_12, and
// subscribes on c. It returns the channel produced by c.Subscribe. An
// unknown protocol level or a failed Subscribe is logged through ll.Fatalf.
func doSubscribe(c *stompngo.Connection, d, id, a string, h stompngo.Headers) <-chan stompngo.MessageData {
	h = h.Add("destination", d).Add("ack", a)

	switch c.Protocol() {
	case stompngo.SPL_12, stompngo.SPL_11:
		// Add required id header
		h = h.Add("id", id)
	case stompngo.SPL_10:
		// Nothing else to do here
	default:
		ll.Fatalf("v1:%v\n", "subscribe invalid protocol level, should not happen")
	}

	msgs, err := c.Subscribe(h)
	if err != nil {
		ll.Fatalf("subscribe failed err:[%v]\n", err)
	}
	return msgs
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:23,代码来源:noPMod1.go
示例12: recMessages
// recMessages subscribes to destination q on c (using q as the subscription
// id), reads messages until a body equal to "***EOF***" arrives, then
// unsubscribes and calls wgrecv.Done. Every non-EOF body must start with the
// decimal form of k; otherwise the process is terminated. A getStagger-chosen
// sleep follows each accepted message. Subscribe and Unsubscribe errors are
// fatal.
func recMessages(c *stompngo.Connection, q string, k int) {
	ks := fmt.Sprintf("%d", k)
	// Receive phase
	headers := stompngo.Headers{"destination", q}
	sh := headers.Add("id", q)
	//
	log.Println("start subscribe", q)
	sc, err := c.Subscribe(sh)
	log.Println("end subscribe", q)
	if err != nil {
		log.Fatal(err)
	}
	for input := range sc {
		inmsg := input.Message.BodyString()
		if printMsgs {
			log.Println("Receive:", q, " / ", inmsg)
		}
		if inmsg == "***EOF***" {
			break
		}
		if !strings.HasPrefix(inmsg, ks) {
			log.Printf("bad prefix: [%v], [%v], [%v]\n", q, inmsg, ks)
			log.Fatal("bad prefix ....")
		}
		//
		d := time.Duration(getStagger(1e9/10, 1e9/5))
		time.Sleep(d)
	}
	log.Println("quit for", q)
	// Unsubscribe with the same headers used to subscribe, so the "id" header
	// identifies the subscription. STOMP 1.1+ requires "id" on UNSUBSCRIBE;
	// the destination-only headers used previously omit it.
	err = c.Unsubscribe(sh)
	log.Println("end unsubscribe", q)
	if err != nil {
		log.Fatal(err)
	}
	wgrecv.Done()
}
开发者ID:ra,项目名称:go-samp,代码行数:39,代码来源:sendrcv.go
示例13: HandleAck
// HandleAck sends an ACK on c for the message whose headers are h, building
// the ACK headers according to the connection's protocol level:
//   - stompngo.SPL_12: "id" taken from h's "ack" value
//   - stompngo.SPL_11: "message-id" from h plus "subscription" set to id
//   - stompngo.SPL_10: "message-id" from h
//
// If h contains a stompngo.HK_RECEIPT header it is copied to the ACK. An
// unknown protocol level or a failed Ack is logged through llu.Fatalf.
func HandleAck(c *stompngo.Connection, h stompngo.Headers, id string) {
	ah := stompngo.Headers{}
	//
	switch c.Protocol() {
	case stompngo.SPL_12:
		ah = ah.Add("id", h.Value("ack"))
	case stompngo.SPL_11:
		ah = ah.Add("message-id", h.Value("message-id")).Add("subscription", id)
	case stompngo.SPL_10:
		ah = ah.Add("message-id", h.Value("message-id"))
	default:
		// The format has two verbs; supply the offending protocol level as
		// the second operand (it was previously missing, which printed
		// "%!v(MISSING)").
		llu.Fatalf("v1:%v v2:%v\n", "ack invalid protocol level, should not happen", c.Protocol())
	}
	if cv, ok := h.Contains(stompngo.HK_RECEIPT); ok {
		ah = ah.Add(stompngo.HK_RECEIPT, cv)
	}
	e := c.Ack(ah)
	if e != nil {
		llu.Fatalf("v1:%v v2:%v v3:%v\n", "ack failed", e, c.Protocol())
	}
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:23,代码来源:utilities.go
示例14: sender
// sender sends nmsgs messages over conn to the destination formed from
// sngecomm.Dest() plus "." and the queue number qn. Each message carries
// "destination", "qnum" and "msgnum" headers, plus "persistent" when
// senv.Persistent() reports true. When sw is set, a
// sngecomm.ValueBetween-chosen wait is inserted between sends (never after
// the last one). A send error is fatal; wgs.Done is called at the end.
func sender(conn *stompngo.Connection, qn, nmsgs int) {
	ltag := tag + "-sender"

	qns := fmt.Sprintf("%d", qn) // queue number
	dest := sngecomm.Dest() + "." + qns
	ll.Printf("%stag:%s connsess:%s starts qn:%d nmsgs:%d d:%s\n",
		exampid, ltag, conn.Session(),
		qn, nmsgs, dest)

	// Headers common to every send
	base := stompngo.Headers{"destination", dest,
		"qnum", qns}
	if senv.Persistent() {
		base = base.Add("persistent", "true")
	}

	// One timer, re-armed for each stagger wait
	wait := time.NewTimer(100 * time.Hour)

	// Send loop
	for i := 1; i <= nmsgs; i++ {
		seq := fmt.Sprintf("%d", i)
		hdrs := append(base, "msgnum", seq)
		// Generate a message to send ...............
		ll.Printf("%stag:%s connsess:%s message qns:%s si:%s\n",
			exampid, ltag, conn.Session(),
			qns, seq)
		if err := conn.Send(hdrs, string(sngecomm.Partial())); err != nil {
			ll.Fatalf("%stag:%s connsess:%s send_error qnum:%v error:%v",
				exampid, ltag, conn.Session(),
				qn, err.Error()) // Handle this ......
		}
		// Stagger only between messages, and only when requested.
		if i < nmsgs && sw {
			runtime.Gosched() // yield for this example
			dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
			ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qns:%s\n",
				exampid, ltag, conn.Session(),
				dt, qns)
			wait.Reset(dt)
			<-wait.C
		}
	}

	// Sending is done
	ll.Printf("%stag:%s connsess:%s sender_ends qn:%d nmsgs:%d\n",
		exampid, ltag, conn.Session(),
		qn, nmsgs)
	wgs.Done()
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:50,代码来源:srmgor_2conn.go
示例15: recv
// recv subscribes conn to sngecomm.Dest() under a freshly generated id and
// then loops forever reading from the subscription channel. Anything that
// arrives on conn.MessageData, or any message carrying a non-nil Error, is
// fatal. For each message it logs receipt, optionally logs up to
// sngecomm.Pbc() bytes of the body, sleeps 1.5s, and sends an ACK through
// sngecomm.HandleAck unless the ack mode is "auto". The ack mode is stored in
// the package-level ackMode variable. The function has no return path.
func recv(conn *stompngo.Connection, s int) {
	ltag := tag + "-recv"
	ll.Printf("%stag:%s connsess:%s receiver_starts s:%d\n",
		exampid, ltag, conn.Session(),
		s)
	// Setup Headers ...
	id := stompngo.Uuid() // Use package convenience function for unique ID
	d := sngecomm.Dest()
	ackMode = sngecomm.AckMode() // get ack mode
	pbc := sngecomm.Pbc()        // Print byte count
	sc := sngecomm.HandleSubscribe(conn, d, id, ackMode)
	// Receive loop.
	mc := 0
	var md stompngo.MessageData
	for {
		select {
		case md = <-sc: // Read a messagedata struct, with a MESSAGE frame
		case md = <-conn.MessageData: // Read a messagedata struct, with a ERROR/RECEIPT frame
			// Frames RECEIPT or ERROR not expected here
			ll.Fatalf("%stag:%s connsess:%s bad_frame md:%v",
				exampid, ltag, conn.Session(),
				md) // Handle this ......
		}
		//
		mc++
		if md.Error != nil {
			ll.Fatalf("%stag:%s connsess:%s error_read error:%v",
				exampid, ltag, conn.Session(),
				md.Error) // Handle this ......
		}
		ll.Printf("%stag:%s connsess:%s received_message s:%d id:%s mc:%d\n",
			exampid, ltag, conn.Session(),
			s, id, mc)
		if pbc > 0 {
			// Print at most pbc bytes of the body.
			maxlen := pbc
			if len(md.Message.Body) < maxlen {
				maxlen = len(md.Message.Body)
			}
			ss := string(md.Message.Body[0:maxlen])
			// Log with ltag like every other line in this function (this
			// line previously passed the bare package tag).
			ll.Printf("%stag:%s connsess:%s payload body:%s\n",
				exampid, ltag, conn.Session(),
				ss)
		}
		// time.Sleep(3 * time.Second) // A very arbitrary number
		// time.Sleep(500 * time.Millisecond) // A very arbitrary number
		runtime.Gosched()
		time.Sleep(1500 * time.Millisecond) // A very arbitrary number
		runtime.Gosched()
		if ackMode != "auto" {
			sngecomm.HandleAck(conn, md.Message.Headers, id)
			ll.Printf("%stag:%s connsess:%s ack_complete s:%d id:%s mc:%d\n",
				exampid, ltag, conn.Session(),
				s, id, mc)
		}
		runtime.Gosched()
	}
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:62,代码来源:recv_mds.go
示例16: runSender
// runSender sends senv.Nmsgs() messages over conn to the destination formed
// from sngecomm.Dest() plus "." and qns. Each message carries "destination",
// "senderId", "qnum" and "msgnum" headers, plus "persistent" when
// senv.Persistent() reports true. When sw is set, a
// sngecomm.ValueBetween-chosen wait is inserted between sends (never after
// the last one). A send error is fatal; wgs.Done is called at the end.
func runSender(conn *stompngo.Connection, qns string) {
	ltag := tag + "-runsender"

	dest := sngecomm.Dest() + "." + qns
	senderID := stompngo.Uuid() // A unique sender id
	ll.Printf("%stag:%s connsess:%s start id:%s dest:%s\n",
		exampid, ltag, conn.Session(),
		senderID, dest)

	// Headers common to every send
	base := stompngo.Headers{"destination", dest, "senderId", senderID,
		"qnum", qns}
	if senv.Persistent() {
		base = base.Add("persistent", "true")
	}

	// One timer, re-armed for each stagger wait
	wait := time.NewTimer(100 * time.Hour)
	total := senv.Nmsgs()
	for mc := 1; mc <= total; mc++ {
		hdrs := append(base, "msgnum", fmt.Sprintf("%d", mc))
		// Generate a message to send ...............
		ll.Printf("%stag:%s connsess:%s send id:%s qns:%s mc:%d\n",
			exampid, ltag, conn.Session(),
			senderID, qns, mc)
		if err := conn.Send(hdrs, string(sngecomm.Partial())); err != nil {
			ll.Fatalf("%stag:%s connsess:%s send_error qns:%v error:%v",
				exampid, ltag, conn.Session(),
				qns, err.Error()) // Handle this ......
		}
		// Stagger only between messages, and only when requested.
		if mc < total && sw {
			dt := time.Duration(sngecomm.ValueBetween(min, max, sf))
			ll.Printf("%stag:%s connsess:%s send_stagger dt:%v qns:%s mc:%d\n",
				exampid, ltag, conn.Session(),
				dt, qns, mc)
			wait.Reset(dt)
			<-wait.C
			runtime.Gosched()
		}
	}

	ll.Printf("%stag:%s connsess:%s end id:%s dest:%s\n",
		exampid, ltag, conn.Session(),
		senderID, dest)
	wgs.Done()
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:49,代码来源:srmgor_1smrconn.go
示例17: ShowStats
// ShowStats logs connection metrics for conn through llu: frame and byte
// counts read and written, the running duration in nanoseconds and seconds,
// and the four per-second rates derived from them. Every line is prefixed
// with exampid and tag.
func ShowStats(exampid, tag string, conn *stompngo.Connection) {
	framesIn := conn.FramesRead()
	bytesIn := conn.BytesRead()
	framesOut := conn.FramesWritten()
	bytesOut := conn.BytesWritten()
	secs := conn.Running().Seconds()
	nanos := conn.Running().Nanoseconds()

	// emit writes one metric line; format supplies the label and value verb.
	emit := func(format string, value interface{}) {
		llu.Printf(format, exampid, tag, value)
	}

	emit("%stag:%s frame_read_count:%v\n", framesIn)
	emit("%stag:%s bytes_read:%v\n", bytesIn)
	emit("%stag:%s frame_write_count:%v\n", framesOut)
	emit("%stag:%s bytes_written:%v\n", bytesOut)
	emit("%stag:%s current_duration(ns):%v\n", nanos)
	emit("%stag:%s current_duration(sec):%20.6f\n", secs)
	emit("%stag:%s frame_reads/sec:%20.6f\n", float64(framesIn)/secs)
	emit("%stag:%s bytes_read/sec:%20.6f\n", float64(bytesIn)/secs)
	emit("%stag:%s frame_writes/sec:%20.6f\n", float64(framesOut)/secs)
	emit("%stag:%s bytes_written/sec:%20.6f\n", float64(bytesOut)/secs)
}
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:20,代码来源:utilities.go
示例18: runReceive
/*
runReceive receives all messages from a specified queue.
*/
func runReceive(conn *stompngo.Connection, q int, w *sync.WaitGroup) {
ltag := tag + "-runreceive"
qns := fmt.Sprintf("%d", q) // queue number
id := stompngo.Uuid() // A unique subscription ID
d := sngecomm.Dest() + "." + qns
ll.Printf("%stag:%s connsess:%s starts id:%s qns:%s d:%s\n",
exampid, ltag, conn.Session(),
id, qns, d)
// Subscribe (use common helper)
sc := sngecomm.HandleSubscribe(conn, d, id, sngecomm.AckMode())
ll.Printf("%stag:%s connsess:%s subscribe_done id:%s qns:%s d:%s\n",
exampid, ltag, conn.Session(),
id, qns, d)
//
tmr := time.NewTimer(100 * time.Hour)
pbc := sngecomm.Pbc() // Print byte count
nmsgs := senv.Nmsgs()
// Receive loop
var md stompngo.MessageData
for mc := 1; mc <= nmsgs; mc++ {
ll.Printf("%stag:%s connsess:%s chanchek id:%s qns:%s lensc:%d capsc:%d\n",
exampid, ltag, conn.Session(),
id, qns, len(sc), cap(sc))
select {
case md = <-sc:
case md = <-conn.MessageData:
// Frames RECEIPT or ERROR not expected here
ll.Fatalf("%stag:%s connsess:%s send_error qns:%v md:%v",
exampid, ltag, conn.Session(),
qns, md) // Handle this ......
}
if md.Error != nil {
ll.Fatalf("%stag:%s connsess:%s receive_error qns:%v error:%v\n",
exampid, ltag, conn.Session(),
qns, md.Error)
}
// Process the inbound message .................
ll.Printf("%stag:%s connsess:%s inbound id:%s qns:%s mc:%d\n",
exampid, ltag, conn.Session(),
id, qns, mc)
// Sanity check the message Command, and the queue and message numbers
mns := fmt.Sprintf("%d", mc) // string message number
if md.Message.Command != stompngo.MESSAGE {
ll.Fatalf("%stag:%s connsess:%s bad_frame qns:%s mc:%d md:%v\n",
exampid, ltag, conn.Session(),
qns, mc, md)
}
if !md.Message.Headers.ContainsKV("qnum", qns) || !md.Message.Headers.ContainsKV("msgnum", mns) {
ll.Fatalf("%stag:%s connsess:%s dirty_message qns:%v msgnum:%v md:%v",
exampid, tag, conn.Session(),
qns, mns, md) // Handle this ......
}
sl := len(md.Message.Body)
if pbc > 0 {
sl = pbc
if len(md.Message.Body) < sl {
sl = len(md.Message.Body)
}
}
ll.Printf("%stag:%s connsess:%s runReceive_recv_message id:%s body:%s qns:%s msgnum:%s\n",
exampid, ltag, conn.Session(),
id, string(md.Message.Body[0:sl]), qns,
md.Message.Headers.Value("msgnum"))
// Handle ACKs if needed
if sngecomm.AckMode() != "auto" {
ah := stompngo.Headers{}
sngecomm.HandleAck(conn, ah, id)
}
if mc == nmsgs {
break
}
if rw {
dt := time.Duration(sngecomm.ValueBetween(min, max, rf))
ll.Printf("%stag:%s connsess:%s recv_stagger dt:%v qns:%s mc:%d\n",
exampid, ltag, conn.Session(),
dt, qns, mc)
tmr.Reset(dt)
_ = <-tmr.C
runtime.Gosched()
}
}
// Unsubscribe
sngecomm.HandleUnsubscribe(conn, d, id)
ll.Printf("%stag:%s connsess:%s runRecieve_ends id:%s qns:%s\n",
//.........这里部分代码省略.........
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:101,代码来源:srmgor_1smrconn.go
示例19: recMessages
// recMessages subscribes to destination q on c without supplying an id and
// consumes messages until one whose body starts with "***EOF***" arrives. It
// then unsubscribes using the subscription id observed on the first message
// and calls wg.Done.
//
// For every message it checks that the "subscription" header is non-empty on
// the first message and unchanged on every later one, and that each non-EOF
// body starts with q; a violation panics. Each message increments numRecv
// while holding incrCtl. After each non-EOF message c.MessageData is polled
// without blocking and anything found there is printed. Subscribe and
// Unsubscribe errors are fatal.
func recMessages(c *stompngo.Connection, q string) {
	fmt.Printf("Start for q: %s\n", q)
	// Receive phase
	headers := stompngo.Headers{"destination", q} // no ID here. 1.1 library should provide
	fmt.Printf("qhdrs: %v\n", headers)
	sc, err := c.Subscribe(headers)
	if err != nil {
		// The channel returned alongside an error cannot be relied on;
		// continuing into the receive loop could block forever and wg.Done
		// would never be reached, so stop here.
		log.Fatalf("sub error: %v\n", err)
	}
	first := true
	firstSub := ""
	for input := range sc {
		inmsg := string(input.Message.Body)
		if printHdrs {
			fmt.Println("queue:", q, "Next Receive: ", input.Message.Headers)
		}
		if printMsgs {
			fmt.Println("queue:", q, "Next Receive: ", inmsg)
		}
		// Remember the subscription id from the first message only, so that
		// later messages are compared against it. Overwriting firstSub on
		// every message made the comparison below always succeed.
		sub := input.Message.Headers.Value("subscription")
		if first {
			if sub == "" {
				panic("first subscription header is empty")
			}
			firstSub = sub
			fmt.Println("queue:", q, "FirstSub: ", firstSub)
			first = false
		} else if firstSub != sub {
			panic(firstSub + " / " + sub)
		}
		time.Sleep(1e9 / 100) // Crudely simulate message processing
		incrCtl.Lock()
		numRecv++
		incrCtl.Unlock()
		if strings.HasPrefix(inmsg, "***EOF***") {
			fmt.Println("queue:", q, "FirstSub:", firstSub, "goteof")
			break
		}
		if !strings.HasPrefix(inmsg, q) {
			fmt.Printf("bad prefix: %v, %v\n", q, inmsg)
			panic("bad prefix ....")
		}
		// Poll for adhoc errors
		select {
		case v := <-c.MessageData:
			fmt.Printf("frameError: %v\n", v.Message)
			fmt.Printf("frameError: [%v] [%v]\n", q, firstSub)
		default:
			fmt.Println("Nothing to show")
		}
	}
	uh := stompngo.Headers{"id", firstSub,
		"destination", q}
	err = c.Unsubscribe(uh)
	if err != nil {
		log.Fatalf("unsub error: %v\n", err)
	}
	wg.Done()
}
开发者ID:ra,项目名称:go-samp,代码行数:66,代码来源:receivernid.go
示例20: runNextQueue
func runNextQueue(qn int, conn *stompngo.Connection) {
qns := fmt.Sprintf("%d", qn) // string number of the queue
conn.SetLogger(ll) // stompngo logging
pbc := sngecomm.Pbc() // Print byte count
d := senv.Dest() + qns // Destination
id := stompngo.Uuid() // A unique name/id
nmsgs := qn // int number of messages to get, same as queue number
am := sngecomm.AckMode() // ACK mode to use on SUBSCRIBE
nfa := true // Need "final" ACK (possiby reset below)
wh := stompngo.Headers{} // Starting SUBSCRIBE headers
// Sanity check ACK mode
if conn.Protocol() == stompngo.SPL_10 &&
am == stompngo.AckModeClientIndividual {
ll.Fatalf("%stag:%s connsess:%s invalid_ack_mode am:%v proto:%v\n",
exampid, tag, session,
am, conn.Protocol()) //
}
// Do not do final ACK if running ACKs are issued
if am == stompngo.AckModeClientIndividual ||
am == stompngo.AckModeAuto {
nfa = false
}
// Show run parameters
ll.Printf("%stag:%s connsess:%s run_parms\n\tqns:%v\n\tpbc:%v\n\td:%v\n\tid:%v\n\tnmsgs:%v\n\tam:%v\n\tnfa:%v\n\twh:%v\n",
exampid, tag, session,
qns, pbc, d, id, nmsgs, am, nfa, wh)
// Run SUBSCRIBE
sc := doSubscribe(conn, d, id, am, wh)
ll.Printf("%stag:%s connsess:%s stomp_subscribe_complete\n",
exampid, tag, session)
var md stompngo.MessageData // Message data from basic read
var lmd stompngo.MessageData // Possible save (copy) of received data
mc := 1 // Initial message number
// Loop for the requested number of messages
GetLoop:
for {
ll.Printf("%stag:%s connsess:%s start_of_read_loop mc:%v nmsgs:%v\n",
exampid, tag, session, mc, nmsgs)
mcs := fmt.Sprintf("%d", mc) // string number message count
// Get something from the stompngo read routine
select {
case md = <-sc:
case md = <-conn.MessageData:
//
if md.Message.Command == stompngo.RECEIPT {
ll.Printf("%stag:%s connsess:%s have_receipt md:%v\n",
exampid, tag, session,
md)
continue GetLoop
}
ll.Fatalf("%stag:%s connsess:%s ERROR_frame hdrs:%v body:%v\n",
exampid, tag, session,
md.Message.Headers, string(md.Message.Body)) // Handle this ......
}
// Save message data for possible use in the final ACK
if mc == nmsgs && nfa {
lmd = md // Save last message
}
// Basic loop logging
ll.Printf("%stag:%s connsess:%s channel_read_complete qn:%d mc:%d\n",
exampid, tag, session,
qn, mc)
ll.Printf("%stag:%s connsess:%s message_number:%v\n",
exampid, tag, session,
mc)
// Check if reader returned any error
if md.Error != nil {
ll.Fatalf("%stag:%s connsess:%s error_read error:%v",
exampid, tag, session,
md.Error) // Handle this ......
}
// Show frame type
ll.Printf("%stag:%s connsess:%s frame_type cmd:%s\n",
exampid, tag, session,
md.Message.Command)
// Pure sanity check: this should *never* happen based on logic
// above.
if md.Message.Command != stompngo.MESSAGE {
ll.Fatalf("%stag:%s connsess:%s error_frame_type md:%v",
exampid, tag, session,
md) // Handle this ......
}
// Show Message Headers
wh := md.Message.Headers
for j := 0; j < len(wh)-1; j += 2 {
ll.Printf("%stag:%s connsess:%s Header:%s:%s\n",
//.........这里部分代码省略.........
开发者ID:gmallard,项目名称:stompngo_examples,代码行数:101,代码来源:noPMod1.go
注:本文中的github.com/gmallard/stompngo.Connection类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。
请发表评论