Make go routine wait until result from rabbitMQ is sent










I am fairly new to Go. I want to build a pipeline that translates every request I receive by sending it to the first queue (TEST), reading the final result from the last queue (RESULT), and sending that result back as the response.

The problem I am facing is that the response never waits until all results are back from the queue. Here is the code:



func main() {
    requests := []int{3, 4, 5, 6, 7}
    var wg sync.WaitGroup
    wg.Add(1)
    resArr := []string{}
    go func() {
        for _, r := range requests {
            rabbitSend("TEST", r)
            resArr = append(resArr, <-rabbitReceive("RESULT"))
        }
        defer wg.Done()
    }()
    wg.Wait()

    log.Println("Result", resArr)
}



rabbitSend method:



func rabbitSend(queueName string, msg int) {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    body, _ := json.Marshal(msg)
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "application/json",
            Body:        []byte(body),
        })
    log.Printf("[x] Sent %s to %s", body, q.Name)
    failOnError(err, "Failed to publish a message")
}



rabbitReceive method:



func rabbitReceive(queueName string) <-chan string {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    resCh := make(chan string)
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
            resCh <- string(d.Body)
        }
        close(resCh)
    }()
    return resCh
}



Here is what I get when I run the program:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 Result [ 9 15 18]


What I want is to receive each result right after I send its request, so that a request never gets the wrong result as its response. Something like:



2018/11/12 05:11:54 [x] Sent 3 to TEST
2018/11/12 05:11:54 Received a message: 9 from RESULT
2018/11/12 05:11:54 [x] Sent 4 to TEST
2018/11/12 05:11:54 Received a message: 12 from RESULT
2018/11/12 05:11:54 [x] Sent 5 to TEST
2018/11/12 05:11:54 Received a message: 15 from RESULT
2018/11/12 05:11:54 [x] Sent 6 to TEST
2018/11/12 05:11:54 Received a message: 18 from RESULT
2018/11/12 05:11:54 [x] Sent 7 to TEST
2018/11/12 05:11:54 Received a message: 21 from RESULT
2018/11/12 05:11:54 Result [ 9 12 15 18 21]


I believe I did not use the goroutine or sync.WaitGroup correctly here. Thanks in advance :)







go rabbitmq synchronization goroutine














edited Dec 22 '18 at 15:14









marc_s











asked Nov 11 '18 at 22:17









Kris MP












  • Here, in msgs, err := ch.Consume(...), what is the type of msgs?
    – nightfury1204
    Nov 12 '18 at 6:08










  • @nightfury1204 it's <-chan Delivery
    – Kris MP
    Nov 12 '18 at 8:21




























2 Answers
Modify your func rabbitReceive(queueName string) <-chan string as below:



func rabbitReceive(queueName string) <-chan string {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName, // name
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // no-wait
        nil,       // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    resCh := make(chan string)
    go func() {
        // Wait for exactly one delivery, forward it, then release everything.
        d := <-msgs
        log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
        resCh <- string(d.Body)
        conn.Close()
        ch.Close()
        close(resCh)
    }()
    return resCh
}



The previous code failed because of defer ch.Close() (and defer conn.Close()). The deferred calls run as soon as rabbitReceive returns, so ch is closed before the goroutine has written the response to resCh.
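To illustrate the point about defer, here is a minimal sketch that needs no broker. fakeChannel is a hypothetical stand-in for the AMQP channel (it is not part of any library), and the sketch assumes that closing it also closes its delivery stream. receiveDeferred mirrors the structure of the question's code, while receiveOwned mirrors the fix, where the goroutine itself closes the channel after forwarding one message.

```go
package main

import "fmt"

// fakeChannel is a hypothetical stand-in for an AMQP channel.
// Assumption: closing it closes the stream of deliveries.
type fakeChannel struct {
	deliveries chan string
}

func (c *fakeChannel) Close() { close(c.deliveries) }

// receiveDeferred mirrors the question: the deferred Close runs when this
// function returns, before the goroutine has had a chance to forward anything.
func receiveDeferred(c *fakeChannel) <-chan string {
	defer c.Close()
	out := make(chan string)
	go func() {
		for d := range c.deliveries {
			out <- d
		}
		close(out)
	}()
	return out
}

// receiveOwned mirrors the fix: the goroutine waits for one delivery,
// forwards it, and only then closes the channel.
func receiveOwned(c *fakeChannel) <-chan string {
	out := make(chan string)
	go func() {
		d := <-c.deliveries
		out <- d
		c.Close()
		close(out)
	}()
	return out
}

func main() {
	// Deferred close: the result channel is closed without ever carrying a value.
	v, ok := <-receiveDeferred(&fakeChannel{deliveries: make(chan string, 1)})
	fmt.Printf("deferred: %q %v\n", v, ok)

	// Goroutine-owned close: a message that arrives after the call still gets through.
	c := &fakeChannel{deliveries: make(chan string, 1)}
	out := receiveOwned(c)
	c.deliveries <- "9"
	fmt.Printf("owned: %q\n", <-out)
}
```

Receiving from a closed channel yields the zero value, so with the deferred version the caller can end up appending an empty string instead of a result.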

















answered Nov 12 '18 at 9:25









nightfury1204












  • thank you for your help, it works now :)
    – Kris MP
    Nov 13 '18 at 4:18
















Following up on @nightfury1204's great answer: you are indeed closing ch before writing to resCh. One thing, though: in the goroutine you want to go over all the messages, so a better way to do it would be:



go func() {
    for d := range msgs {
        log.Printf("Received a message: %v from %v", string(d.Body), q.Name)
        resCh <- string(d.Body)
    }
    conn.Close()
    ch.Close()
    close(resCh)
}()
















answered Nov 12 '18 at 10:22









eladm26












  • Hi, thanks. Somehow, when I try your solution, it gets stuck on the second loop.
    – Kris MP
    Nov 13 '18 at 4:21






  • In msgs, err := ch.Consume(...), msgs is derived from ch, so unless ch is closed, for d := range msgs keeps blocking. And because msgs is never closed, ch.Close() is never reached, so the result is a deadlock.
    – nightfury1204
    Nov 13 '18 at 4:22











  • I see, thank you for the explanation.
    – Kris MP
    Nov 13 '18 at 4:27










  • @nightfury1204 it's OK for the goroutine to block; it is supposed to wait on RESULT for an incoming message. We can add another goroutine that acts as a signal handler and closes ch when it receives SIGTERM or some other signal (SIGKILL itself cannot be caught).
    – eladm26
    Nov 13 '18 at 13:29
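The signal-handler idea from the last comment could look roughly like the sketch below. It is only an illustration under stated assumptions: forward and the stop channel are hypothetical names, the msgs channel stands in for the one returned by Consume, and the sketch listens for SIGINT and SIGTERM because a Go program cannot intercept SIGKILL.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// forward collects messages until msgs is closed or stop is closed,
// whichever happens first, so the loop always has a way to end.
func forward(msgs <-chan string, stop <-chan struct{}) []string {
	var got []string
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				return got
			}
			got = append(got, m)
		case <-stop:
			return got
		}
	}
}

func main() {
	msgs := make(chan string) // stand-in for the delivery channel
	stop := make(chan struct{})

	// Separate goroutine acting as the signal handler.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		close(stop)
	}()

	// Simulated producer: two results, then the stream ends.
	go func() {
		msgs <- "9"
		msgs <- "12"
		close(msgs)
	}()

	fmt.Println(forward(msgs, stop))
}
```

In the real consumer, the place where forward returns is where ch.Close() and conn.Close() would go, so the cleanup is reached whether the stream ends or a signal arrives.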















