use std::thread; use std::io::{BufReader, Read}; use tungstenite::connect; use url::Url; use serde::Deserialize; use serde_json::Value as JValue; use std::sync::mpsc::{Sender, Receiver}; use std::sync::mpsc; use chrono::{DateTime, Utc}; use crate::vesseldata::{VesselDataEvent, VesselDataEventSource}; #[derive(Debug, Deserialize)] #[serde()] struct Source { sentence: String, talker: String, #[serde(rename = "type")] k_type: String, label: String, } #[derive(Debug, Deserialize)] #[serde()] struct Value { path: String, value: JValue, } #[derive(Debug, Deserialize)] #[serde()] struct Update { #[serde(rename = "$source")] dsource: String, timestamp: DateTime, values: Vec, } #[derive(Debug, Deserialize)] #[serde()] struct SignalKEventData { context: String, updates: Vec, } #[derive(Debug, Deserialize)] #[serde()] struct SignalKHeader { name: String, version: String, #[serde(rename="self")] k_self: String, roles: Vec, timestamp: String, } pub struct SignalK { } impl SignalK { fn parse_json (json: String) -> Vec { let res = Vec::::new(); let v: Value = match serde_json::from_str(&json) { Ok(value) => value, Err(_) => return res, }; /* for d in v["updates"]["values"] { println!("{}", d); } */ res } fn value_to_vesseldata (val: Value) -> Option { match val.path.as_str() { "navigation.position" => return Some(VesselDataEvent::Location(val.value["latitude"].as_f64().unwrap() as f32, val.value["longitude"].as_f64().unwrap() as f32)), "navigation.headingMagnetic" => return Some(VesselDataEvent::TrueCompassCourse(val.value.as_f64().unwrap() as f32)), "navigation.courseOverGroundTrue" => return Some(VesselDataEvent::CourseOverGround(val.value.as_f64().unwrap() as f32)), "navigation.speedOverGround" => return Some(VesselDataEvent::SpeedOverGround(val.value.as_f64().unwrap() as f32)), _ => return None, } } } impl VesselDataEventSource for SignalK { fn connect() -> Receiver { let (tx, rx): (Sender, Receiver) = mpsc::channel(); let handle = thread::spawn(move || { let (mut socket, response) = connect(Url::parse("ws://localhost:3000/signalk/v1/stream?subscribe=self").unwrap()).expect("Can't connect"); let header: SignalKHeader = serde_json::from_str(socket.read_message().unwrap().to_text().unwrap()).unwrap(); if header.version != "1.33.0" { warn!("SignalK parser has only been tested with Signal K Server version 1.33.0"); } loop { let message = socket.read_message().unwrap(); if message.is_text() { let data: SignalKEventData = serde_json::from_str(message.to_text().unwrap()).unwrap(); for u in data.updates { for v in u.values { match SignalK::value_to_vesseldata(v) { Some(vd) => tx.send(vd).unwrap(), _ => (), } } } } } }); rx } }