1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::io;
use std::marker;
use BindServer;
use super::Pipeline;
use super::lift::{LiftBind, LiftTransport};
use simple::LiftProto;
use streaming::{self, Message};
use streaming::pipeline::StreamingPipeline;
use tokio_core::reactor::Handle;
use tokio_service::Service;
use futures::{stream, Stream, Sink, Future, IntoFuture, Poll};
/// Body stream type used when lifting onto the streaming pipeline in this
/// file: an alias for `stream::Empty` with `()` items and error type `E`.
type MyStream<E> = stream::Empty<(), E>;
pub trait ServerProto<T: 'static>: 'static {
type Request: 'static;
type Response: 'static;
type Transport: 'static +
Stream<Item = Self::Request, Error = io::Error> +
Sink<SinkItem = Self::Response, SinkError = io::Error>;
type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
fn bind_transport(&self, io: T) -> Self::BindTransport;
}
/// Blanket `BindServer<Pipeline, T>` implementation for every `ServerProto<T>`.
///
/// The protocol and the service are wrapped (`LiftProto`, `LiftService`) and
/// the work is forwarded to the `BindServer` implementation for
/// `StreamingPipeline<MyStream<io::Error>>`.
impl<T, P> BindServer<Pipeline, T> for P
    where T: 'static,
          P: ServerProto<T>
{
    type ServiceRequest = P::Request;
    type ServiceResponse = P::Response;
    type ServiceError = io::Error;

    /// Binds `service` to `io` on the reactor referenced by `handle` by
    /// delegating to the streaming pipeline binding.
    fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
        where S: Service<Request = Self::ServiceRequest,
                         Response = Self::ServiceResponse,
                         Error = io::Error> + 'static
    {
        // Wrap both sides so that they speak the streaming `Message` types.
        let lifted_proto = LiftProto::from_ref(self);
        let lifted_service = LiftService(service);

        BindServer::<StreamingPipeline<MyStream<io::Error>>, T>::bind_server(
            lifted_proto,
            handle,
            io,
            lifted_service,
        )
    }
}
/// Streaming-pipeline view of a wrapped `ServerProto`: request and response
/// types are taken from `P`, both body types are `()`, and the transport and
/// its binding future are wrapped in `LiftTransport` / `LiftBind`.
impl<T: 'static, P: ServerProto<T>> streaming::pipeline::ServerProto<T> for LiftProto<P> {
    type Request = P::Request;
    type RequestBody = ();
    type Response = P::Response;
    type ResponseBody = ();
    type Error = io::Error;
    type Transport = LiftTransport<P::Transport, io::Error>;
    type BindTransport = LiftBind<T, <P::BindTransport as IntoFuture>::Future, io::Error>;

    /// Binds the transport of the wrapped protocol and lifts the resulting
    /// future with `LiftBind::lift`.
    fn bind_transport(&self, io: T) -> Self::BindTransport {
        // Reach the wrapped protocol, let it bind `io`, then lift the future.
        let wrapped_proto = self.lower();
        let pending_transport = ServerProto::bind_transport(wrapped_proto, io).into_future();
        LiftBind::lift(pending_transport)
    }
}
struct LiftService<S>(S);
impl<S: Service> Service for LiftService<S> {
type Request = streaming::Message<S::Request, streaming::Body<(), S::Error>>;
type Response = streaming::Message<S::Response, MyStream<S::Error>>;
type Error = S::Error;
type Future = LiftFuture<S::Future, stream::Empty<(), S::Error>>;
fn call(&self, req: Self::Request) -> Self::Future {
match req {
Message::WithoutBody(msg) => {
LiftFuture(self.0.call(msg), marker::PhantomData)
}
Message::WithBody(..) => panic!("bodies not supported"),
}
}
}
/// Future adapter that wraps the item of the inner future `F` in
/// `Message::WithoutBody`.
///
/// `T` is the body type of the produced `Message`. No value of `T` is stored;
/// it is only carried through `PhantomData`.
struct LiftFuture<F, T>(F, marker::PhantomData<fn() -> T>);

impl<F, T> Future for LiftFuture<F, T>
    where F: Future
{
    type Item = Message<F::Item, T>;
    type Error = F::Error;

    /// Polls the inner future. A ready item is wrapped in
    /// `Message::WithoutBody`; "not ready" and errors are passed through
    /// unchanged.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        self.0
            .poll()
            .map(|state| state.map(Message::WithoutBody))
    }
}