kanidm_unix_common/
client_sync.rs

1use crate::constants::DEFAULT_CONN_TIMEOUT;
2use crate::json_codec::JsonCodec;
3use crate::unix_proto::{ClientRequest, ClientResponse};
4use bytes::BytesMut;
5use std::error::Error;
6use std::io::{self, Read, Write};
7use std::time::{Duration, SystemTime};
8use tokio_util::codec::{Decoder, Encoder};
9
10pub use std::os::unix::net::UnixStream;
11
12type ClientCodec = JsonCodec<ClientResponse, ClientRequest>;
13
14pub struct DaemonClientBlocking {
15    stream: UnixStream,
16    codec: ClientCodec,
17    default_timeout: u64,
18}
19
20impl From<UnixStream> for DaemonClientBlocking {
21    fn from(stream: UnixStream) -> Self {
22        DaemonClientBlocking {
23            stream,
24            codec: ClientCodec::default(),
25            default_timeout: DEFAULT_CONN_TIMEOUT,
26        }
27    }
28}
29
30impl DaemonClientBlocking {
31    pub fn new(path: &str, default_timeout: u64) -> Result<DaemonClientBlocking, Box<dyn Error>> {
32        debug!(%path);
33
34        let stream = UnixStream::connect(path)
35            .map_err(|e| {
36                error!(
37                    "Unix socket stream setup error while connecting to {} -> {:?}",
38                    path, e
39                );
40                e
41            })
42            .map_err(Box::new)?;
43
44        Ok(DaemonClientBlocking {
45            stream,
46            codec: ClientCodec::default(),
47            default_timeout,
48        })
49    }
50
51    pub fn call_and_wait(
52        &mut self,
53        req: ClientRequest,
54        timeout: Option<u64>,
55    ) -> Result<ClientResponse, Box<dyn Error>> {
56        let timeout = Duration::from_secs(timeout.unwrap_or(self.default_timeout));
57
58        let mut data = BytesMut::new();
59
60        self.codec.encode(req, &mut data).map_err(Box::new)?;
61
62        match self.stream.set_read_timeout(Some(timeout)) {
63            Ok(()) => {}
64            Err(e) => {
65                error!(
66                    "Unix socket stream setup error while setting read timeout -> {:?}",
67                    e
68                );
69                return Err(Box::new(e));
70            }
71        };
72        match self.stream.set_write_timeout(Some(timeout)) {
73            Ok(()) => {}
74            Err(e) => {
75                error!(
76                    "Unix socket stream setup error while setting write timeout -> {:?}",
77                    e
78                );
79                return Err(Box::new(e));
80            }
81        };
82
83        self.stream
84            .write_all(&data)
85            .and_then(|_| self.stream.flush())
86            .map_err(|e| {
87                error!("stream write error -> {:?}", e);
88                e
89            })
90            .map_err(Box::new)?;
91
92        // Now wait on the response.
93        data.clear();
94        let start = SystemTime::now();
95        let mut read_started = false;
96
97        loop {
98            let durr = SystemTime::now().duration_since(start).map_err(Box::new)?;
99            if durr > timeout {
100                error!("Socket timeout");
101                // timed out, not enough activity.
102                return Err(Box::new(io::Error::other("Timeout")));
103            }
104
105            let mut buffer = [0; 1024];
106
107            // Would be a lot easier if we had peek ...
108            // https://github.com/rust-lang/rust/issues/76923
109            match self.stream.read(&mut buffer) {
110                Ok(0) => {
111                    if read_started {
112                        debug!("read_started true, no bytes read");
113                        // We're done, no more bytes. This will now
114                        // fall through to the codec decode to double
115                        // check this assertion.
116                    } else {
117                        debug!("Waiting ...");
118                        // Still can wait ...
119                        continue;
120                    }
121                }
122                Ok(count) => {
123                    read_started = true;
124                    data.extend_from_slice(&buffer[..count]);
125                }
126                Err(e) => {
127                    error!("Stream read failure from {:?} -> {:?}", &self.stream, e);
128                    // Failure!
129                    return Err(Box::new(e));
130                }
131            }
132
133            match self.codec.decode(&mut data) {
134                Ok(Some(cr)) => return Ok(cr),
135                // Need more data
136                Ok(None) => continue,
137                Err(e) => return Err(Box::new(e)),
138            }
139        }
140    }
141}