feat: Add Cognee integration #2979

Open
hande-k wants to merge 10 commits into deepset-ai:main from hande-k:feat/add-cognee
Conversation


@hande-k hande-k commented Mar 18, 2026

Closes https://github.com/deepset-ai/haystack-private/issues/240

Summary

  • Adds Cognee integration with 4 components: CogneeWriter, CogneeCognifier, CogneeRetriever, and CogneeMemoryStore
  • CogneeWriter ingests Haystack Documents into Cognee memory via cognee.add() + optional cognee.cognify()
  • CogneeRetriever searches Cognee's memory and returns Haystack Documents
  • CogneeCognifier wraps cognee.cognify() as a standalone pipeline step
  • CogneeMemoryStore implements the MemoryStore protocol from haystack-experimental for use with Haystack's experimental Agent

Test plan

  • Unit tests pass via hatch run test:unit
  • Linting passes via hatch run fmt-check
  • Type checking passes via hatch run test:types
  • Demo scripts tested manually (demo_pipeline.py, demo_memory_agent.py)

@hande-k hande-k requested a review from a team as a code owner March 18, 2026 12:27
@hande-k hande-k requested review from davidsbatista and removed request for a team March 18, 2026 12:27
@github-actions github-actions bot added topic:CI type:documentation Improvements or additions to documentation labels Mar 18, 2026
@hande-k hande-k changed the title from "add fix" to "feat: Add Cognee integration" Mar 18, 2026

CLAassistant commented Mar 18, 2026

CLA assistant check
All committers have signed the CLA.

Contributor

@davidsbatista davidsbatista left a comment


@hande-k thank you for this contribution!

I left some initial comments/suggestions for improvements.


hande-k commented Mar 21, 2026

Thanks for the review @davidsbatista & @sjrl! I've addressed all the comments. A couple of notes:

  • py.typed: Since the components are now split across components/retrievers/, components/writers/,
    and memory_stores/, I added py.typed markers at the parent level for each (matching the pattern used
    by other multi-package integrations).
  • dataset_name on CogneeCognifier: Added str | list[str] | None support so it can target one or multiple
    datasets

Let me know if anything needs further adjustment!

return future.result()


def extract_text(item: Any) -> str:
Contributor


Would it be possible to make the type for item more strict? Looking at the code below, it looks like it could be one of three things: str, dict, or some Cognee-specific object.

Author


Cognee's search API returns Any internally. Results can be str (LLM completions), dict (structured outputs), or cognee model objects (DataPoint subclasses) depending on the search type. Since these internal types aren't part of cognee's API, I've kept Any but expanded the docstring to document the three expected categories. Would that work?
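For readers following along, the three result categories described above could be normalized roughly like this. This is a sketch, not the PR's actual implementation, and the `text` key/attribute names are assumptions:

```python
from typing import Any


def extract_text(item: Any) -> str:
    """Normalize a cognee search result into plain text.

    Results may be str (LLM completions), dict (structured outputs), or
    cognee model objects (DataPoint subclasses). These internal types are
    not part of cognee's public API, hence the Any annotation.
    """
    if isinstance(item, str):
        return item
    if isinstance(item, dict):
        # Structured outputs: prefer a "text" field (assumed name), fall back to the dict.
        value = item.get("text", item)
        return value if isinstance(value, str) else str(value)
    # Duck-type DataPoint-like objects; the "text" attribute name is an assumption.
    text = getattr(item, "text", None)
    return text if isinstance(text, str) else str(item)
```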

Contributor


Yeah expanding the docstring with the three supported types and the note that they are internal types would be great.

"""
:param search_type: Cognee search type for memory retrieval.
:param top_k: Default number of results for memory search.
:param dataset_name: Cognee dataset name for storing memories.
Contributor


Can you help me understand Cognee a bit better? It looks from your code like dataset_name is required, which makes sense.

My question is what level of scope is dataset_name meant to have? For example, is it normal to have a new dataset_name per user or is the intention for dataset to be scoped to multiple users (e.g. like at an org level)?

Contributor

@sjrl sjrl Apr 2, 2026


To provide more context on why I'm asking this question: in our existing Mem0 memory store, most memories are scoped to users, which is why our experimental protocol typically requires a user_id in all of its methods, since the idea is that we are always scoping the request to a specific user. For example, our protocol for add_memories looks like

def add_memories(self, *, messages: list[ChatMessage], user_id: str | None = None, **kwargs: Any) -> None:

with the idea being that user_id is set at run time to scope the request to a specific user.

This way we don't need to create a new MemoryStore for every user which is useful when we usually want to easily reuse a Haystack Agent or Pipeline for many users.

So my hope is that Cognee can also fit into this pattern.

Author


Cognee supports user-level scoping. Its add(), search(), and cognify() APIs all accept a user parameter for ACL-based access control. I've added user_id support to CogneeMemoryStore to match your existing pattern: a single store instance can serve multiple users via user_id at runtime. When user_id is provided, it's resolved to a cognee User and forwarded to all API calls.
dataset_name groups data logically (like a collection), while user_id controls who can access it. Search is scoped to the store's dataset_name for the given user, and shared datasets (where another user granted read permission) are automatically resolved. I've updated the examples/demo_memory_agent.py for a demo of isolation + sharing.
Does the current logic make sense for the integration?
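As a conceptual illustration of the scoping described above (a toy model, not cognee's implementation; all names are made up): dataset_name groups data, the user controls access, and shared datasets become readable once permission is granted.

```python
class ScopedMemoryModel:
    """Toy model of dataset/user scoping; not cognee's implementation."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], list[str]] = {}   # (owner, dataset) -> texts
        self._grants: set[tuple[str, str, str]] = set()     # (owner, dataset, reader)

    def add(self, owner: str, dataset: str, text: str) -> None:
        self._data.setdefault((owner, dataset), []).append(text)

    def grant_read(self, owner: str, dataset: str, reader: str) -> None:
        self._grants.add((owner, dataset, reader))

    def search(self, user: str, dataset: str) -> list[str]:
        # Data the user owns in this dataset ...
        results = list(self._data.get((user, dataset), []))
        # ... plus same-named datasets another user shared with them.
        for (owner, name), texts in self._data.items():
            if owner != user and name == dataset and (owner, name, user) in self._grants:
                results.extend(texts)
        return results
```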

Contributor


Yup that logic makes a lot of sense! Thanks for the changes.

I'd only say it would be good to expose the user resolution to the Retriever and Writer components as well since those are how pipeline builders will typically interact with the Cognee Memory Store.

Contributor


It would be great to also add some integration tests that require access to a Cognee account. We then mark those tests with a decorator like

@pytest.mark.skipif(
    not os.environ.get("COGNEE_API_KEY", None),
    reason="Export an env var called COGNEE_API_KEY containing the Cognee API key to run this test.",
)
@pytest.mark.integration

Author


Added test_integration.py gated on LLM_API_KEY. This integration uses the Cognee SDK locally as a library, so the only external dependency is the LLM provider (in the default config this is an OpenAI API key).
We can extend the integration in the future to use our Cloud as well, which would require a Cognee account.

Contributor


Oh okay good to know. When using the Cognee SDK locally like this where do the memories get stored? Is the cognee library running a local db in the background or storing them in memory?

Contributor


Could we also add tests for CogneeCognifier and for the connectors/cognee/_utils.py file?


sjrl commented Mar 24, 2026


@hande-k thanks for your patience with us! This is going to be a great addition to Haystack. Since it’s the first version of a new abstraction, we really appreciate you working through all the comments with us as we refine how it fits in.


sjrl commented Mar 30, 2026

Hi @hande-k, just checking in, do you have time to continue working on this? Happy to help or take over some of the remaining changes if that’s useful.


hande-k commented Mar 30, 2026

Hi @sjrl, thanks for the new comments and for offering to help with the requested changes, appreciate it!
I'll try to address them all early this week, hope that is fine. Let me know if there's urgency so that I can try to prioritize it.


sjrl commented Mar 30, 2026

@hande-k you're welcome and that sounds good to me!

Comment on lines +72 to +80
user = run_sync(_get_cognee_user(user_id)) if user_id else None

added = 0
for msg in messages:
    text = msg.text
    if not text:
        continue
    run_sync(cognee.add(text, dataset_name=self.dataset_name, user=user))
    added += 1
Contributor


What happens if the user is None?

Comment on lines +101 to +105
Search is restricted to the store's ``dataset_name``. If the user owns the
dataset it is resolved by name; otherwise the store checks whether the user
has been granted read access (e.g. via shared permissions) and searches by
dataset UUID.
When ``None``, cognee's default user is used.
Contributor


Let's make sure to only use single backticks to wrap code sections, otherwise our doc build complains.

Suggested change
Search is restricted to the store's ``dataset_name``. If the user owns the
dataset it is resolved by name; otherwise the store checks whether the user
has been granted read access (e.g. via shared permissions) and searches by
dataset UUID.
When ``None``, cognee's default user is used.
Search is restricted to the store's `dataset_name`. If the user owns the
dataset it is resolved by name; otherwise the store checks whether the user
has been granted read access (e.g. via shared permissions) and searches by
dataset UUID.
When `None`, cognee's default user is used.

Comment on lines +34 to +36
def __init__(
    self, search_type: CogneeSearchType = "GRAPH_COMPLETION", top_k: int = 10, dataset_name: str | None = None
):
Contributor


Following the pattern for our DocumentStores and their respective retrievers I'd expect the init method to take in a CogneeMemoryStore as init param. Check out our OpenSearchBM25Retriever as an example

class OpenSearchBM25Retriever:
    """
    Fetches documents from OpenSearchDocumentStore using the keyword-based BM25 algorithm.

    BM25 computes a weighted word overlap between the query string and a document to determine its similarity.
    """

    def __init__(
        self,
        *,
        document_store: OpenSearchDocumentStore,
        filters: dict[str, Any] | None = None,
        fuzziness: int | str = "AUTO",
        top_k: int = 10,
        scale_score: bool = False,
        all_terms_must_match: bool = False,
        filter_policy: str | FilterPolicy = FilterPolicy.REPLACE,
        custom_query: dict[str, Any] | None = None,
        raise_on_failure: bool = True,
    ) -> None:

The idea is for the retriever to call the method directly from the Store. E.g. this is how the BM25 retrieval is run in the linked component: `docs = doc_store._bm25_retrieval(**bm25_args)`.

So it would be great if we could follow that pattern here as well.

        self.dataset_name = dataset_name

    @component.output_types(documents=list[Document])
    def run(self, query: str, top_k: int | None = None) -> dict[str, list[Document]]:
Contributor


Related to this comment (https://github.com/deepset-ai/haystack-core-integrations/pull/2979/changes#r3027766776): the run method should more or less accept all of the same arguments as the CogneeMemoryStore.search_memories function.

4 participants