How can I implement a blocking queue mechanism with futures::sync::mpsc::channel?
I am trying to understand how futures::sync::mpsc::Receiver
works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.
I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.
What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.
What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel
has its own documentation, but I do not understand how to use it properly.
extern crate futures;
extern crate tokio_core;
use std::thread, time;
use futures::sync::mpsc;
use futures::Future, Sink, Stream;
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
println!("stats = :?", stats);
Ok(())
);
core.run(f2).expect("Core failed to run");
rust future rust-tokio
add a comment |
I am trying to understand how futures::sync::mpsc::Receiver
works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.
I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.
What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.
What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel
has its own documentation, but I do not understand how to use it properly.
extern crate futures;
extern crate tokio_core;
use std::thread, time;
use futures::sync::mpsc;
use futures::Future, Sink, Stream;
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
println!("stats = :?", stats);
Ok(())
);
core.run(f2).expect("Core failed to run");
rust future rust-tokio
Where doesfutures::done
come from? It's not a part of the current version of the futures crate.
– Shepmaster
Nov 13 '18 at 3:05
add a comment |
I am trying to understand how futures::sync::mpsc::Receiver
works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.
I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.
What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.
What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel
has its own documentation, but I do not understand how to use it properly.
extern crate futures;
extern crate tokio_core;
use std::thread, time;
use futures::sync::mpsc;
use futures::Future, Sink, Stream;
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
println!("stats = :?", stats);
Ok(())
);
core.run(f2).expect("Core failed to run");
rust future rust-tokio
I am trying to understand how futures::sync::mpsc::Receiver
works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.
I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.
What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.
What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel
has its own documentation, but I do not understand how to use it properly.
extern crate futures;
extern crate tokio_core;
use std::thread, time;
use futures::sync::mpsc;
use futures::Future, Sink, Stream;
use tokio_core::reactor::Core;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);
match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,
println!("stats = :?", stats);
Ok(())
);
core.run(f2).expect("Core failed to run");
rust future rust-tokio
rust future rust-tokio
edited Nov 13 '18 at 3:04
Shepmaster
150k13291429
150k13291429
asked Nov 12 '18 at 14:11
Akiner AlkanAkiner Alkan
973223
973223
Where doesfutures::done
come from? It's not a part of the current version of the futures crate.
– Shepmaster
Nov 13 '18 at 3:05
add a comment |
Where doesfutures::done
come from? It's not a part of the current version of the futures crate.
– Shepmaster
Nov 13 '18 at 3:05
Where does
futures::done
come from? It's not a part of the current version of the futures crate.– Shepmaster
Nov 13 '18 at 3:05
Where does
futures::done
come from? It's not a part of the current version of the futures crate.– Shepmaster
Nov 13 '18 at 3:05
add a comment |
1 Answer
1
active
oldest
votes
Never call
wait
inside of a future. That's blocking, and blocking should never be done inside a future.Never call
sleep
inside of a future. That's blocking, and blocking should never be done inside a future.Channel backpressure is implemented by the fact that
send
consumes theSender
and returns a future. The future yields theSender
back to you when there is room in the queue.
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11
use futures::future, sync::mpsc, Future, Sink, Stream;
use std::time::Duration;
use tokio::timer::Interval;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);
tokio::spawn(
tx.send(Ok(())).map_err()
.map(drop) // discard the tx
);
let mut stats = Stats
success: 0,
failure: 0,
;
let i = Interval::new_interval(Duration::from_millis(20))
.map_err());
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263942%2fhow-can-i-implement-a-blocking-queue-mechanism-with-futuressyncmpscchannel%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
Never call
wait
inside of a future. That's blocking, and blocking should never be done inside a future.Never call
sleep
inside of a future. That's blocking, and blocking should never be done inside a future.Channel backpressure is implemented by the fact that
send
consumes theSender
and returns a future. The future yields theSender
back to you when there is room in the queue.
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11
use futures::future, sync::mpsc, Future, Sink, Stream;
use std::time::Duration;
use tokio::timer::Interval;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);
tokio::spawn(
tx.send(Ok(())).map_err()
.map(drop) // discard the tx
);
let mut stats = Stats
success: 0,
failure: 0,
;
let i = Interval::new_interval(Duration::from_millis(20))
.map_err());
add a comment |
Never call
wait
inside of a future. That's blocking, and blocking should never be done inside a future.Never call
sleep
inside of a future. That's blocking, and blocking should never be done inside a future.Channel backpressure is implemented by the fact that
send
consumes theSender
and returns a future. The future yields theSender
back to you when there is room in the queue.
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11
use futures::future, sync::mpsc, Future, Sink, Stream;
use std::time::Duration;
use tokio::timer::Interval;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);
tokio::spawn(
tx.send(Ok(())).map_err()
.map(drop) // discard the tx
);
let mut stats = Stats
success: 0,
failure: 0,
;
let i = Interval::new_interval(Duration::from_millis(20))
.map_err());
add a comment |
Never call
wait
inside of a future. That's blocking, and blocking should never be done inside a future.Never call
sleep
inside of a future. That's blocking, and blocking should never be done inside a future.Channel backpressure is implemented by the fact that
send
consumes theSender
and returns a future. The future yields theSender
back to you when there is room in the queue.
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11
use futures::future, sync::mpsc, Future, Sink, Stream;
use std::time::Duration;
use tokio::timer::Interval;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);
tokio::spawn(
tx.send(Ok(())).map_err()
.map(drop) // discard the tx
);
let mut stats = Stats
success: 0,
failure: 0,
;
let i = Interval::new_interval(Duration::from_millis(20))
.map_err());
Never call
wait
inside of a future. That's blocking, and blocking should never be done inside a future.Never call
sleep
inside of a future. That's blocking, and blocking should never be done inside a future.Channel backpressure is implemented by the fact that
send
consumes theSender
and returns a future. The future yields theSender
back to you when there is room in the queue.
extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11
use futures::future, sync::mpsc, Future, Sink, Stream;
use std::time::Duration;
use tokio::timer::Interval;
#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,
fn main()
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);
tokio::spawn(
tx.send(Ok(())).map_err()
.map(drop) // discard the tx
);
let mut stats = Stats
success: 0,
failure: 0,
;
let i = Interval::new_interval(Duration::from_millis(20))
.map_err());
answered Nov 13 '18 at 3:30
ShepmasterShepmaster
150k13291429
150k13291429
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263942%2fhow-can-i-implement-a-blocking-queue-mechanism-with-futuressyncmpscchannel%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Where does
futures::done
come from? It's not a part of the current version of the futures crate.– Shepmaster
Nov 13 '18 at 3:05