diff --git a/.github/labeler.yml b/.github/labeler.yml index aeb88f2b7f..56b999b18f 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -39,6 +39,11 @@ integration:azure-doc-intelligence: - any-glob-to-any-file: "integrations/azure_doc_intelligence/**/*" - any-glob-to-any-file: ".github/workflows/azure_doc_intelligence.yml" +integration:cognee: + - changed-files: + - any-glob-to-any-file: "integrations/cognee/**/*" + - any-glob-to-any-file: ".github/workflows/cognee.yml" + integration:chroma: - changed-files: - any-glob-to-any-file: "integrations/chroma/**/*" diff --git a/.github/workflows/cognee.yml b/.github/workflows/cognee.yml new file mode 100644 index 0000000000..d4441fb7b1 --- /dev/null +++ b/.github/workflows/cognee.yml @@ -0,0 +1,79 @@ +# This workflow comes from https://github.com/ofek/hatch-mypyc +# https://github.com/ofek/hatch-mypyc/blob/5a198c0ba8660494d02716cfc9d79ce4adfb1442/.github/workflows/test.yml +name: Test / cognee + +on: + schedule: + - cron: "0 0 * * *" + pull_request: + paths: + - "integrations/cognee/**" + - "!integrations/cognee/*.md" + - ".github/workflows/cognee.yml" + +defaults: + run: + working-directory: integrations/cognee + +concurrency: + group: cognee-${{ github.head_ref }} + cancel-in-progress: true + +env: + PYTHONUNBUFFERED: "1" + FORCE_COLOR: "1" + +jobs: + run: + name: Python ${{ matrix.python-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }} + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + python-version: ["3.10", "3.14"] + + steps: + - name: Support longpaths + if: matrix.os == 'windows-latest' + working-directory: . 
+ run: git config --system core.longpaths true + + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0 + with: + python-version: ${{ matrix.python-version }} + + - name: Install Hatch + run: pip install --upgrade hatch + - name: Lint + if: matrix.python-version == '3.10' && runner.os == 'Linux' + run: hatch run fmt-check && hatch run test:types + + - name: Run tests + run: hatch run test:cov-retry + + - name: Run unit tests with lowest direct dependencies + run: | + hatch run uv pip compile pyproject.toml --resolution lowest-direct --output-file requirements_lowest_direct.txt + hatch -e test env run -- uv pip install -r requirements_lowest_direct.txt + hatch run test:unit + + - name: Nightly - run tests with Haystack main branch + if: github.event_name == 'schedule' + run: | + hatch env prune + hatch -e test env run -- uv pip install git+https://github.com/deepset-ai/haystack.git@main + hatch run test:cov-retry + + + notify-slack-on-failure: + needs: run + if: failure() && github.event_name == 'schedule' + runs-on: ubuntu-slim + steps: + - uses: deepset-ai/notify-slack-action@3cda73b77a148f16f703274198e7771340cf862b # v1 + with: + slack-webhook-url: ${{ secrets.SLACK_WEBHOOK_URL_NOTIFICATIONS }} diff --git a/README.md b/README.md index 6f91668cb7..10c249ffd3 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ Please check out our [Contribution Guidelines](CONTRIBUTING.md) for all the deta | [azure-ai-search-haystack](integrations/azure_ai_search/) | Document Store | [![PyPI - Version](https://img.shields.io/pypi/v/azure-ai-search-haystack.svg)](https://pypi.org/project/azure-ai-search-haystack) | [![Test / 
azure-ai-search](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/azure_ai_search.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/azure_ai_search.yml) | | [azure-doc-intelligence-haystack](integrations/azure_doc_intelligence/) | Converter | [![PyPI - Version](https://img.shields.io/pypi/v/azure-doc-intelligence-haystack.svg)](https://pypi.org/project/azure-doc-intelligence-haystack) | [![Test / azure_doc_intelligence](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/azure_doc_intelligence.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/azure_doc_intelligence.yml) | | [chroma-haystack](integrations/chroma/) | Document Store | [![PyPI - Version](https://img.shields.io/pypi/v/chroma-haystack.svg)](https://pypi.org/project/chroma-haystack) | [![Test / chroma](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/chroma.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/chroma.yml) | +| [cognee-haystack](integrations/cognee/) | Connector | [![PyPI - Version](https://img.shields.io/pypi/v/cognee-haystack.svg)](https://pypi.org/project/cognee-haystack) | [![Test / cognee](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cognee.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cognee.yml) | | [cohere-haystack](integrations/cohere/) | Embedder, Generator, Ranker | [![PyPI - Version](https://img.shields.io/pypi/v/cohere-haystack.svg)](https://pypi.org/project/cohere-haystack) | [![Test / cohere](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cohere.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cohere.yml) | | [cometapi-haystack](integrations/cometapi/) | Generator | [![PyPI - 
Version](https://img.shields.io/pypi/v/cometapi-haystack.svg)](https://pypi.org/project/cometapi-haystack) | [![Test / cometapi](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cometapi.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cometapi.yml) | | [deepeval-haystack](integrations/deepeval/) | Evaluator | [![PyPI - Version](https://img.shields.io/pypi/v/deepeval-haystack.svg)](https://pypi.org/project/deepeval-haystack) | [![Test / deepeval](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/deepeval.yml/badge.svg)](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/deepeval.yml) | diff --git a/integrations/cognee/LICENSE b/integrations/cognee/LICENSE new file mode 100644 index 0000000000..de4c7f39f1 --- /dev/null +++ b/integrations/cognee/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. 
You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. 
Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2023 deepset GmbH + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/integrations/cognee/README.md b/integrations/cognee/README.md new file mode 100644 index 0000000000..049f783ee4 --- /dev/null +++ b/integrations/cognee/README.md @@ -0,0 +1,14 @@ +# cognee-haystack + +[![PyPI - Version](https://img.shields.io/pypi/v/cognee-haystack.svg)](https://pypi.org/project/cognee-haystack) +[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/cognee-haystack.svg)](https://pypi.org/project/cognee-haystack) + +[Cognee](https://www.cognee.ai/) integration for [Haystack](https://haystack.deepset.ai/) — open-source memory for AI agents. + +- [Changelog](https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/cognee/CHANGELOG.md) + +--- + +## Contributing + +Refer to the general [Contribution Guidelines](https://github.com/deepset-ai/haystack-core-integrations/blob/main/CONTRIBUTING.md). diff --git a/integrations/cognee/examples/.artifacts/demo_memory_agent.html b/integrations/cognee/examples/.artifacts/demo_memory_agent.html new file mode 100644 index 0000000000..1a31dda784 --- /dev/null +++ b/integrations/cognee/examples/.artifacts/demo_memory_agent.html @@ -0,0 +1,1450 @@ + + + + + +Cognee Knowledge Graph + + + + + + + + + +
+ + + + + +
+ Color: + + + + + +
+ + + +
+ +
+
+ +
+
Laying out graph...
+
+
+
+ +
+
+ + + + + + \ No newline at end of file diff --git a/integrations/cognee/examples/.artifacts/demo_pipeline.html b/integrations/cognee/examples/.artifacts/demo_pipeline.html new file mode 100644 index 0000000000..cf72ecd59c --- /dev/null +++ b/integrations/cognee/examples/.artifacts/demo_pipeline.html @@ -0,0 +1,1450 @@ + + + + + +Cognee Knowledge Graph + + + + + + + + + +
+ + + + + +
+ Color: + + + + + +
+ + + +
+ +
+
+ +
+
Laying out graph...
+
+
+
+ +
+
+ + + + + + \ No newline at end of file diff --git a/integrations/cognee/examples/README.md b/integrations/cognee/examples/README.md new file mode 100644 index 0000000000..d4ef44805b --- /dev/null +++ b/integrations/cognee/examples/README.md @@ -0,0 +1,40 @@ +# Cognee-Haystack Examples + +## Prerequisites + +Install the integration from the repository root: + +```bash +pip install -e "integrations/cognee" +``` + +Set your LLM API key (required by cognee, default OpenAI API key): + +To integrate other LLM providers and other configuration options, see [Cognee Documentation](https://docs.cognee.ai/getting-started/installation#environment-configuration). + + +```bash +export LLM_API_KEY="sk-your-openai-api-key" +``` + +## Examples + +### Pipeline Demo (`demo_pipeline.py`) + +Demonstrates batch document ingestion with `CogneeWriter` (auto_cognify disabled), +followed by a single `CogneeCognifier` pass, then retrieval with `CogneeRetriever`. +Also shows the same flow wired as a connected Haystack Pipeline. + +```bash +python integrations/cognee/examples/demo_pipeline.py +``` + +### Memory Agent Demo (`demo_memory_agent.py`) + +Demonstrates `CogneeMemoryStore` with per-user memory scoping via `user_id`: +- Two users store private memories that are isolated from each other. +- A shared dataset is created and read access is granted across users. + +```bash +python integrations/cognee/examples/demo_memory_agent.py +``` diff --git a/integrations/cognee/examples/demo_memory_agent.py b/integrations/cognee/examples/demo_memory_agent.py new file mode 100644 index 0000000000..04b009a71f --- /dev/null +++ b/integrations/cognee/examples/demo_memory_agent.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python +""" +Demo: Cognee as a Memory Backend with User-Scoped Access + +Shows how CogneeMemoryStore supports per-user memory isolation via user_id, +matching the MemoryStore protocol pattern used by Haystack's experimental Agent. + +Two users (Alice and Bob) each store private memories. 
Then Alice creates a +shared dataset and grants Bob read access, demonstrating cross-user sharing. + +Note: This demo uses cognee's user management APIs directly for setup (creating +users, granting permissions). These are admin operations outside the Haystack +integration's scope. In production, user management would typically be handled +by the application layer or cognee's API server. + +Prerequisites: + pip install -e "integrations/cognee" + +Set your LLM API key: + export LLM_API_KEY="sk-..." +""" + +import asyncio + +import cognee +from cognee.modules.data.methods import get_authorized_existing_datasets +from cognee.modules.engine.operations.setup import setup +from cognee.modules.users.methods import create_user +from cognee.modules.users.permissions.methods import give_permission_on_dataset +from haystack.dataclasses import ChatMessage + +from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + + +async def main(): + print("=== Cognee Memory Store — User Scoping Demo ===\n") + + # --- Setup: clean slate and create two users --- + print("Setup: Pruning all data and creating users...") + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + # Re-create the database schema after pruning (prune_system drops all tables) + await setup() + + alice = await create_user("alice@example.com", "password", is_verified=True) + bob = await create_user("bob@example.com", "password", is_verified=True) + alice_id = str(alice.id) + bob_id = str(bob.id) + print(f" Created Alice (id={alice_id[:8]}...) 
and Bob (id={bob_id[:8]}...)\n") + + # ========================================================================= + # Part 1: Private memories — each user can only see their own + # ========================================================================= + print("--- Part 1: Private memories (user isolation) ---\n") + + alice_store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name="alice_notes") + bob_store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name="bob_notes") + + # Alice adds her private memories + print("Alice adds memories to 'alice_notes'...") + alice_store.add_memories( + messages=[ + ChatMessage.from_user("The project deadline is next Friday."), + ], + user_id=alice_id, + ) + print(" Done.\n") + + # Bob adds his private memories + print("Bob adds memories to 'bob_notes'...") + bob_store.add_memories( + messages=[ + ChatMessage.from_user("The client meeting is on Wednesday at 2pm."), + ChatMessage.from_user("The new API endpoint needs authentication."), + ], + user_id=bob_id, + ) + print(" Done.\n") + + # Alice searches her own store — should find results + print("Alice searches her own store for 'project deadline':") + results = alice_store.search_memories(query="What is the project deadline?", user_id=alice_id) + print(f" Found {len(results)} result(s)") + for i, msg in enumerate(results, 1): + print(f" [{i}] {msg.text}") + print() + + # Bob searches his own store — should find results + print("Bob searches his own store for 'client meeting':") + results = bob_store.search_memories(query="When is the client meeting?", user_id=bob_id) + print(f" Found {len(results)} result(s)") + for i, msg in enumerate(results, 1): + print(f" [{i}] {msg.text}") + print() + + # Alice searches Bob's store — should find nothing (no permission) + print("Alice tries to search Bob's store (no permission):") + results = bob_store.search_memories(query="When is the client meeting?", user_id=alice_id) + print(f" Found {len(results)} result(s) — 
access is isolated!\n") + + # Bob searches Alice's store — should find nothing (no permission) + print("Bob tries to search Alice's store (no permission):") + results = alice_store.search_memories(query="What is the project deadline?", user_id=bob_id) + print(f" Found {len(results)} result(s) — access is isolated!\n") + + # ========================================================================= + # Part 2: Shared dataset — Alice creates it, grants Bob read access + # ========================================================================= + print("--- Part 2: Shared dataset ---\n") + + shared_store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name="team_shared") + + # Alice adds to the shared dataset (she becomes the owner) + print("Alice adds memories to 'team_shared'...") + shared_store.add_memories( + messages=[ + ChatMessage.from_user("The team standup is every morning at 9am."), + ChatMessage.from_user("Our tech stack is Python, Haystack, and Cognee."), + ], + user_id=alice_id, + ) + print(" Done.\n") + + # Bob tries to search the shared store BEFORE getting permission — should find nothing + print("Bob tries to search 'team_shared' BEFORE permission:") + results = shared_store.search_memories(query="When is the team standup?", user_id=bob_id) + print(f" Found {len(results)} result(s) — no access yet.\n") + + # Grant Bob read permission on the shared dataset + print("Alice grants Bob read access to 'team_shared'...") + shared_datasets = await get_authorized_existing_datasets(["team_shared"], "read", alice) + shared_dataset_id = shared_datasets[0].id + await give_permission_on_dataset(bob, shared_dataset_id, "read") + print(" Done.\n") + + # Bob searches via the MemoryStore — the store automatically resolves shared datasets + print("Bob searches 'team_shared' AFTER getting read permission:") + results = shared_store.search_memories(query="When is the team standup?", user_id=bob_id) + print(f" Found {len(results)} result(s)") + for i, msg in 
enumerate(results, 1): + print(f" [{i}] {msg.text}") + print() + + print("=== Done ===") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/integrations/cognee/examples/demo_pipeline.py b/integrations/cognee/examples/demo_pipeline.py new file mode 100644 index 0000000000..0a73053618 --- /dev/null +++ b/integrations/cognee/examples/demo_pipeline.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python +""" +Demo: Cognee + Haystack Pipeline — Writer, Cognifier, Retriever + +Demonstrates two ingestion strategies with CogneeWriter: + +1. auto_cognify=True (default) — each writer.run() call adds documents AND + immediately cognifies them into the knowledge graph. Simple but slower + when ingesting many batches, because cognify runs after every add. + +2. auto_cognify=False + CogneeCognifier — documents are added quickly in + multiple batches without building the knowledge graph. Then CogneeCognifier + processes the entire dataset once. This is faster for bulk ingestion. + +Both strategies produce the same result: a searchable knowledge graph that +CogneeRetriever can query. + +Prerequisites: + pip install -e "integrations/cognee" + +Set your LLM API key (Cognee uses it internally): + export LLM_API_KEY="sk-..." +""" + +import asyncio + +import cognee +from haystack import Document, Pipeline + +from haystack_integrations.components.connectors.cognee import CogneeCognifier +from haystack_integrations.components.retrievers.cognee import CogneeRetriever +from haystack_integrations.components.writers.cognee import CogneeWriter + +DOCS_BATCH_1 = [ + Document( + content=( + "Cognee is an open-source memory for AI agents. It builds a knowledge engine " + "that transforms raw data into a persistent, rich, and traceable memory that is " + "searchable by meaning and relationships." + ), + ), + Document( + content=( + "Haystack is an open-source LLM framework by deepset for building production-ready " + "RAG pipelines, agents, and search systems. 
It uses a component-based architecture " + "where each step is a composable building block." + ), + ), +] + +DOCS_BATCH_2 = [ + Document( + content=( + "Knowledge graphs represent information as nodes (entities) and edges (relationships). " + "They enable semantic search, reasoning, and discovery of hidden connections across " + "large document collections." + ), + ), + Document( + content=( + "The engineering team at Acme Corp consists of Alice (backend lead), Bob (ML engineer), " + "and Carol (infrastructure). They are building a next-generation search platform " + "powered by knowledge graphs and LLMs." + ), + ), +] + +SEARCH_QUERIES = [ + "What is Cognee and what does it do?", + "Who is on the engineering team at Acme Corp?", + "How do knowledge graphs work?", +] + + +def search_and_print(retriever, queries): + for query in queries: + print(f" Query: '{query}'") + result = retriever.run(query=query) + docs = result["documents"] + print(f" Found {len(docs)} result(s):") + for i, doc in enumerate(docs, 1): + print(f" [{i}] {doc.content}...") + print() + + +async def main(): + print("=== Cognee + Haystack Pipeline Demo ===\n") + + # ========================================================================= + # Part 1: auto_cognify=True — add + cognify on every call + # + # Each writer.run() both adds the documents AND cognifies the dataset. + # Simple for small ingestion, but cognify is the expensive step (calls + # the LLM to extract entities, build the graph, generate summaries). + # With N batches, cognify runs N times. + # ========================================================================= + print("--- Part 1: auto_cognify=True (add + cognify each batch) ---\n") + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + writer_auto = CogneeWriter(dataset_name="demo_auto", auto_cognify=True) + + print(f"1. 
Writing batch 1 ({len(DOCS_BATCH_1)} docs) — adds + cognifies...") + result = writer_auto.run(documents=DOCS_BATCH_1) + print(f" Written: {result['documents_written']} (cognify ran)\n") + + print(f"2. Writing batch 2 ({len(DOCS_BATCH_2)} docs) — adds + cognifies again...") + result = writer_auto.run(documents=DOCS_BATCH_2) + print(f" Written: {result['documents_written']} (cognify ran again)\n") + + print("3. Searching...\n") + retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", dataset_name="demo_auto") + search_and_print(retriever, SEARCH_QUERIES) + + # ========================================================================= + # Part 2: auto_cognify=False + CogneeCognifier — batch add, cognify once + # + # Documents are added quickly without cognifying. Then CogneeCognifier + # processes everything in one pass. With N batches, cognify runs only once. + # ========================================================================= + print("--- Part 2: auto_cognify=False + CogneeCognifier (batch add, cognify once) ---\n") + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + writer_batch = CogneeWriter(dataset_name="demo_batch", auto_cognify=False) + + print(f"4. Writing batch 1 ({len(DOCS_BATCH_1)} docs) — add only, no cognify...") + result = writer_batch.run(documents=DOCS_BATCH_1) + print(f" Written: {result['documents_written']}\n") + + print(f"5. Writing batch 2 ({len(DOCS_BATCH_2)} docs) — add only, no cognify...") + result = writer_batch.run(documents=DOCS_BATCH_2) + print(f" Written: {result['documents_written']}\n") + + print("6. Cognifying the entire dataset in one pass...") + cognifier = CogneeCognifier(dataset_name="demo_batch") + result = cognifier.run() + print(f" Cognified: {result['cognified']}\n") + + print("7. 
Searching...\n") + retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", dataset_name="demo_batch") + search_and_print(retriever, SEARCH_QUERIES) + + # ========================================================================= + # Part 3: Same batch flow wired as a Haystack Pipeline + # + # CogneeWriter(auto_cognify=False) outputs documents_written, which + # connects to CogneeCognifier's input — so cognify triggers automatically + # after the writer finishes. + # ========================================================================= + print("--- Part 3: Writer + Cognifier as a connected Pipeline ---\n") + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + pipeline = Pipeline() + pipeline.add_component("writer", CogneeWriter(dataset_name="demo_pipeline", auto_cognify=False)) + pipeline.add_component("cognifier", CogneeCognifier(dataset_name="demo_pipeline")) + pipeline.connect("writer.documents_written", "cognifier.documents_written") + + all_docs = DOCS_BATCH_1 + DOCS_BATCH_2 + print(f"8. Running pipeline with {len(all_docs)} documents...") + result = pipeline.run({"writer": {"documents": all_docs}}) + print(f" Cognified: {result['cognifier']['cognified']}\n") + + print("9. 
Searching...\n") + retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", dataset_name="demo_pipeline") + search_and_print(retriever, SEARCH_QUERIES) + + print("=== Done ===") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/integrations/cognee/pydoc/config_docusaurus.yml b/integrations/cognee/pydoc/config_docusaurus.yml new file mode 100644 index 0000000000..662b1f342e --- /dev/null +++ b/integrations/cognee/pydoc/config_docusaurus.yml @@ -0,0 +1,16 @@ +loaders: + - modules: + - haystack_integrations.components.connectors.cognee.cognifier + - haystack_integrations.components.retrievers.cognee.memory_retriever + - haystack_integrations.components.writers.cognee.memory_writer + - haystack_integrations.memory_stores.cognee.memory_store + search_path: [../src] +processors: + - type: filter + documented_only: true + skip_empty_modules: true +renderer: + description: Cognee integration for Haystack + id: integrations-cognee + filename: cognee.md + title: Cognee diff --git a/integrations/cognee/pyproject.toml b/integrations/cognee/pyproject.toml new file mode 100644 index 0000000000..cb871c1253 --- /dev/null +++ b/integrations/cognee/pyproject.toml @@ -0,0 +1,160 @@ +[build-system] +requires = ["hatchling", "hatch-vcs"] +build-backend = "hatchling.build" + +[project] +name = "cognee-haystack" +dynamic = ["version"] +description = "Haystack integration for Cognee — memory for AI agents" +readme = "README.md" +requires-python = ">=3.10" +license = "Apache-2.0" +keywords = [] +authors = [{ name = "deepset GmbH", email = "info@deepset.ai" }] +classifiers = [ + "License :: OSI Approved :: Apache Software License", + "Development Status :: 3 - Alpha", + "Programming Language :: Python", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: 
Python :: Implementation :: PyPy", +] +dependencies = [ + "haystack-ai>=2.24.0", + "cognee>=0.5.4,<1.0", +] + +[project.urls] +Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/cognee#readme" +Issues = "https://github.com/deepset-ai/haystack-core-integrations/issues" +Source = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/cognee" + +[tool.hatch.build.targets.wheel] +packages = ["src/haystack_integrations"] + +[tool.hatch.version] +source = "vcs" +tag-pattern = 'integrations\/cognee-v(?P<version>.*)' + +[tool.hatch.version.raw-options] +root = "../.." +git_describe_command = 'git describe --tags --match="integrations/cognee-v[0-9]*"' + +[tool.hatch.envs.default] +installer = "uv" +dependencies = ["haystack-pydoc-tools", "ruff"] + +[tool.hatch.envs.default.scripts] +docs = ["haystack-pydoc pydoc/config_docusaurus.yml"] +fmt = "ruff check --fix {args}; ruff format {args}" +fmt-check = "ruff check {args} && ruff format --check {args}" + +[tool.hatch.envs.test] +dependencies = [ + "pytest", + "pytest-asyncio", + "pytest-cov", + "pytest-rerunfailures", + "mypy", + "pip", +] + +[tool.hatch.envs.test.scripts] +unit = 'pytest -m "not integration" {args:tests}' +integration = 'pytest -m "integration" {args:tests}' +all = 'pytest {args:tests}' +cov-retry = 'pytest --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x {args:tests}' + +types = "mypy -p haystack_integrations.components.connectors.cognee -p haystack_integrations.components.retrievers.cognee -p haystack_integrations.components.writers.cognee -p haystack_integrations.memory_stores.cognee {args}" + +[tool.mypy] +install_types = true +non_interactive = true +check_untyped_defs = true +disallow_incomplete_defs = true + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +select = [ + "A", + "ARG", + "B", + "C", + "D102", # Missing docstring in public method + "D103", # Missing docstring in public function + "D205", # 1 blank line required 
between summary line and description + "D209", # Closing triple quotes go to new line + "D213", # summary lines must be positioned on the second physical line of the docstring + "DTZ", + "E", + "EM", + "F", + "FBT", + "I", + "ICN", + "ISC", + "N", + "PLC", + "PLE", + "PLR", + "PLW", + "Q", + "RUF", + "S", + "T", + "TID", + "UP", + "W", + "YTT", +] +ignore = [ + # Allow non-abstract empty methods in abstract base classes + "B027", + # Allow boolean positional values in function calls, like `dict.get(... True)` + "FBT003", + # Ignore checks for possible passwords + "S105", + "S106", + "S107", + # Ignore complexity + "C901", + "PLR0911", + "PLR0912", + "PLR0913", + "PLR0915", + # Ignore unused params + "ARG002", + # Allow assertions + "S101", +] +exclude = ["examples"] + +[tool.ruff.lint.isort] +known-first-party = ["haystack_integrations"] + +[tool.ruff.lint.flake8-tidy-imports] +ban-relative-imports = "parents" + +[tool.ruff.lint.per-file-ignores] +# Tests can use magic values, assertions, and relative imports +"tests/**/*" = ["D", "PLR2004", "S101", "TID252"] +"examples/**/*" = ["D", "T201"] + +[tool.coverage.run] +source = ["haystack_integrations"] +branch = true +parallel = false + +[tool.coverage.report] +omit = ["*/tests/*", "*/__init__.py"] +show_missing = true +exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] + +[tool.pytest.ini_options] +minversion = "6.0" +markers = ["integration: integration tests"] diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/__init__.py new file mode 100644 index 0000000000..775c5a35e4 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from .cognifier import CogneeCognifier + +__all__ = [ + "CogneeCognifier", +] 
diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/cognee/_utils.py b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/_utils.py new file mode 100644 index 0000000000..d2eb383107 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/_utils.py @@ -0,0 +1,114 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import threading +from collections.abc import Coroutine +from typing import Any, Literal, TypeVar +from uuid import UUID + +from cognee.modules.users.methods import get_user # type: ignore[import-untyped] + +T = TypeVar("T") + +# Persistent background event loop for run_sync when called from an async context. +# A single loop is reused so that cognee's internal asyncio.Lock objects stay bound +# to one event loop across multiple run_sync calls. +_background_loop: asyncio.AbstractEventLoop | None = None +_background_thread: threading.Thread | None = None +_lock = threading.Lock() + +CogneeSearchType = Literal[ + "GRAPH_COMPLETION", + "RAG_COMPLETION", + "CHUNKS", + "CHUNKS_LEXICAL", + "SUMMARIES", + "TRIPLET_COMPLETION", + "GRAPH_SUMMARY_COMPLETION", + "GRAPH_COMPLETION_COT", + "GRAPH_COMPLETION_CONTEXT_EXTENSION", + "CYPHER", + "NATURAL_LANGUAGE", + "TEMPORAL", + "CODING_RULES", + "FEELING_LUCKY", +] + + +def _get_background_loop() -> asyncio.AbstractEventLoop: + """Return a persistent background event loop running in a daemon thread.""" + global _background_loop, _background_thread # noqa: PLW0603 + + with _lock: + if _background_loop is None or _background_loop.is_closed(): + _background_loop = asyncio.new_event_loop() + _background_thread = threading.Thread(target=_background_loop.run_forever, daemon=True) + _background_thread.start() + return _background_loop + + +def run_sync(coro: Coroutine[Any, Any, T]) -> T: + """ + Run an async coroutine from a synchronous context. 
+ + If no event loop is running, uses asyncio.run() directly. + If already inside an async context, submits the coroutine to a persistent + background event loop so that cognee's internal asyncio.Lock objects remain + bound to a single loop across calls. + """ + try: + asyncio.get_running_loop() + except RuntimeError: + return asyncio.run(coro) + + # Already in an async context — submit to the persistent background loop + loop = _get_background_loop() + future = asyncio.run_coroutine_threadsafe(coro, loop) + return future.result() + + +def extract_text(item: Any) -> str: + """ + Best-effort text extraction from a Cognee search result item. + + Cognee search results may contain items of various types depending on the + search strategy: plain ``str`` for LLM completions, ``dict`` with keys like + *content*/*text*/*description*/*name*, or cognee model objects (e.g. + ``DataPoint`` subclasses) carrying the same attributes. + """ + if isinstance(item, str): + return item + + for attr in ("content", "text", "description", "name"): + if hasattr(item, attr): + val = getattr(item, attr) + if val and isinstance(val, str): + return val + + if isinstance(item, dict): + for key in ("content", "text", "description", "name"): + if key in item and isinstance(item[key], str): + return item[key] + + return str(item) + + +async def _get_cognee_user(user_id: str) -> Any: + """ + Resolve a user_id string to a cognee User object. + + Converts the given UUID string to a cognee User via ``cognee.modules.users.methods.get_user``. + + :param user_id: UUID string identifying the cognee user. + :returns: A cognee ``User`` object. + :raises ValueError: If user_id is not a valid UUID or the user is not found. + """ + try: + uid = UUID(user_id) + except ValueError as e: + msg = f"Invalid user_id: '{user_id}' is not a valid UUID." 
+ raise ValueError(msg) from e + + return await get_user(uid) diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/cognee/cognifier.py b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/cognifier.py new file mode 100644 index 0000000000..d862f442e4 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/cognifier.py @@ -0,0 +1,89 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +from haystack import component, default_from_dict, default_to_dict, logging + +import cognee # type: ignore[import-untyped] + +from ._utils import run_sync + +logger = logging.getLogger(__name__) + + +@component +class CogneeCognifier: + """ + Processes previously added data through Cognee's knowledge engine. + + Wraps `cognee.cognify()` as a standalone pipeline step. Cognify takes raw data + that was previously added via `cognee.add()` and transforms it into a structured + knowledge graph. The process includes: + + 1. **Document classification** — identifies the type and structure of the input data. + 2. **Text chunking** — splits documents into semantically meaningful segments. + 3. **Entity extraction** — uses an LLM to identify entities and their properties. + 4. **Relationship detection** — discovers connections between extracted entities. + 5. **Graph construction** — builds a knowledge graph with embeddings for vector search. + 6. **Summarization** — generates hierarchical summaries of the processed content. + + After cognification, the data becomes searchable via `cognee.search()` using various + strategies (graph traversal, vector similarity, summaries, etc.). + + This component is useful when you want to separate the add and cognify phases — + for example, batch-add documents first with `CogneeWriter(auto_cognify=False)`, + then cognify once. 
+ + Usage: + ```python + from haystack import Pipeline + from haystack_integrations.components.writers.cognee import CogneeWriter + from haystack_integrations.components.connectors.cognee import CogneeCognifier + + pipeline = Pipeline() + pipeline.add_component("writer", CogneeWriter(dataset_name="my_data", auto_cognify=False)) + pipeline.add_component("cognifier", CogneeCognifier(dataset_name="my_data")) + pipeline.connect("writer.documents_written", "cognifier.documents_written") + + result = pipeline.run({"writer": {"documents": docs}}) + ``` + """ + + def __init__(self, dataset_name: str | list[str] | None = None): + """ + Initialize the CogneeCognifier. + + :param dataset_name: Optional Cognee dataset name(s) to cognify. Accepts a single + name, a list of names, or None to cognify all pending datasets. + """ + self.dataset_name = dataset_name + + @component.output_types(cognified=bool) + def run(self, documents_written: int | None = None) -> dict[str, Any]: + """ + Run cognee.cognify() to process added data into the knowledge graph. + + :param documents_written: Optional number of documents written by a preceding + CogneeWriter. Used as a pipeline connection point; cognify runs regardless + of the value, as long as data has been previously added. + :returns: Dictionary with key `cognified` set to True on success. 
+ """ + cognify_kwargs: dict[str, Any] = {} + if self.dataset_name: + datasets = [self.dataset_name] if isinstance(self.dataset_name, str) else self.dataset_name + cognify_kwargs["datasets"] = datasets + + logger.info("Running cognee.cognify()") + run_sync(cognee.cognify(**cognify_kwargs)) + return {"cognified": True} + + def to_dict(self) -> dict[str, Any]: + """Serialize this component to a dictionary.""" + return default_to_dict(self, dataset_name=self.dataset_name) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "CogneeCognifier": + """Deserialize a component from a dictionary.""" + return default_from_dict(cls, data) diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/py.typed b/integrations/cognee/src/haystack_integrations/components/connectors/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py new file mode 100644 index 0000000000..90cf73038c --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from .memory_retriever import CogneeRetriever + +__all__ = [ + "CogneeRetriever", +] diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py new file mode 100644 index 0000000000..3be8186b76 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py @@ -0,0 +1,110 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +from haystack import Document, component, default_from_dict, default_to_dict, 
logging + +import cognee # type: ignore[import-untyped] +from cognee.api.v1.search import SearchType # type: ignore[import-untyped] +from haystack_integrations.components.connectors.cognee._utils import CogneeSearchType, extract_text, run_sync + +logger = logging.getLogger(__name__) + + +@component +class CogneeRetriever: + """ + Retrieves documents from Cognee's memory. + + Wraps `cognee.search()` and converts results into Haystack `Document` objects. + + Usage: + ```python + from haystack_integrations.components.retrievers.cognee import CogneeRetriever + + retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", top_k=5) + results = retriever.run(query="What is Cognee?") + for doc in results["documents"]: + print(doc.content) + ``` + """ + + def __init__( + self, search_type: CogneeSearchType = "GRAPH_COMPLETION", top_k: int = 10, dataset_name: str | None = None + ): + """ + Initialize the CogneeRetriever. + + :param search_type: Cognee search type. One of: GRAPH_COMPLETION, CHUNKS, + SUMMARIES, INSIGHTS, etc. + :param top_k: Maximum number of results to return. + :param dataset_name: Optional dataset name to restrict search scope. + """ + self.search_type = search_type + self.top_k = top_k + self.dataset_name = dataset_name + + @component.output_types(documents=list[Document]) + def run(self, query: str, top_k: int | None = None) -> dict[str, list[Document]]: + """ + Search Cognee's memory and return matching documents. + + :param query: The search query. + :param top_k: Override the default maximum number of results. + :returns: Dictionary with key `documents` containing the search results + as Haystack Document objects. 
+ """ + effective_top_k = top_k if top_k is not None else self.top_k + search_type_enum = SearchType[self.search_type] + + search_kwargs: dict[str, Any] = { + "query_text": query, + "query_type": search_type_enum, + } + if self.dataset_name: + search_kwargs["datasets"] = [self.dataset_name] + + raw_results = run_sync(cognee.search(**search_kwargs)) + + documents = _convert_results(raw_results, effective_top_k) + + logger.info( + "Cognee search returned {count} documents for query '{query}'", + count=len(documents), + query=query[:80], + ) + return {"documents": documents} + + def to_dict(self) -> dict[str, Any]: + """Serialize this component to a dictionary.""" + return default_to_dict( + self, + search_type=self.search_type, + top_k=self.top_k, + dataset_name=self.dataset_name, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "CogneeRetriever": + """Deserialize a component from a dictionary.""" + return default_from_dict(cls, data) + + +def _convert_results(raw_results: list[Any], top_k: int) -> list[Document]: + """Convert Cognee search results to Haystack Documents.""" + documents: list[Document] = [] + if not raw_results: + return documents + + for item in raw_results[:top_k]: + text = extract_text(item) + if text: + documents.append( + Document( + content=text, + meta={"source": "cognee", "search_result_type": type(item).__name__}, + ) + ) + return documents diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/py.typed b/integrations/cognee/src/haystack_integrations/components/retrievers/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py new file mode 100644 index 0000000000..7d88e60c29 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py @@ -0,0 +1,9 @@ +# 
SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from .memory_writer import CogneeWriter + +__all__ = [ + "CogneeWriter", +] diff --git a/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py b/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py new file mode 100644 index 0000000000..b358a7c5ba --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py @@ -0,0 +1,84 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +from haystack import Document, component, default_from_dict, default_to_dict, logging + +import cognee # type: ignore[import-untyped] +from haystack_integrations.components.connectors.cognee._utils import run_sync + +logger = logging.getLogger(__name__) + + +@component +class CogneeWriter: + """ + Adds Haystack Documents to Cognee's memory. + + Wraps `cognee.add()` and optionally `cognee.cognify()` to ingest documents + and build a knowledge engine in a single pipeline step. + + Usage: + ```python + from haystack import Document + from haystack_integrations.components.writers.cognee import CogneeWriter + + writer = CogneeWriter(dataset_name="my_dataset", auto_cognify=True) + writer.run(documents=[Document(content="Cognee builds AI memory.")]) + ``` + """ + + def __init__(self, *, dataset_name: str = "haystack", auto_cognify: bool = True): + """ + Initialize the CogneeWriter. + + :param dataset_name: Name of the Cognee dataset to add documents to. + :param auto_cognify: If True, automatically runs `cognee.cognify()` after adding + documents to process them into the knowledge engine. 
+ """ + self.dataset_name = dataset_name + self.auto_cognify = auto_cognify + + @component.output_types(documents_written=int) + def run(self, documents: list[Document]) -> dict[str, Any]: + """ + Add documents to Cognee and optionally cognify them. + + :param documents: List of Haystack Documents to add. + :returns: Dictionary with key `documents_written` indicating how many + documents were successfully added. + """ + texts = [doc.content for doc in documents if doc.content] + skipped = len(documents) - len(texts) + if skipped > 0: + logger.warning("Skipping {count} document(s) with empty content", count=skipped) + + if texts: + run_sync(cognee.add(texts, dataset_name=self.dataset_name)) + + written = len(texts) + + if self.auto_cognify and written > 0: + logger.info( + "Cognifying {count} documents in dataset '{dataset}'", + count=written, + dataset=self.dataset_name, + ) + run_sync(cognee.cognify(datasets=[self.dataset_name])) + + return {"documents_written": written} + + def to_dict(self) -> dict[str, Any]: + """Serialize this component to a dictionary.""" + return default_to_dict( + self, + dataset_name=self.dataset_name, + auto_cognify=self.auto_cognify, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "CogneeWriter": + """Deserialize a component from a dictionary.""" + return default_from_dict(cls, data) diff --git a/integrations/cognee/src/haystack_integrations/components/writers/py.typed b/integrations/cognee/src/haystack_integrations/components/writers/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py new file mode 100644 index 0000000000..eb9a6b81a8 --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + 
+from .memory_store import CogneeMemoryStore + +__all__ = [ + "CogneeMemoryStore", +] diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py new file mode 100644 index 0000000000..92c43cc3bb --- /dev/null +++ b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py @@ -0,0 +1,197 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Any + +from haystack import default_from_dict, default_to_dict, logging +from haystack.dataclasses import ChatMessage + +import cognee # type: ignore[import-untyped] +from cognee.api.v1.search import SearchType # type: ignore[import-untyped] +from cognee.modules.data.exceptions import DatasetNotFoundError # type: ignore[import-untyped] +from cognee.modules.users.permissions.methods import get_all_user_permission_datasets # type: ignore[import-untyped] +from haystack_integrations.components.connectors.cognee._utils import ( + CogneeSearchType, + _get_cognee_user, + extract_text, + run_sync, +) + +logger = logging.getLogger(__name__) + + +class CogneeMemoryStore: + """ + A memory store backed by Cognee memory. + + Implements the `MemoryStore` protocol from haystack-experimental, allowing + Cognee to serve as the memory backend for Haystack's experimental Agent. + + Usage: + ```python + from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + + store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", top_k=5) + store.add_memories(messages=[ChatMessage.from_user("Remember: the project deadline is Friday.")]) + results = store.search_memories(query="When is the project deadline?") + ``` + """ + + def __init__( + self, search_type: CogneeSearchType = "GRAPH_COMPLETION", top_k: int = 5, dataset_name: str = "haystack_memory" + ): + """ + Initialize the CogneeMemoryStore. 
+ + :param search_type: Cognee search type for memory retrieval. + :param top_k: Default number of results for memory search. + :param dataset_name: Cognee dataset name for storing memories. + """ + self.search_type = search_type + self.top_k = top_k + self.dataset_name = dataset_name + + def add_memories( + self, + *, + messages: list[ChatMessage], + user_id: str | None = None, + **kwargs: Any, + ) -> None: + """ + Add chat messages to Cognee as memories. + + :param messages: List of ChatMessages to store. + :param user_id: Optional cognee user UUID to scope memories to a specific user. + When provided, the data is stored under that user's permissions. + When ``None``, cognee's default user is used. + :param kwargs: Additional keyword arguments (unused, accepted for protocol + compatibility). + """ + user = run_sync(_get_cognee_user(user_id)) if user_id else None + + added = 0 + for msg in messages: + text = msg.text + if not text: + continue + run_sync(cognee.add(text, dataset_name=self.dataset_name, user=user)) + added += 1 + + if added > 0: + run_sync(cognee.cognify(datasets=[self.dataset_name], user=user)) + + logger.info("Added and cognified {count} messages as memories", count=added) + + def search_memories( + self, + *, + query: str | None = None, + top_k: int = 5, + user_id: str | None = None, + **kwargs: Any, + ) -> list[ChatMessage]: + """ + Search Cognee's knowledge engine for relevant memories. + + :param query: The search query. + :param top_k: Maximum number of memories to return. + :param user_id: Optional cognee user UUID to scope the search to a specific user. + Search is restricted to the store's ``dataset_name``. If the user owns the + dataset it is resolved by name; otherwise the store checks whether the user + has been granted read access (e.g. via shared permissions) and searches by + dataset UUID. + When ``None``, cognee's default user is used. + :param kwargs: Additional keyword arguments (unused, accepted for protocol + compatibility). 
+ :returns: List of ChatMessages containing memory content as system messages. + """ + if not query: + return [] + + user = run_sync(_get_cognee_user(user_id)) if user_id else None + search_type_enum = SearchType[self.search_type] + effective_top_k = top_k or self.top_k + + try: + raw_results = run_sync( + cognee.search( + query_text=query, + query_type=search_type_enum, + user=user, + datasets=[self.dataset_name], + ) + ) + except DatasetNotFoundError: + # The user doesn't own a dataset with this name. + # Fall back to checking shared datasets the user has read access to. + raw_results = self._search_shared_dataset(query, search_type_enum, user) + + memories: list[ChatMessage] = [] + if not raw_results: + return memories + + for item in raw_results[:effective_top_k]: + text = extract_text(item) + if text: + memories.append(ChatMessage.from_system(text)) + + logger.info("Found {count} memories for query '{query}'", count=len(memories), query=query[:80]) + return memories + + def _search_shared_dataset(self, query: str, search_type_enum: SearchType, user: Any) -> list[Any]: + """Search for the dataset by name among all datasets the user has read access to.""" + if user is None: + return [] + all_readable = run_sync(get_all_user_permission_datasets(user, "read")) + matching = [ds for ds in all_readable if ds.name == self.dataset_name] + if not matching: + return [] + return run_sync( + cognee.search(query_text=query, query_type=search_type_enum, user=user, dataset_ids=[matching[0].id]) + ) + + def delete_all_memories( + self, + *, + user_id: str | None = None, + **kwargs: Any, + ) -> None: + """ + Delete all memories by pruning Cognee's data and system state. + + :param user_id: Optional cognee user UUID (accepted for protocol compatibility). + Note: Cognee's prune operations are global and not scoped to a specific user. + :param kwargs: Additional keyword arguments (unused, accepted for protocol + compatibility). 
+ """ + run_sync(cognee.prune.prune_data()) + run_sync(cognee.prune.prune_system(metadata=True)) + logger.info("All Cognee memories pruned") + + def delete_memory(self, memory_id: str) -> None: + """ + Delete a single memory by ID. + + Not supported in V1 — Cognee's SDK does not expose fine-grained deletion. + + :param memory_id: The ID of the memory to delete. + :raises NotImplementedError: Always, as single-item deletion is not yet supported. + """ + msg = "CogneeMemoryStore does not support deleting individual memories in V1." + raise NotImplementedError(msg) + + def to_dict(self) -> dict[str, Any]: + """Serialize this component to a dictionary.""" + return default_to_dict( + self, + search_type=self.search_type, + top_k=self.top_k, + dataset_name=self.dataset_name, + ) + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "CogneeMemoryStore": + """Deserialize a component from a dictionary.""" + return default_from_dict(cls, data) diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/py.typed b/integrations/cognee/src/haystack_integrations/memory_stores/py.typed new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/tests/__init__.py b/integrations/cognee/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/integrations/cognee/tests/test_cognifier.py b/integrations/cognee/tests/test_cognifier.py new file mode 100644 index 0000000000..4b6eea9ffc --- /dev/null +++ b/integrations/cognee/tests/test_cognifier.py @@ -0,0 +1,80 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import AsyncMock, patch + +from haystack_integrations.components.connectors.cognee import CogneeCognifier + + +class TestCogneeCognifier: + def test_init_defaults(self): + cognifier = CogneeCognifier() + assert cognifier.dataset_name is None + + def test_init_custom_string(self): + cognifier = CogneeCognifier(dataset_name="my_data") + assert 
cognifier.dataset_name == "my_data" + + def test_init_custom_list(self): + cognifier = CogneeCognifier(dataset_name=["ds1", "ds2"]) + assert cognifier.dataset_name == ["ds1", "ds2"] + + def test_to_dict(self): + cognifier = CogneeCognifier(dataset_name="test_ds") + data = cognifier.to_dict() + assert data["type"] == "haystack_integrations.components.connectors.cognee.cognifier.CogneeCognifier" + assert data["init_parameters"]["dataset_name"] == "test_ds" + + def test_to_dict_defaults(self): + cognifier = CogneeCognifier() + data = cognifier.to_dict() + assert data["init_parameters"]["dataset_name"] is None + + def test_from_dict(self): + data = { + "type": "haystack_integrations.components.connectors.cognee.cognifier.CogneeCognifier", + "init_parameters": {"dataset_name": "restored"}, + } + cognifier = CogneeCognifier.from_dict(data) + assert cognifier.dataset_name == "restored" + + @patch("haystack_integrations.components.connectors.cognee.cognifier.cognee") + def test_run_no_dataset(self, mock_cognee): + mock_cognee.cognify = AsyncMock() + + cognifier = CogneeCognifier() + result = cognifier.run() + + assert result == {"cognified": True} + mock_cognee.cognify.assert_awaited_once_with() + + @patch("haystack_integrations.components.connectors.cognee.cognifier.cognee") + def test_run_with_string_dataset(self, mock_cognee): + mock_cognee.cognify = AsyncMock() + + cognifier = CogneeCognifier(dataset_name="my_data") + result = cognifier.run() + + assert result == {"cognified": True} + mock_cognee.cognify.assert_awaited_once_with(datasets=["my_data"]) + + @patch("haystack_integrations.components.connectors.cognee.cognifier.cognee") + def test_run_with_list_dataset(self, mock_cognee): + mock_cognee.cognify = AsyncMock() + + cognifier = CogneeCognifier(dataset_name=["ds1", "ds2"]) + result = cognifier.run() + + assert result == {"cognified": True} + mock_cognee.cognify.assert_awaited_once_with(datasets=["ds1", "ds2"]) + + 
@patch("haystack_integrations.components.connectors.cognee.cognifier.cognee") + def test_run_with_documents_written_input(self, mock_cognee): + mock_cognee.cognify = AsyncMock() + + cognifier = CogneeCognifier(dataset_name="my_data") + result = cognifier.run(documents_written=5) + + assert result == {"cognified": True} + mock_cognee.cognify.assert_awaited_once_with(datasets=["my_data"]) diff --git a/integrations/cognee/tests/test_integration.py b/integrations/cognee/tests/test_integration.py new file mode 100644 index 0000000000..2f58d40392 --- /dev/null +++ b/integrations/cognee/tests/test_integration.py @@ -0,0 +1,99 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import os + +import cognee +import pytest +from haystack import Document +from haystack.dataclasses import ChatMessage + +from haystack_integrations.components.connectors.cognee import CogneeCognifier +from haystack_integrations.components.connectors.cognee._utils import run_sync +from haystack_integrations.components.retrievers.cognee import CogneeRetriever +from haystack_integrations.components.writers.cognee import CogneeWriter +from haystack_integrations.memory_stores.cognee import CogneeMemoryStore + +SKIP_REASON = "Export an env var called LLM_API_KEY containing the LLM API key to run this test." 
@pytest.mark.skipif(
    not os.environ.get("LLM_API_KEY", None),
    reason=SKIP_REASON,
)
@pytest.mark.integration
class TestCogneeMemoryStoreIntegration:
    # End-to-end round trip against a real cognee backend; skipped unless
    # LLM_API_KEY is exported.
    def test_add_search_delete(self):
        """Add chat messages as memories, search them back, then wipe the store."""
        store = CogneeMemoryStore(
            search_type="GRAPH_COMPLETION",
            top_k=3,
            dataset_name="haystack_integration_test",
        )

        # Start from a clean slate so earlier runs cannot leak into results.
        store.delete_all_memories()

        messages = [
            ChatMessage.from_user("The capital of France is Paris."),
            ChatMessage.from_user("The Eiffel Tower is located in Paris."),
        ]
        store.add_memories(messages=messages)

        results = store.search_memories(query="What is the capital of France?", top_k=3)
        assert len(results) > 0
        assert all(isinstance(r, ChatMessage) for r in results)

        # Clean up so later tests see an empty store.
        store.delete_all_memories()


@pytest.mark.skipif(
    not os.environ.get("LLM_API_KEY", None),
    reason=SKIP_REASON,
)
@pytest.mark.integration
class TestCogneeWriterRetrieverIntegration:
    # Writer -> retriever pipeline against a real cognee backend.
    def test_write_and_retrieve(self):
        """Write documents with auto-cognify enabled, then retrieve them by query."""
        # Prune both data and system metadata so the graph starts empty.
        run_sync(cognee.prune.prune_data())
        run_sync(cognee.prune.prune_system(metadata=True))

        writer = CogneeWriter(dataset_name="haystack_integration_test", auto_cognify=True)
        docs = [
            Document(content="Python is a programming language created by Guido van Rossum."),
            Document(content="Haystack is an open-source framework for building AI applications."),
        ]
        write_result = writer.run(documents=docs)
        assert write_result["documents_written"] == 2

        retriever = CogneeRetriever(
            search_type="GRAPH_COMPLETION",
            top_k=3,
            dataset_name="haystack_integration_test",
        )
        search_result = retriever.run(query="What is Haystack?")
        assert len(search_result["documents"]) > 0
        assert all(isinstance(d, Document) for d in search_result["documents"])

        # Clean up backend state after the test.
        run_sync(cognee.prune.prune_data())
        run_sync(cognee.prune.prune_system(metadata=True))


@pytest.mark.skipif(
    not os.environ.get("LLM_API_KEY", None),
    reason=SKIP_REASON,
)
@pytest.mark.integration
class TestCogneeCognifierIntegration:
    # Writer (auto_cognify off) followed by an explicit cognify step.
    def test_write_then_cognify(self):
        """Write a document without cognifying, then run CogneeCognifier explicitly."""
        run_sync(cognee.prune.prune_data())
        run_sync(cognee.prune.prune_system(metadata=True))

        # auto_cognify=False: the writer only adds data, leaving graph building
        # to the dedicated cognifier component below.
        writer = CogneeWriter(dataset_name="haystack_integration_test", auto_cognify=False)
        docs = [Document(content="Berlin is the capital of Germany.")]
        writer.run(documents=docs)

        cognifier = CogneeCognifier(dataset_name="haystack_integration_test")
        result = cognifier.run()
        assert result["cognified"] is True

        # Clean up backend state after the test.
        run_sync(cognee.prune.prune_data())
        run_sync(cognee.prune.prune_system(metadata=True))
"haystack_integrations.memory_stores.cognee.memory_store.CogneeMemoryStore", + "init_parameters": {"search_type": "CHUNKS", "top_k": 8, "dataset_name": "restored"}, + } + store = CogneeMemoryStore.from_dict(data) + assert store.search_type == "CHUNKS" + assert store.top_k == 8 + assert store.dataset_name == "restored" + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories(self, mock_cognee): + mock_cognee.add = AsyncMock() + mock_cognee.cognify = AsyncMock() + + store = CogneeMemoryStore() + messages = [ + ChatMessage.from_user("Remember: the deadline is Friday."), + ChatMessage.from_assistant("Got it, I'll remember Friday."), + ] + store.add_memories(messages=messages) + + assert mock_cognee.add.await_count == 2 + mock_cognee.cognify.assert_awaited_once() + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_cognify_uses_dataset(self, mock_cognee): + mock_cognee.add = AsyncMock() + mock_cognee.cognify = AsyncMock() + + store = CogneeMemoryStore(dataset_name="my_ds") + messages = [ChatMessage.from_user("Remember this.")] + store.add_memories(messages=messages) + + mock_cognee.cognify.assert_awaited_once() + cognify_kwargs = mock_cognee.cognify.call_args[1] + assert cognify_kwargs["datasets"] == ["my_ds"] + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_search_memories(self, mock_cognee): + mock_cognee.search = AsyncMock(return_value=["Memory about deadline"]) + + store = CogneeMemoryStore() + results = store.search_memories(query="What is the deadline?") + + assert len(results) == 1 + assert isinstance(results[0], ChatMessage) + assert results[0].text == "Memory about deadline" + + def test_search_memories_empty_query(self): + store = CogneeMemoryStore() + results = store.search_memories(query=None) + assert results == [] + + results = store.search_memories(query="") + assert results == [] + + 
@patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_delete_all_memories(self, mock_cognee): + mock_cognee.prune = type( + "Prune", + (), + { + "prune_data": AsyncMock(), + "prune_system": AsyncMock(), + }, + )() + + store = CogneeMemoryStore() + store.delete_all_memories() + + mock_cognee.prune.prune_data.assert_awaited_once() + mock_cognee.prune.prune_system.assert_awaited_once() + + @patch("haystack_integrations.memory_stores.cognee.memory_store._get_cognee_user", new_callable=AsyncMock) + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_with_user_id(self, mock_cognee, mock_get_user): + mock_user = MagicMock() + mock_get_user.return_value = mock_user + mock_cognee.add = AsyncMock() + mock_cognee.cognify = AsyncMock() + + store = CogneeMemoryStore() + messages = [ChatMessage.from_user("Remember this.")] + store.add_memories(messages=messages, user_id="550e8400-e29b-41d4-a716-446655440000") + + mock_get_user.assert_awaited_once_with("550e8400-e29b-41d4-a716-446655440000") + mock_cognee.add.assert_awaited_once() + add_kwargs = mock_cognee.add.call_args[1] + assert add_kwargs["user"] is mock_user + cognify_kwargs = mock_cognee.cognify.call_args[1] + assert cognify_kwargs["user"] is mock_user + + @patch("haystack_integrations.memory_stores.cognee.memory_store._get_cognee_user", new_callable=AsyncMock) + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_search_memories_with_user_id(self, mock_cognee, mock_get_user): + mock_user = MagicMock() + mock_get_user.return_value = mock_user + mock_cognee.search = AsyncMock(return_value=["result"]) + + store = CogneeMemoryStore() + results = store.search_memories(query="test", user_id="550e8400-e29b-41d4-a716-446655440000") + + mock_get_user.assert_awaited_once_with("550e8400-e29b-41d4-a716-446655440000") + search_kwargs = mock_cognee.search.call_args[1] + assert search_kwargs["user"] is mock_user + assert 
len(results) == 1 + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_add_memories_without_user_id(self, mock_cognee): + mock_cognee.add = AsyncMock() + mock_cognee.cognify = AsyncMock() + + store = CogneeMemoryStore() + messages = [ChatMessage.from_user("Remember this.")] + store.add_memories(messages=messages) + + add_kwargs = mock_cognee.add.call_args[1] + assert add_kwargs["user"] is None + + @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee") + def test_search_memories_without_user_id(self, mock_cognee): + mock_cognee.search = AsyncMock(return_value=["result"]) + + store = CogneeMemoryStore() + store.search_memories(query="test") + + search_kwargs = mock_cognee.search.call_args[1] + assert search_kwargs["user"] is None + + def test_delete_memory_raises(self): + store = CogneeMemoryStore() + with pytest.raises(NotImplementedError): + store.delete_memory("some-id") diff --git a/integrations/cognee/tests/test_retriever.py b/integrations/cognee/tests/test_retriever.py new file mode 100644 index 0000000000..6494861db9 --- /dev/null +++ b/integrations/cognee/tests/test_retriever.py @@ -0,0 +1,92 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import AsyncMock, patch + +from haystack_integrations.components.retrievers.cognee import CogneeRetriever + + +class TestCogneeRetriever: + def test_init_defaults(self): + retriever = CogneeRetriever() + assert retriever.search_type == "GRAPH_COMPLETION" + assert retriever.top_k == 10 + assert retriever.dataset_name is None + + def test_init_custom(self): + retriever = CogneeRetriever(search_type="CHUNKS", top_k=5, dataset_name="my_data") + assert retriever.search_type == "CHUNKS" + assert retriever.top_k == 5 + assert retriever.dataset_name == "my_data" + + def test_to_dict(self): + retriever = CogneeRetriever(search_type="SUMMARIES", top_k=3, dataset_name="ds") + data = retriever.to_dict() + assert 
data["type"] == "haystack_integrations.components.retrievers.cognee.memory_retriever.CogneeRetriever" + assert data["init_parameters"]["search_type"] == "SUMMARIES" + assert data["init_parameters"]["top_k"] == 3 + assert data["init_parameters"]["dataset_name"] == "ds" + + def test_from_dict(self): + data = { + "type": "haystack_integrations.components.retrievers.cognee.memory_retriever.CogneeRetriever", + "init_parameters": {"search_type": "CHUNKS", "top_k": 7, "dataset_name": None}, + } + retriever = CogneeRetriever.from_dict(data) + assert retriever.search_type == "CHUNKS" + assert retriever.top_k == 7 + + @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee") + def test_run_returns_documents(self, mock_cognee): + mock_cognee.search = AsyncMock(return_value=["result one", "result two"]) + + retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", top_k=5) + result = retriever.run(query="What is Cognee?") + + docs = result["documents"] + assert len(docs) == 2 + assert docs[0].content == "result one" + assert docs[0].meta["source"] == "cognee" + + @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee") + def test_run_empty_results(self, mock_cognee): + mock_cognee.search = AsyncMock(return_value=[]) + + retriever = CogneeRetriever() + result = retriever.run(query="nonexistent query") + + assert result["documents"] == [] + + @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee") + def test_run_respects_top_k_override(self, mock_cognee): + mock_cognee.search = AsyncMock(return_value=["a", "b", "c", "d", "e"]) + + retriever = CogneeRetriever(top_k=10) + result = retriever.run(query="test", top_k=2) + + assert len(result["documents"]) == 2 + + @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee") + def test_run_handles_dict_results(self, mock_cognee): + mock_cognee.search = AsyncMock( + return_value=[ + {"content": "Dict content", "score": 0.9}, + 
{"text": "Alt text field"}, + ] + ) + + retriever = CogneeRetriever() + result = retriever.run(query="test") + + assert len(result["documents"]) == 2 + assert result["documents"][0].content == "Dict content" + assert result["documents"][1].content == "Alt text field" + + @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee") + def test_run_handles_none_results(self, mock_cognee): + mock_cognee.search = AsyncMock(return_value=None) + + retriever = CogneeRetriever() + result = retriever.run(query="test") + assert result["documents"] == [] diff --git a/integrations/cognee/tests/test_utils.py b/integrations/cognee/tests/test_utils.py new file mode 100644 index 0000000000..13701aea93 --- /dev/null +++ b/integrations/cognee/tests/test_utils.py @@ -0,0 +1,99 @@ +# SPDX-FileCopyrightText: 2022-present deepset GmbH +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio + +from haystack_integrations.components.connectors.cognee._utils import extract_text, run_sync + + +class TestExtractText: + def test_string_input(self): + assert extract_text("hello world") == "hello world" + + def test_object_with_content_attr(self): + class FakeResult: + content = "object content" + + assert extract_text(FakeResult()) == "object content" + + def test_object_with_text_attr(self): + class FakeResult: + text = "object text" + + assert extract_text(FakeResult()) == "object text" + + def test_object_with_description_attr(self): + class FakeResult: + description = "object description" + + assert extract_text(FakeResult()) == "object description" + + def test_object_with_name_attr(self): + class FakeResult: + name = "object name" + + assert extract_text(FakeResult()) == "object name" + + def test_object_attr_priority(self): + class FakeResult: + content = "first" + text = "second" + + assert extract_text(FakeResult()) == "first" + + def test_object_skips_none_attr(self): + class FakeResult: + content = None + text = "fallback text" + + assert 
class TestExtractText:
    """Unit tests for extract_text across strings, objects, dicts, and fallbacks."""

    def test_string_input(self):
        # Plain strings pass through untouched.
        assert extract_text("hello world") == "hello world"

    def test_object_with_content_attr(self):
        class StubResult:
            content = "object content"

        assert extract_text(StubResult()) == "object content"

    def test_object_with_text_attr(self):
        class StubResult:
            text = "object text"

        assert extract_text(StubResult()) == "object text"

    def test_object_with_description_attr(self):
        class StubResult:
            description = "object description"

        assert extract_text(StubResult()) == "object description"

    def test_object_with_name_attr(self):
        class StubResult:
            name = "object name"

        assert extract_text(StubResult()) == "object name"

    def test_object_attr_priority(self):
        # When several candidate attributes exist, content wins over text.
        class StubResult:
            content = "first"
            text = "second"

        assert extract_text(StubResult()) == "first"

    def test_object_skips_none_attr(self):
        # A None-valued attribute is skipped in favour of the next candidate.
        class StubResult:
            content = None
            text = "fallback text"

        assert extract_text(StubResult()) == "fallback text"

    def test_object_skips_non_string_attr(self):
        # Non-string attribute values are skipped, not coerced.
        class StubResult:
            content = 123
            text = "string value"

        assert extract_text(StubResult()) == "string value"

    def test_dict_with_content_key(self):
        assert extract_text({"content": "dict content"}) == "dict content"

    def test_dict_with_text_key(self):
        assert extract_text({"text": "dict text"}) == "dict text"

    def test_dict_with_description_key(self):
        assert extract_text({"description": "dict description"}) == "dict description"

    def test_dict_with_name_key(self):
        assert extract_text({"name": "dict name"}) == "dict name"

    def test_dict_key_priority(self):
        # Dict extraction honours the same priority order as attributes.
        assert extract_text({"content": "first", "text": "second"}) == "first"

    def test_dict_skips_non_string_value(self):
        assert extract_text({"content": 123, "text": "fallback"}) == "fallback"

    def test_fallback_to_str(self):
        # Anything unrecognised falls back to str().
        assert extract_text(42) == "42"

    def test_fallback_dict_no_known_keys(self):
        # A dict with no known keys is stringified, so its key survives.
        assert "unknown" in extract_text({"unknown": "value"})


class TestRunSync:
    """Unit tests for run_sync, the sync driver for cognee coroutines."""

    def test_run_simple_coroutine(self):
        async def produce():
            return 42

        assert run_sync(produce()) == 42

    def test_run_async_coroutine(self):
        # Coroutines that genuinely yield to the loop must also complete.
        async def produce():
            await asyncio.sleep(0)
            return "done"

        assert run_sync(produce()) == "done"
class TestCogneeWriter:
    """Unit tests for CogneeWriter, with the cognee SDK mocked out."""

    def test_init_defaults(self):
        component = CogneeWriter()
        assert component.dataset_name == "haystack"
        assert component.auto_cognify is True

    def test_init_custom(self):
        component = CogneeWriter(dataset_name="custom", auto_cognify=False)
        assert component.dataset_name == "custom"
        assert component.auto_cognify is False

    def test_to_dict(self):
        serialized = CogneeWriter(dataset_name="test_ds", auto_cognify=False).to_dict()
        assert serialized["type"] == "haystack_integrations.components.writers.cognee.memory_writer.CogneeWriter"
        assert serialized["init_parameters"]["dataset_name"] == "test_ds"
        assert serialized["init_parameters"]["auto_cognify"] is False

    def test_from_dict(self):
        payload = {
            "type": "haystack_integrations.components.writers.cognee.memory_writer.CogneeWriter",
            "init_parameters": {"dataset_name": "restored", "auto_cognify": True},
        }
        component = CogneeWriter.from_dict(payload)
        assert component.dataset_name == "restored"
        assert component.auto_cognify is True

    @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
    def test_run_with_auto_cognify(self, cognee_mock):
        cognee_mock.add = AsyncMock()
        cognee_mock.cognify = AsyncMock()

        component = CogneeWriter(dataset_name="test", auto_cognify=True)
        outcome = component.run(
            documents=[
                Document(content="First document"),
                Document(content="Second document"),
            ]
        )

        assert outcome == {"documents_written": 2}
        # Batch semantics: one add() call whose first positional argument is
        # the full list of document texts.
        cognee_mock.add.assert_awaited_once()
        assert cognee_mock.add.call_args[0][0] == ["First document", "Second document"]
        # cognify runs once, scoped to the writer's dataset.
        cognee_mock.cognify.assert_awaited_once()
        assert cognee_mock.cognify.call_args[1]["datasets"] == ["test"]

    @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
    def test_run_without_auto_cognify(self, cognee_mock):
        cognee_mock.add = AsyncMock()
        cognee_mock.cognify = AsyncMock()

        outcome = CogneeWriter(dataset_name="test", auto_cognify=False).run(
            documents=[Document(content="A document")]
        )

        assert outcome == {"documents_written": 1}
        cognee_mock.add.assert_awaited_once()
        # auto_cognify=False must leave graph building to a later step.
        cognee_mock.cognify.assert_not_awaited()

    @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
    def test_run_skips_empty_content(self, cognee_mock):
        cognee_mock.add = AsyncMock()
        cognee_mock.cognify = AsyncMock()

        outcome = CogneeWriter(auto_cognify=True).run(
            documents=[
                Document(content="Valid document"),
                Document(content=""),
                Document(content=None),
            ]
        )

        # Empty and None contents are filtered out before hitting the SDK.
        assert outcome == {"documents_written": 1}
        cognee_mock.add.assert_awaited_once()
        assert cognee_mock.add.call_args[0][0] == ["Valid document"]

    @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
    def test_run_empty_list(self, cognee_mock):
        cognee_mock.add = AsyncMock()
        cognee_mock.cognify = AsyncMock()

        outcome = CogneeWriter(auto_cognify=True).run(documents=[])

        # An empty batch is a no-op: nothing added, nothing cognified.
        assert outcome == {"documents_written": 0}
        cognee_mock.add.assert_not_awaited()
        cognee_mock.cognify.assert_not_awaited()