kanidm_unix_common/
client_sync.rs1use crate::constants::DEFAULT_CONN_TIMEOUT;
2use crate::json_codec::JsonCodec;
3use crate::unix_proto::{ClientRequest, ClientResponse};
4use bytes::BytesMut;
5use std::error::Error;
6use std::io::{self, Read, Write};
7use std::time::{Duration, SystemTime};
8use tokio_util::codec::{Decoder, Encoder};
9
10pub use std::os::unix::net::UnixStream;
11
12type ClientCodec = JsonCodec<ClientResponse, ClientRequest>;
13
14pub struct DaemonClientBlocking {
15 stream: UnixStream,
16 codec: ClientCodec,
17 default_timeout: u64,
18}
19
20impl From<UnixStream> for DaemonClientBlocking {
21 fn from(stream: UnixStream) -> Self {
22 DaemonClientBlocking {
23 stream,
24 codec: ClientCodec::default(),
25 default_timeout: DEFAULT_CONN_TIMEOUT,
26 }
27 }
28}
29
30impl DaemonClientBlocking {
31 pub fn new(path: &str, default_timeout: u64) -> Result<DaemonClientBlocking, Box<dyn Error>> {
32 debug!(%path);
33
34 let stream = UnixStream::connect(path)
35 .map_err(|e| {
36 error!(
37 "Unix socket stream setup error while connecting to {} -> {:?}",
38 path, e
39 );
40 e
41 })
42 .map_err(Box::new)?;
43
44 Ok(DaemonClientBlocking {
45 stream,
46 codec: ClientCodec::default(),
47 default_timeout,
48 })
49 }
50
51 pub fn call_and_wait(
52 &mut self,
53 req: ClientRequest,
54 timeout: Option<u64>,
55 ) -> Result<ClientResponse, Box<dyn Error>> {
56 let timeout = Duration::from_secs(timeout.unwrap_or(self.default_timeout));
57
58 let mut data = BytesMut::new();
59
60 self.codec.encode(req, &mut data).map_err(Box::new)?;
61
62 match self.stream.set_read_timeout(Some(timeout)) {
63 Ok(()) => {}
64 Err(e) => {
65 error!(
66 "Unix socket stream setup error while setting read timeout -> {:?}",
67 e
68 );
69 return Err(Box::new(e));
70 }
71 };
72 match self.stream.set_write_timeout(Some(timeout)) {
73 Ok(()) => {}
74 Err(e) => {
75 error!(
76 "Unix socket stream setup error while setting write timeout -> {:?}",
77 e
78 );
79 return Err(Box::new(e));
80 }
81 };
82
83 self.stream
84 .write_all(&data)
85 .and_then(|_| self.stream.flush())
86 .map_err(|e| {
87 error!("stream write error -> {:?}", e);
88 e
89 })
90 .map_err(Box::new)?;
91
92 data.clear();
94 let start = SystemTime::now();
95 let mut read_started = false;
96
97 loop {
98 let durr = SystemTime::now().duration_since(start).map_err(Box::new)?;
99 if durr > timeout {
100 error!("Socket timeout");
101 return Err(Box::new(io::Error::other("Timeout")));
103 }
104
105 let mut buffer = [0; 1024];
106
107 match self.stream.read(&mut buffer) {
110 Ok(0) => {
111 if read_started {
112 debug!("read_started true, no bytes read");
113 } else {
117 debug!("Waiting ...");
118 continue;
120 }
121 }
122 Ok(count) => {
123 read_started = true;
124 data.extend_from_slice(&buffer[..count]);
125 }
126 Err(e) => {
127 error!("Stream read failure from {:?} -> {:?}", &self.stream, e);
128 return Err(Box::new(e));
130 }
131 }
132
133 match self.codec.decode(&mut data) {
134 Ok(Some(cr)) => return Ok(cr),
135 Ok(None) => continue,
137 Err(e) => return Err(Box::new(e)),
138 }
139 }
140 }
141}