kanidm_unix_common/
client_sync.rs1use crate::constants::DEFAULT_CONN_TIMEOUT;
2use crate::json_codec::JsonCodec;
3use crate::unix_proto::{ClientRequest, ClientResponse};
4use bytes::BytesMut;
5use std::error::Error;
6use std::io::{self, ErrorKind, Read, Write};
7use std::time::{Duration, Instant};
8use tokio_util::codec::{Decoder, Encoder};
9
10pub use std::os::unix::net::UnixStream;
11
12type ClientCodec = JsonCodec<ClientResponse, ClientRequest>;
13
14pub struct DaemonClientBlocking {
15 stream: UnixStream,
16 codec: ClientCodec,
17 default_timeout: u64,
18}
19
20impl From<UnixStream> for DaemonClientBlocking {
21 fn from(stream: UnixStream) -> Self {
22 DaemonClientBlocking {
23 stream,
24 codec: ClientCodec::default(),
25 default_timeout: DEFAULT_CONN_TIMEOUT,
26 }
27 }
28}
29
30impl DaemonClientBlocking {
31 pub fn new(path: &str, default_timeout: u64) -> Result<DaemonClientBlocking, Box<dyn Error>> {
32 if cfg!(feature = "client_sync_tracing") {
34 use tracing_subscriber::prelude::*;
35 use tracing_subscriber::{filter::LevelFilter, fmt};
36
37 let fmt_layer = fmt::layer().with_target(false);
38 let filter_layer = LevelFilter::WARN;
39
40 let _ = tracing_subscriber::registry()
41 .with(filter_layer)
42 .with(fmt_layer)
43 .try_init();
44 }
45
46 trace!(%path);
47
48 let stream = UnixStream::connect(path).map_err(|err| {
49 error!(
50 ?err, %path,
51 "Unix socket stream setup error",
52 );
53 Box::new(err)
54 })?;
55
56 Ok(DaemonClientBlocking {
57 stream,
58 codec: ClientCodec::default(),
59 default_timeout,
60 })
61 }
62
63 pub fn call_and_wait(
64 &mut self,
65 req: ClientRequest,
66 timeout: Option<u64>,
67 ) -> Result<ClientResponse, Box<dyn Error>> {
68 let timeout = Duration::from_secs(timeout.unwrap_or(self.default_timeout));
69
70 self.stream
71 .set_write_timeout(Some(timeout))
72 .map_err(|err| {
73 error!(
74 ?err,
75 "Unix socket stream setup error while setting write timeout",
76 );
77 Box::new(err)
78 })?;
79
80 self.stream.set_nonblocking(false).map_err(|err| {
82 error!(
83 ?err,
84 "Unix socket stream setup error while setting nonblocking=false",
85 );
86 Box::new(err)
87 })?;
88
89 let mut data = BytesMut::new();
90
91 self.codec.encode(req, &mut data).map_err(|err| {
92 error!(?err, "codec encode error");
93 Box::new(err)
94 })?;
95
96 self.stream
97 .write_all(&data)
98 .and_then(|_| self.stream.flush())
99 .map_err(|err| {
100 error!(?err, "stream write error");
101 Box::new(err)
102 })?;
103
104 self.stream.set_read_timeout(Some(timeout)).map_err(|err| {
106 error!(
107 ?err,
108 "Unix socket stream setup error while setting read timeout",
109 );
110 Box::new(err)
111 })?;
112
113 self.stream.set_nonblocking(false).map_err(|err| {
115 error!(
116 ?err,
117 "Unix socket stream setup error while setting nonblocking=false",
118 );
119 Box::new(err)
120 })?;
121
122 trace!(read_timeout = ?self.stream.read_timeout(), write_timeout = ?self.stream.write_timeout());
123
124 data.clear();
126 let start = Instant::now();
127 let mut read_started = false;
128
129 loop {
130 trace!("read loop");
131 let durr = Instant::now().duration_since(start);
132 if durr > timeout {
133 error!("Socket timeout");
134 return Err(Box::new(io::Error::other("Timeout")));
136 }
137
138 let mut buffer = [0; 16 * 1024];
139
140 match self.stream.read(&mut buffer) {
143 Ok(0) => {
144 if read_started {
145 trace!("read_started true, no bytes read");
146 } else {
150 trace!("Waiting ...");
151 continue;
153 }
154 }
155 Ok(count) => {
156 read_started = true;
157 trace!("read {count} bytes");
158 data.extend_from_slice(&buffer[..count]);
159 if count == buffer.len() {
160 continue;
162 }
163 }
165 Err(err) if err.kind() == ErrorKind::WouldBlock => {
166 warn!("read from UDS would block, try again.");
167 continue;
169 }
170 Err(err) => {
171 error!(?err, err_kind = ?err.kind(), "Stream read failure from {:?}", &self.stream);
172 return Err(Box::new(err));
174 }
175 }
176
177 match self.codec.decode(&mut data) {
178 Ok(Some(cr)) => {
180 trace!("read loop - ok");
181 return Ok(cr);
182 }
183 Ok(None) => {
185 trace!("need more");
186 continue;
187 }
188 Err(err) => {
190 error!(?err, "failed to decode response");
191 return Err(Box::new(err));
192 }
193 }
194 }
195 }
196}