diff --git a/.github/labeler.yml b/.github/labeler.yml
index aeb88f2b7f..56b999b18f 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -39,6 +39,11 @@ integration:azure-doc-intelligence:
- any-glob-to-any-file: "integrations/azure_doc_intelligence/**/*"
- any-glob-to-any-file: ".github/workflows/azure_doc_intelligence.yml"
+integration:cognee:
+ - changed-files:
+ - any-glob-to-any-file: "integrations/cognee/**/*"
+ - any-glob-to-any-file: ".github/workflows/cognee.yml"
+
integration:chroma:
- changed-files:
- any-glob-to-any-file: "integrations/chroma/**/*"
diff --git a/.github/workflows/cognee.yml b/.github/workflows/cognee.yml
new file mode 100644
index 0000000000..d4441fb7b1
--- /dev/null
+++ b/.github/workflows/cognee.yml
@@ -0,0 +1,79 @@
+# This workflow comes from https://github.com/ofek/hatch-mypyc
+# https://github.com/ofek/hatch-mypyc/blob/5a198c0ba8660494d02716cfc9d79ce4adfb1442/.github/workflows/test.yml
+name: Test / cognee
+
+on:
+ schedule:
+ - cron: "0 0 * * *"
+ pull_request:
+ paths:
+ - "integrations/cognee/**"
+ - "!integrations/cognee/*.md"
+ - ".github/workflows/cognee.yml"
+
+defaults:
+ run:
+ working-directory: integrations/cognee
+
+concurrency:
+ group: cognee-${{ github.head_ref }}
+ cancel-in-progress: true
+
+env:
+ PYTHONUNBUFFERED: "1"
+ FORCE_COLOR: "1"
+
+jobs:
+ run:
+ name: Python ${{ matrix.python-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }}
+ runs-on: ${{ matrix.os }}
+ strategy:
+ fail-fast: false
+ matrix:
+ os: [ubuntu-latest, windows-latest, macos-latest]
+ python-version: ["3.10", "3.14"]
+
+ steps:
+ - name: Support longpaths
+ if: matrix.os == 'windows-latest'
+ working-directory: .
+ run: git config --system core.longpaths true
+
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install Hatch
+ run: pip install --upgrade hatch
+ - name: Lint
+ if: matrix.python-version == '3.10' && runner.os == 'Linux'
+ run: hatch run fmt-check && hatch run test:types
+
+ - name: Run tests
+ run: hatch run test:cov-retry
+
+ - name: Run unit tests with lowest direct dependencies
+ run: |
+ hatch run uv pip compile pyproject.toml --resolution lowest-direct --output-file requirements_lowest_direct.txt
+ hatch -e test env run -- uv pip install -r requirements_lowest_direct.txt
+ hatch run test:unit
+
+ - name: Nightly - run tests with Haystack main branch
+ if: github.event_name == 'schedule'
+ run: |
+ hatch env prune
+ hatch -e test env run -- uv pip install git+https://github.com/deepset-ai/haystack.git@main
+ hatch run test:cov-retry
+
+
+ notify-slack-on-failure:
+ needs: run
+ if: failure() && github.event_name == 'schedule'
+ runs-on: ubuntu-slim
+ steps:
+ - uses: deepset-ai/notify-slack-action@3cda73b77a148f16f703274198e7771340cf862b # v1
+ with:
+ slack-webhook-url: ${{ secrets.SLACK_WEBHOOK_URL_NOTIFICATIONS }}
diff --git a/README.md b/README.md
index 6f91668cb7..10c249ffd3 100644
--- a/README.md
+++ b/README.md
@@ -34,6 +34,7 @@ Please check out our [Contribution Guidelines](CONTRIBUTING.md) for all the deta
| [azure-ai-search-haystack](integrations/azure_ai_search/) | Document Store | [](https://pypi.org/project/azure-ai-search-haystack) | [](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/azure_ai_search.yml) |
| [azure-doc-intelligence-haystack](integrations/azure_doc_intelligence/) | Converter | [](https://pypi.org/project/azure-doc-intelligence-haystack) | [](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/azure_doc_intelligence.yml) |
| [chroma-haystack](integrations/chroma/) | Document Store | [](https://pypi.org/project/chroma-haystack) | [](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/chroma.yml) |
+| [cognee-haystack](integrations/cognee/) | Connector | [](https://pypi.org/project/cognee-haystack) | [](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cognee.yml) |
| [cohere-haystack](integrations/cohere/) | Embedder, Generator, Ranker | [](https://pypi.org/project/cohere-haystack) | [](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cohere.yml) |
| [cometapi-haystack](integrations/cometapi/) | Generator | [](https://pypi.org/project/cometapi-haystack) | [](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/cometapi.yml) |
| [deepeval-haystack](integrations/deepeval/) | Evaluator | [](https://pypi.org/project/deepeval-haystack) | [](https://github.com/deepset-ai/haystack-core-integrations/actions/workflows/deepeval.yml) |
diff --git a/integrations/cognee/LICENSE b/integrations/cognee/LICENSE
new file mode 100644
index 0000000000..de4c7f39f1
--- /dev/null
+++ b/integrations/cognee/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright 2023 deepset GmbH
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/integrations/cognee/README.md b/integrations/cognee/README.md
new file mode 100644
index 0000000000..049f783ee4
--- /dev/null
+++ b/integrations/cognee/README.md
@@ -0,0 +1,14 @@
+# cognee-haystack
+
+[](https://pypi.org/project/cognee-haystack)
+[](https://pypi.org/project/cognee-haystack)
+
+[Cognee](https://www.cognee.ai/) integration for [Haystack](https://haystack.deepset.ai/) — open-source memory for AI agents.
+
+- [Changelog](https://github.com/deepset-ai/haystack-core-integrations/blob/main/integrations/cognee/CHANGELOG.md)
+
+---
+
+## Contributing
+
+Refer to the general [Contribution Guidelines](https://github.com/deepset-ai/haystack-core-integrations/blob/main/CONTRIBUTING.md).
diff --git a/integrations/cognee/examples/.artifacts/demo_memory_agent.html b/integrations/cognee/examples/.artifacts/demo_memory_agent.html
new file mode 100644
index 0000000000..1a31dda784
--- /dev/null
+++ b/integrations/cognee/examples/.artifacts/demo_memory_agent.html
@@ -0,0 +1,1450 @@
+
+
+
+
+
+Cognee Knowledge Graph
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Color:
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Laying out graph...
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/integrations/cognee/examples/.artifacts/demo_pipeline.html b/integrations/cognee/examples/.artifacts/demo_pipeline.html
new file mode 100644
index 0000000000..cf72ecd59c
--- /dev/null
+++ b/integrations/cognee/examples/.artifacts/demo_pipeline.html
@@ -0,0 +1,1450 @@
+
+
+
+
+
+Cognee Knowledge Graph
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Color:
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Laying out graph...
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/integrations/cognee/examples/README.md b/integrations/cognee/examples/README.md
new file mode 100644
index 0000000000..d4ef44805b
--- /dev/null
+++ b/integrations/cognee/examples/README.md
@@ -0,0 +1,40 @@
+# Cognee-Haystack Examples
+
+## Prerequisites
+
+Install the integration from the repository root:
+
+```bash
+pip install -e "integrations/cognee"
+```
+
+Set your LLM API key (required by cognee; an OpenAI API key is used by default):
+
+To use other LLM providers or explore additional configuration options, see the [Cognee Documentation](https://docs.cognee.ai/getting-started/installation#environment-configuration).
+
+
+```bash
+export LLM_API_KEY="sk-your-openai-api-key"
+```
+
+## Examples
+
+### Pipeline Demo (`demo_pipeline.py`)
+
+Demonstrates two `CogneeWriter` ingestion strategies — auto_cognify on every write,
+and batch adds with a single `CogneeCognifier` pass — then retrieval with
+`CogneeRetriever`. Also shows the same flow wired as a connected Haystack Pipeline.
+
+```bash
+python integrations/cognee/examples/demo_pipeline.py
+```
+
+### Memory Agent Demo (`demo_memory_agent.py`)
+
+Demonstrates `CogneeMemoryStore` with per-user memory scoping via `user_id`:
+- Two users store private memories that are isolated from each other.
+- A shared dataset is created and read access is granted across users.
+
+```bash
+python integrations/cognee/examples/demo_memory_agent.py
+```
diff --git a/integrations/cognee/examples/demo_memory_agent.py b/integrations/cognee/examples/demo_memory_agent.py
new file mode 100644
index 0000000000..04b009a71f
--- /dev/null
+++ b/integrations/cognee/examples/demo_memory_agent.py
@@ -0,0 +1,149 @@
+#!/usr/bin/env python
+"""
+Demo: Cognee as a Memory Backend with User-Scoped Access
+
+Shows how CogneeMemoryStore supports per-user memory isolation via user_id,
+matching the MemoryStore protocol pattern used by Haystack's experimental Agent.
+
+Two users (Alice and Bob) each store private memories. Then Alice creates a
+shared dataset and grants Bob read access, demonstrating cross-user sharing.
+
+Note: This demo uses cognee's user management APIs directly for setup (creating
+users, granting permissions). These are admin operations outside the Haystack
+integration's scope. In production, user management would typically be handled
+by the application layer or cognee's API server.
+
+Prerequisites:
+ pip install -e "integrations/cognee"
+
+Set your LLM API key:
+ export LLM_API_KEY="sk-..."
+"""
+
+import asyncio
+
+import cognee
+from cognee.modules.data.methods import get_authorized_existing_datasets
+from cognee.modules.engine.operations.setup import setup
+from cognee.modules.users.methods import create_user
+from cognee.modules.users.permissions.methods import give_permission_on_dataset
+from haystack.dataclasses import ChatMessage
+
+from haystack_integrations.memory_stores.cognee import CogneeMemoryStore
+
+
+async def main():
+ print("=== Cognee Memory Store — User Scoping Demo ===\n")
+
+ # --- Setup: clean slate and create two users ---
+ print("Setup: Pruning all data and creating users...")
+ await cognee.prune.prune_data()
+ await cognee.prune.prune_system(metadata=True)
+
+ # Re-create the database schema after pruning (prune_system drops all tables)
+ await setup()
+
+ alice = await create_user("alice@example.com", "password", is_verified=True)
+ bob = await create_user("bob@example.com", "password", is_verified=True)
+ alice_id = str(alice.id)
+ bob_id = str(bob.id)
+ print(f" Created Alice (id={alice_id[:8]}...) and Bob (id={bob_id[:8]}...)\n")
+
+ # =========================================================================
+ # Part 1: Private memories — each user can only see their own
+ # =========================================================================
+ print("--- Part 1: Private memories (user isolation) ---\n")
+
+ alice_store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name="alice_notes")
+ bob_store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name="bob_notes")
+
+ # Alice adds her private memories
+ print("Alice adds memories to 'alice_notes'...")
+ alice_store.add_memories(
+ messages=[
+ ChatMessage.from_user("The project deadline is next Friday."),
+ ],
+ user_id=alice_id,
+ )
+ print(" Done.\n")
+
+ # Bob adds his private memories
+ print("Bob adds memories to 'bob_notes'...")
+ bob_store.add_memories(
+ messages=[
+ ChatMessage.from_user("The client meeting is on Wednesday at 2pm."),
+ ChatMessage.from_user("The new API endpoint needs authentication."),
+ ],
+ user_id=bob_id,
+ )
+ print(" Done.\n")
+
+ # Alice searches her own store — should find results
+ print("Alice searches her own store for 'project deadline':")
+ results = alice_store.search_memories(query="What is the project deadline?", user_id=alice_id)
+ print(f" Found {len(results)} result(s)")
+ for i, msg in enumerate(results, 1):
+ print(f" [{i}] {msg.text}")
+ print()
+
+ # Bob searches his own store — should find results
+ print("Bob searches his own store for 'client meeting':")
+ results = bob_store.search_memories(query="When is the client meeting?", user_id=bob_id)
+ print(f" Found {len(results)} result(s)")
+ for i, msg in enumerate(results, 1):
+ print(f" [{i}] {msg.text}")
+ print()
+
+ # Alice searches Bob's store — should find nothing (no permission)
+ print("Alice tries to search Bob's store (no permission):")
+ results = bob_store.search_memories(query="When is the client meeting?", user_id=alice_id)
+ print(f" Found {len(results)} result(s) — access is isolated!\n")
+
+ # Bob searches Alice's store — should find nothing (no permission)
+ print("Bob tries to search Alice's store (no permission):")
+ results = alice_store.search_memories(query="What is the project deadline?", user_id=bob_id)
+ print(f" Found {len(results)} result(s) — access is isolated!\n")
+
+ # =========================================================================
+ # Part 2: Shared dataset — Alice creates it, grants Bob read access
+ # =========================================================================
+ print("--- Part 2: Shared dataset ---\n")
+
+ shared_store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", dataset_name="team_shared")
+
+ # Alice adds to the shared dataset (she becomes the owner)
+ print("Alice adds memories to 'team_shared'...")
+ shared_store.add_memories(
+ messages=[
+ ChatMessage.from_user("The team standup is every morning at 9am."),
+ ChatMessage.from_user("Our tech stack is Python, Haystack, and Cognee."),
+ ],
+ user_id=alice_id,
+ )
+ print(" Done.\n")
+
+ # Bob tries to search the shared store BEFORE getting permission — should find nothing
+ print("Bob tries to search 'team_shared' BEFORE permission:")
+ results = shared_store.search_memories(query="When is the team standup?", user_id=bob_id)
+ print(f" Found {len(results)} result(s) — no access yet.\n")
+
+ # Grant Bob read permission on the shared dataset
+ print("Alice grants Bob read access to 'team_shared'...")
+ shared_datasets = await get_authorized_existing_datasets(["team_shared"], "read", alice)
+ shared_dataset_id = shared_datasets[0].id
+ await give_permission_on_dataset(bob, shared_dataset_id, "read")
+ print(" Done.\n")
+
+ # Bob searches via the MemoryStore — the store automatically resolves shared datasets
+ print("Bob searches 'team_shared' AFTER getting read permission:")
+ results = shared_store.search_memories(query="When is the team standup?", user_id=bob_id)
+ print(f" Found {len(results)} result(s)")
+ for i, msg in enumerate(results, 1):
+ print(f" [{i}] {msg.text}")
+ print()
+
+ print("=== Done ===")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/integrations/cognee/examples/demo_pipeline.py b/integrations/cognee/examples/demo_pipeline.py
new file mode 100644
index 0000000000..0a73053618
--- /dev/null
+++ b/integrations/cognee/examples/demo_pipeline.py
@@ -0,0 +1,176 @@
+#!/usr/bin/env python
+"""
+Demo: Cognee + Haystack Pipeline — Writer, Cognifier, Retriever
+
+Demonstrates two ingestion strategies with CogneeWriter:
+
+1. auto_cognify=True (default) — each writer.run() call adds documents AND
+ immediately cognifies them into the knowledge graph. Simple but slower
+ when ingesting many batches, because cognify runs after every add.
+
+2. auto_cognify=False + CogneeCognifier — documents are added quickly in
+ multiple batches without building the knowledge graph. Then CogneeCognifier
+ processes the entire dataset once. This is faster for bulk ingestion.
+
+Both strategies produce the same result: a searchable knowledge graph that
+CogneeRetriever can query.
+
+Prerequisites:
+ pip install -e "integrations/cognee"
+
+Set your LLM API key (Cognee uses it internally):
+ export LLM_API_KEY="sk-..."
+"""
+
+import asyncio
+
+import cognee
+from haystack import Document, Pipeline
+
+from haystack_integrations.components.connectors.cognee import CogneeCognifier
+from haystack_integrations.components.retrievers.cognee import CogneeRetriever
+from haystack_integrations.components.writers.cognee import CogneeWriter
+
+DOCS_BATCH_1 = [
+ Document(
+ content=(
+ "Cognee is an open-source memory for AI agents. It builds a knowledge engine "
+ "that transforms raw data into a persistent, rich, and traceable memory that is "
+ "searchable by meaning and relationships."
+ ),
+ ),
+ Document(
+ content=(
+ "Haystack is an open-source LLM framework by deepset for building production-ready "
+ "RAG pipelines, agents, and search systems. It uses a component-based architecture "
+ "where each step is a composable building block."
+ ),
+ ),
+]
+
+DOCS_BATCH_2 = [
+ Document(
+ content=(
+ "Knowledge graphs represent information as nodes (entities) and edges (relationships). "
+ "They enable semantic search, reasoning, and discovery of hidden connections across "
+ "large document collections."
+ ),
+ ),
+ Document(
+ content=(
+ "The engineering team at Acme Corp consists of Alice (backend lead), Bob (ML engineer), "
+ "and Carol (infrastructure). They are building a next-generation search platform "
+ "powered by knowledge graphs and LLMs."
+ ),
+ ),
+]
+
+SEARCH_QUERIES = [
+ "What is Cognee and what does it do?",
+ "Who is on the engineering team at Acme Corp?",
+ "How do knowledge graphs work?",
+]
+
+
+def search_and_print(retriever, queries):
+ for query in queries:
+ print(f" Query: '{query}'")
+ result = retriever.run(query=query)
+ docs = result["documents"]
+ print(f" Found {len(docs)} result(s):")
+ for i, doc in enumerate(docs, 1):
+ print(f" [{i}] {doc.content}...")
+ print()
+
+
+async def main():
+ print("=== Cognee + Haystack Pipeline Demo ===\n")
+
+ # =========================================================================
+ # Part 1: auto_cognify=True — add + cognify on every call
+ #
+ # Each writer.run() both adds the documents AND cognifies the dataset.
+ # Simple for small ingestion, but cognify is the expensive step (calls
+ # the LLM to extract entities, build the graph, generate summaries).
+ # With N batches, cognify runs N times.
+ # =========================================================================
+ print("--- Part 1: auto_cognify=True (add + cognify each batch) ---\n")
+
+ await cognee.prune.prune_data()
+ await cognee.prune.prune_system(metadata=True)
+
+ writer_auto = CogneeWriter(dataset_name="demo_auto", auto_cognify=True)
+
+ print(f"1. Writing batch 1 ({len(DOCS_BATCH_1)} docs) — adds + cognifies...")
+ result = writer_auto.run(documents=DOCS_BATCH_1)
+ print(f" Written: {result['documents_written']} (cognify ran)\n")
+
+ print(f"2. Writing batch 2 ({len(DOCS_BATCH_2)} docs) — adds + cognifies again...")
+ result = writer_auto.run(documents=DOCS_BATCH_2)
+ print(f" Written: {result['documents_written']} (cognify ran again)\n")
+
+ print("3. Searching...\n")
+ retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", dataset_name="demo_auto")
+ search_and_print(retriever, SEARCH_QUERIES)
+
+ # =========================================================================
+ # Part 2: auto_cognify=False + CogneeCognifier — batch add, cognify once
+ #
+ # Documents are added quickly without cognifying. Then CogneeCognifier
+ # processes everything in one pass. With N batches, cognify runs only once.
+ # =========================================================================
+ print("--- Part 2: auto_cognify=False + CogneeCognifier (batch add, cognify once) ---\n")
+
+ await cognee.prune.prune_data()
+ await cognee.prune.prune_system(metadata=True)
+
+ writer_batch = CogneeWriter(dataset_name="demo_batch", auto_cognify=False)
+
+ print(f"4. Writing batch 1 ({len(DOCS_BATCH_1)} docs) — add only, no cognify...")
+ result = writer_batch.run(documents=DOCS_BATCH_1)
+ print(f" Written: {result['documents_written']}\n")
+
+ print(f"5. Writing batch 2 ({len(DOCS_BATCH_2)} docs) — add only, no cognify...")
+ result = writer_batch.run(documents=DOCS_BATCH_2)
+ print(f" Written: {result['documents_written']}\n")
+
+ print("6. Cognifying the entire dataset in one pass...")
+ cognifier = CogneeCognifier(dataset_name="demo_batch")
+ result = cognifier.run()
+ print(f" Cognified: {result['cognified']}\n")
+
+ print("7. Searching...\n")
+ retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", dataset_name="demo_batch")
+ search_and_print(retriever, SEARCH_QUERIES)
+
+ # =========================================================================
+ # Part 3: Same batch flow wired as a Haystack Pipeline
+ #
+ # CogneeWriter(auto_cognify=False) outputs documents_written, which
+ # connects to CogneeCognifier's input — so cognify triggers automatically
+ # after the writer finishes.
+ # =========================================================================
+ print("--- Part 3: Writer + Cognifier as a connected Pipeline ---\n")
+
+ await cognee.prune.prune_data()
+ await cognee.prune.prune_system(metadata=True)
+
+ pipeline = Pipeline()
+ pipeline.add_component("writer", CogneeWriter(dataset_name="demo_pipeline", auto_cognify=False))
+ pipeline.add_component("cognifier", CogneeCognifier(dataset_name="demo_pipeline"))
+ pipeline.connect("writer.documents_written", "cognifier.documents_written")
+
+ all_docs = DOCS_BATCH_1 + DOCS_BATCH_2
+ print(f"8. Running pipeline with {len(all_docs)} documents...")
+ result = pipeline.run({"writer": {"documents": all_docs}})
+ print(f" Cognified: {result['cognifier']['cognified']}\n")
+
+ print("9. Searching...\n")
+ retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", dataset_name="demo_pipeline")
+ search_and_print(retriever, SEARCH_QUERIES)
+
+ print("=== Done ===")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/integrations/cognee/pydoc/config_docusaurus.yml b/integrations/cognee/pydoc/config_docusaurus.yml
new file mode 100644
index 0000000000..662b1f342e
--- /dev/null
+++ b/integrations/cognee/pydoc/config_docusaurus.yml
@@ -0,0 +1,16 @@
+loaders:
+ - modules:
+ - haystack_integrations.components.connectors.cognee.cognifier
+ - haystack_integrations.components.retrievers.cognee.memory_retriever
+ - haystack_integrations.components.writers.cognee.memory_writer
+ - haystack_integrations.memory_stores.cognee.memory_store
+ search_path: [../src]
+processors:
+ - type: filter
+ documented_only: true
+ skip_empty_modules: true
+renderer:
+ description: Cognee integration for Haystack
+ id: integrations-cognee
+ filename: cognee.md
+ title: Cognee
diff --git a/integrations/cognee/pyproject.toml b/integrations/cognee/pyproject.toml
new file mode 100644
index 0000000000..cb871c1253
--- /dev/null
+++ b/integrations/cognee/pyproject.toml
@@ -0,0 +1,160 @@
+[build-system]
+requires = ["hatchling", "hatch-vcs"]
+build-backend = "hatchling.build"
+
+[project]
+name = "cognee-haystack"
+dynamic = ["version"]
+description = "Haystack integration for Cognee — memory for AI agents"
+readme = "README.md"
+requires-python = ">=3.10"
+license = "Apache-2.0"
+keywords = []
+authors = [{ name = "deepset GmbH", email = "info@deepset.ai" }]
+classifiers = [
+ "License :: OSI Approved :: Apache Software License",
+ "Development Status :: 3 - Alpha",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Programming Language :: Python :: 3.13",
+ "Programming Language :: Python :: Implementation :: CPython",
+ "Programming Language :: Python :: Implementation :: PyPy",
+]
+dependencies = [
+ "haystack-ai>=2.24.0",
+ "cognee>=0.5.4,<1.0",
+]
+
+[project.urls]
+Documentation = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/cognee#readme"
+Issues = "https://github.com/deepset-ai/haystack-core-integrations/issues"
+Source = "https://github.com/deepset-ai/haystack-core-integrations/tree/main/integrations/cognee"
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/haystack_integrations"]
+
+[tool.hatch.version]
+source = "vcs"
+tag-pattern = 'integrations\/cognee-v(?P<version>.*)'
+
+[tool.hatch.version.raw-options]
+root = "../.."
+git_describe_command = 'git describe --tags --match="integrations/cognee-v[0-9]*"'
+
+[tool.hatch.envs.default]
+installer = "uv"
+dependencies = ["haystack-pydoc-tools", "ruff"]
+
+[tool.hatch.envs.default.scripts]
+docs = ["haystack-pydoc pydoc/config_docusaurus.yml"]
+fmt = "ruff check --fix {args}; ruff format {args}"
+fmt-check = "ruff check {args} && ruff format --check {args}"
+
+[tool.hatch.envs.test]
+dependencies = [
+ "pytest",
+ "pytest-asyncio",
+ "pytest-cov",
+ "pytest-rerunfailures",
+ "mypy",
+ "pip",
+]
+
+[tool.hatch.envs.test.scripts]
+unit = 'pytest -m "not integration" {args:tests}'
+integration = 'pytest -m "integration" {args:tests}'
+all = 'pytest {args:tests}'
+cov-retry = 'pytest --cov=haystack_integrations --reruns 3 --reruns-delay 30 -x {args:tests}'
+
+types = "mypy -p haystack_integrations.components.connectors.cognee -p haystack_integrations.components.retrievers.cognee -p haystack_integrations.components.writers.cognee -p haystack_integrations.memory_stores.cognee {args}"
+
+[tool.mypy]
+install_types = true
+non_interactive = true
+check_untyped_defs = true
+disallow_incomplete_defs = true
+
+[tool.ruff]
+line-length = 120
+
+[tool.ruff.lint]
+select = [
+ "A",
+ "ARG",
+ "B",
+ "C",
+ "D102", # Missing docstring in public method
+ "D103", # Missing docstring in public function
+ "D205", # 1 blank line required between summary line and description
+ "D209", # Closing triple quotes go to new line
+ "D213", # summary lines must be positioned on the second physical line of the docstring
+ "DTZ",
+ "E",
+ "EM",
+ "F",
+ "FBT",
+ "I",
+ "ICN",
+ "ISC",
+ "N",
+ "PLC",
+ "PLE",
+ "PLR",
+ "PLW",
+ "Q",
+ "RUF",
+ "S",
+ "T",
+ "TID",
+ "UP",
+ "W",
+ "YTT",
+]
+ignore = [
+ # Allow non-abstract empty methods in abstract base classes
+ "B027",
+ # Allow boolean positional values in function calls, like `dict.get(... True)`
+ "FBT003",
+ # Ignore checks for possible passwords
+ "S105",
+ "S106",
+ "S107",
+ # Ignore complexity
+ "C901",
+ "PLR0911",
+ "PLR0912",
+ "PLR0913",
+ "PLR0915",
+ # Ignore unused params
+ "ARG002",
+ # Allow assertions
+ "S101",
+]
+exclude = ["examples"]
+
+[tool.ruff.lint.isort]
+known-first-party = ["haystack_integrations"]
+
+[tool.ruff.lint.flake8-tidy-imports]
+ban-relative-imports = "parents"
+
+[tool.ruff.lint.per-file-ignores]
+# Tests can use magic values, assertions, and relative imports
+"tests/**/*" = ["D", "PLR2004", "S101", "TID252"]
+"examples/**/*" = ["D", "T201"]
+
+[tool.coverage.run]
+source = ["haystack_integrations"]
+branch = true
+parallel = false
+
+[tool.coverage.report]
+omit = ["*/tests/*", "*/__init__.py"]
+show_missing = true
+exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"]
+
+[tool.pytest.ini_options]
+minversion = "6.0"
+markers = ["integration: integration tests"]
diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/__init__.py
new file mode 100644
index 0000000000..775c5a35e4
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/__init__.py
@@ -0,0 +1,9 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from .cognifier import CogneeCognifier
+
+__all__ = [
+ "CogneeCognifier",
+]
diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/cognee/_utils.py b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/_utils.py
new file mode 100644
index 0000000000..d2eb383107
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/_utils.py
@@ -0,0 +1,114 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import asyncio
+import threading
+from collections.abc import Coroutine
+from typing import Any, Literal, TypeVar
+from uuid import UUID
+
+from cognee.modules.users.methods import get_user # type: ignore[import-untyped]
+
+T = TypeVar("T")
+
+# Persistent background event loop for run_sync when called from an async context.
+# A single loop is reused so that cognee's internal asyncio.Lock objects stay bound
+# to one event loop across multiple run_sync calls.
+_background_loop: asyncio.AbstractEventLoop | None = None
+_background_thread: threading.Thread | None = None
+_lock = threading.Lock()
+
+CogneeSearchType = Literal[
+ "GRAPH_COMPLETION",
+ "RAG_COMPLETION",
+ "CHUNKS",
+ "CHUNKS_LEXICAL",
+ "SUMMARIES",
+ "TRIPLET_COMPLETION",
+ "GRAPH_SUMMARY_COMPLETION",
+ "GRAPH_COMPLETION_COT",
+ "GRAPH_COMPLETION_CONTEXT_EXTENSION",
+ "CYPHER",
+ "NATURAL_LANGUAGE",
+ "TEMPORAL",
+ "CODING_RULES",
+ "FEELING_LUCKY",
+]
+
+
+def _get_background_loop() -> asyncio.AbstractEventLoop:
+ """Return a persistent background event loop running in a daemon thread."""
+ global _background_loop, _background_thread # noqa: PLW0603
+
+ with _lock:
+ if _background_loop is None or _background_loop.is_closed():
+ _background_loop = asyncio.new_event_loop()
+ _background_thread = threading.Thread(target=_background_loop.run_forever, daemon=True)
+ _background_thread.start()
+ return _background_loop
+
+
+def run_sync(coro: Coroutine[Any, Any, T]) -> T:
+ """
+ Run an async coroutine from a synchronous context.
+
+ If no event loop is running, uses asyncio.run() directly.
+ If already inside an async context, submits the coroutine to a persistent
+ background event loop so that cognee's internal asyncio.Lock objects remain
+ bound to a single loop across calls.
+ """
+ try:
+ asyncio.get_running_loop()
+ except RuntimeError:
+ return asyncio.run(coro)
+
+ # Already in an async context — submit to the persistent background loop
+ loop = _get_background_loop()
+ future = asyncio.run_coroutine_threadsafe(coro, loop)
+ return future.result()
+
+
+def extract_text(item: Any) -> str:
+ """
+ Best-effort text extraction from a Cognee search result item.
+
+ Cognee search results may contain items of various types depending on the
+ search strategy: plain ``str`` for LLM completions, ``dict`` with keys like
+ *content*/*text*/*description*/*name*, or cognee model objects (e.g.
+ ``DataPoint`` subclasses) carrying the same attributes.
+ """
+ if isinstance(item, str):
+ return item
+
+ for attr in ("content", "text", "description", "name"):
+ if hasattr(item, attr):
+ val = getattr(item, attr)
+ if val and isinstance(val, str):
+ return val
+
+ if isinstance(item, dict):
+ for key in ("content", "text", "description", "name"):
+ if key in item and isinstance(item[key], str):
+ return item[key]
+
+ return str(item)
+
+
+async def _get_cognee_user(user_id: str) -> Any:
+ """
+ Resolve a user_id string to a cognee User object.
+
+ Converts the given UUID string to a cognee User via ``cognee.modules.users.methods.get_user``.
+
+ :param user_id: UUID string identifying the cognee user.
+ :returns: A cognee ``User`` object.
+ :raises ValueError: If user_id is not a valid UUID or the user is not found.
+ """
+ try:
+ uid = UUID(user_id)
+ except ValueError as e:
+ msg = f"Invalid user_id: '{user_id}' is not a valid UUID."
+ raise ValueError(msg) from e
+
+ return await get_user(uid)
diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/cognee/cognifier.py b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/cognifier.py
new file mode 100644
index 0000000000..d862f442e4
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/components/connectors/cognee/cognifier.py
@@ -0,0 +1,89 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any
+
+from haystack import component, default_from_dict, default_to_dict, logging
+
+import cognee # type: ignore[import-untyped]
+
+from ._utils import run_sync
+
+logger = logging.getLogger(__name__)
+
+
+@component
+class CogneeCognifier:
+ """
+ Processes previously added data through Cognee's knowledge engine.
+
+ Wraps `cognee.cognify()` as a standalone pipeline step. Cognify takes raw data
+ that was previously added via `cognee.add()` and transforms it into a structured
+ knowledge graph. The process includes:
+
+ 1. **Document classification** — identifies the type and structure of the input data.
+ 2. **Text chunking** — splits documents into semantically meaningful segments.
+ 3. **Entity extraction** — uses an LLM to identify entities and their properties.
+ 4. **Relationship detection** — discovers connections between extracted entities.
+ 5. **Graph construction** — builds a knowledge graph with embeddings for vector search.
+ 6. **Summarization** — generates hierarchical summaries of the processed content.
+
+ After cognification, the data becomes searchable via `cognee.search()` using various
+ strategies (graph traversal, vector similarity, summaries, etc.).
+
+ This component is useful when you want to separate the add and cognify phases —
+ for example, batch-add documents first with `CogneeWriter(auto_cognify=False)`,
+ then cognify once.
+
+ Usage:
+ ```python
+ from haystack import Pipeline
+ from haystack_integrations.components.writers.cognee import CogneeWriter
+ from haystack_integrations.components.connectors.cognee import CogneeCognifier
+
+ pipeline = Pipeline()
+ pipeline.add_component("writer", CogneeWriter(dataset_name="my_data", auto_cognify=False))
+ pipeline.add_component("cognifier", CogneeCognifier(dataset_name="my_data"))
+ pipeline.connect("writer.documents_written", "cognifier.documents_written")
+
+ result = pipeline.run({"writer": {"documents": docs}})
+ ```
+ """
+
+ def __init__(self, dataset_name: str | list[str] | None = None):
+ """
+ Initialize the CogneeCognifier.
+
+ :param dataset_name: Optional Cognee dataset name(s) to cognify. Accepts a single
+ name, a list of names, or None to cognify all pending datasets.
+ """
+ self.dataset_name = dataset_name
+
+ @component.output_types(cognified=bool)
+ def run(self, documents_written: int | None = None) -> dict[str, Any]:
+ """
+ Run cognee.cognify() to process added data into the knowledge graph.
+
+ :param documents_written: Optional number of documents written by a preceding
+ CogneeWriter. Used as a pipeline connection point; cognify runs regardless
+ of the value, as long as data has been previously added.
+ :returns: Dictionary with key `cognified` set to True on success.
+ """
+ cognify_kwargs: dict[str, Any] = {}
+ if self.dataset_name:
+ datasets = [self.dataset_name] if isinstance(self.dataset_name, str) else self.dataset_name
+ cognify_kwargs["datasets"] = datasets
+
+ logger.info("Running cognee.cognify()")
+ run_sync(cognee.cognify(**cognify_kwargs))
+ return {"cognified": True}
+
+ def to_dict(self) -> dict[str, Any]:
+ """Serialize this component to a dictionary."""
+ return default_to_dict(self, dataset_name=self.dataset_name)
+
+ @classmethod
+ def from_dict(cls, data: dict[str, Any]) -> "CogneeCognifier":
+ """Deserialize a component from a dictionary."""
+ return default_from_dict(cls, data)
diff --git a/integrations/cognee/src/haystack_integrations/components/connectors/py.typed b/integrations/cognee/src/haystack_integrations/components/connectors/py.typed
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py
new file mode 100644
index 0000000000..90cf73038c
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/__init__.py
@@ -0,0 +1,9 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from .memory_retriever import CogneeRetriever
+
+__all__ = [
+ "CogneeRetriever",
+]
diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py
new file mode 100644
index 0000000000..3be8186b76
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/components/retrievers/cognee/memory_retriever.py
@@ -0,0 +1,110 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any
+
+from haystack import Document, component, default_from_dict, default_to_dict, logging
+
+import cognee # type: ignore[import-untyped]
+from cognee.api.v1.search import SearchType # type: ignore[import-untyped]
+from haystack_integrations.components.connectors.cognee._utils import CogneeSearchType, extract_text, run_sync
+
+logger = logging.getLogger(__name__)
+
+
+@component
+class CogneeRetriever:
+ """
+ Retrieves documents from Cognee's memory.
+
+ Wraps `cognee.search()` and converts results into Haystack `Document` objects.
+
+ Usage:
+ ```python
+ from haystack_integrations.components.retrievers.cognee import CogneeRetriever
+
+ retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", top_k=5)
+ results = retriever.run(query="What is Cognee?")
+ for doc in results["documents"]:
+ print(doc.content)
+ ```
+ """
+
+    def __init__(
+        self, search_type: CogneeSearchType = "GRAPH_COMPLETION", top_k: int = 10, dataset_name: str | None = None
+    ):
+        """
+        Initialize the CogneeRetriever.
+
+        :param search_type: Cognee search type. One of the ``CogneeSearchType`` values,
+            e.g. GRAPH_COMPLETION, RAG_COMPLETION, CHUNKS, SUMMARIES.
+        :param top_k: Maximum number of results to return (applied after the search).
+        :param dataset_name: Optional dataset name to restrict search scope.
+        """
+        self.search_type = search_type
+        self.top_k = top_k
+        self.dataset_name = dataset_name
+
+ @component.output_types(documents=list[Document])
+ def run(self, query: str, top_k: int | None = None) -> dict[str, list[Document]]:
+ """
+ Search Cognee's memory and return matching documents.
+
+ :param query: The search query.
+ :param top_k: Override the default maximum number of results.
+ :returns: Dictionary with key `documents` containing the search results
+ as Haystack Document objects.
+ """
+ effective_top_k = top_k if top_k is not None else self.top_k
+ search_type_enum = SearchType[self.search_type]
+
+ search_kwargs: dict[str, Any] = {
+ "query_text": query,
+ "query_type": search_type_enum,
+ }
+ if self.dataset_name:
+ search_kwargs["datasets"] = [self.dataset_name]
+
+ raw_results = run_sync(cognee.search(**search_kwargs))
+
+ documents = _convert_results(raw_results, effective_top_k)
+
+ logger.info(
+ "Cognee search returned {count} documents for query '{query}'",
+ count=len(documents),
+ query=query[:80],
+ )
+ return {"documents": documents}
+
+ def to_dict(self) -> dict[str, Any]:
+ """Serialize this component to a dictionary."""
+ return default_to_dict(
+ self,
+ search_type=self.search_type,
+ top_k=self.top_k,
+ dataset_name=self.dataset_name,
+ )
+
+ @classmethod
+ def from_dict(cls, data: dict[str, Any]) -> "CogneeRetriever":
+ """Deserialize a component from a dictionary."""
+ return default_from_dict(cls, data)
+
+
+def _convert_results(raw_results: list[Any], top_k: int) -> list[Document]:
+ """Convert Cognee search results to Haystack Documents."""
+ documents: list[Document] = []
+ if not raw_results:
+ return documents
+
+ for item in raw_results[:top_k]:
+ text = extract_text(item)
+ if text:
+ documents.append(
+ Document(
+ content=text,
+ meta={"source": "cognee", "search_result_type": type(item).__name__},
+ )
+ )
+ return documents
diff --git a/integrations/cognee/src/haystack_integrations/components/retrievers/py.typed b/integrations/cognee/src/haystack_integrations/components/retrievers/py.typed
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py
new file mode 100644
index 0000000000..7d88e60c29
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/components/writers/cognee/__init__.py
@@ -0,0 +1,9 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from .memory_writer import CogneeWriter
+
+__all__ = [
+ "CogneeWriter",
+]
diff --git a/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py b/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py
new file mode 100644
index 0000000000..b358a7c5ba
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/components/writers/cognee/memory_writer.py
@@ -0,0 +1,84 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any
+
+from haystack import Document, component, default_from_dict, default_to_dict, logging
+
+import cognee # type: ignore[import-untyped]
+from haystack_integrations.components.connectors.cognee._utils import run_sync
+
+logger = logging.getLogger(__name__)
+
+
+@component
+class CogneeWriter:
+ """
+ Adds Haystack Documents to Cognee's memory.
+
+ Wraps `cognee.add()` and optionally `cognee.cognify()` to ingest documents
+ and build a knowledge engine in a single pipeline step.
+
+ Usage:
+ ```python
+ from haystack import Document
+ from haystack_integrations.components.writers.cognee import CogneeWriter
+
+ writer = CogneeWriter(dataset_name="my_dataset", auto_cognify=True)
+ writer.run(documents=[Document(content="Cognee builds AI memory.")])
+ ```
+ """
+
+ def __init__(self, *, dataset_name: str = "haystack", auto_cognify: bool = True):
+ """
+ Initialize the CogneeWriter.
+
+ :param dataset_name: Name of the Cognee dataset to add documents to.
+ :param auto_cognify: If True, automatically runs `cognee.cognify()` after adding
+ documents to process them into the knowledge engine.
+ """
+ self.dataset_name = dataset_name
+ self.auto_cognify = auto_cognify
+
+ @component.output_types(documents_written=int)
+ def run(self, documents: list[Document]) -> dict[str, Any]:
+ """
+ Add documents to Cognee and optionally cognify them.
+
+ :param documents: List of Haystack Documents to add.
+ :returns: Dictionary with key `documents_written` indicating how many
+ documents were successfully added.
+ """
+ texts = [doc.content for doc in documents if doc.content]
+ skipped = len(documents) - len(texts)
+ if skipped > 0:
+ logger.warning("Skipping {count} document(s) with empty content", count=skipped)
+
+ if texts:
+ run_sync(cognee.add(texts, dataset_name=self.dataset_name))
+
+ written = len(texts)
+
+ if self.auto_cognify and written > 0:
+ logger.info(
+ "Cognifying {count} documents in dataset '{dataset}'",
+ count=written,
+ dataset=self.dataset_name,
+ )
+ run_sync(cognee.cognify(datasets=[self.dataset_name]))
+
+ return {"documents_written": written}
+
+ def to_dict(self) -> dict[str, Any]:
+ """Serialize this component to a dictionary."""
+ return default_to_dict(
+ self,
+ dataset_name=self.dataset_name,
+ auto_cognify=self.auto_cognify,
+ )
+
+ @classmethod
+ def from_dict(cls, data: dict[str, Any]) -> "CogneeWriter":
+ """Deserialize a component from a dictionary."""
+ return default_from_dict(cls, data)
diff --git a/integrations/cognee/src/haystack_integrations/components/writers/py.typed b/integrations/cognee/src/haystack_integrations/components/writers/py.typed
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py
new file mode 100644
index 0000000000..eb9a6b81a8
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/__init__.py
@@ -0,0 +1,9 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from .memory_store import CogneeMemoryStore
+
+__all__ = [
+ "CogneeMemoryStore",
+]
diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py
new file mode 100644
index 0000000000..92c43cc3bb
--- /dev/null
+++ b/integrations/cognee/src/haystack_integrations/memory_stores/cognee/memory_store.py
@@ -0,0 +1,197 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Any
+
+from haystack import default_from_dict, default_to_dict, logging
+from haystack.dataclasses import ChatMessage
+
+import cognee # type: ignore[import-untyped]
+from cognee.api.v1.search import SearchType # type: ignore[import-untyped]
+from cognee.modules.data.exceptions import DatasetNotFoundError # type: ignore[import-untyped]
+from cognee.modules.users.permissions.methods import get_all_user_permission_datasets # type: ignore[import-untyped]
+from haystack_integrations.components.connectors.cognee._utils import (
+ CogneeSearchType,
+ _get_cognee_user,
+ extract_text,
+ run_sync,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class CogneeMemoryStore:
+ """
+ A memory store backed by Cognee memory.
+
+ Implements the `MemoryStore` protocol from haystack-experimental, allowing
+ Cognee to serve as the memory backend for Haystack's experimental Agent.
+
+ Usage:
+ ```python
+ from haystack_integrations.memory_stores.cognee import CogneeMemoryStore
+
+ store = CogneeMemoryStore(search_type="GRAPH_COMPLETION", top_k=5)
+ store.add_memories(messages=[ChatMessage.from_user("Remember: the project deadline is Friday.")])
+ results = store.search_memories(query="When is the project deadline?")
+ ```
+ """
+
+ def __init__(
+ self, search_type: CogneeSearchType = "GRAPH_COMPLETION", top_k: int = 5, dataset_name: str = "haystack_memory"
+ ):
+ """
+ Initialize the CogneeMemoryStore.
+
+ :param search_type: Cognee search type for memory retrieval.
+ :param top_k: Default number of results for memory search.
+ :param dataset_name: Cognee dataset name for storing memories.
+ """
+ self.search_type = search_type
+ self.top_k = top_k
+ self.dataset_name = dataset_name
+
+ def add_memories(
+ self,
+ *,
+ messages: list[ChatMessage],
+ user_id: str | None = None,
+ **kwargs: Any,
+ ) -> None:
+ """
+ Add chat messages to Cognee as memories.
+
+ :param messages: List of ChatMessages to store.
+ :param user_id: Optional cognee user UUID to scope memories to a specific user.
+ When provided, the data is stored under that user's permissions.
+ When ``None``, cognee's default user is used.
+ :param kwargs: Additional keyword arguments (unused, accepted for protocol
+ compatibility).
+ """
+ user = run_sync(_get_cognee_user(user_id)) if user_id else None
+
+ added = 0
+ for msg in messages:
+ text = msg.text
+ if not text:
+ continue
+ run_sync(cognee.add(text, dataset_name=self.dataset_name, user=user))
+ added += 1
+
+ if added > 0:
+ run_sync(cognee.cognify(datasets=[self.dataset_name], user=user))
+
+ logger.info("Added and cognified {count} messages as memories", count=added)
+
+ def search_memories(
+ self,
+ *,
+ query: str | None = None,
+ top_k: int = 5,
+ user_id: str | None = None,
+ **kwargs: Any,
+ ) -> list[ChatMessage]:
+ """
+ Search Cognee's knowledge engine for relevant memories.
+
+ :param query: The search query.
+ :param top_k: Maximum number of memories to return.
+ :param user_id: Optional cognee user UUID to scope the search to a specific user.
+ Search is restricted to the store's ``dataset_name``. If the user owns the
+ dataset it is resolved by name; otherwise the store checks whether the user
+ has been granted read access (e.g. via shared permissions) and searches by
+ dataset UUID.
+ When ``None``, cognee's default user is used.
+ :param kwargs: Additional keyword arguments (unused, accepted for protocol
+ compatibility).
+ :returns: List of ChatMessages containing memory content as system messages.
+ """
+ if not query:
+ return []
+
+ user = run_sync(_get_cognee_user(user_id)) if user_id else None
+ search_type_enum = SearchType[self.search_type]
+ effective_top_k = top_k or self.top_k
+
+ try:
+ raw_results = run_sync(
+ cognee.search(
+ query_text=query,
+ query_type=search_type_enum,
+ user=user,
+ datasets=[self.dataset_name],
+ )
+ )
+ except DatasetNotFoundError:
+ # The user doesn't own a dataset with this name.
+ # Fall back to checking shared datasets the user has read access to.
+ raw_results = self._search_shared_dataset(query, search_type_enum, user)
+
+ memories: list[ChatMessage] = []
+ if not raw_results:
+ return memories
+
+ for item in raw_results[:effective_top_k]:
+ text = extract_text(item)
+ if text:
+ memories.append(ChatMessage.from_system(text))
+
+ logger.info("Found {count} memories for query '{query}'", count=len(memories), query=query[:80])
+ return memories
+
+ def _search_shared_dataset(self, query: str, search_type_enum: SearchType, user: Any) -> list[Any]:
+ """Search for the dataset by name among all datasets the user has read access to."""
+ if user is None:
+ return []
+ all_readable = run_sync(get_all_user_permission_datasets(user, "read"))
+ matching = [ds for ds in all_readable if ds.name == self.dataset_name]
+ if not matching:
+ return []
+ return run_sync(
+ cognee.search(query_text=query, query_type=search_type_enum, user=user, dataset_ids=[matching[0].id])
+ )
+
+ def delete_all_memories(
+ self,
+ *,
+ user_id: str | None = None,
+ **kwargs: Any,
+ ) -> None:
+ """
+ Delete all memories by pruning Cognee's data and system state.
+
+ :param user_id: Optional cognee user UUID (accepted for protocol compatibility).
+ Note: Cognee's prune operations are global and not scoped to a specific user.
+ :param kwargs: Additional keyword arguments (unused, accepted for protocol
+ compatibility).
+ """
+ run_sync(cognee.prune.prune_data())
+ run_sync(cognee.prune.prune_system(metadata=True))
+ logger.info("All Cognee memories pruned")
+
+ def delete_memory(self, memory_id: str) -> None:
+ """
+ Delete a single memory by ID.
+
+ Not supported in V1 — Cognee's SDK does not expose fine-grained deletion.
+
+ :param memory_id: The ID of the memory to delete.
+ :raises NotImplementedError: Always, as single-item deletion is not yet supported.
+ """
+ msg = "CogneeMemoryStore does not support deleting individual memories in V1."
+ raise NotImplementedError(msg)
+
+ def to_dict(self) -> dict[str, Any]:
+ """Serialize this component to a dictionary."""
+ return default_to_dict(
+ self,
+ search_type=self.search_type,
+ top_k=self.top_k,
+ dataset_name=self.dataset_name,
+ )
+
+ @classmethod
+ def from_dict(cls, data: dict[str, Any]) -> "CogneeMemoryStore":
+ """Deserialize a component from a dictionary."""
+ return default_from_dict(cls, data)
diff --git a/integrations/cognee/src/haystack_integrations/memory_stores/py.typed b/integrations/cognee/src/haystack_integrations/memory_stores/py.typed
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/integrations/cognee/tests/__init__.py b/integrations/cognee/tests/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/integrations/cognee/tests/test_cognifier.py b/integrations/cognee/tests/test_cognifier.py
new file mode 100644
index 0000000000..4b6eea9ffc
--- /dev/null
+++ b/integrations/cognee/tests/test_cognifier.py
@@ -0,0 +1,80 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import AsyncMock, patch
+
+from haystack_integrations.components.connectors.cognee import CogneeCognifier
+
+
+class TestCogneeCognifier:
+ def test_init_defaults(self):
+ cognifier = CogneeCognifier()
+ assert cognifier.dataset_name is None
+
+ def test_init_custom_string(self):
+ cognifier = CogneeCognifier(dataset_name="my_data")
+ assert cognifier.dataset_name == "my_data"
+
+ def test_init_custom_list(self):
+ cognifier = CogneeCognifier(dataset_name=["ds1", "ds2"])
+ assert cognifier.dataset_name == ["ds1", "ds2"]
+
+ def test_to_dict(self):
+ cognifier = CogneeCognifier(dataset_name="test_ds")
+ data = cognifier.to_dict()
+ assert data["type"] == "haystack_integrations.components.connectors.cognee.cognifier.CogneeCognifier"
+ assert data["init_parameters"]["dataset_name"] == "test_ds"
+
+ def test_to_dict_defaults(self):
+ cognifier = CogneeCognifier()
+ data = cognifier.to_dict()
+ assert data["init_parameters"]["dataset_name"] is None
+
+ def test_from_dict(self):
+ data = {
+ "type": "haystack_integrations.components.connectors.cognee.cognifier.CogneeCognifier",
+ "init_parameters": {"dataset_name": "restored"},
+ }
+ cognifier = CogneeCognifier.from_dict(data)
+ assert cognifier.dataset_name == "restored"
+
+ @patch("haystack_integrations.components.connectors.cognee.cognifier.cognee")
+ def test_run_no_dataset(self, mock_cognee):
+ mock_cognee.cognify = AsyncMock()
+
+ cognifier = CogneeCognifier()
+ result = cognifier.run()
+
+ assert result == {"cognified": True}
+ mock_cognee.cognify.assert_awaited_once_with()
+
+ @patch("haystack_integrations.components.connectors.cognee.cognifier.cognee")
+ def test_run_with_string_dataset(self, mock_cognee):
+ mock_cognee.cognify = AsyncMock()
+
+ cognifier = CogneeCognifier(dataset_name="my_data")
+ result = cognifier.run()
+
+ assert result == {"cognified": True}
+ mock_cognee.cognify.assert_awaited_once_with(datasets=["my_data"])
+
+ @patch("haystack_integrations.components.connectors.cognee.cognifier.cognee")
+ def test_run_with_list_dataset(self, mock_cognee):
+ mock_cognee.cognify = AsyncMock()
+
+ cognifier = CogneeCognifier(dataset_name=["ds1", "ds2"])
+ result = cognifier.run()
+
+ assert result == {"cognified": True}
+ mock_cognee.cognify.assert_awaited_once_with(datasets=["ds1", "ds2"])
+
+ @patch("haystack_integrations.components.connectors.cognee.cognifier.cognee")
+ def test_run_with_documents_written_input(self, mock_cognee):
+ mock_cognee.cognify = AsyncMock()
+
+ cognifier = CogneeCognifier(dataset_name="my_data")
+ result = cognifier.run(documents_written=5)
+
+ assert result == {"cognified": True}
+ mock_cognee.cognify.assert_awaited_once_with(datasets=["my_data"])
diff --git a/integrations/cognee/tests/test_integration.py b/integrations/cognee/tests/test_integration.py
new file mode 100644
index 0000000000..2f58d40392
--- /dev/null
+++ b/integrations/cognee/tests/test_integration.py
@@ -0,0 +1,99 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import os
+
+import cognee
+import pytest
+from haystack import Document
+from haystack.dataclasses import ChatMessage
+
+from haystack_integrations.components.connectors.cognee import CogneeCognifier
+from haystack_integrations.components.connectors.cognee._utils import run_sync
+from haystack_integrations.components.retrievers.cognee import CogneeRetriever
+from haystack_integrations.components.writers.cognee import CogneeWriter
+from haystack_integrations.memory_stores.cognee import CogneeMemoryStore
+
+SKIP_REASON = "Export an env var called LLM_API_KEY containing the LLM API key to run this test."
+
+
+@pytest.mark.skipif(  # integration test: needs a real LLM key, otherwise skipped
+ not os.environ.get("LLM_API_KEY", None),
+ reason=SKIP_REASON,
+)
+@pytest.mark.integration
+class TestCogneeMemoryStoreIntegration:  # end-to-end add/search/delete against a live cognee backend
+ def test_add_search_delete(self):
+ store = CogneeMemoryStore(
+ search_type="GRAPH_COMPLETION",
+ top_k=3,
+ dataset_name="haystack_integration_test",
+ )
+
+ store.delete_all_memories()  # start from a clean slate in the shared backend
+
+ messages = [
+ ChatMessage.from_user("The capital of France is Paris."),
+ ChatMessage.from_user("The Eiffel Tower is located in Paris."),
+ ]
+ store.add_memories(messages=messages)
+
+ results = store.search_memories(query="What is the capital of France?", top_k=3)
+ assert len(results) > 0  # at least one stored memory should match
+ assert all(isinstance(r, ChatMessage) for r in results)  # results come back as ChatMessage objects
+
+ store.delete_all_memories()  # clean up so later runs are unaffected
+
+
+@pytest.mark.skipif(  # integration test: needs a real LLM key, otherwise skipped
+ not os.environ.get("LLM_API_KEY", None),
+ reason=SKIP_REASON,
+)
+@pytest.mark.integration
+class TestCogneeWriterRetrieverIntegration:  # end-to-end write + retrieve round trip against a live cognee backend
+ def test_write_and_retrieve(self):
+ run_sync(cognee.prune.prune_data())  # reset cognee state before the test
+ run_sync(cognee.prune.prune_system(metadata=True))
+
+ writer = CogneeWriter(dataset_name="haystack_integration_test", auto_cognify=True)  # auto_cognify: write triggers cognify too
+ docs = [
+ Document(content="Python is a programming language created by Guido van Rossum."),
+ Document(content="Haystack is an open-source framework for building AI applications."),
+ ]
+ write_result = writer.run(documents=docs)
+ assert write_result["documents_written"] == 2
+
+ retriever = CogneeRetriever(
+ search_type="GRAPH_COMPLETION",
+ top_k=3,
+ dataset_name="haystack_integration_test",  # same dataset the writer used
+ )
+ search_result = retriever.run(query="What is Haystack?")
+ assert len(search_result["documents"]) > 0  # the written content should be retrievable
+ assert all(isinstance(d, Document) for d in search_result["documents"])
+
+ run_sync(cognee.prune.prune_data())  # clean up after the test
+ run_sync(cognee.prune.prune_system(metadata=True))
+
+
+@pytest.mark.skipif(  # integration test: needs a real LLM key, otherwise skipped
+ not os.environ.get("LLM_API_KEY", None),
+ reason=SKIP_REASON,
+)
+@pytest.mark.integration
+class TestCogneeCognifierIntegration:  # write without auto_cognify, then run the cognifier as a separate step
+ def test_write_then_cognify(self):
+ run_sync(cognee.prune.prune_data())  # reset cognee state before the test
+ run_sync(cognee.prune.prune_system(metadata=True))
+
+ writer = CogneeWriter(dataset_name="haystack_integration_test", auto_cognify=False)  # defer cognify to the cognifier component
+ docs = [Document(content="Berlin is the capital of Germany.")]
+ writer.run(documents=docs)
+
+ cognifier = CogneeCognifier(dataset_name="haystack_integration_test")
+ result = cognifier.run()
+ assert result["cognified"] is True
+
+ run_sync(cognee.prune.prune_data())  # clean up after the test
+ run_sync(cognee.prune.prune_system(metadata=True))
diff --git a/integrations/cognee/tests/test_memory_store.py b/integrations/cognee/tests/test_memory_store.py
new file mode 100644
index 0000000000..5a48be1eaf
--- /dev/null
+++ b/integrations/cognee/tests/test_memory_store.py
@@ -0,0 +1,167 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from haystack.dataclasses import ChatMessage
+
+from haystack_integrations.memory_stores.cognee import CogneeMemoryStore
+
+
+class TestCogneeMemoryStore:  # unit tests for CogneeMemoryStore; the cognee SDK is mocked throughout
+ def test_init_defaults(self):  # default configuration values
+ store = CogneeMemoryStore()
+ assert store.search_type == "GRAPH_COMPLETION"
+ assert store.top_k == 5
+ assert store.dataset_name == "haystack_memory"
+
+ def test_init_custom(self):  # custom init parameters are stored verbatim
+ store = CogneeMemoryStore(search_type="CHUNKS", top_k=10, dataset_name="custom")
+ assert store.search_type == "CHUNKS"
+ assert store.top_k == 10
+ assert store.dataset_name == "custom"
+
+ def test_to_dict(self):  # serialization records the fully-qualified type plus all init parameters
+ store = CogneeMemoryStore(search_type="SUMMARIES", top_k=3, dataset_name="mem")
+ data = store.to_dict()
+ assert data["type"] == "haystack_integrations.memory_stores.cognee.memory_store.CogneeMemoryStore"
+ assert data["init_parameters"]["search_type"] == "SUMMARIES"
+ assert data["init_parameters"]["top_k"] == 3
+ assert data["init_parameters"]["dataset_name"] == "mem"
+
+ def test_from_dict(self):  # deserialization restores every init parameter
+ data = {
+ "type": "haystack_integrations.memory_stores.cognee.memory_store.CogneeMemoryStore",
+ "init_parameters": {"search_type": "CHUNKS", "top_k": 8, "dataset_name": "restored"},
+ }
+ store = CogneeMemoryStore.from_dict(data)
+ assert store.search_type == "CHUNKS"
+ assert store.top_k == 8
+ assert store.dataset_name == "restored"
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_add_memories(self, mock_cognee):  # one add() per message, then a single cognify()
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ store = CogneeMemoryStore()
+ messages = [
+ ChatMessage.from_user("Remember: the deadline is Friday."),
+ ChatMessage.from_assistant("Got it, I'll remember Friday."),
+ ]
+ store.add_memories(messages=messages)
+
+ assert mock_cognee.add.await_count == 2  # per-message add calls
+ mock_cognee.cognify.assert_awaited_once()  # cognify batched once at the end
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_add_memories_cognify_uses_dataset(self, mock_cognee):  # cognify targets the store's configured dataset
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ store = CogneeMemoryStore(dataset_name="my_ds")
+ messages = [ChatMessage.from_user("Remember this.")]
+ store.add_memories(messages=messages)
+
+ mock_cognee.cognify.assert_awaited_once()
+ cognify_kwargs = mock_cognee.cognify.call_args[1]  # call_args[1] is the kwargs dict
+ assert cognify_kwargs["datasets"] == ["my_ds"]
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_search_memories(self, mock_cognee):  # raw string results are wrapped into ChatMessage objects
+ mock_cognee.search = AsyncMock(return_value=["Memory about deadline"])
+
+ store = CogneeMemoryStore()
+ results = store.search_memories(query="What is the deadline?")
+
+ assert len(results) == 1
+ assert isinstance(results[0], ChatMessage)
+ assert results[0].text == "Memory about deadline"
+
+ def test_search_memories_empty_query(self):  # None or empty query short-circuits to [] without touching the SDK
+ store = CogneeMemoryStore()
+ results = store.search_memories(query=None)
+ assert results == []
+
+ results = store.search_memories(query="")
+ assert results == []
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_delete_all_memories(self, mock_cognee):  # delete_all_memories prunes both data and system metadata
+ mock_cognee.prune = type(  # build a throwaway namespace whose methods are AsyncMocks,
+ "Prune",  # so the awaited prune_data/prune_system calls can be asserted
+ (),
+ {
+ "prune_data": AsyncMock(),
+ "prune_system": AsyncMock(),
+ },
+ )()
+
+ store = CogneeMemoryStore()
+ store.delete_all_memories()
+
+ mock_cognee.prune.prune_data.assert_awaited_once()
+ mock_cognee.prune.prune_system.assert_awaited_once()
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store._get_cognee_user", new_callable=AsyncMock)
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_add_memories_with_user_id(self, mock_cognee, mock_get_user):  # user_id is resolved once and forwarded to both add and cognify
+ mock_user = MagicMock()
+ mock_get_user.return_value = mock_user
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ store = CogneeMemoryStore()
+ messages = [ChatMessage.from_user("Remember this.")]
+ store.add_memories(messages=messages, user_id="550e8400-e29b-41d4-a716-446655440000")
+
+ mock_get_user.assert_awaited_once_with("550e8400-e29b-41d4-a716-446655440000")
+ mock_cognee.add.assert_awaited_once()
+ add_kwargs = mock_cognee.add.call_args[1]  # kwargs of the add() call
+ assert add_kwargs["user"] is mock_user
+ cognify_kwargs = mock_cognee.cognify.call_args[1]
+ assert cognify_kwargs["user"] is mock_user  # same resolved user object reused for cognify
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store._get_cognee_user", new_callable=AsyncMock)
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_search_memories_with_user_id(self, mock_cognee, mock_get_user):  # user_id is resolved and passed to search
+ mock_user = MagicMock()
+ mock_get_user.return_value = mock_user
+ mock_cognee.search = AsyncMock(return_value=["result"])
+
+ store = CogneeMemoryStore()
+ results = store.search_memories(query="test", user_id="550e8400-e29b-41d4-a716-446655440000")
+
+ mock_get_user.assert_awaited_once_with("550e8400-e29b-41d4-a716-446655440000")
+ search_kwargs = mock_cognee.search.call_args[1]
+ assert search_kwargs["user"] is mock_user
+ assert len(results) == 1
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_add_memories_without_user_id(self, mock_cognee):  # no user_id means user=None is passed through
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ store = CogneeMemoryStore()
+ messages = [ChatMessage.from_user("Remember this.")]
+ store.add_memories(messages=messages)
+
+ add_kwargs = mock_cognee.add.call_args[1]
+ assert add_kwargs["user"] is None
+
+ @patch("haystack_integrations.memory_stores.cognee.memory_store.cognee")
+ def test_search_memories_without_user_id(self, mock_cognee):  # no user_id means user=None is passed to search
+ mock_cognee.search = AsyncMock(return_value=["result"])
+
+ store = CogneeMemoryStore()
+ store.search_memories(query="test")
+
+ search_kwargs = mock_cognee.search.call_args[1]
+ assert search_kwargs["user"] is None
+
+ def test_delete_memory_raises(self):  # per-memory deletion is not supported; only delete_all_memories is
+ store = CogneeMemoryStore()
+ with pytest.raises(NotImplementedError):
+ store.delete_memory("some-id")
diff --git a/integrations/cognee/tests/test_retriever.py b/integrations/cognee/tests/test_retriever.py
new file mode 100644
index 0000000000..6494861db9
--- /dev/null
+++ b/integrations/cognee/tests/test_retriever.py
@@ -0,0 +1,92 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import AsyncMock, patch
+
+from haystack_integrations.components.retrievers.cognee import CogneeRetriever
+
+
+class TestCogneeRetriever:  # unit tests for CogneeRetriever; the cognee SDK is mocked throughout
+ def test_init_defaults(self):  # default configuration values
+ retriever = CogneeRetriever()
+ assert retriever.search_type == "GRAPH_COMPLETION"
+ assert retriever.top_k == 10
+ assert retriever.dataset_name is None
+
+ def test_init_custom(self):  # custom init parameters are stored verbatim
+ retriever = CogneeRetriever(search_type="CHUNKS", top_k=5, dataset_name="my_data")
+ assert retriever.search_type == "CHUNKS"
+ assert retriever.top_k == 5
+ assert retriever.dataset_name == "my_data"
+
+ def test_to_dict(self):  # serialization records the fully-qualified type plus all init parameters
+ retriever = CogneeRetriever(search_type="SUMMARIES", top_k=3, dataset_name="ds")
+ data = retriever.to_dict()
+ assert data["type"] == "haystack_integrations.components.retrievers.cognee.memory_retriever.CogneeRetriever"
+ assert data["init_parameters"]["search_type"] == "SUMMARIES"
+ assert data["init_parameters"]["top_k"] == 3
+ assert data["init_parameters"]["dataset_name"] == "ds"
+
+ def test_from_dict(self):  # deserialization restores the init parameters
+ data = {
+ "type": "haystack_integrations.components.retrievers.cognee.memory_retriever.CogneeRetriever",
+ "init_parameters": {"search_type": "CHUNKS", "top_k": 7, "dataset_name": None},
+ }
+ retriever = CogneeRetriever.from_dict(data)
+ assert retriever.search_type == "CHUNKS"
+ assert retriever.top_k == 7
+
+ @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee")
+ def test_run_returns_documents(self, mock_cognee):  # string results become Documents tagged with source=cognee
+ mock_cognee.search = AsyncMock(return_value=["result one", "result two"])
+
+ retriever = CogneeRetriever(search_type="GRAPH_COMPLETION", top_k=5)
+ result = retriever.run(query="What is Cognee?")
+
+ docs = result["documents"]
+ assert len(docs) == 2
+ assert docs[0].content == "result one"
+ assert docs[0].meta["source"] == "cognee"  # provenance recorded in Document meta
+
+ @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee")
+ def test_run_empty_results(self, mock_cognee):  # no hits yields an empty documents list
+ mock_cognee.search = AsyncMock(return_value=[])
+
+ retriever = CogneeRetriever()
+ result = retriever.run(query="nonexistent query")
+
+ assert result["documents"] == []
+
+ @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee")
+ def test_run_respects_top_k_override(self, mock_cognee):  # a per-call top_k truncates results, overriding the init value
+ mock_cognee.search = AsyncMock(return_value=["a", "b", "c", "d", "e"])
+
+ retriever = CogneeRetriever(top_k=10)
+ result = retriever.run(query="test", top_k=2)
+
+ assert len(result["documents"]) == 2
+
+ @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee")
+ def test_run_handles_dict_results(self, mock_cognee):  # dict results: "content" is used, with "text" as a fallback key
+ mock_cognee.search = AsyncMock(
+ return_value=[
+ {"content": "Dict content", "score": 0.9},
+ {"text": "Alt text field"},
+ ]
+ )
+
+ retriever = CogneeRetriever()
+ result = retriever.run(query="test")
+
+ assert len(result["documents"]) == 2
+ assert result["documents"][0].content == "Dict content"
+ assert result["documents"][1].content == "Alt text field"
+
+ @patch("haystack_integrations.components.retrievers.cognee.memory_retriever.cognee")
+ def test_run_handles_none_results(self, mock_cognee):  # a None return from search is treated as no results
+ mock_cognee.search = AsyncMock(return_value=None)
+
+ retriever = CogneeRetriever()
+ result = retriever.run(query="test")
+ assert result["documents"] == []
diff --git a/integrations/cognee/tests/test_utils.py b/integrations/cognee/tests/test_utils.py
new file mode 100644
index 0000000000..13701aea93
--- /dev/null
+++ b/integrations/cognee/tests/test_utils.py
@@ -0,0 +1,99 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import asyncio
+
+from haystack_integrations.components.connectors.cognee._utils import extract_text, run_sync
+
+
+class TestExtractText:  # unit tests for extract_text: pulls a display string out of heterogeneous cognee results
+ def test_string_input(self):  # plain strings pass through unchanged
+ assert extract_text("hello world") == "hello world"
+
+ def test_object_with_content_attr(self):  # objects are probed for a .content attribute
+ class FakeResult:
+ content = "object content"
+
+ assert extract_text(FakeResult()) == "object content"
+
+ def test_object_with_text_attr(self):  # .text is used when .content is absent
+ class FakeResult:
+ text = "object text"
+
+ assert extract_text(FakeResult()) == "object text"
+
+ def test_object_with_description_attr(self):  # .description is another accepted attribute
+ class FakeResult:
+ description = "object description"
+
+ assert extract_text(FakeResult()) == "object description"
+
+ def test_object_with_name_attr(self):  # .name is the last attribute tried
+ class FakeResult:
+ name = "object name"
+
+ assert extract_text(FakeResult()) == "object name"
+
+ def test_object_attr_priority(self):  # attribute priority: content before text
+ class FakeResult:
+ content = "first"
+ text = "second"
+
+ assert extract_text(FakeResult()) == "first"
+
+ def test_object_skips_none_attr(self):  # a None attribute is skipped in favor of the next candidate
+ class FakeResult:
+ content = None
+ text = "fallback text"
+
+ assert extract_text(FakeResult()) == "fallback text"
+
+ def test_object_skips_non_string_attr(self):  # non-string attributes are skipped too
+ class FakeResult:
+ content = 123
+ text = "string value"
+
+ assert extract_text(FakeResult()) == "string value"
+
+ def test_dict_with_content_key(self):  # dicts are probed with the same candidate keys
+ assert extract_text({"content": "dict content"}) == "dict content"
+
+ def test_dict_with_text_key(self):
+ assert extract_text({"text": "dict text"}) == "dict text"
+
+ def test_dict_with_description_key(self):
+ assert extract_text({"description": "dict description"}) == "dict description"
+
+ def test_dict_with_name_key(self):
+ assert extract_text({"name": "dict name"}) == "dict name"
+
+ def test_dict_key_priority(self):  # key priority mirrors the attribute priority: content first
+ assert extract_text({"content": "first", "text": "second"}) == "first"
+
+ def test_dict_skips_non_string_value(self):  # non-string dict values are skipped
+ assert extract_text({"content": 123, "text": "fallback"}) == "fallback"
+
+ def test_fallback_to_str(self):  # anything unrecognized falls back to str()
+ assert extract_text(42) == "42"
+
+ def test_fallback_dict_no_known_keys(self):  # a dict with no known keys is stringified wholesale
+ result = extract_text({"unknown": "value"})
+ assert "unknown" in result
+
+
+class TestRunSync:  # unit tests for run_sync: drives a coroutine to completion from synchronous code
+ def test_run_simple_coroutine(self):  # a coroutine's return value is surfaced synchronously
+ async def coro():
+ return 42
+
+ result = run_sync(coro())
+ assert result == 42
+
+ def test_run_async_coroutine(self):  # awaits inside the coroutine (here a no-op sleep) are handled too
+ async def coro():
+ await asyncio.sleep(0)
+ return "done"
+
+ result = run_sync(coro())
+ assert result == "done"
diff --git a/integrations/cognee/tests/test_writer.py b/integrations/cognee/tests/test_writer.py
new file mode 100644
index 0000000000..c8acc499af
--- /dev/null
+++ b/integrations/cognee/tests/test_writer.py
@@ -0,0 +1,102 @@
+# SPDX-FileCopyrightText: 2022-present deepset GmbH
+#
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import AsyncMock, patch
+
+from haystack import Document
+
+from haystack_integrations.components.writers.cognee import CogneeWriter
+
+
+class TestCogneeWriter:  # unit tests for CogneeWriter; the cognee SDK is mocked throughout
+ def test_init_defaults(self):  # default configuration values
+ writer = CogneeWriter()
+ assert writer.dataset_name == "haystack"
+ assert writer.auto_cognify is True
+
+ def test_init_custom(self):  # custom init parameters are stored verbatim
+ writer = CogneeWriter(dataset_name="custom", auto_cognify=False)
+ assert writer.dataset_name == "custom"
+ assert writer.auto_cognify is False
+
+ def test_to_dict(self):  # serialization records the fully-qualified type plus all init parameters
+ writer = CogneeWriter(dataset_name="test_ds", auto_cognify=False)
+ data = writer.to_dict()
+ assert data["type"] == "haystack_integrations.components.writers.cognee.memory_writer.CogneeWriter"
+ assert data["init_parameters"]["dataset_name"] == "test_ds"
+ assert data["init_parameters"]["auto_cognify"] is False
+
+ def test_from_dict(self):  # deserialization restores every init parameter
+ data = {
+ "type": "haystack_integrations.components.writers.cognee.memory_writer.CogneeWriter",
+ "init_parameters": {"dataset_name": "restored", "auto_cognify": True},
+ }
+ writer = CogneeWriter.from_dict(data)
+ assert writer.dataset_name == "restored"
+ assert writer.auto_cognify is True
+
+ @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
+ def test_run_with_auto_cognify(self, mock_cognee):  # documents are added in one batch, then cognify runs on the dataset
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ writer = CogneeWriter(dataset_name="test", auto_cognify=True)
+ docs = [
+ Document(content="First document"),
+ Document(content="Second document"),
+ ]
+ result = writer.run(documents=docs)
+
+ assert result == {"documents_written": 2}
+ # Verify batch call: single add() with list of texts
+ mock_cognee.add.assert_awaited_once()
+ call_args = mock_cognee.add.call_args
+ assert call_args[0][0] == ["First document", "Second document"]  # call_args[0] is the positional args tuple
+ # Verify cognify uses specific dataset
+ mock_cognee.cognify.assert_awaited_once()
+ cognify_kwargs = mock_cognee.cognify.call_args[1]  # call_args[1] is the kwargs dict
+ assert cognify_kwargs["datasets"] == ["test"]
+
+ @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
+ def test_run_without_auto_cognify(self, mock_cognee):  # auto_cognify=False: documents are added but cognify is never called
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ writer = CogneeWriter(dataset_name="test", auto_cognify=False)
+ docs = [Document(content="A document")]
+ result = writer.run(documents=docs)
+
+ assert result == {"documents_written": 1}
+ mock_cognee.add.assert_awaited_once()
+ mock_cognee.cognify.assert_not_awaited()
+
+ @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
+ def test_run_skips_empty_content(self, mock_cognee):  # documents with empty or None content are excluded from the batch and the count
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ writer = CogneeWriter(auto_cognify=True)
+ docs = [
+ Document(content="Valid document"),
+ Document(content=""),
+ Document(content=None),
+ ]
+ result = writer.run(documents=docs)
+
+ assert result == {"documents_written": 1}  # only the non-empty document counts
+ mock_cognee.add.assert_awaited_once()
+ call_args = mock_cognee.add.call_args
+ assert call_args[0][0] == ["Valid document"]  # empty/None contents never reach cognee
+
+ @patch("haystack_integrations.components.writers.cognee.memory_writer.cognee")
+ def test_run_empty_list(self, mock_cognee):  # an empty input short-circuits: no add, no cognify
+ mock_cognee.add = AsyncMock()
+ mock_cognee.cognify = AsyncMock()
+
+ writer = CogneeWriter(auto_cognify=True)
+ result = writer.run(documents=[])
+
+ assert result == {"documents_written": 0}
+ mock_cognee.add.assert_not_awaited()
+ mock_cognee.cognify.assert_not_awaited()