How can I implement a blocking queue mechanism with futures::sync::mpsc::channel?










1















I am trying to understand how futures::sync::mpsc::Receiver works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.



I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.



What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.



What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel has its own documentation, but I do not understand how to use it properly.



extern crate futures;
extern crate tokio_core;

use std::thread, time;

use futures::sync::mpsc;
use futures::Future, Sink, Stream;

use tokio_core::reactor::Core;

#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,


fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);

match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,

println!("stats = :?", stats);

Ok(())
);

core.run(f2).expect("Core failed to run");










share|improve this question
























  • Where does futures::done come from? It's not a part of the current version of the futures crate.

    – Shepmaster
    Nov 13 '18 at 3:05















1















I am trying to understand how futures::sync::mpsc::Receiver works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.



I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.



What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.



What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel has its own documentation, but I do not understand how to use it properly.



extern crate futures;
extern crate tokio_core;

use std::thread, time;

use futures::sync::mpsc;
use futures::Future, Sink, Stream;

use tokio_core::reactor::Core;

#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,


fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);

match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,

println!("stats = :?", stats);

Ok(())
);

core.run(f2).expect("Core failed to run");










share|improve this question
























  • Where does futures::done come from? It's not a part of the current version of the futures crate.

    – Shepmaster
    Nov 13 '18 at 3:05













1












1








1








I am trying to understand how futures::sync::mpsc::Receiver works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.



I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.



What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.



What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel has its own documentation, but I do not understand how to use it properly.



extern crate futures;
extern crate tokio_core;

use std::thread, time;

use futures::sync::mpsc;
use futures::Future, Sink, Stream;

use tokio_core::reactor::Core;

#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,


fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);

match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,

println!("stats = :?", stats);

Ok(())
);

core.run(f2).expect("Core failed to run");










share|improve this question
















I am trying to understand how futures::sync::mpsc::Receiver works. In the below example, the receiver thread sleeps for two seconds and the sender sends every second.



I expect that the sender will need to be blocked because of the wait and then send when the buffer is released.



What I see instead is that it is deadlocked after a time. Increasing the buffer of the channel only extends the time until it is blocked.



What should I do to make the sender send data when the buffer is available and put some backpressure to the sender in such cases? futures::sync::mpsc::channel has its own documentation, but I do not understand how to use it properly.



extern crate futures;
extern crate tokio_core;

use std::thread, time;

use futures::sync::mpsc;
use futures::Future, Sink, Stream;

use tokio_core::reactor::Core;

#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,


fn main()
println!("Received");
let delay = time::Duration::from_secs(2);
thread::sleep(delay);

match res
Ok(_) => stats.success += 1,
Err(_) => stats.failure += 1,

println!("stats = :?", stats);

Ok(())
);

core.run(f2).expect("Core failed to run");







rust future rust-tokio






share|improve this question















share|improve this question













share|improve this question




share|improve this question








edited Nov 13 '18 at 3:04









Shepmaster

150k13291429




150k13291429










asked Nov 12 '18 at 14:11









Akiner AlkanAkiner Alkan

973223




973223












  • Where does futures::done come from? It's not a part of the current version of the futures crate.

    – Shepmaster
    Nov 13 '18 at 3:05

















  • Where does futures::done come from? It's not a part of the current version of the futures crate.

    – Shepmaster
    Nov 13 '18 at 3:05
















Where does futures::done come from? It's not a part of the current version of the futures crate.

– Shepmaster
Nov 13 '18 at 3:05





Where does futures::done come from? It's not a part of the current version of the futures crate.

– Shepmaster
Nov 13 '18 at 3:05












1 Answer
1






active

oldest

votes


















2














  1. Never call wait inside of a future. That's blocking, and blocking should never be done inside a future.


  2. Never call sleep inside of a future. That's blocking, and blocking should never be done inside a future.


  3. Channel backpressure is implemented by the fact that send consumes the Sender and returns a future. The future yields the Sender back to you when there is room in the queue.


extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11

use futures::future, sync::mpsc, Future, Sink, Stream;
use std::time::Duration;
use tokio::timer::Interval;

#[derive(Debug)]
struct Stats
pub success: usize,
pub failure: usize,


fn main()
let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);

tokio::spawn(
tx.send(Ok(())).map_err()
.map(drop) // discard the tx
);

let mut stats = Stats
success: 0,
failure: 0,
;

let i = Interval::new_interval(Duration::from_millis(20))
.map_err());






share|improve this answer






















    Your Answer






    StackExchange.ifUsing("editor", function ()
    StackExchange.using("externalEditor", function ()
    StackExchange.using("snippets", function ()
    StackExchange.snippets.init();
    );
    );
    , "code-snippets");

    StackExchange.ready(function()
    var channelOptions =
    tags: "".split(" "),
    id: "1"
    ;
    initTagRenderer("".split(" "), "".split(" "), channelOptions);

    StackExchange.using("externalEditor", function()
    // Have to fire editor after snippets, if snippets enabled
    if (StackExchange.settings.snippets.snippetsEnabled)
    StackExchange.using("snippets", function()
    createEditor();
    );

    else
    createEditor();

    );

    function createEditor()
    StackExchange.prepareEditor(
    heartbeatType: 'answer',
    autoActivateHeartbeat: false,
    convertImagesToLinks: true,
    noModals: true,
    showLowRepImageUploadWarning: true,
    reputationToPostImages: 10,
    bindNavPrevention: true,
    postfix: "",
    imageUploader:
    brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
    contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
    allowUrls: true
    ,
    onDemand: true,
    discardSelector: ".discard-answer"
    ,immediatelyShowMarkdownHelp:true
    );



    );













    draft saved

    draft discarded


















    StackExchange.ready(
    function ()
    StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263942%2fhow-can-i-implement-a-blocking-queue-mechanism-with-futuressyncmpscchannel%23new-answer', 'question_page');

    );

    Post as a guest















    Required, but never shown

























    1 Answer
    1






    active

    oldest

    votes








    1 Answer
    1






    active

    oldest

    votes









    active

    oldest

    votes






    active

    oldest

    votes









    2














    1. Never call wait inside of a future. That's blocking, and blocking should never be done inside a future.


    2. Never call sleep inside of a future. That's blocking, and blocking should never be done inside a future.


    3. Channel backpressure is implemented by the fact that send consumes the Sender and returns a future. The future yields the Sender back to you when there is room in the queue.


    extern crate futures; // 0.1.25
    extern crate tokio; // 0.1.11

    use futures::future, sync::mpsc, Future, Sink, Stream;
    use std::time::Duration;
    use tokio::timer::Interval;

    #[derive(Debug)]
    struct Stats
    pub success: usize,
    pub failure: usize,


    fn main()
    let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);

    tokio::spawn(
    tx.send(Ok(())).map_err()
    .map(drop) // discard the tx
    );

    let mut stats = Stats
    success: 0,
    failure: 0,
    ;

    let i = Interval::new_interval(Duration::from_millis(20))
    .map_err());






    share|improve this answer



























      2














      1. Never call wait inside of a future. That's blocking, and blocking should never be done inside a future.


      2. Never call sleep inside of a future. That's blocking, and blocking should never be done inside a future.


      3. Channel backpressure is implemented by the fact that send consumes the Sender and returns a future. The future yields the Sender back to you when there is room in the queue.


      extern crate futures; // 0.1.25
      extern crate tokio; // 0.1.11

      use futures::future, sync::mpsc, Future, Sink, Stream;
      use std::time::Duration;
      use tokio::timer::Interval;

      #[derive(Debug)]
      struct Stats
      pub success: usize,
      pub failure: usize,


      fn main()
      let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);

      tokio::spawn(
      tx.send(Ok(())).map_err()
      .map(drop) // discard the tx
      );

      let mut stats = Stats
      success: 0,
      failure: 0,
      ;

      let i = Interval::new_interval(Duration::from_millis(20))
      .map_err());






      share|improve this answer

























        2












        2








        2







        1. Never call wait inside of a future. That's blocking, and blocking should never be done inside a future.


        2. Never call sleep inside of a future. That's blocking, and blocking should never be done inside a future.


        3. Channel backpressure is implemented by the fact that send consumes the Sender and returns a future. The future yields the Sender back to you when there is room in the queue.


        extern crate futures; // 0.1.25
        extern crate tokio; // 0.1.11

        use futures::future, sync::mpsc, Future, Sink, Stream;
        use std::time::Duration;
        use tokio::timer::Interval;

        #[derive(Debug)]
        struct Stats
        pub success: usize,
        pub failure: usize,


        fn main()
        let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);

        tokio::spawn(
        tx.send(Ok(())).map_err()
        .map(drop) // discard the tx
        );

        let mut stats = Stats
        success: 0,
        failure: 0,
        ;

        let i = Interval::new_interval(Duration::from_millis(20))
        .map_err());






        share|improve this answer













        1. Never call wait inside of a future. That's blocking, and blocking should never be done inside a future.


        2. Never call sleep inside of a future. That's blocking, and blocking should never be done inside a future.


        3. Channel backpressure is implemented by the fact that send consumes the Sender and returns a future. The future yields the Sender back to you when there is room in the queue.


        extern crate futures; // 0.1.25
        extern crate tokio; // 0.1.11

        use futures::future, sync::mpsc, Future, Sink, Stream;
        use std::time::Duration;
        use tokio::timer::Interval;

        #[derive(Debug)]
        struct Stats
        pub success: usize,
        pub failure: usize,


        fn main()
        let (tx, rx) = mpsc::channel::<Result<(), ()>>(1);

        tokio::spawn(
        tx.send(Ok(())).map_err()
        .map(drop) // discard the tx
        );

        let mut stats = Stats
        success: 0,
        failure: 0,
        ;

        let i = Interval::new_interval(Duration::from_millis(20))
        .map_err());







        share|improve this answer












        share|improve this answer



        share|improve this answer










        answered Nov 13 '18 at 3:30









        ShepmasterShepmaster

        150k13291429




        150k13291429



























            draft saved

            draft discarded
















































            Thanks for contributing an answer to Stack Overflow!


            • Please be sure to answer the question. Provide details and share your research!

            But avoid


            • Asking for help, clarification, or responding to other answers.

            • Making statements based on opinion; back them up with references or personal experience.

            To learn more, see our tips on writing great answers.




            draft saved


            draft discarded














            StackExchange.ready(
            function ()
            StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53263942%2fhow-can-i-implement-a-blocking-queue-mechanism-with-futuressyncmpscchannel%23new-answer', 'question_page');

            );

            Post as a guest















            Required, but never shown





















































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown

































            Required, but never shown














            Required, but never shown












            Required, but never shown







            Required, but never shown







            Popular posts from this blog

            Use pre created SQLite database for Android project in kotlin

            Darth Vader #20

            Ondo