kanidm_unix_common/
client.rs

1use std::error::Error;
2use std::io::{Error as IoError, ErrorKind};
3
4use bytes::{BufMut, BytesMut};
5use futures::{SinkExt, StreamExt};
6use tokio::net::UnixStream;
7// use tokio::runtime::Builder;
8use tokio::time::{self, Duration};
9use tokio_util::codec::Framed;
10use tokio_util::codec::{Decoder, Encoder};
11
12use crate::unix_proto::{ClientRequest, ClientResponse};
13
/// Stateless codec used to frame traffic on the daemon socket: requests are
/// written as JSON and responses are read back as JSON.
struct ClientCodec;
15
16impl Decoder for ClientCodec {
17    type Error = IoError;
18    type Item = ClientResponse;
19
20    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
21        match serde_json::from_slice::<ClientResponse>(src) {
22            Ok(msg) => {
23                // Clear the buffer for the next message.
24                src.clear();
25                Ok(Some(msg))
26            }
27            _ => Ok(None),
28        }
29    }
30}
31
32impl Encoder<&ClientRequest> for ClientCodec {
33    type Error = IoError;
34
35    fn encode(&mut self, msg: &ClientRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
36        let data = serde_json::to_vec(msg).map_err(|e| {
37            error!("socket encoding error -> {:?}", e);
38            IoError::new(ErrorKind::Other, "JSON encode error")
39        })?;
40        debug!("Attempting to send request -> {}", msg.as_safe_string());
41        dst.put(data.as_slice());
42        Ok(())
43    }
44}
45
46impl ClientCodec {
47    fn new() -> Self {
48        ClientCodec
49    }
50}
51
52pub struct DaemonClient {
53    req_stream: Framed<UnixStream, ClientCodec>,
54    default_timeout: u64,
55}
56
57impl DaemonClient {
58    pub async fn new(path: &str, default_timeout: u64) -> Result<Self, Box<dyn Error>> {
59        trace!(?path);
60        let stream = UnixStream::connect(path).await.inspect_err(|e| {
61            error!(
62                "Unix socket stream setup error while connecting to {} -> {:?}",
63                path, e
64            );
65        })?;
66
67        let req_stream = Framed::new(stream, ClientCodec::new());
68
69        trace!("connected");
70
71        Ok(DaemonClient {
72            req_stream,
73            default_timeout,
74        })
75    }
76
77    async fn call_inner(&mut self, req: &ClientRequest) -> Result<ClientResponse, Box<dyn Error>> {
78        self.req_stream.send(req).await?;
79        self.req_stream.flush().await?;
80        trace!("flushed, waiting ...");
81        match self.req_stream.next().await {
82            Some(Ok(res)) => {
83                debug!("Response -> {:?}", res);
84                Ok(res)
85            }
86            _ => {
87                error!("Error making request to kanidm_unixd");
88                Err(Box::new(IoError::new(ErrorKind::Other, "oh no!")))
89            }
90        }
91    }
92
93    pub async fn call(
94        &mut self,
95        req: &ClientRequest,
96        timeout: Option<u64>,
97    ) -> Result<ClientResponse, Box<dyn Error>> {
98        let sleep = time::sleep(Duration::from_secs(timeout.unwrap_or(self.default_timeout)));
99        tokio::pin!(sleep);
100
101        tokio::select! {
102            _ = &mut sleep => {
103                error!(?timeout, "Timed out making request to kanidm_unixd");
104                Err(Box::new(IoError::new(ErrorKind::Other, "timeout")))
105            }
106            res = self.call_inner(req) => {
107                res
108            }
109        }
110    }
111}