kanidm_unix_common/
client_sync.rs

1use crate::constants::DEFAULT_CONN_TIMEOUT;
2use crate::unix_proto::{ClientRequest, ClientResponse};
3use std::error::Error;
4use std::io::{Error as IoError, ErrorKind, Read, Write};
5use std::time::{Duration, SystemTime};
6
7pub use std::os::unix::net::UnixStream;
8
9pub struct DaemonClientBlocking {
10    stream: UnixStream,
11    default_timeout: u64,
12}
13
14impl From<UnixStream> for DaemonClientBlocking {
15    fn from(stream: UnixStream) -> Self {
16        DaemonClientBlocking {
17            stream,
18            default_timeout: DEFAULT_CONN_TIMEOUT,
19        }
20    }
21}
22
23impl DaemonClientBlocking {
24    pub fn new(path: &str, default_timeout: u64) -> Result<DaemonClientBlocking, Box<dyn Error>> {
25        debug!(%path);
26
27        let stream = UnixStream::connect(path)
28            .map_err(|e| {
29                error!(
30                    "Unix socket stream setup error while connecting to {} -> {:?}",
31                    path, e
32                );
33                e
34            })
35            .map_err(Box::new)?;
36
37        Ok(DaemonClientBlocking {
38            stream,
39            default_timeout,
40        })
41    }
42
43    pub fn call_and_wait(
44        &mut self,
45        req: &ClientRequest,
46        timeout: Option<u64>,
47    ) -> Result<ClientResponse, Box<dyn Error>> {
48        let timeout = Duration::from_secs(timeout.unwrap_or(self.default_timeout));
49
50        let data = serde_json::to_vec(&req).map_err(|e| {
51            error!("socket encoding error -> {:?}", e);
52            Box::new(IoError::new(ErrorKind::Other, "JSON encode error"))
53        })?;
54
55        match self.stream.set_read_timeout(Some(timeout)) {
56            Ok(()) => {}
57            Err(e) => {
58                error!(
59                    "Unix socket stream setup error while setting read timeout -> {:?}",
60                    e
61                );
62                return Err(Box::new(e));
63            }
64        };
65        match self.stream.set_write_timeout(Some(timeout)) {
66            Ok(()) => {}
67            Err(e) => {
68                error!(
69                    "Unix socket stream setup error while setting write timeout -> {:?}",
70                    e
71                );
72                return Err(Box::new(e));
73            }
74        };
75
76        self.stream
77            .write_all(data.as_slice())
78            .and_then(|_| self.stream.flush())
79            .map_err(|e| {
80                error!("stream write error -> {:?}", e);
81                e
82            })
83            .map_err(Box::new)?;
84
85        // Now wait on the response.
86        let start = SystemTime::now();
87        let mut read_started = false;
88        let mut data = Vec::with_capacity(1024);
89        let mut counter = 0;
90
91        loop {
92            let mut buffer = [0; 1024];
93            let durr = SystemTime::now().duration_since(start).map_err(Box::new)?;
94            if durr > timeout {
95                error!("Socket timeout");
96                // timed out, not enough activity.
97                break;
98            }
99            // Would be a lot easier if we had peek ...
100            // https://github.com/rust-lang/rust/issues/76923
101            match self.stream.read(&mut buffer) {
102                Ok(0) => {
103                    if read_started {
104                        debug!("read_started true, we have completed");
105                        // We're done, no more bytes.
106                        break;
107                    } else {
108                        debug!("Waiting ...");
109                        // Still can wait ...
110                        continue;
111                    }
112                }
113                Ok(count) => {
114                    data.extend_from_slice(&buffer);
115                    counter += count;
116                    if count == 1024 {
117                        debug!("Filled 1024 bytes, looping ...");
118                        // We have filled the buffer, we need to copy and loop again.
119                        read_started = true;
120                        continue;
121                    } else {
122                        debug!("Filled {} bytes, complete", count);
123                        // We have a partial read, so we are complete.
124                        break;
125                    }
126                }
127                Err(e) => {
128                    error!("Stream read failure from {:?} -> {:?}", &self.stream, e);
129                    // Failure!
130                    return Err(Box::new(e));
131                }
132            }
133        }
134
135        // Extend from slice fills with 0's, so we need to truncate now.
136        data.truncate(counter);
137
138        // Now attempt to decode.
139        let cr = serde_json::from_slice::<ClientResponse>(data.as_slice()).map_err(|e| {
140            error!("socket encoding error -> {:?}", e);
141            Box::new(IoError::new(ErrorKind::Other, "JSON decode error"))
142        })?;
143
144        Ok(cr)
145    }
146}