kanidm_unix_common/
client.rs

1use futures::{SinkExt, StreamExt};
2use std::error::Error;
3use std::io::Error as IoError;
4use tokio::net::UnixStream;
5// use tokio::runtime::Builder;
6use crate::json_codec::JsonCodec;
7use crate::unix_proto::{ClientRequest, ClientResponse};
8use tokio::time::{self, Duration};
9use tokio_util::codec::Framed;
10
11type ClientCodec = JsonCodec<ClientResponse, ClientRequest>;
12
13pub struct DaemonClient {
14    req_stream: Framed<UnixStream, ClientCodec>,
15    default_timeout: u64,
16}
17
18impl DaemonClient {
19    pub async fn new(path: &str, default_timeout: u64) -> Result<Self, Box<dyn Error>> {
20        trace!(?path);
21        let stream = UnixStream::connect(path).await.inspect_err(|e| {
22            error!(
23                "Unix socket stream setup error while connecting to {} -> {:?}",
24                path, e
25            );
26        })?;
27
28        let req_stream = Framed::new(stream, ClientCodec::default());
29
30        trace!("connected");
31
32        Ok(DaemonClient {
33            req_stream,
34            default_timeout,
35        })
36    }
37
38    async fn call_inner(&mut self, req: ClientRequest) -> Result<ClientResponse, Box<dyn Error>> {
39        self.req_stream.send(req).await?;
40        self.req_stream.flush().await?;
41        trace!("flushed, waiting ...");
42        match self.req_stream.next().await {
43            Some(Ok(res)) => {
44                debug!("Response -> {:?}", res);
45                Ok(res)
46            }
47            _ => {
48                error!("Error making request to kanidm_unixd");
49                Err(Box::new(IoError::other("oh no!")))
50            }
51        }
52    }
53
54    pub async fn call(
55        &mut self,
56        req: ClientRequest,
57        timeout: Option<u64>,
58    ) -> Result<ClientResponse, Box<dyn Error>> {
59        let sleep = time::sleep(Duration::from_secs(timeout.unwrap_or(self.default_timeout)));
60        tokio::pin!(sleep);
61
62        tokio::select! {
63            _ = &mut sleep => {
64                error!(?timeout, "Timed out making request to kanidm_unixd");
65                Err(Box::new(IoError::other("timeout")))
66            }
67            res = self.call_inner(req) => {
68                res
69            }
70        }
71    }
72}