kanidm_unix_common/
client_sync.rs

1use crate::constants::DEFAULT_CONN_TIMEOUT;
2use crate::json_codec::JsonCodec;
3use crate::unix_proto::{ClientRequest, ClientResponse};
4use bytes::BytesMut;
5use std::error::Error;
6use std::io::{self, ErrorKind, Read, Write};
7use std::time::{Duration, Instant};
8use tokio_util::codec::{Decoder, Encoder};
9
10pub use std::os::unix::net::UnixStream;
11
12type ClientCodec = JsonCodec<ClientResponse, ClientRequest>;
13
14pub struct DaemonClientBlocking {
15    stream: UnixStream,
16    codec: ClientCodec,
17    default_timeout: u64,
18}
19
20impl From<UnixStream> for DaemonClientBlocking {
21    fn from(stream: UnixStream) -> Self {
22        DaemonClientBlocking {
23            stream,
24            codec: ClientCodec::default(),
25            default_timeout: DEFAULT_CONN_TIMEOUT,
26        }
27    }
28}
29
30impl DaemonClientBlocking {
31    pub fn new(path: &str, default_timeout: u64) -> Result<DaemonClientBlocking, Box<dyn Error>> {
32        // Setup a subscriber incase one isn't setup.
33        if cfg!(feature = "client_sync_tracing") {
34            use tracing_subscriber::prelude::*;
35            use tracing_subscriber::{filter::LevelFilter, fmt};
36
37            let fmt_layer = fmt::layer().with_target(false);
38            let filter_layer = LevelFilter::WARN;
39
40            let _ = tracing_subscriber::registry()
41                .with(filter_layer)
42                .with(fmt_layer)
43                .try_init();
44        }
45
46        trace!(%path);
47
48        let stream = UnixStream::connect(path).map_err(|err| {
49            error!(
50                ?err, %path,
51                "Unix socket stream setup error",
52            );
53            Box::new(err)
54        })?;
55
56        Ok(DaemonClientBlocking {
57            stream,
58            codec: ClientCodec::default(),
59            default_timeout,
60        })
61    }
62
63    pub fn call_and_wait(
64        &mut self,
65        req: ClientRequest,
66        timeout: Option<u64>,
67    ) -> Result<ClientResponse, Box<dyn Error>> {
68        let timeout = Duration::from_secs(timeout.unwrap_or(self.default_timeout));
69
70        self.stream
71            .set_write_timeout(Some(timeout))
72            .map_err(|err| {
73                error!(
74                    ?err,
75                    "Unix socket stream setup error while setting write timeout",
76                );
77                Box::new(err)
78            })?;
79
80        // We want this to be blocking so that we wait for data to be ready
81        self.stream.set_nonblocking(false).map_err(|err| {
82            error!(
83                ?err,
84                "Unix socket stream setup error while setting nonblocking=false",
85            );
86            Box::new(err)
87        })?;
88
89        let mut data = BytesMut::new();
90
91        self.codec.encode(req, &mut data).map_err(|err| {
92            error!(?err, "codec encode error");
93            Box::new(err)
94        })?;
95
96        self.stream
97            .write_all(&data)
98            .and_then(|_| self.stream.flush())
99            .map_err(|err| {
100                error!(?err, "stream write error");
101                Box::new(err)
102            })?;
103
104        // Set our read timeout
105        self.stream.set_read_timeout(Some(timeout)).map_err(|err| {
106            error!(
107                ?err,
108                "Unix socket stream setup error while setting read timeout",
109            );
110            Box::new(err)
111        })?;
112
113        // We want this to be blocking so that we wait for data to be ready
114        self.stream.set_nonblocking(false).map_err(|err| {
115            error!(
116                ?err,
117                "Unix socket stream setup error while setting nonblocking=false",
118            );
119            Box::new(err)
120        })?;
121
122        trace!(read_timeout = ?self.stream.read_timeout(), write_timeout = ?self.stream.write_timeout());
123
124        // Now wait on the response.
125        data.clear();
126        let start = Instant::now();
127        let mut read_started = false;
128
129        loop {
130            trace!("read loop");
131            let durr = Instant::now().duration_since(start);
132            if durr > timeout {
133                error!("Socket timeout");
134                // timed out, not enough activity.
135                return Err(Box::new(io::Error::other("Timeout")));
136            }
137
138            let mut buffer = [0; 16 * 1024];
139
140            // Would be a lot easier if we had peek ...
141            // https://github.com/rust-lang/rust/issues/76923
142            match self.stream.read(&mut buffer) {
143                Ok(0) => {
144                    if read_started {
145                        trace!("read_started true, no bytes read");
146                        // We're done, no more bytes. This will now
147                        // fall through to the codec decode to double
148                        // check this assertion.
149                    } else {
150                        trace!("Waiting ...");
151                        // Still can wait ...
152                        continue;
153                    }
154                }
155                Ok(count) => {
156                    read_started = true;
157                    trace!("read {count} bytes");
158                    data.extend_from_slice(&buffer[..count]);
159                    if count == buffer.len() {
160                        // Whole buffer, read again
161                        continue;
162                    }
163                    // Not a whole buffer, probably complete.
164                }
165                Err(err) if err.kind() == ErrorKind::WouldBlock => {
166                    warn!("read from UDS would block, try again.");
167                    // std::thread::sleep(Duration::from_millis(1));
168                    continue;
169                }
170                Err(err) => {
171                    error!(?err, err_kind = ?err.kind(), "Stream read failure from {:?}", &self.stream);
172                    // Failure!
173                    return Err(Box::new(err));
174                }
175            }
176
177            match self.codec.decode(&mut data) {
178                // A whole frame is ready and present.
179                Ok(Some(cr)) => {
180                    trace!("read loop - ok");
181                    return Ok(cr);
182                }
183                // Need more data
184                Ok(None) => {
185                    trace!("need more");
186                    continue;
187                }
188                // Failed to decode for some reason
189                Err(err) => {
190                    error!(?err, "failed to decode response");
191                    return Err(Box::new(err));
192                }
193            }
194        }
195    }
196}