1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
use core::mem; use {Future, IntoFuture, Async, Poll}; use stream::Stream; /// Creates a `Stream` from a seed and a closure returning a `Future`. /// /// This function is the dual for the `Stream::fold()` adapter: while /// `Stream::fold()` reduces a `Stream` to one single value, `unfold()` creates a /// `Stream` from a seed value. /// /// `unfold()` will call the provided closure with the provided seed, then wait /// for the returned `Future` to complete with `(a, b)`. It will then yield the /// value `a`, and use `b` as the next internal state. /// /// If the closure returns `None` instead of `Some(Future)`, then the `unfold()` /// will stop producing items and return `Ok(Async::Ready(None))` in future /// calls to `poll()`. /// /// In case of error generated by the returned `Future`, the error will be /// returned by the `Stream`. The `Stream` will then yield /// `Ok(Async::Ready(None))` in future calls to `poll()`. /// /// This function can typically be used when wanting to go from the "world of /// futures" to the "world of streams": the provided closure can build a /// `Future` using other library functions working on futures, and `unfold()` /// will turn it into a `Stream` by repeating the operation. /// /// # Example /// /// ```rust /// use futures::stream::{self, Stream}; /// use futures::future::{self, Future}; /// /// let mut stream = stream::unfold(0, |state| { /// if state <= 2 { /// let next_state = state + 1; /// let yielded = state * 2; /// let fut = future::ok::<_, u32>((yielded, next_state)); /// Some(fut) /// } else { /// None /// } /// }); /// /// let result = stream.collect().wait(); /// assert_eq!(result, Ok(vec![0, 2, 4])); /// ``` pub fn unfold<T, F, Fut, It>(init: T, f: F) -> Unfold<T, F, Fut> where F: FnMut(T) -> Option<Fut>, Fut: IntoFuture<Item = (It, T)>, { Unfold { f: f, state: State::Ready(init), } } /// A stream which creates futures, polls them and return their result /// /// This stream is returned by the `futures::stream::unfold` method #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Unfold<T, F, Fut> where Fut: IntoFuture { f: F, state: State<T, Fut::Future>, } impl <T, F, Fut, It> Stream for Unfold<T, F, Fut> where F: FnMut(T) -> Option<Fut>, Fut: IntoFuture<Item = (It, T)>, { type Item = It; type Error = Fut::Error; fn poll(&mut self) -> Poll<Option<It>, Fut::Error> { loop { match mem::replace(&mut self.state, State::Empty) { // State::Empty may happen if the future returned an error State::Empty => { return Ok(Async::Ready(None)); } State::Ready(state) => { match (self.f)(state) { Some(fut) => { self.state = State::Processing(fut.into_future()); } None => { return Ok(Async::Ready(None)); } } } State::Processing(mut fut) => { match fut.poll()? { Async:: Ready((item, next_state)) => { self.state = State::Ready(next_state); return Ok(Async::Ready(Some(item))); } Async::NotReady => { self.state = State::Processing(fut); return Ok(Async::NotReady); } } } } } } } #[derive(Debug)] enum State<T, F> where F: Future { /// Placeholder state when doing work, or when the returned Future generated an error Empty, /// Ready to generate new future; current internal state is the `T` Ready(T), /// Working on a future generated previously Processing(F), }