diff --git a/Cargo.lock b/Cargo.lock index fc78a1f080..be87cfe60c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -294,6 +294,7 @@ dependencies = [ "image", "itertools 0.14.0", "log", + "mcp", "memchr", "merging", "nucleo-matcher", @@ -302,7 +303,7 @@ dependencies = [ "processor", "regex", "regex-syntax", - "reqwest", + "reqwest 0.12.28", "rfd", "rustc-hash 2.1.2", "semver", @@ -496,6 +497,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-task" version = "4.7.1" @@ -562,6 +585,95 @@ version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2032f911046de80f0a198e0901378627c33f59ea0ac00e363d481118bd70a53" +[[package]] +name = "aws-lc-rs" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ec2f1fc3ec205783a5da9a7e6c1509cc69dedf09a1949e412c1e18469326d00" +dependencies = [ + "aws-lc-sys", + "zeroize", +] + +[[package]] +name = "aws-lc-sys" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a2f9779ce85b93ab6170dd940ad0169b5766ff848247aff13bb788b832fe3f4" +dependencies = [ + "cc", + "cmake", + "dunce", + "fs_extra", +] + +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "axum-macros", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "base64" version = "0.22.1" @@ -918,6 +1030,17 @@ dependencies = [ "libc", ] +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.44" @@ -927,6 +1050,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-link", ] @@ -1037,6 +1161,15 @@ dependencies = [ "error-code", ] +[[package]] +name = "cmake" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" +dependencies = [ + "cc", +] + [[package]] name = "cobs" version = "0.3.0" @@ -1427,8 +1560,18 @@ version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.11", + "darling_macro 0.20.11", +] + +[[package]] +name = "darling" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ae13da2f202d56bd7f91c25fba009e7717a1e4a1cc98a76d844b65ae912e9d" +dependencies = [ + "darling_core 0.23.0", + "darling_macro 0.23.0", ] [[package]] @@ -1445,13 +1588,37 @@ dependencies = [ "syn", ] +[[package]] +name = "darling_core" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9865a50f7c335f53564bb694ef660825eb8610e0a53d3e11bf1b0d3df31e03b0" +dependencies = [ + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + [[package]] name = "darling_macro" version = "0.20.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead" dependencies = [ - "darling_core", + "darling_core 0.20.11", + "quote", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" +dependencies = [ + "darling_core 0.23.0", "quote", "syn", ] @@ -1480,7 +1647,7 @@ version = "0.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d5bcf7b024d6835cfb3d473887cd966994907effbe9227e8c8219824d06c4e8" dependencies = [ - "darling", + "darling 0.20.11", "proc-macro2", "quote", "syn", @@ -1661,6 +1828,18 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8b14ccef22fc6f5a8f4d7d768562a182c04ce9a3b3157b91390b52ddfdf1a76" +[[package]] +name = "dunce" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813" + +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "ecolor" version = "0.34.1" @@ -2254,6 +2433,12 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "fs_extra" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" + [[package]] name = "fuchsia-cprng" version = "0.1.1" @@ -2410,8 +2595,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff2abc00be7fca6ebc474524697ae276ad847ad0a6b3faa4bcb027e9a4614ad0" dependencies = [ "cfg-if", + "js-sys", "libc", "wasi", + "wasm-bindgen", ] [[package]] @@ -2421,9 +2608,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd" dependencies = [ "cfg-if", + "js-sys", "libc", "r-efi 5.3.0", "wasip2", + "wasm-bindgen", ] [[package]] @@ -2435,6 +2624,7 @@ dependencies = [ "cfg-if", "libc", "r-efi 6.0.0", + "rand_core 0.10.1", "wasip2", "wasip3", ] @@ -2769,6 +2959,12 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.9.0" @@ -2783,6 +2979,7 @@ dependencies = [ "http", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -3407,6 +3604,12 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "mach2" version = "0.4.3" @@ -3416,12 +3619,41 @@ dependencies = [ "libc", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "maybe-owned" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4facc753ae494aeb6e3c22f839b158aebd4f9270f55cd3c79906c45476c47ab4" +[[package]] +name = "mcp" +version = "4.0.0-alpha.2" +dependencies = [ + "anyhow", + "axum", + "log", + "non-exhaustive", + "ollama-rs", + "processor", + "rand 0.9.4", + "reqwest 0.12.28", + "rmcp", + "schemars", + "serde", + "serde_json", + "thiserror 2.0.18", + "tokio", + "tokio-util", + "url", + "uuid", +] + [[package]] name = "memchr" version = "2.8.0" @@ -3656,6 +3888,12 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "non-exhaustive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daee4e13b993996fb01f271f3512b3d07f6bacbb64cf1edeb705d92c8a1881ff" + [[package]] name = "nucleo-matcher" version = "0.3.1" @@ -4029,6 +4267,25 @@ dependencies = [ "memchr", ] +[[package]] +name = "ollama-rs" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f647d8676b95a6b6205e11453c9fac338d73c9cdcc011c94d1ba9c9bfea582cd" +dependencies = [ + "async-stream", + "log", + "reqwest 0.12.28", + "schemars", + "serde", + "serde_json", + "static_assertions", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "once_cell" version = "1.21.4" @@ -4184,6 +4441,12 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "pastey" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ee67f1008b1ba2321834326597b8e186293b049a023cdef258527550b9935b4" + [[package]] name = "pcap-parser" version = "0.16.0" @@ -4694,6 +4957,62 @@ dependencies = [ "serde", ] +[[package]] +name = "quinn" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e20a958963c291dc322d98411f541009df2ced7b5a4f2bd52337638cfccf20" +dependencies = [ + "bytes", + "cfg_aliases", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash 2.1.2", + "rustls", + "socket2 0.6.3", + "thiserror 2.0.18", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-proto" +version = "0.11.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098" +dependencies = [ + "aws-lc-rs", + "bytes", + "getrandom 0.3.4", + "lru-slab", + "rand 0.9.4", + "ring", + "rustc-hash 2.1.2", + "rustls", + "rustls-pki-types", + "slab", + "thiserror 2.0.18", + "tinyvec", + "tracing", + "web-time", +] + +[[package]] +name = "quinn-udp" +version = "0.5.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "addec6a0dcad8a8d96a771f815f0eaf55f9d1805756410b39f5fa81332574cbd" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2 0.6.3", + "tracing", + "windows-sys 0.60.2", +] + [[package]] name = "quote" version = "1.0.45" @@ -4749,6 +5068,17 @@ dependencies = [ "rand_core 0.9.5", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -4802,6 +5132,12 @@ dependencies = [ "getrandom 0.3.4", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_xorshift" version = "0.4.0" @@ -4923,6 +5259,26 @@ dependencies = [ "thiserror 2.0.18", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "regalloc2" version = "0.12.2" @@ -4991,6 +5347,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-core", + "futures-util", "h2", "http", "http-body", @@ -5012,12 +5369,54 @@ dependencies = [ "sync_wrapper", "tokio", "tokio-native-tls", + "tokio-util", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams 0.4.2", + "web-sys", +] + +[[package]] +name = "reqwest" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "219c5811de6525e5416c7d5d53bb656d3afdbc6c5af816e0802bcfa42dbdc1c3" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "serde", + "serde_json", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tokio-util", "tower", "tower-http", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams 0.5.0", "web-sys", ] @@ -5062,6 +5461,51 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rmcp" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0810a9f717d9828f475fe1f629f4c305c8464b7f496c3a854b58d29e65f4058e" +dependencies = [ + "async-trait", + "base64", + "bytes", + "chrono", + "futures", + "http", + "http-body", + "http-body-util", + "pastey", + "pin-project-lite", + "rand 0.10.1", + "reqwest 0.13.4", + "rmcp-macros", + "schemars", + "serde", + "serde_json", + "sse-stream", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tokio-util", + "tower-service", + "tracing", + "uuid", +] + +[[package]] +name = "rmcp-macros" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6aefac48c364756e97f04c0401ba3231e8607882c7c1d92da0437dc16307904d" +dependencies = [ + "darling 0.23.0", + "proc-macro2", + "quote", + "serde_json", + "syn", +] + [[package]] name = "ron" version = "0.12.1" @@ -5154,6 +5598,7 @@ version = "0.23.40" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef86cd5876211988985292b91c96a8f2d298df24e75989a43a3c73f2d4d8168b" dependencies = [ + "aws-lc-rs", "once_cell", "rustls-pki-types", "rustls-webpki", @@ -5161,21 +5606,62 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "612460d5f7bea540c490b2b6395d8e34a953e52b491accd6c86c8164c5932a63" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pki-types" version = "1.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30a7197ae7eb376e574fe940d068c30fe0462554a3ddbe4eca7838e049c937a9" dependencies = [ + "web-time", "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d1e2536ce4f35f4846aa13bff16bd0ff40157cdb14cc056c7b14ba41233ba0" +dependencies = [ + "core-foundation 0.10.1", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki", + "security-framework", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.61.2", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.103.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c429a8649f110dddef65e2a5ad240f747e85f7758a6bccc7e5777bd33f756e" dependencies = [ + "aws-lc-rs", "ring", "rustls-pki-types", "untrusted", @@ -5223,6 +5709,32 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "schemars" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2b42f36aa1cd011945615b92222f6bf73c599a102a300334cd7f8dbeec726cc" +dependencies = [ + "chrono", + "dyn-clone", + "ref-cast", + "schemars_derive", + "serde", + "serde_json", +] + +[[package]] +name = "schemars_derive" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d115b50f4aaeea07e79c1912f645c7513d81715d0420f8bc77a18c6260b307f" +dependencies = [ + "proc-macro2", + "quote", + "serde_derive_internals", + "syn", +] + [[package]] name = "scoped-tls" version = "1.0.1" @@ -5317,12 +5829,24 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_derive_internals" +version = "0.29.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_json" version = "1.0.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8014e44b4736ed0538adeecded0fce2a272f22dc9578a7eb6b2d9993c74cfb9" dependencies = [ + "indexmap", "itoa", "memchr", "serde", @@ -5330,6 +5854,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -5691,6 +6226,19 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b9b39299b249ad65f3b7e96443bad61c02ca5cd3589f46cb6d610a0fd6c0d6a" +[[package]] +name = "sse-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3962b63f038885f15bce2c6e02c0e7925c072f1ac86bb60fd44c5c6b762fb72" +dependencies = [ + "bytes", + "futures-util", + "http-body", + "http-body-util", + "pin-project-lite", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -5971,6 +6519,21 @@ dependencies = [ "serde_json", ] +[[package]] +name = "tinyvec" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.52.3" @@ -6166,6 +6729,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -6361,6 +6925,7 @@ dependencies = [ "idna", "percent-encoding", "serde", + "serde_derive", ] [[package]] @@ -6619,6 +7184,32 @@ dependencies = [ "wasmparser 0.244.0", ] +[[package]] +name = "wasm-streams" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15053d8d85c7eccdbefef60f06769760a563c7f0a9d6902a13d35c7800b0ad65" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "wasm-streams" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1ec4f6517c9e11ae630e200b2b65d193279042e28edd4a2cda233e46670bbb" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasmparser" version = "0.229.0" @@ -7178,6 +7769,15 @@ dependencies = [ "web-sys", ] +[[package]] +name = "webpki-root-certs" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31141ce3fc3e300ae89b78c0dd67f9708061d1d2eda54b8209346fd6be9a92c" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "weezl" version = "0.1.12" diff --git a/Cargo.toml b/Cargo.toml index 79211e2c0b..2f5ca13bbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "crates/dir_checksum", "crates/file_tools", "crates/shell_tools", + "crates/mcp", "crates/core/dlt_tools", "crates/core/someip_tools", @@ -58,6 +59,7 @@ session = { path = "crates/core/session" } session_core = { package = "session", path = "crates/core/session" } sources = { path = "crates/core/sources" } stypes = { path = "crates/stypes" } +mcp = { path = "crates/mcp" } # External dependencies log = "0.4" diff --git a/crates/app/Cargo.toml b/crates/app/Cargo.toml index 5d9abcefba..ab1cb1d354 100644 --- a/crates/app/Cargo.toml +++ b/crates/app/Cargo.toml @@ -22,6 +22,7 @@ file_tools.workspace = true shell_tools.workspace = true parsers.workspace = true plugins_host.workspace = true +mcp.workspace = true #TODO: We need to avoid clashing with module `session` in Workaround to avoid potential merge conflict session_core.workspace = true diff --git a/crates/app/src/host/communication.rs b/crates/app/src/host/communication.rs index 8f09caac97..27edb2e64b 100644 --- a/crates/app/src/host/communication.rs +++ b/crates/app/src/host/communication.rs @@ -1,10 +1,11 @@ -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use crate::{ common::comm_utls::evaluate_send_res, host::{command::HostCommand, message::HostMessage, notification::AppNotification}, session::communication::SharedSenders, }; +use mcp::server::tasks::Tasks; const CHANNELS_CAPACITY: usize = 32; @@ -36,6 +37,7 @@ pub struct ServiceHandle { /// Provide functions to send host messages and waking up the UI on them. #[derive(Debug, Clone)] pub struct ServiceSenders { + pub mcp_task_tx: broadcast::Sender, message_tx: mpsc::Sender, notification_tx: mpsc::Sender, egui_ctx: egui::Context, @@ -78,6 +80,7 @@ pub fn init(egui_ctx: egui::Context) -> (UiHandle, ServiceHandle) { let (cmd_tx, cmd_rx) = mpsc::channel(CHANNELS_CAPACITY); let (message_tx, message_rx) = mpsc::channel(CHANNELS_CAPACITY); let (notification_tx, notification_rx) = mpsc::channel(CHANNELS_CAPACITY); + let (mcp_task_tx, _mcp_task_rx) = broadcast::channel(CHANNELS_CAPACITY); let ui_senders = UiSenders { cmd_tx }; @@ -92,6 +95,7 @@ pub fn init(egui_ctx: egui::Context) -> (UiHandle, ServiceHandle) { }; let service_senders = ServiceSenders { + mcp_task_tx, message_tx, notification_tx, egui_ctx, diff --git a/crates/app/src/host/service/mod.rs b/crates/app/src/host/service/mod.rs index 0370beaf59..a9bbe21ef0 100644 --- a/crates/app/src/host/service/mod.rs +++ b/crates/app/src/host/service/mod.rs @@ -8,6 +8,7 @@ use std::{ use anyhow::Result; use itertools::Itertools; use log::trace; +use mcp::server::McpServer; use tokio::{runtime::Handle, select, sync::mpsc}; use uuid::Uuid; @@ -94,8 +95,11 @@ impl HostService { /// Spawns tokio runtime to run host services and loads startup storage domains. #[must_use] pub fn spawn(communication: ServiceHandle) -> HostServiceInit { + let mcp_task_tx = communication.senders.mcp_task_tx.clone(); let (handle_tx, handle_rx) = std::sync::mpsc::channel(); + let mcp_server = McpServer::new(mcp_task_tx); + thread::spawn(move || { let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -153,6 +157,9 @@ impl HostService { previous_version, update_settings, ); + if let Err(err) = mcp_server.start().await { + log::error!("MCP Server error: {:?}", err); + } host.run().await; }); }); diff --git a/crates/app/src/session/command.rs b/crates/app/src/session/command.rs index c3f93bea2a..d10e7a6ca3 100644 --- a/crates/app/src/session/command.rs +++ b/crates/app/src/session/command.rs @@ -2,6 +2,7 @@ use std::{ops::RangeInclusive, path::PathBuf}; use std::sync::mpsc::Sender; +use mcp::config::AiConfig; use processor::{grabber::LineRange, search::filter::SearchFilter}; use session_core::state::IndexedNavigation; use stypes::GrabbedElement; @@ -11,7 +12,7 @@ use crate::host::ui::{ session_setup::state::sources::StreamConfig, storage::recent::session::RecentSessionStateSnapshot, }; -use crate::session::{error::SessionError, types::attachment}; +use crate::session::{error::SessionError, message::AiMessage, types::attachment}; /// Represents session specific commands to be sent from UI to session service. /// @@ -124,6 +125,15 @@ pub enum SessionCommand { /// Cancel the running operation with the given id. CancelOperation { id: Uuid }, + + /// Send a chat message. + SendChatMessage { + id: Uuid, + message: String, + history: Box>, + ai_config: AiConfig, + }, + /// Gracefully terminate the session service. CloseSession, } diff --git a/crates/app/src/session/communication.rs b/crates/app/src/session/communication.rs index b5159def3f..e5b327c6f8 100644 --- a/crates/app/src/session/communication.rs +++ b/crates/app/src/session/communication.rs @@ -29,6 +29,10 @@ impl SharedSenders { egui_ctx, } } + + pub fn get_mcp_task_subscriber(&self) -> broadcast::Receiver { + self.mcp_task_tx.subscribe() + } } /// Contains session communication channels for the UI to communicate with services. diff --git a/crates/app/src/session/message.rs b/crates/app/src/session/message.rs index 723cc2966a..4db2c2d6a6 100644 --- a/crates/app/src/session/message.rs +++ b/crates/app/src/session/message.rs @@ -1,5 +1,7 @@ use std::collections::HashMap; +use mcp::server::tasks::Tasks; +use mcp::types::Response; use stypes::{AttachmentInfo, FilterMatch, GrabbedElement, NearestPosition}; use uuid::Uuid; @@ -22,12 +24,16 @@ pub enum SessionMessage { // --- Search --- // /// Total number of rows matched by the active search. - SearchResultCountUpdated { count: u64 }, + SearchResultCountUpdated { + count: u64, + }, /// Total number of rows currently exposed by the indexed lower table. /// This can include search results, bookmarked rows, and any other indexed-map entries /// currently materialized by the backend. - IndexedCountUpdated { count: u64 }, + IndexedCountUpdated { + count: u64, + }, /// Search matches found. SearchResults(Vec), @@ -62,7 +68,9 @@ pub enum SessionMessage { }, /// Source has been added to session. - SourceAdded { observe_op: Box }, + SourceAdded { + observe_op: Box, + }, /// Triggered when a file is opened within the session. /// Although `chipmunk` continues to monitor the file for changes, @@ -82,6 +90,11 @@ pub enum SessionMessage { target: attachment::PreviewTarget, preview: Result, }, + + ChatResponseReceived(Response), + + /// Process the task received from the MCP server + MCPTaskReceived(Tasks), } /// Bookmark mutation confirmed by the session backend. @@ -90,3 +103,9 @@ pub struct BookmarkUpdate { pub row: u64, pub is_bookmarked: bool, } + +#[derive(Debug, Clone)] +pub enum AiMessage { + Prompt(String), + Response(Response), +} diff --git a/crates/app/src/session/service/mod.rs b/crates/app/src/session/service/mod.rs index cc1e31ced3..91c4df9fc6 100644 --- a/crates/app/src/session/service/mod.rs +++ b/crates/app/src/session/service/mod.rs @@ -7,9 +7,17 @@ use std::{ use image::ImageError; use itertools::Itertools; -use tokio::{select, sync::mpsc, task}; +use tokio::{ + select, + sync::{broadcast, mpsc}, + task, +}; use uuid::Uuid; +use mcp::{ + chat::Prompt, client::McpClient, errors::McpError, server::tasks::Tasks, tool_params::LogLine, + types::TaskResult, +}; use processor::grabber::LineRange; use session_core::session::Session; use stypes::{CallbackEvent, ComputationError, ObserveOptions, ObserveOrigin, Transport}; @@ -66,6 +74,8 @@ pub struct SessionService { tracker: OperationTracker, /// Temp sources owned and cleaned up when this service closes. owned_temp_sources: Vec, + /// Tasks received from MCP server from AI. + mcp_task_rx: broadcast::Receiver, } /// Inputs required to start one running session service. @@ -142,6 +152,8 @@ impl SessionService { } = startup; let session_id = session.get_uuid(); + let mcp_task_rx = shared_senders.get_mcp_task_subscriber(); + let (ui_handle, service_handle) = communication::init(shared_senders); let session_info = SessionInfo::from_observe_options(session_id, &options); @@ -193,6 +205,7 @@ impl SessionService { senders, session, callback_rx, + mcp_task_rx, tracker: OperationTracker::default(), owned_temp_sources, }; @@ -228,11 +241,18 @@ impl SessionService { async fn run(mut self) { log::trace!("Start Session Service {}", self.session_id()); + let (client, prompt_tx, mut response_rx) = McpClient::new(); + if let Err(err) = client.start().await { + log::error!( + "Failed to start MCP client for session {}: {err:?}", + self.session_id() + ); + } loop { select! { Some(cmd) = self.cmd_rx.recv() => { - match self.handle_command(cmd).await { + match self.handle_command(cmd, prompt_tx.clone()).await { Ok(ControlFlow::Break(())) => break, Ok(ControlFlow::Continue(())) => {}, Err(error) => { @@ -241,6 +261,16 @@ impl SessionService { } } }, + + Ok(task) = self.mcp_task_rx.recv() => { + self.handle_mcp_task(task).await; + }, + + Some(response) = response_rx.recv() => { + log::trace!("Received response from MCP client for session {}: {response:?}", self.session_id()); + self.senders.send_session_msg(SessionMessage::ChatResponseReceived(response)).await; + }, + // Callback receiver won't be dropped when session is dropped. Some(event) = self.callback_rx.recv() => { if let Err(error)= self.handle_callbacks(event).await { @@ -261,9 +291,236 @@ impl SessionService { self.senders.send_notification(notifi).await; } + async fn handle_mcp_task(&self, task: Tasks) { + match task { + Tasks::AnalyzeLogFile { + session_id, + range, + action, + filters, + jump_to_line, + note, + task_result_tx, + } => { + // Log the task details + println!( + "Processing AnalyzeLogFile task for session {}: range={:?}, action={:?}, note={:?}", + session_id, range, action, note + ); + + // Build task result + let result = self + .process_analyze_log_task( + range.clone(), + action.clone(), + filters.clone(), + jump_to_line, + ) + .await; + + // Still send message to UI for visibility + let message = SessionMessage::MCPTaskReceived(Tasks::AnalyzeLogFile { + session_id, + range, + action, + filters, + jump_to_line, + note, + task_result_tx: task_result_tx.clone(), + }); + self.senders.send_session_msg(message).await; + + // Send result back to MCP server + if let Err(e) = task_result_tx.send(result).await { + log::error!("Failed to send analyze_log task result back to MCP server: {e:?}"); + } + } + Tasks::ApplySearchFilter { + session_id, + filters, + task_result_tx, + } => { + log::debug!( + "Processing ApplySearchFilter task for session {}", + session_id + ); + let message = SessionMessage::MCPTaskReceived(Tasks::ApplySearchFilter { + session_id, + filters: filters.clone(), + task_result_tx: task_result_tx.clone(), + }); + self.senders.send_session_msg(message).await; + } + Tasks::GetChartHistogram { + session_id, + dataset_len, + range, + task_result_tx, + } => { + log::debug!( + "Processing GetChartHistogram task for session {}", + session_id + ); + // Chart data retrieval + let result = self + .session + .state + .get_scaled_map(dataset_len, range.map(|r| (*r.start(), *r.end()))) + .await + .map(|_| TaskResult::Complete("Chart data retrieved".to_string())) + .map_err(|e| McpError::TaskExecutionFailed(format!("{:?}", e))); + + let _ = task_result_tx.send(result); + } + Tasks::GetChartLinePlots { + session_id, + dataset_len, + range, + task_result_tx, + } => { + log::debug!( + "Processing GetChartLinePlots task for session {}", + session_id + ); + // Line plot data retrieval + let result = self + .session + .state + .get_search_values(range, dataset_len) + .await + .map(|_| TaskResult::Complete("Line plot data retrieved".to_string())) + .map_err(|e| McpError::TaskExecutionFailed(format!("{:?}", e))); + + let _ = task_result_tx.send(result); + } + Tasks::GenericTask { + session_id, + task_result_tx, + } => { + log::debug!("Processing GenericTask for session {}", session_id); + let result = Ok(TaskResult::Complete("Generic task completed".to_string())); + let _ = task_result_tx.send(result); + } + Tasks::GrabLines { + session_id, + range, + task_result_tx, + } => { + if self.session_id() == session_id { + let mut lines: Vec = vec![]; + if let Ok(result) = self + .session + .grab(processor::grabber::GrabRange::from(range)) + .await + .map(|e| e.0) + .map_err(SessionError::from) + { + result + .into_iter() + .for_each(|element| lines.push(element.content)); + } + + if let Err(err) = task_result_tx + .send(Ok(TaskResult::RequestLines(lines))) + .await + { + log::error!("Error while sending the result back: {err}"); + } + } + } + Tasks::CompleteChat { + session_id, + final_result, + task_result_tx, + } => { + if self.session_id() == session_id { + if let Err(err) = task_result_tx + .send(Ok(TaskResult::Complete(final_result))) + .await + { + log::error!("Error while sending the result back: {err}"); + } + } + } + } + } + + async fn process_analyze_log_task( + &self, + range: Option>, + action: mcp::tool_params::AnalyzeAction, + filters: Vec, + jump_to_line: Option, + ) -> Result { + use mcp::tool_params::{AnalyzeAction, AnalyzeLogsResult}; + + // Grab lines if range is provided + let lines = if let Some(ref range) = range { + let line_range = LineRange::from(range.clone()); + match self.session.grab(line_range).await { + Ok(grabbed_elements) => grabbed_elements + .0 + .into_iter() + .map(|elem| LogLine { + source_id: elem.source_id, + pos: elem.pos as u64, + nature: elem.nature, + content: elem.content, + }) + .collect(), + Err(e) => { + return Err(McpError::TaskExecutionFailed(format!( + "Failed to grab lines: {:?}", + e + ))); + } + } + } else { + Vec::new() + }; + + // Handle action if provided + let action_status = match action { + AnalyzeAction::ApplyFilter => { + if !filters.is_empty() { + match self + .session + .apply_search_filters(Uuid::new_v4(), filters.clone()) + { + Ok(_) => Some(format!("Applied {} filter(s)", filters.len())), + Err(e) => { + return Err(McpError::TaskExecutionFailed(format!( + "Failed to apply filters: {:?}", + e + ))); + } + } + } else { + Some("No filters to apply".to_string()) + } + } + AnalyzeAction::JumpToLine => { + if let Some(line) = jump_to_line { + Some(format!("Jumped to line {}", line)) + } else { + Some("No line specified for jump".to_string()) + } + } + AnalyzeAction::None => None, + }; + + Ok(TaskResult::AnalyzeLogs(AnalyzeLogsResult { + requested_range: range, + lines, + action_status, + note: None, + })) + } + async fn handle_command( &mut self, cmd: SessionCommand, + prompt_tx: mpsc::Sender, ) -> Result, SessionError> { match cmd { SessionCommand::GrabLinesBlocking { range, sender } => { @@ -278,6 +535,23 @@ impl SessionService { log::debug!("Grabbed lines receiver dropped before receiving the results."); } } + SessionCommand::SendChatMessage { + id, + message, + history: _, + ai_config, + } => { + if let Err(err) = prompt_tx + .send(Prompt { + id, + message: message.clone(), + config: ai_config.clone(), + }) + .await + { + log::error!("Failed to send chat message to MCP client: {err:?}"); + } + } SessionCommand::GrabIndexedLinesBlocking { range, sender } => { let elements = self .session diff --git a/crates/app/src/session/types/mod.rs b/crates/app/src/session/types/mod.rs index 041308886c..5ce96e4712 100644 --- a/crates/app/src/session/types/mod.rs +++ b/crates/app/src/session/types/mod.rs @@ -1,10 +1,11 @@ +use std::path::PathBuf; use std::time::{Duration, Instant}; pub mod attachment; use uuid::Uuid; -use stypes::ObserveOrigin; +use stypes::{FileFormat, ObserveOrigin}; /// Represents a running observe operations with its info. #[derive(Debug, Clone)] @@ -99,3 +100,35 @@ impl OperationPhase { } } } + +/// Metadata about a file or source loaded in the session. +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct FileMetadata { + /// Display name of the source. + pub name: String, + /// The file format (Text, Binary, PcapNG, PcapLegacy) or "Stream" for live sources. + pub file_type: String, + /// Absolute path of the file, if applicable. + pub path: Option, + /// Total number of log lines currently loaded. + pub total_lines: u64, +} + +impl FileMetadata { + pub fn file_format_label(format: &FileFormat) -> &'static str { + match format { + FileFormat::Text => "Text", + FileFormat::Binary => "Binary", + FileFormat::PcapNG => "PcapNG", + FileFormat::PcapLegacy => "PcapLegacy", + } + } + + pub fn as_chat_message(&self) -> String { + format!( + "name => {}\nfile type => {}\ntotal lines in the file => {}", + self.name, self.file_type, self.total_lines + ) + } +} diff --git a/crates/app/src/session/ui/mod.rs b/crates/app/src/session/ui/mod.rs index ef6cd5de90..8551a4c6a5 100644 --- a/crates/app/src/session/ui/mod.rs +++ b/crates/app/src/session/ui/mod.rs @@ -1,7 +1,9 @@ +use std::time::Duration; use std::{rc::Rc, sync::Arc}; use egui::{CentralPanel, Context, Frame, Margin, Panel, Ui}; use log::warn; +use mcp::server::tasks::Tasks; use tokio::sync::mpsc::Sender; use crate::{ @@ -28,7 +30,7 @@ use crate::{ command::SessionCommand, communication::{UiHandle, UiReceivers}, error::SessionError, - message::{BookmarkUpdate, SessionMessage}, + message::{AiMessage, BookmarkUpdate, SessionMessage}, types::{OperationPhase, attachment::PreviewTarget}, ui::shared::{SearchSyncOutcome, SearchTableSync, SessionSignal}, }, @@ -36,6 +38,10 @@ use crate::{ use bottom_panel::{BottomPanelUI, BottomTabType}; use common::log_table::{LogTableKind, table::TableScroll}; use logs_table::LogsTable; +use mcp::{ + tool_params::{AnalyzeAction, AnalyzeLogsResult, LogLine}, + types::TaskResult, +}; use side_panel::{SidePanelUi, SideTabType}; mod attachment_modal; @@ -231,6 +237,152 @@ impl Session { .capture_preset(&self.shared, registry); } + fn handle_mcp_task( + &mut self, + task: Tasks, + actions: &mut UiActions, + registry: &mut FilterRegistry, + ) { + match task { + Tasks::ApplySearchFilter { + session_id, + filters, + task_result_tx, + } => { + if session_id == self.shared.get_id() { + // filters.iter().for_each(|filter| { + // self.bottom_panel.search.bar.apply_search_filters( + // filter.clone(), + // &mut self.shared, + // actions, + // registry, + // ); + // }); + filters.into_iter().for_each(|filter| { + self.shared.filters.set_temp_search(filter); + + self.shared + .sync_search_pipelines(registry, SearchSyncTarget::Filter) + .into_iter() + .for_each(|cmd| _ = actions.try_send_command(&self.cmd_tx, cmd)); + self.handle_signals(); + }); + actions.try_send_command( + &task_result_tx, + Ok(TaskResult::Complete("Applied Search Filter".to_string())), + ); + } + } + Tasks::AnalyzeLogFile { + session_id, + range, + action, + filters, + jump_to_line, + note, + task_result_tx, + } => { + if session_id == self.shared.get_id() { + const RESPONSE_TIMEOUT: Duration = Duration::from_millis(250); + + let mut lines: Vec = Vec::new(); + + if let Some(requested_range) = range.clone() { + let (lines_tx, lines_rx) = std::sync::mpsc::channel(); + + let cmd = SessionCommand::GrabLinesBlocking { + range: LineRange::from(requested_range.clone()), + sender: lines_tx, + }; + + if actions.try_send_command(&self.cmd_tx, cmd) { + if let Ok(Ok(grabbed)) = lines_rx.recv_timeout(RESPONSE_TIMEOUT) { + lines = grabbed + .into_iter() + .map(|line| LogLine { + source_id: line.source_id, + pos: line.pos as u64, + nature: line.nature, + content: line.content, + }) + .collect(); + } + } + } + + let action_status = match action { + AnalyzeAction::None => None, + AnalyzeAction::ApplyFilter => { + if filters.is_empty() { + Some("No filters provided for apply_filter action".to_string()) + } else { + filters.iter().for_each(|filter| { + self.bottom_panel.search.bar.apply_search_filters( + filter.clone(), + &mut self.shared, + actions, + registry, + ); + }); + self.shared.filters.pin_temp_search(registry); + self.shared + .sync_search_pipelines(registry, SearchSyncTarget::Filter) + .into_iter() + .for_each(|cmd| { + _ = actions.try_send_command(&self.cmd_tx, cmd) + }); + self.handle_signals(); + Some(format!("Applied {} filter(s)", filters.len())) + } + } + AnalyzeAction::JumpToLine => { + if let Some(line) = jump_to_line { + self.shared.logs.scroll_main_row = Some(line); + + if self.shared.bottom_tab == bottom_panel::BottomTabType::Search { + actions.try_send_command( + &self.cmd_tx, + SessionCommand::GetNearestPosition(line), + ); + } + + actions.try_send_command( + &self.cmd_tx, + SessionCommand::GetSelectedLog(line), + ); + Some(format!("Jumped to line {line}")) + } else { + Some("No jump_to_line provided for jump_to_line action".to_string()) + } + } + }; + + let result = AnalyzeLogsResult { + requested_range: range, + lines, + action_status, + note, + }; + + actions.try_send_command(&task_result_tx, Ok(TaskResult::AnalyzeLogs(result))); + } + } + Tasks::GenericTask { + session_id, + task_result_tx, + } => { + if session_id == self.shared.get_id() { + println!("****** DEBUG: Received GenericTask for session_id: {session_id}"); + actions.try_send_command( + &task_result_tx, + Ok(TaskResult::Complete("Generic Task Completed".to_string())), + ); + } + } + _ => {} + } + } + /// Check incoming messages and handle them. pub fn handle_messages( &mut self, @@ -413,6 +565,13 @@ impl Session { } }, }, + SessionMessage::ChatResponseReceived(response) => { + self.side_panel + .update_chat(AiMessage::Response(response.clone())); + } + SessionMessage::MCPTaskReceived(task) => { + self.handle_mcp_task(task, actions, registry); + } } } diff --git a/crates/app/src/session/ui/shared/logs.rs b/crates/app/src/session/ui/shared/logs.rs index 6a9b1df54f..8b6421a917 100644 --- a/crates/app/src/session/ui/shared/logs.rs +++ b/crates/app/src/session/ui/shared/logs.rs @@ -3,7 +3,7 @@ use rustc_hash::FxHashSet; #[derive(Debug)] pub struct LogsState { /// Number of logs currently known for this session. - logs_count: u64, + pub logs_count: u64, /// Digits needed to display the largest zero-based row number. row_number_digits: usize, /// Pending request for the main logs table to bring a row into view. diff --git a/crates/app/src/session/ui/shared/mod.rs b/crates/app/src/session/ui/shared/mod.rs index 54df39520c..c16f6117d8 100644 --- a/crates/app/src/session/ui/shared/mod.rs +++ b/crates/app/src/session/ui/shared/mod.rs @@ -14,6 +14,7 @@ use crate::{ use uuid::Uuid; use super::{bottom_panel::BottomTabType, side_panel::SideTabType}; +use mcp::config::{AiConfig, LlmProvider}; mod attachments; pub mod export; @@ -62,6 +63,8 @@ pub struct SessionShared { pub schema: Rc, + pub ai_configuration: AiConfig, + /// Monotonic change marker for recent-session state updates. recent_revision: u64, } @@ -105,6 +108,7 @@ impl SessionShared { observe: ObserveState::new(observe_op), attachments: AttachmentsState::default(), exports: ExportState::default(), + ai_configuration: AiConfig::default(), schema, recent_revision: 0, } @@ -148,6 +152,7 @@ impl SessionShared { exports, schema: _, recent_revision: _, + ai_configuration: _, } = self; if observe.update_operation(operation_id, phase).consumed() { @@ -216,6 +221,23 @@ impl SessionShared { } } + pub fn update_ai_configuration( + &mut self, + provider: LlmProvider, + model_name: String, + url: String, + api_key: Option, + ) { + self.ai_configuration.provider = provider; + self.ai_configuration.model = model_name; + self.ai_configuration.url = if url.trim().is_empty() { + String::from("http://localhost:11434") + } else { + url + }; + self.ai_configuration.api_key = api_key; + } + /// Synchronizes the logs search pipeline from current applied filters. /// /// When filters become empty we only drop the active search, otherwise we issue a diff --git a/crates/app/src/session/ui/side_panel/chat.rs b/crates/app/src/session/ui/side_panel/chat.rs new file mode 100644 index 0000000000..77a1a6b0cf --- /dev/null +++ b/crates/app/src/session/ui/side_panel/chat.rs @@ -0,0 +1,296 @@ +use egui::{ComboBox, RichText, Ui, Window}; +use tokio::sync::mpsc; + +use mcp::config::{AiConfig, LlmProvider}; + +use crate::common::phosphor::{self, icons}; +use crate::session::{ + command::SessionCommand, message::AiMessage, types::FileMetadata, ui::shared::SessionShared, +}; +use mcp::types::Response; +use stypes::ObserveOrigin; + +#[allow(unused)] +#[derive(Debug)] +pub struct ChatUi { + cmd_tx: mpsc::Sender, + text: String, + history: Box>, + thinking: bool, + show_config_popup: bool, + model_name: String, + provider: LlmProvider, + url: String, + api_key: String, +} + +impl ChatUi { + fn collect_file_metadata(shared: &SessionShared) -> Option> { + let total_lines = shared.logs.logs_count; + let entries: Vec = shared + .observe + .operations() + .iter() + .flat_map(|op| match &op.origin { + ObserveOrigin::File(name, format, path) => vec![FileMetadata { + name: name.clone(), + file_type: FileMetadata::file_format_label(format).to_string(), + path: Some(path.clone()), + total_lines, + }], + ObserveOrigin::Concat(items) => items + .iter() + .map(|(name, format, path)| FileMetadata { + name: name.clone(), + file_type: FileMetadata::file_format_label(format).to_string(), + path: Some(path.clone()), + total_lines, + }) + .collect(), + ObserveOrigin::Stream(name, _) => vec![FileMetadata { + name: name.clone(), + file_type: String::from("Stream"), + path: None, + total_lines, + }], + }) + .collect(); + + if entries.is_empty() { + None + } else { + Some(entries) + } + } + + fn provider_label(provider: &LlmProvider) -> &'static str { + match provider { + LlmProvider::Ollama => "Ollama", + LlmProvider::OpenAI => "OpenAI", + LlmProvider::Antropic => "Anthropic", + LlmProvider::Gemini => "Gemini", + LlmProvider::Custom => "Custom", + } + } + + pub fn new(cmd_tx: mpsc::Sender) -> Self { + let resp = AiMessage::Response(Response::Complete( + "Hello! I'm your AI assistant. How can I help you analyze the logs today?".to_string(), + )); + let ai_config = AiConfig::default(); + Self { + cmd_tx, + text: String::new(), + history: Box::new(vec![resp]), + thinking: false, + show_config_popup: false, + model_name: ai_config.model.clone(), + provider: ai_config.provider, + url: ai_config.url.clone(), + api_key: ai_config.api_key.clone().unwrap_or_default(), + } + } + + pub fn add_message(&mut self, message: AiMessage) { + self.history.push(message); + } + + pub fn toggle_thinking(&mut self) { + self.thinking = !self.thinking; + } + + pub fn render_content(&mut self, shared: &mut SessionShared, ui: &mut Ui) { + ui.vertical(|ui| { + let gear = RichText::new(icons::regular::GEAR) + .family(phosphor::fill_font_family()) + .size(16.); + ui.horizontal(|ui| { + ui.heading("Analyze with AI"); + ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| { + if ui.button(gear).clicked() { + self.show_config_popup = !self.show_config_popup; + } + }); + }); + ui.separator(); + + // Reserve a fixed-height input area at the bottom and give the rest to scrolling. + let input_height = 48.0; + let spacing = 8.0; + let scroll_height = (ui.available_height() - input_height - spacing).max(120.0); + + ui.allocate_ui_with_layout( + egui::vec2(ui.available_width(), scroll_height), + egui::Layout::bottom_up(egui::Align::Min), + |ui| { + egui::ScrollArea::vertical() + .auto_shrink([false; 2]) + .stick_to_bottom(true) + .show(ui, |ui| { + ui.vertical(|ui| { + for message in self.history.iter() { + match message { + AiMessage::Prompt(text) => { + ui.with_layout( + egui::Layout::right_to_left(egui::Align::BOTTOM), + |ui| { + ui.add( + egui::Label::new( + egui::RichText::new(text).color( + ui.visuals() + .widgets + .active + .fg_stroke + .color, + ), + ) + .wrap(), + ); + }, + ); + } + AiMessage::Response(resp) => { + let text = match resp { + Response::Complete(s) | Response::Progress(s) => s, + }; + ui.with_layout( + egui::Layout::left_to_right(egui::Align::BOTTOM), + |ui| { + ui.add(egui::Label::new(text).wrap()); + }, + ); + } + } + ui.add_space(12.0); + } + if self.thinking { + ui.horizontal(|ui| { + ui.spinner(); + ui.label( + egui::RichText::new("Analysing…") + .italics() + .color(ui.visuals().weak_text_color()), + ); + }); + ui.add_space(12.0); + } + }); + }); + }, + ); + + ui.add_space(spacing); + ui.separator(); + + // Fixed chat input area (outside scroll area). + ui.horizontal(|ui| { + let response = ui.add_sized( + [ui.available_width() - 60.0, input_height], + egui::TextEdit::singleline(&mut self.text).hint_text("Type a prompt..."), + ); + + if (ui.button("Send").clicked() + || (response.lost_focus() && ui.input(|i| i.key_pressed(egui::Key::Enter)))) + && !self.text.trim().is_empty() + { + let mut message = self.text.clone(); + self.history.push(AiMessage::Prompt(message.clone())); + self.text.clear(); + + let file_metadata = Self::collect_file_metadata(shared); + if let Some(metadata) = file_metadata { + message = format!( + "{}\n\nFile Metadata: {:?}", + message, + metadata + .iter() + .map(|m| m.as_chat_message()) + .collect::>() + ); + } + let _ = self.cmd_tx.try_send(SessionCommand::SendChatMessage { + id: shared.get_id(), + message, + history: self.history.clone(), + ai_config: shared.ai_configuration.clone(), + }); + self.thinking = true; + } + }); + }); + + // Render the configuration window + if self.show_config_popup { + let mut is_open = true; + let mut should_close = false; + Window::new("AI Configuration") + .open(&mut is_open) + .collapsible(false) + .resizable(false) + .show(ui.ctx(), |ui| { + should_close = self.render_config_popup(ui, shared); + }); + if !is_open || should_close { + self.show_config_popup = false; + } + } + } + + fn render_config_popup(&mut self, ui: &mut Ui, shared: &mut SessionShared) -> bool { + let mut should_close = false; + ui.set_min_width(300.0); + ui.vertical(|ui| { + // Provider field + ComboBox::from_label("Provider") + .selected_text(Self::provider_label(&self.provider)) + .show_ui(ui, |ui| { + ui.selectable_value(&mut self.provider, LlmProvider::Ollama, "Ollama"); + ui.selectable_value(&mut self.provider, LlmProvider::OpenAI, "OpenAI"); + ui.selectable_value(&mut self.provider, LlmProvider::Gemini, "Gemini"); + ui.selectable_value(&mut self.provider, LlmProvider::Antropic, "Anthropic"); + ui.selectable_value(&mut self.provider, LlmProvider::Custom, "Custom"); + }); + ui.add_space(8.0); + + ui.label("URL"); + ui.text_edit_singleline(&mut self.url); + ui.add_space(8.0); + + // Model Name field + ui.label("Model Name"); + ui.text_edit_singleline(&mut self.model_name); + ui.add_space(8.0); + + // API Key field (Optional) + ui.label("API Key (Optional)"); + ui.text_edit_singleline(&mut self.api_key); + ui.add_space(12.0); + + // Action buttons + ui.horizontal(|ui| { + if ui.button("Save").clicked() { + // Here you can emit a command or update state with the configuration + shared.update_ai_configuration( + self.provider.clone(), + self.model_name.clone(), + self.url.clone(), + if self.api_key.trim().is_empty() { + None + } else { + Some(self.api_key.clone()) + }, + ); + should_close = true; + } + if ui.button("Cancel").clicked() { + self.provider = shared.ai_configuration.provider.clone(); + self.model_name = shared.ai_configuration.model.clone(); + self.url = shared.ai_configuration.url.clone(); + self.api_key = shared.ai_configuration.api_key.clone().unwrap_or_default(); + should_close = true; + } + }); + }); + should_close + } +} diff --git a/crates/app/src/session/ui/side_panel/mod.rs b/crates/app/src/session/ui/side_panel/mod.rs index 4f7fc54bf1..5ee1723a1f 100644 --- a/crates/app/src/session/ui/side_panel/mod.rs +++ b/crates/app/src/session/ui/side_panel/mod.rs @@ -11,16 +11,22 @@ use crate::{ common::colors, ui::{UiActions, registry::HostRegistry}, }, - session::{command::SessionCommand, types::ObserveOperation, ui::shared::SessionShared}, + session::{ + command::SessionCommand, message::AiMessage, types::ObserveOperation, + ui::shared::SessionShared, + }, }; mod attachments; +mod chat; mod filters; mod observing; mod types; use attachments::AttachmentsUi; +use chat::ChatUi; use filters::FiltersUi; +use mcp::types::Response; use observing::ObservingUi; pub use types::*; @@ -32,6 +38,7 @@ pub struct SidePanelUi { pub observing: ObservingUi, pub attachments: AttachmentsUi, pub filters: FiltersUi, + pub chat: ChatUi, } impl SidePanelUi { @@ -43,10 +50,22 @@ impl SidePanelUi { Self { observing: ObservingUi::new(observe_op, session_cmd_tx.clone()), attachments: AttachmentsUi::new(host_command_tx.clone(), session_cmd_tx.clone()), + chat: ChatUi::new(session_cmd_tx.clone()), filters: FiltersUi::new(session_cmd_tx), } } + pub fn update_chat(&mut self, message: AiMessage) { + match &message { + AiMessage::Response(resp) => match resp { + Response::Progress(_) => {} + _ => self.chat.toggle_thinking(), + }, + _ => {} + }; + self.chat.add_message(message); + } + pub fn render_content( &mut self, ui: &mut Ui, @@ -85,6 +104,7 @@ impl SidePanelUi { self.filters .render_content(shared, actions, &mut registry.filters, ui) } + SideTabType::Chat => self.chat.render_content(shared, ui), }); } } @@ -100,6 +120,7 @@ fn render_tab_button(target: SideTabType, current_tab: &mut SideTabType, ui: &mu SideTabType::Observing => icons::regular::BROADCAST, SideTabType::Attachments => icons::regular::PAPERCLIP, SideTabType::Filters => icons::regular::FUNNEL, + SideTabType::Chat => icons::regular::CHAT, }; // Allocate interaction and tooltip. diff --git a/crates/app/src/session/ui/side_panel/types.rs b/crates/app/src/session/ui/side_panel/types.rs index f00033691b..744f98a3de 100644 --- a/crates/app/src/session/ui/side_panel/types.rs +++ b/crates/app/src/session/ui/side_panel/types.rs @@ -7,6 +7,7 @@ pub enum SideTabType { Observing, Attachments, Filters, + Chat, } impl Display for SideTabType { @@ -15,6 +16,7 @@ impl Display for SideTabType { SideTabType::Observing => "Observing", SideTabType::Attachments => "Attachments", SideTabType::Filters => "Filters", + SideTabType::Chat => "Chat", }; f.write_str(content) diff --git a/crates/mcp/Cargo.toml b/crates/mcp/Cargo.toml new file mode 100644 index 0000000000..67726cbe04 --- /dev/null +++ b/crates/mcp/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "mcp" +edition.workspace = true +version.workspace = true +authors.workspace = true + +[dependencies] +axum = { version = "0.7", features = ["macros"] } +non-exhaustive = "0.1.1" +ollama-rs = { version = "0.3", features = ["stream"] } +rand = "0.9" +reqwest = { version = "0.12.25", features = ["json"] } +rmcp = { version = "1.3", features = [ + "client", + "macros", + "reqwest", + "schemars", + "server", + "transport-io", + "transport-streamable-http-client-reqwest", + "transport-streamable-http-server", + "transport-streamable-http-server-session", + "tower", +] } +schemars = "1.1" +url = { version = "2.5", features = ["serde"] } + +anyhow.workspace = true +log.workspace = true +processor.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +tokio-util.workspace = true +tokio.workspace = true +uuid.workspace = true + +[lints] +workspace = true diff --git a/crates/mcp/src/chat.rs b/crates/mcp/src/chat.rs new file mode 100644 index 0000000000..08a2062ace --- /dev/null +++ b/crates/mcp/src/chat.rs @@ -0,0 +1,9 @@ +use crate::config::AiConfig; +use uuid::Uuid; + +#[derive(Debug, Clone)] +pub struct Prompt { + pub id: Uuid, + pub message: String, + pub config: AiConfig, +} diff --git a/crates/mcp/src/client/agents/mod.rs b/crates/mcp/src/client/agents/mod.rs new file mode 100644 index 0000000000..ad08246559 --- /dev/null +++ b/crates/mcp/src/client/agents/mod.rs @@ -0,0 +1,93 @@ +use ollama_rs::generation::{ + chat::{ChatMessage, ChatMessageResponse, request::ChatMessageRequest}, + tools::{ToolFunctionInfo, ToolInfo, ToolType}, +}; +use rmcp::model::ListToolsResult; +use url::Url; + +use crate::{config::AiConfig, errors::McpError}; + +// pub mod claude; +pub mod ollama; +pub mod open_ai; + +pub const LLM_API_KEY: &str = "LLM_KEY"; + +pub trait LlmAgent { + fn from_config(config: AiConfig) -> Self; + + async fn send_chat_message( + &self, + prompt: String, + history: &mut Vec, + tools: ListToolsResult, + ) -> Result; + + fn api_key() -> Option { + std::env::var(LLM_API_KEY).ok() + } +} + +pub trait ChatResponse {} + +#[derive(Clone, Debug)] +pub struct GenericAgent { + pub model: String, + pub api_key: Option, + pub url: String, +} + +impl LlmAgent for GenericAgent { + fn from_config(config: AiConfig) -> Self { + Self { + model: config.model, + url: config.url, + api_key: config.api_key, + } + } + + // TODO: Change to generic API calling using reqwest and handle different providers based on the URL or a separate provider field in AiConfig + async fn send_chat_message( + &self, + prompt: String, + history: &mut Vec, + tools: ListToolsResult, + ) -> Result { + let url = Url::parse(self.url.as_str()).map_err(|err| McpError::Generic { + message: format!("Error while parsing URL for the Generic Agent; {err:?}"), + })?; + + let client = ollama_rs::Ollama::from_url(url); + + let mcp_tools = tools + .tools + .iter() + .map(|tool| ToolInfo { + tool_type: ToolType::Function, + function: ToolFunctionInfo { + name: tool.name.to_string(), + description: tool + .description + .as_ref() + .map(|x| x.to_string()) + .unwrap_or("No description".to_string()), + parameters: serde_json::from_value(serde_json::Value::Object( + (*tool.input_schema).clone(), + )) + .unwrap_or(schemars::json_schema!({"type": ["object", "null"]})), + }, + }) + .collect::>(); + + let chat_message_request = + ChatMessageRequest::new(self.model.clone(), vec![ChatMessage::user(prompt)]) + .tools(mcp_tools); + + client + .send_chat_messages_with_history(history, chat_message_request) + .await + .map_err(|e| McpError::Generic { + message: e.to_string(), + }) + } +} diff --git a/crates/mcp/src/client/agents/ollama.rs b/crates/mcp/src/client/agents/ollama.rs new file mode 100644 index 0000000000..798cc701d7 --- /dev/null +++ b/crates/mcp/src/client/agents/ollama.rs @@ -0,0 +1,80 @@ +use ollama_rs::generation::{ + chat::{ChatMessage, ChatMessageResponse, request::ChatMessageRequest}, + tools::{ToolFunctionInfo, ToolInfo, ToolType}, +}; +use rmcp::model::ListToolsResult; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use url::Url; + +use super::LlmAgent; +use crate::{config::AiConfig, errors::McpError}; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct Ollama { + pub model: String, + pub api_key: Option, + pub url: String, +} + +impl Default for Ollama { + fn default() -> Self { + Self { + model: String::from("qwen3:8b"), + url: String::from("http://localhost:11434"), + api_key: None, + } + } +} + +impl LlmAgent for Ollama { + fn from_config(config: AiConfig) -> Self { + Self { + model: config.model, + url: config.url, + api_key: config.api_key, + } + } + + async fn send_chat_message( + &self, + prompt: String, + history: &mut Vec, + tools: ListToolsResult, + ) -> Result { + let url = Url::parse(self.url.as_str()).map_err(|err| McpError::Generic { + message: format!("Error while parsing URL for the Ollama Agent; {err:?}"), + })?; + + let client = ollama_rs::Ollama::from_url(url); + + let mcp_tools = tools + .tools + .iter() + .map(|tool| ToolInfo { + tool_type: ToolType::Function, + function: ToolFunctionInfo { + name: tool.name.to_string(), + description: tool + .description + .as_ref() + .map(|x| x.to_string()) + .unwrap_or("No description".to_string()), + parameters: serde_json::from_value(Value::Object((*tool.input_schema).clone())) + .unwrap_or(schemars::json_schema!({"type": ["object", "null"]})), + }, + }) + .collect::>(); + + let chat_message_request = + ChatMessageRequest::new(self.model.clone(), vec![ChatMessage::user(prompt)]) + .tools(mcp_tools); + + client + .send_chat_messages_with_history(history, chat_message_request) + .await + .map_err(|e| McpError::Generic { + message: e.to_string(), + }) + } +} diff --git a/crates/mcp/src/client/agents/open_ai.rs b/crates/mcp/src/client/agents/open_ai.rs new file mode 100644 index 0000000000..dbc9c8ed8a --- /dev/null +++ b/crates/mcp/src/client/agents/open_ai.rs @@ -0,0 +1,241 @@ +use ollama_rs::generation::{ + chat::{ChatMessage, ChatMessageResponse}, + tools::{ToolCall, ToolCallFunction, ToolFunctionInfo, ToolInfo, ToolType}, +}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +use crate::{config::AiConfig, errors::McpError}; + +use super::LlmAgent; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OpenAI { + pub model: String, + pub url: String, + pub api_key: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct OpenAIRequest { + pub model: String, + pub messages: Vec, + pub tools: Vec, + pub tool_choice: String, +} + +#[derive(Debug, Deserialize)] +pub struct OpenAIResponse { + pub id: String, + pub model: String, + created: u64, + object: String, + pub choices: Vec, + usage: Option, +} + +#[derive(Debug, serde::Deserialize)] +struct Usage { + prompt_tokens: u64, + completion_tokens: u64, +} + +#[derive(Debug, Deserialize)] +pub struct Choice { + index: i32, + pub message: ResponseMessage, + finish_reason: String, +} + +#[derive(Debug, Deserialize)] +pub struct ResponseMessage { + pub role: String, + pub content: Option, + pub tool_calls: Option>, +} + +#[derive(Debug, Deserialize)] +pub struct ToolCallResponse { + id: String, + #[serde(rename = "type")] + kind: String, + pub function: FunctionCall, +} + +#[derive(Debug, Deserialize)] +pub struct FunctionCall { + pub name: String, + pub arguments: String, // JSON-encoded string from OpenAI +} + +impl Default for OpenAI { + fn default() -> Self { + Self { + model: String::from("gpt-4.1"), + url: String::from("https://api.openai.com/v1/chat/completions"), + api_key: Self::api_key(), + } + } +} + +impl LlmAgent for OpenAI { + fn from_config(config: AiConfig) -> Self { + Self { + model: config.model, + url: config.url, + api_key: config.api_key, + } + } + + async fn send_chat_message( + &self, + prompt: String, + history: &mut Vec, + tools: rmcp::model::ListToolsResult, + ) -> Result { + if self.api_key.is_none() || self.api_key.as_ref().is_some_and(|key| key.is_empty()) { + return Err(McpError::Generic { + message: String::from("Missing API key for LLM Agent"), + }); + } + let api_key = self.api_key.clone().unwrap_or_default(); + let mcp_tools = tools + .tools + .iter() + .map(|tool| ToolInfo { + tool_type: ToolType::Function, + function: ToolFunctionInfo { + name: tool.name.to_string(), + description: tool + .description + .as_ref() + .map(|x| x.to_string()) + .unwrap_or("No description".to_string()), + parameters: serde_json::from_value(Value::Object((*tool.input_schema).clone())) + .unwrap_or(schemars::json_schema!({"type": ["object", "null"]})), + }, + }) + .collect::>(); + let mut messages = history + .iter() + .map(chat_message_to_openai) + .collect::>(); + messages.push(chat_message_to_openai(&ChatMessage::user(prompt))); + + let tools = mcp_tools + .iter() + .map(fetch_tool_info) + .collect::>(); + + let request_body = OpenAIRequest { + model: "gpt-4.1".to_string(), + messages: messages, + tools: tools, + tool_choice: "auto".to_string(), + }; + log::warn!("πŸ€™πŸ» OpenAI Request: {request_body:?}"); + let client = reqwest::Client::new(); + let response = client + .post("https://api.openai.com/v1/chat/completions") + .header("Content-Type", "application/json") + .bearer_auth(&api_key) + .json(&request_body) + .send() + .await + .map_err(|err| McpError::ChatError { + message: format!("{err:?}"), + })?; + + let status = response.status(); + if !status.is_success() { + let err_body = response.text().await.map_err(|err| McpError::Generic { + message: format!("{err:?}"), + })?; + eprintln!("Error {status}: {err_body}"); + return Err(McpError::Generic { + message: format!("Request failed with status {status}").into(), + }); + } + + let openai_response: OpenAIResponse = + response.json().await.map_err(|err| McpError::ChatError { + message: format!("{err:?}"), + })?; + + // --- Handle response / tool calls ---------------------------------------- + log::warn!("βœ… Response ID: {}", openai_response.id); + + fetch_chat_message(openai_response) + } +} + +pub fn fetch_tool_info(tool: &ToolInfo) -> Value { + serde_json::json!({ + "type": "function", + "function": { + "name": tool.function.name, + "description": tool.function.description, + "parameters": tool.function.parameters, + } + }) +} + +pub fn fetch_chat_message(resp: OpenAIResponse) -> Result { + let choice = resp.choices.into_iter().next().ok_or(McpError::ChatError { + message: "Missing Choices from OpenAI Response".to_string(), + })?; + + // ── Map tool_calls ────────────────────────────────────────────────────── + let tool_calls: Option> = choice.message.tool_calls.map(|tcs| { + tcs.into_iter() + .map(|tc| ToolCall { + function: ToolCallFunction { + name: tc.function.name, + // OpenAI sends arguments as a JSON *string*; + // ollama-rs expects a serde_json::Value object. + arguments: serde_json::from_str(&tc.function.arguments) + .unwrap_or(serde_json::Value::Null), + }, + }) + .collect() + }); + + // ── Map ChatMessage ───────────────────────────────────────────────────── + // content is empty string when the model only returns tool_calls + let content = choice.message.content.unwrap_or_default(); + + let message = match choice.message.role.as_str() { + "assistant" => ChatMessage::assistant(content), + "user" => ChatMessage::user(content), + "system" => ChatMessage::system(content), + other => { + return Err(McpError::ChatError { + message: other.to_string(), + }); + } + }; + + // Attach tool_calls if present. ChatMessage has a tool_calls field + // that can be set after construction. + let message = ChatMessage { + tool_calls: tool_calls.unwrap_or_default(), + ..message + }; + + Ok(ChatMessageResponse { + model: resp.model, + created_at: String::new(), // top-level field; detail is in final_data + message, + done: true, + final_data: None, + logprobs: None, + }) +} + +/// Maps an ollama-rs `ChatMessage` into the JSON shape expected by OpenAI. +pub fn chat_message_to_openai(msg: &ChatMessage) -> Value { + serde_json::json!({ + "role": msg.role, + "content": msg.content, + }) +} diff --git a/crates/mcp/src/client/mod.rs b/crates/mcp/src/client/mod.rs new file mode 100644 index 0000000000..68329b5776 --- /dev/null +++ b/crates/mcp/src/client/mod.rs @@ -0,0 +1,270 @@ +// pub mod conversation; +// pub mod llm; + +pub mod agents; + +use log::{error, warn}; +use ollama_rs::generation::chat::ChatMessage; +use rmcp::{ + RoleClient, + model::{ + CallToolRequestParams, ClientCapabilities, ClientInfo, Implementation, + InitializeRequestParams, + }, + service::{RunningService, ServiceExt}, + transport::StreamableHttpClientTransport, +}; +use serde_json::Value; +use tokio::{select, sync::mpsc}; + +use crate::{ + chat::Prompt, + // client::llm::{Llm, LlmClient, LlmConfig}, + errors::McpError, + types::Response, +}; +use agents::LlmAgent; + +// TODO:[MCP] store this in a single global location +pub const SERVER_ADDRESS: &str = "http://127.0.0.1:8181/mcp"; + +const SYSTEM_PROMPT: &str = r#"You are a log analysis assistant for Chipmunk. +Your goal is to help users analyze logs using the provided tools. +When you receive a prompt, often accompanied by sample log lines, analyze the user's intent and call the appropriate tool. + +Rules for tool usage: +1. 'apply_search_filter': Use this for filtering logs. + - 'value' is the search pattern. + - 'is_regex' should be true if you use regex patterns (like \d+, [A-Z], etc.). + - 'ignore_case' should be true if the user doesn't specify case sensitivity. + - 'is_word' should be true if the user wants to match whole words only. Default to false. +2. If the user asks for a chart or values, use 'search_values' or 'get_chart_histogram'. +3. Always be precise with the parameters. If a parameter is not specified, use common-sense defaults. +4. Output ONLY the tool call if that's the primary action. + +Sample log lines provided at the end of the prompt show the context and format of the current session. Use them to craft better search patterns. +"#; + +pub struct McpConfig { + pub url: String, +} + +pub struct McpClient { + prompt_rx: mpsc::Receiver, + response_tx: mpsc::Sender, +} + +impl McpClient { + pub fn new() -> (Self, mpsc::Sender, mpsc::Receiver) { + let (response_tx, response_rx) = mpsc::channel::(32); + let (prompt_tx, prompt_rx) = mpsc::channel::(32); + + ( + Self { + prompt_rx, + response_tx, + }, + prompt_tx, + response_rx, + ) + } + + pub async fn start(self) -> Result<(), McpError> { + // Setup MCP connection + let transport = StreamableHttpClientTransport::from_uri(SERVER_ADDRESS); + let capabilities = ClientCapabilities::builder() + .enable_elicitation() + .enable_roots() + .enable_sampling() + .build(); + + let implmentation = + Implementation::new("mcp-client", String::from(env!("CARGO_PKG_VERSION"))) + .with_title("Chipmunk MCP Client"); + + // TODO:[MCP] match with MCP server definition + let client_info = ClientInfo::new(capabilities, implmentation); + + let mcp_service = client_info + .serve(transport) + .await + .map_err(|e| McpError::Generic { + message: e.to_string(), + })?; + + // let llm = Llm::from_config(self.llm_config); + + tokio::spawn(async move { + if let Err(e) = McpClient::run( + self.prompt_rx, + self.response_tx, + mcp_service, + // llm, + ) + .await + { + error!("MCP client event loop ended: {:?}", e); + } + }); + Ok(()) + } + + async fn run( + mut prompt_rx: mpsc::Receiver, + response_tx: mpsc::Sender, + mcp_service: RunningService, + ) -> Result<(), McpError> { + // History array, which will be auto generated by the rmcp crate automatically based on the response from user + // and LLM. + let mut history = vec![ChatMessage::system(SYSTEM_PROMPT.to_string())]; + loop { + select! { + Some(prompt) = prompt_rx.recv() => { + let mut continue_chat: bool = true; + let mut user_prompt = format!("User prompt: {}\n\n session_id:\n{}", prompt.message, prompt.id); + + let tools = mcp_service.list_tools(Default::default()).await?; + + while continue_chat { + // Call the API with user prompt and history object. + let response = agents::GenericAgent::from_config(prompt.config.clone()).send_chat_message(user_prompt.clone(), &mut history, tools.clone()).await; + + match response { + Ok(res) => { + let tool_calls = res.message.tool_calls.clone(); + + // If LLM did not call any function then there is nothing to do, if response contains + // the message then display that message to the user via channel and close the loop. + if tool_calls.is_empty() { + continue_chat = false; + if !res.message.content.is_empty() { + let _ = response_tx.send(Response::Complete(res.message.content.clone())).await; + } + } + + // Call each tool in the tool calls. + for tool_call in tool_calls { + // `complete_chat` marks the end of conversation, if LLM is calling this tool then chat is complete and + // we should send correct response to the user and end the chat loop. + if tool_call.function.name == "complete_chat" { + continue_chat = false; + let _ = response_tx.send(Response::Complete("Complete the task successfully".to_string())).await; + let _ = response_tx.send(Response::Complete(res.message.content.clone())).await; + } + + let param = CallToolRequestParams::new(tool_call.function.name.clone()).with_arguments(fetch_arguments(&tool_call)); + + // Call MCP tool with tool params. + match mcp_service.call_tool(param).await { + Ok(tool_result) => { + if tool_result.is_error.is_some_and(|x| x) { + continue_chat = false; + log::error!("Error while calling a tool from MCP {tool_result:?}"); + } + // if tool call is + user_prompt = tool_result + .content + .first() + .and_then(|raw| raw.raw.as_text()).map(|content| content.text.clone()) + .unwrap_or_default(); + + if user_prompt.is_empty() { + user_prompt = String::from("Did not receive text result from MCP server; Call same tool again but check the parameters this time"); + } + let _ = response_tx.send(Response::Progress(format!("Applied tool {}", tool_call.function.name))).await; + }, + Err(e) => { + error!("πŸ”΄ Tool call failed: {:?}", e); + println!("πŸ”΄ Tool call failed: {:?}", e); + let _ = response_tx.send(Response::Complete(String::from("Failed calling tool from application"))).await; + + } + } + } + }, + Err(err) => { + error!("πŸ”΄ MCP Client failed to get mock prompt response: {err:?}"); + } + } + } + } + } + } + + #[allow(unreachable_code)] + Ok(()) + } +} + +fn fetch_arguments(tool_call: &ollama_rs::generation::tools::ToolCall) -> rmcp::model::JsonObject { + let args_value = tool_call.function.arguments.clone(); + let args_value = normalize_args_value(args_value); + let args_object = match args_value { + Value::Object(map) => map, + Value::Null => { + warn!( + "⚠️ Tool call arguments are null; using empty parameters for {}", + tool_call.function.name + ); + serde_json::Map::new() + } + _ => { + warn!( + "⚠️ Tool call arguments are not an object for {}", + tool_call.function.name + ); + serde_json::Map::new() + } + }; + + let mut json_obj = rmcp::model::JsonObject::new(); + for (key, value) in args_object { + json_obj.insert(key, normalize_json_value(value)); + } + + warn!("β˜‘οΈ Final call parameters are {json_obj:?}"); + json_obj +} + +fn normalize_args_value(value: Value) -> Value { + match value { + Value::String(raw) => parse_jsonish_string(&raw), + other => other, + } +} + +fn parse_jsonish_string(raw: &str) -> Value { + let mut trimmed = raw.trim(); + if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("null") { + return Value::Null; + } + if trimmed.starts_with("```") { + if let Some(end) = trimmed.rfind("```") { + trimmed = &trimmed[3..end]; + if trimmed.trim_start().starts_with("json") { + trimmed = trimmed.trim_start_matches("json").trim(); + } + } + } + if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("null") { + return Value::Null; + } + let parsed = serde_json::from_str::(trimmed).unwrap_or_default(); + match parsed { + Value::String(inner) => serde_json::from_str::(inner.trim()).unwrap_or_default(), + other => other, + } +} + +fn normalize_json_value(value: Value) -> Value { + match value { + Value::String(raw) => serde_json::from_str::(&raw).unwrap_or(Value::String(raw)), + Value::Array(items) => Value::Array(items.into_iter().map(normalize_json_value).collect()), + Value::Object(map) => Value::Object( + map.into_iter() + .map(|(k, v)| (k, normalize_json_value(v))) + .collect(), + ), + other => other, + } +} diff --git a/crates/mcp/src/config.rs b/crates/mcp/src/config.rs new file mode 100644 index 0000000000..871254d099 --- /dev/null +++ b/crates/mcp/src/config.rs @@ -0,0 +1,48 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Default, Debug, Serialize, Deserialize, PartialEq)] +pub enum LlmProvider { + #[default] + Ollama, + OpenAI, + Antropic, + Gemini, + Custom, +} + +impl From<&str> for LlmProvider { + fn from(val: &str) -> Self { + match val.to_lowercase().as_str() { + "ollama" => Self::Ollama, + "openai" => Self::OpenAI, + "anthropic" => Self::Antropic, + "gemini" => Self::Gemini, + _ => Self::Custom, + } + } +} + +impl From for LlmProvider { + fn from(val: String) -> Self { + Self::from(val.as_str()) + } +} + +#[derive(Debug, Clone)] +pub struct AiConfig { + pub provider: LlmProvider, + pub model: String, + pub url: String, + pub api_key: Option, +} + +impl Default for AiConfig { + fn default() -> Self { + Self { + provider: LlmProvider::default(), + model: String::from("qwen3:8b"), + url: String::from("http://localhost:11434"), + api_key: None, + } + } +} diff --git a/crates/mcp/src/errors.rs b/crates/mcp/src/errors.rs new file mode 100644 index 0000000000..543b2d7b5f --- /dev/null +++ b/crates/mcp/src/errors.rs @@ -0,0 +1,43 @@ +use rmcp::ServiceError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum McpError { + // Client errors + #[error("MCP service error: {0}")] + Service(#[from] ServiceError), + + #[error("Connection failed: {message}")] + Connection { message: String }, + + #[error("Request timeout: {message}")] + Timeout { message: String }, + + #[error("Error while sending ChatMessage: {message}")] + ChatError { message: String }, + + // Server errors + #[error("Tool execution failed: {message}")] + ToolExecution { message: String }, + + #[error("Setup failed: {message}")] + Setup { message: String }, + + // Generic + #[error("{message}")] + Generic { message: String }, + + // Task execution errors + #[error("Task execution failed: {0}")] + TaskExecutionFailed(String), +} + +#[derive(Debug, thiserror::Error)] +pub enum ConversionError { + #[error("OpenAI response has no choices")] + NoChoices, + #[error("Unknown role: {0}")] + UnknownRole(String), + #[error("Failed to parse tool arguments: {0}")] + JsonParse(#[from] serde_json::Error), +} diff --git a/crates/mcp/src/lib.rs b/crates/mcp/src/lib.rs new file mode 100644 index 0000000000..de1894aebc --- /dev/null +++ b/crates/mcp/src/lib.rs @@ -0,0 +1,18 @@ +pub mod chat; +pub mod client; +pub mod config; +pub mod errors; +pub mod server; +pub mod tool_params; +pub mod types; + +use tokio::sync::mpsc; + +use crate::{chat::Prompt, types::Response}; +use server::tasks::Tasks; + +pub struct McpChannelEndpoints { + pub prompt_tx: mpsc::Sender, + pub response_rx: mpsc::Receiver, + pub task_rx: mpsc::Receiver, +} diff --git a/crates/mcp/src/server/mod.rs b/crates/mcp/src/server/mod.rs new file mode 100644 index 0000000000..52dce29e0d --- /dev/null +++ b/crates/mcp/src/server/mod.rs @@ -0,0 +1,586 @@ +pub mod tasks; + +pub const BIND_ADDRESS: &str = "127.0.0.1:8181"; + +use non_exhaustive::non_exhaustive; +use std::ops::RangeInclusive; + +use anyhow::Result; +use rmcp::{ + ErrorData as RmcpError, + handler::server::{ServerHandler, tool::ToolRouter, wrapper::Parameters}, + model::{CallToolResult, Content, ErrorCode, ServerCapabilities, ServerInfo}, + tool, tool_handler, tool_router, + transport::streamable_http_server::{ + session::local::LocalSessionManager, + tower::{StreamableHttpServerConfig, StreamableHttpService}, + }, +}; +use tokio::sync::{broadcast, mpsc}; + +use crate::{ + errors::McpError, + tool_params::{ + AnalyzeLogsRequest, ChatResult, GrabLineParams, MapRequest, SearchFilters, ValuesRequest, + }, + types::TaskResult, +}; +use tasks::{Tasks, Tasks::*}; + +#[derive(Clone, Debug)] +pub struct McpServer { + task_tx: broadcast::Sender, + pub tool_router: ToolRouter, +} + +#[tool_handler] +impl ServerHandler for McpServer { + fn get_info(&self) -> ServerInfo { + let capabilities = ServerCapabilities::builder() + .enable_tools() + .enable_resources() + .enable_prompts() + .build(); + ServerInfo::new(capabilities).with_instructions("Chipmunk MCP Server") + } +} + +#[tool_router] +impl McpServer { + pub fn new(task_tx: broadcast::Sender) -> Self { + Self { + task_tx, + tool_router: Self::tool_router(), + } + } + + pub async fn start(self) -> Result<()> { + let ct = tokio_util::sync::CancellationToken::new(); + + let server_config = non_exhaustive! {StreamableHttpServerConfig { + cancellation_token: ct.child_token(), + }}; + let service = StreamableHttpService::new( + { + let server = self.clone(); + move || Ok(server.clone()) + }, + LocalSessionManager::default().into(), + server_config, + ); + + let router = axum::Router::new().nest_service("/mcp", service); + let tcp_listener = tokio::net::TcpListener::bind(BIND_ADDRESS).await?; + + tokio::spawn(async move { + if let Err(err) = axum::serve(tcp_listener, router).await { + log::error!("MCP server error: {:?}", err); + } + }); + log::debug!("MCP server started"); + + Ok(()) + } + + // #[tool(description = r#"Generate SearchFilter objects for filtering logs. + + // This tool accepts one or more filter specifications along with session_id and returns a list of SearchFilter objects. + // Each filter can be customized with flags for regex matching, case sensitivity, and word boundaries. + // Session id is present in the prompt and can be used by the LLM Model and sent back in the parameters. + // This will help the MCP server to identify which session has requested for the filter and apply it to the correct session. + + // **Input Parameters:** + // - `filters`: An list of filter objects, where each object contains: + // - `value` (string): The text or pattern to search for + // - `is_regex` (boolean): true if the filter is a regular expression pattern + // - `ignore_case` (boolean): False for case sensitive matching and vice versa, defaults to true. + // - `is_word` (boolean): true to match whole words only (word boundary matching) + // - `session_id` (string): Unique identifier for the session requesting the filter, used to apply the filter to the correct session. Present in Chat message. + + // **Usage Examples:** + + // Single filter: + // - Input: [{"value": "error", "is_regex": true, "ignore_case": false, "is_word": false}] + // - Use case: Find exact matches of "error" + + // - Input: [{"value": "time=13", "is_regex": true, "ignore_case": false, "is_word": false}] + // - Use case: Find lines in log where time is around 13 + + // - Input: [{"value": "world bank", "is_regex": false, "ignore_case": true, "is_word": false}] + // - Use case: Find lines in log containing world bank + + // Multiple filters: + // - Input: [ + // {"value": "ERROR", "is_regex": true, "ignore_case": true, "is_word": false}, + // {"value": "\\d{4}-\\d{2}-\\d{2}", "is_regex": true, "ignore_case": false, "is_word": false} + // ] + // - Use case: Find "ERROR" (any case) OR date patterns + + // Common patterns: + // - Regex pattern: {"value": "\\b(error|fail|exception)\\b", "is_regex": true, "ignore_case": false, "is_word": false} + // - Exact match: {"value": "timeout", "is_regex": true, "ignore_case": false, "is_word": false} + + // **Natural Language Interpretation:** + // When the user provides natural language instructions, interpret them as follows: + // - "error" β†’ single filter for "error" + // - "error or warning" β†’ two filters, one for "error" and one for "warning" + // - "case-insensitive ERROR" β†’ set ignore_case: true, is_regex: true + // - "match the word 'timeout'" β†’ set is_word: true, is_regex: true, is_word: false + // - "regex pattern \\d+" β†’ set is_regex: true + // - "find ERROR, WARNING, and CRITICAL" β†’ three separate filters, but beware these are not the words, so `is_word` boolean value should be false. + // "#)] + #[tool( + description = r#"Apply one or more search filters to filter log lines in the current session. + +Filters are applied with OR logic β€” a log line is shown if it matches ANY filter. +Each filter targets a text pattern or regex. The session_id ties the filter to the correct log viewer session. + +--- + +PARAMETERS + +session_id (string, required) + The session UUID provided in the user's message. Must be a valid UUID (e.g. "550e8400-e29b-41d4-a716-446655440000"). + Always extract and forward this exactly as provided β€” do not generate or modify it. + +filters (array, required, min 1 item) + List of filter objects. Each object has: + + value (string, required) + The text or regex pattern to match against log lines. + For regex: escape backslashes (e.g. "\\d+" not "\d+"). + + is_regex (boolean, required) + true β†’ treat `value` as a regular expression + false β†’ treat `value` as a plain text substring + + ignore_case (boolean, required) + true β†’ case-insensitive match (default for most searches) + false β†’ case-sensitive match (use when casing is significant, e.g. "ERROR" vs "error") + + is_word (boolean, required) + true β†’ match whole words only (wraps pattern in \b word boundaries) + false β†’ match anywhere in the line (including inside words) + +--- + +NUMERIC AND COMPARISON EXPRESSIONS + + Regex cannot perform numeric comparisons (>=, <=, >, <). + When the user writes expressions like `time>=10.0ms`, `latency>500`, or `duration<=2s`, + translate them into a regex that matches the relevant numeric field, and inform the user + that results are approximate (pattern-based, not mathematically exact). + + STRATEGY: Match the field name and capture plausible numeric values using regex. + Let the user know the filter is approximate and they may need to refine it. + + TRANSLATION EXAMPLES + + "time>=10.0ms" + β†’ Inform user: "Regex cannot do >= comparisons. Applying a pattern that matches + time=10.x, time=1x.x, time=2x.x etc. Results are approximate." + β†’ filters: [{"value": "time=([1-9]\\d+|10)\\.?\\d*ms", "is_regex": true, "ignore_case": false, "is_word": false}] + + "latency>500ms" + β†’ Match 3-digit numbers starting with 5-9 (500–999) and all 4+ digit numbers + β†’ filters: [{"value": "latency=([5-9]\\d{2}|\\d{4,})ms", "is_regex": true, "ignore_case": false, "is_word": false}] + + "duration<=2s" + β†’ Match 0s, 1s, 2s (integers) or decimals like 1.5s, 0.9s + β†’ filters: [{"value": "duration=([01](\\.\\d+)?|2(\\.0+)?)s", "is_regex": true, "ignore_case": false, "is_word": false}] + + ALWAYS accompany a comparison-translated filter with a message like: + "⚠️ Regex cannot evaluate >= numerically. This filter approximates lines where + [field] is likely >= [value], but edge cases may appear or be missed. + Consider narrowing with a more specific pattern if needed." + +--- + +NATURAL LANGUAGE β†’ PARAMETER MAPPING + + "error" β†’ [{value:"error", is_regex:false, ignore_case:true, is_word:false}] + "exact ERROR (case-sensitive)" β†’ [{value:"ERROR", is_regex:false, ignore_case:false, is_word:false}] + "whole word timeout" β†’ [{value:"timeout", is_regex:false, ignore_case:true, is_word:true}] + "error or warning" β†’ two filters: one for "error", one for "warning" + "ERROR, WARNING, CRITICAL" β†’ three filters, one per term, is_word:false (labels, not word-boundary tokens) + "lines with a date like 2024-01" β†’ [{value:"\\d{4}-\\d{2}", is_regex:true, ignore_case:false, is_word:false}] + "time around 13" β†’ [{value:"time=13", is_regex:true, ignore_case:false, is_word:false}] + "error or exception (regex)" β†’ [{value:"error|exception", is_regex:true, ignore_case:true, is_word:false}] + +--- + +DECISION RULES + + 1. Use is_regex:true when the user mentions "regex", uses special characters (. * + ? [ ] ( ) | \ ^ $), or needs alternation (a|b). + 2. Use is_regex:false for plain keyword searches β€” simpler and less error-prone. + 3. Use ignore_case:false only when the user says "case-sensitive" or "exact case". + 4. Use is_word:true only when the user says "whole word" or "word boundary". + 5. Multiple keywords in one request β†’ one filter object per keyword (OR semantics). + 6. Never combine unrelated patterns into one regex when separate filters express intent more clearly. + +--- + +EXAMPLES + +Single keyword (case-insensitive): + filters: [{"value": "timeout", "is_regex": false, "ignore_case": true, "is_word": false}] + +Whole-word match: + filters: [{"value": "fail", "is_regex": false, "ignore_case": true, "is_word": true}] + +Case-sensitive exact label: + filters: [{"value": "CRITICAL", "is_regex": false, "ignore_case": false, "is_word": false}] + +Two keywords (OR): + filters: [ + {"value": "error", "is_regex": false, "ignore_case": true, "is_word": false}, + {"value": "warning", "is_regex": false, "ignore_case": true, "is_word": false} + ] + +Regex β€” date pattern: + filters: [{"value": "\\d{4}-\\d{2}-\\d{2}", "is_regex": true, "ignore_case": false, "is_word": false}] + +Regex β€” multiple severities: + filters: [{"value": "error|warn|fatal", "is_regex": true, "ignore_case": true, "is_word": false}] +"# + )] + async fn apply_search_filter( + &self, + Parameters(params): Parameters, + ) -> Result { + let (task_result_tx, task_result_rx) = mpsc::channel(1); + let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|e| { + RmcpError::new( + ErrorCode(503), + format!("Invalid session_id format: {}", e), + None, + ) + })?; + let filters: Vec = params + .filters + .iter() + .map(|filter| { + processor::search::filter::SearchFilter::plain(filter.value.clone()) + .regex(filter.is_regex) + .ignore_case(filter.ignore_case) + .word(filter.is_word) + }) + .collect(); + let task = ApplySearchFilter { + session_id, + task_result_tx, + filters, + }; + // Send task over communication channel in a separate thread, + // in future, we can skip match over task spawn + match self.task_tx.send(task) { + Ok(_) => log::warn!("🟒 MCP Server sent search task to MCP server"), + Err(err) => log::error!( + "Failed to send Search task to MCP server: ApplyFilter: {}", + err + ), + }; + + // Wait for the response from task over communication channel + // based on the response send back the JSON response to client + handle_task_response(task_result_rx, "apply_search_filters").await + } + + #[tool( + description = r#"Analyze logs iteratively by requesting specific line ranges and optionally applying an action. + +Use this tool in a loop: +1) Request a line `range` to inspect raw logs. +2) Review returned lines. +3) Repeat with another range as needed. +4) On final call, optionally execute an action. + +Supported actions: +- `none`: only return requested lines. +- `apply_filter`: apply one or more search filters to the session. +- `jump_to_line`: move UI focus to a specific log line. + +Input parameters: +- `session_id` (string): target session UUID. +- `range` (optional): {"start": u64, "end": u64} inclusive line range to fetch. +- `action` (optional): one of `none`, `apply_filter`, `jump_to_line`. +- `filters` (optional): list of filters, required for `apply_filter`. +- `jump_to_line` (optional): target line index, required for `jump_to_line`. +- `note` (optional): free-form summary/note from the LLM for traceability. +"# + )] + async fn analyze_logs( + &self, + Parameters(params): Parameters, + ) -> Result { + let (task_result_tx, task_result_rx) = mpsc::channel(1); + let range: Option> = params.range.clone(); + let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|e| { + RmcpError::new( + ErrorCode(503), + format!("Invalid session_id format: {}", e), + None, + ) + })?; + + let filters: Vec = params + .filters + .iter() + .map(|filter| { + processor::search::filter::SearchFilter::plain(filter.value.clone()) + .regex(filter.is_regex) + .ignore_case(filter.ignore_case) + .word(filter.is_word) + }) + .collect(); + + let task = Tasks::AnalyzeLogFile { + range, + session_id, + action: params.action, + filters, + jump_to_line: params.jump_to_line, + note: params.note, + task_result_tx, + }; + self.task_tx.send(task).map_err(|e| { + RmcpError::new( + ErrorCode::INTERNAL_ERROR, + format!("Failed to send AnalyzeLogFile: {e}"), + None, + ) + })?; + handle_task_response(task_result_rx, "analyze_logs").await + } + + #[tool(description = r#"Get histogram data for charts. + +Returns a histogram of matches for the current search within an optional range. + +**Input Parameters:** +- `dataset_len`: number of bars to produce +- `range`: optional {"start": u64, "end": u64} range in the stream + +**Usage Examples:** +- Input: {"dataset_len": 60, "range": {"start": 0, "end": 100000}} +- Use case: Build a 60-bin histogram for the first 100k log entries +"#)] + async fn get_chart_histogram( + &self, + Parameters(params): Parameters, + ) -> Result { + let (task_result_tx, task_result_rx) = mpsc::channel(1); + let range: Option> = params.range.clone(); + let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|e| { + RmcpError::new( + ErrorCode(503), + format!("Invalid session_id format: {}", e), + None, + ) + })?; + let task = Tasks::GetChartHistogram { + dataset_len: params.dataset_len, + range, + session_id, + task_result_tx, + }; + self.task_tx.send(task).map_err(|e| { + RmcpError::new( + ErrorCode::INTERNAL_ERROR, + format!("Failed to send GetChartHistogram: {e}"), + None, + ) + })?; + handle_task_response(task_result_rx, "get_chart_histogram").await + } + + #[tool(description = r#"Get line plot points for charts. + +Returns point data for line plots based on extracted values within an optional range. + +**Input Parameters:** +- `dataset_len`: number of points to produce +- `range`: optional {"start": u64, "end": u64} range in the stream + +**Usage Examples:** +- Input: {"dataset_len": 120, "range": {"start": 5000, "end": 25000}} +- Use case: Build a 120-point chart for a specific time window +"#)] + async fn get_chart_line_plots( + &self, + Parameters(params): Parameters, + ) -> Result { + let (task_result_tx, task_result_rx) = mpsc::channel(1); + let range = params.range.clone(); + let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|e| { + RmcpError::new( + ErrorCode(503), + format!("Invalid session_id format: {}", e), + None, + ) + })?; + let task = Tasks::GetChartLinePlots { + dataset_len: params.dataset_len, + range, + session_id, + task_result_tx, + }; + self.task_tx.send(task).map_err(|e| { + RmcpError::new( + ErrorCode::INTERNAL_ERROR, + format!("Failed to send GetChartLinePlots: {e}"), + None, + ) + })?; + handle_task_response(task_result_rx, "get_chart_line_plots").await + } + + #[tool(description = r#"Retrieve log lines within a specified range. + + Fetches raw log lines from the current session based on the provided line range. + This tool is useful for inspecting specific sections of logs during iterative analysis. + + **Input Parameters:** + - `range` (RangeInclusive): Inclusive range of line numbers to retrieve. + Usually represented as range(0, 100) which translates to std::ops::RangeInclusive::new(0, 100) in rust + In following Usage examples range represented as + - range(5000, 5050) => std::ops::RangeInclusive::new(5000, 5050) + - `session_id`: Session ID that prompt contains + + **Output:** + Returns a vector of strings, where each string represents a log line within the specified range. + + **Usage Examples:** + - Sample Prompts + - Retrive first 100 lines from log file. + - Grab first 100 lines from log + - Get me 100 lines from file + - Select (first) 100 lines from the file + - Input: { session_id: session_id, range: range(0, 100)} + - Use case: Fetch the first 100 lines of the log file + + - Sample Prompts + - Retrive lines between 5000 and 5050 + - Grab first 50 lines from log starting line index 5000 + - Select (first) 50 lines from the file starting at line number 5000 + - Input: { session_id: session_id, range: range(5000, 5050)) + - Use case: Inspect a specific section of 51 lines in the middle of the log + + - Input: { session_id: session_id, range: range(1000, 1000) } + - Use case: Retrieve a single specific line + + **Notes:** + - Both start and end indices are inclusive + - Line numbers are 0-indexed + - If the range exceeds available lines, only existing lines are returned + - Invoked by the LLM via the MCP protocol for interactive log exploration + + "#)] + async fn grab_lines( + &self, + Parameters(params): Parameters, + ) -> Result { + let session_id = uuid::Uuid::parse_str(params.session_id.as_str()).map_err(|_| { + RmcpError::new( + ErrorCode::INVALID_PARAMS, + format!("Error while fetching the session ID from parameters"), + None, + ) + })?; + let (task_result_tx, task_result_rx) = mpsc::channel(1); + let task = Tasks::GrabLines { + session_id, + range: params.range, + task_result_tx: task_result_tx.clone(), + }; + self.task_tx.send(task).map_err(|_| { + RmcpError::new( + ErrorCode::INTERNAL_ERROR, + format!("Error while sending the task to app"), + None, + ) + })?; + + handle_task_response(task_result_rx, "grab_lines").await + } + + #[tool( + description = r#"Complete an MCP chat workflow and publish the final outcome. + +This tool must be called as the final step after all analysis or data-retrieval tool calls are done. +It marks the MCP tool-call lifecycle as complete for a session and returns a final payload that the +client can treat as the authoritative chat outcome. + +This tool response should contain the answer to the question that user asked initially. For example +if user asked to perform some task then tool response should contain the summary of all the operations +and final answer if there should be any. +If user has asked, say to find summary, or analyze data then this tool response should contain the answer. +In general response should not be empty and should contain the meaningful message for the end user. + +Use this tool to: +- signal that no further MCP tool calls are required for the current chat turn, +- provide the final user-facing summary/result text in the response, +- explicitly communicate whether the overall chat operation succeeded or failed. + +Input parameters: +- `session_id` (string): UUID of the session whose chat workflow is being finalized. +- `result` (string): Final message/result produced by the chat workflow. This can be either: + - a success summary (answer, findings, recommendation), or + - an error summary describing what failed and why. +- `success` (boolean): Final status flag for the chat workflow. + - `true` means the workflow completed successfully. + - `false` means the workflow ended with an error or incomplete state. + +Output: +- Echoes the same `ChatResult` JSON payload (`session_id`, `result`, `success`) as the final MCP response. + +Usage examples: +- Success: + - Input: {"session_id":"","result":"Found root cause in lines 1200-1218.","success":true} +- Error: + - Input: {"session_id":"","result":"Failed to parse session_id: invalid UUID.","success":false} +"# + )] + async fn complete_chat( + &self, + Parameters(params): Parameters, + ) -> Result { + println!("***DEBUG: Finished chat analysis"); + Ok(CallToolResult::success(vec![Content::json(params)?])) + } +} + +async fn handle_task_response( + mut task_result_rx: mpsc::Receiver>, + task_name: &str, +) -> Result { + let task_result = task_result_rx.recv().await; + + match task_result { + None => Err(RmcpError::new( + ErrorCode::INTERNAL_ERROR, + format!("Error while applying task {task_name}"), + None, + )), + Some(result) => match result { + Ok(TaskResult::Complete(_m)) => { + let message = format!("Completed `{task_name}` tool call successfully"); + Ok(CallToolResult::success(vec![Content::json(message)?])) + } + Ok(TaskResult::AnalyzeLogs(log_result)) => { + let message = format!("Result for {task_name}\n\n {log_result:?}"); + Ok(CallToolResult::success(vec![Content::json(message)?])) + } + Ok(TaskResult::RequestLines(range)) => { + Ok(CallToolResult::success(vec![Content::json(range)?])) + } + Ok(TaskResult::Failed(error_msg)) => { + Ok(CallToolResult::error(vec![Content::json(error_msg)?])) + } + Err(err) => { + let err_msg = format!("{task_name} resulted in Error operation: {err}"); + Ok(CallToolResult::error(vec![Content::json(err_msg)?])) + } + }, + } +} diff --git a/crates/mcp/src/server/tasks.rs b/crates/mcp/src/server/tasks.rs new file mode 100644 index 0000000000..7854cd90c7 --- /dev/null +++ b/crates/mcp/src/server/tasks.rs @@ -0,0 +1,51 @@ +use std::ops::RangeInclusive; + +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::{errors::McpError, tool_params::AnalyzeAction, types::TaskResult}; +use processor::search::filter::SearchFilter; + +#[derive(Debug, Clone)] +pub enum Tasks { + ApplySearchFilter { + session_id: Uuid, + filters: Vec, + task_result_tx: mpsc::Sender>, + }, + GetChartHistogram { + session_id: Uuid, + dataset_len: u16, + range: Option>, + task_result_tx: mpsc::Sender>, + }, + GetChartLinePlots { + session_id: Uuid, + dataset_len: u16, + range: Option>, + task_result_tx: mpsc::Sender>, + }, + GenericTask { + session_id: Uuid, + task_result_tx: mpsc::Sender>, + }, + AnalyzeLogFile { + session_id: Uuid, + range: Option>, + action: AnalyzeAction, + filters: Vec, + jump_to_line: Option, + note: Option, + task_result_tx: mpsc::Sender>, + }, + GrabLines { + session_id: Uuid, + range: RangeInclusive, + task_result_tx: mpsc::Sender>, + }, + CompleteChat { + session_id: Uuid, + final_result: String, + task_result_tx: mpsc::Sender>, + }, +} diff --git a/crates/mcp/src/tool_params.rs b/crates/mcp/src/tool_params.rs new file mode 100644 index 0000000000..371bf7ee01 --- /dev/null +++ b/crates/mcp/src/tool_params.rs @@ -0,0 +1,109 @@ +use std::{fmt, ops::RangeInclusive}; + +use rmcp::schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct SearchFilter { + pub value: String, + #[serde(default)] + pub is_regex: bool, + #[serde(default)] + pub ignore_case: bool, + #[serde(default)] + pub is_word: bool, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct SearchFilters { + pub filters: Vec, + pub session_id: String, +} + +impl fmt::Display for SearchFilters { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let filters: Vec = self + .filters + .iter() + .map(|filter| { + format!( + "{{ value: {}, is_regex: {}, ignore_case: {}, is_word: {} }}", + filter.value, filter.is_regex, filter.ignore_case, filter.is_word + ) + }) + .collect(); + write!(f, "[{}]", filters.join(", ")) + } +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum AnalyzeAction { + None, + ApplyFilter, + JumpToLine, +} + +impl Default for AnalyzeAction { + fn default() -> Self { + Self::None + } +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct AnalyzeLogsRequest { + pub session_id: String, + #[serde(default)] + pub range: Option>, + #[serde(default)] + pub action: AnalyzeAction, + #[serde(default)] + pub filters: Vec, + #[serde(default)] + pub jump_to_line: Option, + #[serde(default)] + pub note: Option, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct LogLine { + pub source_id: u16, + pub pos: u64, + pub nature: u8, + pub content: String, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct AnalyzeLogsResult { + pub requested_range: Option>, + pub lines: Vec, + pub action_status: Option, + pub note: Option, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct MapRequest { + pub dataset_len: u16, + pub range: Option>, + pub session_id: String, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct ValuesRequest { + pub dataset_len: u16, + pub range: Option>, + pub session_id: String, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct GrabLineParams { + pub session_id: String, + pub range: RangeInclusive, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct ChatResult { + pub session_id: String, + pub result: String, + pub success: bool, +} diff --git a/crates/mcp/src/types.rs b/crates/mcp/src/types.rs new file mode 100644 index 0000000000..303238b80a --- /dev/null +++ b/crates/mcp/src/types.rs @@ -0,0 +1,49 @@ +use rmcp::schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::tool_params::AnalyzeLogsResult; + +#[derive(Clone, Debug)] +pub enum Response { + Complete(String), + Progress(String), +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct RangeU64 { + pub start: u64, + pub end: u64, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct ExportRequest { + pub out_path: String, + pub ranges: Vec, + pub columns: Vec, + pub spliter: Option, + pub delimiter: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TaskResult { + Complete(String), + RequestLines(Vec), + AnalyzeLogs(AnalyzeLogsResult), + Failed(String), +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct ExportRawRequest { + pub out_path: String, + pub ranges: Vec, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct CancelRequest { + pub target: String, +} + +#[derive(Clone, Debug, JsonSchema, Serialize, Deserialize)] +pub struct NearestPositionRequest { + pub position_in_stream: u64, +}