kanidm_unix_common/
client_sync.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use crate::constants::DEFAULT_CONN_TIMEOUT;
use crate::unix_proto::{ClientRequest, ClientResponse};
use std::error::Error;
use std::io::{Error as IoError, ErrorKind, Read, Write};
use std::time::{Duration, SystemTime};

pub use std::os::unix::net::UnixStream;

/// A blocking (synchronous) client for talking to the daemon over a unix
/// domain socket.
///
/// Requests are sent and responses received with
/// `DaemonClientBlocking::call_and_wait`.
#[derive(Debug)]
pub struct DaemonClientBlocking {
    /// The connected unix socket used for all requests and responses.
    stream: UnixStream,
    /// Timeout, in seconds, applied to a call when the caller does not
    /// provide an explicit one.
    default_timeout: u64,
}

/// Wraps an already-connected [`UnixStream`] in a client, using
/// `DEFAULT_CONN_TIMEOUT` as the default timeout.
impl From<UnixStream> for DaemonClientBlocking {
    fn from(stream: UnixStream) -> Self {
        let default_timeout = DEFAULT_CONN_TIMEOUT;
        Self {
            stream,
            default_timeout,
        }
    }
}

impl DaemonClientBlocking {
    /// Connects to the unix socket at `path` and returns a blocking client.
    ///
    /// `default_timeout` is the timeout, in seconds, that
    /// [`call_and_wait`](Self::call_and_wait) uses when the caller passes
    /// `None` for its own timeout.
    ///
    /// # Errors
    ///
    /// Returns the (boxed) I/O error if the connection cannot be established.
    pub fn new(path: &str, default_timeout: u64) -> Result<DaemonClientBlocking, Box<dyn Error>> {
        debug!(%path);

        let stream = UnixStream::connect(path).map_err(|e| {
            error!(
                "Unix socket stream setup error while connecting to {} -> {:?}",
                path, e
            );
            Box::new(e)
        })?;

        Ok(DaemonClientBlocking {
            stream,
            default_timeout,
        })
    }

    /// Sends `req` to the daemon as JSON and blocks until a complete JSON
    /// response has been received and decoded.
    ///
    /// `timeout` is in seconds; when `None`, the client's default timeout is
    /// used. The same value is applied as the socket read timeout, the socket
    /// write timeout, and as an overall limit checked before each read while
    /// waiting for the response.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// * the request cannot be JSON-encoded,
    /// * the socket timeouts cannot be set (a timeout of zero is rejected by
    ///   the standard library),
    /// * writing the request or reading the response fails (including the
    ///   socket read timeout expiring),
    /// * the overall timeout elapses before a complete response arrives,
    /// * the peer closes the connection before a complete response arrives,
    /// * the received bytes are not a valid JSON `ClientResponse`.
    pub fn call_and_wait(
        &mut self,
        req: &ClientRequest,
        timeout: Option<u64>,
    ) -> Result<ClientResponse, Box<dyn Error>> {
        let timeout = Duration::from_secs(timeout.unwrap_or(self.default_timeout));

        let data = serde_json::to_vec(&req).map_err(|e| {
            error!("socket encoding error -> {:?}", e);
            Box::new(IoError::new(ErrorKind::Other, "JSON encode error"))
        })?;

        if let Err(e) = self.stream.set_read_timeout(Some(timeout)) {
            error!(
                "Unix socket stream setup error while setting read timeout -> {:?}",
                e
            );
            return Err(Box::new(e));
        }
        if let Err(e) = self.stream.set_write_timeout(Some(timeout)) {
            error!(
                "Unix socket stream setup error while setting write timeout -> {:?}",
                e
            );
            return Err(Box::new(e));
        }

        self.stream
            .write_all(data.as_slice())
            .and_then(|_| self.stream.flush())
            .map_err(|e| {
                error!("stream write error -> {:?}", e);
                Box::new(e)
            })?;

        // Now wait on the response.
        //
        // NOTE(review): SystemTime is not monotonic, so a wall-clock step
        // backwards surfaces here as an error from `duration_since`.
        // `std::time::Instant` would be the more appropriate clock.
        let start = SystemTime::now();
        let mut buffer = [0u8; 1024];
        let mut response: Vec<u8> = Vec::with_capacity(buffer.len());

        loop {
            let elapsed = SystemTime::now().duration_since(start).map_err(Box::new)?;
            if elapsed > timeout {
                // Timed out, not enough activity.
                error!("Socket timeout");
                return Err(Box::new(IoError::new(ErrorKind::TimedOut, "Socket timeout")));
            }

            let count = match self.stream.read(&mut buffer) {
                Ok(0) => {
                    // A zero-length read on a stream socket means the peer has
                    // closed the connection; no more data will ever arrive, so
                    // retrying would only spin until the timeout.
                    error!(
                        "Stream closed by peer after {} bytes, before a complete response",
                        response.len()
                    );
                    return Err(Box::new(IoError::new(
                        ErrorKind::UnexpectedEof,
                        "Stream closed before a complete response was received",
                    )));
                }
                Ok(count) => count,
                // An interrupted read carries no data and is safe to retry.
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => {
                    error!("Stream read failure from {:?} -> {:?}", &self.stream, e);
                    // Failure!
                    return Err(Box::new(e));
                }
            };

            // Only the first `count` bytes of the buffer are valid for this read.
            response.extend_from_slice(&buffer[..count]);

            // The size of a read says nothing about message boundaries: a short
            // read may be the middle of a response, and a full buffer may be
            // the end of one. Let the JSON parser decide whether the response
            // is complete. This assumes a truncated response never parses as a
            // complete value, which holds for JSON objects, arrays and strings.
            match serde_json::from_slice::<ClientResponse>(response.as_slice()) {
                Ok(cr) => {
                    debug!("Received {} bytes, complete", response.len());
                    return Ok(cr);
                }
                Err(e) if e.is_eof() => {
                    debug!(
                        "Received {} bytes, response incomplete, looping ...",
                        response.len()
                    );
                }
                Err(e) => {
                    error!("socket encoding error -> {:?}", e);
                    return Err(Box::new(IoError::new(
                        ErrorKind::Other,
                        "JSON decode error",
                    )));
                }
            }
        }
    }
}