kanidm_unix_common/
client_sync.rs
1use crate::constants::DEFAULT_CONN_TIMEOUT;
2use crate::unix_proto::{ClientRequest, ClientResponse};
3use std::error::Error;
4use std::io::{Error as IoError, ErrorKind, Read, Write};
5use std::time::{Duration, SystemTime};
6
7pub use std::os::unix::net::UnixStream;
8
9pub struct DaemonClientBlocking {
10 stream: UnixStream,
11 default_timeout: u64,
12}
13
14impl From<UnixStream> for DaemonClientBlocking {
15 fn from(stream: UnixStream) -> Self {
16 DaemonClientBlocking {
17 stream,
18 default_timeout: DEFAULT_CONN_TIMEOUT,
19 }
20 }
21}
22
23impl DaemonClientBlocking {
24 pub fn new(path: &str, default_timeout: u64) -> Result<DaemonClientBlocking, Box<dyn Error>> {
25 debug!(%path);
26
27 let stream = UnixStream::connect(path)
28 .map_err(|e| {
29 error!(
30 "Unix socket stream setup error while connecting to {} -> {:?}",
31 path, e
32 );
33 e
34 })
35 .map_err(Box::new)?;
36
37 Ok(DaemonClientBlocking {
38 stream,
39 default_timeout,
40 })
41 }
42
43 pub fn call_and_wait(
44 &mut self,
45 req: &ClientRequest,
46 timeout: Option<u64>,
47 ) -> Result<ClientResponse, Box<dyn Error>> {
48 let timeout = Duration::from_secs(timeout.unwrap_or(self.default_timeout));
49
50 let data = serde_json::to_vec(&req).map_err(|e| {
51 error!("socket encoding error -> {:?}", e);
52 Box::new(IoError::new(ErrorKind::Other, "JSON encode error"))
53 })?;
54
55 match self.stream.set_read_timeout(Some(timeout)) {
56 Ok(()) => {}
57 Err(e) => {
58 error!(
59 "Unix socket stream setup error while setting read timeout -> {:?}",
60 e
61 );
62 return Err(Box::new(e));
63 }
64 };
65 match self.stream.set_write_timeout(Some(timeout)) {
66 Ok(()) => {}
67 Err(e) => {
68 error!(
69 "Unix socket stream setup error while setting write timeout -> {:?}",
70 e
71 );
72 return Err(Box::new(e));
73 }
74 };
75
76 self.stream
77 .write_all(data.as_slice())
78 .and_then(|_| self.stream.flush())
79 .map_err(|e| {
80 error!("stream write error -> {:?}", e);
81 e
82 })
83 .map_err(Box::new)?;
84
85 let start = SystemTime::now();
87 let mut read_started = false;
88 let mut data = Vec::with_capacity(1024);
89 let mut counter = 0;
90
91 loop {
92 let mut buffer = [0; 1024];
93 let durr = SystemTime::now().duration_since(start).map_err(Box::new)?;
94 if durr > timeout {
95 error!("Socket timeout");
96 break;
98 }
99 match self.stream.read(&mut buffer) {
102 Ok(0) => {
103 if read_started {
104 debug!("read_started true, we have completed");
105 break;
107 } else {
108 debug!("Waiting ...");
109 continue;
111 }
112 }
113 Ok(count) => {
114 data.extend_from_slice(&buffer);
115 counter += count;
116 if count == 1024 {
117 debug!("Filled 1024 bytes, looping ...");
118 read_started = true;
120 continue;
121 } else {
122 debug!("Filled {} bytes, complete", count);
123 break;
125 }
126 }
127 Err(e) => {
128 error!("Stream read failure from {:?} -> {:?}", &self.stream, e);
129 return Err(Box::new(e));
131 }
132 }
133 }
134
135 data.truncate(counter);
137
138 let cr = serde_json::from_slice::<ClientResponse>(data.as_slice()).map_err(|e| {
140 error!("socket encoding error -> {:?}", e);
141 Box::new(IoError::new(ErrorKind::Other, "JSON decode error"))
142 })?;
143
144 Ok(cr)
145 }
146}