kanidm_unix_common/
client.rs1use futures::{SinkExt, StreamExt};
2use std::error::Error;
3use std::io::Error as IoError;
4use tokio::net::UnixStream;
5use crate::json_codec::JsonCodec;
7use crate::unix_proto::{ClientRequest, ClientResponse};
8use tokio::time::{self, Duration};
9use tokio_util::codec::Framed;
10
11type ClientCodec = JsonCodec<ClientResponse, ClientRequest>;
12
13pub struct DaemonClient {
14 req_stream: Framed<UnixStream, ClientCodec>,
15 default_timeout: u64,
16}
17
18impl DaemonClient {
19 pub async fn new(path: &str, default_timeout: u64) -> Result<Self, Box<dyn Error>> {
20 trace!(?path);
21 let stream = UnixStream::connect(path).await.inspect_err(|e| {
22 error!(
23 "Unix socket stream setup error while connecting to {} -> {:?}",
24 path, e
25 );
26 })?;
27
28 let req_stream = Framed::new(stream, ClientCodec::default());
29
30 trace!("connected");
31
32 Ok(DaemonClient {
33 req_stream,
34 default_timeout,
35 })
36 }
37
38 async fn call_inner(&mut self, req: ClientRequest) -> Result<ClientResponse, Box<dyn Error>> {
39 self.req_stream.send(req).await?;
40 self.req_stream.flush().await?;
41 trace!("flushed, waiting ...");
42 match self.req_stream.next().await {
43 Some(Ok(res)) => {
44 debug!("Response -> {:?}", res);
45 Ok(res)
46 }
47 _ => {
48 error!("Error making request to kanidm_unixd");
49 Err(Box::new(IoError::other("oh no!")))
50 }
51 }
52 }
53
54 pub async fn call(
55 &mut self,
56 req: ClientRequest,
57 timeout: Option<u64>,
58 ) -> Result<ClientResponse, Box<dyn Error>> {
59 let sleep = time::sleep(Duration::from_secs(timeout.unwrap_or(self.default_timeout)));
60 tokio::pin!(sleep);
61
62 tokio::select! {
63 _ = &mut sleep => {
64 error!(?timeout, "Timed out making request to kanidm_unixd");
65 Err(Box::new(IoError::other("timeout")))
66 }
67 res = self.call_inner(req) => {
68 res
69 }
70 }
71 }
72}