kanidm_unix_common/
client.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use std::error::Error;
use std::io::{Error as IoError, ErrorKind};

use bytes::{BufMut, BytesMut};
use futures::{SinkExt, StreamExt};
use tokio::net::UnixStream;
// use tokio::runtime::Builder;
use tokio::time::{self, Duration};
use tokio_util::codec::Framed;
use tokio_util::codec::{Decoder, Encoder};

use crate::unix_proto::{ClientRequest, ClientResponse};

/// Stateless codec for the client side of the kanidm_unixd socket: it encodes
/// `ClientRequest` values and decodes `ClientResponse` values as JSON, and is
/// used together with `Framed` when talking to the daemon.
struct ClientCodec;

impl Decoder for ClientCodec {
    type Error = IoError;
    type Item = ClientResponse;

    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
        match serde_json::from_slice::<ClientResponse>(src) {
            Ok(msg) => {
                // Clear the buffer for the next message.
                src.clear();
                Ok(Some(msg))
            }
            _ => Ok(None),
        }
    }
}

impl Encoder<ClientRequest> for ClientCodec {
    type Error = IoError;

    /// Serializes `msg` to JSON and appends the resulting bytes to `dst`.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::Other` I/O error if JSON serialization fails; in
    /// that case nothing is written to `dst`.
    fn encode(&mut self, msg: ClientRequest, dst: &mut BytesMut) -> Result<(), Self::Error> {
        let payload = match serde_json::to_vec(&msg) {
            Ok(bytes) => bytes,
            Err(e) => {
                error!("socket encoding error -> {:?}", e);
                return Err(IoError::new(ErrorKind::Other, "JSON encode error"));
            }
        };

        // Log the request through its "safe" string form rather than its Debug output.
        debug!("Attempting to send request -> {}", msg.as_safe_string());
        dst.put_slice(&payload);
        Ok(())
    }
}

impl ClientCodec {
    fn new() -> Self {
        ClientCodec
    }
}

async fn call_daemon_inner(
    path: &str,
    req: ClientRequest,
) -> Result<ClientResponse, Box<dyn Error>> {
    trace!(?path, ?req);
    let stream = UnixStream::connect(path).await?;
    trace!("connected");

    let mut reqs = Framed::new(stream, ClientCodec::new());

    reqs.send(req).await?;
    reqs.flush().await?;
    trace!("flushed, waiting ...");

    match reqs.next().await {
        Some(Ok(res)) => {
            debug!("Response -> {:?}", res);
            Ok(res)
        }
        _ => {
            error!("Error making request to kanidm_unixd");
            Err(Box::new(IoError::new(ErrorKind::Other, "oh no!")))
        }
    }
}

/// Makes a call to kanidm_unixd via a unix socket at `path`
pub async fn call_daemon(
    path: &str,
    req: ClientRequest,
    timeout: u64,
) -> Result<ClientResponse, Box<dyn Error>> {
    let sleep = time::sleep(Duration::from_secs(timeout));
    tokio::pin!(sleep);

    tokio::select! {
        _ = &mut sleep => {
            error!(?timeout, "Timed out making request to kanidm_unixd");
            Err(Box::new(IoError::new(ErrorKind::Other, "timeout")))
        }
        res = call_daemon_inner(path, req) => {
            res
        }
    }
}