Make go routine wait until result from rabbitMQ is sent
I am fairly new to Go. I want to build a pipeline that translates every request I receive by sending it to the first queue (TEST), getting the final result from the last queue (RESULT), and sending that back as a response.
The problem I am facing is that the response never waits until all results are back from the queue. Here is the code:
    func main() {
        requests := []int{3, 4, 5, 6, 7}
        var wg sync.WaitGroup
        wg.Add(1)
        resArr := []string{}
        go func() {
            for _, r := range requests {
                rabbitSend("TEST", r)
                resArr = append(resArr, <-rabbitReceive("RESULT"))
            }
            defer wg.Done()
        }()
        wg.Wait()
        log.Println("Result", resArr)
    }
rabbitSend method:
    func rabbitSend(queueName string, msg int) {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
        q, err := ch.QueueDeclare(
            queueName, // name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        failOnError(err, "Failed to declare a queue")
        body, _ := json.Marshal(msg)
        err = ch.Publish(
            "",     // exchange
            q.Name, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing{
                ContentType: "application/json",
                Body:        []byte(body),
            },
        )
        log.Printf("[x] Sent %s to %s", body, q.Name)
        failOnError(err, "Failed to publish a message")
    }
rabbitReceive method:
    func rabbitReceive(queueName string) <-chan string {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        defer conn.Close()
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        defer ch.Close()
        q, err := ch.QueueDeclare(
            queueName, // name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        failOnError(err, "Failed to declare a queue")
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
        resCh := make(chan string)
        go func() {
            for d := range msgs {
                log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
                resCh <- string(d.Body)
            }
            close(resCh)
        }()
        return resCh
    }
Here is what I get when I run the program:
2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 Result [ 9 15 18]
What I want is to receive each result right after I send its request, so that a request never gets the wrong result as its response. Something like:
2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 12 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 21 from RESULT
2018/11/12 05:11:54 Result [ 9 12 15 18 21]
I believe I did not use goroutine or sync.WaitGroup correctly here. Thanks in advance :)
go rabbitmq synchronization goroutine
Here, in `msgs, err := ch.Consume(`, what is the type of `msgs`? – nightfury1204 Nov 12 '18 at 6:08
@nightfury1204 it's `<-chan Delivery` – Kris MP Nov 12 '18 at 8:21
asked Nov 11 '18 at 22:17 by Kris MP; edited Dec 22 '18 at 15:14 by marc_s
2 Answers
Modify your `func rabbitReceive(queueName string) <-chan string` as below:
    func rabbitReceive(queueName string) <-chan string {
        conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
        failOnError(err, "Failed to connect to RabbitMQ")
        ch, err := conn.Channel()
        failOnError(err, "Failed to open a channel")
        q, err := ch.QueueDeclare(
            queueName, // name
            true,      // durable
            false,     // delete when unused
            false,     // exclusive
            false,     // no-wait
            nil,       // arguments
        )
        failOnError(err, "Failed to declare a queue")
        msgs, err := ch.Consume(
            q.Name, // queue
            "",     // consumer
            true,   // auto-ack
            false,  // exclusive
            false,  // no-local
            false,  // no-wait
            nil,    // args
        )
        failOnError(err, "Failed to register a consumer")
        resCh := make(chan string)
        go func() {
            // Read exactly one delivery, hand it to the caller, then clean up.
            d := <-msgs
            log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
            resCh <- string(d.Body)
            // Close the channel before the connection that owns it.
            ch.Close()
            conn.Close()
            close(resCh)
        }()
        return resCh
    }
The reason the previous code caused a problem was `defer ch.Close()`. The deferred calls run as soon as rabbitReceive returns, so `ch` can be closed before the response has been written to `resCh`.
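The effect of the deferred close can be reproduced without a broker. The following is a minimal sketch, not the amqp library: `fakeConn`, `receiveDeferred` and `receiveOnce` are hypothetical names, and the only assumption carried over is that closing the connection closes the delivery channel. It shows the deterministic worst case, where no message arrives before the function returns.

```go
package main

import "fmt"

// fakeConn is a hypothetical stand-in for an AMQP connection/channel pair:
// closing it closes the delivery channel it feeds.
type fakeConn struct{ msgs chan string }

func (c *fakeConn) Close() { close(c.msgs) }

// receiveDeferred mirrors the question: Close is deferred, so it runs when
// this function returns, before any message has been forwarded.
func receiveDeferred(c *fakeConn) <-chan string {
	defer c.Close()
	res := make(chan string)
	go func() {
		for d := range c.msgs {
			res <- d
		}
		close(res)
	}()
	return res
}

// receiveOnce mirrors the answer: the goroutine closes the connection
// itself, and only after one message has been handed to the caller.
func receiveOnce(c *fakeConn) <-chan string {
	res := make(chan string)
	go func() {
		d := <-c.msgs
		res <- d
		c.Close()
		close(res)
	}()
	return res
}

func main() {
	// The delivery channel is closed on return, so the caller reads the
	// zero value from an already-closed result channel.
	v, ok := <-receiveDeferred(&fakeConn{msgs: make(chan string)})
	fmt.Printf("deferred close: %q %v\n", v, ok)

	// Here the message arrives after the function has returned and is
	// still delivered.
	c := &fakeConn{msgs: make(chan string)}
	res := receiveOnce(c)
	c.msgs <- "9"
	fmt.Printf("close in goroutine: %q\n", <-res)
}
```

In the real program the outcome depends on timing, which would explain why some results were received and others were not.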
Thank you for your help, it works now :) – Kris MP Nov 13 '18 at 4:18
Following up on @nightfury1204's great answer: you are indeed closing `ch` before writing to `resCh`. Just one thing: in the goroutine you want to go over all the messages, so a better way to do it would be:
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
            resCh <- string(d.Body)
        }
        conn.Close()
        ch.Close()
        close(resCh)
    }()
Hi, thanks. Somehow, when I try your solution it gets stuck on the second loop. – Kris MP Nov 13 '18 at 4:21
In `msgs, err := ch.Consume(`, `msgs` is derived from `ch`, so as long as `ch` is not closed, `for d := range msgs` will block. And as long as `msgs` is not closed, `ch.Close()` will never be executed. So it will cause a deadlock. – nightfury1204 Nov 13 '18 at 4:22
I see, thank you for the explanation. – Kris MP Nov 13 '18 at 4:27
@nightfury1204 it's OK that the goroutine blocks; it is supposed to wait on `RESULT` for an incoming message. We can add another goroutine to act as a signal handler and close `ch` on receiving a SIGTERM or some other catchable signal (SIGKILL cannot be intercepted). – eladm26 Nov 13 '18 at 13:29
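The signal-handler idea from the last comment could look roughly like the sketch below. It is an illustration under assumptions: `forward` and `stop` are hypothetical names, plain channels stand in for `msgs`, and SIGINT/SIGTERM are used because SIGKILL cannot be caught by a process. In the real program the handler would close `ch`, which in turn closes `msgs` and ends the loop.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// forward copies messages from msgs to the returned channel until msgs is
// closed or stop is closed, then closes the returned channel.
func forward(msgs <-chan string, stop <-chan struct{}) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			select {
			case m, ok := <-msgs:
				if !ok {
					return // source closed: nothing more to forward
				}
				select {
				case out <- m:
				case <-stop:
					return
				}
			case <-stop:
				return // shutdown requested while waiting for a message
			}
		}
	}()
	return out
}

func main() {
	msgs := make(chan string)
	stop := make(chan struct{})

	// Translate SIGINT/SIGTERM into closing stop, which unblocks forward.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigs
		close(stop)
	}()

	out := forward(msgs, stop)
	go func() {
		msgs <- "9"
		msgs <- "12"
		close(msgs)
	}()
	for m := range out {
		fmt.Println("Received", m)
	}
}
```

The loop in `forward` may block for as long as it likes waiting for messages, as the comment argues, but it always has a way out, so the cleanup code after it is reachable.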
First answer: answered Nov 12 '18 at 9:25 by nightfury1204
Second answer: answered Nov 12 '18 at 10:22 by eladm26