diff --git a/CHANGELOG.md b/CHANGELOG.md index 05df220..971569a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ Please mark backwards incompatible changes with an exclamation mark at the start ## [Unreleased] +### Added +- The `Elasticsearch::Indexes` class. A class which allows multiple indexes to + be used (fed or queried) at the same time. + ## [28.3.0] - 2025-06-05 ### Added diff --git a/documentation/source/user_guidelines/elasticsearch/index.rst b/documentation/source/user_guidelines/elasticsearch/indexable.rst similarity index 55% rename from documentation/source/user_guidelines/elasticsearch/index.rst rename to documentation/source/user_guidelines/elasticsearch/indexable.rst index 2c85e4b..23ad555 100644 --- a/documentation/source/user_guidelines/elasticsearch/index.rst +++ b/documentation/source/user_guidelines/elasticsearch/indexable.rst @@ -1,62 +1,22 @@ -Index -===== +Indexable +========= -This class represents an Index inside an Elasticsearch cluster. It provides a -set of methods that allow the user to query the index and add new data. +The following methods are common to the following classes, which include the +``Indexable`` mixin: -The class also keeps a buffer of documents waiting to be pushed to the index, -the user can add documents to the buffer and the class will push them as soon as -the buffer is full. The user can also force the push of the records by flushing -the buffer. +.. toctree:: + :maxdepth: 2 + :glob: -To initialize an index: - -.. code-block:: ruby - - client = JayAPI::Elasticsearch::ClientFactory.new( - cluster_url: 'https://my-cluster.elastic.io' - ).create(max_attempts: 3, wait_strategy: :constant, wait_interval: 2) - - index = JayAPI::Elasticsearch::Index.new( - client: client, - index_name: 'my_index' - ) - -The ``cluster_url`` and the ``index_name`` are the only required parameters. If -the cluster is configured to use Elasticsearch's default port (``9200``) and has -no authentication in place this is all you need. However in most cases that -would not be enough, so you can also provide the following extra parameters: - -* ``port``: The port number where the Elasticsearch cluster is listening for - connections. -* ``username``: The username to use when authentication against the cluster. -* ``password``: The user's password -* ``batch_size``: The amount of documents the ``Index`` will store in its buffer - before triggering an automatic flush. -* ``logger``: If you want the messages to be logged to a particular logger. If - you don't pass a logger then the class will create one. - -The ``create`` method, that returns the client object, also takes optional arguments, -which define connection re-try behaviour: - -* ``max_attempts``: Sets the maximum number of reconnection attempts in - response to server errors. -* ``wait_strategy``: Determines the strategy for wait intervals between - reconnection attempts. Options are: - - * ``:constant`` - Maintains a consistent wait time specified by ``wait_time``. - * ``:geometric`` - Increases the wait time geometrically based on ``wait_time``. - -* ``wait_time``: Specifies the base wait time (in seconds) for the chosen - ``wait_strategy``. + indexable/* #push ----- -The ``push`` method stores a document in the ``Index``'s buffer. If the buffer +The ``push`` method stores a document in the ``Indexable``'s buffer. If the buffer reaches the maximum number of records the buffer will be flushed automatically. -``push`` takes a single ``Hash``, the document you want to send to the index. +``push`` takes a single ``Hash``, the document you want to send to the index(es). .. warning:: @@ -70,10 +30,10 @@ Example: documents.each do |document| # do something with your document, then push it - index.push(document) + indexable.push(document) end - index.flush # Do not forget to flush the index at the end. + indexable.flush # Do not forget to flush the indexable at the end. #index ------ @@ -81,27 +41,27 @@ Example: ``index`` pushes a document directly to the Elasticsearch cluster without adding it to the buffer first. So you don't need to call ``flush``: -``index`` takes a single ``Hash``, the document you want to send to the index. +``index`` takes a single ``Hash``, the document you want to send to the index(es). Example: .. code-block:: ruby - index.index(my_document) + indexable.index(my_document) .. note:: - Pushing documents one at a time is very inefficient because the ``Index`` + Pushing documents one at a time is very inefficient because the ``Indexable`` needs to perform an HTTP Request for each one. If you want to send many documents use ``push`` instead. -.. _`Index#search`: +.. _`Indexable#search`: #search ------- -The ``search`` method allows you to search the Elasticsearch index for documents -matching the provided query. This method takes two arguments: +The ``search`` method allows you to search the Elasticsearch index(es) for +documents matching the provided query. This method takes two arguments: * ``query`` A ``Hash`` with the query you want to execute, this Hash will be converted to JSON before being sent to Elasticsearch. It must follow @@ -121,7 +81,7 @@ Example: .. code-block:: ruby - index.search( + indexable.search( query: { match_all: { } }, @@ -144,10 +104,10 @@ Example: .. code-block:: ruby documents.each do |document| - index.push(document) + indexable.push(document) end - index.flush + indexable.flush #queue_size ----------- @@ -159,17 +119,17 @@ Example .. code-block:: ruby - index.queue_size # => 16 + indexable.queue_size # => 16 #delete_by_query ---------------- This method allows you to remove the documents that match the given query from -the index. The method has a single parameter: +the index(es). The method has a single parameter: * ``query``: A ``Hash`` with the query you want to use to match documents for deletion. For more information on this parameter or how to create queries see - the :ref:`Index#search` method documentation. + the :ref:`Indexable#search` method documentation. On success the method will return a ``Hash`` with information about the executed command, for example: diff --git a/documentation/source/user_guidelines/elasticsearch/indexable/index.rst b/documentation/source/user_guidelines/elasticsearch/indexable/index.rst new file mode 100644 index 0000000..75eee72 --- /dev/null +++ b/documentation/source/user_guidelines/elasticsearch/indexable/index.rst @@ -0,0 +1,55 @@ +Index +===== + +.. note:: + + This class includes the :doc:`../indexable` mixin. It exposes all its methods. + +This class represents an Index inside an Elasticsearch cluster. It provides a +set of methods that allow the user to query the index and add new data. + +The class also keeps a buffer of documents waiting to be pushed to the index, +the user can add documents to the buffer and the class will push them as soon as +the buffer is full. The user can also force the push of the records by flushing +the buffer. + +To initialize an index: + +.. code-block:: ruby + + client = JayAPI::Elasticsearch::ClientFactory.new( + cluster_url: 'https://my-cluster.elastic.io' + ).create(max_attempts: 3, wait_strategy: :constant, wait_interval: 2) + + index = JayAPI::Elasticsearch::Index.new( + client: client, + index_name: 'my_index' + ) + +The ``cluster_url`` and the ``index_name`` are the only required parameters. If +the cluster is configured to use Elasticsearch's default port (``9200``) and has +no authentication in place this is all you need. However in most cases that +would not be enough, so you can also provide the following extra parameters: + +* ``port``: The port number where the Elasticsearch cluster is listening for + connections. +* ``username``: The username to use when authentication against the cluster. +* ``password``: The user's password +* ``batch_size``: The amount of documents the ``Index`` will store in its buffer + before triggering an automatic flush. +* ``logger``: If you want the messages to be logged to a particular logger. If + you don't pass a logger then the class will create one. + +The ``create`` method, that returns the client object, also takes optional arguments, +which define connection re-try behaviour: + +* ``max_attempts``: Sets the maximum number of reconnection attempts in + response to server errors. +* ``wait_strategy``: Determines the strategy for wait intervals between + reconnection attempts. Options are: + + * ``:constant`` - Maintains a consistent wait time specified by ``wait_time``. + * ``:geometric`` - Increases the wait time geometrically based on ``wait_time``. + +* ``wait_time``: Specifies the base wait time (in seconds) for the chosen + ``wait_strategy``. diff --git a/documentation/source/user_guidelines/elasticsearch/indexable/indexes.rst b/documentation/source/user_guidelines/elasticsearch/indexable/indexes.rst new file mode 100644 index 0000000..26f82b6 --- /dev/null +++ b/documentation/source/user_guidelines/elasticsearch/indexable/indexes.rst @@ -0,0 +1,67 @@ +Indexes +======= + +.. note:: + + This class includes the :doc:`../indexable` mixin. It exposes all its methods. + +This class represents a set of indexes in an elasticsearch cluster. It provides +a set of methods that allow the user to query the indexes or add new data to +all of them at the same time. + +The class works exactly as :doc:`index`. It only differs in the fact that it can +be initialized with multiple ``index_names`` and not only one, like ``Index``. + +Initializing +------------ + +Just like with ``Index`` you need an instance of ``Elasticsearch::Client``. You +can use the ``ClientFactory`` to get one: + +.. code-block:: ruby + + require 'jay_api/elasticsearch/client_factory' + + client = JayAPI::Elasticsearch::ClientFactory.new( + cluster_url: 'https://my-cluster.elastic.io' + ).create + +Then you can use the client to initialize the ``Indexes`` class: + +.. code-block:: ruby + + require 'jay_api/elasticsearch/indexes' + + indexes = JayAPI::Elasticsearch::Indexes.new( + client: client, index_names: %w[my_index my_other_index not_my_index] + ) + +The following arguments are available for the ``#initialize`` method: + +* ``client``: An instance of ``Elasticsearch::Client``. You can get one using + the ``Elasticsearch::ClientFactory`` class. +* ``index_names``: An ``Array`` of ``String``. The names of the indexes you + want to work with. +* ``batch_size``: The number of documents the ``Indexes`` class will store in + its buffer before triggering a ``#flush`` call when the ``#push`` method is + used to add data. The default is: 100. +* ``logger``: A ``Logger`` object used to log messages. If none is given the + ``Indexes`` object will create one of its own. + +.. warning:: + + When the ``batch_size`` isn't a multiple of the number of elements in the + ``index_names`` array there is a chance that the size of the batches sent to + the Elasticsearch could be bigger than ``batch_size``. This can be avoided + simply by choosing an integer multiple of the array's size. + +#index_names +------------ + +This method returns the array of index names used to initialize the ``Indexes`` +object. + +.. warning:: + + Unlike ``Index``, ``Indexes`` objects **DO NOT** respond to the + ``#index_name`` message. diff --git a/documentation/source/user_guidelines/elasticsearch/query_builder.rst b/documentation/source/user_guidelines/elasticsearch/query_builder.rst index def157f..2bdd0b8 100644 --- a/documentation/source/user_guidelines/elasticsearch/query_builder.rst +++ b/documentation/source/user_guidelines/elasticsearch/query_builder.rst @@ -76,7 +76,7 @@ be returned if there aren't enough documents matching the query). By using ``from`` and ``size`` you can only scroll through a maximum of 10,000 documents. If you have more than that in your index, you'll have to use -:ref:`Index#search` method with ``type: :search_after``. +:ref:`Indexable#search` method with ``type: :search_after``. #sort ----- @@ -171,7 +171,7 @@ And the use of Hashes to include or exclude parts of the document, for example: Once you have added all the clauses you want on your queries you can call ``to_h`` or ``to_query`` to get the corresponding Hash. The class converts the -query to a Hash representation that can then be passed to :ref:`Index#search` to +query to a Hash representation that can then be passed to :ref:`Indexable#search` to perform the actual search. .. note:: diff --git a/lib/jay_api/elasticsearch.rb b/lib/jay_api/elasticsearch.rb index 6d1dc8c..f4d233e 100644 --- a/lib/jay_api/elasticsearch.rb +++ b/lib/jay_api/elasticsearch.rb @@ -6,6 +6,7 @@ require_relative 'elasticsearch/client_factory' require_relative 'elasticsearch/errors' require_relative 'elasticsearch/index' +require_relative 'elasticsearch/indexes' require_relative 'elasticsearch/query_builder' require_relative 'elasticsearch/query_results' require_relative 'elasticsearch/response' diff --git a/lib/jay_api/elasticsearch/async.rb b/lib/jay_api/elasticsearch/async.rb index 0c2de8f..07ae3a9 100644 --- a/lib/jay_api/elasticsearch/async.rb +++ b/lib/jay_api/elasticsearch/async.rb @@ -19,8 +19,8 @@ class Async def_delegators :index, :index_name - # @param [JayAPI::Elasticsearch::Index] index The elasticsearch index on - # which to execute asynchronous operations + # @param [JayAPI::Elasticsearch::Indexable] index The elasticsearch + # index or indexes on which to execute asynchronous operations def initialize(index) @index = index end diff --git a/lib/jay_api/elasticsearch/index.rb b/lib/jay_api/elasticsearch/index.rb index 96b7a38..4ca1336 100644 --- a/lib/jay_api/elasticsearch/index.rb +++ b/lib/jay_api/elasticsearch/index.rb @@ -1,57 +1,28 @@ # frozen_string_literal: true -require 'active_support' -require 'active_support/json' # Needed because ActiveSupport 6 doesn't include it's own JSON module. 🤦 -require 'active_support/core_ext/hash/indifferent_access' -require 'active_support/core_ext/hash/keys' -require 'elasticsearch' -require 'logging' - -require_relative 'errors/elasticsearch_error' -require_relative 'async' -require_relative 'query_results' -require_relative 'response' -require_relative 'batch_counter' -require_relative 'search_after_results' +require_relative 'indexable' module JayAPI module Elasticsearch # Represents an Elasticsearch index. Allows data to be pushed to it one # record at a time or in batches of the specified size. class Index - attr_reader :client, :index_name, :batch_size - - # Default type for documents indexed with the #index method. - DEFAULT_DOC_TYPE = 'nested' + include ::JayAPI::Elasticsearch::Indexable - # Supported document types (for the #index method) - SUPPORTED_TYPES = [DEFAULT_DOC_TYPE, nil].freeze - - # :reek:ControlParameter (want to avoid the creating of the logger on method definition) - # Creates a new instance of the class. # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. # @param [String] index_name The name of the Elasticsearch index. - # @param [Integer] batch_size The size of the batch. When this number of - # items are pushed into the index they are flushed to the - # Elasticsearch instance. + # @param [Integer] batch_size The size of the batch. When this many items + # are pushed into the index they are flushed to the Elasticsearch + # instance. # @param [Logging::Logger, nil] logger The logger object to use, if # none is given a new one will be created. def initialize(client:, index_name:, batch_size: 100, logger: nil) - @logger = logger || Logging.logger[self] - - @client = client - @index_name = index_name - @batch_size = batch_size - - @batch = [] + super(client: client, index_names: [index_name], batch_size: batch_size, logger: logger) end - # Pushes a record into the index. (This does not send the record to the - # Elasticsearch instance, only puts it into the send queue). - # @param [Hash] data The data to be pushed to the index. - def push(data) - @batch << { index: { _index: index_name, _type: 'nested', data: data } } - flush! if @batch.size >= batch_size + # @return [String] The name of the Elasticsearch index. + def index_name + @index_name ||= index_names.first end # Sends a record to the Elasticsearch instance right away. @@ -59,8 +30,8 @@ def push(data) # @param [String, nil] type The type of the document. When set to +nil+ # the decision is left to Elasticsearch's API. Which will normally # default to +_doc+. - # @return [Hash] A hash with information about the created document. An - # example of such Hash is: + # @return [Hash] A Hash containing information about the created document. + # An example of such Hash is: # # { # "_index" => "xyz01_unit_test", @@ -76,147 +47,7 @@ def push(data) # For information on the contents of this Hash please see: # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-response-body def index(data, type: DEFAULT_DOC_TYPE) - raise ArgumentError, "Unsupported type: '#{type}'" unless SUPPORTED_TYPES.include?(type) - - client.index index: index_name, type: type, body: data - end - - # Performs a query on the index. - # For more information on how to build the query please refer to the - # Elasticsearch DSL query: - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html - # @param [Hash] query The query to perform. - # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. - # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. - # @return [JayAPI::Elasticsearch::QueryResults] The query results. - # @raise [Elasticsearch::Transport::Transport::ServerError] If the - # query fails. - def search(query, batch_counter: nil, type: nil) - begin - response = Response.new(client.search(index: index_name, body: query)) - rescue ::Elasticsearch::Transport::Transport::Errors::BadRequest - logger.error "The 'search' query is invalid: #{JSON.pretty_generate(query)}" - raise - end - query_results(query, response, batch_counter, type) - end - - # Sends whatever is currently in the send queue to the Elasticsearch - # instance and clears the queue. - def flush - return unless @batch.any? - - flush! - end - - # Returns the number of elements currently on the send queue. - # @return [Integer] The number of items in the send queue. - def queue_size - @batch.size - end - - # Delete the documents matching the given query from the Index. - # For more information on how to build the query please refer to the - # Elasticsearch DSL documentation: - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html - # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html - # @param [Hash] query The delete query - # @param [Integer] slices Number of slices to cut the operation into for - # faster processing (i.e., run the operation in parallel) - # @param [Boolean] wait_for_completion False if Elasticsearch should not - # wait for completion and perform the request asynchronously, true if it - # should wait for completion (i.e., run the operation asynchronously) - # @return [Hash] A Hash that details the results of the operation - # @example Returned Hash (with `wait_for_completion: true`): - # { - # took: 103, - # timed_out: false, - # total: 76, - # deleted: 76, - # batches: 1, - # version_conflicts: 0, - # noops: 0, - # retries: { bulk: 0, search: 0 }, - # throttled_millis: 0, - # requests_per_second: 1.0, - # throttled_until_millis: 0, - # failures: [] - # } - # @example Returned Hash (with `wait_for_completion: false`): - # { - # task: "B5oDyEsHQu2Q-wpbaMSMTg:577388264" - # } - # @raise [Elasticsearch::Transport::Transport::ServerError] If the - # query fails. - def delete_by_query(query, slices: nil, wait_for_completion: true) - request_params = { index: index_name, body: query }.tap do |params| - params.merge!(slices: slices) if slices - params.merge!(wait_for_completion: false) unless wait_for_completion - end - - client.delete_by_query(**request_params).deep_symbolize_keys - end - - # Deletes asynchronously the documents matching the given query from the - # Index. - # For more information on how to build the query please refer to the - # Elasticsearch DSL documentation: - # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html - # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html - # @param [Hash] query The delete query - # @param [Integer, String] slices Number of slices to cut the operation - # into for faster processing (i.e., run the operation in parallel). Use - # "auto" to make elasticsearch decide how many slices to divide into - # @return [Concurrent::Promise] The eventual value returned from the single - # completion of the delete operation - def delete_by_query_async(query, slices: nil) - async.delete_by_query(query, slices: slices) - end - - private - - attr_reader :logger, :batch - - # Flushes the current send queue to the Elasticsearch instance and - # clears the queue. - def flush! - logger.info "Pushing data to Elasticsearch (#{batch.size} records)..." - response = client.bulk body: batch - - handle_errors(response) if response['errors'] - - logger.info 'Done' - @batch = [] - end - - # @param [Hash] query The elastic search query. - # @param [JayAPI::Elasticsearch::Response] response The response to the query. - # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. - # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. - # @return [QueryResults] - def query_results(query, response, batch_counter, type) - (type == :search_after ? SearchAfterResults : QueryResults).new( - index: self, - query: query, - response: response, - batch_counter: BatchCounter.create_or_update(batch_counter, query, response.size) - ) - end - - # Scans the Elasticsearch response in search for the first item that has - # en erroneous state and raises an error including the error details. - # @param [Hash] response The response returned by the Elasticsearch client. - # @raise [Errors::ElasticsearchError] Is always raised. - def handle_errors(response) - error_item = response['items'].find { |item| item['index']['error'] } - - raise Errors::ElasticsearchError, - "An error occurred when pushing the data to Elasticsearch:\n#{error_item['index']['error'].inspect}" - end - - # @return [JayAPI::Elasticsearch::Async] - def async - @async ||= JayAPI::Elasticsearch::Async.new(self) + super.first end end end diff --git a/lib/jay_api/elasticsearch/indexable.rb b/lib/jay_api/elasticsearch/indexable.rb new file mode 100644 index 0000000..02f4c57 --- /dev/null +++ b/lib/jay_api/elasticsearch/indexable.rb @@ -0,0 +1,225 @@ +# frozen_string_literal: true + +require 'active_support' +require 'active_support/json' # Needed because ActiveSupport 6 doesn't include it's own JSON module. 🤦 +require 'active_support/core_ext/hash/keys' +require 'elasticsearch' +require 'logging' + +require_relative 'async' +require_relative 'batch_counter' +require_relative 'errors/elasticsearch_error' +require_relative 'query_results' +require_relative 'response' +require_relative 'search_after_results' + +module JayAPI + module Elasticsearch + # This module houses the Elasticsearch methods that can be used with a + # single or with multiple indexes. Its main purpose is to avoid code + # repetition between classes. + module Indexable + # Default type for documents indexed with the #index method. + DEFAULT_DOC_TYPE = 'nested' + + # Supported document types (for the #index method) + SUPPORTED_TYPES = [DEFAULT_DOC_TYPE, nil].freeze + + attr_reader :client, :batch_size + + # :reek:ControlParameter (want to avoid the creating of the logger on method definition) + # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. + # @param [Array] index_names The names of the Elasticsearch indexes. + # @param [Integer] batch_size The size of the batch. When this many items + # are pushed into the index they are flushed to the Elasticsearch + # instance. + # @param [Logging::Logger, nil] logger The logger object to use, if + # none is given a new one will be created. + def initialize(client:, index_names:, batch_size: 100, logger: nil) + @logger = logger || Logging.logger[self] + + @client = client + @index_names = index_names + @batch_size = batch_size + + @batch = [] + end + + # Pushes a record into the index. (This does not send the record to the + # Elasticsearch instance, only puts it into the send queue). + # @param [Hash] data The data to be pushed to the index. + def push(data) + index_names.each do |index_name| + batch << { index: { _index: index_name, _type: 'nested', data: data } } + end + + flush! if batch.size >= batch_size + end + + # Sends a record to the Elasticsearch instance right away. + # @param [Hash] data The data to be sent. + # @param [String, nil] type The type of the document. When set to +nil+ + # the decision is left to Elasticsearch's API. Which will normally + # default to +_doc+. + # @return [Array] An array with hashes containing information about + # the created documents. An example of such Hashes is: + # + # { + # "_index" => "xyz01_unit_test", + # "_type" => "nested", + # "_id" => "SVY1mJEBQ5CNFZM8Lodt", + # "_version" => 1, + # "result" => "created", + # "_shards" => { "total" => 2, "successful" => 1, "failed" => 0 }, + # "_seq_no" => 0, + # "_primary_term" => 1 + # } + # + # For information on the contents of this Hash please see: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#docs-index-api-response-body + def index(data, type: DEFAULT_DOC_TYPE) + raise ArgumentError, "Unsupported type: '#{type}'" unless SUPPORTED_TYPES.include?(type) + + index_names.map { |index_name| client.index index: index_name, type: type, body: data } + end + + # Performs a query on the index. + # For more information on how to build the query please refer to the + # Elasticsearch DSL query: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # @param [Hash] query The query to perform. + # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. + # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. + # @return [JayAPI::Elasticsearch::QueryResults] The query results. + # @raise [Elasticsearch::Transport::Transport::ServerError] If the + # query fails. + def search(query, batch_counter: nil, type: nil) + begin + response = Response.new(client.search(index: index_names, body: query)) + rescue ::Elasticsearch::Transport::Transport::Errors::BadRequest + logger.error "The 'search' query is invalid: #{JSON.pretty_generate(query)}" + raise + end + query_results(query, response, batch_counter, type) + end + + # Sends whatever is currently in the send queue to the Elasticsearch + # instance and clears the queue. + def flush + return unless @batch.any? + + flush! + end + + # Returns the number of elements currently on the send queue. + # @return [Integer] The number of items in the send queue. + def queue_size + batch.size + end + + # Delete the documents matching the given query from the Index. + # For more information on how to build the query please refer to the + # Elasticsearch DSL documentation: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html + # @param [Hash] query The delete query + # @param [Integer] slices Number of slices to cut the operation into for + # faster processing (i.e., run the operation in parallel) + # @param [Boolean] wait_for_completion False if Elasticsearch should not + # wait for completion and perform the request asynchronously, true if it + # should wait for completion (i.e., run the operation synchronously) + # @return [Hash] A Hash that details the results of the operation + # @example Returned Hash (with `wait_for_completion: true`): + # { + # took: 103, + # timed_out: false, + # total: 76, + # deleted: 76, + # batches: 1, + # version_conflicts: 0, + # noops: 0, + # retries: { bulk: 0, search: 0 }, + # throttled_millis: 0, + # requests_per_second: 1.0, + # throttled_until_millis: 0, + # failures: [] + # } + # @example Returned Hash (with `wait_for_completion: false`): + # { + # task: "B5oDyEsHQu2Q-wpbaMSMTg:577388264" + # } + # @raise [Elasticsearch::Transport::Transport::ServerError] If the + # query fails. + def delete_by_query(query, slices: nil, wait_for_completion: true) + request_params = { index: index_names, body: query }.tap do |params| + params.merge!(slices: slices) if slices + params.merge!(wait_for_completion: false) unless wait_for_completion + end + + client.delete_by_query(**request_params).deep_symbolize_keys + end + + # Deletes asynchronously the documents matching the given query from the + # Index. + # For more information on how to build the query please refer to the + # Elasticsearch DSL documentation: + # https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html + # https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html + # @param [Hash] query The delete query + # @param [Integer, String] slices Number of slices to cut the operation + # into for faster processing (i.e., run the operation in parallel). Use + # "auto" to make Elasticsearch decide how many slices to divide into + # @return [Concurrent::Promise] The eventual value returned from the single + # completion of the delete operation + def delete_by_query_async(query, slices: nil) + async.delete_by_query(query, slices: slices) + end + + private + + attr_reader :index_names, :logger, :batch + + # Scans the Elasticsearch response in search for the first item that has + # an erroneous state and raises an error including the error details. + # @param [Hash] response The response returned by the Elasticsearch client. + # @raise [Errors::ElasticsearchError] Is always raised. + def handle_errors(response) + error_item = response['items'].find { |item| item['index']['error'] } + + raise Errors::ElasticsearchError, + "An error occurred when pushing the data to Elasticsearch:\n#{error_item['index']['error'].inspect}" + end + + # Flushes the current send queue to the Elasticsearch instance and + # clears the queue. + def flush! + logger.info "Pushing data to Elasticsearch (#{batch.size} records)..." + response = client.bulk body: batch + + handle_errors(response) if response['errors'] + + logger.info 'Done' + @batch = [] + end + + # @param [Hash] query The elastic search query. + # @param [JayAPI::Elasticsearch::Response] response The response to the query. + # @param [JayAPI::Elasticsearch::BatchCounter, nil] batch_counter Object keeping track of batches. + # @param [Symbol, nil] type Type of query, at the moment either nil or :search_after. + # @return [QueryResults] + def query_results(query, response, batch_counter, type) + (type == :search_after ? SearchAfterResults : QueryResults).new( + index: self, + query: query, + response: response, + batch_counter: BatchCounter.create_or_update(batch_counter, query, response.size) + ) + end + + # @return [JayAPI::Elasticsearch::Async] + def async + @async ||= JayAPI::Elasticsearch::Async.new(self) + end + end + end +end diff --git a/lib/jay_api/elasticsearch/indexes.rb b/lib/jay_api/elasticsearch/indexes.rb new file mode 100644 index 0000000..6981f70 --- /dev/null +++ b/lib/jay_api/elasticsearch/indexes.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require_relative 'indexable' + +module JayAPI + module Elasticsearch + # Represents a group of Elasticsearch indexes. Allows the execution of + # searches over all of the specified indexes or push data to all of them + # at the same time. + class Indexes + include ::JayAPI::Elasticsearch::Indexable + + # @param [JayAPI::Elasticsearch::Client] client The Elasticsearch Client object. + # @param [Array] index_names The names of the Elasticsearch indexes. + # @param [Integer] batch_size The size of the batch. When this many items + # are pushed into the indexes they are flushed to the Elasticsearch + # instance. + # @param [Logging::Logger, nil] logger The logger object to use, if + # none is given a new one will be created. + def initialize(client:, index_names:, batch_size: 100, logger: nil) + super + + return if (batch_size % index_names.size).zero? + + self.logger.warn( + "'batch_size' is not a multiple of the number of elements in 'index_names'. " \ + "This can lead to a _bulk size slightly bigger than 'batch_size'" + ) + end + + attr_reader :index_names + end + end +end diff --git a/lib/jay_api/elasticsearch/query_results.rb b/lib/jay_api/elasticsearch/query_results.rb index 29009d9..2f4c81c 100644 --- a/lib/jay_api/elasticsearch/query_results.rb +++ b/lib/jay_api/elasticsearch/query_results.rb @@ -21,8 +21,8 @@ class QueryResults def_delegators :response, :hits, :total, :size, :count, :first, :last, :any?, :empty?, :aggregations # Creates a new instance of the class. - # @param [JayAPI::Elasticsearch::Index] index The Elasticsearch - # index used to perform the query. + # @param [JayAPI::Elasticsearch::Indexable] index The Elasticsearch + # index or indexes over which the query should be performed. # @param [Hash] query The query that produced the results. # @param [JayAPI::Elasticsearch::Results] response An object containing Docs retrieved from Elasticsearch. # @param [JayAPI::Elasticsearch::BatchCounter] batch_counter An object keeping track of the current batch. diff --git a/spec/jay_api/elasticsearch/index_spec.rb b/spec/jay_api/elasticsearch/index_spec.rb index e8b972c..dc92cf3 100644 --- a/spec/jay_api/elasticsearch/index_spec.rb +++ b/spec/jay_api/elasticsearch/index_spec.rb @@ -2,18 +2,18 @@ require 'jay_api/elasticsearch/index' +require_relative 'indexable_shared' + RSpec.describe JayAPI::Elasticsearch::Index do - subject(:index) { described_class.new(**params) } + subject(:index) { described_class.new(**constructor_params) } - let(:base_params) do + let(:constructor_params) do { index_name: index_name, - client: mocked_elasticsearch + client: client } end - let(:params) { base_params } - let(:index_name) { 'elite_unit_tests' } let(:successful_response) do @@ -36,55 +36,25 @@ } end - let(:response) { {} } - let(:mocked_elasticsearch) do - instance_double( - JayAPI::Elasticsearch::Client, - bulk: successful_response, - search: response - ) - end - - # rubocop:disable RSpec/VerifiedDoubles (Does not work with Logging because of meta-programming) - let(:mocked_logger) do - double( - Logging::Logger, - info: true, - error: true - ) - end - # rubocop:enable RSpec/VerifiedDoubles - let(:data) { { type: 'example', name: 'Test data example' } } - before do - allow(Logging.logger).to receive(:[]).and_return(mocked_logger) - end + include_context 'with mocked objects for Elasticsearch::Indexable' describe '#initialize' do subject(:method_call) { index } - context 'when no logger has been given' do - it 'creates a Logger for the class' do - expect(Logging.logger).to receive(:[]).with(described_class) - method_call - end - end - - context 'when a logger has been given' do - let(:params) do - base_params.update(logger: mocked_logger) - end - - it 'does not create a new logger' do - expect(Logging.logger).not_to receive(:[]) - method_call - end - end + it_behaves_like 'Indexable#initialize' end describe '#push' do - let(:batch_size) { 2 } + subject(:method_call) { described_method.call } + + # Needed in order to be able to repeat the method call + let(:described_method) { -> { index.push(data) } } + + let(:constructor_params) do + super().merge(batch_size: 10) + end let(:expected_data) do { @@ -96,39 +66,37 @@ data: data } } - ] * 2 + ] * 10 } end - let(:params) do - base_params.update(batch_size: batch_size) - end - - it 'puts the data with the correct structure in the queue' do - expect(mocked_elasticsearch).to receive(:bulk).with(expected_data) - 2.times { index.push(data) } - end - context 'when the amount of data is smaller than the batch size' do - let(:batch_size) { 100 } - it 'enqueues the data but does not push it to Elasticsearch' do - expect(mocked_elasticsearch).not_to receive(:bulk) - 10.times { index.push(data) } + expect(client).not_to receive(:bulk) + 5.times { described_method.call } end end - context 'when the amount of data goes over the batch size' do - let(:batch_size) { 5 } + context 'when the amount of data matches the batch size' do + it 'puts the data with the correct structure in the queue' do + expect(client).to receive(:bulk).with(expected_data) + 10.times { described_method.call } + end + end + context 'when the amount of data goes over the batch size' do it 'pushes the data to Elasticsearch every time the batch size is hit' do - expect(mocked_elasticsearch).to receive(:bulk).exactly(3).times - 15.times { index.push(data) } + expect(client).to receive(:bulk).with(expected_data).exactly(3).times + 30.times { described_method.call } end end end describe '#index' do + subject(:method_call) { index.index(data, **method_params) } + + let(:method_params) { {} } + let(:successful_response) do { '_index' => 'xyz01_unit_test', @@ -143,7 +111,7 @@ end before do - allow(mocked_elasticsearch).to receive(:index).and_return(successful_response) + allow(client).to receive(:index).and_return(successful_response) end shared_examples_for '#index when no type is specified' do @@ -156,7 +124,7 @@ end it 'sends the given data to Elasticsearch right away (with the "nested" type)' do - expect(mocked_elasticsearch).to receive(:index).with(expected_data) + expect(client).to receive(:index).with(expected_data) method_call end @@ -166,19 +134,17 @@ end context 'when no type is specified' do - subject(:method_call) { index.index(data) } - it_behaves_like '#index when no type is specified' end context 'when type is specified as "nested"' do - subject(:method_call) { index.index(data, type: 'nested') } + let(:method_params) { { type: 'nested' } } it_behaves_like '#index when no type is specified' end context 'when type is set to nil' do - subject(:method_call) { index.index(data, type: nil) } + let(:method_params) { { type: nil } } let(:expected_data) do { @@ -189,7 +155,7 @@ end it "sends the given data to Elasticsearch right away (with 'type' set to nil)" do - expect(mocked_elasticsearch).to receive(:index).with(expected_data) + expect(client).to receive(:index).with(expected_data) method_call end @@ -198,25 +164,29 @@ end end - context 'when type is set to an invalid value' do - subject(:method_call) { index.index(data, type: 'flatten') } - - it 'raises an ArgumentError' do - expect { method_call }.to raise_error(ArgumentError, "Unsupported type: 'flatten'") - end - end + it_behaves_like 'Indexable#index' end describe '#queue_size' do subject(:method_call) { index.queue_size } - let(:params) do - base_params.update(batch_size: 15) + let(:constructor_params) do + super().merge(batch_size: 15) + end + + let(:indexable) { index } + + it_behaves_like 'Indexable#queue_size' + + context 'when a single item is pushed to the index' do + it 'increases accordingly' do + expect { indexable.push(data) }.to change(indexable, :queue_size).by(1) + end end - context 'with multiple items in the queue' do + context "when less than 'batch_size' items are pushed to the queue" do before do - 10.times { index.push(data) } + 10.times { indexable.push(data) } end it 'returns the correct number of items' do @@ -224,15 +194,19 @@ end end - context 'with no items in the queue' do - it 'returns 0' do - expect(method_call).to eq(0) + context "when 'batch_size' items are pushed to the index" do + it 'goes back down to zero' do + expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. + 15.times { indexable.push(data) } + expect(method_call).to be(0) end end - context 'when an item is pushed to the index' do - it 'increases accordingly' do - expect { index.push(data) }.to change(index, :queue_size).by(1) + context "when more than 'batch_size' items are pushed to the index" do + it 'returns the expected number of items' do + expect(client).to receive(:bulk).once # Checks that the items are pushed when the queue gets full. + 20.times { indexable.push(data) } + expect(method_call).to be(5) end end end @@ -240,88 +214,49 @@ describe '#flush' do subject(:method_call) { index.flush } - let(:params) do - base_params.update(batch_size: 5) - end - - context 'when there is no data' do - it 'does not try to push any data' do - expect(mocked_elasticsearch).not_to receive(:bulk) - method_call - end - end - - context 'when there is data in the buffer' do - before { 3.times { index.push(data) } } - - it 'pushes the data currently held in the queue to Elasticsearch' do - expect(mocked_elasticsearch).to receive(:bulk).once - method_call - end - - it 'clears the queue' do - expect { method_call }.to change(index, :queue_size).from(3).to(0) - end + let(:indexable) { index } - context 'when there is an error on the Elasticsearch instance' do - let(:error_response) do + let(:error_response) do + { + 'took' => 9, + 'errors' => true, + 'items' => [ { - 'took' => 9, - 'errors' => true, - 'items' => [ - { - 'index' => { - '_index' => index_name, - '_type' => 'nested', - '_id' => 'EExOS4cBrb-mrkbmwAGg', - 'result' => 'created', - '_shards' => { - 'total' => 2, - 'successful' => 1, - 'failed' => 0 - }, - 'status' => 201 - } + 'index' => { + '_index' => index_name, + '_type' => 'nested', + '_id' => 'EExOS4cBrb-mrkbmwAGg', + 'result' => 'created', + '_shards' => { + 'total' => 2, + 'successful' => 1, + 'failed' => 0 }, - { - 'index' => { - '_index' => index_name, - '_type' => 'nested', - '_id' => 'fnKi628BVvaJMVLXJxty', - 'status' => 400, - 'error' => { - 'type' => 'illegal_argument_exception', - 'reason' => 'mapper [test_env.report_meta.SUT tags.cluster] ' \ - 'of different type, current_type [long], merged_type [text]' - } - } + 'status' => 201 + } + }, + { + 'index' => { + '_index' => index_name, + '_type' => 'nested', + '_id' => 'fnKi628BVvaJMVLXJxty', + 'status' => 400, + 'error' => { + 'type' => 'illegal_argument_exception', + 'reason' => 'mapper [test_env.report_meta.SUT tags.cluster] ' \ + 'of different type, current_type [long], merged_type [text]' } - ] + } } - end - - let(:expected_message) do - "An error occurred when pushing the data to Elasticsearch:\n" \ - '{"type"=>"illegal_argument_exception", "reason"=>' \ - '"mapper [test_env.report_meta.SUT tags.cluster] of ' \ - 'different type, current_type [long], merged_type [text]"}' - end - - before do - allow(mocked_elasticsearch).to receive(:bulk).and_return(error_response) - end - - it 'raises an error' do - expect { method_call }.to raise_error( - JayAPI::Elasticsearch::Errors::ElasticsearchError, expected_message - ) - end - end + ] + } end + + it_behaves_like 'Indexable#flush' end describe '#search' do - subject(:method_call) { index.search(query, batch_counter: batch_counter) } + subject(:method_call) { index.search(query, **method_params) } let(:query) do { @@ -341,119 +276,24 @@ } end + let(:method_params) { { batch_counter: batch_counter } } + + let(:indexable) { index } + let(:expected_query) do { - index: index_name, + index: [index_name], body: query } end - let(:response) do - { - 'took' => 1, - 'timed_out' => false, - '_shards' => { - 'total' => 5, - 'successful' => 5, - 'skipped' => 0, - 'failed' => 0 - }, - 'hits' => { - 'total' => { - 'value' => 33, - 'relation' => 'eq' - }, - 'max_score' => nil, - 'hits' => [] - } - } - end - - let(:response_object) { instance_double(JayAPI::Elasticsearch::Response, size: 33) } - let(:batch_counter) { instance_double(JayAPI::Elasticsearch::BatchCounter) } - - before do - allow(JayAPI::Elasticsearch::QueryResults).to receive(:new) - allow(JayAPI::Elasticsearch::Response).to receive(:new).with(response).and_return(response_object) - allow(JayAPI::Elasticsearch::BatchCounter).to receive(:create_or_update).with(batch_counter, query, 33) - .and_return(batch_counter) - end - - it 'sends the expected query to Elasticsearch' do - expect(mocked_elasticsearch).to receive(:search).with(expected_query) - method_call - end - - context 'when Elasticsearch responds with an error' do - let(:expected_error) { Elasticsearch::Transport::Transport::Errors::BadRequest } - - let(:expected_log) do - <<~TEXT - The 'search' query is invalid: { - "query": { - "query_string": { - "fields": [ - "test_case.identifier" - ], - "query": "\\"Elite::Tools::Jay::ElasticsearchIndex/#push/Puts the data with the correct structure in the queue\\"" - } - }, - "sort": [ - { - "test_case.finished_at.keyword": { - "order": "desc" - } - } - ] - } - TEXT - end - - before do - allow(mocked_elasticsearch).to receive(:search) - .and_raise(expected_error) - end - - it 'raises the error to the caller' do - expect { method_call }.to raise_error(expected_error) - end - - it 'logs the query which caused the error message' do - expect(mocked_logger).to receive(:error).with(expected_log.strip) - - expect { method_call }.to raise_error(expected_error) - end - end - - context 'when Elasticsearch responds with a valid response' do - context "without 'search_after' type parameter in the options" do - it 'creates a new instance of the QueryResults class and passes the response' do - expect(JayAPI::Elasticsearch::QueryResults) - .to receive(:new).with(index: index, query: query, response: response_object, batch_counter: batch_counter) - - method_call - end - end - - context "with 'search_after' type parameter in the options" do - subject(:method_call) { index.search(query, batch_counter: batch_counter, type: :search_after) } - - before do - allow(JayAPI::Elasticsearch::SearchAfterResults).to receive(:new) - end - - it 'creates a new instance of the SearchAfterResults class and passes the response' do - expect(JayAPI::Elasticsearch::SearchAfterResults) - .to receive(:new).with(index: index, query: query, response: response_object, batch_counter: batch_counter) - - method_call - end - end - end + it_behaves_like 'Indexable#search' end describe '#delete_by_query' do - subject(:method_call) { index.delete_by_query(query) } + subject(:method_call) { index.delete_by_query(query, **method_params) } + + let(:method_params) { {} } let(:query) do { @@ -472,132 +312,9 @@ } end - let(:successful_response) do - { - 'took' => 103, - 'timed_out' => false, - 'total' => 76, - 'deleted' => 76, - 'batches' => 1, - 'version_conflicts' => 0, - 'noops' => 0, - 'retries' => { 'bulk' => 0, 'search' => 0 }, - 'throttled_millis' => 0, - 'requests_per_second' => 1.0, - 'throttled_until_millis' => 0, - 'failures' => [] - } - end - - before do - allow(mocked_elasticsearch).to receive(:delete_by_query).and_return(successful_response) - end - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query - ) - - method_call - end - - context 'when a custom number of slices is to be used' do - subject(:method_call) { index.delete_by_query(query, slices: 5) } - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query, slices: 5 - ) - - method_call - end - end - - context 'when the client should not wait for completion' do - subject(:method_call) { index.delete_by_query(query, wait_for_completion: false) } - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query, wait_for_completion: false - ) - - method_call - end - end + let(:index_names) { [index_name] } - context 'when the client should not wait for completion and should use a custom number of slices' do - subject(:method_call) { index.delete_by_query(query, slices: 5, wait_for_completion: false) } - - it 'relays the command to the Elasticsearch client' do - expect(mocked_elasticsearch).to receive(:delete_by_query).with( - index: index_name, body: query, slices: 5, wait_for_completion: false - ) - - method_call - end - end - - context 'when the deletion succeeds' do - context 'when the deletion has been executed synchronously (i.e., `wait_for_completion` is `true`)' do - let(:expected_hash) do - { - took: 103, - timed_out: false, - total: 76, - deleted: 76, - batches: 1, - version_conflicts: 0, - noops: 0, - retries: { bulk: 0, search: 0 }, - throttled_millis: 0, - requests_per_second: 1.0, - throttled_until_millis: 0, - failures: [] - } - end - - it 'returns the expected Hash' do - expect(method_call).to eq(expected_hash) - end - end - - context 'when the deletion has been executed asynchronously (i.e., `wait_for_completion` is `false`)' do - subject(:method_call) { index.delete_by_query(query, wait_for_completion: false) } - - let(:successful_response) do - { - 'task' => 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' - } - end - - let(:expected_hash) do - { - task: 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' - } - end - - it 'returns the expected Hash' do - expect(method_call).to eq(expected_hash) - end - end - end - - context 'when the deletion fails' do - let(:error) do - [ - Elasticsearch::Transport::Transport::Errors::Unauthorized, - '[401] Unauthorized' - ] - end - - before do - allow(mocked_elasticsearch).to receive(:delete_by_query).and_raise(*error) - end - - it 're-raises the error' do - expect { method_call }.to raise_error(*error) - end - end + it_behaves_like 'Indexable#delete_by_query' end describe '#delete_by_query_async' do @@ -613,33 +330,8 @@ } end - let(:async) do - instance_double( - JayAPI::Elasticsearch::Async, - delete_by_query: response - ) - end - - let(:response) do - {} - end - - before do - allow(JayAPI::Elasticsearch::Async).to receive(:new).and_return(async) - end - - it 'creates the expected Async object' do - expect(JayAPI::Elasticsearch::Async).to receive(:new).with(index) - method_call - end + let(:indexable) { index } - it 'makes the Async object delete by query' do - expect(async).to receive(:delete_by_query).with(query, slices: 5) - method_call - end - - it 'returns what the Async object returns when deleting by query' do - expect(method_call).to eq(response) - end + it_behaves_like 'Indexable#delete_by_query_async' end end diff --git a/spec/jay_api/elasticsearch/indexable_shared.rb b/spec/jay_api/elasticsearch/indexable_shared.rb new file mode 100644 index 0000000..12772e5 --- /dev/null +++ b/spec/jay_api/elasticsearch/indexable_shared.rb @@ -0,0 +1,376 @@ +# frozen_string_literal: true + +RSpec.shared_context 'with mocked objects for Elasticsearch::Indexable' do + let(:response) { {} } + + let(:client) do + instance_double( + JayAPI::Elasticsearch::Client, + bulk: successful_response, + search: response + ) + end + + # rubocop:disable RSpec/VerifiedDoubles (Does not work with Logging because of meta-programming) + let(:mocked_logger) do + double( + Logging::Logger, + info: true, + error: true, + warn: true + ) + end + # rubocop:enable RSpec/VerifiedDoubles + + before do + allow(Logging.logger).to receive(:[]).and_return(mocked_logger) + end +end + +RSpec.shared_examples_for 'Indexable#initialize' do + context 'when no logger has been given' do + it 'creates a Logger for the class' do + expect(Logging.logger).to receive(:[]).with(described_class) + method_call + end + end + + context 'when a logger has been given' do + let(:constructor_params) do + super().merge(logger: mocked_logger) + end + + it 'does not create a new logger' do + expect(Logging.logger).not_to receive(:[]) + method_call + end + end +end + +RSpec.shared_examples_for 'Indexable#index' do + context 'when type is set to an invalid value' do + let(:method_params) { { type: 'flatten' } } + + it 'raises an ArgumentError' do + expect { method_call }.to raise_error(ArgumentError, "Unsupported type: 'flatten'") + end + end +end + +RSpec.shared_examples_for 'Indexable#queue_size' do + context 'with no items have been pushed to the queue' do + it 'returns 0' do + expect(method_call).to eq(0) + end + end +end + +RSpec.shared_examples_for 'Indexable#flush' do + let(:constructor_params) do + super().merge(batch_size: 5) + end + + context 'when there is no data' do + it 'does not try to push any data' do + expect(client).not_to receive(:bulk) + method_call + end + end + + context 'when there is data in the buffer' do + before { 3.times { indexable.push(data) } } + + it 'pushes the data currently held in the queue to Elasticsearch' do + expect(client).to receive(:bulk).once + method_call + end + + it 'clears the queue' do + expect { method_call }.to change(indexable, :queue_size).from(3).to(0) + end + + context 'when there is an error on the Elasticsearch instance' do + let(:expected_message) do + "An error occurred when pushing the data to Elasticsearch:\n" \ + '{"type"=>"illegal_argument_exception", "reason"=>' \ + '"mapper [test_env.report_meta.SUT tags.cluster] of ' \ + 'different type, current_type [long], merged_type [text]"}' + end + + before do + allow(client).to receive(:bulk).and_return(error_response) + end + + it 'raises an error' do + expect { method_call }.to raise_error( + JayAPI::Elasticsearch::Errors::ElasticsearchError, expected_message + ) + end + end + end +end + +RSpec.shared_examples_for 'Indexable#search' do + let(:response) do + { + 'took' => 1, + 'timed_out' => false, + '_shards' => { + 'total' => 5, + 'successful' => 5, + 'skipped' => 0, + 'failed' => 0 + }, + 'hits' => { + 'total' => { + 'value' => 33, + 'relation' => 'eq' + }, + 'max_score' => nil, + 'hits' => [] + } + } + end + + let(:response_object) { instance_double(JayAPI::Elasticsearch::Response, size: 33) } + let(:batch_counter) { instance_double(JayAPI::Elasticsearch::BatchCounter) } + + before do + allow(JayAPI::Elasticsearch::QueryResults).to receive(:new) + allow(JayAPI::Elasticsearch::Response).to receive(:new).with(response).and_return(response_object) + + allow(JayAPI::Elasticsearch::BatchCounter).to receive(:create_or_update) + .with(batch_counter, query, 33).and_return(batch_counter) + end + + it 'sends the expected query to Elasticsearch' do + expect(client).to receive(:search).with(expected_query) + method_call + end + + context 'when Elasticsearch responds with an error' do + let(:expected_error) { Elasticsearch::Transport::Transport::Errors::BadRequest } + + let(:expected_log) do + <<~TEXT + The 'search' query is invalid: { + "query": { + "query_string": { + "fields": [ + "test_case.identifier" + ], + "query": "\\"Elite::Tools::Jay::ElasticsearchIndex/#push/Puts the data with the correct structure in the queue\\"" + } + }, + "sort": [ + { + "test_case.finished_at.keyword": { + "order": "desc" + } + } + ] + } + TEXT + end + + before do + allow(client).to receive(:search).and_raise(expected_error) + end + + it 'raises the error to the caller' do + expect { method_call }.to raise_error(expected_error) + end + + it 'logs the query which caused the error message' do + expect(mocked_logger).to receive(:error).with(expected_log.strip) + + expect { method_call }.to raise_error(expected_error) + end + end + + context 'when Elasticsearch responds with a valid response' do + context "without 'search_after' type parameter in the options" do + it 'creates a new instance of the QueryResults class and passes the response' do + expect(JayAPI::Elasticsearch::QueryResults).to receive(:new) + .with(index: indexable, query: query, response: response_object, batch_counter: batch_counter) + + method_call + end + end + + context "with 'search_after' type parameter in the options" do + let(:method_params) { { batch_counter: batch_counter, type: :search_after } } + + before do + allow(JayAPI::Elasticsearch::SearchAfterResults).to receive(:new) + end + + it 'creates a new instance of the SearchAfterResults class and passes the response' do + expect(JayAPI::Elasticsearch::SearchAfterResults).to receive(:new) + .with(index: indexable, query: query, response: response_object, batch_counter: batch_counter) + + method_call + end + end + end +end + +RSpec.shared_examples_for 'Indexable#delete_by_query' do + let(:successful_response) do + { + 'took' => 103, + 'timed_out' => false, + 'total' => 76, + 'deleted' => 76, + 'batches' => 1, + 'version_conflicts' => 0, + 'noops' => 0, + 'retries' => { 'bulk' => 0, 'search' => 0 }, + 'throttled_millis' => 0, + 'requests_per_second' => 1.0, + 'throttled_until_millis' => 0, + 'failures' => [] + } + end + + before do + allow(client).to receive(:delete_by_query).and_return(successful_response) + end + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_names, body: query + ) + + method_call + end + + context 'when a custom number of slices is to be used' do + let(:method_params) { { slices: 5 } } + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_names, body: query, slices: 5 + ) + + method_call + end + end + + context 'when the client should not wait for completion' do + let(:method_params) { { wait_for_completion: false } } + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_names, body: query, wait_for_completion: false + ) + + method_call + end + end + + context 'when the client should not wait for completion and should use a custom number of slices' do + let(:method_params) { { slices: 5, wait_for_completion: false } } + + it 'relays the command to the Elasticsearch client' do + expect(client).to receive(:delete_by_query).with( + index: index_names, body: query, slices: 5, wait_for_completion: false + ) + + method_call + end + end + + context 'when the deletion succeeds' do + context 'when the deletion has been executed synchronously (i.e., `wait_for_completion` is `true`)' do + let(:expected_hash) do + { + took: 103, + timed_out: false, + total: 76, + deleted: 76, + batches: 1, + version_conflicts: 0, + noops: 0, + retries: { bulk: 0, search: 0 }, + throttled_millis: 0, + requests_per_second: 1.0, + throttled_until_millis: 0, + failures: [] + } + end + + it 'returns the expected Hash' do + expect(method_call).to eq(expected_hash) + end + end + + context 'when the deletion has been executed asynchronously (i.e., `wait_for_completion` is `false`)' do + let(:method_params) { { wait_for_completion: false } } + + let(:successful_response) do + { + 'task' => 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' + } + end + + let(:expected_hash) do + { + task: 'B5oDyEsHQu2Q-wpbaMSMTg:577388264' + } + end + + it 'returns the expected Hash' do + expect(method_call).to eq(expected_hash) + end + end + + context 'when the deletion fails' do + let(:error) do + [ + Elasticsearch::Transport::Transport::Errors::Unauthorized, + '[401] Unauthorized' + ] + end + + before do + allow(client).to receive(:delete_by_query).and_raise(*error) + end + + it 're-raises the error' do + expect { method_call }.to raise_error(*error) + end + end + end +end + +RSpec.shared_examples_for 'Indexable#delete_by_query_async' do + let(:async) do + instance_double( + JayAPI::Elasticsearch::Async, + delete_by_query: response + ) + end + + let(:response) do + {} + end + + before do + allow(JayAPI::Elasticsearch::Async).to receive(:new).and_return(async) + end + + it 'creates the expected Async object' do + expect(JayAPI::Elasticsearch::Async).to receive(:new).with(indexable) + method_call + end + + it 'makes the Async object delete by query' do + expect(async).to receive(:delete_by_query).with(query, slices: 5) + method_call + end + + it 'returns what the Async object returns when deleting by query' do + expect(method_call).to eq(response) + end +end diff --git a/spec/jay_api/elasticsearch/indexes_spec.rb b/spec/jay_api/elasticsearch/indexes_spec.rb new file mode 100644 index 0000000..6b25480 --- /dev/null +++ b/spec/jay_api/elasticsearch/indexes_spec.rb @@ -0,0 +1,433 @@ +# frozen_string_literal: true + +require 'jay_api/elasticsearch/indexes' + +require_relative 'indexable_shared' + +RSpec.describe JayAPI::Elasticsearch::Indexes do + subject(:indexes) { described_class.new(**constructor_params) } + + let(:constructor_params) do + { + index_names: index_names, + client: client + } + end + + let(:index_names) { %w[xyz01_integration_test xyz01_unit_test xyz01_traceability] } + + let(:successful_response) do + { + 'took' => 152, + 'errors' => false, + 'items' => [ + { + 'index' => { + '_index' => 'xyz01_integration_test', + '_type' => 'nested', + '_id' => 'gHKq628BVvaJMVLXnxsb', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 1, + '_primary_term' => 1, + 'status' => 201 + } + }, + { + 'index' => { + '_index' => 'xyz01_unit_test', + '_type' => 'nested', + '_id' => 'gHKq628BVvaJMVLXnxsb', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 1, + '_primary_term' => 1, + 'status' => 201 + } + }, + { + 'index' => { + '_index' => 'xyz01_traceability', + '_type' => 'nested', + '_id' => 'gHKq628BVvaJMVLXnxsb', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 1, + '_primary_term' => 1, + 'status' => 201 + } + } + ] + } + end + + let(:data) { { type: 'example', name: 'Test data example' } } + + include_context 'with mocked objects for Elasticsearch::Indexable' + + it 'does not respond to #index_name' do + expect(indexes).not_to respond_to(:index_name) + end + + describe '#initialize' do + subject(:method_call) { indexes } + + shared_examples_for '#initialize when the batch_size is not a multiple of the number of indexes' do + it 'logs a warning telling the user that the batch size might be overshot' do + expect(mocked_logger).to receive(:warn).with( + "'batch_size' is not a multiple of the number of elements in 'index_names'. " \ + "This can lead to a _bulk size slightly bigger than 'batch_size'" + ) + + method_call + end + end + + context 'when the batch size is not specified' do + it_behaves_like 'Indexable#initialize' + + it_behaves_like '#initialize when the batch_size is not a multiple of the number of indexes' + end + + context 'when the batch size is not a multiple of the number of indexes' do + let(:constructor_params) { super().merge(batch_size: 10) } + + it_behaves_like 'Indexable#initialize' + + it_behaves_like '#initialize when the batch_size is not a multiple of the number of indexes' + end + + context 'when the batch size is a multiple of the number of indexes' do + let(:constructor_params) { super().merge(batch_size: 15) } + + it 'does not log any warnings' do + expect(mocked_logger).not_to receive(:warn) + method_call + end + + it_behaves_like 'Indexable#initialize' + end + end + + describe '#index_names' do + let(:method_call) { indexes.index_names } + + it 'returns the expected value (the index names passed to the class constructor)' do + expect(method_call).to eq(%w[xyz01_integration_test xyz01_unit_test xyz01_traceability]) + end + end + + describe '#push' do + subject(:method_call) { described_method.call } + + # Needed in order to be able to repeat the method call + let(:described_method) { -> { indexes.push(data) } } + + let(:constructor_params) do + super().merge(batch_size: 15) + end + + let(:expected_data) do + { + body: index_names.cycle(5).map do |index_name| + { + index: { + _index: index_name, + _type: 'nested', + data: data + } + } + end + } + end + + context 'when the amount of data is smaller than the batch size' do + it 'enqueues the data but does not push it to Elasticsearch' do + expect(client).not_to receive(:bulk) + 3.times { described_method.call } + end + end + + context 'when the amount of data matches the batch size' do + it 'puts the data with the correct structure in the queue' do + expect(client).to receive(:bulk).with(expected_data) + 5.times { described_method.call } + end + + it 'leaves the queue empty' do + 5.times { described_method.call } + expect(indexes.queue_size).to be_zero + end + end + + context 'when the amount of data goes over the batch size' do + it 'pushes the data to Elasticsearch every time the batch size is hit' do + expect(client).to receive(:bulk).with(expected_data).exactly(3).times + 15.times { described_method.call } + end + end + end + + describe '#index' do + subject(:method_call) { indexes.index(data, **method_params) } + + let(:method_params) { {} } + + let(:responses) do + index_names.map do |index_name| + { + '_index' => index_name, + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + } + end + end + + let(:successful_response) do + [ + { + '_index' => 'xyz01_integration_test', + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + }, + { + '_index' => 'xyz01_unit_test', + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + }, + { + '_index' => 'xyz01_traceability', + '_type' => 'nested', + '_id' => 'SVY1mJEBQ5CNFZM8Lodt', + '_version' => 1, + 'result' => 'created', + '_shards' => { 'total' => 2, 'successful' => 1, 'failed' => 0 }, + '_seq_no' => 0, + '_primary_term' => 1 + } + ] + end + + before do + allow(client).to receive(:index).and_return(*responses) + end + + shared_examples_for '#index when no type is specified' do + let(:expected_data) do + { + type: 'nested', + body: data + } + end + + it 'sends the given data to the first Elasticsearch index (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_unit_test')) + method_call + end + + it 'sends the given data to the second Elasticsearch index (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_integration_test')) + method_call + end + + it 'sends the given data to the third Elasticsearch index (with the "nested" type)' do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_traceability')) + method_call + end + + it 'returns the expected Array of hashes' do + expect(method_call).to eq(successful_response) + end + end + + context 'when no type is specified' do + it_behaves_like '#index when no type is specified' + end + + context 'when type is specified as "nested"' do + let(:method_params) { { type: 'nested' } } + + it_behaves_like '#index when no type is specified' + end + + context 'when type is set to nil' do + let(:method_params) { { type: nil } } + + let(:expected_data) do + { + type: nil, + body: data + } + end + + it "sends the given data to the first Elasticsearch index (with 'type' set to nil)" do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_unit_test')) + method_call + end + + it "sends the given data to the second Elasticsearch index (with 'type' set to nil)" do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_integration_test')) + method_call + end + + it "sends the given data to the third Elasticsearch index (with 'type' set to nil)" do + expect(client).to receive(:index).with(expected_data.merge(index: 'xyz01_traceability')) + method_call + end + + it 'returns the expected Array of hashes' do + expect(method_call).to eq(successful_response) + end + end + + it_behaves_like 'Indexable#index' + end + + describe '#queue_size' do + subject(:method_call) { indexes.queue_size } + + let(:indexable) { indexes } + + it_behaves_like 'Indexable#queue_size' + end + + describe '#flush' do + subject(:method_call) { indexes.flush } + + let(:indexable) { indexes } + + let(:error_response) do + { + 'took' => 9, + 'errors' => true, + 'items' => [ + { + 'index' => { + '_index' => 'xyz01_integration_test', + '_type' => 'nested', + '_id' => 'EExOS4cBrb-mrkbmwAGg', + 'result' => 'created', + '_shards' => { + 'total' => 2, + 'successful' => 1, + 'failed' => 0 + }, + 'status' => 201 + } + }, + { + 'index' => { + '_index' => 'xyz01_unit_test', + '_type' => 'nested', + '_id' => 'fnKi628BVvaJMVLXJxty', + 'status' => 400, + 'error' => { + 'type' => 'illegal_argument_exception', + 'reason' => 'mapper [test_env.report_meta.SUT tags.cluster] ' \ + 'of different type, current_type [long], merged_type [text]' + } + } + } + ] + } + end + + it_behaves_like 'Indexable#flush' + end + + describe '#search' do + subject(:method_call) { indexes.search(query, **method_params) } + + let(:query) do + { + query: { + query_string: { + fields: ['test_case.identifier'], + query: '"Elite::Tools::Jay::ElasticsearchIndex/#push/Puts the data with the correct structure in the queue"' + } + }, + sort: [ + { + 'test_case.finished_at.keyword': { + order: 'desc' + } + } + ] + } + end + + let(:method_params) { { batch_counter: batch_counter } } + + let(:indexable) { indexes } + + let(:expected_query) do + { + index: index_names, + body: query + } + end + + it_behaves_like 'Indexable#search' + end + + describe '#delete_by_query' do + subject(:method_call) { indexes.delete_by_query(query, **method_params) } + + let(:method_params) { {} } + + let(:query) do + { + bool: { + must: [ + query_string: { + query: '"test_case.id_long: "Qualification Tests/Diagnostics/OBD/*"' + }, + range: { + field: 'test_case.finished_at', + gte: '2024/02/07 18:00:00', + lte: '2024/02/09 13:00:00' + } + ] + } + } + end + + it_behaves_like 'Indexable#delete_by_query' + end + + describe '#delete_by_query_async' do + subject(:method_call) { indexes.delete_by_query_async(query, slices: 5) } + + let(:query) do + { + bool: { + must: [ + range: { field: 'test_case.finished_at', gte: '2024/02/07 18:00:00', lte: '2024/02/09 13:00:00' } + ] + } + } + end + + let(:indexable) { indexes } + + it_behaves_like 'Indexable#delete_by_query_async' + end +end