kanidm_unix_common/
client.rs

1use bytes::{BufMut, BytesMut};
2use futures::{SinkExt, StreamExt};
3use std::error::Error;
4use std::io::Error as IoError;
5use tokio::net::UnixStream;
6// use tokio::runtime::Builder;
7use tokio::time::{self, Duration};
8use tokio_util::codec::Framed;
9use tokio_util::codec::{Decoder, Encoder};
10
11use crate::unix_proto::{ClientRequest, ClientResponse};
12
13struct ClientCodec;
14
15impl Decoder for ClientCodec {
16    type Error = IoError;
17    type Item = ClientResponse;
18
19    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
20        match serde_json::from_slice::<ClientResponse>(src) {
21            Ok(msg) => {
22                // Clear the buffer for the next message.
23                src.clear();
24                Ok(Some(msg))
25            }
26            _ => Ok(None),
27        }
28    }
29}
30
31impl Encoder<&ClientRequest> for ClientCodec {
32    type Error = IoError;
33
34    fn encode(&mut self, msg: &ClientRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
35        let data = serde_json::to_vec(msg).map_err(|e| {
36            error!("socket encoding error -> {:?}", e);
37            IoError::other("JSON encode error")
38        })?;
39        debug!("Attempting to send request -> {}", msg.as_safe_string());
40        dst.put(data.as_slice());
41        Ok(())
42    }
43}
44
45impl ClientCodec {
46    fn new() -> Self {
47        ClientCodec
48    }
49}
50
51pub struct DaemonClient {
52    req_stream: Framed<UnixStream, ClientCodec>,
53    default_timeout: u64,
54}
55
56impl DaemonClient {
57    pub async fn new(path: &str, default_timeout: u64) -> Result<Self, Box<dyn Error>> {
58        trace!(?path);
59        let stream = UnixStream::connect(path).await.inspect_err(|e| {
60            error!(
61                "Unix socket stream setup error while connecting to {} -> {:?}",
62                path, e
63            );
64        })?;
65
66        let req_stream = Framed::new(stream, ClientCodec::new());
67
68        trace!("connected");
69
70        Ok(DaemonClient {
71            req_stream,
72            default_timeout,
73        })
74    }
75
76    async fn call_inner(&mut self, req: &ClientRequest) -> Result<ClientResponse, Box<dyn Error>> {
77        self.req_stream.send(req).await?;
78        self.req_stream.flush().await?;
79        trace!("flushed, waiting ...");
80        match self.req_stream.next().await {
81            Some(Ok(res)) => {
82                debug!("Response -> {:?}", res);
83                Ok(res)
84            }
85            _ => {
86                error!("Error making request to kanidm_unixd");
87                Err(Box::new(IoError::other("oh no!")))
88            }
89        }
90    }
91
92    pub async fn call(
93        &mut self,
94        req: &ClientRequest,
95        timeout: Option<u64>,
96    ) -> Result<ClientResponse, Box<dyn Error>> {
97        let sleep = time::sleep(Duration::from_secs(timeout.unwrap_or(self.default_timeout)));
98        tokio::pin!(sleep);
99
100        tokio::select! {
101            _ = &mut sleep => {
102                error!(?timeout, "Timed out making request to kanidm_unixd");
103                Err(Box::new(IoError::other("timeout")))
104            }
105            res = self.call_inner(req) => {
106                res
107            }
108        }
109    }
110}