本文整理汇总了Golang中github.com/bitly/nsq/nsq.NewMessage函数的典型用法代码示例。如果您正苦于以下问题:Golang NewMessage函数的具体用法?Golang NewMessage怎么用?Golang NewMessage使用的例子?那么恭喜您, 这里精选的函数代码示例或许可以为您提供帮助。
在下文中一共展示了NewMessage函数的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Golang代码示例。
示例1: TestChannelEmptyConsumer
func TestChannelEmptyConsumer(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
tcpAddr, _, nsqd := mustStartNSQd(NewNsqdOptions())
defer nsqd.Exit()
conn, _ := mustConnectNSQd(tcpAddr)
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
client := NewClientV2(0, conn, &Context{nsqd})
client.SetReadyCount(25)
channel.AddClient(client.ID, client)
for i := 0; i < 25; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, 0)
client.SendingMessage()
}
for _, cl := range channel.clients {
stats := cl.Stats()
assert.Equal(t, stats.InFlightCount, int64(25))
}
channel.Empty()
for _, cl := range channel.clients {
stats := cl.Stats()
assert.Equal(t, stats.InFlightCount, int64(0))
}
}
开发者ID:newsky,项目名称:nsq,代码行数:34,代码来源:channel_test.go
示例2: TestBasicV2
// exercise the basic operations of the V2 protocol
func TestBasicV2(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
options := NewNsqdOptions()
options.clientTimeout = 60 * time.Second
tcpAddr, _ := mustStartNSQd(options)
defer nsqd.Exit()
topicName := "test_v2" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test body"))
topic.PutMessage(msg)
conn, err := mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)
identify(t, conn)
sub(t, conn, topicName, "ch")
err = nsq.Ready(1).Write(conn)
assert.Equal(t, err, nil)
resp, err := nsq.ReadResponse(conn)
assert.Equal(t, err, nil)
frameType, data, err := nsq.UnpackResponse(resp)
msgOut, _ := nsq.DecodeMessage(data)
assert.Equal(t, frameType, nsq.FrameTypeMessage)
assert.Equal(t, msgOut.Id, msg.Id)
assert.Equal(t, msgOut.Body, msg.Body)
assert.Equal(t, msgOut.Attempts, uint16(1))
}
开发者ID:datastream,项目名称:nsq,代码行数:33,代码来源:protocol_v2_test.go
示例3: TestChannelEmpty
func TestChannelEmpty(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
nsqd := NewNSQd(1, NewNsqdOptions())
defer nsqd.Exit()
topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
msgs := make([]*nsq.Message, 0, 25)
for i := 0; i < 25; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, 0)
msgs = append(msgs, msg)
}
channel.RequeueMessage(0, msgs[len(msgs)-1].Id, 100*time.Millisecond)
assert.Equal(t, len(channel.inFlightMessages), 24)
assert.Equal(t, len(channel.inFlightPQ), 24)
assert.Equal(t, len(channel.deferredMessages), 1)
assert.Equal(t, len(channel.deferredPQ), 1)
channel.Empty()
assert.Equal(t, len(channel.inFlightMessages), 0)
assert.Equal(t, len(channel.inFlightPQ), 0)
assert.Equal(t, len(channel.deferredMessages), 0)
assert.Equal(t, len(channel.deferredPQ), 0)
assert.Equal(t, channel.Depth(), int64(0))
}
开发者ID:newsky,项目名称:nsq,代码行数:32,代码来源:channel_test.go
示例4: TestInFlightWorker
func TestInFlightWorker(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
options := NewNsqdOptions()
options.msgTimeout = 200 * time.Millisecond
nsqd := NewNSQd(1, options)
defer nsqd.Exit()
topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
for i := 0; i < 1000; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, 0)
}
assert.Equal(t, len(channel.inFlightMessages), 1000)
assert.Equal(t, len(channel.inFlightPQ), 1000)
// the in flight worker has a resolution of 100ms so we need to wait
// at least that much longer than our msgTimeout (in worst case)
time.Sleep(options.msgTimeout + 100*time.Millisecond)
assert.Equal(t, len(channel.inFlightMessages), 0)
assert.Equal(t, len(channel.inFlightPQ), 0)
}
开发者ID:newsky,项目名称:nsq,代码行数:28,代码来源:channel_test.go
示例5: putHandler
func putHandler(w http.ResponseWriter, req *http.Request) {
reqParams, err := util.NewReqParams(req)
if err != nil {
log.Printf("ERROR: failed to parse request params - %s", err.Error())
util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
return
}
topicName, err := reqParams.Get("topic")
if err != nil {
util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
return
}
if !nsq.IsValidTopicName(topicName) {
util.ApiResponse(w, 500, "INVALID_ARG_TOPIC", nil)
return
}
if int64(len(reqParams.Body)) > nsqd.options.maxMessageSize {
util.ApiResponse(w, 500, "MSG_TOO_BIG", nil)
return
}
topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, reqParams.Body)
err = topic.PutMessage(msg)
if err != nil {
util.ApiResponse(w, 500, "NOK", nil)
return
}
w.Header().Set("Content-Length", "2")
io.WriteString(w, "OK")
}
开发者ID:jmanero,项目名称:nsq,代码行数:35,代码来源:http.go
示例6: TestInFlightWorker
func TestInFlightWorker(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
options := NewNsqdOptions()
options.msgTimeout = 300 * time.Millisecond
nsqd = NewNSQd(1, options)
defer nsqd.Exit()
topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel := topic.GetChannel("channel")
for i := 0; i < 1000; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test"))
channel.StartInFlightTimeout(msg, NewClientV2(nil))
}
assert.Equal(t, len(channel.inFlightMessages), 1000)
assert.Equal(t, len(channel.inFlightPQ), 1000)
time.Sleep(350 * time.Millisecond)
assert.Equal(t, len(channel.inFlightMessages), 0)
assert.Equal(t, len(channel.inFlightPQ), 0)
}
开发者ID:jmanero,项目名称:nsq,代码行数:26,代码来源:channel_test.go
示例7: BenchmarkTopicToChannelPut
func BenchmarkTopicToChannelPut(b *testing.B) {
b.StopTimer()
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
topicName := "bench_topic_to_channel_put" + strconv.Itoa(b.N)
channelName := "bench"
options := NewNsqdOptions()
options.memQueueSize = int64(b.N)
nsqd := NewNSQd(1, options)
defer nsqd.Exit()
channel := nsqd.GetTopic(topicName).GetChannel(channelName)
b.StartTimer()
for i := 0; i <= b.N; i++ {
topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
topic.PutMessage(msg)
}
for {
if len(channel.memoryMsgChan) == b.N {
break
}
runtime.Gosched()
}
}
开发者ID:johnvilsack,项目名称:golang-stuff,代码行数:26,代码来源:topic_test.go
示例8: TestStats
func TestStats(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
options := NewNsqdOptions()
tcpAddr, _, nsqd := mustStartNSQd(options)
defer nsqd.Exit()
topicName := "test_stats" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test body"))
topic.PutMessage(msg)
conn, err := mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)
identify(t, conn)
sub(t, conn, topicName, "ch")
stats := nsqd.getStats()
assert.Equal(t, len(stats), 1)
assert.Equal(t, len(stats[0].Channels), 1)
assert.Equal(t, len(stats[0].Channels[0].Clients), 1)
log.Printf("stats: %+v", stats)
}
开发者ID:newsky,项目名称:nsq,代码行数:25,代码来源:stats_test.go
示例9: messagePump
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
var msg *nsq.Message
var buf []byte
var err error
for {
// do an extra check for exit before we select on all the memory/backend/exitChan
// this solves the case where we are closed and something else is writing into
// backend. we don't want to reverse that
if atomic.LoadInt32(&t.exitFlag) == 1 {
goto exit
}
select {
case msg = <-t.memoryMsgChan:
case buf = <-t.backend.ReadChan():
msg, err = nsq.DecodeMessage(buf)
if err != nil {
log.Printf("ERROR: failed to decode message - %s", err.Error())
continue
}
case <-t.exitChan:
goto exit
}
t.RLock()
// check if all the channels have been deleted
if len(t.channelMap) == 0 {
// put this message back on the queue
// we need to background because we currently hold the lock
go func() {
t.PutMessage(msg)
}()
// reset the sync.Once
t.messagePumpStarter = new(sync.Once)
t.RUnlock()
goto exit
}
for _, channel := range t.channelMap {
// copy the message because each channel
// needs a unique instance
chanMsg := nsq.NewMessage(msg.Id, msg.Body)
chanMsg.Timestamp = msg.Timestamp
err := channel.PutMessage(chanMsg)
if err != nil {
log.Printf("TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.Id, channel.name, err.Error())
}
}
t.RUnlock()
}
exit:
log.Printf("TOPIC(%s): closing ... messagePump", t.name)
}
开发者ID:jmanero,项目名称:nsq,代码行数:59,代码来源:topic.go
示例10: BenchmarkTopicPut
func BenchmarkTopicPut(b *testing.B) {
b.StopTimer()
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
topicName := "bench_topic_put" + strconv.Itoa(b.N)
options := NewNsqdOptions()
options.memQueueSize = int64(b.N)
nsqd := NewNSQd(1, options)
defer nsqd.Exit()
b.StartTimer()
for i := 0; i <= b.N; i++ {
topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
topic.PutMessage(msg)
}
}
开发者ID:johnvilsack,项目名称:golang-stuff,代码行数:17,代码来源:topic_test.go
示例11: TestMultipleConsumerV2
func TestMultipleConsumerV2(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
msgChan := make(chan *nsq.Message)
options := NewNsqdOptions()
options.clientTimeout = 60 * time.Second
tcpAddr, _ := mustStartNSQd(options)
defer nsqd.Exit()
topicName := "test_multiple_v2" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, []byte("test body"))
topic.GetChannel("ch1")
topic.GetChannel("ch2")
topic.PutMessage(msg)
for _, i := range []string{"1", "2"} {
conn, err := mustConnectNSQd(tcpAddr)
assert.Equal(t, err, nil)
identify(t, conn)
sub(t, conn, topicName, "ch"+i)
err = nsq.Ready(1).Write(conn)
assert.Equal(t, err, nil)
go func(c net.Conn) {
resp, _ := nsq.ReadResponse(c)
_, data, _ := nsq.UnpackResponse(resp)
msg, _ := nsq.DecodeMessage(data)
msgChan <- msg
}(conn)
}
msgOut := <-msgChan
assert.Equal(t, msgOut.Id, msg.Id)
assert.Equal(t, msgOut.Body, msg.Body)
assert.Equal(t, msgOut.Attempts, uint16(1))
msgOut = <-msgChan
assert.Equal(t, msgOut.Id, msg.Id)
assert.Equal(t, msgOut.Body, msg.Body)
assert.Equal(t, msgOut.Attempts, uint16(1))
}
开发者ID:datastream,项目名称:nsq,代码行数:45,代码来源:protocol_v2_test.go
示例12: mputHandler
func (s *httpServer) mputHandler(w http.ResponseWriter, req *http.Request) {
if req.Method != "POST" {
util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
return
}
reqParams, err := util.NewReqParams(req)
if err != nil {
log.Printf("ERROR: failed to parse request params - %s", err.Error())
util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
return
}
topicName, err := reqParams.Get("topic")
if err != nil {
util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
return
}
if !nsq.IsValidTopicName(topicName) {
util.ApiResponse(w, 500, "INVALID_ARG_TOPIC", nil)
return
}
topic := s.context.nsqd.GetTopic(topicName)
for _, block := range bytes.Split(reqParams.Body, []byte("\n")) {
if len(block) != 0 {
if int64(len(reqParams.Body)) > s.context.nsqd.options.maxMessageSize {
util.ApiResponse(w, 500, "MSG_TOO_BIG", nil)
return
}
msg := nsq.NewMessage(<-s.context.nsqd.idChan, block)
err := topic.PutMessage(msg)
if err != nil {
util.ApiResponse(w, 500, "NOK", nil)
return
}
}
}
w.Header().Set("Content-Length", "2")
io.WriteString(w, "OK")
}
开发者ID:Eric-Chen,项目名称:nsq,代码行数:44,代码来源:http.go
示例13: TestPutMessage
// ensure that we can push a message through a topic and get it out of a channel
func TestPutMessage(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
nsqd := NewNSQd(1, NewNsqdOptions())
defer nsqd.Exit()
topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)
channel1 := topic.GetChannel("ch")
var id nsq.MessageID
msg := nsq.NewMessage(id, []byte("test"))
topic.PutMessage(msg)
outputMsg := <-channel1.clientMsgChan
assert.Equal(t, msg.Id, outputMsg.Id)
assert.Equal(t, msg.Body, outputMsg.Body)
}
开发者ID:newsky,项目名称:nsq,代码行数:20,代码来源:channel_test.go
示例14: TestEphemeralChannel
func TestEphemeralChannel(t *testing.T) {
// a normal channel sticks around after clients disconnect; an ephemeral channel is
// lazily removed after the last client disconnects
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
options := NewNsqdOptions()
options.memQueueSize = 100
_, _, nsqd := mustStartNSQd(options)
topicName := "ephemeral_test" + strconv.Itoa(int(time.Now().Unix()))
doneExitChan := make(chan int)
exitChan := make(chan int)
go func() {
<-exitChan
nsqd.Exit()
doneExitChan <- 1
}()
body := []byte("an_ephemeral_message")
topic := nsqd.GetTopic(topicName)
ephemeralChannel := topic.GetChannel("ch1#ephemeral")
client := NewClientV2(0, nil, &Context{nsqd})
ephemeralChannel.AddClient(client.ID, client)
msg := nsq.NewMessage(<-nsqd.idChan, body)
topic.PutMessage(msg)
msg = <-ephemeralChannel.clientMsgChan
assert.Equal(t, msg.Body, body)
log.Printf("pulling from channel")
ephemeralChannel.RemoveClient(client.ID)
time.Sleep(50 * time.Millisecond)
assert.Equal(t, len(topic.channelMap), 0)
exitChan <- 1
<-doneExitChan
}
开发者ID:newsky,项目名称:nsq,代码行数:40,代码来源:nsqd_test.go
示例15: TestDeleteLast
func TestDeleteLast(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
nsqd := NewNSQd(1, NewNsqdOptions())
defer nsqd.Exit()
topic := nsqd.GetTopic("test")
channel1 := topic.GetChannel("ch1")
assert.NotEqual(t, nil, channel1)
err := topic.DeleteExistingChannel("ch1")
assert.Equal(t, nil, err)
assert.Equal(t, 0, len(topic.channelMap))
msg := nsq.NewMessage(<-nsqd.idChan, []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
err = topic.PutMessage(msg)
time.Sleep(100 * time.Millisecond)
assert.Equal(t, nil, err)
assert.Equal(t, topic.Depth(), int64(1))
}
开发者ID:johnvilsack,项目名称:golang-stuff,代码行数:22,代码来源:topic_test.go
示例16: PUB
func (p *ProtocolV2) PUB(client *ClientV2, params [][]byte) ([]byte, error) {
var err error
var bodyLen int32
if len(params) < 2 {
return nil, nsq.NewFatalClientErr(nil, "E_INVALID", "PUB insufficient number of parameters")
}
topicName := string(params[1])
if !nsq.IsValidTopicName(topicName) {
return nil, nsq.NewFatalClientErr(nil, "E_BAD_TOPIC",
fmt.Sprintf("PUB topic name '%s' is not valid", topicName))
}
err = binary.Read(client.Reader, binary.BigEndian, &bodyLen)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body size")
}
if int64(bodyLen) > nsqd.options.maxMessageSize {
return nil, nsq.NewFatalClientErr(nil, "E_BAD_MESSAGE",
fmt.Sprintf("PUB message too big %d > %d", bodyLen, nsqd.options.maxMessageSize))
}
messageBody := make([]byte, bodyLen)
_, err = io.ReadFull(client.Reader, messageBody)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
}
topic := nsqd.GetTopic(topicName)
msg := nsq.NewMessage(<-nsqd.idChan, messageBody)
err = topic.PutMessage(msg)
if err != nil {
return nil, nsq.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}
return []byte("OK"), nil
}
开发者ID:jmanero,项目名称:nsq,代码行数:39,代码来源:protocol_v2.go
示例17: messagePump
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
var msg *nsq.Message
var buf []byte
var err error
var chans []*Channel
var memoryMsgChan chan *nsq.Message
var backendChan chan []byte
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
for {
select {
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = nsq.DecodeMessage(buf)
if err != nil {
log.Printf("ERROR: failed to decode message - %s", err.Error())
continue
}
case <-t.channelUpdateChan:
chans = make([]*Channel, 0)
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
goto exit
}
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = nsq.NewMessage(msg.Id, msg.Body)
chanMsg.Timestamp = msg.Timestamp
}
err := channel.PutMessage(chanMsg)
if err != nil {
log.Printf("TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.Id, channel.name, err.Error())
}
}
}
exit:
log.Printf("TOPIC(%s): closing ... messagePump", t.name)
}
开发者ID:newsky,项目名称:nsq,代码行数:69,代码来源:topic.go
示例18: messagePump
// messagePump selects over the in-memory and backend queue and
// writes messages to every channel for this topic
func (t *Topic) messagePump() {
var msg *nsq.Message
var buf []byte
var err error
var chans []*Channel
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
for {
select {
case msg = <-t.memoryMsgChan:
case buf = <-t.backend.ReadChan():
msg, err = nsq.DecodeMessage(buf)
if err != nil {
log.Printf("ERROR: failed to decode message - %s", err.Error())
continue
}
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
continue
case <-t.exitChan:
goto exit
}
// check if all the channels have been deleted
if len(chans) == 0 {
// put this message back on the queue
t.PutMessage(msg)
// reset the sync.Once
t.messagePumpStarter = new(sync.Once)
goto exit
}
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = nsq.NewMessage(msg.Id, msg.Body)
chanMsg.Timestamp = msg.Timestamp
}
err := channel.PutMessage(chanMsg)
if err != nil {
log.Printf("TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s", t.name, msg.Id, channel.name, err.Error())
}
}
}
exit:
log.Printf("TOPIC(%s): closing ... messagePump", t.name)
}
开发者ID:kiloboot,项目名称:nsq,代码行数:64,代码来源:topic.go
示例19: mputHandler
func (s *httpServer) mputHandler(w http.ResponseWriter, req *http.Request) {
var msgs []*nsq.Message
var exit bool
if req.Method != "POST" {
util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
return
}
reqParams, err := url.ParseQuery(req.URL.RawQuery)
if err != nil {
log.Printf("ERROR: failed to parse request params - %s", err.Error())
util.ApiResponse(w, 500, "INVALID_REQUEST", nil)
}
topicNames, ok := reqParams["topic"]
if !ok {
util.ApiResponse(w, 500, "MISSING_ARG_TOPIC", nil)
return
}
topicName := topicNames[0]
if !nsq.IsValidTopicName(topicName) {
util.ApiResponse(w, 500, "INVALID_ARG_TOPIC", nil)
return
}
topic := s.context.nsqd.GetTopic(topicName)
rdr := bufio.NewReader(req.Body)
for !exit {
block, err := rdr.ReadBytes('\n')
if err != nil {
if err != io.EOF {
util.ApiResponse(w, 500, "INTERNAL_ERROR", nil)
return
}
exit = true
}
if len(block) > 0 && block[len(block)-1] == '\n' {
block = block[:len(block)-1]
}
if int64(len(block)) > s.context.nsqd.options.maxMessageSize {
util.ApiResponse(w, 500, "MSG_TOO_BIG", nil)
return
}
msg := nsq.NewMessage(<-s.context.nsqd.idChan, block)
msgs = append(msgs, msg)
}
err = topic.PutMessages(msgs)
if err != nil {
util.ApiResponse(w, 500, "NOK", nil)
return
}
w.Header().Set("Content-Length", "2")
io.WriteString(w, "OK")
}
开发者ID:newsky,项目名称:nsq,代码行数:61,代码来源:http.go
示例20: TestStartup
func TestStartup(t *testing.T) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stdout)
iterations := 300
doneExitChan := make(chan int)
options := NewNsqdOptions()
options.memQueueSize = 100
options.maxBytesPerFile = 10240
mustStartNSQd(options)
topicName := "nsqd_test" + strconv.Itoa(int(time.Now().Unix()))
exitChan := make(chan int)
go func() {
<-exitChan
nsqd.Exit()
doneExitChan <- 1
}()
body := make([]byte, 256)
topic := nsqd.GetTopic(topicName)
for i := 0; i < iterations; i++ {
msg := nsq.NewMessage(<-nsqd.idChan, body)
topic.PutMessage(msg)
}
log.Printf("pulling from channel")
channel1 := topic.GetChannel("ch1")
log.Printf("read %d msgs", iterations/2)
for i := 0; i < iterations/2; i++ {
msg := <-channel1.clientMsgChan
log.Printf("read message %d", i+1)
assert.Equal(t, msg.Body, body)
}
for {
if channel1.Depth() == int64(iterations/2) {
break
}
time.Sleep(50 * time.Millisecond)
}
exitChan <- 1
<-doneExitChan
// start up a new nsqd w/ the same folder
options = NewNsqdOptions()
options.memQueueSize = 100
options.maxBytesPerFile = 10240
mustStartNSQd(options)
go func() {
<-exitChan
nsqd.Exit()
doneExitChan <- 1
}()
topic = nsqd.GetTopic(topicName)
// should be empty; channel should have drained everything
count := topic.Depth()
assert.Equal(t, count, int64(0))
channel1 = topic.GetChannel("ch1")
chan_count := channel1.Depth()
assert.Equal(t, chan_count, int64(iterations/2))
// read the other half of the messages
for i := 0; i < iterations/2; i++ {
msg := <-channel1.clientMsgChan
log.Printf("read message %d", i+1)
assert.Equal(t, msg.Body, body)
}
// verify we drained things
assert.Equal(t, len(topic.memoryMsgChan), 0)
assert.Equal(t, topic.backend.Depth(), int64(0))
exitChan <- 1
<-doneExitChan
}
开发者ID:kiloboot,项目名称:nsq,代码行数:85,代码来源:nsqd_test.go
注:本文中的github.com/bitly/nsq/nsq.NewMessage函数示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论